Skip to content

Commit 772bc83

Browse files
authored
MultiOperation retries non-durable Update (#1652)
1 parent 974ccc0 commit 772bc83

File tree

2 files changed

+195
-55
lines changed

2 files changed

+195
-55
lines changed

internal/internal_workflow_client.go

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,75 +1715,99 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation(
17151715
withStartOp,
17161716
},
17171717
}
1718-
multiResp, err := w.client.workflowService.ExecuteMultiOperation(ctx, &multiRequest)
17191718

1720-
var multiErr *serviceerror.MultiOperationExecution
1721-
if errors.As(err, &multiErr) {
1722-
if len(multiErr.OperationErrors()) != len(multiRequest.Operations) {
1723-
return nil, fmt.Errorf("%w: %v instead of %v operation errors",
1724-
errInvalidServerResponse, len(multiErr.OperationErrors()), len(multiRequest.Operations))
1719+
var startResp *workflowservice.StartWorkflowExecutionResponse
1720+
var updateResp *workflowservice.UpdateWorkflowExecutionResponse
1721+
for {
1722+
multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) {
1723+
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
1724+
defer cancel()
1725+
1726+
multiResp, err := w.client.workflowService.ExecuteMultiOperation(grpcCtx, &multiRequest)
1727+
if err != nil {
1728+
if ctx.Err() != nil {
1729+
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
1730+
}
1731+
if status := serviceerror.ToStatus(err); status.Code() == codes.Canceled || status.Code() == codes.DeadlineExceeded {
1732+
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
1733+
}
1734+
return nil, err
1735+
}
1736+
1737+
return multiResp, err
1738+
}()
1739+
1740+
var multiErr *serviceerror.MultiOperationExecution
1741+
if errors.As(err, &multiErr) {
1742+
if len(multiErr.OperationErrors()) != len(multiRequest.Operations) {
1743+
return nil, fmt.Errorf("%w: %v instead of %v operation errors",
1744+
errInvalidServerResponse, len(multiErr.OperationErrors()), len(multiRequest.Operations))
1745+
}
1746+
1747+
var abortedErr *serviceerror.MultiOperationAborted
1748+
startErr := errors.New("failed to start workflow")
1749+
for i, opReq := range multiRequest.Operations {
1750+
// if an operation error is of type MultiOperationAborted, it means it was only aborted because
1751+
// of another operation's error and is therefore not interesting or helpful
1752+
opErr := multiErr.OperationErrors()[i]
1753+
1754+
switch t := opReq.Operation.(type) {
1755+
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
1756+
if !errors.As(opErr, &abortedErr) {
1757+
startErr = opErr
1758+
}
1759+
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
1760+
if !errors.As(opErr, &abortedErr) {
1761+
startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr)
1762+
}
1763+
default:
1764+
// this would only happen if a case statement for a newly added operation is missing above
1765+
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
1766+
}
1767+
}
1768+
return nil, startErr
1769+
} else if err != nil {
1770+
return nil, err
1771+
}
1772+
1773+
if len(multiResp.Responses) != len(multiRequest.Operations) {
1774+
return nil, fmt.Errorf("%w: %v instead of %v operation results",
1775+
errInvalidServerResponse, len(multiResp.Responses), len(multiRequest.Operations))
17251776
}
17261777

1727-
var startErr error
1728-
var abortedErr *serviceerror.MultiOperationAborted
17291778
for i, opReq := range multiRequest.Operations {
1730-
// if an operation error is of type MultiOperationAborted, it means it was only aborted because
1731-
// of another operation's error and is therefore not interesting or helpful
1732-
opErr := multiErr.OperationErrors()[i]
1779+
resp := multiResp.Responses[i].Response
17331780

17341781
switch t := opReq.Operation.(type) {
17351782
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
1736-
if !errors.As(opErr, &abortedErr) {
1737-
startErr = opErr
1783+
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok {
1784+
startResp = opResp.StartWorkflow
1785+
} else {
1786+
return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
17381787
}
17391788
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
1740-
if !errors.As(opErr, &abortedErr) {
1741-
startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr)
1789+
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow); ok {
1790+
updateResp = opResp.UpdateWorkflow
1791+
} else {
1792+
return nil, fmt.Errorf("%w: UpdateWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
17421793
}
17431794
default:
17441795
// this would only happen if a case statement for a newly added operation is missing above
17451796
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
17461797
}
17471798
}
1748-
return nil, startErr
1749-
} else if err != nil {
1750-
return nil, err
1751-
}
17521799

1753-
if len(multiResp.Responses) != len(multiRequest.Operations) {
1754-
return nil, fmt.Errorf("%w: %v instead of %v operation results",
1755-
errInvalidServerResponse, len(multiResp.Responses), len(multiRequest.Operations))
1800+
if w.updateIsDurable(updateResp) {
1801+
break
1802+
}
17561803
}
17571804

1758-
var startResp *workflowservice.StartWorkflowExecutionResponse
1759-
for i, opReq := range multiRequest.Operations {
1760-
resp := multiResp.Responses[i].Response
1761-
1762-
switch t := opReq.Operation.(type) {
1763-
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
1764-
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok {
1765-
startResp = opResp.StartWorkflow
1766-
} else {
1767-
return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
1768-
}
1769-
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
1770-
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow); ok {
1771-
handle, err := w.updateHandleFromResponse(
1772-
ctx,
1773-
enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED,
1774-
opResp.UpdateWorkflow)
1775-
operation.(*UpdateWithStartWorkflowOperation).set(handle, err)
1776-
if err != nil {
1777-
return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err)
1778-
}
1779-
} else {
1780-
return nil, fmt.Errorf("%w: UpdateWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
1781-
}
1782-
default:
1783-
// this would only happen if a case statement for a newly added operation is missing above
1784-
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
1785-
}
1805+
handle, err := w.updateHandleFromResponse(ctx, enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, updateResp)
1806+
operation.(*UpdateWithStartWorkflowOperation).set(handle, err)
1807+
if err != nil {
1808+
return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err)
17861809
}
1810+
17871811
return startResp, nil
17881812
}
17891813

@@ -2028,11 +2052,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
20282052
}
20292053
return nil, err
20302054
}
2031-
// Once the update is past admitted we know it is durable
2032-
// Note: old server version may return UNSPECIFIED if the update request
2033-
// did not reach the desired lifecycle stage.
2034-
if resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED &&
2035-
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED {
2055+
if w.updateIsDurable(resp) {
20362056
break
20372057
}
20382058
}
@@ -2042,6 +2062,14 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
20422062
return w.updateHandleFromResponse(ctx, desiredLifecycleStage, resp)
20432063
}
20442064

2065+
func (w *workflowClientInterceptor) updateIsDurable(resp *workflowservice.UpdateWorkflowExecutionResponse) bool {
2066+
// Once the update is past admitted we know it is durable
2067+
// Note: old server version may return UNSPECIFIED if the update request
2068+
// did not reach the desired lifecycle stage.
2069+
return resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED &&
2070+
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED
2071+
}
2072+
20452073
func createUpdateWorkflowInput(
20462074
options UpdateWorkflowOptions,
20472075
) (*ClientUpdateWorkflowInput, error) {

internal/internal_workflow_client_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -976,6 +976,118 @@ func (s *workflowRunSuite) TestGetWorkflowNoExtantWorkflowAndNoRunId() {
976976
s.Equal("", workflowRunNoRunID.GetRunID())
977977
}
978978

979+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() {
980+
s.workflowServiceClient.EXPECT().
981+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
982+
Return(&workflowservice.ExecuteMultiOperationResponse{
983+
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
984+
{
985+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{},
986+
},
987+
{
988+
// 1st response: empty response, Update is not durable yet, client retries
989+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{},
990+
},
991+
},
992+
}, nil).
993+
Return(&workflowservice.ExecuteMultiOperationResponse{
994+
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
995+
{
996+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{
997+
StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{
998+
RunId: "RUN_ID",
999+
},
1000+
},
1001+
},
1002+
{
1003+
// 2nd response: non-empty response, Update is durable
1004+
Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{
1005+
UpdateWorkflow: &workflowservice.UpdateWorkflowExecutionResponse{
1006+
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
1007+
},
1008+
},
1009+
},
1010+
},
1011+
}, nil)
1012+
1013+
updOp := NewUpdateWithStartWorkflowOperation(
1014+
UpdateWorkflowOptions{
1015+
UpdateName: "update",
1016+
WaitForStage: WorkflowUpdateStageCompleted,
1017+
})
1018+
1019+
_, err := s.workflowClient.ExecuteWorkflow(
1020+
context.Background(),
1021+
StartWorkflowOptions{
1022+
ID: workflowID,
1023+
TaskQueue: taskqueue,
1024+
WithStartOperation: updOp,
1025+
}, workflowType,
1026+
)
1027+
s.NoError(err)
1028+
}
1029+
1030+
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() {
1031+
tests := []struct {
1032+
name string
1033+
expectedErr string
1034+
respFunc func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error)
1035+
}{
1036+
{
1037+
name: "Timeout",
1038+
expectedErr: "context deadline exceeded",
1039+
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
1040+
<-ctx.Done()
1041+
return nil, ctx.Err()
1042+
},
1043+
},
1044+
{
1045+
name: "Cancelled",
1046+
expectedErr: "was_cancelled",
1047+
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
1048+
return nil, serviceerror.NewCanceled("was_cancelled")
1049+
},
1050+
},
1051+
{
1052+
name: "DeadlineExceeded",
1053+
expectedErr: "deadline_exceeded",
1054+
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
1055+
return nil, serviceerror.NewDeadlineExceeded("deadline_exceeded")
1056+
},
1057+
},
1058+
}
1059+
1060+
for _, tt := range tests {
1061+
s.Run(tt.name, func() {
1062+
s.workflowServiceClient.EXPECT().
1063+
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
1064+
DoAndReturn(tt.respFunc)
1065+
1066+
updOp := NewUpdateWithStartWorkflowOperation(
1067+
UpdateWorkflowOptions{
1068+
UpdateName: "update",
1069+
WaitForStage: WorkflowUpdateStageCompleted,
1070+
})
1071+
1072+
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
1073+
defer cancel()
1074+
1075+
_, err := s.workflowClient.ExecuteWorkflow(
1076+
ctxWithTimeout,
1077+
StartWorkflowOptions{
1078+
ID: workflowID,
1079+
TaskQueue: taskqueue,
1080+
WithStartOperation: updOp,
1081+
}, workflowType,
1082+
)
1083+
1084+
var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError
1085+
require.ErrorAs(s.T(), err, &expectedErr)
1086+
require.ErrorContains(s.T(), err, tt.expectedErr)
1087+
})
1088+
}
1089+
}
1090+
9791091
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() {
9801092
s.workflowServiceClient.EXPECT().
9811093
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).

0 commit comments

Comments
 (0)