diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index 21f9f2782d6..1eaf846ade5 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -12,6 +12,7 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/definition" @@ -114,6 +115,8 @@ func (t *timerQueueActiveTaskExecutor) Execute( err = t.executeDeleteHistoryEventTask(ctx, task) case *tasks.StateMachineTimerTask: err = t.executeStateMachineTimerTask(ctx, task) + case *tasks.ChasmTaskPure: + err = t.executeChasmPureTimerTask(ctx, task) default: err = queues.NewUnprocessableTaskError("unknown task type") } @@ -911,3 +914,61 @@ func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules( return nil } + +func (t *timerQueueActiveTaskExecutor) executeChasmPureTimerTask( + ctx context.Context, + task *tasks.ChasmTaskPure, +) error { + ctx, cancel := context.WithTimeout(ctx, taskTimeout) + defer cancel() + + wfCtx, release, err := getWorkflowExecutionContextForTask(ctx, t.shardContext, t.cache, task) + if err != nil { + return err + } + defer func() { release(err) }() + + ms, err := loadMutableStateForTimerTask(ctx, t.shardContext, wfCtx, task, t.metricsHandler, t.logger) + if err != nil { + return err + } + if ms == nil { + return nil + } + + // Execute all fired pure tasks for a component while holding the workflow lock. + processedTimers := 0 + err = t.executeChasmPureTimers( + ctx, + wfCtx, + ms, + task, + func(node *chasm.Node, task any) error { + // ExecutePureTask also calls the task's validator. Invalid tasks will no-op + // succeed. + if err := node.ExecutePureTask(ctx, task); err != nil { + return err + } + + processedTimers += 1 + return nil + }, + ) + + // Commit changes only if we processed any timers. + if processedTimers == 0 { + return nil + } + + if t.config.EnableUpdateWorkflowModeIgnoreCurrent() { + return wfCtx.UpdateWorkflowExecutionAsActive(ctx, t.shardContext) + } + + // TODO: remove following code once EnableUpdateWorkflowModeIgnoreCurrent config is deprecated. + if ms.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED { + // Can't use UpdateWorkflowExecutionAsActive since it updates the current run, and we are operating on a + // closed workflow. + return wfCtx.SubmitClosedWorkflowSnapshot(ctx, t.shardContext, historyi.TransactionPolicyActive) + } + return wfCtx.UpdateWorkflowExecutionAsActive(ctx, t.shardContext) +} diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index cde442476d5..bfd3b06794d 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -12,6 +12,7 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/log" @@ -100,6 +101,8 @@ func (t *timerQueueStandbyTaskExecutor) Execute( err = t.executeDeleteHistoryEventTask(ctx, task) case *tasks.StateMachineTimerTask: err = t.executeStateMachineTimerTask(ctx, task) + case *tasks.ChasmTaskPure: + err = t.executeChasmPureTimerTask(ctx, task) default: err = queues.NewUnprocessableTaskError("unknown task type") } @@ -111,6 +114,48 @@ func (t *timerQueueStandbyTaskExecutor) Execute( } } +func (t *timerQueueStandbyTaskExecutor) executeChasmPureTimerTask( + ctx context.Context, + task *tasks.ChasmTaskPure, +) error { + actionFn := func( + ctx context.Context, + wfContext historyi.WorkflowContext, + mutableState historyi.MutableState, + ) (any, error) { + err := t.executeChasmPureTimers( + ctx, + wfContext, + mutableState, + task, + func(node *chasm.Node, task any) error { + // If this line of code is reached, the task's Validate() function succeeded, which + // indicates that it is still expected to run. Return ErrTaskRetry to wait for the + // task to complete on the active cluster, after which Validate will begun returning + // false. + return consts.ErrTaskRetry + }, + ) + if err != nil && errors.Is(err, consts.ErrTaskRetry) { + return &struct{}{}, nil + } + + return nil, nil + } + + return t.processTimer( + ctx, + task, + actionFn, + getStandbyPostActionFn( + task, + t.getCurrentTime, + t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()), + t.checkWorkflowStillExistOnSourceBeforeDiscard, + ), + ) +} + func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( ctx context.Context, timerTask *tasks.UserTimerTask, diff --git a/service/history/timer_queue_task_executor_base.go b/service/history/timer_queue_task_executor_base.go index 1cf76038f07..e97285a51dd 100644 --- a/service/history/timer_queue_task_executor_base.go +++ b/service/history/timer_queue_task_executor_base.go @@ -10,6 +10,7 @@ import ( "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/locks" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -17,6 +18,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/deletemanager" @@ -240,8 +242,39 @@ func (t *timerQueueTaskExecutorBase) executeSingleStateMachineTimer( return nil } -// executeStateMachineTimers gets the state machine timers, processed the expired timers, -// and return a slice of unprocessed timers. +// executeChasmPureTimers walks a CHASM tree for expired pure task timers, +// executes them, and returns a count of timers processed. +func (t *timerQueueTaskExecutorBase) executeChasmPureTimers( + ctx context.Context, + workflowContext historyi.WorkflowContext, + ms historyi.MutableState, + task *tasks.ChasmTaskPure, + execute func(node *chasm.Node, task any) error, +) error { + // Because CHASM timers can target closed workflows, we need to specifically + // exclude zombie workflows, instead of merely checking that the workflow is + // running. + if ms.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE { + return consts.ErrWorkflowZombie + } + + tree := ms.ChasmTree() + if tree == nil { + return serviceerror.NewInternal("mutable state associated with CHASM task has no CHASM tree") + } + + // Because the persistence layer can lose precision on the task compared to the + // physical task stored in the queue, we take the max of both here. Time is also + // truncated to a common (millisecond) precision later on. + // + // See also queues.IsTimeExpired. + referenceTime := util.MaxTime(t.Now(), task.GetKey().FireTime) + + return tree.EachPureTask(referenceTime, execute) +} + +// executeStateMachineTimers gets the state machine timers, processes the expired timers, +// and returns a count of timers processed. func (t *timerQueueTaskExecutorBase) executeStateMachineTimers( ctx context.Context, workflowContext historyi.WorkflowContext,