Skip to content

[CHASM] Pure task processing - GetPureTasks, ExecutePureTasks #7701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 148 additions & 12 deletions chasm/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/transitionhistory"
"go.temporal.io/server/common/softassert"
Expand Down Expand Up @@ -766,7 +767,13 @@ func (n *Node) closeTransactionUpdateComponentTasks() error {
validateContext := NewContext(context.Background(), n)
var validationErr error
deleteFunc := func(existingTask *persistencespb.ChasmComponentAttributes_Task) bool {
valid, err := node.validateComponentTask(validateContext, existingTask)
existingTaskInstance, err := node.deserializeComponentTask(existingTask)
if err != nil {
validationErr = err
return false
}

valid, err := node.validateTask(validateContext, existingTaskInstance)
if err != nil {
validationErr = err
return false
Expand Down Expand Up @@ -826,30 +833,42 @@ func (n *Node) closeTransactionUpdateComponentTasks() error {
return nil
}

func (n *Node) validateComponentTask(
validateContext Context,
func (n *Node) deserializeComponentTask(
componentTask *persistencespb.ChasmComponentAttributes_Task,
) (bool, error) {
) (any, error) {
registableTask, ok := n.registry.task(componentTask.Type)
if !ok {
return false, serviceerror.NewInternal(fmt.Sprintf("task type %s is not registered", componentTask.Type))
return nil, serviceerror.NewInternal(fmt.Sprintf("task type %s is not registered", componentTask.Type))
}

// TODO: cache validateMethod (reflect.Value) in the registry
validator := registableTask.validator
validateMethod := reflect.ValueOf(validator).MethodByName("Validate")

// TODO: cache deserialized task value (reflect.Value) in the node,
// use task VT and offset as the key
deserizedTaskValue, err := deserializeTask(registableTask, componentTask.Data)
taskValue, err := deserializeTask(registableTask, componentTask.Data)
if err != nil {
return false, err
return nil, err
}

return taskValue.Interface(), nil
}

func (n *Node) validateTask(
validateContext Context,
taskInstance any,
) (bool, error) {
registableTask, ok := n.registry.taskFor(taskInstance)
if !ok {
return false, serviceerror.NewInternal(
fmt.Sprintf("task type for goType %s is not registered", reflect.TypeOf(taskInstance).Name()))
}

// TODO: cache validateMethod (reflect.Value) in the registry
validator := registableTask.validator
validateMethod := reflect.ValueOf(validator).MethodByName("Validate")

retValues := validateMethod.Call([]reflect.Value{
reflect.ValueOf(validateContext),
reflect.ValueOf(n.value),
deserizedTaskValue,
reflect.ValueOf(taskInstance),
})
if !retValues[1].IsNil() {
//revive:disable-next-line:unchecked-type-assertion
Expand Down Expand Up @@ -1241,6 +1260,66 @@ func (n *Node) isValueNeedSerialize() bool {
return false
}

// isComponentTaskExpired returns true when the task's scheduled time is equal
// or before the reference time. The caller should also make sure to account
// for skew between the physical task queue and the database by adjusting
// referenceTime in advance.
func isComponentTaskExpired(
referenceTime time.Time,
task *persistencespb.ChasmComponentAttributes_Task,
) bool {
if task.ScheduledTime == nil {
return false
}

scheduledTime := task.ScheduledTime.AsTime().Truncate(persistence.ScheduledTaskMinPrecision)
referenceTime = referenceTime.Truncate(persistence.ScheduledTaskMinPrecision)

return !scheduledTime.After(referenceTime)
}

// EachPureTask runs the callback for all expired/runnable pure tasks within the
// CHASM tree (including invalid tasks). The CHASM tree is left untouched, even
// if invalid tasks are detected (these are cleaned up as part of transaction
// close).
func (n *Node) EachPureTask(
referenceTime time.Time,
callback func(node *Node, task any) error,
) error {
// Walk the tree to find all runnable tasks.
for _, node := range n.andAllChildren() {
// Skip nodes that aren't serialized yet.
if node.serializedNode == nil || node.serializedNode.Metadata == nil {
continue
}

componentAttr := node.serializedNode.Metadata.GetComponentAttributes()
// Skip nodes that aren't components.
if componentAttr == nil {
continue
}

for _, task := range componentAttr.GetPureTasks() {
if !isComponentTaskExpired(referenceTime, task) {
// Pure tasks are stored in-order, so we can skip scanning the rest once we hit
// an unexpired task deadline.
break
}

taskValue, err := node.deserializeComponentTask(task)
if err != nil {
return err
}

if err = callback(node, taskValue); err != nil {
return err
}
}
}

return nil
}

func newNode(
base *nodeBase,
parent *Node,
Expand Down Expand Up @@ -1433,3 +1512,60 @@ func serializeTask(

return blob, nil
}

// ExecutePureTask validates and then executes the given taskInstance against the
// node's component. Executing an invalid task is a no-op (no error returned).
func (n *Node) ExecutePureTask(baseCtx context.Context, taskInstance any) error {
registrableTask, ok := n.registry.taskFor(taskInstance)
if !ok {
return fmt.Errorf("unknown task type for task instance goType '%s'", reflect.TypeOf(taskInstance).Name())
}

if !registrableTask.isPureTask {
return fmt.Errorf("ExecutePureTask called on a SideEffect task '%s'", registrableTask.fqType())
}

// TODO - instantiate CHASM engine and attach to context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only need to do this for side effect tasks. Pure tasks should not call any chasm engine methods.

ctx := NewContext(baseCtx, n)

// Ensure this node's component value is hydrated before execution. Component
// will also check access rules.
component, err := n.Component(ctx, ComponentRef{})
if err != nil {
return err
}

// Run the task's registered value before execution.
valid, err := n.validateTask(ctx, taskInstance)
if err != nil {
return err
}
if !valid {
return nil
}

executor := registrableTask.handler
if executor == nil {
return fmt.Errorf("no handler registered for task type '%s'", registrableTask.taskType)
}

fn := reflect.ValueOf(executor).MethodByName("Execute")
result := fn.Call([]reflect.Value{
reflect.ValueOf(ctx),
reflect.ValueOf(component),
reflect.ValueOf(taskInstance),
})
if !result[0].IsNil() {
//nolint:revive // type cast result is unchecked
return result[0].Interface().(error)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an interesting question here. We are basically assuming for all the tasks processed, they will be invalidated at the end of the transaction.

If the validator implementation has a bug and doesn't invalid the task and the task status of that pure task happens to be Created, then we will won't generate a new physical task and the execution will get stuck.

If we blindly flip task status to be None, then we will have a infinite loop here.

It seems to be the best way is to Validate again after running the task and if the task is still validate return an internal error and DLQ the task.
Or we somehow mark the task as executed in-memory and upon close transaction, where we are already doing task Validating all the tasks, detect this case and return an error.

Not a super important issue but want to bring awareness here. Please help create a task to track this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task to follow up created, OSS-4272

// TODO - a task validator must succeed validation after a task executes
// successfully (without error), otherwise it will generate an infinite loop.
// Check for this case by marking the in-memory task as having executed, which the
// CloseTransaction method will check against.
//
// See: https://github.com/temporalio/temporal/pull/7701#discussion_r2072026993

return nil
}
170 changes: 170 additions & 0 deletions chasm/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,3 +1438,173 @@ func (s *nodeSuite) testComponentTree() *Node {

return node // maybe tc too
}

func (s *nodeSuite) TestEachPureTask() {
now := s.timeSource.Now()

payload := &commonpb.Payload{
Data: []byte("some-random-data"),
}
taskBlob, err := serialization.ProtoEncodeBlob(payload, enumspb.ENCODING_TYPE_PROTO3)
s.NoError(err)

// Set up a tree with expired and unexpired pure tasks.
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
// Expired
Type: "TestLibrary.test_pure_task",
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
"child": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
Type: "TestLibrary.test_pure_task",
// Unexpired
ScheduledTime: timestamppb.New(now.Add(time.Hour)),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
"child/grandchild1": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
Type: "TestLibrary.test_pure_task",
// Expired, and physical task not created
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 2,
PhysicalTaskStatus: physicalTaskStatusNone,
Data: taskBlob,
},
{
Type: "TestLibrary.test_pure_task",
// Expired
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
}

root, err := NewTree(persistenceNodes, s.registry, s.timeSource, s.nodeBackend, s.nodePathEncoder, s.logger)
s.NoError(err)
s.NotNil(root)

actualTaskCount := 0
err = root.EachPureTask(now.Add(time.Minute), func(node *Node, task any) error {
s.NotNil(node)

_, ok := task.(*TestPureTask)
s.True(ok)

actualTaskCount += 1
return nil
})
s.NoError(err)
s.Equal(3, actualTaskCount)
}

func (s *nodeSuite) TestExecutePureTask() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
},
},
},
},
}

pureTask := &TestPureTask{
Payload: &commonpb.Payload{
Data: []byte("some-random-data"),
},
}

rt, ok := s.registry.Task("TestLibrary.test_pure_task")
s.True(ok)

root, err := NewTree(persistenceNodes, s.registry, s.timeSource, s.nodeBackend, s.nodePathEncoder, s.logger)
s.NoError(err)
s.NotNil(root)
ctx := context.Background()

expectExecute := func(result error) {
rt.handler.(*MockPureTaskExecutor[any, *TestPureTask]).EXPECT().
Execute(
gomock.Any(),
gomock.AssignableToTypeOf(&TestComponent{}),
gomock.Eq(pureTask),
).Return(result).Times(1)
}

expectValidate := func(retValue bool, errValue error) {
rt.validator.(*MockTaskValidator[any, *TestPureTask]).EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Any()).Return(retValue, errValue).Times(1)
}

// Succeed task execution and validation (happy case).
expectExecute(nil)
expectValidate(true, nil)
err = root.ExecutePureTask(ctx, pureTask)
s.NoError(err)

expectedErr := errors.New("dummy")

// Succeed validation, fail execution.
expectExecute(expectedErr)
expectValidate(true, nil)
err = root.ExecutePureTask(ctx, pureTask)
s.ErrorIs(expectedErr, err)

// Fail task validation (no execution occurs).
expectValidate(false, nil)
err = root.ExecutePureTask(ctx, pureTask)
s.NoError(err)

// Error during task validation (no execution occurs).
expectValidate(false, expectedErr)
err = root.ExecutePureTask(ctx, pureTask)
s.ErrorIs(expectedErr, err)
}
7 changes: 7 additions & 0 deletions service/history/interfaces/chasm_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package interfaces

import (
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
)
Expand All @@ -15,4 +17,9 @@ type ChasmTree interface {
ApplyMutation(chasm.NodesMutation) error
ApplySnapshot(chasm.NodesSnapshot) error
IsDirty() bool

EachPureTask(
deadline time.Time,
callback func(node *chasm.Node, task any) error,
) error
}
Loading
Loading