worker_threads: add everysync #57749

Open
wants to merge 1 commit into
base: main
25 changes: 25 additions & 0 deletions LICENSE
@@ -2639,3 +2639,28 @@ The externally maintained libraries used by Node.js are:
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

- everysync, located at lib/internal/worker/everysync, is licensed as follows:
"""
MIT License

Copyright (c) 2024 Matteo Collina

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
75 changes: 75 additions & 0 deletions doc/api/worker_threads.md
@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only
active handle in the event system. If the worker is already `unref()`ed calling
`unref()` again has no effect.

## `worker.makeSync(buffer[, options])`
Member
This should be added as experimental

Member
nit: I think the name makeSync is a bit of a misnomer. Maybe… asyncToSync or syncFacade?


<!-- YAML
added: REPLACEME
-->

* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
* `options` {Object}
* `timeout` {number} The timeout in milliseconds for synchronous calls. **Default:** `1000`.
* `expandable` {boolean} Whether the buffer can be resized. **Default:** `true` if the buffer
supports `growable` option.
* Returns: {Object} An object with synchronous methods mirroring those exposed through [`worker.wire()`][].

Creates a synchronous API facade that communicates with a worker thread over a shared memory buffer.
The worker thread must call [`worker.wire()`][] on the same buffer to register the methods that can be called.

This function enables making synchronous calls to a worker thread, which is particularly useful
when code requires blocking operations but still wants to benefit from the worker thread's isolation.

```js
const { Worker, makeSync } = require('node:worker_threads');

// Create a SharedArrayBuffer for communication
const buffer = new SharedArrayBuffer(1024, {
maxByteLength: 64 * 1024 * 1024,
});
Member
Some explanation about how to select a size for the SharedArrayBuffer would be helpful. What is it used for? What are the limitations? What is the impact if someone passes a buffer that is too small, etc.


// Create a worker, passing the buffer
const worker = new Worker('worker-script.js', {
workerData: { buffer },
});

// Create a synchronous API facade
const api = makeSync(buffer);

// Call a method synchronously - this will block until the worker responds
const result = api.methodName(arg1, arg2);
Member
It's not obvious from this example how the makeSync(...) binds to the worker. Yes, I see the buffer is passed as worker data but that's an easily overlooked detail. It's also not clear what happens here if buffer ends up being passed to more than one worker thread.

Member Author
Those are key problems with this API. I don't think it would be possible to overcome them: it would not work if the buffer were passed to multiple workers. Do you think it would be better to add a createSyncAPIWorker(url) mechanism instead?

Member
If you wrap the SharedArrayBuffer into a transferable object instead of using the SAB directly then you can likely better control where it goes and limit the number of workers it can go to.

```
Member
The API here feels a bit awkward. I wonder if some kind of wrapper to hide the SharedArrayBuffer would be helpful...

// in the main thread
const { Worker, APIProxy } = require('node:worker_threads');

const api = new APIProxy({ min: 1024, max: 64 * 1024 * 1024 });
const worker = new Worker('...', { workerData: api });

// in the worker
const { workerData } = require('node:worker_threads');
workerData.bind({ foo() {} });

Member Author
Yes, it 100% could be. I think we could actually do better:

// in the main thread
const { Worker, SyncAPIProxy } = require('node:worker_threads');

const api = new SyncAPIProxy('myworker.js', { min: 1024, max: 64 * 1024 * 1024 });

api.echo()

// in the worker
export async function echo (...args) { return args }

Member
It would be nice if we can make the {min, max} part optional and use some sensible defaults which can be overridden by the user if they wish to.


## `worker.wire(buffer, methods)`

<!-- YAML
added: REPLACEME
-->

* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
* `methods` {Object} An object whose properties are methods to expose to the main thread.

Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][].
The methods can be async functions or return promises, and the main thread will wait
for the promise to resolve or reject.
Contributor
Suggested change
- for the promise to resolve or reject.
+ for the promise to settle.

Can we clarify what's the behavior if the promise never settles?

Member Author
It will time out.

Contributor
I meant in the docs: I can tell by reading the code that it's going to time out, but I can't tell that by reading the docs.
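
As an illustration of the timeout behavior discussed here: the facade blocks in `Atomics.wait()`, which returns the string `'timed-out'` when no other thread notifies the slot in time. The following is a minimal sketch on a private buffer with no worker involved; the 10 ms timeout and the slot index are arbitrary choices for the example, not values from the PR.

```javascript
// A fresh buffer: slot 0 holds 0 and no other thread will ever notify it.
const slot = new Int32Array(new SharedArrayBuffer(8));

// Wait while slot 0 still equals 0, for at most 10 ms.
const outcome = Atomics.wait(slot, 0, 0, 10);
console.log(outcome); // 'timed-out'
```

In the PR code, that outcome is what leads to `ERR_WORKER_MESSAGING_TIMEOUT` being thrown.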


```js
const { workerData, wire } = require('node:worker_threads');

// Expose methods synchronously to the main thread
wire(workerData.buffer, {
async methodName(arg1, arg2) {
// Do work asynchronously
return result;
},

syncMethod(arg) {
// Do synchronous work
return result;
},
});
```

The `wire()` function should be called early in the worker's lifecycle to register
the methods before the main thread attempts to call them. Any values returned by
these methods are serialized and passed back to the main thread.
Comment on lines +1599 to +1601
Member
Some questions to maybe answer in the documentation:

  • What happens when we call a method before calling wire?
  • From what I saw in the tests, this wire will not keep the worker thread alive, right?
    • If not, is it worth adding a flag as a second method to keep it alive? Most of the time, I think we developers will actually prefer to keep it alive indefinitely
  • What happens when the method we called throws an error?


## Notes

### Synchronous blocking of stdio
@@ -1633,10 +1706,12 @@ thread spawned will spawn another until the application crashes.
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
[`vm`]: vm.md
[`worker.SHARE_ENV`]: #workershare_env
[`worker.makeSync()`]: #workermakesyncbuffer-options
[`worker.on('message')`]: #event-message_1
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
[`worker.terminate()`]: #workerterminate
[`worker.threadId`]: #workerthreadid_1
[`worker.wire()`]: #workerwirebuffer-methods
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.md
100 changes: 100 additions & 0 deletions lib/internal/worker/everysync/index.js
@@ -0,0 +1,100 @@
'use strict';

const {
AtomicsNotify,
AtomicsStore,
AtomicsWait,
AtomicsWaitAsync,
Int32Array,
ObjectKeys,
} = primordials;

const {
codes: {
ERR_WORKER_MESSAGING_TIMEOUT,
},
} = require('internal/errors');

const { read, write } = require('internal/worker/everysync/objects');
const {
Member
A comment explaining what these mean would be helpful.

OFFSET,
TO_MAIN,
TO_WORKER,
} = require('internal/worker/everysync/indexes');

/**
* Creates a synchronous API facade from a shared memory buffer.
* This function is meant to be used in the main thread to communicate with
* a worker thread that has called `wire()` on the same shared memory.
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
* @param {object} [opts={}] - Options object
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
* @returns {object} - An object with methods that match the ones exposed by the worker
*/
function makeSync(data, opts = {}) {
Member
Nitpick: I wonder whether makeSync is a good name. It is simple and short, but it hides what the function actually does. Maybe createSyncWireFacade?

Contributor
Suggested change
- function makeSync(data, opts = {}) {
+ function makeSync(data, opts = kEmptyObject) {

Member
Since this is supposed to be used only on the main thread, I'm wondering if the first thing this should do is throw when it's not in the main thread.
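
A guard of the kind suggested here could look roughly like the sketch below. It uses the public `isMainThread` export; the helper name `assertMainThread`, the error type, and the message are hypothetical and not part of the PR.

```javascript
const { isMainThread } = require('node:worker_threads');

// Hypothetical guard: the helper name, error type, and message are placeholders.
function assertMainThread(apiName) {
  if (!isMainThread) {
    throw new Error(`${apiName}() can only be called from the main thread`);
  }
}

assertMainThread('makeSync'); // Does nothing when run on the main thread.
```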

const timeout = opts.timeout || 1000;
Member
Is a timeout of 0 valid? If so, || should be ??.
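
The difference between the two operators can be shown in isolation. This is a small sketch with hypothetical helper names; the fallback of `1000` mirrors the value used in the PR code.

```javascript
// Hypothetical helpers contrasting the two operators.
function timeoutWithOr(opts = {}) {
  return opts.timeout || 1000; // 0 is falsy, so it is replaced by the fallback.
}

function timeoutWithNullish(opts = {}) {
  return opts.timeout ?? 1000; // Only null and undefined are replaced.
}

console.log(timeoutWithOr({ timeout: 0 }));      // 1000
console.log(timeoutWithNullish({ timeout: 0 })); // 0
console.log(timeoutWithNullish({}));             // 1000
```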

const metaView = new Int32Array(data);

const res = AtomicsWait(metaView, TO_WORKER, 0, timeout);
AtomicsStore(metaView, TO_WORKER, 0);

if (res === 'ok') {
const obj = read(data, OFFSET);

const api = {};
Contributor
Suggested change
- const api = {};
+ const api = { __proto__: null };
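
For context on this suggestion, a null-prototype object has no inherited properties, so the only keys visible on it are the ones assigned explicitly. A short sketch of the difference:

```javascript
const plain = {};
const bare = { __proto__: null };

console.log('toString' in plain);         // true
console.log('toString' in bare);          // false
console.log(Object.getPrototypeOf(bare)); // null
```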

for (const key of obj) {
Member
Just curious, why this pattern and not a Proxy? I know Proxy objects have some perf downsides but this would seem a tailor made use case for it.

Member Author
Indeed. I think I would actually significantly simplify this.
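
One way the Proxy idea might look is sketched below. It is not the PR's implementation: `createFacade` and `send` are hypothetical names, and `send` stands in for the write, notify, wait, and read sequence, faked in-process here so the snippet runs on its own.

```javascript
// `send` is a hypothetical transport: it takes a method name and an argument
// list and returns the result. Here it is faked in-process.
function createFacade(send) {
  return new Proxy({ __proto__: null }, {
    get(_target, key) {
      // Symbol keys (for example Symbol.toPrimitive) are not remote methods.
      if (typeof key !== 'string') return undefined;
      return (...args) => send(key, args);
    },
  });
}

const api = createFacade((key, args) => ({ key, args }));
console.log(api.echo(1, 2)); // { key: 'echo', args: [ 1, 2 ] }
```

One caveat of this shape is that every string property, including `then`, resolves to a function, so a real implementation would probably still want to validate names against the list the worker exposes.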

api[key] = (...args) => {
write(data, { key, args }, OFFSET);
AtomicsStore(metaView, TO_MAIN, 1);
AtomicsNotify(metaView, TO_MAIN, 1);
const res = AtomicsWait(metaView, TO_WORKER, 0, timeout);
AtomicsStore(metaView, TO_WORKER, 0);
if (res === 'ok') {
const obj = read(data, OFFSET);
return obj;
}
throw new ERR_WORKER_MESSAGING_TIMEOUT();
};
}

return api;
}
throw new ERR_WORKER_MESSAGING_TIMEOUT();
}

/**
* Wires up a shared memory buffer to invoke methods on an object.
* This function is meant to be used in a worker thread to expose methods
* to the main thread that has called `makeSync()` on the same shared memory.
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
* @param {object} obj - Object with methods to expose to the main thread
Contributor
Suggested change
- * @param {object} obj - Object with methods to expose to the main thread
+ * @param {Record<string, Function>} obj - Object with methods to expose to the main thread

* @returns {Promise<void>} - A promise that never resolves unless there's an error
Contributor
Suggested change
- * @returns {Promise<void>} - A promise that never resolves unless there's an error
+ * @returns {Promise<never>} - A promise that never fulfils, but might be rejected if there's an error

*/
async function wire(data, obj) {
write(data, ObjectKeys(obj), OFFSET);

const metaView = new Int32Array(data);

AtomicsStore(metaView, TO_WORKER, 1);
AtomicsNotify(metaView, TO_WORKER);

while (true) {
const waitAsync = AtomicsWaitAsync(metaView, TO_MAIN, 0);
const res = await waitAsync.value;
AtomicsStore(metaView, TO_MAIN, 0);

if (res === 'ok') {
const { key, args } = read(data, OFFSET);
// This is where the magic happens - invoke the requested method
const result = await obj[key](...args);
Contributor
Suggested change
- const result = await obj[key](...args);
+ const result = await ReflectApply(obj[key], obj, args);

Member
Your example in the docs shows calling wire(...) without awaiting the promise it returns. This could easily throw exceptions that end up not being handled. If it does throw, the error does not appear to be propagated to the caller, so I assume that means the Atomics wait will just time out? I think this needs a better error handling strategy before we land it.
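
One possible shape for such a strategy, offered only as a sketch and not as what the PR does, is to have the worker side wrap every call result in an envelope and have the calling side unwrap it. The names `callAndWrap` and `unwrap` and the envelope fields are hypothetical.

```javascript
// Hypothetical envelope helpers; names and shape are illustrative only.
async function callAndWrap(fn, args) {
  try {
    return { ok: true, value: await fn(...args) };
  } catch (err) {
    // Keep just the message so the envelope stays trivial to serialize.
    return { ok: false, message: String(err?.message ?? err) };
  }
}

// The calling side turns a failure envelope back into a thrown error.
function unwrap(envelope) {
  if (envelope.ok) return envelope.value;
  throw new Error(envelope.message);
}

callAndWrap(async () => { throw new Error('boom'); }, [])
  .then((envelope) => console.log(envelope)); // { ok: false, message: 'boom' }
```

With this shape a rejected method produces a reply instead of silence, so the caller would see the worker's error rather than a timeout.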

write(data, result, OFFSET);
AtomicsStore(metaView, TO_WORKER, 1);
AtomicsNotify(metaView, TO_WORKER, 1);
}
}
}

module.exports = {
makeSync,
wire,
};
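
The signaling half of this module can be tried in isolation with public APIs. The sketch below assumes a single hypothetical slot index `SIGNAL` and an inline worker created with `eval: true`; it is not the PR's protocol, only the store, notify, and wait pattern it builds on. `Atomics.wait()` returns `'not-equal'` immediately when the slot already differs from the expected value, so the sketch treats anything other than `'timed-out'` as success.

```javascript
const { Worker } = require('node:worker_threads');

const SIGNAL = 0; // Hypothetical slot index in the Int32Array.
const shared = new SharedArrayBuffer(64);
const flags = new Int32Array(shared);

// The worker stores 1 into the slot and wakes any waiter.
const worker = new Worker(`
  const { workerData } = require('node:worker_threads');
  const flags = new Int32Array(workerData.shared);
  Atomics.store(flags, ${SIGNAL}, 1);
  Atomics.notify(flags, ${SIGNAL});
`, { eval: true, workerData: { shared } });

// Block while the slot still holds 0, for at most 5 seconds.
const outcome = Atomics.wait(flags, SIGNAL, 0, 5000);
const ready = outcome !== 'timed-out' && Atomics.load(flags, SIGNAL) === 1;
console.log(outcome, ready);
```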
27 changes: 27 additions & 0 deletions lib/internal/worker/everysync/indexes.js
@@ -0,0 +1,27 @@
'use strict';

/**
* Byte offset where the actual data begins in the shared memory
* @type {number}
*/
const OFFSET = 64;

/**
* Index in the Int32Array for signaling from worker to main thread
* 0: writing from worker, reading from main
* @type {number}
*/
const TO_WORKER = 0;
Member
Since this is for signaling from worker to main, should it be named TO_MAIN instead of TO_WORKER? Ditto for the other index name.


/**
* Index in the Int32Array for signaling from main to worker thread
* 1: writing from main, reading from worker
* @type {number}
*/
const TO_MAIN = 1;

module.exports = {
OFFSET,
TO_WORKER,
TO_MAIN,
};
55 changes: 55 additions & 0 deletions lib/internal/worker/everysync/objects.js
@@ -0,0 +1,55 @@
'use strict';

const {
DataView,
Uint8Array,
} = primordials;

const {
codes: {
ERR_INVALID_BUFFER_SIZE,
},
} = require('internal/errors');

const { serialize, deserialize } = require('v8');

/**
* Reads an object from a shared memory buffer
* @param {SharedArrayBuffer} buffer - The shared memory buffer containing serialized data
* @param {number} [byteOffset=0] - Byte offset where the data begins
* @returns {any} - The deserialized object
*/
function read(buffer, byteOffset = 0) {
const view = new DataView(buffer, byteOffset);
const length = view.getUint32(0, true);
const object = deserialize(new Uint8Array(buffer, byteOffset + 4, length));
return object;
}

/**
* Writes an object to a shared memory buffer
* @param {SharedArrayBuffer} buffer - The shared memory buffer to write to
* @param {any} object - The object to serialize and write
* @param {number} [byteOffset=0] - Byte offset where to write the data
* @throws {Error} If the buffer is too small and not growable
*/
function write(buffer, object, byteOffset = 0) {
const data = serialize(object);

if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
Comment on lines +37 to +39
Member
Suggested change
- const data = serialize(object);
- if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
+ const data = serialize(object);
+ const neededSize = data.byteLength + 4 + byteOffset;
+ if (buffer.byteLength < neededSize) {

// Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
if (typeof buffer.grow !== 'function') {
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
}
buffer.grow(data.byteLength + 4 + byteOffset);
Member
Suggested change
- buffer.grow(data.byteLength + 4 + byteOffset);
+ buffer.grow(neededSize);

Is this correct though? Don't you only need the difference between neededSize and the current size?

}

const view = new DataView(buffer, byteOffset);
view.setUint32(0, data.byteLength, true);
Comment on lines +39 to +48
Contributor
We should call the getter only once

Suggested change
- if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
- // Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
- if (typeof buffer.grow !== 'function') {
- throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
- }
- buffer.grow(data.byteLength + 4 + byteOffset);
- }
- const view = new DataView(buffer, byteOffset);
- view.setUint32(0, data.byteLength, true);
+ const dataByteLength = TypedArrayPrototypeGetByteLength(data);
+ if (buffer.byteLength < dataByteLength + 4 + byteOffset) {
+ // Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
+ if (typeof buffer.grow !== 'function') {
+ throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
+ }
+ buffer.grow(dataByteLength + 4 + byteOffset);
+ }
+ const view = new DataView(buffer, byteOffset);
+ view.setUint32(0, dataByteLength, true);

new Uint8Array(buffer, byteOffset + 4).set(data);
}

module.exports = {
read,
write,
};
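
The framing used by `read()` and `write()` above (a 4-byte little-endian length followed by the `v8.serialize()` payload) can be reproduced in userland. This is a sketch under stated assumptions: `writeFrame` and `readFrame` are hypothetical names, the buffer is fixed-length so there is no grow path, and the payload is copied out of shared memory before deserializing as a conservative choice.

```javascript
const { serialize, deserialize } = require('node:v8');

// Userland sketch of the framing: [uint32 length, little-endian][payload bytes].
function writeFrame(buffer, value, byteOffset = 0) {
  const payload = serialize(value);
  const needed = byteOffset + 4 + payload.byteLength;
  if (buffer.byteLength < needed) {
    throw new RangeError(`need ${needed} bytes, have ${buffer.byteLength}`);
  }
  new DataView(buffer, byteOffset).setUint32(0, payload.byteLength, true);
  new Uint8Array(buffer, byteOffset + 4).set(payload);
}

function readFrame(buffer, byteOffset = 0) {
  const length = new DataView(buffer, byteOffset).getUint32(0, true);
  // Copy the payload out of shared memory before deserializing it.
  const bytes = Buffer.from(new Uint8Array(buffer, byteOffset + 4, length));
  return deserialize(bytes);
}

const shared = new SharedArrayBuffer(1024);
writeFrame(shared, { key: 'echo', args: [42] }, 64);
console.log(readFrame(shared, 64)); // { key: 'echo', args: [ 42 ] }
```

The offset of 64 matches the `OFFSET` constant in `indexes.js`, which keeps the first bytes of the buffer free for the signaling slots.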
7 changes: 7 additions & 0 deletions lib/worker_threads.js
@@ -29,6 +29,11 @@ const {
isMarkedAsUntransferable,
} = require('internal/buffer');

const {
makeSync,
wire,
} = require('internal/worker/everysync/index');

module.exports = {
isInternalThread,
isMainThread,
@@ -49,4 +54,6 @@ module.exports = {
BroadcastChannel,
setEnvironmentData,
getEnvironmentData,
makeSync,
wire,
};
10 changes: 10 additions & 0 deletions test/fixtures/everysync/echo.mjs
@@ -0,0 +1,10 @@
import { workerData, wire } from 'node:worker_threads';

wire(workerData.data, {
async echo(arg) {
return arg;
},
});

// Keep the event loop alive
setInterval(() => {}, 100000);
Contributor
Suggested change
setInterval(() => {}, 100000);
setInterval(() => {}, 100000);

Member
nit: 100_000 for grokkability

9 changes: 9 additions & 0 deletions test/fixtures/everysync/failure.mjs
@@ -0,0 +1,9 @@
import { workerData, wire } from 'node:worker_threads';

wire(workerData.data, {
fail(arg) {
return new Promise((resolve, reject) => {
// nothing to do here, we will fail
});
},
});
Comment on lines +5 to +9
Contributor
Suggested change
- return new Promise((resolve, reject) => {
- // nothing to do here, we will fail
- });
- },
- });
+ // never settling promise, we will fail
+ return new Promise(() => {});
+ },
+ });
