-
Notifications
You must be signed in to change notification settings - Fork 0
break out startUpdate
vs executeUpdate
#107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,4 +20,5 @@ core/rust/lib* | |
.devenv | ||
.idea/ | ||
.pre-commit-config.yaml | ||
.helix | ||
.helix/ | ||
.zed/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,7 +59,8 @@ module Temporal.Client ( | |
-- * Sending Updates to Workflows | ||
UpdateOptions (..), | ||
UpdateLifecycleStage (..), | ||
update, | ||
startUpdate, | ||
executeUpdate, | ||
|
||
-- * Producing handles for existing workflows | ||
getHandle, | ||
|
@@ -114,10 +115,10 @@ import qualified Proto.Temporal.Api.Update.V1.Message_Fields as Update | |
import Proto.Temporal.Api.Workflowservice.V1.RequestResponse ( | ||
GetWorkflowExecutionHistoryRequest, | ||
GetWorkflowExecutionHistoryResponse, | ||
PollWorkflowExecutionUpdateRequest, | ||
QueryWorkflowRequest, | ||
QueryWorkflowResponse, | ||
UpdateWorkflowExecutionRequest, | ||
UpdateWorkflowExecutionResponse, | ||
) | ||
import qualified Proto.Temporal.Api.Workflowservice.V1.RequestResponse_Fields as RR | ||
import qualified Proto.Temporal.Api.Workflowservice.V1.RequestResponse_Fields as WF | ||
|
@@ -139,7 +140,7 @@ import Unsafe.Coerce | |
-- WorkflowClient stuff | ||
|
||
workflowClient | ||
:: MonadIO m | ||
:: (MonadIO m) | ||
=> Core.Client | ||
-> WorkflowClientConfig | ||
-> m WorkflowClient | ||
|
@@ -155,7 +156,7 @@ class HasWorkflowClient m where | |
askWorkflowClient :: m WorkflowClient | ||
|
||
|
||
instance {-# OVERLAPS #-} Monad m => HasWorkflowClient (ReaderT WorkflowClient m) where | ||
instance {-# OVERLAPS #-} (Monad m) => HasWorkflowClient (ReaderT WorkflowClient m) where | ||
askWorkflowClient = ask | ||
|
||
|
||
|
@@ -535,16 +536,14 @@ startFromPayloads k@(KnownWorkflow codec _) wfId opts payloads = do | |
& WF.namespace .~ rawNamespace c.clientConfig.namespace | ||
& WF.workflowId .~ rawWorkflowId wfId' | ||
& WF.workflowType | ||
.~ ( defMessage & Common.name .~ rawWorkflowType wfName | ||
) | ||
.~ (defMessage & Common.name .~ rawWorkflowType wfName) | ||
& WF.taskQueue | ||
.~ ( defMessage | ||
& Common.name .~ tq | ||
& TQ.kind .~ TASK_QUEUE_KIND_UNSPECIFIED | ||
) | ||
& WF.input | ||
.~ ( defMessage & Common.vec'payloads .~ (convertToProtoPayload <$> payloads'') | ||
) | ||
.~ (defMessage & Common.vec'payloads .~ (convertToProtoPayload <$> payloads'')) | ||
& WF.maybe'workflowExecutionTimeout .~ (durationToProto <$> opts'.timeouts.executionTimeout) | ||
& WF.maybe'workflowRunTimeout .~ (durationToProto <$> opts'.timeouts.runTimeout) | ||
& WF.maybe'workflowTaskTimeout .~ (durationToProto <$> opts'.timeouts.taskTimeout) | ||
|
@@ -667,8 +666,7 @@ signalWithStart (workflowRef -> k@(KnownWorkflow codec _)) wfId opts (signalRef | |
& RR.namespace .~ rawNamespace c.clientConfig.namespace | ||
& RR.workflowId .~ rawWorkflowId opts'.signalWithStartWorkflowId | ||
& RR.workflowType | ||
.~ ( defMessage & Common.name .~ rawWorkflowType opts'.signalWithStartWorkflowType | ||
) | ||
.~ (defMessage & Common.name .~ rawWorkflowType opts'.signalWithStartWorkflowType) | ||
& WF.requestId .~ UUID.toText reqId | ||
& RR.searchAttributes .~ (defMessage & Common.indexedFields .~ searchAttrs) | ||
& RR.taskQueue | ||
|
@@ -677,8 +675,7 @@ signalWithStart (workflowRef -> k@(KnownWorkflow codec _)) wfId opts (signalRef | |
& TQ.kind .~ TASK_QUEUE_KIND_UNSPECIFIED | ||
) | ||
& RR.input | ||
.~ ( defMessage & Common.vec'payloads .~ fmap convertToProtoPayload wfArgs' | ||
) | ||
.~ (defMessage & Common.vec'payloads .~ fmap convertToProtoPayload wfArgs') | ||
& RR.maybe'workflowExecutionTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.executionTimeout) | ||
& RR.maybe'workflowRunTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.runTimeout) | ||
& RR.maybe'workflowTaskTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.taskTimeout) | ||
|
@@ -733,7 +730,7 @@ data TerminationOptions = TerminationOptions | |
{- | Terminating a workflow immediately signals to the worker that the workflow should | ||
cease execution. The workflow will not be given a chance to react to the termination. | ||
-} | ||
terminate :: MonadIO m => WorkflowHandle a -> TerminationOptions -> m () | ||
terminate :: (MonadIO m) => WorkflowHandle a -> TerminationOptions -> m () | ||
terminate h req = | ||
void $ | ||
throwEither $ | ||
|
@@ -773,7 +770,7 @@ instance Exception WorkflowContinuedAsNewException | |
|
||
This is useful for Workflow replay tests. | ||
-} | ||
fetchHistory :: MonadIO m => WorkflowHandle a -> m History | ||
fetchHistory :: (MonadIO m) => WorkflowHandle a -> m History | ||
fetchHistory h = do | ||
let startingReq :: GetWorkflowExecutionHistoryRequest | ||
startingReq = | ||
|
@@ -795,7 +792,7 @@ fetchHistory h = do | |
|
||
|
||
applyNewExecutionRunId | ||
:: HasField s "newExecutionRunId" Text | ||
:: (HasField s "newExecutionRunId" Text) | ||
=> s | ||
-> GetWorkflowExecutionHistoryRequest | ||
-> Maybe GetWorkflowExecutionHistoryRequest | ||
|
@@ -921,45 +918,28 @@ data UpdateLifecycleStage | |
data UpdateOptions = UpdateOptions | ||
{ updateId :: UpdateId | ||
, updateHeaders :: Map Text Payload | ||
, waitPolicy :: UpdateLifecycleStage | ||
} | ||
|
||
|
||
{- | An Update is a synchronous message to a Workflow Execution, which waits until the message | ||
handling is complete, and returns a result or error response. | ||
|
||
The Update handler can do anything that normal Workflow code can do. | ||
|
||
Conceptually, an Update is similar to a combination of Signal (which can do anything normal | ||
Workflow code can do, but cannot return a result) and Query (which can return a result, but | ||
cannot affect the Workflow state or execution). By combining those capabilities it supports | ||
operations that neither Signal nor Query can. | ||
|
||
Update handlers can optionally include a validator, which can return a boolean indicating whether | ||
the update is valid and should be processed by the Workflow. If the validator returns true, | ||
the update handler is called. If it returns false (or throws an exception), an error is returned | ||
to the client. | ||
-} | ||
update | ||
:: forall m args result a error | ||
. (MonadIO m, HasWorkflowClient m) | ||
startUpdateFromPayloads | ||
:: (MonadIO m, HasWorkflowClient m) | ||
=> WorkflowHandle a | ||
-> KnownUpdate args result error | ||
-> UpdateOptions | ||
-> args | ||
:->: m result | ||
update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts = withArgs @args @(m result) updateCodec $ \inputs -> liftIO $ do | ||
inputs' <- sequence inputs | ||
-> V.Vector UnencodedPayload | ||
-> m (UpdateHandle result) | ||
startUpdateFromPayloads h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts payloads = do | ||
payloads' <- liftIO $ sequence payloads | ||
let processor = h.workflowHandleClient.clientConfig.payloadProcessor | ||
baseInput = | ||
UpdateWorkflowInput | ||
{ updateWorkflowType = updateName | ||
, updateWorkflowRunId = h.workflowHandleRunId | ||
, updateWorkflowWorkflowId = h.workflowHandleWorkflowId | ||
, updateWorkflowHeaders = opts.updateHeaders | ||
, updateWorkflowArgs = inputs' | ||
, updateWorkflowArgs = payloads' | ||
} | ||
eRes <- h.workflowHandleClient.clientConfig.interceptors.updateWorkflow baseInput $ \input -> do | ||
updateHandle <- liftIO $ h.workflowHandleClient.clientConfig.interceptors.updateWorkflow baseInput $ \input -> do | ||
updateArgs <- processorEncodePayloads processor input.updateWorkflowArgs | ||
headerPayloads <- processorEncodePayloads processor input.updateWorkflowHeaders | ||
let msg :: UpdateWorkflowExecutionRequest | ||
|
@@ -974,11 +954,7 @@ update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts | |
& WF.firstExecutionRunId .~ maybe "" rawRunId h.workflowHandleFirstExecutionRunId | ||
& WF.waitPolicy | ||
.~ ( defMessage | ||
& Update.lifecycleStage .~ case opts.waitPolicy of | ||
UpdateLifecycleStageUnspecified -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED | ||
UpdateLifecycleStageAdmitted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED | ||
UpdateLifecycleStageAccepted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED | ||
UpdateLifecycleStageCompleted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED | ||
& Update.lifecycleStage .~ Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I initially thought this was wrong, but I guess you're able to poll later for other lifecycle stages, so 👍🏻 |
||
) | ||
& WF.request | ||
.~ ( defMessage | ||
|
@@ -995,14 +971,112 @@ update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts | |
) | ||
) | ||
|
||
(res :: UpdateWorkflowExecutionResponse) <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.updateWorkflowExecution h.workflowHandleClient.clientCore msg | ||
res <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.updateWorkflowExecution h.workflowHandleClient.clientCore msg | ||
|
||
-- We're not going to look for a successful result yet (waitForUpdateOutcome will do that), but we do want to check for failures | ||
-- so that we can report validataion failures via UpdateFailure rather than RpcError. | ||
case res ^. Update.maybe'outcome of | ||
Nothing -> throwIO $ ValueError "No return value payloads provided by update response" | ||
Just outcome -> do | ||
case outcome ^. Update.maybe'value of | ||
Just (Update.Outcome'Success payloads) -> case (payloads ^. Common.vec'payloads) V.!? 0 of | ||
Nothing -> throwIO $ ValueError "No return value payloads provided by update response" | ||
Just p -> pure $ convertFromProtoPayload p | ||
Just (Update.Outcome'Failure failure) -> throwIO $ UpdateFailure failure | ||
Nothing -> error "Unsupported update result" | ||
payloadProcessorDecode processor eRes >>= either (throwIO . ValueError) pure >>= decode updateCodec >>= either (throwIO . ValueError) pure | ||
_ -> pure () | ||
Nothing -> pure () | ||
|
||
let updateId = UpdateId (res ^. (RR.updateRef . Update.updateId)) | ||
workflowId = WorkflowId (res ^. (RR.updateRef . WF.workflowExecution . WF.workflowId)) | ||
runId = RunId (res ^. (RR.updateRef . WF.workflowExecution . WF.runId)) | ||
|
||
pure $ | ||
UpdateHandle | ||
{ updateHandleUpdateId = updateId | ||
, updateHandleWorkflowId = workflowId | ||
, updateHandleWorkflowRunId = Just runId | ||
, updateHandleReadResult = either (throwIO . ValueError) pure <=< payloadProcessorDecode processor | ||
, updateHandleWorkflowClient = c | ||
, updateHandleType = updateName | ||
} | ||
pure $ | ||
updateHandle | ||
{ updateHandleReadResult = \a -> do | ||
updateHandleReadResult updateHandle a >>= \b -> do | ||
result <- decode updateCodec b | ||
either (throwIO . ValueError) pure result | ||
} | ||
|
||
|
||
{- | Begin a new Update operation. | ||
|
||
This function does not wait for the Update to complete. Instead, it returns an 'UpdateHandle' | ||
that can be used to wait for the Update to complete or perform other operations. However, it | ||
throws an UpdateFailed exception if an update's validator fails. | ||
|
||
This can be used to "fire-and-forget" an Update by discarding the handle. | ||
-} | ||
startUpdate | ||
:: forall m args result a error | ||
. (MonadIO m, HasWorkflowClient m) | ||
=> WorkflowHandle a | ||
-> KnownUpdate args result error | ||
-> UpdateOptions | ||
-> args | ||
:->: m (UpdateHandle result) | ||
startUpdate wfH u@(KnownUpdate updateCodec _) opts = withArgs @args @(m (UpdateHandle result)) updateCodec $ \inputs -> do | ||
startUpdateFromPayloads wfH u opts inputs | ||
|
||
|
||
{- | Given an 'UpdateHandle', wait for the update operation to complete and return the result. | ||
|
||
This function will block until the update completes, and will return the result of the update | ||
or throw an exception if the update failed. | ||
-} | ||
waitForUpdateOutcome :: (MonadIO m) => UpdateHandle a -> m a | ||
waitForUpdateOutcome h = do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we name this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just pushed a commit doing this |
||
let msg :: PollWorkflowExecutionUpdateRequest | ||
msg = | ||
defMessage | ||
& WF.namespace .~ rawNamespace h.updateHandleWorkflowClient.clientConfig.namespace | ||
& WF.updateRef | ||
.~ ( defMessage | ||
& WF.workflowExecution | ||
.~ ( defMessage | ||
& WF.workflowId .~ rawWorkflowId h.updateHandleWorkflowId | ||
& WF.runId .~ maybe "" rawRunId h.updateHandleWorkflowRunId | ||
) | ||
& Update.updateId .~ rawUpdateId h.updateHandleUpdateId | ||
) | ||
& WF.identity .~ Core.identity (Core.clientConfig h.updateHandleWorkflowClient.clientCore) | ||
& WF.waitPolicy | ||
.~ ( defMessage | ||
& Update.lifecycleStage .~ Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED | ||
) | ||
go = do | ||
res <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.pollWorkflowExecutionUpdate h.updateHandleWorkflowClient.clientCore msg | ||
case res ^. Update.maybe'outcome of | ||
Nothing -> go | ||
Just outcome -> do | ||
case outcome ^. Update.maybe'value of | ||
Just (Update.Outcome'Success payloads) -> case (payloads ^. Common.vec'payloads) V.!? 0 of | ||
Nothing -> throwIO $ ValueError "No return value payloads provided by update response" | ||
Just p -> pure $ convertFromProtoPayload p | ||
Just (Update.Outcome'Failure failure) -> throwIO $ UpdateFailure failure | ||
Nothing -> error "Unsupported update result" | ||
payload <- liftIO go | ||
liftIO $ h.updateHandleReadResult payload | ||
|
||
|
||
{- | Run an Update operation, synchronously waiting for it to complete. | ||
|
||
This function will block until the update completes, and will return the result of the update | ||
or throw an exception if the update or its validator failed. | ||
-} | ||
executeUpdate | ||
:: forall m args result a error | ||
. (MonadIO m, HasWorkflowClient m) | ||
=> WorkflowHandle a | ||
-> KnownUpdate args result error | ||
-> UpdateOptions | ||
-> args | ||
:->: m result | ||
executeUpdate wfH u@(KnownUpdate updateCodec _) opts = withArgs @args @(m result) updateCodec $ \inputs -> do | ||
updateHandle <- startUpdateFromPayloads wfH u opts inputs | ||
waitForUpdateOutcome updateHandle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not sure if the precommit hook made these formatting changes, or zed (which i took for a test drive here), or what, but i can change it back if anyone hates them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried zed and it did this too. I set
"format_on_save": "off"
in the config to make it a bit less aggressive about doing this. I don't particularly mind the change though