
worker_threads: add everysync #57749


Open
wants to merge 1 commit into main

Conversation

mcollina
Member

@mcollina mcollina commented Apr 4, 2025

As discussed in the collab summit meeting, here is my contribution of the everysync module.

We can 100% decide to change the public API to something higher level too.
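For reviewers skimming the thread, here is a minimal end-to-end sketch assembled from the documentation excerpts quoted further down; the `echo` method, the worker file name, and the exact way the buffer is handed over via `workerData` are illustrative assumptions rather than things this PR pins down.

```js
// main.js (sketch, not the PR's docs verbatim)
const { Worker, makeSync } = require('node:worker_threads');

// Growable SharedArrayBuffer used as the shared communication channel.
const buffer = new SharedArrayBuffer(1024, { maxByteLength: 64 * 1024 * 1024 });
const worker = new Worker('./echo-worker.js', { workerData: buffer });

const api = makeSync(buffer);
// Blocks the main thread until the worker replies (or the timeout fires).
const result = api.echo('hello');

// echo-worker.js (sketch)
const { workerData, wire } = require('node:worker_threads');
wire(workerData, {
  async echo(...args) { return args; },
});
```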

@nodejs-github-bot
Collaborator

Review requested:

  • @nodejs/tsc


@nodejs-github-bot nodejs-github-bot added lib / src Issues and PRs related to general changes in the lib or src directory. needs-ci PRs that need a full CI run. labels Apr 4, 2025
@mcollina mcollina requested a review from joyeecheung April 4, 2025 18:22
Signed-off-by: Matteo Collina <[email protected]>

codecov bot commented Apr 4, 2025

Codecov Report

Attention: Patch coverage is 98.41270% with 3 lines in your changes missing coverage. Please review.

Project coverage is 90.26%. Comparing base (af75d04) to head (f947f3a).
Report is 92 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| lib/internal/worker/everysync/objects.js | 94.54% | 3 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #57749      +/-   ##
==========================================
+ Coverage   90.23%   90.26%   +0.03%     
==========================================
  Files         630      633       +3     
  Lines      185055   185179     +124     
  Branches    36221    36233      +12     
==========================================
+ Hits       166984   167154     +170     
+ Misses      11043    10993      -50     
- Partials     7028     7032       +4     
| Files with missing lines | Coverage Δ |
| --- | --- |
| lib/internal/worker/everysync/index.js | 100.00% <100.00%> (ø) |
| lib/internal/worker/everysync/indexes.js | 100.00% <100.00%> (ø) |
| lib/worker_threads.js | 100.00% <100.00%> (ø) |
| lib/internal/worker/everysync/objects.js | 94.54% <94.54%> (ø) |

... and 55 files with indirect coverage changes


Member

@H4ad H4ad left a comment


Off-topic: I wonder if you wrote this with thread-stream in mind. It seems like we could implement similar functionality using this API.

Comment on lines +1599 to +1601
The `wire()` function should be called early in the worker's lifecycle to register
the methods before the main thread attempts to call them. Any values returned by
these methods are serialized and passed back to the main thread.
Member

Some questions to maybe answer in the documentation:

  • What happens when we call a method before calling wire?
  • From what I saw in the tests, this wire will not keep the worker thread alive, right?
    • If not, is it worth adding a flag as a second method to keep it alive? Most of the time, I think we developers will actually prefer to keep it alive indefinitely
  • What happens when the method we called throws an error?

* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
* @returns {object} - An object with methods that match the ones exposed by the worker
*/
function makeSync(data, opts = {}) {
Member

Nitpick: I wonder if makeSync is a good name; it's simple and short but hides what it does exactly. Maybe createSyncWireFacade?

Contributor

@ShogunPanda ShogunPanda left a comment

LGTM!


Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][].
The methods can be async functions or return promises, and the main thread will wait
for the promise to resolve or reject.
Contributor

Suggested change
for the promise to resolve or reject.
for the promise to settle.

Can we clarify what the behavior is if the promise never settles?

Member Author

It will time out.

Contributor

I meant in the docs: I can tell by reading the code that it's going to time out, but I can't tell that by reading the docs.
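For the docs, the behaviour could be spelled out with something close to the test already in this PR; a small sketch, assuming a worker that wires a `fail()` method whose promise never settles (as in the fixture quoted later in this thread):

```js
const assert = require('node:assert');

// `buffer` is the SharedArrayBuffer already shared with the worker.
const api = makeSync(buffer, { timeout: 100 });

// The worker never settles the promise behind fail(), so after ~100 ms the
// synchronous call gives up and throws instead of blocking forever.
// (The exact error code is not shown in this thread, so it is omitted here.)
assert.throws(() => api.fail());
```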

* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
* @returns {object} - An object with methods that match the ones exposed by the worker
*/
function makeSync(data, opts = {}) {
Contributor

Suggested change
function makeSync(data, opts = {}) {
function makeSync(data, opts = kEmptyObject) {

if (res === 'ok') {
const obj = read(data, OFFSET);

const api = {};
Contributor

Suggested change
const api = {};
const api = { __proto__: null };

if (res === 'ok') {
const { key, args } = read(data, OFFSET);
// This is where the magic happens - invoke the requested method
const result = await obj[key](...args);
Contributor

Suggested change
const result = await obj[key](...args);
const result = await ReflectApply(obj[key], obj, args);

Member

Your example in the docs shows calling wire(...) without awaiting the promise it returns. This could easily throw exceptions that end up not being handled. If it does throw, the error does not appear to be propagated to the caller, so I assume that means the Atomics wait will just time out? I think before we land this it needs a better error-handling strategy.
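One hedged way the docs example could surface this, assuming `wire()` keeps returning a promise as its current JSDoc says (the error-reporting choice below is a placeholder, not a recommendation from this PR):

```js
// In the worker: don't drop the promise returned by wire().
const { workerData, wire } = require('node:worker_threads');

wire(workerData, {
  async doWork() { /* ... */ },
}).catch((err) => {
  // Without this, a rejection is lost silently and the main thread only
  // observes a timeout on its next synchronous call.
  console.error('wire() failed:', err);
  process.exit(1);
});
```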

Comment on lines +39 to +48
if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
// Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
if (typeof buffer.grow !== 'function') {
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
}
buffer.grow(data.byteLength + 4 + byteOffset);
}

const view = new DataView(buffer, byteOffset);
view.setUint32(0, data.byteLength, true);
Contributor

We should call the getter only once

Suggested change
if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
// Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
if (typeof buffer.grow !== 'function') {
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
}
buffer.grow(data.byteLength + 4 + byteOffset);
}
const view = new DataView(buffer, byteOffset);
view.setUint32(0, data.byteLength, true);
const dataByteLength = TypedArrayPrototypeGetByteLength(data);
if (buffer.byteLength < dataByteLength + 4 + byteOffset) {
// Check if buffer is growable (has grow method from SharedArrayBuffer.prototype)
if (typeof buffer.grow !== 'function') {
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
}
buffer.grow(dataByteLength + 4 + byteOffset);
}
const view = new DataView(buffer, byteOffset);
view.setUint32(0, dataByteLength, true);

});

// Keep the event loop alive
setInterval(() => {}, 100000);
Contributor

Suggested change
setInterval(() => {}, 100000);
setInterval(() => {}, 100000);

Member

nit: 100_000 for grokkability

Comment on lines +5 to +9
return new Promise((resolve, reject) => {
// nothing to do here, we will fail
});
},
});
Contributor

Suggested change
return new Promise((resolve, reject) => {
// nothing to do here, we will fail
});
},
});
// never settling promise, we will fail
return new Promise(() => {});
},
});

Comment on lines +8 to +14

// Test makeSync and wire functionality
{
const buffer = new SharedArrayBuffer(1024, {
maxByteLength: 64 * 1024 * 1024,
});
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), {
Contributor

Suggested change
// Test makeSync and wire functionality
{
const buffer = new SharedArrayBuffer(1024, {
maxByteLength: 64 * 1024 * 1024,
});
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), {
const fixtures = require('../common/fixtures');
// Test makeSync and wire functionality
{
const buffer = new SharedArrayBuffer(1024, {
maxByteLength: 64 * 1024 * 1024,
});
const worker = new Worker(fixtures.path('everysync', 'echo.mjs'), {

* to the main thread that has called `makeSync()` on the same shared memory.
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
* @param {object} obj - Object with methods to expose to the main thread
* @returns {Promise<void>} - A promise that never resolves unless there's an error
Contributor

Suggested change
* @returns {Promise<void>} - A promise that never resolves unless there's an error
* @returns {Promise<never>} - A promise that never fulfils; it might be rejected if there's an error

* This function is meant to be used in a worker thread to expose methods
* to the main thread that has called `makeSync()` on the same shared memory.
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
* @param {object} obj - Object with methods to expose to the main thread
Contributor

Suggested change
* @param {object} obj - Object with methods to expose to the main thread
* @param {Record<string, Function>} obj - Object with methods to expose to the main thread

const api = makeSync(buffer);

// Call a method synchronously - this will block until the worker responds
const result = api.methodName(arg1, arg2);
Member

It's not obvious from this example how the makeSync(...) binds to the worker. Yes, I see the buffer is passed as worker data but that's an easily overlooked detail. It's also not clear what happens here if buffer ends up being passed to more than one worker thread.

Member Author

Those are key problems of this API. I don't think it would be possible to overcome those details: it would not work if passed to multiple workers. Do you think it would be better to add a createSyncAPIWorker(url) mechanism instead?

Member

If you wrap the SharedArrayBuffer into a transferable object instead of using the SAB directly then you can likely better control where it goes and limit the number of workers it can go to.

// Create a SharedArrayBuffer for communication
const buffer = new SharedArrayBuffer(1024, {
maxByteLength: 64 * 1024 * 1024,
});
Member

Some explanation about how to select a size for the SharedArrayBuffer would be helpful. What is it used for? What are the limitations? What is the impact if someone passes a buffer that is too small, etc.
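A hedged sketch of what such guidance could look like, based on the `grow()` / `ERR_INVALID_BUFFER_SIZE` code quoted elsewhere in this review: the buffer has to hold the serialized arguments and return values, and a growable buffer gives headroom.

```js
// Growable buffer: the initial size only needs to cover small payloads;
// maxByteLength caps how large any single serialized value may get.
const buffer = new SharedArrayBuffer(1024, {
  maxByteLength: 64 * 1024 * 1024,
});

// Fixed-size buffer: also accepted, but if a serialized payload does not
// fit, the internal write throws ERR_INVALID_BUFFER_SIZE instead of growing.
const fixed = new SharedArrayBuffer(1024);
```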

const obj = read(data, OFFSET);

const api = {};
for (const key of obj) {
Member

Just curious, why this pattern and not a Proxy? I know Proxy objects have some perf downsides but this would seem a tailor-made use case for it.
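For comparison, a rough sketch of the Proxy-shaped alternative being hinted at here (not what the PR currently implements; `callIntoWorker` stands in for the serialize/notify/wait round trip that makeSync() performs):

```js
// Hypothetical Proxy variant: forward any property access as a call into
// the worker instead of eagerly building an object from the exposed keys.
function makeSyncProxy(callIntoWorker) {
  return new Proxy({ __proto__: null }, {
    get(_target, key) {
      // Every property lookup becomes a synchronous call into the worker.
      return (...args) => callIntoWorker(key, args);
    },
  });
}
```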

Member Author

Indeed. I think I would actually significantly simplify this.

} = require('internal/errors');

const { read, write } = require('internal/worker/everysync/objects');
const {
Member

A comment explaining what these mean would be helpful.


const api = makeSync(buffer, { timeout: 100 });

assert.throws(() => api.fail(), {
Member

The tests need to be expanded to include coverage of more error conditions. What happens when a non-serializable value is passed as an arg or a return value? What happens if a method that does not exist is called? What happens if the worker thread is killed mid-call? What happens in the case of a deadlock? That is, I have to assume something like the following is possible...

// in the worker thread
import { workerData, wire, makeSync } from 'node:worker_threads';

const sab = new SharedArrayBuffer(...);
const api = makeSync(sab, { timeout: 1000 });
postMessage(sab);

wire(workerData.data, {
  doSomething(arg) {
    api.doSomethingElse();
  },
});

// in the main thread
const sab2 = new SharedArrayBuffer(...);
const api = makeSync(sab2, { timeout: 1000 });
const worker = new Worker('...', { workerData: sab2 });
worker.onmessage = (message) => {
  wire(message.data, {
    doSomethingElse() {
      api.doSomething();
    }
  });
}
api.doSomething();

@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only
active handle in the event system. If the worker is already `unref()`ed calling
`unref()` again has no effect.

## `worker.makeSync(buffer[, options])`
Member

This should be added as experimental

Member

nit: I think the name makeSync is a bit of a misnomer. Maybe… asyncToSync or syncFacade?


// Call a method synchronously - this will block until the worker responds
const result = api.methodName(arg1, arg2);
```
Member

The API here feels a bit awkward. I wonder if some kind of wrapper to hide the SharedArrayBuffer would be helpful...

// in the main thread
const { Worker, APIProxy } = require('node:worker_threads');

const api = new APIProxy({ min: 1024, max: 64 * 1024 * 1024 });
const worker = new Worker('...', { workerData: api });

// in the worker
const { workerData } = require('node:worker_threads');
workerData.bind({ foo() {} });

Member Author

Yes, it 100% could be. I think we could actually do better:

// in the main thread
const { Worker, SyncAPIProxy } = require('node:worker_threads');

const api = new SyncAPIProxy('myworker.js', { min: 1024, max: 64 * 1024 * 1024 });

api.echo()

// in the worker
export async function echo (...args) { return args }

Member

It would be nice if we could make the {min, max} part optional and use some sensible defaults that can be overridden by the user if they wish.

Member

@tniessen tniessen left a comment

I don't have any context on this but I am also wondering about the API surface.

About five years ago, before there were features such as waitAsync, I played around with similar ideas to let synchronous WebAssembly functions receive data from asynchronous inputs through a SharedArrayBuffer. The API surface in this case was just a (badly designed) synchronized message queue. Ideally, it should be possible to send and receive both synchronously and asynchronously. It'd be interesting to see what API best matches user expectations for this feature.
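For readers without that background, the blocking primitive that both that experiment and this PR rely on is roughly the following (a minimal sketch, not code from this PR):

```js
// One Int32 slot acts as the signal: 0 = no reply yet, 1 = reply ready.
const shared = new SharedArrayBuffer(8);
const flags = new Int32Array(shared);

// Caller side (the thread that wants to block, e.g. the main thread here):
// Atomics.wait blocks while flags[0] is still 0, or until the timeout.
function waitForReply(timeoutMs) {
  return Atomics.wait(flags, 0, 0, timeoutMs); // 'ok' | 'not-equal' | 'timed-out'
}

// Responder side (e.g. the worker): publish the result, then wake the waiter.
function publishReply() {
  Atomics.store(flags, 0, 1);
  Atomics.notify(flags, 0, 1);
}
```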

@mcollina
Member Author

mcollina commented Apr 7, 2025

I don't have any context on this but I am also wondering about the API surface.

The key point of adding this utility is to provide a synchronous way of doing I/O so that the new module.registerHooks() hooks could block the main thread while I/O is happening underneath.

Ideally, it should be possible to send and receive both synchronously and asynchronously. It'd be interesting to see what API best matches user expectations for this feature.

I concur; however, we need the synchronous API relatively quickly and this seems like a major design effort. I think it's better to land something experimental and iterate.
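To make the loaders connection concrete, here is a sketch of the motivating use case as I read it; the registerHooks() shape and the `resolve` method exposed by the helper worker are assumptions for illustration, not part of this PR.

```js
const { registerHooks } = require('node:module');
const { makeSync } = require('node:worker_threads');

// A helper worker performs the asynchronous I/O and exposes a `resolve`
// method via wire() on this shared buffer (worker setup elided).
const buffer = new SharedArrayBuffer(1024, { maxByteLength: 64 * 1024 * 1024 });
const api = makeSync(buffer, { timeout: 1000 });

registerHooks({
  resolve(specifier, context, nextResolve) {
    // Synchronous from the loader's point of view; the blocking happens
    // inside the makeSync facade while the worker awaits its I/O.
    const url = api.resolve(specifier, context.parentURL);
    return url ? { url, shortCircuit: true } : nextResolve(specifier, context);
  },
});
```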

* 0: writing from worker, reading from main
* @type {number}
*/
const TO_WORKER = 0;
Member

Since this is for signaling from worker to main, should it be named TO_MAIN instead of TO_WORKER? Ditto for the other index name.

@joyeecheung
Member

joyeecheung commented Apr 7, 2025

the new module.registerHooks() hooks could block the main thread while I/O is happening underneath.

If this is about the idea of reimplementing module.register on top of module.registerHooks(), FWIW from what I can tell the most straightforward way seems to be to just reuse the existing worker orchestration code in lib/internal/modules/esm/hooks.js and lib/internal/modules/esm/worker.js (e.g. simply pass things on to hooksProxy.makeSyncRequest() in an internal synchronous resolve hook). Having a utility like everysync may be useful for users who want to orchestrate the worker themselves instead of relying on an internal helper, to get more control, or maybe as a future refactoring primitive, but it would not be urgent, since those would be new primitives to migrate over to (while making sure that the behavior stays consistent), rather than primitives that let us reuse existing logic and count on consistency with existing behavior. For public usage there is no need to rush if there seem to be some good ideas to explore and polish the API a bit further.

@mcollina
Member Author

mcollina commented Apr 7, 2025

@joyeecheung I took a note to do this as a result of the collab summit; my understanding was that this was some kind of blocker for future additions.

@joyeecheung
Member

joyeecheung commented Apr 7, 2025

@mcollina Maybe the blocker part applies to the use case where users want to orchestrate the worker themselves? My impression from the summit is that both that and the reimplementation are things we'd like to have eventually, but not necessarily urgently (depending on what defines "urgent": does it have to happen in the next major cycle? Maybe not that urgent, unless someone is actively looking into finishing the reimplementation that soon. Should it happen in the next 5 years? Probably yes, on the grounds that we should not let module.register stay experimental for that long).

Code-wise, from a glance it seems more straightforward to imagine how to re-implement the async hooks using sync hooks by e.g. just enqueuing an internal sync resolve hook that calls out to hooksProxy.makeSyncRequest(), than to imagine how to re-implement them using everysync. While that also seems possible and may tidy up the code a bit (so this PR is appreciated), having a new worker orchestration API is not a blocker for the reimplementation, since we already have some more ad-hoc worker orchestration code, and reusing existing logic directly would take less effort than rewriting what the existing code does on top of a new utility. Although if anyone can already see how it can be reimplemented using everysync fairly quickly, IMO it'd also be fine to land it internally together with that hook integration code. If not, then it seems fine to take your time here and polish it for more generic usage; there's no need to rush it for the sake of loaders :)

Member

@JakobJingleheimer JakobJingleheimer left a comment

nice!

why "everysync"?

* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
* @returns {object} - An object with methods that match the ones exposed by the worker
*/
function makeSync(data, opts = {}) {
Member

Since this is supposed to be used only on the main thread, I'm wondering if the first thing this should do is throw when it's not in the main thread.
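A tiny sketch of that guard, assuming the error type is still to be decided (isMainThread is the existing worker_threads export):

```js
const { isMainThread } = require('node:worker_threads');

function makeSync(data, opts = {}) {
  if (!isMainThread) {
    // Placeholder error; the real implementation would use an ERR_* code.
    throw new Error('makeSync() can only be called from the main thread');
  }
  // ...
}
```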

* @returns {object} - An object with methods that match the ones exposed by the worker
*/
function makeSync(data, opts = {}) {
const timeout = opts.timeout || 1000;
Member

Is a timeout of 0 valid? If so, || should be ??.
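A two-line illustration of the difference (not from the PR):

```js
const a = 0 || 1000; // 1000 (a timeout of 0 silently becomes the default)
const b = 0 ?? 1000; // 0 (only null/undefined fall back to the default)
```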

Comment on lines +37 to +39
const data = serialize(object);

if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
Member

Suggested change
const data = serialize(object);
if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
const data = serialize(object);
const neededSize = data.byteLength + 4 + byteOffset;
if (buffer.byteLength < neededSize) {

if (typeof buffer.grow !== 'function') {
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable');
}
buffer.grow(data.byteLength + 4 + byteOffset);
Member

Suggested change
buffer.grow(data.byteLength + 4 + byteOffset);
buffer.grow(neededSize);

This is correct though? Don't you need only the difference between neededSize and current size?
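For reference, `SharedArrayBuffer.prototype.grow()` takes the new total byte length rather than the number of additional bytes, so passing the full `neededSize` as in the suggestion matches its contract; a small sketch:

```js
const sab = new SharedArrayBuffer(4, { maxByteLength: 16 });
sab.grow(8);                 // grows the buffer to 8 bytes total, not by 8 bytes
console.log(sab.byteLength); // 8
// Growing to a length smaller than the current byteLength is rejected.
```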
