Skip to content

Commit b19fc66

Browse files
authored
Catchup tasks for Nexus features in the test env (#1824)
- Propagate operation timeout to the handler via header. - Handle operation complete-before-start.
1 parent 4fb50dc commit b19fc66

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

internal/internal_workflow_testsuite.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,6 +2443,9 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(
24432443

24442444
var token string
24452445
if params.options.ScheduleToCloseTimeout > 0 {
2446+
// Propagate operation timeout to the handler via header.
2447+
params.nexusHeader[strings.ToLower(nexus.HeaderOperationTimeout)] = strconv.FormatInt(params.options.ScheduleToCloseTimeout.Milliseconds(), 10) + "ms"
2448+
24462449
// Timer to fail the nexus operation due to schedule to close timeout.
24472450
env.NewTimer(
24482451
params.options.ScheduleToCloseTimeout,
@@ -2676,7 +2679,7 @@ func (env *testWorkflowEnvironmentImpl) scheduleNexusAsyncOperationCompletion(
26762679
}, completionHandle.delay)
26772680
}
26782681

2679-
func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, result *commonpb.Payload, err error) {
2682+
func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, token string, result *commonpb.Payload, err error) {
26802683
env.postCallback(func() {
26812684
handle, ok := env.getNexusOperationHandle(seq)
26822685
if !ok {
@@ -2685,10 +2688,11 @@ func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, result
26852688
if err != nil {
26862689
failure := env.failureConverter.ErrorToFailure(err)
26872690
err = env.failureConverter.FailureToError(nexusOperationFailure(handle.params, handle.operationToken, failure.GetCause()))
2688-
handle.completedCallback(nil, err)
2689-
} else {
2690-
handle.completedCallback(result, nil)
26912691
}
2692+
// Populate the token in case the operation completes before it marked as started.
2693+
// startedCallback is idempotent and will be a noop in case the operation has already been marked as started.
2694+
handle.startedCallback(token, err)
2695+
handle.completedCallback(result, err)
26922696
}, true)
26932697
}
26942698

internal/nexus_operations.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func apiHandlerErrorToNexusHandlerError(apiErr *nexuspb.HandlerError, failureCon
185185
}
186186

187187
nexusErr := &nexus.HandlerError{
188-
Type: nexus.HandlerErrorType(apiErr.GetErrorType()),
188+
Type: nexus.HandlerErrorType(apiErr.GetErrorType()),
189189
RetryBehavior: retryBehavior,
190190
}
191191

@@ -356,14 +356,24 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
356356
panic(fmt.Errorf("unexpected operation sequence in callback header: %s: %w", seqStr, err))
357357
}
358358

359+
// Send the operation token to account for a race when the completion comes in before the response to the
360+
// StartOperation call is recorded.
361+
// The token is extracted from the callback header which is attached in ExecuteUntypedWorkflow.
362+
var operationToken string
363+
if len(options.callbacks) == 1 {
364+
if cbHeader := options.callbacks[0].GetNexus().GetHeader(); cbHeader != nil {
365+
operationToken = cbHeader[nexus.HeaderOperationToken]
366+
}
367+
}
368+
359369
if wfErr != nil {
360-
t.env.resolveNexusOperation(seq, nil, wfErr)
370+
t.env.resolveNexusOperation(seq, operationToken, nil, wfErr)
361371
} else {
362372
var payload *commonpb.Payload
363373
if len(result.GetPayloads()) > 0 {
364374
payload = result.Payloads[0]
365375
}
366-
t.env.resolveNexusOperation(seq, payload, nil)
376+
t.env.resolveNexusOperation(seq, operationToken, payload, nil)
367377
}
368378
}, func(r WorkflowExecution, err error) {
369379
run.WorkflowExecution = r

test/nexus_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,9 @@ func TestWorkflowTestSuite_WorkflowRunOperation_ScheduleToCloseTimeout(t *testin
11221122
"op",
11231123
handlerWF,
11241124
func(ctx context.Context, _ nexus.NoValue, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
1125+
if opts.Header.Get(nexus.HeaderOperationTimeout) == "" {
1126+
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "expected non empty operation timeout header")
1127+
}
11251128
time.Sleep(opSleepDuration)
11261129
return client.StartWorkflowOptions{ID: opts.RequestID}, nil
11271130
})

0 commit comments

Comments
 (0)