[CHASM] timer queue pure task processing logic #7702
base: chasm_tree_pure_tasks

@@ -12,6 +12,7 @@ import (
	taskqueuepb "go.temporal.io/api/taskqueue/v1"
	enumsspb "go.temporal.io/server/api/enums/v1"
	"go.temporal.io/server/api/matchingservice/v1"
	"go.temporal.io/server/chasm"
	"go.temporal.io/server/client"
	"go.temporal.io/server/common"
	"go.temporal.io/server/common/log"

@@ -100,6 +101,8 @@ func (t *timerQueueStandbyTaskExecutor) Execute(
		err = t.executeDeleteHistoryEventTask(ctx, task)
	case *tasks.StateMachineTimerTask:
		err = t.executeStateMachineTimerTask(ctx, task)
	case *tasks.ChasmTaskPure:
		err = t.executeChasmPureTimerTask(ctx, task)
	default:
		err = queues.NewUnprocessableTaskError("unknown task type")
	}

@@ -111,6 +114,70 @@ func (t *timerQueueStandbyTaskExecutor) Execute(
	}
}

func (t *timerQueueStandbyTaskExecutor) executeChasmPureTimerTask(
	ctx context.Context,
	task *tasks.ChasmTaskPure,
) error {
	actionFn := func(
		ctx context.Context,
		wfContext historyi.WorkflowContext,
		mutableState historyi.MutableState,
	) (any, error) {
		processedTimers, err := t.executeChasmPureTimers(
			ctx,
			wfContext,
			mutableState,
			task,
			func(node *chasm.Node, task any) error {
				// If this line of code is reached, the task's Validate() function returned
				// no error, which indicates that it is still expected to run. Return
				// ErrTaskRetry to wait for the machine to transition on the active cluster.
				return consts.ErrTaskRetry
			},
		)
		if err != nil {
			if errors.Is(err, consts.ErrTaskRetry) {
				// This handles the ErrTaskRetry error returned by executeChasmPureTimers.
				return &struct{}{}, nil
			}
			return nil, err
		}

		// We haven't done any work, return without committing.
		if processedTimers == 0 {
			return nil, nil
		}

		if t.config.EnableUpdateWorkflowModeIgnoreCurrent() {
			return nil, wfContext.UpdateWorkflowExecutionAsPassive(ctx, t.shardContext)
		}

		// TODO: remove following code once EnableUpdateWorkflowModeIgnoreCurrent config is deprecated.
		if mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
			// Can't use UpdateWorkflowExecutionAsPassive since it updates the current run,
			// and we are operating on a closed workflow.
			return nil, wfContext.SubmitClosedWorkflowSnapshot(
				ctx,
				t.shardContext,
				historyi.TransactionPolicyPassive,
			)
		}
		return nil, wfContext.UpdateWorkflowExecutionAsPassive(ctx, t.shardContext)

Review comment: standby executor doesn't need this part I think.

	}

	return t.processTimer(
		ctx,
		task,
		actionFn,
		getStandbyPostActionFn(
			task,
			t.getCurrentTime,
			t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()),
			t.checkWorkflowStillExistOnSourceBeforeDiscard,
		),
	)
}

func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
	ctx context.Context,
	timerTask *tasks.UserTimerTask,

@@ -10,6 +10,7 @@ import (
	"go.temporal.io/api/serviceerror"
	enumsspb "go.temporal.io/server/api/enums/v1"
	persistencespb "go.temporal.io/server/api/persistence/v1"
	"go.temporal.io/server/chasm"
	"go.temporal.io/server/common/locks"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"

@@ -240,8 +241,51 @@ func (t *timerQueueTaskExecutorBase) executeSingleStateMachineTimer(
	return nil
}

// executeStateMachineTimers gets the state machine timers, processed the expired timers,
// and return a slice of unprocessed timers.
// executeChasmPureTimers walks a CHASM tree for expired pure task timers,
// executes them, and returns a count of timers processed.
func (t *timerQueueTaskExecutorBase) executeChasmPureTimers(
	ctx context.Context,
	workflowContext historyi.WorkflowContext,
	ms historyi.MutableState,
	task *tasks.ChasmTaskPure,
	execute func(node *chasm.Node, task any) error,
) (processedCount int, err error) {
	// Because CHASM timers can target closed workflows, we need to specifically
	// exclude zombie workflows, instead of merely checking that the workflow is
	// running.
	if ms.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
		return 0, consts.ErrWorkflowZombie
	}

	tree := ms.ChasmTree()
	if tree == nil {
		return 0, consts.ErrStaleReference

Review comment: nit: shall we return an internal error here? I think this is not expected?
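
A minimal sketch of that suggestion, assuming the serviceerror package already imported above (illustrative only, not part of this PR):

	tree := ms.ChasmTree()
	if tree == nil {
		// Hypothetical variant per the review nit: a missing CHASM tree is unexpected,
		// so surface it as an internal error instead of a stale-reference error.
		return 0, serviceerror.NewInternal("mutable state returned a nil CHASM tree")
	}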

	}

	rootNode, ok := tree.(*chasm.Node)
	if !ok {
		return 0, fmt.Errorf("failed type assertion for ChasmTree")
	}

	pureTasks, err := tree.GetPureTasks(t.Now())

Review comment: commented in the previous PR, t.Now() is not good enough. Check queues.IsTimeExpired().

Reply: Fixed. I address skew between DB and physical task queue within the timer task queue executor, and then also truncate times against the reference time within the CHASM tree.
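
A small sketch of the truncation idea described in that reply (the helper name and the millisecond precision are assumptions for illustration, not code from this PR):

// isTimeExpired treats a timer as expired only if its fire time, truncated to a
// common precision, is not after the similarly truncated reference time, so clock
// skew and persistence rounding cannot make an already-fired timer look unexpired.
func isTimeExpired(referenceTime, fireTime time.Time) bool {
	const precision = time.Millisecond
	return !fireTime.Truncate(precision).After(referenceTime.Truncate(precision))
}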

	if err != nil {
		return
	}

	for _, pureTask := range pureTasks {
		err = execute(rootNode, pureTask)
		if err != nil {
			return
		}

		processedCount += 1
	}

	return
}

// executeStateMachineTimers gets the state machine timers, processes the expired timers,
// and returns a count of timers processed.
func (t *timerQueueTaskExecutorBase) executeStateMachineTimers(
	ctx context.Context,
	workflowContext historyi.WorkflowContext,

Review comment: I am slightly concerned that the verification may never complete if new valid tasks keep getting generated. We probably want to add VT to the physical pure tasks and only execute/verify those tasks generated <= the VT recorded in the physical task. Can you help create a follow-up task on this? Not high priority and we can address it later.

Reply: Created OSS-4273 for follow-up.
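
A rough sketch of the suggested follow-up, purely hypothetical (the pureTaskInfo type, its VersionedTransition field, and modeling the VT as an int64 are all assumptions, not existing APIs):

// pureTaskInfo is a stand-in for whatever GetPureTasks returns, extended with the
// versioned transition (VT) that generated the task.
type pureTaskInfo struct {
	Task                any
	VersionedTransition int64
}

// filterByVersionedTransition keeps only pure tasks generated at or before the VT
// recorded in the physical timer task, so standby verification can converge even
// while new valid tasks keep being generated.
func filterByVersionedTransition(pureTasks []pureTaskInfo, physicalTaskVT int64) []pureTaskInfo {
	var filtered []pureTaskInfo
	for _, pt := range pureTasks {
		if pt.VersionedTransition <= physicalTaskVT {
			filtered = append(filtered, pt)
		}
	}
	return filtered
}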