[CHASM] Pure task processing - GetPureTasks, ExecutePureTasks #7701

Open · wants to merge 3 commits into main

Changes from 1 commit
chasm/tree.go (+116 additions, -0 deletions)

@@ -1241,6 +1241,83 @@
return false
}

// GetPureTasks returns all valid, expired/runnable pure tasks within the CHASM
// tree. The CHASM tree is left untouched, even if invalid tasks are
// detected (these are cleaned up as part of transaction close).
func (n *Node) GetPureTasks(deadline time.Time) ([]any, error) {
var componentTasks []*persistencespb.ChasmComponentAttributes_Task

// Walk the tree to find runnable, valid tasks.
err := n.walk(func(node *Node) error {

Check failure on line 1251 in chasm/tree.go (GitHub Actions: golangci, Pre-build for cache, lint-workflows): n.walk undefined (type *Node has no field or method walk) (typecheck)

// Skip nodes that aren't serialized yet.
if node.serializedNode == nil || node.serializedNode.Metadata == nil {
return nil
}

componentAttr := node.serializedNode.Metadata.GetComponentAttributes()
// Skip nodes that aren't components.
if componentAttr == nil {
return nil
}

validateContext := NewContext(context.Background(), n)
for _, task := range componentAttr.GetPureTasks() {
if task.ScheduledTime.AsTime().After(deadline) {
Member:
let's use ms as the precision for doing comparison. queues.IsTimeExpired() has some more details.

Also, the caller can't simply use time.Now() as input, since there might be clock skew, which may cause the timer task to be executed before the scheduled time recorded in the state.

Contributor Author:
Updated to truncate to the same precision as queues (see the sketch after GetPureTasks below).

On the clock-skew point: the other PR with the caller is using t.Now() from the timer queue struct's ShardContext, as do other methods, and I'll update it to make use of IsTimeExpired for its physical task comparison.

// Pure tasks are stored in-order, so we can skip scanning the rest once we hit
// an unexpired task deadline.
break
}

if task.PhysicalTaskStatus != physicalTaskStatusCreated {
continue
}
Member:
hmm why? We only create the physical task for the first pure task, so it's expected that some tasks will have TaskStatusNone.

// Component value must be prepared for validation to work.
if err := node.prepareComponentValue(validateContext); err != nil {
Member:
minor: we can probably reuse node.Component() with an empty ComponentRef(). Then we only need to implement the access rule in that method, and the task processing side can benefit from it as well.

return err
}

// Validate the task. If the task is invalid, skip it for processing (it'll be
// removed when the transaction closes).
ok, err := node.validateComponentTask(validateContext, task)
Member:
hmm I don't think we can validate all of the tasks first and then run all of the validated ones. The execution of one task may invalidate another task, so we have to validate t1 -> execute t1 -> validate t2 -> execute t2.

Contributor Author:
Sure, will fix that. I think moving the validate check into ExecutePureTask probably makes more sense (see the sketch after ExecutePureTask below).

if err != nil {
return err
}
if !ok {
continue
}

componentTasks = append(componentTasks, task)
}

return nil
})
if err != nil {
return nil, err
}

// Map serialized component tasks to their deserialized values using the Registry.
taskValues := make([]any, len(componentTasks))
for idx, componentTask := range componentTasks {
registrableTask, ok := n.registry.task(componentTask.GetType())
if !ok {
return nil, serviceerror.NewInternal(fmt.Sprintf(
"unregistered CHASM task type '%s'",
componentTask.GetType(),
))
}

// TODO - validateComponentTask also calls deserializeTask, should share a cached value
Member:
in that case, can we refactor validateComponentTask to take in a deserialized task?

Contributor Author:
Yep, will do.

taskValue, err := deserializeTask(registrableTask, componentTask.Data)
if err != nil {
return nil, err
}
taskValues[idx] = taskValue.Interface()
}

return taskValues, nil
}
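
The millisecond-precision comparison suggested in the review thread above could look roughly like the sketch below. This is illustrative only: the helper name and the precision constant are assumptions modeled on the reviewer's description of queues.IsTimeExpired, not that function's actual signature.

// isTaskExpired reports whether a pure task scheduled at scheduledTime is due,
// truncating both timestamps to a shared precision (assumed to be one
// millisecond here) so that sub-millisecond noise in persisted timestamps
// cannot flip the comparison.
func isTaskExpired(scheduledTime, referenceTime time.Time) bool {
	const precision = time.Millisecond // assumption; mirror the queues package's constant
	return !scheduledTime.Truncate(precision).After(referenceTime.Truncate(precision))
}

The deadline passed to GetPureTasks would then be derived from the shard's time source rather than a raw time.Now(), per the clock-skew concern above.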

func newNode(
base *nodeBase,
parent *Node,
@@ -1433,3 +1510,42 @@

return blob, nil
}

// ExecutePureTask executes the given taskInstance against the node's component.
func (n *Node) ExecutePureTask(taskInstance any) error {
registrableTask, ok := n.registry.taskFor(taskInstance)
if !ok {
return fmt.Errorf("unknown task type for task instance goType '%s'", reflect.TypeOf(taskInstance).Name())
}

if !registrableTask.isPureTask {
return fmt.Errorf("ExecutePureTask called on a SideEffect task '%s'", registrableTask.fqType())
}

// Ensure this node's component value is hydrated before execution.
ctx := NewContext(context.Background(), n)
Member:
the timer queue processor should pass in a base context.Context.

if err := n.prepareComponentValue(ctx); err != nil {
Member:
hmm the n here is always root right? But the task may be for a child component?

Contributor Author:
Fixed, child tasks are called on the proper child Node receiver now.

return err
}

// TODO - access rule check here?
// TODO - instantiate CHASM engine and attach to context

executor := registrableTask.handler
if executor == nil {
return fmt.Errorf("no handler registered for task type '%s'", registrableTask.taskType)
}

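// The concrete executor and task types are only known at registration time,
// so Execute is invoked through reflection rather than a static interface call.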
fn := reflect.ValueOf(executor).MethodByName("Execute")
result := fn.Call([]reflect.Value{
reflect.ValueOf(ctx),
reflect.ValueOf(n.value),
reflect.ValueOf(taskInstance),
})
if !result[0].IsNil() {
//nolint:revive // type cast result is unchecked
return result[0].Interface().(error)
}

Member:
There's an interesting question here. We are basically assuming that all of the tasks processed will be invalidated at the end of the transaction.

If the validator implementation has a bug and doesn't invalidate the task, and that pure task's status happens to be Created, then we won't generate a new physical task and execution will get stuck.

If we blindly flip the task status to None, then we will have an infinite loop here.

It seems the best way is to validate again after running the task, and if the task is still valid, return an internal error and DLQ the task. Or we somehow mark the task as executed in-memory and, upon closing the transaction (where we are already validating all the tasks), detect this case and return an error.

Not a super important issue, but I want to bring awareness here. Please help create a task to track this.

Contributor Author:
Task to follow up created, OSS-4272

return nil
}
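
Tying together the two review threads above (validate each task immediately before executing it, and re-validate after execution so a buggy validator cannot leave the task stuck or looping), a per-task driver might look like the following sketch. runPureTask is a hypothetical helper, not code from this PR, and the re-validation behavior is the subject of the follow-up task OSS-4272.

// runPureTask validates a single pure task just before executing it (an
// earlier task's execution may have invalidated it), then re-validates
// afterwards: a task that is still valid after executing would never be
// rescheduled, so it is surfaced as an internal error for the caller to DLQ.
func (n *Node) runPureTask(ctx Context, task *persistencespb.ChasmComponentAttributes_Task, taskValue any) error {
	valid, err := n.validateComponentTask(ctx, task)
	if err != nil {
		return err
	}
	if !valid {
		// Invalidated by an earlier task; cleaned up at transaction close.
		return nil
	}

	if err := n.ExecutePureTask(taskValue); err != nil {
		return err
	}

	valid, err = n.validateComponentTask(ctx, task)
	if err != nil {
		return err
	}
	if valid {
		return serviceerror.NewInternal("pure task still valid after execution")
	}
	return nil
}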
chasm/tree_test.go (+154 additions, -0 deletions)

@@ -1438,3 +1438,157 @@ func (s *nodeSuite) testComponentTree() *Node {

return node // maybe tc too
}

func (s *nodeSuite) TestGetPureTasks() {
now := s.timeSource.Now()

payload := &commonpb.Payload{
Data: []byte("some-random-data"),
}
taskBlob, err := serialization.ProtoEncodeBlob(payload, enumspb.ENCODING_TYPE_PROTO3)
s.NoError(err)

// Set up a tree with expired and unexpired pure tasks.
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
// Expired
Type: "TestLibrary.test_pure_task",
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
"child": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
Type: "TestLibrary.test_pure_task",
// Unexpired
ScheduledTime: timestamppb.New(now.Add(time.Hour)),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
"child/grandchild1": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
PureTasks: []*persistencespb.ChasmComponentAttributes_Task{
{
Type: "TestLibrary.test_pure_task",
// Expired, but physical task not created
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 2,
PhysicalTaskStatus: physicalTaskStatusNone,
Data: taskBlob,
},
{
Type: "TestLibrary.test_pure_task",
// Expired
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
{
Type: "TestLibrary.test_pure_task",
// Expired, but will fail to validate
ScheduledTime: timestamppb.New(now),
VersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
VersionedTransitionOffset: 1,
PhysicalTaskStatus: physicalTaskStatusCreated,
Data: taskBlob,
},
},
},
},
},
},
}

rt, ok := s.registry.Task("TestLibrary.test_pure_task")
s.True(ok)

// Succeed first two task validations, and then fail the third.
rt.validator.(*MockTaskValidator[any, *TestPureTask]).EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(2)
rt.validator.(*MockTaskValidator[any, *TestPureTask]).EXPECT().
Validate(gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).Times(1)

root, err := NewTree(persistenceNodes, s.registry, s.timeSource, s.nodeBackend, s.nodePathEncoder, s.logger)
s.NoError(err)
s.NotNil(root)

tasks, err := root.GetPureTasks(now.Add(time.Minute))
s.NoError(err)
s.NotNil(tasks)
s.Equal(2, len(tasks))

_, ok = tasks[0].(*TestPureTask)
s.True(ok)
}

func (s *nodeSuite) TestExecutePureTask() {
persistenceNodes := map[string]*persistencespb.ChasmNode{
"": {
Metadata: &persistencespb.ChasmNodeMetadata{
InitialVersionedTransition: &persistencespb.VersionedTransition{TransitionCount: 1},
Attributes: &persistencespb.ChasmNodeMetadata_ComponentAttributes{
ComponentAttributes: &persistencespb.ChasmComponentAttributes{
Type: "TestLibrary.test_component",
},
},
},
},
}

pureTask := &TestPureTask{
Payload: &commonpb.Payload{
Data: []byte("some-random-data"),
},
}

rt, ok := s.registry.Task("TestLibrary.test_pure_task")
s.True(ok)

rt.handler.(*MockPureTaskExecutor[any, *TestPureTask]).EXPECT().
Execute(
gomock.Any(),
gomock.AssignableToTypeOf(&TestComponent{}),
gomock.Eq(pureTask),
).Return(nil).Times(1)

root, err := NewTree(persistenceNodes, s.registry, s.timeSource, s.nodeBackend, s.nodePathEncoder, s.logger)
s.NoError(err)
s.NotNil(root)

err = root.ExecutePureTask(pureTask)
s.NoError(err)
}
service/history/interfaces/chasm_tree.go (+5 additions, -0 deletions)

@@ -3,8 +3,10 @@
package interfaces

import (
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"

Check failure on line 9 in service/history/interfaces/chasm_tree.go (GitHub Actions: golangci): could not import go.temporal.io/server/chasm

var _ ChasmTree = (*chasm.Node)(nil)
@@ -15,4 +17,7 @@
ApplyMutation(chasm.NodesMutation) error
ApplySnapshot(chasm.NodesSnapshot) error
IsDirty() bool

GetPureTasks(deadline time.Time) ([]any, error)
ExecutePureTask(taskInstance any) error
}
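
For orientation, a hypothetical history-service caller might drive these two methods as sketched below. processPureTasks and its error handling are assumptions for illustration, not code from this PR; per the review discussion, referenceTime should come from the shard's time source rather than a raw time.Now().

// processPureTasks drains the currently runnable pure tasks from a CHASM tree,
// executing them in order (execution of one task may invalidate later ones).
func processPureTasks(tree historyi.ChasmTree, referenceTime time.Time) error {
	tasks, err := tree.GetPureTasks(referenceTime)
	if err != nil {
		return err
	}
	for _, task := range tasks {
		if err := tree.ExecutePureTask(task); err != nil {
			return err
		}
	}
	return nil
}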
service/history/workflow/noop_chasm_tree.go (+10 additions, -0 deletions)

@@ -1,6 +1,8 @@
package workflow

import (
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
historyi "go.temporal.io/server/service/history/interfaces"
@@ -29,3 +31,11 @@ func (*noopChasmTree) ApplySnapshot(chasm.NodesSnapshot) error {
func (*noopChasmTree) IsDirty() bool {
return false
}

func (*noopChasmTree) GetPureTasks(deadline time.Time) ([]any, error) {
return nil, nil
}

func (*noopChasmTree) ExecutePureTask(taskInstance any) error {
return nil
}