Skip to content

Commit a51b9b9

Browse files
authored
Fix: custom queue releaseConcurrencyOnWaitpoint. (#1896)
* Make sure releaseConcurrencyOnWaitpoint is set on taskQueue * Allow task queue to have releaseConcurrencyOnWaitpoint (required to fix custom queues) * Test for releaseConcurrencyOnWaitpoint * Added a warning if you use the same queue twice with different settings * Improved the error some more
1 parent 9d261f4 commit a51b9b9

File tree

5 files changed

+183
-2
lines changed

5 files changed

+183
-2
lines changed

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ async function createWorkerQueue(
368368
concurrencyLimit ?? undefined,
369369
orderableName,
370370
queueType,
371+
queue.releaseConcurrencyOnWaitpoint,
371372
worker,
372373
prisma
373374
);
@@ -402,6 +403,7 @@ async function upsertWorkerQueueRecord(
402403
concurrencyLimit: number | undefined,
403404
orderableName: string,
404405
queueType: TaskQueueType,
406+
releaseConcurrencyOnWaitpoint: boolean | undefined,
405407
worker: BackgroundWorker,
406408
prisma: PrismaClientOrTransaction,
407409
attempt: number = 0
@@ -429,6 +431,7 @@ async function upsertWorkerQueueRecord(
429431
runtimeEnvironmentId: worker.runtimeEnvironmentId,
430432
projectId: worker.projectId,
431433
type: queueType,
434+
releaseConcurrencyOnWaitpoint,
432435
workers: {
433436
connect: {
434437
id: worker.id,
@@ -437,14 +440,15 @@ async function upsertWorkerQueueRecord(
437440
},
438441
});
439442
} else {
440-
await prisma.taskQueue.update({
443+
taskQueue = await prisma.taskQueue.update({
441444
where: {
442445
id: taskQueue.id,
443446
},
444447
data: {
445448
workers: { connect: { id: worker.id } },
446449
version: "V2",
447450
orderableName,
451+
releaseConcurrencyOnWaitpoint,
448452
},
449453
});
450454
}
@@ -458,6 +462,7 @@ async function upsertWorkerQueueRecord(
458462
concurrencyLimit,
459463
orderableName,
460464
queueType,
465+
releaseConcurrencyOnWaitpoint,
461466
worker,
462467
prisma,
463468
attempt + 1

packages/core/src/v3/resource-catalog/standardResourceCatalog.ts

+25
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,31 @@ export class StandardResourceCatalog implements ResourceCatalog {
2424
}
2525

2626
registerQueueMetadata(queue: QueueManifest): void {
27+
const existingQueue = this._queueMetadata.get(queue.name);
28+
29+
//if it exists already AND concurrencyLimit or releaseConcurrencyOnWaitpoint is different, log a warning
30+
if (existingQueue) {
31+
const isConcurrencyLimitDifferent = existingQueue.concurrencyLimit !== queue.concurrencyLimit;
32+
const isReleaseConcurrencyOnWaitpointDifferent =
33+
existingQueue.releaseConcurrencyOnWaitpoint !== queue.releaseConcurrencyOnWaitpoint;
34+
35+
if (isConcurrencyLimitDifferent || isReleaseConcurrencyOnWaitpointDifferent) {
36+
let message = `Queue "${queue.name}" is defined twice, with different settings.`;
37+
if (isConcurrencyLimitDifferent) {
38+
message += `\n - concurrencyLimit: ${existingQueue.concurrencyLimit} vs ${queue.concurrencyLimit}`;
39+
}
40+
if (isReleaseConcurrencyOnWaitpointDifferent) {
41+
message += `\n - releaseConcurrencyOnWaitpoint: ${existingQueue.releaseConcurrencyOnWaitpoint} vs ${queue.releaseConcurrencyOnWaitpoint}`;
42+
}
43+
44+
message += "\n Keeping the first definition:";
45+
message += `\n - concurrencyLimit: ${existingQueue.concurrencyLimit}`;
46+
message += `\n - releaseConcurrencyOnWaitpoint: ${existingQueue.releaseConcurrencyOnWaitpoint}`;
47+
console.warn(message);
48+
return;
49+
}
50+
}
51+
2752
this._queueMetadata.set(queue.name, queue);
2853
}
2954

packages/core/src/v3/types/tasks.ts

+1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ type CommonTaskOptions<
206206
queue?: {
207207
name?: string;
208208
concurrencyLimit?: number;
209+
releaseConcurrencyOnWaitpoint?: boolean;
209210
};
210211
/** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
211212
*

packages/trigger-sdk/src/v3/shared.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ export { SubtaskUnwrapError, TaskRunPromise };
119119
export type Context = TaskRunContext;
120120

121121
export function queue(options: QueueOptions): Queue {
122-
// TODO register queue here
123122
resourceCatalog.registerQueueMetadata(options);
124123

125124
// @ts-expect-error
@@ -215,6 +214,7 @@ export function createTask<
215214
resourceCatalog.registerQueueMetadata({
216215
name: queue.name,
217216
concurrencyLimit: queue.concurrencyLimit,
217+
releaseConcurrencyOnWaitpoint: queue.releaseConcurrencyOnWaitpoint,
218218
});
219219
}
220220

@@ -346,6 +346,7 @@ export function createSchemaTask<
346346
resourceCatalog.registerQueueMetadata({
347347
name: queue.name,
348348
concurrencyLimit: queue.concurrencyLimit,
349+
releaseConcurrencyOnWaitpoint: queue.releaseConcurrencyOnWaitpoint,
349350
});
350351
}
351352

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import { batch, logger, queue, task, wait } from "@trigger.dev/sdk";
2+
import assert from "node:assert";
3+
import { setTimeout } from "node:timers/promises";
4+
5+
// Queue with concurrency limit and release enabled
6+
const releaseEnabledQueue = queue({
7+
name: "release-concurrency-test-queue-enabled",
8+
concurrencyLimit: 2,
9+
releaseConcurrencyOnWaitpoint: true,
10+
});
11+
12+
// Queue with concurrency limit but release disabled
13+
const releaseDisabledQueue = queue({
14+
name: "release-concurrency-test-queue-disabled",
15+
concurrencyLimit: 2,
16+
releaseConcurrencyOnWaitpoint: false,
17+
});
18+
19+
// Task that runs on the release-enabled queue
20+
const releaseEnabledTask = task({
21+
id: "release-concurrency-enabled-task",
22+
queue: releaseEnabledQueue,
23+
retry: {
24+
maxAttempts: 1,
25+
},
26+
run: async (payload: { id: string; waitSeconds: number }, { ctx }) => {
27+
const startedAt = Date.now();
28+
logger.info(`Run ${payload.id} started at ${startedAt}`);
29+
30+
// Wait and release concurrency
31+
await wait.for({ seconds: payload.waitSeconds, releaseConcurrency: true });
32+
33+
const resumedAt = Date.now();
34+
await setTimeout(2000); // Additional work after resuming
35+
const completedAt = Date.now();
36+
37+
return { id: payload.id, startedAt, resumedAt, completedAt };
38+
},
39+
});
40+
41+
// Task that runs on the release-disabled queue
42+
const releaseDisabledTask = task({
43+
id: "release-concurrency-disabled-task",
44+
queue: releaseDisabledQueue,
45+
retry: {
46+
maxAttempts: 1,
47+
},
48+
run: async (payload: { id: string; waitSeconds: number }, { ctx }) => {
49+
const startedAt = Date.now();
50+
logger.info(`Run ${payload.id} started ${startedAt}`);
51+
52+
// Wait without releasing concurrency
53+
await wait.for({ seconds: payload.waitSeconds });
54+
55+
const resumedAt = Date.now();
56+
await setTimeout(2000);
57+
const completedAt = Date.now();
58+
59+
return { id: payload.id, startedAt, resumedAt, completedAt };
60+
},
61+
});
62+
63+
// Main test task
64+
export const waitReleaseConcurrencyTestTask = task({
65+
id: "wait-release-concurrency-test",
66+
retry: {
67+
maxAttempts: 1,
68+
},
69+
run: async (payload, { ctx }) => {
70+
logger.info("Starting wait release concurrency test");
71+
72+
// Test 1: Queue with release enabled
73+
logger.info("Testing queue with release enabled");
74+
const enabledResults = await batch.triggerAndWait([
75+
{ id: releaseEnabledTask.id, payload: { id: "e1", waitSeconds: 6 } },
76+
{ id: releaseEnabledTask.id, payload: { id: "e2", waitSeconds: 6 } },
77+
{ id: releaseEnabledTask.id, payload: { id: "e3", waitSeconds: 6 } },
78+
]);
79+
80+
// Verify all tasks completed
81+
assert(
82+
enabledResults.runs.every((r) => r.ok),
83+
"All enabled tasks should complete"
84+
);
85+
86+
// Get executions sorted by start time
87+
const enabledExecutions = enabledResults.runs
88+
.map((r) => r.output)
89+
.sort((a, b) => a.startedAt - b.startedAt);
90+
91+
// Verify that task e3 could start before e1 and e2 completed
92+
// (because concurrency was released during wait)
93+
const e3 = enabledExecutions.find((e) => e.id === "e3");
94+
const e1e2CompletedAt = Math.max(
95+
...enabledExecutions.filter((e) => ["e1", "e2"].includes(e.id)).map((e) => e.completedAt)
96+
);
97+
98+
assert(
99+
e3.startedAt < e1e2CompletedAt,
100+
"Task e3 should start before e1/e2 complete due to released concurrency"
101+
);
102+
103+
logger.info("✅ test with release enabled");
104+
105+
// Test 2: Queue with release disabled
106+
logger.info("Testing queue with release disabled");
107+
const disabledResults = await batch.triggerAndWait([
108+
{ id: releaseDisabledTask.id, payload: { id: "d1", waitSeconds: 6 } },
109+
{ id: releaseDisabledTask.id, payload: { id: "d2", waitSeconds: 6 } },
110+
{ id: releaseDisabledTask.id, payload: { id: "d3", waitSeconds: 6 } },
111+
]);
112+
113+
// Verify all tasks completed
114+
assert(
115+
disabledResults.runs.every((r) => r.ok),
116+
"All disabled tasks should complete"
117+
);
118+
119+
// Get executions sorted by start time
120+
const disabledExecutions = disabledResults.runs
121+
.map((r) => r.output)
122+
.sort((a, b) => a.startedAt - b.startedAt);
123+
124+
// Verify that task d3 could NOT start before d1 or d2 completed
125+
// (because concurrency was not released during wait)
126+
const d3 = disabledExecutions.find((e) => e.id === "d3");
127+
const d1d2CompletedAt = Math.max(
128+
...disabledExecutions.filter((e) => ["d1", "d2"].includes(e.id)).map((e) => e.completedAt)
129+
);
130+
131+
assert(
132+
d3.startedAt >= d1d2CompletedAt,
133+
"Task d3 should not start before d1/d2 complete when concurrency is not released"
134+
);
135+
136+
logger.info("✅ test with release disabled");
137+
138+
return {
139+
enabledQueueResults: {
140+
executions: enabledExecutions,
141+
concurrencyReleased: true,
142+
},
143+
disabledQueueResults: {
144+
executions: disabledExecutions,
145+
concurrencyReleased: false,
146+
},
147+
};
148+
},
149+
});

0 commit comments

Comments
 (0)