Skip to content

Commit 0f2c8fa

Browse files
authored
feat: Unify worker manager (#954)
<!-- Pull requests are squashed and merged using: - their title as the commit message - their description as the commit body Having a good title and description is important for the users to get readable changelog. --> <!-- 1. Explain WHAT the change is about --> - Solves [MET-667](https://linear.app/metatypedev/issue/MET-667/gate-unify-the-worker-manager-between-workflows-and-runtime) - [x] `BaseWorkerManager` - [x] Use in Deno runtime - [ ] ~Use in Python runtime~ _(followup PR)_ - [ ] ~Use in Rust runtime~ _(followup PR)_ - [ ] ~Worker pooling~ _(followup PR)_ <!-- 2. Explain WHY the change cannot be made simpler --> <!-- 3. Explain HOW users should update their code --> #### Migration notes --- - [ ] The change comes with new or modified tests - [ ] Hard-to-understand functions have explanatory comments - [ ] End-user documentation is updated to reflect the change <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Summary by CodeRabbit Based on the comprehensive summary, here are the updated release notes: - **New Features** - Enhanced worker management system with improved task tracking and execution. - Introduced new `WorkerManager` for more robust Deno runtime operations. - Added support for inline artifact generation and management. - New asynchronous method `getInlineArtifact` in the `ArtifactStore` class. - **Improvements** - Streamlined messaging and event handling across different runtime components. - Improved error reporting and task lifecycle management. - Refined type definitions for better type safety. - **Breaking Changes** - Removed `DenoMessenger` and `LazyAsyncMessenger` classes. - Restructured workflow event and message handling. - Updated task ID generation mechanism. - **Performance** - Optimized worker initialization and task execution. - Introduced more efficient task tracking and resource management. - **Bug Fixes** - Improved error handling in worker and runtime environments. - Enhanced message communication between workers and main thread. - Removed outdated test cases to focus on relevant functionality. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 3d8dac2 commit 0f2c8fa

File tree

21 files changed

+770
-944
lines changed

21 files changed

+770
-944
lines changed

deno.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/typegate/src/runtimes/deno/deno.ts

Lines changed: 64 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import type {
1818
} from "../../typegraph/types.ts";
1919
import * as ast from "graphql/ast";
2020
import { InternalAuth } from "../../services/auth/protocols/internal.ts";
21-
import { DenoMessenger } from "./deno_messenger.ts";
2221
import type { Task } from "./shared_types.ts";
2322
import { path } from "compress/deps.ts";
2423
import { globalConfig as config } from "../../config.ts";
@@ -27,6 +26,7 @@ import { PolicyResolverOutput } from "../../engine/planner/policies.ts";
2726
import { getInjectionValues } from "../../engine/planner/injection_utils.ts";
2827
import DynamicInjection from "../../engine/injection/dynamic.ts";
2928
import { getLogger } from "../../log.ts";
29+
import { WorkerManager } from "./worker_manager.ts";
3030

3131
const logger = getLogger(import.meta);
3232

@@ -37,7 +37,8 @@ const predefinedFuncs: Record<string, Resolver<Record<string, unknown>>> = {
3737
allow: () => "ALLOW" as PolicyResolverOutput,
3838
deny: () => "DENY" as PolicyResolverOutput,
3939
pass: () => "PASS" as PolicyResolverOutput,
40-
internal_policy: ({ _: { context } }) => context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
40+
internal_policy: ({ _: { context } }) =>
41+
context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
4142
};
4243

4344
export class DenoRuntime extends Runtime {
@@ -46,8 +47,7 @@ export class DenoRuntime extends Runtime {
4647
uuid: string,
4748
private tg: TypeGraphDS,
4849
private typegate: Typegate,
49-
private w: DenoMessenger,
50-
private registry: Map<string, number>,
50+
private workerManager: WorkerManager,
5151
private secrets: Record<string, string>,
5252
) {
5353
super(typegraphName, uuid);
@@ -138,36 +138,24 @@ export class DenoRuntime extends Runtime {
138138
}
139139
}
140140

141-
const w = new DenoMessenger(
142-
name,
143-
{
144-
...(args.permissions ?? {}),
145-
read: [basePath],
146-
} as Deno.PermissionOptionsObject,
147-
false,
148-
ops,
149-
typegate.config.base,
150-
);
151-
152-
if (Deno.env.get("DENO_TESTING") === "true") {
153-
w.disableLazyness();
154-
}
141+
const workerManager = new WorkerManager({
142+
timeout_ms: typegate.config.base.timer_max_timeout_ms,
143+
});
155144

156145
const rt = new DenoRuntime(
157146
typegraphName,
158147
uuid,
159148
tg,
160149
typegate,
161-
w,
162-
registry,
150+
workerManager,
163151
secrets,
164152
);
165153

166154
return rt;
167155
}
168156

169157
async deinit(): Promise<void> {
170-
await this.w.terminate();
158+
// await this.workerManager.deinit();
171159
}
172160

173161
materialize(
@@ -257,7 +245,10 @@ export class DenoRuntime extends Runtime {
257245
const modMat = this.tg.materializers[mat.data.mod as number];
258246
const entryPoint =
259247
this.tg.meta.artifacts[modMat.data.entryPoint as string];
260-
const op = this.registry.get(entryPoint.hash)!;
248+
const depMetas = (modMat.data.deps as string[]).map((dep) =>
249+
createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep])
250+
);
251+
const moduleMeta = createArtifactMeta(this.typegraphName, entryPoint);
261252

262253
return async ({
263254
_: {
@@ -269,33 +260,33 @@ export class DenoRuntime extends Runtime {
269260
}) => {
270261
const token = await InternalAuth.emit(this.typegate.cryptoKeys);
271262

272-
return await this.w.execute(
273-
op,
263+
// TODO cache??
264+
const entryModulePath = await this.typegate.artifactStore.getLocalPath(
265+
moduleMeta,
266+
depMetas,
267+
);
268+
269+
return await this.workerManager.callFunction(
270+
mat.data.name as string,
271+
entryModulePath,
272+
entryPoint.path,
273+
args,
274274
{
275-
type: "import_func",
276-
args,
277-
internals: {
278-
parent,
279-
context,
280-
secrets,
281-
effect: mat.effect.effect ?? null,
282-
meta: {
283-
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
284-
token,
285-
},
286-
headers,
275+
parent,
276+
context,
277+
secrets,
278+
effect: mat.effect.effect ?? null,
279+
meta: {
280+
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
281+
token,
287282
},
288-
name: mat.data.name as string,
289-
verbose,
283+
headers,
290284
},
291-
[],
292-
pulseCount,
293285
);
294286
};
295287
}
296288

297289
if (mat.name === "function") {
298-
const op = this.registry.get(mat.data.script as string)!;
299290
return async ({
300291
_: {
301292
context,
@@ -306,26 +297,29 @@ export class DenoRuntime extends Runtime {
306297
}) => {
307298
const token = await InternalAuth.emit(this.typegate.cryptoKeys);
308299

309-
return await this.w.execute(
310-
op,
300+
const modulePath = await this.typegate.artifactStore.getInlineArtifact(
301+
this.typegraphName,
302+
mat.data.script as string,
303+
".ts",
304+
exportInlineFunction("inlineFunction"),
305+
);
306+
307+
return await this.workerManager.callFunction(
308+
"inlineFunction",
309+
modulePath,
310+
"tg",
311+
args,
311312
{
312-
type: "func",
313-
args,
314-
internals: {
315-
parent,
316-
context,
317-
secrets,
318-
effect: mat.effect.effect ?? null,
319-
meta: {
320-
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
321-
token,
322-
},
323-
headers,
313+
parent,
314+
context,
315+
secrets,
316+
effect: mat.effect.effect ?? null,
317+
meta: {
318+
url: `${url.protocol}//${url.host}/${this.typegraphName}`,
319+
token,
324320
},
325-
verbose,
321+
headers,
326322
},
327-
[],
328-
pulseCount,
329323
);
330324
};
331325
}
@@ -365,6 +359,18 @@ export class DenoRuntime extends Runtime {
365359
}
366360
}
367361

362+
function exportInlineFunction(name = "fn", symbol = "_my_lambda") {
363+
if (!name.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) {
364+
throw new Error(`Invalid identifier: ${name}`);
365+
}
366+
if (!symbol.match(/^[a-zA-Z_][a-zA-Z0-9_]*$/)) {
367+
throw new Error(`Invalid identifier: ${symbol}`);
368+
}
369+
return (code: string) => {
370+
return `${code}\nexport const ${name} = ${symbol};`;
371+
};
372+
}
373+
368374
function getInjectionData(d: InjectionData) {
369375
if ("value" in d) {
370376
return d.value;

src/typegate/src/runtimes/deno/deno_messenger.ts

Lines changed: 0 additions & 74 deletions
This file was deleted.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
import { TaskContext } from "./shared_types.ts";
5+
6+
export type TaskSpec = {
7+
modulePath: string;
8+
functionName: string;
9+
};
10+
11+
export type DenoMessage = {
12+
type: "CALL";
13+
modulePath: string;
14+
functionName: string;
15+
args: unknown;
16+
internals: TaskContext;
17+
};
18+
19+
export type DenoEvent =
20+
| { type: "SUCCESS"; result: unknown }
21+
| { type: "FAILURE"; error: string; exception: Error | undefined };

0 commit comments

Comments
 (0)