Skip to content

Commit e85a098

Browse files
authored
Update-with-Start operation (#1579)
Adds support for Update-with-Start, using the MultiOperation API (temporalio/api#367).
1 parent 5364a47 commit e85a098

File tree

8 files changed

+779
-115
lines changed

8 files changed

+779
-115
lines changed

.github/workflows/docker/dynamic-config-custom.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ frontend.enableUpdateWorkflowExecution:
77
- value: true
88
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
99
- value: true
10+
frontend.enableExecuteMultiOperation:
11+
- value: true
1012
system.enableEagerWorkflowStart:
1113
- value: true
1214
frontend.workerVersioningRuleAPIs:

client/client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,16 @@ type (
162162
// StartWorkflowOptions configuration parameters for starting a workflow execution.
163163
StartWorkflowOptions = internal.StartWorkflowOptions
164164

165+
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
166+
// For example, use NewUpdateWithStartWorkflowOperation to perform Update-with-Start.
167+
// NOTE: Experimental
168+
WithStartWorkflowOperation = internal.WithStartWorkflowOperation
169+
170+
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
171+
// See NewUpdateWithStartWorkflowOperation for details.
172+
// NOTE: Experimental
173+
UpdateWithStartWorkflowOperation = internal.UpdateWithStartWorkflowOperation
174+
165175
// HistoryEventIterator is a iterator which can return history events.
166176
HistoryEventIterator = internal.HistoryEventIterator
167177

@@ -921,6 +931,14 @@ type MetricsTimer = metrics.Timer
921931
// MetricsNopHandler is a noop handler that does nothing with the metrics.
922932
var MetricsNopHandler = metrics.NopHandler
923933

934+
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation to perform Update-with-Start.
935+
// After executing Client.ExecuteWorkflow with the UpdateWithStartWorkflow in the start options,
936+
// the update result can be obtained.
937+
// NOTE: Experimental
938+
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
939+
return internal.NewUpdateWithStartWorkflowOperation(options)
940+
}
941+
924942
// Dial creates an instance of a workflow client. This will attempt to connect
925943
// to the server eagerly and will return an error if the server is not
926944
// available.

internal/client.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package internal
2727
import (
2828
"context"
2929
"crypto/tls"
30+
"errors"
3031
"fmt"
3132
"sync/atomic"
3233
"time"
@@ -643,9 +644,23 @@ type (
643644
// Optional: defaulted to Fail.
644645
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
645646

647+
// WithStartOperation - Operation to execute with Workflow Start.
648+
// For example, see NewUpdateWithStartWorkflowOperation to perform Update-with-Start. Note that if the workflow is
649+
// already running and WorkflowIDConflictPolicy is set to UseExisting, the start is skipped and only the
650+
// operation is executed. If instead the policy is set to Fail (the default), nothing is executed and
651+
// an error will be returned (i.e. the option WorkflowExecutionErrorWhenAlreadyStarted is ignored).
652+
// This option will be ignored when used with Client.SignalWithStartWorkflow.
653+
//
654+
// Optional: defaults to nil.
655+
//
656+
// NOTE: Experimental
657+
WithStartOperation WithStartWorkflowOperation
658+
646659
// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
647-
// workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false,
648-
// rather than erroring a WorkflowRun instance representing the current or last run will be returned.
660+
// workflow id has already been used and WorkflowIDReusePolicy or WorkflowIDConflictPolicy would
661+
// disallow a re-run. If it is set to false, rather than erroring a WorkflowRun instance representing
662+
// the current or last run will be returned. However, when WithStartOperation is set, this field is ignored and
663+
// the WorkflowIDConflictPolicy UseExisting must be used instead to prevent erroring.
649664
//
650665
// Optional: defaults to false
651666
WorkflowExecutionErrorWhenAlreadyStarted bool
@@ -714,6 +729,24 @@ type (
714729
links []*commonpb.Link
715730
}
716731

732+
// WithStartWorkflowOperation is a type of operation that can be executed as part of a workflow start.
733+
WithStartWorkflowOperation interface {
734+
isWithStartWorkflowOperation()
735+
}
736+
737+
// UpdateWithStartWorkflowOperation is used to perform Update-with-Start.
738+
// See NewUpdateWithStartWorkflowOperation for details.
739+
UpdateWithStartWorkflowOperation struct {
740+
input *ClientUpdateWorkflowInput
741+
// flag to ensure the operation is only executed once
742+
executed atomic.Bool
743+
// channel to indicate that handle or err is available
744+
doneCh chan struct{}
745+
// handle and err cannot be accessed before doneCh is closed
746+
handle WorkflowUpdateHandle
747+
err error
748+
}
749+
717750
// RetryPolicy defines the retry policy.
718751
// Note that the history of activity with retry policy will be different: the started event will be written down into
719752
// history only when the activity completes or "finally" timeouts/fails. And the started event only records the last
@@ -1004,6 +1037,50 @@ func DialCloudOperationsClient(ctx context.Context, options CloudOperationsClien
10041037
}, nil
10051038
}
10061039

1040+
// NewUpdateWithStartWorkflowOperation returns an UpdateWithStartWorkflowOperation that can be used to perform Update-with-Start.
1041+
func NewUpdateWithStartWorkflowOperation(options UpdateWorkflowOptions) *UpdateWithStartWorkflowOperation {
1042+
res := &UpdateWithStartWorkflowOperation{doneCh: make(chan struct{})}
1043+
1044+
input, err := createUpdateWorkflowInput(options)
1045+
if err != nil {
1046+
res.set(nil, err)
1047+
} else if options.RunID != "" {
1048+
res.set(nil, errors.New("RunID cannot be set because the workflow might not be running"))
1049+
}
1050+
if options.FirstExecutionRunID != "" {
1051+
res.set(nil, errors.New("FirstExecutionRunID cannot be set because the workflow might not be running"))
1052+
} else {
1053+
res.input = input
1054+
}
1055+
1056+
return res
1057+
}
1058+
1059+
// Get blocks until a server response has been received; or the context deadline is exceeded.
1060+
func (op *UpdateWithStartWorkflowOperation) Get(ctx context.Context) (WorkflowUpdateHandle, error) {
1061+
select {
1062+
case <-op.doneCh:
1063+
return op.handle, op.err
1064+
case <-ctx.Done():
1065+
return nil, ctx.Err()
1066+
}
1067+
}
1068+
1069+
func (op *UpdateWithStartWorkflowOperation) markExecuted() error {
1070+
if op.executed.Swap(true) {
1071+
return fmt.Errorf("was already executed")
1072+
}
1073+
return nil
1074+
}
1075+
1076+
func (op *UpdateWithStartWorkflowOperation) set(handle WorkflowUpdateHandle, err error) {
1077+
op.handle = handle
1078+
op.err = err
1079+
close(op.doneCh)
1080+
}
1081+
1082+
func (op *UpdateWithStartWorkflowOperation) isWithStartWorkflowOperation() {}
1083+
10071084
// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces.
10081085
func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) {
10091086
// Initialize root tags

internal/cmd/build/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ import (
4141

4242
_ "github.com/BurntSushi/toml"
4343
_ "github.com/kisielk/errcheck/errcheck"
44+
_ "honnef.co/go/tools/staticcheck"
45+
4446
"go.temporal.io/sdk/client"
4547
"go.temporal.io/sdk/testsuite"
46-
_ "honnef.co/go/tools/staticcheck"
4748
)
4849

4950
func main() {
@@ -145,6 +146,7 @@ func (b *builder) integrationTest() error {
145146
},
146147
LogLevel: "warn",
147148
ExtraArgs: []string{
149+
"--dynamic-config-value", "frontend.enableExecuteMultiOperation=true",
148150
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
149151
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
150152
"--dynamic-config-value", "frontend.workerVersioningRuleAPIs=true",

0 commit comments

Comments
 (0)