Skip to content

Commit c82a8ac

Browse files
Error if history contains unexpected events after the StartedEventId (#1662)
1 parent 75bd94b commit c82a8ac

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

internal/internal_task_handlers_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2689,7 +2689,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
26892689
})
26902690
}
26912691

2692-
func TestHistoryIterator(t *testing.T) {
2692+
func TestHistoryIteratorMaxEventID(t *testing.T) {
26932693
testEvents := []*historypb.HistoryEvent{
26942694
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}),
26952695
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
@@ -2725,6 +2725,7 @@ func TestHistoryIterator(t *testing.T) {
27252725
WorkflowId: "test-workflow-id",
27262726
RunId: "test-run-id",
27272727
},
2728+
3,
27282729
metrics.NopHandler,
27292730
"test-task-queue",
27302731
),
@@ -2733,6 +2734,6 @@ func TestHistoryIterator(t *testing.T) {
27332734
_, err := historyIterator.GetNextPage()
27342735
require.NoError(t, err)
27352736
_, err = historyIterator.GetNextPage()
2736-
require.NoError(t, err)
2737+
require.Error(t, err)
27372738

27382739
}

internal/internal_task_pollers.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,14 @@ type (
138138
}
139139

140140
historyIteratorImpl struct {
141-
iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error)
142-
execution *commonpb.WorkflowExecution
143-
nextPageToken []byte
144-
namespace string
145-
service workflowservice.WorkflowServiceClient
141+
iteratorFunc func(nextPageToken []byte) (*historypb.History, []byte, error)
142+
execution *commonpb.WorkflowExecution
143+
nextPageToken []byte
144+
namespace string
145+
service workflowservice.WorkflowServiceClient
146+
// maxEventID is the maximum eventID that the history iterator is expected to return.
147+
// 0 means that the iterator will return all history events.
148+
maxEventID int64
146149
metricsHandler metrics.Handler
147150
taskQueue string
148151
}
@@ -869,6 +872,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollWork
869872
nextPageToken: response.NextPageToken,
870873
namespace: wtp.namespace,
871874
service: wtp.service,
875+
maxEventID: response.GetStartedEventId(),
872876
metricsHandler: wtp.metricsHandler,
873877
taskQueue: wtp.taskQueueName,
874878
}
@@ -886,6 +890,7 @@ func (h *historyIteratorImpl) GetNextPage() (*historypb.History, error) {
886890
h.service,
887891
h.namespace,
888892
h.execution,
893+
h.maxEventID,
889894
h.metricsHandler,
890895
h.taskQueue,
891896
)
@@ -912,6 +917,7 @@ func newGetHistoryPageFunc(
912917
service workflowservice.WorkflowServiceClient,
913918
namespace string,
914919
execution *commonpb.WorkflowExecution,
920+
lastEventID int64,
915921
metricsHandler metrics.Handler,
916922
taskQueue string,
917923
) func(nextPageToken []byte) (*historypb.History, []byte, error) {
@@ -941,6 +947,19 @@ func newGetHistoryPageFunc(
941947
} else {
942948
h = resp.History
943949
}
950+
951+
size := len(h.Events)
952+
// While the SDK is processing a workflow task, the workflow task could timeout and server would start
953+
// a new workflow task or the server looses the workflow task if it is a speculative workflow task. In either
954+
// case, the new workflow task could have events that are beyond the last event ID that the SDK expects to process.
955+
// In such cases, the SDK should return error indicating that the workflow task is stale since the result will not be used.
956+
if size > 0 && lastEventID > 0 &&
957+
h.Events[size-1].GetEventId() > lastEventID {
958+
return nil, nil, fmt.Errorf("history contains events past expected last event ID (%v) "+
959+
"likely this means the current workflow task is no longer valid", lastEventID)
960+
961+
}
962+
944963
return h, resp.NextPageToken, nil
945964
}
946965
}

0 commit comments

Comments
 (0)