Skip to content

Commit a9657b0

Browse files
committed
Merge create promise and task always
1 parent 1b994e9 commit a9657b0

File tree

4 files changed

+32
-36
lines changed

4 files changed

+32
-36
lines changed

internal/app/coroutines/completePromise.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func completePromise(tags map[string]string, cmd *t_aio.UpdatePromiseCommand, ad
159159
Kind: t_aio.CompleteTasks,
160160
CompleteTasks: &t_aio.CompleteTasksCommand{
161161
RootPromiseId: cmd.Id,
162-
CompletedOn: cmd.CompletedOn,
162+
CompletedOn: c.Time(),
163163
},
164164
},
165165
{

internal/app/coroutines/createPromise.go

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,13 @@ func createPromiseAndTask(
9696
return nil, err
9797
}
9898
var promiseRowsAffected int64
99-
if taskCmd == nil {
99+
100+
// CreatePromise could be merged with a task command in `createPromise`
101+
if completion.Store.Results[0].Kind == t_aio.CreatePromise {
100102
promiseRowsAffected = completion.Store.Results[0].CreatePromise.RowsAffected
101103
} else {
102104
promiseRowsAffected = completion.Store.Results[0].CreatePromiseAndTask.PromiseRowsAffected
105+
util.Assert(promiseRowsAffected == completion.Store.Results[0].CreatePromiseAndTask.TaskRowsAffected, "number of promises and tasks affected must be equal.")
103106
}
104107

105108
if promiseRowsAffected == 0 {
@@ -212,27 +215,9 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
212215
promiseCmd.Tags = map[string]string{}
213216
}
214217

215-
isCreatePromiseAndTask := taskCmd != nil
216-
217218
return func(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, *t_aio.Completion]) (*t_aio.Completion, error) {
218219
commands := []*t_aio.Command{}
219220

220-
// Combine both commands if taskCmd is not null otherwise add just the CreatePromiseCmd
221-
if isCreatePromiseAndTask {
222-
commands = append(commands, &t_aio.Command{
223-
Kind: t_aio.CreatePromiseAndTask,
224-
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
225-
PromiseCommand: promiseCmd,
226-
TaskCommand: taskCmd,
227-
},
228-
})
229-
} else {
230-
commands = append(commands, &t_aio.Command{
231-
Kind: t_aio.CreatePromise,
232-
CreatePromise: promiseCmd,
233-
})
234-
}
235-
236221
// check router to see if a task needs to be created
237222
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
238223
Kind: t_aio.Router,
@@ -254,16 +239,21 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
254239
slog.Warn("failed to match promise", "cmd", promiseCmd, "err", err)
255240
}
256241

257-
if isCreatePromiseAndTask && (err != nil || !completion.Router.Matched) {
242+
if taskCmd != nil && (err != nil || !completion.Router.Matched) {
258243
slog.Error("failed to match promise with router when creating a task", "cmd", promiseCmd)
259244
return nil, t_api.NewError(t_api.StatusPromiseRecvNotFound, err)
260245
}
261246

247+
cmd := &t_aio.Command{
248+
Kind: t_aio.CreatePromise,
249+
CreatePromise: promiseCmd,
250+
}
251+
262252
if err == nil && completion.Router.Matched {
263253
util.Assert(completion.Router.Recv != nil, "recv must not be nil")
264254

265255
// If there is a taskCmd just update the Recv otherwise create a tasks for the match
266-
if isCreatePromiseAndTask {
256+
if taskCmd != nil {
267257
// Note: we are mutating the taskCmd that is already merged with the createPromiseCmd
268258
taskCmd.Recv = completion.Router.Recv
269259
} else {
@@ -275,17 +265,19 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
275265
State: task.Init,
276266
CreatedOn: promiseCmd.CreatedOn,
277267
}
278-
279-
// add create task command if matched
280-
commands = append(commands, &t_aio.Command{
281-
Kind: t_aio.CreateTask,
282-
CreateTask: taskCmd,
283-
})
284-
285268
}
286269

270+
cmd = &t_aio.Command{
271+
Kind: t_aio.CreatePromiseAndTask,
272+
CreatePromiseAndTask: &t_aio.CreatePromiseAndTaskCommand{
273+
PromiseCommand: promiseCmd,
274+
TaskCommand: taskCmd,
275+
},
276+
}
287277
}
288278

279+
// Add the main command
280+
commands = append(commands, cmd)
289281
// add additional commands
290282
commands = append(commands, additionalCmds...)
291283

@@ -307,15 +299,15 @@ func createPromise(tags map[string]string, promiseCmd *t_aio.CreatePromiseComman
307299

308300
util.Assert(completion.Store != nil, "completion must not be nil")
309301
util.Assert(len(completion.Store.Results) == len(commands), "completion must have same number of results as commands")
310-
if isCreatePromiseAndTask {
302+
if completion.Store.Results[0].Kind == t_aio.CreatePromiseAndTask {
311303
promiseAndTaskResult := completion.Store.Results[0].CreatePromiseAndTask
312304
util.Assert(promiseAndTaskResult.PromiseRowsAffected == 0 || promiseAndTaskResult.PromiseRowsAffected == 1, "Creating promise result must return 0 or 1 rows")
313-
if promiseAndTaskResult.PromiseRowsAffected == 0 {
314-
util.Assert(promiseAndTaskResult.TaskRowsAffected == 0, "If not promise was created a task must have not been created")
315-
}
316-
} else {
305+
util.Assert(promiseAndTaskResult.TaskRowsAffected == promiseAndTaskResult.PromiseRowsAffected, "If not promise was created a task must have not been created")
306+
} else if completion.Store.Results[0].Kind == t_aio.CreatePromise {
317307
createPromiseResult := completion.Store.Results[0].CreatePromise
318308
util.Assert(createPromiseResult.RowsAffected == 0 || createPromiseResult.RowsAffected == 1, "CreatePromise result must return 0 or 1 rows")
309+
} else {
310+
panic("First result must be CreatePromise or CreatePromiseAndTask")
319311
}
320312

321313
return completion, nil

internal/app/subsystems/aio/store/sqlite/sqlite.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,8 +1025,11 @@ func (w *SqliteStoreWorker) createPromiseAndTask(tx *sql.Tx, promiseStmt *sql.St
10251025
// Couldn't create a promise
10261026
if promiseResult.CreatePromise.RowsAffected == 0 {
10271027
return &t_aio.Result{
1028-
Kind: t_aio.CreatePromiseAndTask,
1029-
CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{},
1028+
Kind: t_aio.CreatePromiseAndTask,
1029+
CreatePromiseAndTask: &t_aio.AlterPromiseAndTaskResult{
1030+
PromiseRowsAffected: 0,
1031+
TaskRowsAffected: 0,
1032+
},
10301033
}, nil
10311034
}
10321035

test/dst/dst.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System)
160160
if d.config.PrintOps {
161161
// log
162162
slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "id", id, "req", req, "res", res, "err", err)
163+
163164
}
164165

165166
// extract cursors for subsequent requests

0 commit comments

Comments
 (0)