Skip to content

Commit 7e3d821

Browse files
authored
Fix child WF ID generation (#1803)
1 parent 8f38795 commit 7e3d821

10 files changed

+839
-6
lines changed

internal/internal_event_handlers.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
561561
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error),
562562
) {
563563
if params.WorkflowID == "" {
564-
params.WorkflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
564+
params.WorkflowID = wc.workflowInfo.currentRunID + "_" + wc.GenerateSequenceID()
565565
}
566566
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter)
567567
if err != nil {
@@ -1220,7 +1220,11 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
12201220
case enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
12211221
// No Operation
12221222
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
1223-
// No Operation
1223+
// update the childWorkflowIDSeed if the workflow was reset at this point.
1224+
attr := event.GetWorkflowTaskFailedEventAttributes()
1225+
if attr.GetCause() == enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW {
1226+
weh.workflowInfo.currentRunID = attr.GetNewRunId()
1227+
}
12241228
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
12251229
// No Operation
12261230
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:

internal/internal_task_handlers.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,7 @@ OrderEvents:
492492
break OrderEvents
493493
}
494494
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
495-
enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
496-
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
495+
enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
497496
// Skip
498497
default:
499498
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
@@ -744,6 +743,10 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
744743
Memo: attributes.Memo,
745744
SearchAttributes: attributes.SearchAttributes,
746745
RetryPolicy: convertFromPBRetryPolicy(attributes.RetryPolicy),
746+
// Use the original execution run ID from the start event as the initial seed.
747+
// Original execution run ID stays the same for the entire chain of workflow resets.
748+
// This helps us keep child workflow IDs consistent up until a reset-point is encountered.
749+
currentRunID: attributes.GetOriginalExecutionRunId(),
747750
}
748751

749752
return newWorkflowExecutionContext(workflowInfo, wth), nil

internal/internal_task_handlers_interfaces_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommands() {
176176
createTestEventWorkflowTaskStarted(3),
177177
{
178178
EventId: 4,
179-
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
179+
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
180180
},
181181
{
182182
EventId: 5,
@@ -278,7 +278,7 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {
278278
createTestEventWorkflowTaskStarted(3),
279279
{
280280
EventId: 4,
281-
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
281+
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
282282
},
283283
createTestEventWorkflowTaskScheduled(5, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
284284
createTestEventWorkflowTaskStarted(6),

internal/workflow.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,8 @@ type WorkflowInfo struct {
12811281
continueAsNewSuggested bool
12821282
currentHistorySize int
12831283
currentHistoryLength int
1284+
// currentRunID is the current run ID of the workflow task, deterministic over reset
1285+
currentRunID string
12841286
}
12851287

12861288
// UpdateInfo information about a currently running update

test/integration_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2040,6 +2040,143 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
20402040
ts.Equal(originalResult, newResult)
20412041
}
20422042

2043+
// TestResetWorkflowExecutionWithChildren tests the behavior of child workflow ID generation when a workflow with children is reset.
2044+
// It repeatedly resets the workflow at different points in its execution and verifies that the child workflow IDs are generated correctly.
2045+
func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithChildren() {
2046+
wfID := "reset-workflow-with-children"
2047+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
2048+
defer cancel()
2049+
2050+
// Start a workflow with 3 children.
2051+
options := ts.startWorkflowOptions(wfID)
2052+
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowWithChildren)
2053+
ts.NoError(err)
2054+
var originalResult string
2055+
err = run.Get(ctx, &originalResult)
2056+
ts.NoError(err)
2057+
2058+
// save child init childIDs for later comparison.
2059+
childIDs := ts.getChildWFIDsFromHistory(ctx, wfID, run.GetRunID())
2060+
ts.Len(childIDs, 3)
2061+
child1IDBeforeReset := childIDs[0]
2062+
child2IDBeforeReset := childIDs[1]
2063+
child3IDBeforeReset := childIDs[2]
2064+
2065+
resetRequest := &workflowservice.ResetWorkflowExecutionRequest{
2066+
Namespace: ts.config.Namespace,
2067+
WorkflowExecution: &commonpb.WorkflowExecution{
2068+
WorkflowId: wfID,
2069+
RunId: run.GetRunID(),
2070+
},
2071+
Reason: "integration test",
2072+
}
2073+
// (reset #1) - resetting the workflow execution before both child workflows are started.
2074+
resetRequest.RequestId = "reset-request-1"
2075+
resetRequest.WorkflowTaskFinishEventId = 4
2076+
resp, err := ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
2077+
ts.NoError(err)
2078+
// Wait for the new run to complete.
2079+
var resultAfterReset1 string
2080+
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset1)
2081+
ts.NoError(err)
2082+
ts.Equal(originalResult, resultAfterReset1)
2083+
2084+
childIDsAfterReset1 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
2085+
ts.Len(childIDsAfterReset1, 3)
2086+
// All 3 child workflow IDs should be different after reset.
2087+
ts.NotEqual(child1IDBeforeReset, childIDsAfterReset1[0])
2088+
ts.NotEqual(child2IDBeforeReset, childIDsAfterReset1[1])
2089+
ts.NotEqual(child3IDBeforeReset, childIDsAfterReset1[2])
2090+
2091+
// (reset #2) - resetting the new workflow execution after child-1 but before child-2
2092+
resetRequest.RequestId = "reset-request-2"
2093+
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
2094+
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset1[0])
2095+
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
2096+
ts.NoError(err)
2097+
// Wait for the new run to complete.
2098+
var resultAfterReset2 string
2099+
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset2)
2100+
ts.NoError(err)
2101+
ts.Equal(originalResult, resultAfterReset2)
2102+
2103+
childIDsAfterReset2 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
2104+
ts.Len(childIDsAfterReset2, 3)
2105+
ts.Equal(childIDsAfterReset1[0], childIDsAfterReset2[0]) // child-1 should be the same as before reset.
2106+
ts.NotEqual(childIDsAfterReset1[1], childIDsAfterReset2[1]) // child-2 should be different after reset.
2107+
ts.NotEqual(childIDsAfterReset1[2], childIDsAfterReset2[2]) // Child-3 should be different after reset.
2108+
2109+
// (reset #3) - resetting the new workflow execution after child-2 but before child-3
2110+
resetRequest.RequestId = "reset-request-3"
2111+
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
2112+
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset2[1])
2113+
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
2114+
ts.NoError(err)
2115+
// Wait for the new run to complete.
2116+
var resultAfterReset3 string
2117+
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset3)
2118+
ts.NoError(err)
2119+
ts.Equal(originalResult, resultAfterReset3)
2120+
2121+
childIDsAfterReset3 := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
2122+
ts.Len(childIDsAfterReset3, 3)
2123+
// child-1 & child-2 workflow IDs should be the same as before reset. Child-3 should be different.
2124+
ts.Equal(childIDsAfterReset2[0], childIDsAfterReset3[0])
2125+
ts.Equal(childIDsAfterReset2[1], childIDsAfterReset3[1])
2126+
ts.NotEqual(childIDsAfterReset2[2], childIDsAfterReset3[2])
2127+
2128+
// (reset #3) - resetting the new workflow execution one last time after child-3
2129+
// This should successfully replay all child events and not change the child workflow IDs from previous run.
2130+
resetRequest.RequestId = "reset-request-4"
2131+
resetRequest.WorkflowExecution.RunId = resp.GetRunId()
2132+
resetRequest.WorkflowTaskFinishEventId = ts.getWorkflowTaskFinishEventIdAfterChild(ctx, wfID, resp.GetRunId(), childIDsAfterReset3[2])
2133+
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
2134+
ts.NoError(err)
2135+
childIDsFinal := ts.getChildWFIDsFromHistory(ctx, wfID, resp.GetRunId())
2136+
ts.Len(childIDsFinal, 3)
2137+
ts.Equal(childIDsAfterReset3[0], childIDsFinal[0])
2138+
ts.Equal(childIDsAfterReset3[1], childIDsFinal[1])
2139+
ts.Equal(childIDsAfterReset3[2], childIDsFinal[2])
2140+
}
2141+
2142+
func (ts *IntegrationTestSuite) getChildWFIDsFromHistory(ctx context.Context, wfID string, runID string) []string {
2143+
iter := ts.client.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
2144+
var childIDs []string
2145+
for iter.HasNext() {
2146+
event, err1 := iter.Next()
2147+
if err1 != nil {
2148+
break
2149+
}
2150+
if event.GetEventType() == enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED {
2151+
childIDs = append(childIDs, event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetWorkflowId())
2152+
}
2153+
}
2154+
return childIDs
2155+
}
2156+
2157+
func (ts *IntegrationTestSuite) getWorkflowTaskFinishEventIdAfterChild(ctx context.Context, wfID string, runID string, childID string) int64 {
2158+
iter := ts.client.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
2159+
childFound := false
2160+
for iter.HasNext() {
2161+
event, err := iter.Next()
2162+
if err != nil {
2163+
break
2164+
}
2165+
if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED {
2166+
if event.GetChildWorkflowExecutionCompletedEventAttributes().GetWorkflowExecution().GetWorkflowId() == childID {
2167+
childFound = true
2168+
}
2169+
}
2170+
if !childFound {
2171+
continue
2172+
}
2173+
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
2174+
return event.GetEventId()
2175+
}
2176+
}
2177+
return 0
2178+
}
2179+
20432180
func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() {
20442181
ctx := context.Background()
20452182
wfId := "reset-workflow-execution-with-update"

test/replaytests/replay_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,24 @@ func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
493493
require.NoError(s.T(), err)
494494
}
495495

496+
func (s *replayTestSuite) TestResetWorkflowBeforeChildInit() {
497+
replayer := worker.NewWorkflowReplayer()
498+
replayer.RegisterWorkflow(ResetWorkflowWithChild)
499+
// Verify we can replay workflow history containing a reset before StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted events.
500+
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-before-child-init.json")
501+
s.NoError(err)
502+
require.NoError(s.T(), err)
503+
}
504+
505+
func (s *replayTestSuite) TestResetWorkflowAfterChildComplete() {
506+
replayer := worker.NewWorkflowReplayer()
507+
replayer.RegisterWorkflow(ResetWorkflowWithChild)
508+
// Verify we can replay workflow history containing a reset event after StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted events.
509+
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-after-child-complete.json")
510+
s.NoError(err)
511+
require.NoError(s.T(), err)
512+
}
513+
496514
type captureConverter struct {
497515
converter.DataConverter
498516
toPayloads []interface{}

0 commit comments

Comments
 (0)