Skip to content

DST Fix #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/app/coroutines/completePromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ func completePromise(tags map[string]string, cmd *t_aio.UpdatePromiseCommand, ad
Kind: t_aio.CompleteTasks,
CompleteTasks: &t_aio.CompleteTasksCommand{
RootPromiseId: cmd.Id,
CompletedOn: cmd.CompletedOn,
CompletedOn: c.Time(),
},
},
{
Kind: t_aio.CreateTasks,
CreateTasks: &t_aio.CreateTasksCommand{
PromiseId: cmd.Id,
CreatedOn: cmd.CompletedOn,
CreatedOn: c.Time(),
},
},
{
Expand Down
58 changes: 25 additions & 33 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@
return nil, err
}
var promiseRowsAffected int64
if taskCmd == nil {

// CreatePromise could be merged with a task command in `createPromise`
if completion.Store.Results[0].Kind == t_aio.CreatePromise {
promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected
} else {
promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected
util.Assert(promiseRowsAffected == completion.Store.Results[0].CreatePromiseAndTask.TaskRowsAffected, "number of promises and tasks affected must be equal.")
}

if promiseRowsAffected == 0 {
Expand Down Expand Up @@ -212,27 +215,9 @@
promiseCmd.Tags = map[string]string{}
}

isCreatePromiseAndTask := taskCmd != nil

return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
commands := []*t_aio.Command{}

// Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd
if isCreatePromiseAndTask {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
PromiseCommand: promiseCmd,
TaskCommand: taskCmd,
},
})
} else {
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
})
}

// check router to see if a task needs to be created
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Router,
Expand All @@ -254,16 +239,21 @@
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
}

if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) {
if taskCmd != nil && (err != nil || !completion.Router.Matched) {
slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd)
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
}

cmd := &t_aio.Command{
Kind: t_aio.CreatePromise,
CreatePromise: promiseCmd,
}

if err == nil && completion.Router.Matched {
util.Assert(completion.Router.Recv != nil, "recv must not be nil")

// If there is a taskCmd just update the Recv otherwise create a tasks for the match
if isCreatePromiseAndTask {
if taskCmd != nil {
// Note: we are mutating the taskCmd that is already merged with the createPromiseCmd
taskCmd.Recv = completion.Router.Recv
} else {
Expand All @@ -275,17 +265,19 @@
State: task.Init,
CreatedOn: promiseCmd.CreatedOn,
}

// add create task command if matched
commands = append(commands, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: taskCmd,
})

}

cmd = &t_aio.Command{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
PromiseCommand: promiseCmd,
TaskCommand: taskCmd,
},
}
}

// Add the main command
commands = append(commands, cmd)
// add additional commands
commands = append(commands, additionalCmds...)

Expand All @@ -307,15 +299,15 @@

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
if isCreatePromiseAndTask {
if completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask {
promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask
util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows")
if promiseAndTaskResult.PromiseRowsAffected == 0 {
util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created")
}
} else {
util.Assert(promiseAndTaskResult.TaskRowsAffected == promiseAndTaskResult.PromiseRowsAffected, "If not promise was created a task must have not been created")
} else if completion.Store.Results[0].Kind == t_aio.CreatePromise {
createPromiseResult := completion.Store.Results[0].CreatePromise
util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows")
} else {
panic("First result must be CreatePromise or CreatePromiseAndTask")

Check warning on line 310 in internal/app/coroutines/createPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createPromise.go#L310

Added line #L310 was not covered by tests
}

return completion, nil
Expand Down
7 changes: 5 additions & 2 deletions internal/app/subsystems/aio/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,8 +1025,11 @@ func (w *SqliteStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.St
// Couldn't create a promise
if promiseResult.CreatePromise.RowsAffected == 0 {
return &t_aio.Result{
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{},
Kind: t_aio.CreatePromiseAndTask,
CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{
PromiseRowsAffected: 0,
TaskRowsAffected: 0,
},
}, nil
}

Expand Down
11 changes: 10 additions & 1 deletion test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
if d.config.PrintOps {
// log
slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "id", id, "req", req, "res", res, "err", err)

Check warning on line 163 in test/dst/dst.go

View check run for this annotation

Codecov / codecov/patch

test/dst/dst.go#L163

Added line #L163 was not covered by tests
}

// extract cursors for subsequent requests
Expand Down Expand Up @@ -482,6 +483,12 @@

var tasks string
for _, t := range *model.tasks {
var completedOn string
if t.value.CompletedOn == nil {
completedOn = "--"
} else {
completedOn = fmt.Sprintf("%d", *t.value.CompletedOn)
}
tasks = tasks + fmt.Sprintf(`
<tr>
<td align="right">%s</td>
Expand All @@ -490,8 +497,9 @@
<td align="right">%d</td>
<td align="right">%d</td>
<td align="right">%d</td>
<td align="right">%s</td>
</tr>
`, t.value.Id, t.value.State, t.value.RootPromiseId, t.value.ExpiresAt, t.value.Timeout, *t.value.CreatedOn)
`, t.value.Id, t.value.State, t.value.RootPromiseId, t.value.ExpiresAt, t.value.Timeout, *t.value.CreatedOn, completedOn)
}
return fmt.Sprintf(`
<table border="0" cellspacing="0" cellpadding="5" style="background-color: white;">
Expand Down Expand Up @@ -526,6 +534,7 @@
<td><b>expiresAt</b></td>
<td><b>timeout</b></td>
<td><b>createdOn</b></td>
<td><b>completedOn</b></td>
</tr>
</thead>
<tbody>
Expand Down
Loading