From f947f3ae4cb48989f207646787fd1a952837a6d8 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 4 Apr 2025 20:17:27 +0200 Subject: [PATCH] worker: add everysync Signed-off-by: Matteo Collina --- LICENSE | 25 +++++ doc/api/worker_threads.md | 75 +++++++++++++ lib/internal/worker/everysync/index.js | 100 ++++++++++++++++++ lib/internal/worker/everysync/indexes.js | 27 +++++ lib/internal/worker/everysync/objects.js | 55 ++++++++++ lib/worker_threads.js | 7 ++ test/fixtures/everysync/echo.mjs | 10 ++ test/fixtures/everysync/failure.mjs | 9 ++ test/parallel/test-worker-everysync-base.js | 58 ++++++++++ .../parallel/test-worker-everysync-objects.js | 76 +++++++++++++ 10 files changed, 442 insertions(+) create mode 100644 lib/internal/worker/everysync/index.js create mode 100644 lib/internal/worker/everysync/indexes.js create mode 100644 lib/internal/worker/everysync/objects.js create mode 100644 test/fixtures/everysync/echo.mjs create mode 100644 test/fixtures/everysync/failure.mjs create mode 100644 test/parallel/test-worker-everysync-base.js create mode 100644 test/parallel/test-worker-everysync-objects.js diff --git a/LICENSE b/LICENSE index 966aeac83d37bb..e86760aae5ec8a 100644 --- a/LICENSE +++ b/LICENSE @@ -2639,3 +2639,28 @@ The externally maintained libraries used by Node.js are: OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + +- everysync, located at lib/internal/worker/everysync, is licensed as follows: + """ + MIT License + + Copyright (c) 2024 Matteo Collina + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + """ diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index e9219bbea0d6e5..740a38f91f53b2 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only active handle in the event system. If the worker is already `unref()`ed calling `unref()` again has no effect. +## `worker.makeSync(buffer[, options])` + + + +* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication. +* `options` {Object} + * `timeout` {number} The timeout in milliseconds for synchronous calls. **Default:** `5000`. + * `expandable` {boolean} Whether the buffer can be resized. **Default:** `true` if the buffer + supports `growable` option. +* Returns: {Object} An object with synchronous methods mirroring those exposed through [`worker.wire()`][]. + +Creates a synchronous API facade that communicates with a worker thread over a shared memory buffer. +The worker thread must call [`worker.wire()`][] on the same buffer to register the methods that can be called. + +This function enables making synchronous calls to a worker thread, which is particularly useful +when code requires blocking operations but still wants to benefit from the worker thread's isolation. + +```js +const { Worker, makeSync } = require('node:worker_threads'); + +// Create a SharedArrayBuffer for communication +const buffer = new SharedArrayBuffer(1024, { + maxByteLength: 64 * 1024 * 1024, +}); + +// Create a worker, passing the buffer +const worker = new Worker('worker-script.js', { + workerData: { buffer }, +}); + +// Create a synchronous API facade +const api = makeSync(buffer); + +// Call a method synchronously - this will block until the worker responds +const result = api.methodName(arg1, arg2); +``` + +## `worker.wire(buffer, methods)` + + + +* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication. +* `methods` {Object} An object whose properties are methods to expose to the main thread. + +Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][]. +The methods can be async functions or return promises, and the main thread will wait +for the promise to resolve or reject. + +```js +const { workerData, wire } = require('node:worker_threads'); + +// Expose methods synchronously to the main thread +wire(workerData.buffer, { + async methodName(arg1, arg2) { + // Do work asynchronously + return result; + }, + + syncMethod(arg) { + // Do synchronous work + return result; + }, +}); +``` + +The `wire()` function should be called early in the worker's lifecycle to register +the methods before the main thread attempts to call them. Any values returned by +these methods are serialized and passed back to the main thread. + ## Notes ### Synchronous blocking of stdio @@ -1633,10 +1706,12 @@ thread spawned will spawn another until the application crashes. [`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions [`vm`]: vm.md [`worker.SHARE_ENV`]: #workershare_env +[`worker.makeSync()`]: #workermakesyncbuffer-options [`worker.on('message')`]: #event-message_1 [`worker.postMessage()`]: #workerpostmessagevalue-transferlist [`worker.terminate()`]: #workerterminate [`worker.threadId`]: #workerthreadid_1 +[`worker.wire()`]: #workerwirebuffer-methods [async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool [browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort [child processes]: child_process.md diff --git a/lib/internal/worker/everysync/index.js b/lib/internal/worker/everysync/index.js new file mode 100644 index 00000000000000..bf8cd44c107a7a --- /dev/null +++ b/lib/internal/worker/everysync/index.js @@ -0,0 +1,100 @@ +'use strict'; + +const { + AtomicsNotify, + AtomicsStore, + AtomicsWait, + AtomicsWaitAsync, + Int32Array, + ObjectKeys, +} = primordials; + +const { + codes: { + ERR_WORKER_MESSAGING_TIMEOUT, + }, +} = require('internal/errors'); + +const { read, write } = require('internal/worker/everysync/objects'); +const { + OFFSET, + TO_MAIN, + TO_WORKER, +} = require('internal/worker/everysync/indexes'); + +/** + * Creates a synchronous API facade from a shared memory buffer. + * This function is meant to be used in the main thread to communicate with + * a worker thread that has called `wire()` on the same shared memory. + * @param {SharedArrayBuffer} data - The shared memory buffer for communication + * @param {object} [opts={}] - Options object + * @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations + * @returns {object} - An object with methods that match the ones exposed by the worker + */ +function makeSync(data, opts = {}) { + const timeout = opts.timeout || 1000; + const metaView = new Int32Array(data); + + const res = AtomicsWait(metaView, TO_WORKER, 0, timeout); + AtomicsStore(metaView, TO_WORKER, 0); + + if (res === 'ok') { + const obj = read(data, OFFSET); + + const api = {}; + for (const key of obj) { + api[key] = (...args) => { + write(data, { key, args }, OFFSET); + AtomicsStore(metaView, TO_MAIN, 1); + AtomicsNotify(metaView, TO_MAIN, 1); + const res = AtomicsWait(metaView, TO_WORKER, 0, timeout); + AtomicsStore(metaView, TO_WORKER, 0); + if (res === 'ok') { + const obj = read(data, OFFSET); + return obj; + } + throw new ERR_WORKER_MESSAGING_TIMEOUT(); + }; + } + + return api; + } + throw new ERR_WORKER_MESSAGING_TIMEOUT(); +} + +/** + * Wires up a shared memory buffer to invoke methods on an object. + * This function is meant to be used in a worker thread to expose methods + * to the main thread that has called `makeSync()` on the same shared memory. + * @param {SharedArrayBuffer} data - The shared memory buffer for communication + * @param {object} obj - Object with methods to expose to the main thread + * @returns {Promise} - A promise that never resolves unless there's an error + */ +async function wire(data, obj) { + write(data, ObjectKeys(obj), OFFSET); + + const metaView = new Int32Array(data); + + AtomicsStore(metaView, TO_WORKER, 1); + AtomicsNotify(metaView, TO_WORKER); + + while (true) { + const waitAsync = AtomicsWaitAsync(metaView, TO_MAIN, 0); + const res = await waitAsync.value; + AtomicsStore(metaView, TO_MAIN, 0); + + if (res === 'ok') { + const { key, args } = read(data, OFFSET); + // This is where the magic happens - invoke the requested method + const result = await obj[key](...args); + write(data, result, OFFSET); + AtomicsStore(metaView, TO_WORKER, 1); + AtomicsNotify(metaView, TO_WORKER, 1); + } + } +} + +module.exports = { + makeSync, + wire, +}; diff --git a/lib/internal/worker/everysync/indexes.js b/lib/internal/worker/everysync/indexes.js new file mode 100644 index 00000000000000..d33b37f2a64183 --- /dev/null +++ b/lib/internal/worker/everysync/indexes.js @@ -0,0 +1,27 @@ +'use strict'; + +/** + * Byte offset where the actual data begins in the shared memory + * @type {number} + */ +const OFFSET = 64; + +/** + * Index in the Int32Array for signaling from worker to main thread + * 0: writing from worker, reading from main + * @type {number} + */ +const TO_WORKER = 0; + +/** + * Index in the Int32Array for signaling from main to worker thread + * 1: writing from main, reading from worker + * @type {number} + */ +const TO_MAIN = 1; + +module.exports = { + OFFSET, + TO_WORKER, + TO_MAIN, +}; diff --git a/lib/internal/worker/everysync/objects.js b/lib/internal/worker/everysync/objects.js new file mode 100644 index 00000000000000..a2273389ec3f76 --- /dev/null +++ b/lib/internal/worker/everysync/objects.js @@ -0,0 +1,55 @@ +'use strict'; + +const { + DataView, + Uint8Array, +} = primordials; + +const { + codes: { + ERR_INVALID_BUFFER_SIZE, + }, +} = require('internal/errors'); + +const { serialize, deserialize } = require('v8'); + +/** + * Reads an object from a shared memory buffer + * @param {SharedArrayBuffer} buffer - The shared memory buffer containing serialized data + * @param {number} [byteOffset=0] - Byte offset where the data begins + * @returns {any} - The deserialized object + */ +function read(buffer, byteOffset = 0) { + const view = new DataView(buffer, byteOffset); + const length = view.getUint32(0, true); + const object = deserialize(new Uint8Array(buffer, byteOffset + 4, length)); + return object; +} + +/** + * Writes an object to a shared memory buffer + * @param {SharedArrayBuffer} buffer - The shared memory buffer to write to + * @param {any} object - The object to serialize and write + * @param {number} [byteOffset=0] - Byte offset where to write the data + * @throws {Error} If the buffer is too small and not growable + */ +function write(buffer, object, byteOffset = 0) { + const data = serialize(object); + + if (buffer.byteLength < data.byteLength + 4 + byteOffset) { + // Check if buffer is growable (has grow method from ShareArrayBuffer.prototype) + if (typeof buffer.grow !== 'function') { + throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); + } + buffer.grow(data.byteLength + 4 + byteOffset); + } + + const view = new DataView(buffer, byteOffset); + view.setUint32(0, data.byteLength, true); + new Uint8Array(buffer, byteOffset + 4).set(data); +} + +module.exports = { + read, + write, +}; diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 1e7a175fc8638e..83d58607ba997f 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -29,6 +29,11 @@ const { isMarkedAsUntransferable, } = require('internal/buffer'); +const { + makeSync, + wire, +} = require('internal/worker/everysync/index'); + module.exports = { isInternalThread, isMainThread, @@ -49,4 +54,6 @@ module.exports = { BroadcastChannel, setEnvironmentData, getEnvironmentData, + makeSync, + wire, }; diff --git a/test/fixtures/everysync/echo.mjs b/test/fixtures/everysync/echo.mjs new file mode 100644 index 00000000000000..83c628f3cfe35f --- /dev/null +++ b/test/fixtures/everysync/echo.mjs @@ -0,0 +1,10 @@ +import { workerData, wire } from 'node:worker_threads'; + +wire(workerData.data, { + async echo(arg) { + return arg; + }, +}); + +// Keep the event loop alive +setInterval(() => {}, 100000); \ No newline at end of file diff --git a/test/fixtures/everysync/failure.mjs b/test/fixtures/everysync/failure.mjs new file mode 100644 index 00000000000000..cce9bd8cb7ae13 --- /dev/null +++ b/test/fixtures/everysync/failure.mjs @@ -0,0 +1,9 @@ +import { workerData, wire } from 'node:worker_threads'; + +wire(workerData.data, { + fail(arg) { + return new Promise((resolve, reject) => { + // nothing to do here, we will fail + }); + }, +}); \ No newline at end of file diff --git a/test/parallel/test-worker-everysync-base.js b/test/parallel/test-worker-everysync-base.js new file mode 100644 index 00000000000000..7ce1825cf40fb0 --- /dev/null +++ b/test/parallel/test-worker-everysync-base.js @@ -0,0 +1,58 @@ +'use strict'; + +// eslint-disable-next-line no-unused-vars +const common = require('../common'); +const assert = require('assert'); +const { join } = require('path'); +const { Worker, makeSync } = require('worker_threads'); + +// Test makeSync and wire functionality +{ + const buffer = new SharedArrayBuffer(1024, { + maxByteLength: 64 * 1024 * 1024, + }); + const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), { + workerData: { + data: buffer, + }, + }); + + const api = makeSync(buffer); + + assert.strictEqual(api.echo(42), 42); + assert.strictEqual(api.echo('test'), 'test'); + assert.deepStrictEqual(api.echo({ foo: 'bar' }), { foo: 'bar' }); + + worker.terminate(); +} + +// Test timeout failure +{ + const buffer = new SharedArrayBuffer(1024, { + maxByteLength: 64 * 1024 * 1024, + }); + const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'failure.mjs'), { + workerData: { + data: buffer, + }, + }); + + const api = makeSync(buffer, { timeout: 100 }); + + assert.throws(() => api.fail(), { + code: 'ERR_WORKER_MESSAGING_TIMEOUT' + }); + + worker.terminate(); +} + +// Test initialization timeout +{ + const buffer = new SharedArrayBuffer(1024, { + maxByteLength: 64 * 1024 * 1024, + }); + + assert.throws(() => makeSync(buffer, { timeout: 100 }), { + code: 'ERR_WORKER_MESSAGING_TIMEOUT' + }); +} diff --git a/test/parallel/test-worker-everysync-objects.js b/test/parallel/test-worker-everysync-objects.js new file mode 100644 index 00000000000000..a57efe4dd9abc8 --- /dev/null +++ b/test/parallel/test-worker-everysync-objects.js @@ -0,0 +1,76 @@ +// Flags: --expose-internals +'use strict'; + +// eslint-disable-next-line no-unused-vars +const common = require('../common'); +const assert = require('assert'); +const { read, write } = require('internal/worker/everysync/objects'); + +// Test basic serialization/deserialization +{ + const obj = { foo: 'bar' }; + const buffer = new SharedArrayBuffer(1024); + write(buffer, obj); + const obj2 = read(buffer); + assert.deepStrictEqual(obj, obj2); +} + +// Test serialization/deserialization with offset +{ + const obj = { foo: 'bar' }; + const buffer = new SharedArrayBuffer(1024); + write(buffer, obj, 4); + const obj2 = read(buffer, 4); + assert.deepStrictEqual(obj, obj2); +} + +// Test insufficient buffer size +{ + const obj = { foo: 'bar' }; + const buffer = new SharedArrayBuffer(10); + assert.throws(() => write(buffer, obj), { + name: 'TypeError', + }); +} + +// Test with growable buffer +{ + const obj = { foo: 'bar' }; + const buffer = new SharedArrayBuffer(2, { + maxByteLength: 1024, + }); + write(buffer, obj); + const obj2 = read(buffer); + assert.deepStrictEqual(obj, obj2); +} + +// Test with growable buffer and offset +{ + const obj = { foo: 'bar' }; + const buffer = new SharedArrayBuffer(2, { + maxByteLength: 1024, + }); + write(buffer, obj, 4); + const obj2 = read(buffer, 4); + assert.deepStrictEqual(obj, obj2); +} + +// Test complex objects +{ + const obj = { + string: 'hello', + number: 42, + boolean: true, + null: null, + array: [1, 2, 3], + nested: { + a: 1, + b: [true, false] + } + }; + + const buffer = new SharedArrayBuffer(1024); + write(buffer, obj); + const obj2 = read(buffer); + assert.deepStrictEqual(obj, obj2); +}