diff --git a/internal/app/coroutines/completePromise.go b/internal/app/coroutines/completePromise.go index 4170e97c..067ffc8e 100644 --- a/internal/app/coroutines/completePromise.go +++ b/internal/app/coroutines/completePromise.go @@ -159,14 +159,14 @@ func completePromise(tags map[string]string, cmd *t_aio.UpdatePromiseCommand, ad Kind: t_aio.CompleteTasks, CompleteTasks: &t_aio.CompleteTasksCommand{ RootPromiseId: cmd.Id, - CompletedOn: cmd.CompletedOn, + CompletedOn: c.Time(), }, }, { Kind: t_aio.CreateTasks, CreateTasks: &t_aio.CreateTasksCommand{ PromiseId: cmd.Id, - CreatedOn: cmd.CompletedOn, + CreatedOn: c.Time(), }, }, { diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index 82929da6..940aa8bf 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -96,10 +96,13 @@ func createPromiseAndTask( return nil, err } var promiseRowsAffected int64 - if taskCmd == nil { + + // CreatePromise could be merged with a task command in `createPromise` + if completion.Store.Results[0].Kind == t_aio.CreatePromise { promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected } else { promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected + util.Assert(promiseRowsAffected == completion.Store.Results[0].CreatePromiseAndTask.TaskRowsAffected, "number of promises and tasks affected must be equal.") } if promiseRowsAffected == 0 { @@ -212,27 +215,9 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman promiseCmd.Tags = map[string]string{} } - isCreatePromiseAndTask := taskCmd != nil - return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) { commands := []*t_aio.Command{} - // Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd - if isCreatePromiseAndTask { - commands = append(commands, &t_aio.Command{ - Kind: t_aio.CreatePromiseAndTask, - CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{ - PromiseCommand: promiseCmd, - TaskCommand: taskCmd, - }, - }) - } else { - commands = append(commands, &t_aio.Command{ - Kind: t_aio.CreatePromise, - CreatePromise: promiseCmd, - }) - } - // check router to see if a task needs to be created completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{ Kind: t_aio.Router, @@ -254,16 +239,21 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err) } - if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) { + if taskCmd != nil && (err != nil || !completion.Router.Matched) { slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd) return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err) } + cmd := &t_aio.Command{ + Kind: t_aio.CreatePromise, + CreatePromise: promiseCmd, + } + if err == nil && completion.Router.Matched { util.Assert(completion.Router.Recv != nil, "recv must not be nil") // If there is a taskCmd just update the Recv otherwise create a tasks for the match - if isCreatePromiseAndTask { + if taskCmd != nil { // Note: we are mutating the taskCmd that is already merged with the createPromiseCmd taskCmd.Recv = completion.Router.Recv } else { @@ -275,17 +265,19 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman State: task.Init, CreatedOn: promiseCmd.CreatedOn, } - - // add create task command if matched - commands = append(commands, &t_aio.Command{ - Kind: t_aio.CreateTask, - CreateTask: taskCmd, - }) - } + cmd = &t_aio.Command{ + Kind: t_aio.CreatePromiseAndTask, + CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{ + PromiseCommand: promiseCmd, + TaskCommand: taskCmd, + }, + } } + // Add the main command + commands = append(commands, cmd) // add additional commands commands = append(commands, additionalCmds...) @@ -307,15 +299,15 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman util.Assert(completion.Store != nil, "completion must not be nil") util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands") - if isCreatePromiseAndTask { + if completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask { promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows") - if promiseAndTaskResult.PromiseRowsAffected == 0 { - util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created") - } - } else { + util.Assert(promiseAndTaskResult.TaskRowsAffected == promiseAndTaskResult.PromiseRowsAffected, "If not promise was created a task must have not been created") + } else if completion.Store.Results[0].Kind == t_aio.CreatePromise { createPromiseResult := completion.Store.Results[0].CreatePromise util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows") + } else { + panic("First result must be CreatePromise or CreatePromiseAndTask") } return completion, nil diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index fdc5654a..18c47d11 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -1025,8 +1025,11 @@ func (w *SqliteStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.St // Couldn't create a promise if promiseResult.CreatePromise.RowsAffected == 0 { return &t_aio.Result{ - Kind: t_aio.CreatePromiseAndTask, - CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{}, + Kind: t_aio.CreatePromiseAndTask, + CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{ + PromiseRowsAffected: 0, + TaskRowsAffected: 0, + }, }, nil } diff --git a/test/dst/dst.go b/test/dst/dst.go index 1996d2ee..189c07de 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -160,6 +160,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System) if d.config.PrintOps { // log slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "id", id, "req", req, "res", res, "err", err) + } // extract cursors for subsequent requests @@ -482,6 +483,12 @@ func (d *DST) Model() porcupine.Model { var tasks string for _, t := range *model.tasks { + var completedOn string + if t.value.CompletedOn == nil { + completedOn = "--" + } else { + completedOn = fmt.Sprintf("%d", *t.value.CompletedOn) + } tasks = tasks + fmt.Sprintf(` %s @@ -490,8 +497,9 @@ func (d *DST) Model() porcupine.Model { %d %d %d + %s - `, t.value.Id, t.value.State, t.value.RootPromiseId, t.value.ExpiresAt, t.value.Timeout, *t.value.CreatedOn) + `, t.value.Id, t.value.State, t.value.RootPromiseId, t.value.ExpiresAt, t.value.Timeout, *t.value.CreatedOn, completedOn) } return fmt.Sprintf(` @@ -526,6 +534,7 @@ func (d *DST) Model() porcupine.Model { +
expiresAt timeout createdOncompletedOn