Skip to content

break out startUpdate vs executeUpdate #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ core/rust/lib*
.devenv
.idea/
.pre-commit-config.yaml
.helix
.helix/
.zed/
178 changes: 126 additions & 52 deletions sdk/src/Temporal/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ module Temporal.Client (
-- * Sending Updates to Workflows
UpdateOptions (..),
UpdateLifecycleStage (..),
update,
startUpdate,
executeUpdate,

-- * Producing handles for existing workflows
getHandle,
Expand Down Expand Up @@ -114,10 +115,10 @@ import qualified Proto.Temporal.Api.Update.V1.Message_Fields as Update
import Proto.Temporal.Api.Workflowservice.V1.RequestResponse (
GetWorkflowExecutionHistoryRequest,
GetWorkflowExecutionHistoryResponse,
PollWorkflowExecutionUpdateRequest,
QueryWorkflowRequest,
QueryWorkflowResponse,
UpdateWorkflowExecutionRequest,
UpdateWorkflowExecutionResponse,
)
import qualified Proto.Temporal.Api.Workflowservice.V1.RequestResponse_Fields as RR
import qualified Proto.Temporal.Api.Workflowservice.V1.RequestResponse_Fields as WF
Expand All @@ -139,7 +140,7 @@ import Unsafe.Coerce
-- WorkflowClient stuff

workflowClient
:: MonadIO m
:: (MonadIO m)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure if the precommit hook made these formatting changes, or zed (which i took for a test drive here), or what, but i can change it back if anyone hates them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried zed and it did this too. I set "format_on_save": "off" in the config to make it a bit less aggressive about doing this. I don't particularly mind the change though

=> Core.Client
-> WorkflowClientConfig
-> m WorkflowClient
Expand All @@ -155,7 +156,7 @@ class HasWorkflowClient m where
askWorkflowClient :: m WorkflowClient


instance {-# OVERLAPS #-} Monad m => HasWorkflowClient (ReaderT WorkflowClient m) where
instance {-# OVERLAPS #-} (Monad m) => HasWorkflowClient (ReaderT WorkflowClient m) where
askWorkflowClient = ask


Expand Down Expand Up @@ -535,16 +536,14 @@ startFromPayloads k@(KnownWorkflow codec _) wfId opts payloads = do
& WF.namespace .~ rawNamespace c.clientConfig.namespace
& WF.workflowId .~ rawWorkflowId wfId'
& WF.workflowType
.~ ( defMessage & Common.name .~ rawWorkflowType wfName
)
.~ (defMessage & Common.name .~ rawWorkflowType wfName)
& WF.taskQueue
.~ ( defMessage
& Common.name .~ tq
& TQ.kind .~ TASK_QUEUE_KIND_UNSPECIFIED
)
& WF.input
.~ ( defMessage & Common.vec'payloads .~ (convertToProtoPayload <$> payloads'')
)
.~ (defMessage & Common.vec'payloads .~ (convertToProtoPayload <$> payloads''))
& WF.maybe'workflowExecutionTimeout .~ (durationToProto <$> opts'.timeouts.executionTimeout)
& WF.maybe'workflowRunTimeout .~ (durationToProto <$> opts'.timeouts.runTimeout)
& WF.maybe'workflowTaskTimeout .~ (durationToProto <$> opts'.timeouts.taskTimeout)
Expand Down Expand Up @@ -667,8 +666,7 @@ signalWithStart (workflowRef -> k@(KnownWorkflow codec _)) wfId opts (signalRef
& RR.namespace .~ rawNamespace c.clientConfig.namespace
& RR.workflowId .~ rawWorkflowId opts'.signalWithStartWorkflowId
& RR.workflowType
.~ ( defMessage & Common.name .~ rawWorkflowType opts'.signalWithStartWorkflowType
)
.~ (defMessage & Common.name .~ rawWorkflowType opts'.signalWithStartWorkflowType)
& WF.requestId .~ UUID.toText reqId
& RR.searchAttributes .~ (defMessage & Common.indexedFields .~ searchAttrs)
& RR.taskQueue
Expand All @@ -677,8 +675,7 @@ signalWithStart (workflowRef -> k@(KnownWorkflow codec _)) wfId opts (signalRef
& TQ.kind .~ TASK_QUEUE_KIND_UNSPECIFIED
)
& RR.input
.~ ( defMessage & Common.vec'payloads .~ fmap convertToProtoPayload wfArgs'
)
.~ (defMessage & Common.vec'payloads .~ fmap convertToProtoPayload wfArgs')
& RR.maybe'workflowExecutionTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.executionTimeout)
& RR.maybe'workflowRunTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.runTimeout)
& RR.maybe'workflowTaskTimeout .~ (durationToProto <$> opts'.signalWithStartOptions.timeouts.taskTimeout)
Expand Down Expand Up @@ -733,7 +730,7 @@ data TerminationOptions = TerminationOptions
{- | Terminating a workflow immediately signals to the worker that the workflow should
cease execution. The workflow will not be given a chance to react to the termination.
-}
terminate :: MonadIO m => WorkflowHandle a -> TerminationOptions -> m ()
terminate :: (MonadIO m) => WorkflowHandle a -> TerminationOptions -> m ()
terminate h req =
void $
throwEither $
Expand Down Expand Up @@ -773,7 +770,7 @@ instance Exception WorkflowContinuedAsNewException

This is useful for Workflow replay tests.
-}
fetchHistory :: MonadIO m => WorkflowHandle a -> m History
fetchHistory :: (MonadIO m) => WorkflowHandle a -> m History
fetchHistory h = do
let startingReq :: GetWorkflowExecutionHistoryRequest
startingReq =
Expand All @@ -795,7 +792,7 @@ fetchHistory h = do


applyNewExecutionRunId
:: HasField s "newExecutionRunId" Text
:: (HasField s "newExecutionRunId" Text)
=> s
-> GetWorkflowExecutionHistoryRequest
-> Maybe GetWorkflowExecutionHistoryRequest
Expand Down Expand Up @@ -921,45 +918,28 @@ data UpdateLifecycleStage
data UpdateOptions = UpdateOptions
{ updateId :: UpdateId
, updateHeaders :: Map Text Payload
, waitPolicy :: UpdateLifecycleStage
}


{- | An Update is a synchronous message to a Workflow Execution, which waits until the message
handling is complete, and returns a result or error response.

The Update handler can do anything that normal Workflow code can do.

Conceptually, an Update is similar to a combination of Signal (which can do anything normal
Workflow code can do, but cannot return a result) and Query (which can return a result, but
cannot affect the Workflow state or execution). By combining those capabilities it supports
operations that neither Signal nor Query can.

Update handlers can optionally include a validator, which can return a boolean indicating whether
the update is valid and should be processed by the Workflow. If the validator returns true,
the update handler is called. If it returns false (or throws an exception), an error is returned
to the client.
-}
update
:: forall m args result a error
. (MonadIO m, HasWorkflowClient m)
startUpdateFromPayloads
:: (MonadIO m, HasWorkflowClient m)
=> WorkflowHandle a
-> KnownUpdate args result error
-> UpdateOptions
-> args
:->: m result
update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts = withArgs @args @(m result) updateCodec $ \inputs -> liftIO $ do
inputs' <- sequence inputs
-> V.Vector UnencodedPayload
-> m (UpdateHandle result)
startUpdateFromPayloads h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts payloads = do
payloads' <- liftIO $ sequence payloads
let processor = h.workflowHandleClient.clientConfig.payloadProcessor
baseInput =
UpdateWorkflowInput
{ updateWorkflowType = updateName
, updateWorkflowRunId = h.workflowHandleRunId
, updateWorkflowWorkflowId = h.workflowHandleWorkflowId
, updateWorkflowHeaders = opts.updateHeaders
, updateWorkflowArgs = inputs'
, updateWorkflowArgs = payloads'
}
eRes <- h.workflowHandleClient.clientConfig.interceptors.updateWorkflow baseInput $ \input -> do
updateHandle <- liftIO $ h.workflowHandleClient.clientConfig.interceptors.updateWorkflow baseInput $ \input -> do
updateArgs <- processorEncodePayloads processor input.updateWorkflowArgs
headerPayloads <- processorEncodePayloads processor input.updateWorkflowHeaders
let msg :: UpdateWorkflowExecutionRequest
Expand All @@ -974,11 +954,7 @@ update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts
& WF.firstExecutionRunId .~ maybe "" rawRunId h.workflowHandleFirstExecutionRunId
& WF.waitPolicy
.~ ( defMessage
& Update.lifecycleStage .~ case opts.waitPolicy of
UpdateLifecycleStageUnspecified -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED
UpdateLifecycleStageAdmitted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED
UpdateLifecycleStageAccepted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
UpdateLifecycleStageCompleted -> Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
& Update.lifecycleStage .~ Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought this was wrong, but I guess you're able to poll later for other lifecycle stages, so 👍🏻

)
& WF.request
.~ ( defMessage
Expand All @@ -995,14 +971,112 @@ update h@(WorkflowHandle _ _ c _ _ _) (KnownUpdate updateCodec updateName) opts
)
)

(res :: UpdateWorkflowExecutionResponse) <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.updateWorkflowExecution h.workflowHandleClient.clientCore msg
res <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.updateWorkflowExecution h.workflowHandleClient.clientCore msg

-- We're not going to look for a successful result yet (waitForUpdateOutcome will do that), but we do want to check for failures
-- so that we can report validataion failures via UpdateFailure rather than RpcError.
case res ^. Update.maybe'outcome of
Nothing -> throwIO $ ValueError "No return value payloads provided by update response"
Just outcome -> do
case outcome ^. Update.maybe'value of
Just (Update.Outcome'Success payloads) -> case (payloads ^. Common.vec'payloads) V.!? 0 of
Nothing -> throwIO $ ValueError "No return value payloads provided by update response"
Just p -> pure $ convertFromProtoPayload p
Just (Update.Outcome'Failure failure) -> throwIO $ UpdateFailure failure
Nothing -> error "Unsupported update result"
payloadProcessorDecode processor eRes >>= either (throwIO . ValueError) pure >>= decode updateCodec >>= either (throwIO . ValueError) pure
_ -> pure ()
Nothing -> pure ()

let updateId = UpdateId (res ^. (RR.updateRef . Update.updateId))
workflowId = WorkflowId (res ^. (RR.updateRef . WF.workflowExecution . WF.workflowId))
runId = RunId (res ^. (RR.updateRef . WF.workflowExecution . WF.runId))

pure $
UpdateHandle
{ updateHandleUpdateId = updateId
, updateHandleWorkflowId = workflowId
, updateHandleWorkflowRunId = Just runId
, updateHandleReadResult = either (throwIO . ValueError) pure <=< payloadProcessorDecode processor
, updateHandleWorkflowClient = c
, updateHandleType = updateName
}
pure $
updateHandle
{ updateHandleReadResult = \a -> do
updateHandleReadResult updateHandle a >>= \b -> do
result <- decode updateCodec b
either (throwIO . ValueError) pure result
}


{- | Begin a new Update operation.

This function does not wait for the Update to complete. Instead, it returns an 'UpdateHandle'
that can be used to wait for the Update to complete or perform other operations. However, it
throws an UpdateFailed exception if an update's validator fails.

This can be used to "fire-and-forget" an Update by discarding the handle.
-}
startUpdate
:: forall m args result a error
. (MonadIO m, HasWorkflowClient m)
=> WorkflowHandle a
-> KnownUpdate args result error
-> UpdateOptions
-> args
:->: m (UpdateHandle result)
startUpdate wfH u@(KnownUpdate updateCodec _) opts = withArgs @args @(m (UpdateHandle result)) updateCodec $ \inputs -> do
startUpdateFromPayloads wfH u opts inputs


{- | Given an 'UpdateHandle', wait for the update operation to complete and return the result.

This function will block until the update completes, and will return the result of the update
or throw an exception if the update failed.
-}
waitForUpdateOutcome :: (MonadIO m) => UpdateHandle a -> m a
waitForUpdateOutcome h = do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name this waitUpdateResult to match the waitWorkflowResult function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pushed a commit doing this

let msg :: PollWorkflowExecutionUpdateRequest
msg =
defMessage
& WF.namespace .~ rawNamespace h.updateHandleWorkflowClient.clientConfig.namespace
& WF.updateRef
.~ ( defMessage
& WF.workflowExecution
.~ ( defMessage
& WF.workflowId .~ rawWorkflowId h.updateHandleWorkflowId
& WF.runId .~ maybe "" rawRunId h.updateHandleWorkflowRunId
)
& Update.updateId .~ rawUpdateId h.updateHandleUpdateId
)
& WF.identity .~ Core.identity (Core.clientConfig h.updateHandleWorkflowClient.clientCore)
& WF.waitPolicy
.~ ( defMessage
& Update.lifecycleStage .~ Update.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
)
go = do
res <- either throwIO pure =<< Temporal.Core.Client.WorkflowService.pollWorkflowExecutionUpdate h.updateHandleWorkflowClient.clientCore msg
case res ^. Update.maybe'outcome of
Nothing -> go
Just outcome -> do
case outcome ^. Update.maybe'value of
Just (Update.Outcome'Success payloads) -> case (payloads ^. Common.vec'payloads) V.!? 0 of
Nothing -> throwIO $ ValueError "No return value payloads provided by update response"
Just p -> pure $ convertFromProtoPayload p
Just (Update.Outcome'Failure failure) -> throwIO $ UpdateFailure failure
Nothing -> error "Unsupported update result"
payload <- liftIO go
liftIO $ h.updateHandleReadResult payload


{- | Run an Update operation, synchronously waiting for it to complete.

This function will block until the update completes, and will return the result of the update
or throw an exception if the update or its validator failed.
-}
executeUpdate
:: forall m args result a error
. (MonadIO m, HasWorkflowClient m)
=> WorkflowHandle a
-> KnownUpdate args result error
-> UpdateOptions
-> args
:->: m result
executeUpdate wfH u@(KnownUpdate updateCodec _) opts = withArgs @args @(m result) updateCodec $ \inputs -> do
updateHandle <- startUpdateFromPayloads wfH u opts inputs
waitForUpdateOutcome updateHandle
16 changes: 15 additions & 1 deletion sdk/src/Temporal/Client/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,20 @@ data UpdateWorkflowInput = UpdateWorkflowInput
}


data UpdateHandle a = UpdateHandle
{ updateHandleUpdateId :: UpdateId
, updateHandleWorkflowId :: WorkflowId
, updateHandleWorkflowRunId :: Maybe RunId
, updateHandleReadResult :: Payload -> IO a
, updateHandleWorkflowClient :: WorkflowClient
, updateHandleType :: Text
}


instance Functor UpdateHandle where
fmap x y = y {updateHandleReadResult = fmap x . updateHandleReadResult y}


data WorkflowExecutionStatus
= Running
| Completed
Expand Down Expand Up @@ -213,7 +227,7 @@ data ClientInterceptors = ClientInterceptors
{ start :: WorkflowType -> WorkflowId -> StartWorkflowOptions -> Vector Payload -> (WorkflowType -> WorkflowId -> StartWorkflowOptions -> Vector Payload -> IO (WorkflowHandle Payload)) -> IO (WorkflowHandle Payload)
, queryWorkflow :: QueryWorkflowInput -> (QueryWorkflowInput -> IO (Either QueryRejected Payload)) -> IO (Either QueryRejected Payload)
, signalWithStart :: SignalWithStartWorkflowInput -> (SignalWithStartWorkflowInput -> IO (WorkflowHandle Payload)) -> IO (WorkflowHandle Payload)
, updateWorkflow :: UpdateWorkflowInput -> (UpdateWorkflowInput -> IO Payload) -> IO Payload
, updateWorkflow :: UpdateWorkflowInput -> (UpdateWorkflowInput -> IO (UpdateHandle Payload)) -> IO (UpdateHandle Payload)
-- TODO
-- signal
-- terminate
Expand Down
Loading