Skip to content

Commit 1a3b09b

Browse files
committed
Fix error mapping and complete the upgrade
1 parent 0629bb2 commit 1a3b09b

13 files changed

+279
-120
lines changed

internal/error.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,11 @@ type (
297297
// Operation name.
298298
Operation string
299299
// Operation ID - may be empty if the operation completed synchronously.
300+
//
301+
// Deprecated: Use OperationToken instead.
300302
OperationID string
303+
// Operation token - may be empty if the operation completed synchronously.
304+
OperationToken string
301305
// Chained cause - typically an ApplicationError or a CanceledError.
302306
Cause error
303307
}
@@ -909,8 +913,8 @@ func (e *ChildWorkflowExecutionError) RetryState() enumspb.RetryState {
909913
// Error implements the error interface.
910914
func (e *NexusOperationError) Error() string {
911915
msg := fmt.Sprintf(
912-
"%s (endpoint: %q, service: %q, operation: %q, operation ID: %q, scheduledEventID: %d)",
913-
e.Message, e.Endpoint, e.Service, e.Operation, e.OperationID, e.ScheduledEventID)
916+
"%s (endpoint: %q, service: %q, operation: %q, operation token: %q, scheduledEventID: %d)",
917+
e.Message, e.Endpoint, e.Service, e.Operation, e.OperationToken, e.ScheduledEventID)
914918
if e.Cause != nil {
915919
msg = fmt.Sprintf("%s: %v", msg, e.Cause)
916920
}

internal/failure_converter.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,17 @@ func (dfc *DefaultFailureConverter) ErrorToFailure(err error) *failurepb.Failure
170170
}
171171
failure.FailureInfo = &failurepb.Failure_ChildWorkflowExecutionFailureInfo{ChildWorkflowExecutionFailureInfo: failureInfo}
172172
case *NexusOperationError:
173+
var token = err.OperationToken
174+
if token == "" {
175+
token = err.OperationID
176+
}
173177
failureInfo := &failurepb.NexusOperationFailureInfo{
174178
ScheduledEventId: err.ScheduledEventID,
175179
Endpoint: err.Endpoint,
176180
Service: err.Service,
177181
Operation: err.Operation,
178-
OperationId: err.OperationID,
182+
OperationId: token,
183+
OperationToken: token,
179184
}
180185
failure.FailureInfo = &failurepb.Failure_NexusOperationExecutionFailureInfo{NexusOperationExecutionFailureInfo: failureInfo}
181186
case *nexus.HandlerError:
@@ -278,6 +283,10 @@ func (dfc *DefaultFailureConverter) FailureToError(failure *failurepb.Failure) e
278283
dfc.FailureToError(failure.GetCause()),
279284
)
280285
} else if info := failure.GetNexusOperationExecutionFailureInfo(); info != nil {
286+
token := info.GetOperationToken()
287+
if token == "" {
288+
token = info.GetOperationId()
289+
}
281290
err = &NexusOperationError{
282291
Message: failure.Message,
283292
Cause: dfc.FailureToError(failure.GetCause()),
@@ -286,7 +295,8 @@ func (dfc *DefaultFailureConverter) FailureToError(failure *failurepb.Failure) e
286295
Endpoint: info.GetEndpoint(),
287296
Service: info.GetService(),
288297
Operation: info.GetOperation(),
289-
OperationID: info.GetOperationId(),
298+
OperationToken: token,
299+
OperationID: token,
290300
}
291301
} else if info := failure.GetNexusHandlerFailureInfo(); info != nil {
292302
err = &nexus.HandlerError{

internal/interceptor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ type RequestCancelNexusOperationInput struct {
217217
Client NexusClient
218218
// Operation name or OperationReference from the Nexus SDK.
219219
Operation any
220-
// Operation ID. May be empty if the operation is synchronous or has not started yet.
221-
ID string
220+
// Operation Token. May be empty if the operation is synchronous or has not started yet.
221+
Token string
222222
// seq number. For internal use only.
223223
seq int64
224224
}

internal/internal_event_handlers.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ type (
8585
}
8686

8787
scheduledNexusOperation struct {
88-
startedCallback func(operationID string, err error)
88+
startedCallback func(token string, err error)
8989
completedCallback func(result *commonpb.Payload, err error)
9090
endpoint string
9191
service string
@@ -627,7 +627,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
627627
tagWorkflowType, params.WorkflowType.Name)
628628
}
629629

630-
func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(opID string, e error)) int64 {
630+
func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64 {
631631
seq := wc.GenerateSequence()
632632
scheduleTaskAttr := &commandpb.ScheduleNexusOperationCommandAttributes{
633633
Endpoint: params.client.Endpoint(),
@@ -1918,7 +1918,11 @@ func (weh *workflowExecutionEventHandlerImpl) handleNexusOperationStarted(event
19181918
command := weh.commandsHelper.handleNexusOperationStarted(attributes.ScheduledEventId)
19191919
state := command.getData().(*scheduledNexusOperation)
19201920
if state.startedCallback != nil {
1921-
state.startedCallback(attributes.OperationId, nil)
1921+
token := attributes.OperationToken
1922+
if token == "" {
1923+
token = attributes.OperationId //lint:ignore SA1019 this field is sent by servers older than 1.27.0.
1924+
}
1925+
state.startedCallback(token, nil)
19221926
state.startedCallback = nil
19231927
}
19241928
return nil

internal/internal_nexus_task_handler.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (h *nexusTaskHandler) handleStartOperation(
221221
if !panic {
222222
nctx.Log.Error("Handler returned error while processing Nexus task", tagError, err)
223223
}
224-
var unsuccessfulOperationErr *nexus.UnsuccessfulOperationError
224+
var unsuccessfulOperationErr *nexus.OperationError
225225
err = convertKnownErrors(err)
226226
if errors.As(err, &unsuccessfulOperationErr) {
227227
failure, err := h.errorToFailure(unsuccessfulOperationErr.Cause)
@@ -260,13 +260,19 @@ func (h *nexusTaskHandler) handleStartOperation(
260260
Type: nexusLink.Type,
261261
})
262262
}
263+
token := t.OperationToken
264+
//lint:ignore SA1019 this field might be set by users of older SDKs.
265+
if t.OperationID != "" {
266+
token = t.OperationID //lint:ignore SA1019 this field might be set by users of older SDKs.
267+
}
263268
return &nexuspb.Response{
264269
Variant: &nexuspb.Response_StartOperation{
265270
StartOperation: &nexuspb.StartOperationResponse{
266271
Variant: &nexuspb.StartOperationResponse_AsyncSuccess{
267272
AsyncSuccess: &nexuspb.StartOperationResponse_Async{
268-
OperationId: t.OperationID,
269-
Links: links,
273+
OperationToken: token,
274+
OperationId: token,
275+
Links: links,
270276
},
271277
},
272278
},
@@ -313,7 +319,12 @@ func (h *nexusTaskHandler) handleCancelOperation(ctx context.Context, nctx *Nexu
313319
nctx.Log.Error("Panic captured while handling Nexus task", tagStackTrace, string(debug.Stack()), tagError, err)
314320
}
315321
}()
316-
err = h.nexusHandler.CancelOperation(ctx, req.GetService(), req.GetOperation(), req.GetOperationId(), cancelOptions)
322+
token := req.GetOperationToken()
323+
if token == "" {
324+
// Support servers older than 1.27.0.
325+
token = req.GetOperationId()
326+
}
327+
err = h.nexusHandler.CancelOperation(ctx, req.GetService(), req.GetOperation(), token, cancelOptions)
317328
}()
318329
if ctx.Err() != nil {
319330
if !panic {
@@ -469,16 +480,13 @@ var emptyReaderNopCloser = io.NopCloser(bytes.NewReader([]byte{}))
469480

470481
// convertKnownErrors converts known errors to corresponding Nexus HandlerError.
471482
func convertKnownErrors(err error) error {
472-
// Handle common errors returned from various client methods.
473-
if workflowErr, ok := err.(*WorkflowExecutionError); ok {
474-
return nexus.NewFailedOperationError(workflowErr)
475-
}
476-
if queryRejectedErr, ok := err.(*QueryRejectedError); ok {
477-
return nexus.NewFailedOperationError(queryRejectedErr)
478-
}
479483
// Not using errors.As to be consistent ApplicationError checking with the rest of the SDK.
480484
if appErr, ok := err.(*ApplicationError); ok && appErr.NonRetryable() {
481-
return nexus.NewFailedOperationError(appErr)
485+
return &nexus.HandlerError{
486+
// TODO(bergundy): Change this to a non retryable internal error after the 1.27.0 server release.
487+
Type: nexus.HandlerErrorTypeBadRequest,
488+
Cause: appErr,
489+
}
482490
}
483491
return convertServiceError(err)
484492
}
@@ -502,7 +510,10 @@ func convertServiceError(err error) error {
502510
st = stGetter.Status()
503511

504512
switch st.Code() {
505-
case codes.AlreadyExists, codes.InvalidArgument, codes.FailedPrecondition, codes.OutOfRange:
513+
case codes.InvalidArgument:
514+
return &nexus.HandlerError{Type: nexus.HandlerErrorTypeBadRequest, Cause: err}
515+
case codes.AlreadyExists, codes.FailedPrecondition, codes.OutOfRange:
516+
// TODO(bergundy): Change this to a non retryable internal error after the 1.27.0 server release.
506517
return &nexus.HandlerError{Type: nexus.HandlerErrorTypeBadRequest, Cause: err}
507518
case codes.Aborted, codes.Unavailable:
508519
return &nexus.HandlerError{Type: nexus.HandlerErrorTypeUnavailable, Cause: err}

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ type (
9999
RequestCancelChildWorkflow(namespace, workflowID string)
100100
RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback ResultHandler)
101101
ExecuteChildWorkflow(params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error))
102-
ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(opID string, e error)) int64
102+
ExecuteNexusOperation(params executeNexusOperationParams, callback func(*commonpb.Payload, error), startedHandler func(token string, e error)) int64
103103
RequestCancelNexusOperation(seq int64)
104104
GetLogger() log.Logger
105105
GetMetricsHandler() metrics.Handler

internal/internal_workflow_testsuite.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type (
106106
env *testWorkflowEnvironmentImpl
107107
seq int64
108108
params executeNexusOperationParams
109-
operationID string
109+
operationToken string
110110
cancelRequested bool
111111
started bool
112112
done bool
@@ -2504,7 +2504,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteNexusOperation(
25042504
}, true)
25052505
case *nexuspb.StartOperationResponse_AsyncSuccess:
25062506
env.postCallback(func() {
2507-
opID = v.AsyncSuccess.GetOperationId()
2507+
opID = v.AsyncSuccess.GetOperationToken()
25082508
handle.startedCallback(opID, nil)
25092509
if handle.cancelRequested {
25102510
handle.cancel()
@@ -2551,7 +2551,7 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelNexusOperation(seq int64) {
25512551
func (env *testWorkflowEnvironmentImpl) RegisterNexusAsyncOperationCompletion(
25522552
service string,
25532553
operation string,
2554-
operationID string,
2554+
token string,
25552555
result any,
25562556
err error,
25572557
delay time.Duration,
@@ -2586,7 +2586,7 @@ func (env *testWorkflowEnvironmentImpl) RegisterNexusAsyncOperationCompletion(
25862586
env.setNexusAsyncOperationCompletionHandle(
25872587
service,
25882588
operation,
2589-
operationID,
2589+
token,
25902590
&testNexusAsyncOperationHandle{
25912591
result: data,
25922592
err: err,
@@ -2599,28 +2599,28 @@ func (env *testWorkflowEnvironmentImpl) RegisterNexusAsyncOperationCompletion(
25992599
func (env *testWorkflowEnvironmentImpl) getNexusAsyncOperationCompletionHandle(
26002600
service string,
26012601
operation string,
2602-
operationID string,
2602+
token string,
26032603
) *testNexusAsyncOperationHandle {
2604-
uniqueOpID := env.makeUniqueNexusOperationID(service, operation, operationID)
2604+
uniqueOpID := env.makeUniqueNexusOperationToken(service, operation, token)
26052605
return env.nexusAsyncOpHandle[uniqueOpID]
26062606
}
26072607

26082608
func (env *testWorkflowEnvironmentImpl) setNexusAsyncOperationCompletionHandle(
26092609
service string,
26102610
operation string,
2611-
operationID string,
2611+
token string,
26122612
handle *testNexusAsyncOperationHandle,
26132613
) {
2614-
uniqueOpID := env.makeUniqueNexusOperationID(service, operation, operationID)
2614+
uniqueOpID := env.makeUniqueNexusOperationToken(service, operation, token)
26152615
env.nexusAsyncOpHandle[uniqueOpID] = handle
26162616
}
26172617

26182618
func (env *testWorkflowEnvironmentImpl) deleteNexusAsyncOperationCompletionHandle(
26192619
service string,
26202620
operation string,
2621-
operationID string,
2621+
token string,
26222622
) {
2623-
uniqueOpID := env.makeUniqueNexusOperationID(service, operation, operationID)
2623+
uniqueOpID := env.makeUniqueNexusOperationToken(service, operation, token)
26242624
delete(env.nexusAsyncOpHandle, uniqueOpID)
26252625
}
26262626

@@ -2630,21 +2630,21 @@ func (env *testWorkflowEnvironmentImpl) scheduleNexusAsyncOperationCompletion(
26302630
completionHandle := env.getNexusAsyncOperationCompletionHandle(
26312631
handle.params.client.Service(),
26322632
handle.params.operation,
2633-
handle.operationID,
2633+
handle.operationToken,
26342634
)
26352635
if completionHandle == nil {
26362636
return
26372637
}
26382638
env.deleteNexusAsyncOperationCompletionHandle(
26392639
handle.params.client.Service(),
26402640
handle.params.operation,
2641-
handle.operationID,
2641+
handle.operationToken,
26422642
)
26432643
var nexusErr error
26442644
if completionHandle.err != nil {
26452645
nexusErr = env.failureConverter.FailureToError(nexusOperationFailure(
26462646
handle.params,
2647-
handle.operationID,
2647+
handle.operationToken,
26482648
&failurepb.Failure{
26492649
Message: completionHandle.err.Error(),
26502650
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
@@ -2670,7 +2670,7 @@ func (env *testWorkflowEnvironmentImpl) resolveNexusOperation(seq int64, result
26702670
}
26712671
if err != nil {
26722672
failure := env.failureConverter.ErrorToFailure(err)
2673-
err = env.failureConverter.FailureToError(nexusOperationFailure(handle.params, handle.operationID, failure.GetCause()))
2673+
err = env.failureConverter.FailureToError(nexusOperationFailure(handle.params, handle.operationToken, failure.GetCause()))
26742674
handle.completedCallback(nil, err)
26752675
} else {
26762676
handle.completedCallback(result, nil)
@@ -2696,12 +2696,12 @@ func (env *testWorkflowEnvironmentImpl) deleteNexusOperationHandle(seqID int64)
26962696
delete(env.runningNexusOperations, seqID)
26972697
}
26982698

2699-
func (env *testWorkflowEnvironmentImpl) makeUniqueNexusOperationID(
2699+
func (env *testWorkflowEnvironmentImpl) makeUniqueNexusOperationToken(
27002700
service string,
27012701
operation string,
2702-
operationID string,
2702+
token string,
27032703
) string {
2704-
return fmt.Sprintf("%s_%s_%s", service, operation, operationID)
2704+
return fmt.Sprintf("%s_%s_%s", service, operation, token)
27052705
}
27062706

27072707
func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
@@ -3202,9 +3202,9 @@ func (h *testNexusOperationHandle) newCancelTask() *workflowservice.PollNexusTas
32023202
Header: h.params.nexusHeader,
32033203
Variant: &nexuspb.Request_CancelOperation{
32043204
CancelOperation: &nexuspb.CancelOperationRequest{
3205-
Service: h.params.client.Service(),
3206-
Operation: h.params.operation,
3207-
OperationId: h.operationID,
3205+
Service: h.params.client.Service(),
3206+
Operation: h.params.operation,
3207+
OperationToken: h.operationToken,
32083208
},
32093209
},
32103210
},
@@ -3241,7 +3241,7 @@ func (h *testNexusOperationHandle) startedCallback(opID string, e error) {
32413241
// Ignore duplciate starts.
32423242
return
32433243
}
3244-
h.operationID = opID
3244+
h.operationToken = opID
32453245
h.started = true
32463246
h.onStarted(opID, e)
32473247
h.env.runningCount--
@@ -3251,7 +3251,7 @@ func (h *testNexusOperationHandle) cancel() {
32513251
if h.done {
32523252
return
32533253
}
3254-
if h.started && h.operationID == "" {
3254+
if h.started && h.operationToken == "" {
32553255
panic(fmt.Errorf("incomplete operation has no operation ID: (%s, %s, %s)",
32563256
h.params.client.Endpoint(), h.params.client.Service(), h.params.operation))
32573257
}
@@ -3445,34 +3445,34 @@ func (r *testNexusHandler) CancelOperation(
34453445
ctx context.Context,
34463446
service string,
34473447
operation string,
3448-
operationID string,
3448+
token string,
34493449
options nexus.CancelOperationOptions,
34503450
) error {
34513451
if r.opHandle.isMocked {
34523452
// if the operation was mocked, then there's no workflow running
34533453
return nil
34543454
}
3455-
return r.handler.CancelOperation(ctx, service, operation, operationID, options)
3455+
return r.handler.CancelOperation(ctx, service, operation, token, options)
34563456
}
34573457

34583458
func (r *testNexusHandler) GetOperationInfo(
34593459
ctx context.Context,
34603460
service string,
34613461
operation string,
3462-
operationID string,
3462+
token string,
34633463
options nexus.GetOperationInfoOptions,
34643464
) (*nexus.OperationInfo, error) {
3465-
return r.handler.GetOperationInfo(ctx, service, operation, operationID, options)
3465+
return r.handler.GetOperationInfo(ctx, service, operation, token, options)
34663466
}
34673467

34683468
func (r *testNexusHandler) GetOperationResult(
34693469
ctx context.Context,
34703470
service string,
34713471
operation string,
3472-
operationID string,
3472+
token string,
34733473
options nexus.GetOperationResultOptions,
34743474
) (any, error) {
3475-
return r.handler.GetOperationResult(ctx, service, operation, operationID, options)
3475+
return r.handler.GetOperationResult(ctx, service, operation, token, options)
34763476
}
34773477

34783478
func (env *testWorkflowEnvironmentImpl) registerNexusOperationReference(

0 commit comments

Comments
 (0)