Skip to content

Commit b2b75c9

Browse files
authored
[Nexus] Set OnConflictOptions for WorkflowRunOperation (#1797)
* [Nexus] Set OnConflictOptions for WorkflowRunOperation * address comments * set WorkflowExecutionErrorWhenAlreadyStarted to true
1 parent f5882aa commit b2b75c9

File tree

4 files changed

+58
-2
lines changed

4 files changed

+58
-2
lines changed

internal/client.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -647,8 +647,11 @@ type (
647647
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
648648
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
649649
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
650-
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
651-
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
650+
// the current or last run will be returned. However, this field is ignored in the following cases:
651+
// - when WithStartOperation is set;
652+
// - in the Nexus WorkflowRunOperation.
653+
// When this field is ignored, you must set WorkflowIDConflictPolicy to UseExisting to prevent
654+
// erroring.
652655
//
653656
// Optional: defaults to false
654657
WorkflowExecutionErrorWhenAlreadyStarted bool
@@ -742,6 +745,14 @@ type (
742745
callbacks []*commonpb.Callback
743746
// links. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
744747
links []*commonpb.Link
748+
749+
// OnConflictOptions - Optional workflow ID conflict options used in conjunction with conflict policy
750+
// WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is already
751+
// running, the options specifies the actions to be taken on the running workflow. If not set or use
752+
// together with any other WorkflowIDConflictPolicy, this parameter is ignored.
753+
//
754+
// NOTE: Only settable by the SDK -- e.g. [temporalnexus.workflowRunOperation].
755+
onConflictOptions *OnConflictOptions
745756
}
746757

747758
// WithStartWorkflowOperation defines how to start a workflow when using UpdateWithStartWorkflow.
@@ -1195,3 +1206,14 @@ func SetCallbacksOnStartWorkflowOptions(opts *StartWorkflowOptions, callbacks []
11951206
func SetLinksOnStartWorkflowOptions(opts *StartWorkflowOptions, links []*commonpb.Link) {
11961207
opts.links = links
11971208
}
1209+
1210+
// SetOnConflictOptionsOnStartWorkflowOptions is an internal only method for setting conflict
1211+
// options on StartWorkflowOptions.
1212+
// OnConflictOptions are purposefully not exposed to users for the time being.
1213+
func SetOnConflictOptionsOnStartWorkflowOptions(opts *StartWorkflowOptions) {
1214+
opts.onConflictOptions = &OnConflictOptions{
1215+
AttachRequestID: true,
1216+
AttachCompletionCallbacks: true,
1217+
AttachLinks: true,
1218+
}
1219+
}

internal/internal_workflow_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1678,6 +1678,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
16781678
CompletionCallbacks: in.Options.callbacks,
16791679
Links: in.Options.links,
16801680
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
1681+
OnConflictOptions: in.Options.onConflictOptions.ToProto(),
16811682
}
16821683

16831684
startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)

internal/internal_workflow_execution_options.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ type (
8181
// Required if behavior is [VersioningBehaviorPinned]. Must be absent if behavior is not [VersioningBehaviorPinned].
8282
PinnedVersion string
8383
}
84+
85+
// OnConflictOptions specifies the actions to be taken when using the workflow ID conflict policy
86+
// USE_EXISTING.
87+
//
88+
// NOTE: Experimental
89+
OnConflictOptions struct {
90+
AttachRequestID bool
91+
AttachCompletionCallbacks bool
92+
AttachLinks bool
93+
}
8494
)
8595

8696
// Mapping WorkflowExecutionOptions field names to proto ones.
@@ -209,3 +219,14 @@ func (r *UpdateWorkflowExecutionOptionsRequest) validateAndConvertToProto(namesp
209219

210220
return requestMsg, nil
211221
}
222+
223+
func (o *OnConflictOptions) ToProto() *workflowpb.OnConflictOptions {
224+
if o == nil {
225+
return nil
226+
}
227+
return &workflowpb.OnConflictOptions{
228+
AttachRequestId: o.AttachRequestID,
229+
AttachCompletionCallbacks: o.AttachCompletionCallbacks,
230+
AttachLinks: o.AttachLinks,
231+
}
232+
}

temporalnexus/operation.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ type WorkflowRunOperationOptions[I, O any] struct {
119119
// The options returned must include a workflow ID that is deterministically generated from the input in order
120120
// for the operation to be idempotent as the request to start the operation may be retried.
121121
// TaskQueue is optional and defaults to the current worker's task queue.
122+
// WorkflowExecutionErrorWhenAlreadyStarted is ignored and always set to true.
123+
// WorkflowIDConflictPolicy is by default set to fail if a workflow is already running. That is,
124+
// if a caller executes another operation that starts the same workflow, it will fail. You can set
125+
// it to WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING to attach the caller's callback to the existing
126+
// running workflow. This way, all attached callers will be notified when the workflow completes.
122127
GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
123128
// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
124129
// and GetOptions.
@@ -382,6 +387,13 @@ func ExecuteUntypedWorkflow[R any](
382387
}
383388
}
384389
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)
390+
internal.SetOnConflictOptionsOnStartWorkflowOptions(&startWorkflowOptions)
391+
392+
// This makes sure that ExecuteWorkflow will respect the WorkflowIDConflictPolicy, ie., if the
393+
// conflict policy is to fail (default value), then ExecuteWorkflow will return an error if the
394+
// workflow already running. For Nexus, this ensures that operation has only started successfully
395+
// when the callback has been attached to the workflow (new or existing running workflow).
396+
startWorkflowOptions.WorkflowExecutionErrorWhenAlreadyStarted = true
385397

386398
run, err := GetClient(ctx).ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
387399
if err != nil {

0 commit comments

Comments
 (0)