Skip to content

Commit 1816115

Browse files
authored
Fix current worker deployment getter (#1924)
* only return last v1 deployment in the shared queue consumer * be explicit about only returning managed deployments
1 parent 5f4c607 commit 1816115

File tree

6 files changed

+48
-20
lines changed

6 files changed

+48
-20
lines changed

apps/webapp/app/presenters/v3/TestPresenter.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class TestPresenter extends BasePresenter {
5656
JOIN ${sqlDatabaseSchema}."BackgroundWorkerTask" bwt ON bwt."workerId" = latest_workers.id
5757
ORDER BY slug ASC;`;
5858
} else {
59-
const currentDeployment = await findCurrentWorkerDeployment(envId);
59+
const currentDeployment = await findCurrentWorkerDeployment({ environmentId: envId });
6060
return currentDeployment?.worker?.tasks ?? [];
6161
}
6262
}

apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ export class TestTaskPresenter {
8888
}: TestTaskOptions): Promise<TestTaskResult> {
8989
let task: BackgroundWorkerTaskSlim | null = null;
9090
if (environment.type !== "DEVELOPMENT") {
91-
const deployment = await findCurrentWorkerDeployment(environment.id);
91+
const deployment = await findCurrentWorkerDeployment({ environmentId: environment.id });
9292
if (deployment) {
9393
task = deployment.worker?.tasks.find((t) => t.slug === taskIdentifier) ?? null;
9494
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,10 @@ export class SharedQueueConsumer {
612612
? await getWorkerDeploymentFromWorkerTask(existingTaskRun.lockedById)
613613
: existingTaskRun.lockedToVersionId
614614
? await getWorkerDeploymentFromWorker(existingTaskRun.lockedToVersionId)
615-
: await findCurrentWorkerDeployment(existingTaskRun.runtimeEnvironmentId);
615+
: await findCurrentWorkerDeployment({
616+
environmentId: existingTaskRun.runtimeEnvironmentId,
617+
type: "V1",
618+
});
616619
});
617620

618621
const worker = deployment?.worker;

apps/webapp/app/v3/models/workerDeployment.server.ts

+37-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Prettify } from "@trigger.dev/core";
2-
import { BackgroundWorker, RunEngineVersion, WorkerDeployment } from "@trigger.dev/database";
2+
import { BackgroundWorker, RunEngineVersion, WorkerDeploymentType } from "@trigger.dev/database";
33
import {
44
CURRENT_DEPLOYMENT_LABEL,
55
CURRENT_UNMANAGED_DEPLOYMENT_LABEL,
@@ -56,10 +56,23 @@ type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{
5656
};
5757
}>;
5858

59-
export async function findCurrentWorkerDeployment(
60-
environmentId: string,
61-
label = CURRENT_DEPLOYMENT_LABEL
62-
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
59+
/**
60+
* Finds the current worker deployment for a given environment.
61+
*
62+
* @param environmentId - The ID of the environment to find the current worker deployment for.
63+
* @param label - The label of the current worker deployment to find.
64+
* @param type - The type of worker deployment to find. If the current deployment is NOT of this type,
65+
* we will return the latest deployment of the given type.
66+
*/
67+
export async function findCurrentWorkerDeployment({
68+
environmentId,
69+
label = CURRENT_DEPLOYMENT_LABEL,
70+
type,
71+
}: {
72+
environmentId: string;
73+
label?: string;
74+
type?: WorkerDeploymentType;
75+
}): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
6376
const promotion = await prisma.workerDeploymentPromotion.findFirst({
6477
where: {
6578
environmentId,
@@ -93,16 +106,19 @@ export async function findCurrentWorkerDeployment(
93106
return undefined;
94107
}
95108

96-
if (promotion.deployment.type === "V1") {
97-
// This is a run engine v1 deployment, so return it
109+
if (!type) {
98110
return promotion.deployment;
99111
}
100112

101-
// We need to get the latest run engine v1 deployment
102-
const latestV1Deployment = await prisma.workerDeployment.findFirst({
113+
if (promotion.deployment.type === type) {
114+
return promotion.deployment;
115+
}
116+
117+
// We need to get the latest deployment of the given type
118+
const latestDeployment = await prisma.workerDeployment.findFirst({
103119
where: {
104120
environmentId,
105-
type: "V1",
121+
type,
106122
},
107123
orderBy: {
108124
id: "desc",
@@ -127,11 +143,11 @@ export async function findCurrentWorkerDeployment(
127143
},
128144
});
129145

130-
if (!latestV1Deployment) {
146+
if (!latestDeployment) {
131147
return undefined;
132148
}
133149

134-
return latestV1Deployment;
150+
return latestDeployment;
135151
}
136152

137153
export async function getCurrentWorkerDeploymentEngineVersion(
@@ -162,7 +178,11 @@ export async function getCurrentWorkerDeploymentEngineVersion(
162178
export async function findCurrentUnmanagedWorkerDeployment(
163179
environmentId: string
164180
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
165-
return await findCurrentWorkerDeployment(environmentId, CURRENT_UNMANAGED_DEPLOYMENT_LABEL);
181+
return await findCurrentWorkerDeployment({
182+
environmentId,
183+
label: CURRENT_UNMANAGED_DEPLOYMENT_LABEL,
184+
type: "UNMANAGED",
185+
});
166186
}
167187

168188
export async function findCurrentWorkerFromEnvironment(
@@ -183,7 +203,10 @@ export async function findCurrentWorkerFromEnvironment(
183203
});
184204
return latestDevWorker;
185205
} else {
186-
const deployment = await findCurrentWorkerDeployment(environment.id, label);
206+
const deployment = await findCurrentWorkerDeployment({
207+
environmentId: environment.id,
208+
label,
209+
});
187210
return deployment?.worker ?? null;
188211
}
189212
}

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ export class TriggerScheduledTaskService extends BaseService {
7373

7474
if (instance.environment.type !== "DEVELOPMENT") {
7575
// Get the current backgroundWorker for this environment
76-
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
76+
const currentWorkerDeployment = await findCurrentWorkerDeployment({
77+
environmentId: instance.environment.id,
78+
});
7779

7880
if (!currentWorkerDeployment) {
7981
logger.debug("No current worker deployment found, skipping task trigger", {

internal-packages/run-engine/src/engine/db/worker.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export async function getRunWithBackgroundWorkerTasks(
100100
} else {
101101
workerWithTasks = workerId
102102
? await getWorkerDeploymentFromWorker(prisma, workerId)
103-
: await getWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId);
103+
: await getManagedWorkerFromCurrentlyPromotedDeployment(prisma, run.runtimeEnvironmentId);
104104
}
105105

106106
if (!workerWithTasks) {
@@ -260,7 +260,7 @@ export async function getWorkerById(
260260
return { worker, tasks: worker.tasks, queues: worker.queues, deployment: worker.deployment };
261261
}
262262

263-
export async function getWorkerFromCurrentlyPromotedDeployment(
263+
export async function getManagedWorkerFromCurrentlyPromotedDeployment(
264264
prisma: PrismaClientOrTransaction,
265265
environmentId: string
266266
): Promise<WorkerDeploymentWithWorkerTasks | null> {

0 commit comments

Comments
 (0)