Skip to content

Commit f96ade2

Browse files
feat: wasm worker manager (#966)
<!-- Pull requests are squashed and merged using: - their title as the commit message - their description as the commit body Having a good title and description is important for the users to get readable changelog. --> <!-- 1. Explain WHAT the change is about --> - Closes [MET-805](https://linear.app/metatypedev/issue/MET-805/worker-manager-implementation-for-wasm-runtime-and-python). <!-- 3. Explain HOW users should update their code --> #### Migration notes --- - [x] The change comes with new or modified tests - [x] Hard-to-understand functions have explanatory comments - [x] End-user documentation is updated to reflect the change <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Added WebAssembly (WASM) worker management capabilities. - Introduced new type definitions for WASM runtime interactions. - Enhanced worker pool and worker manager functionality. - Introduced a new `WasmWorker` class for managing WebAssembly workers. - **Configuration Changes** - Updated global configuration to include WASM worker-related settings. - Replaced `substantial_worker_wait_timeout_ms` with more granular worker configuration options. - Added new environment variables for WASM worker management: `MIN_WASM_WORKERS` and `MAX_WASM_WORKERS`. - **Runtime Improvements** - Refactored runtime initialization processes. - Improved worker lifecycle management. - Enhanced error handling and logging for various runtime environments. - Removed explicit `destroy` operations in some runtime components. - **Dependency Updates** - Updated Metatype version to `0.5.1-rc.0`. - **Performance Optimizations** - Added pre-warming steps in performance tests. - Streamlined worker initialization and management. - **Code Quality** - Improved code formatting and readability. - Simplified runtime and worker management interfaces. 
<!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Natoandro <[email protected]>
1 parent 1c5420a commit f96ade2

File tree

24 files changed

+551
-292
lines changed

24 files changed

+551
-292
lines changed

deno.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/metatype.dev/docs/reference/typegate/index.mdx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,6 @@ The following environment variables can be used to configure the typegate. `SYNC
8787
| DENO_WORKER_WAIT_TIMEOUT_MS | Timeout for waiting for a free deno worker | 5000 | 2000 |
8888
| MIN_SUBSTANTIAL_WORKERS | Minimal number of available substantial workers | 2 | 4 |
8989
| MAX_SUBSTANTIAL_WORKERS | Maximal number of available substantial workers | 8 | 16 |
90+
| MIN_WASM_WORKERS | Minimal number of available wasm workers | 2 | 4 |
91+
| MAX_WASM_WORKERS | Maximal number of available wasm workers | 8 | 16 |
9092
| SUBSTANTIAL_WORKER_WAIT_TIMEOUT_MS | Timeout for waiting for a free substantial worker | 15000 | 2000 |

src/typegate/engine/00_runtime.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ const Meta = {
5555
wasmtimeWit: getOp("op_wasmtime_wit"),
5656
wit_wire: {
5757
init: getOp("op_wit_wire_init"),
58-
destroy: getOp("op_wit_wire_destroy"),
5958
handle: getOp("op_wit_wire_handle"),
6059
},
6160
substantial: {

src/typegate/engine/runtime.d.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ export type MetaNS = {
4545
args: WitWireInitArgs,
4646
cb: (op_name: string, json: string) => Promise<string>,
4747
) => Promise<WitWireInitResponse>;
48-
destroy: (instanceId: string) => Promise<void>;
4948
handle: (
5049
instanceId: string,
5150
args: WitWireReq,

src/typegate/engine/src/ext.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ deno_core::extension!(
4040
prisma::op_archive,
4141
wit_wire::op_wit_wire_init,
4242
wit_wire::op_wit_wire_handle,
43-
wit_wire::op_wit_wire_destroy,
4443
grpc::op_grpc_register,
4544
grpc::op_grpc_unregister,
4645
grpc::op_call_grpc_method,

src/typegate/engine/src/runtimes/wit_wire.rs

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,40 @@ impl Ctx {
9696
}
9797

9898
struct Instance {
99+
id: String,
99100
bindings: wit::WitWire,
100101
store: wasmtime::Store<InstanceState>,
101102
preopen_dir: PathBuf,
102103
}
103104

105+
// An Instance's lifetime is tied to a thread (worker)
106+
// The Instance is dropped when that worker is terminated; dropping it schedules removal of its preopened directory
107+
impl Drop for Instance {
108+
fn drop(&mut self) {
109+
let id = self.id.clone();
110+
let preopen_dir = self.preopen_dir.clone();
111+
112+
debug!("destroying wit_wire instance {id}");
113+
114+
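// FIXME: Does the directory removal really have to be async? `drop` is synchronous and cannot await, so the cleanup is spawned as a detached task.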
115+
std::mem::drop(tokio::task::spawn(async move {
116+
debug!("Removing preopened dir: {preopen_dir:?}");
117+
118+
match tokio::fs::remove_dir_all(&preopen_dir).await {
119+
Err(err) if err.kind() != std::io::ErrorKind::NotFound => {
120+
error!(
121+
"error removing preopend dir for instance {id} at {:?}: {err}",
122+
preopen_dir
123+
)
124+
}
125+
_ => {
126+
debug!("preopened dir removed for instance {id}")
127+
}
128+
}
129+
}));
130+
}
131+
}
132+
104133
struct InstanceState {
105134
table: wasmtime_wasi::ResourceTable,
106135
ctx: wasmtime_wasi::WasiCtx,
@@ -157,14 +186,6 @@ struct TypegateHost {
157186
struct SendPtr<T>(NonNull<T>);
158187
unsafe impl<T> Send for SendPtr<T> {}
159188

160-
impl TypegateHost {
161-
fn drop(self, scope: &mut v8::HandleScope) {
162-
unsafe {
163-
_ = v8::Global::from_raw(scope, self.js_fn.0);
164-
}
165-
}
166-
}
167-
168189
#[wasmtime_wasi::async_trait]
169190
impl Host for TypegateHost {
170191
async fn hostcall(
@@ -389,8 +410,9 @@ pub async fn op_wit_wire_init<'scope>(
389410
})??; // <- note second try for the wit err. we have an into impl above
390411
assert!(res.ok);
391412
ctx.instances.insert(
392-
instance_id,
413+
instance_id.clone(),
393414
Instance {
415+
id: instance_id,
394416
bindings,
395417
store,
396418
preopen_dir: work_dir,
@@ -399,34 +421,6 @@ pub async fn op_wit_wire_init<'scope>(
399421
Ok(WitWireInitResponse {})
400422
}
401423

402-
#[deno_core::op2(fast)]
403-
pub fn op_wit_wire_destroy(
404-
state: Rc<RefCell<OpState>>,
405-
scope: &mut v8::HandleScope<'_>,
406-
#[string] instance_id: String,
407-
) {
408-
debug!("destroying wit_wire instance {instance_id}");
409-
let ctx = {
410-
let state = state.borrow();
411-
let ctx = state.borrow::<Ctx>();
412-
ctx.clone()
413-
};
414-
415-
let Some((_id, instance)) = ctx.instances.remove(&instance_id) else {
416-
return;
417-
};
418-
let tg_host = instance.store.into_data().tg_host;
419-
tg_host.drop(scope);
420-
std::mem::drop(tokio::task::spawn(async move {
421-
if let Err(err) = tokio::fs::remove_dir_all(&instance.preopen_dir).await {
422-
error!(
423-
"error removing preopend dir for instance {_id} at {:?}: {err}",
424-
instance.preopen_dir
425-
)
426-
}
427-
}));
428-
}
429-
430424
#[derive(Deserialize)]
431425
#[serde(crate = "serde")]
432426
pub struct WitWireReq {

src/typegate/src/config/types.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,13 @@ export const globalConfigSchema = z.object({
6666
// deno_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers
6767
min_substantial_workers: z.coerce.number().positive().default(2),
6868
max_substantial_workers: z.coerce.number().positive().default(8),
69-
substantial_worker_wait_timeout_ms: z.coerce.number().positive().default(
70-
15000,
71-
),
69+
min_wasm_workers: z.coerce.number().positive().default(2),
70+
max_wasm_workers: z.coerce.number().positive().default(8),
71+
wasm_worker_wait_timeout_ms: z.coerce.number().positive().default(5000),
72+
substantial_worker_wait_timeout_ms: z.coerce
73+
.number()
74+
.positive()
75+
.default(15000),
7276
// substantial_idle_worker_timeout_ms: z.coerce.number().positive().optional(), // auto remove idle workers
7377
});
7478
export type GlobalConfig = z.infer<typeof globalConfigSchema>;
@@ -88,8 +92,7 @@ export const typegateConfigBaseSchema = z.object({
8892
if (bytes.length != 64) {
8993
ctx.addIssue({
9094
code: z.ZodIssueCode.custom,
91-
message:
92-
`Base64 contains ${bytes.length} instead of 64 bytes (use openssl rand -base64 64 | tr -d '\n')`,
95+
message: `Base64 contains ${bytes.length} instead of 64 bytes (use openssl rand -base64 64 | tr -d '\n')`,
9396
});
9497
}
9598
return bytes;

src/typegate/src/runtimes/deno/deno.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,21 @@ const predefinedFuncs: Record<
3434
string,
3535
(param: any) => Resolver<Record<string, unknown>>
3636
> = {
37-
identity: () => ({ _, ...args }) => args,
37+
identity:
38+
() =>
39+
({ _, ...args }) =>
40+
args,
3841
true: () => () => true,
3942
false: () => () => false,
4043
allow: () => () => "ALLOW" as PolicyResolverOutput,
4144
deny: () => () => "DENY" as PolicyResolverOutput,
4245
pass: () => () => "PASS" as PolicyResolverOutput,
43-
internal_policy: () => ({ _: { context } }) =>
44-
context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
46+
internal_policy:
47+
() =>
48+
({ _: { context } }) =>
49+
context.provider === "internal"
50+
? "ALLOW"
51+
: ("PASS" as PolicyResolverOutput),
4552
context_check: ({ key, value }) => {
4653
let check: (value: any) => boolean;
4754
switch (value.type) {
@@ -63,7 +70,7 @@ const predefinedFuncs: Record<
6370
for (const segment of path) {
6471
value = value?.[segment];
6572
}
66-
return check(value) ? "PASS" : "DENY" as PolicyResolverOutput;
73+
return check(value) ? "PASS" : ("DENY" as PolicyResolverOutput);
6774
};
6875
},
6976
};
@@ -102,9 +109,8 @@ export class DenoRuntime extends Runtime {
102109
for (const m of materializers) {
103110
let matSecrets = (m.data.secrets as string[]) ?? [];
104111
if (m.name === "outjection") {
105-
matSecrets = m.data.source === "secret"
106-
? [...getInjectionValues(m.data)]
107-
: [];
112+
matSecrets =
113+
m.data.source === "secret" ? [...getInjectionValues(m.data)] : [];
108114
}
109115
for (const secretName of (m.data.secrets as []) ?? []) {
110116
secrets[secretName] = secretManager.secretOrFail(secretName);
@@ -137,7 +143,7 @@ export class DenoRuntime extends Runtime {
137143
const matData = mat.data;
138144
const entryPoint = artifacts[matData.entryPoint as string];
139145
const depMetas = (matData.deps as string[]).map((dep) =>
140-
createArtifactMeta(typegraphName, artifacts[dep])
146+
createArtifactMeta(typegraphName, artifacts[dep]),
141147
);
142148
const moduleMeta = createArtifactMeta(typegraphName, entryPoint);
143149

@@ -273,7 +279,7 @@ export class DenoRuntime extends Runtime {
273279
const entryPoint =
274280
this.tg.meta.artifacts[modMat.data.entryPoint as string];
275281
const depMetas = (modMat.data.deps as string[]).map((dep) =>
276-
createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep])
282+
createArtifactMeta(this.typegraphName, this.tg.meta.artifacts[dep]),
277283
);
278284
const moduleMeta = createArtifactMeta(this.typegraphName, entryPoint);
279285

src/typegate/src/runtimes/patterns/worker_manager/deno.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ export interface DenoWorkerError extends BaseMessage {
1212

1313
export type BaseDenoWorkerMessage = BaseMessage | DenoWorkerError;
1414

15-
export class DenoWorker<M extends BaseMessage, E extends BaseDenoWorkerMessage>
16-
extends BaseWorker<M, E> {
15+
export class DenoWorker<
16+
M extends BaseMessage,
17+
E extends BaseDenoWorkerMessage,
18+
> extends BaseWorker<M, E> {
1719
#worker: Worker;
1820
#workerId: string;
1921
constructor(workerId: string, workerPath: string) {
@@ -44,12 +46,10 @@ export class DenoWorker<M extends BaseMessage, E extends BaseDenoWorkerMessage>
4446
};
4547

4648
this.#worker.onerror = async (event) => {
47-
await handlerFn(
48-
{
49-
type: "WORKER_ERROR",
50-
event,
51-
} as E,
52-
);
49+
await handlerFn({
50+
type: "WORKER_ERROR",
51+
event,
52+
} as E);
5353
};
5454
}
5555

src/typegate/src/runtimes/patterns/worker_manager/mod.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@ export class BaseWorkerManager<
2929
M extends BaseMessage,
3030
E extends BaseMessage,
3131
> {
32-
#activeTasks: Map<TaskId, {
33-
worker: BaseWorker<M, E>;
34-
taskSpec: T;
35-
}> = new Map();
32+
#activeTasks: Map<
33+
TaskId,
34+
{
35+
worker: BaseWorker<M, E>;
36+
taskSpec: T;
37+
}
38+
> = new Map();
3639
#tasksByName: Map<string, Set<TaskId>> = new Map();
3740
#startedAt: Map<TaskId, Date> = new Map();
3841
#pool: WorkerPool<T, M, E>;
@@ -94,11 +97,12 @@ export class BaseWorkerManager<
9497
if (activeTaskNames.length > 0) {
9598
if (options.destroy) {
9699
logger.warn(
97-
`destroying workers for tasks ${
98-
activeTaskNames.map((w) => `"${w}"`).join(", ")
99-
}`,
100+
`destroying workers for tasks ${activeTaskNames
101+
.map((w) => `"${w}"`)
102+
.join(", ")}`,
100103
);
101104
}
105+
102106
for (const name of activeTaskNames) {
103107
this.deallocateWorkersByName(name, options);
104108
}
@@ -111,8 +115,8 @@ export class BaseWorkerManager<
111115
) {
112116
const taskIds = this.#tasksByName.get(name);
113117
if (taskIds) {
114-
for (const taskId of taskIds) {
115-
this.deallocateWorker(name, taskId, options);
118+
for (const id of taskIds) {
119+
this.deallocateWorker(name, id, options);
116120
}
117121
return true;
118122
}
@@ -163,7 +167,6 @@ export class BaseWorkerManager<
163167
deinit() {
164168
this.deallocateAllWorkers({ destroy: true, shutdown: true });
165169
this.#pool.clear(); // this will be called more than once, but that is ok
166-
return Promise.resolve();
167170
}
168171
}
169172

src/typegate/src/runtimes/patterns/worker_manager/pooling.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export type Consumer<T> = (x: T) => void;
2121
export interface WaitQueue<W> {
2222
push(consumer: Consumer<W>, onCancel: () => void): void;
2323
shift(produce: () => W): boolean;
24+
clear(): void;
2425
}
2526

2627
export function createSimpleWaitQueue<W>(): WaitQueue<W> {
@@ -37,6 +38,7 @@ export function createSimpleWaitQueue<W>(): WaitQueue<W> {
3738
}
3839
return false;
3940
},
41+
clear() {},
4042
};
4143
}
4244

@@ -76,15 +78,21 @@ export class WaitQueueWithTimeout<W> implements WaitQueue<W> {
7678
return false;
7779
}
7880

81+
clear() {
82+
if (this.#timerId != null) {
83+
clearTimeout(this.#timerId);
84+
}
85+
}
86+
7987
#timeoutHandler() {
8088
this.#cancelNextEntry();
8189
this.#updateTimer();
8290
}
8391

8492
#updateTimer() {
8593
if (this.#queue.length > 0) {
86-
const timeoutMs = this.#queue[0].addedAt + this.#waitTimeoutMs -
87-
Date.now();
94+
const timeoutMs =
95+
this.#queue[0].addedAt + this.#waitTimeoutMs - Date.now();
8896
if (timeoutMs <= 0) {
8997
this.#cancelNextEntry();
9098
this.#updateTimer();
@@ -97,13 +105,11 @@ export class WaitQueueWithTimeout<W> implements WaitQueue<W> {
97105
}
98106

99107
#cancelNextEntry() {
100-
this.#queue.shift()!.cancellationHandler();
108+
this.#queue.shift()?.cancellationHandler();
101109
}
102110

103111
[Symbol.dispose]() {
104-
if (this.#timerId != null) {
105-
clearTimeout(this.#timerId);
106-
}
112+
this.clear();
107113
}
108114
}
109115

@@ -214,15 +220,12 @@ export class WorkerPool<
214220

215221
clear() {
216222
logger.warn(
217-
`destroying idle workers: ${
218-
this.#idleWorkers
219-
.map((w) => `"${w.id}"`)
220-
.join(", ")
221-
}`,
223+
`destroying idle workers: ${this.#idleWorkers
224+
.map((w) => `"${w.id}"`)
225+
.join(", ")}`,
222226
);
223-
for (const worker of this.#idleWorkers) {
224-
worker.destroy();
225-
}
227+
this.#idleWorkers.forEach((worker) => worker.destroy());
226228
this.#idleWorkers = [];
229+
this.#waitQueue.clear();
227230
}
228231
}

0 commit comments

Comments
 (0)