-
Notifications
You must be signed in to change notification settings - Fork 960
[CHASM] Pure task processing - GetPureTasks, ExecutePureTasks #7701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ import ( | |
"go.temporal.io/server/common/clock" | ||
"go.temporal.io/server/common/definition" | ||
"go.temporal.io/server/common/log" | ||
"go.temporal.io/server/common/persistence" | ||
"go.temporal.io/server/common/persistence/serialization" | ||
"go.temporal.io/server/common/persistence/transitionhistory" | ||
"go.temporal.io/server/common/softassert" | ||
|
@@ -766,7 +767,13 @@ func (n *Node) closeTransactionUpdateComponentTasks() error { | |
validateContext := NewContext(context.Background(), n) | ||
var validationErr error | ||
deleteFunc := func(existingTask *persistencespb.ChasmComponentAttributes_Task) bool { | ||
valid, err := node.validateComponentTask(validateContext, existingTask) | ||
existingTaskInstance, err := node.deserializeComponentTask(existingTask) | ||
if err != nil { | ||
validationErr = err | ||
return false | ||
} | ||
|
||
valid, err := node.validateTask(validateContext, existingTaskInstance) | ||
if err != nil { | ||
validationErr = err | ||
return false | ||
|
@@ -826,30 +833,42 @@ func (n *Node) closeTransactionUpdateComponentTasks() error { | |
return nil | ||
} | ||
|
||
func (n *Node) validateComponentTask( | ||
validateContext Context, | ||
func (n *Node) deserializeComponentTask( | ||
componentTask *persistencespb.ChasmComponentAttributes_Task, | ||
) (bool, error) { | ||
) (any, error) { | ||
registableTask, ok := n.registry.task(componentTask.Type) | ||
if !ok { | ||
return false, serviceerror.NewInternal(fmt.Sprintf("task type %s is not registered", componentTask.Type)) | ||
return nil, serviceerror.NewInternal(fmt.Sprintf("task type %s is not registered", componentTask.Type)) | ||
} | ||
|
||
// TODO: cache validateMethod (reflect.Value) in the registry | ||
validator := registableTask.validator | ||
validateMethod := reflect.ValueOf(validator).MethodByName("Validate") | ||
|
||
// TODO: cache deserialized task value (reflect.Value) in the node, | ||
// use task VT and offset as the key | ||
deserizedTaskValue, err := deserializeTask(registableTask, componentTask.Data) | ||
taskValue, err := deserializeTask(registableTask, componentTask.Data) | ||
if err != nil { | ||
return false, err | ||
return nil, err | ||
} | ||
|
||
return taskValue.Interface(), nil | ||
} | ||
|
||
func (n *Node) validateTask( | ||
validateContext Context, | ||
taskInstance any, | ||
) (bool, error) { | ||
registableTask, ok := n.registry.taskFor(taskInstance) | ||
if !ok { | ||
return false, serviceerror.NewInternal( | ||
fmt.Sprintf("task type for goType %s is not registered", reflect.TypeOf(taskInstance).Name())) | ||
} | ||
|
||
// TODO: cache validateMethod (reflect.Value) in the registry | ||
validator := registableTask.validator | ||
validateMethod := reflect.ValueOf(validator).MethodByName("Validate") | ||
|
||
retValues := validateMethod.Call([]reflect.Value{ | ||
reflect.ValueOf(validateContext), | ||
reflect.ValueOf(n.value), | ||
deserizedTaskValue, | ||
reflect.ValueOf(taskInstance), | ||
}) | ||
if !retValues[1].IsNil() { | ||
//revive:disable-next-line:unchecked-type-assertion | ||
|
@@ -1241,6 +1260,66 @@ func (n *Node) isValueNeedSerialize() bool { | |
return false | ||
} | ||
|
||
// isComponentTaskExpired returns true when the task's scheduled time is equal | ||
// or before the reference time. The caller should also make sure to account | ||
// for skew between the physical task queue and the database by adjusting | ||
// referenceTime in advance. | ||
func isComponentTaskExpired( | ||
referenceTime time.Time, | ||
task *persistencespb.ChasmComponentAttributes_Task, | ||
) bool { | ||
if task.ScheduledTime == nil { | ||
return false | ||
} | ||
|
||
scheduledTime := task.ScheduledTime.AsTime().Truncate(persistence.ScheduledTaskMinPrecision) | ||
referenceTime = referenceTime.Truncate(persistence.ScheduledTaskMinPrecision) | ||
|
||
return !scheduledTime.After(referenceTime) | ||
} | ||
|
||
// EachPureTask runs the callback for all expired/runnable pure tasks within the | ||
// CHASM tree (including invalid tasks). The CHASM tree is left untouched, even | ||
// if invalid tasks are detected (these are cleaned up as part of transaction | ||
// close). | ||
func (n *Node) EachPureTask( | ||
referenceTime time.Time, | ||
callback func(node *Node, task any) error, | ||
) error { | ||
// Walk the tree to find all runnable tasks. | ||
for _, node := range n.andAllChildren() { | ||
// Skip nodes that aren't serialized yet. | ||
if node.serializedNode == nil || node.serializedNode.Metadata == nil { | ||
continue | ||
} | ||
|
||
componentAttr := node.serializedNode.Metadata.GetComponentAttributes() | ||
// Skip nodes that aren't components. | ||
if componentAttr == nil { | ||
continue | ||
} | ||
|
||
for _, task := range componentAttr.GetPureTasks() { | ||
if !isComponentTaskExpired(referenceTime, task) { | ||
// Pure tasks are stored in-order, so we can skip scanning the rest once we hit | ||
// an unexpired task deadline. | ||
break | ||
} | ||
|
||
taskValue, err := node.deserializeComponentTask(task) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err = callback(node, taskValue); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func newNode( | ||
base *nodeBase, | ||
parent *Node, | ||
|
@@ -1433,3 +1512,60 @@ func serializeTask( | |
|
||
return blob, nil | ||
} | ||
|
||
// ExecutePureTask validates and then executes the given taskInstance against the | ||
// node's component. Executing an invalid task is a no-op (no error returned). | ||
func (n *Node) ExecutePureTask(baseCtx context.Context, taskInstance any) error { | ||
registrableTask, ok := n.registry.taskFor(taskInstance) | ||
if !ok { | ||
return fmt.Errorf("unknown task type for task instance goType '%s'", reflect.TypeOf(taskInstance).Name()) | ||
} | ||
|
||
if !registrableTask.isPureTask { | ||
return fmt.Errorf("ExecutePureTask called on a SideEffect task '%s'", registrableTask.fqType()) | ||
} | ||
|
||
// TODO - instantiate CHASM engine and attach to context | ||
ctx := NewContext(baseCtx, n) | ||
|
||
// Ensure this node's component value is hydrated before execution. Component | ||
// will also check access rules. | ||
component, err := n.Component(ctx, ComponentRef{}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Run the task's registered value before execution. | ||
valid, err := n.validateTask(ctx, taskInstance) | ||
if err != nil { | ||
return err | ||
} | ||
if !valid { | ||
return nil | ||
} | ||
|
||
executor := registrableTask.handler | ||
if executor == nil { | ||
return fmt.Errorf("no handler registered for task type '%s'", registrableTask.taskType) | ||
} | ||
|
||
fn := reflect.ValueOf(executor).MethodByName("Execute") | ||
result := fn.Call([]reflect.Value{ | ||
reflect.ValueOf(ctx), | ||
reflect.ValueOf(component), | ||
reflect.ValueOf(taskInstance), | ||
}) | ||
if !result[0].IsNil() { | ||
//nolint:revive // type cast result is unchecked | ||
return result[0].Interface().(error) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's an interesting question here. We are basically assuming for all the tasks processed, they will be invalidated at the end of the transaction. If the validator implementation has a bug and doesn't invalid the task and the task status of that pure task happens to be Created, then we will won't generate a new physical task and the execution will get stuck. If we blindly flip task status to be None, then we will have a infinite loop here. It seems to be the best way is to Validate again after running the task and if the task is still validate return an internal error and DLQ the task. Not a super important issue but want to bring awareness here. Please help create a task to track this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Task to follow up created, OSS-4272 |
||
// TODO - a task validator must succeed validation after a task executes | ||
// successfully (without error), otherwise it will generate an infinite loop. | ||
// Check for this case by marking the in-memory task as having executed, which the | ||
// CloseTransaction method will check against. | ||
// | ||
// See: https://github.com/temporalio/temporal/pull/7701#discussion_r2072026993 | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only need to do this for side-effect tasks. Pure tasks should not call any CHASM engine methods.