-
Notifications
You must be signed in to change notification settings - Fork 31.3k
worker_threads: add everysync #57749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Review requested:
|
Signed-off-by: Matteo Collina <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #57749 +/- ##
==========================================
+ Coverage 90.23% 90.26% +0.03%
==========================================
Files 630 633 +3
Lines 185055 185179 +124
Branches 36221 36233 +12
==========================================
+ Hits 166984 167154 +170
+ Misses 11043 10993 -50
- Partials 7028 7032 +4
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Offtopic: I wonder if you wrote this with thread-stream in mind. It seems like we could implement similar functionality using this API.
The `wire()` function should be called early in the worker's lifecycle to register | ||
the methods before the main thread attempts to call them. Any values returned by | ||
these methods are serialized and passed back to the main thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions to maybe answer in the documentation:
- What happens when we call a method before calling wire?
- From what I saw in the tests, this wire will not keep the worker thread alive, right?
- If not, is it worth adding a flag as a second method to keep it alive? Most of the time, I think we developers will actually prefer to keep it alive indefinitely
- What happens when the method we called throws an error?
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations | ||
* @returns {object} - An object with methods that match the ones exposed by the worker | ||
*/ | ||
function makeSync(data, opts = {}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: I wonder if makeSync
is a good name, is simple and short but hides what it does exactly, maybe createSyncWireFacade
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
|
||
Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][]. | ||
The methods can be async functions or return promises, and the main thread will wait | ||
for the promise to resolve or reject. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the promise to resolve or reject. | |
for the promise to settle. |
Can we clarify what's the behavior if the promise never settles?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will time out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant in the docs, I can tell by reading the code it's going to timeout, it can't by reading the docs.
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations | ||
* @returns {object} - An object with methods that match the ones exposed by the worker | ||
*/ | ||
function makeSync(data, opts = {}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
function makeSync(data, opts = {}) { | |
function makeSync(data, opts = kEmptyObject) { |
if (res === 'ok') { | ||
const obj = read(data, OFFSET); | ||
|
||
const api = {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const api = {}; | |
const api = { __proto__: null }; |
if (res === 'ok') { | ||
const { key, args } = read(data, OFFSET); | ||
// This is where the magic happens - invoke the requested method | ||
const result = await obj[key](...args); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const result = await obj[key](...args); | |
const result = await ReflectApply(obj[key], obj, args); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your example in the docs show calling wire(...)
without awaiting the promise it returns. This could easily throw exceptions that end up not being handled. If it does throw, the error does not appear to be propagated to the caller so I assume that means the Atomics wait will just timeout? I think before we landed this it needs a better error handling strategy.
if (buffer.byteLength < data.byteLength + 4 + byteOffset) { | ||
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype) | ||
if (typeof buffer.grow !== 'function') { | ||
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); | ||
} | ||
buffer.grow(data.byteLength + 4 + byteOffset); | ||
} | ||
|
||
const view = new DataView(buffer, byteOffset); | ||
view.setUint32(0, data.byteLength, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should call the getter only once
if (buffer.byteLength < data.byteLength + 4 + byteOffset) { | |
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype) | |
if (typeof buffer.grow !== 'function') { | |
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); | |
} | |
buffer.grow(data.byteLength + 4 + byteOffset); | |
} | |
const view = new DataView(buffer, byteOffset); | |
view.setUint32(0, data.byteLength, true); | |
const dataByteLength = TypedArrayPrototypeGetByteLength(data); | |
if (buffer.byteLength < dataByteLength + 4 + byteOffset) { | |
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype) | |
if (typeof buffer.grow !== 'function') { | |
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); | |
} | |
buffer.grow(dataByteLength + 4 + byteOffset); | |
} | |
const view = new DataView(buffer, byteOffset); | |
view.setUint32(0, dataByteLength, true); |
}); | ||
|
||
// Keep the event loop alive | ||
setInterval(() => {}, 100000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setInterval(() => {}, 100000); | |
setInterval(() => {}, 100000); | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 100_000
for grokkability
return new Promise((resolve, reject) => { | ||
// nothing to do here, we will fail | ||
}); | ||
}, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new Promise((resolve, reject) => { | |
// nothing to do here, we will fail | |
}); | |
}, | |
}); | |
// never settling promise, we will fail | |
return new Promise(() => {}); | |
}, | |
}); | |
|
||
// Test makeSync and wire functionality | ||
{ | ||
const buffer = new SharedArrayBuffer(1024, { | ||
maxByteLength: 64 * 1024 * 1024, | ||
}); | ||
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Test makeSync and wire functionality | |
{ | |
const buffer = new SharedArrayBuffer(1024, { | |
maxByteLength: 64 * 1024 * 1024, | |
}); | |
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), { | |
const fixtures = require('../common/fixtures'); | |
// Test makeSync and wire functionality | |
{ | |
const buffer = new SharedArrayBuffer(1024, { | |
maxByteLength: 64 * 1024 * 1024, | |
}); | |
const worker = new Worker(fixtures.path('everysync', 'echo.mjs'), { |
* to the main thread that has called `makeSync()` on the same shared memory. | ||
* @param {SharedArrayBuffer} data - The shared memory buffer for communication | ||
* @param {object} obj - Object with methods to expose to the main thread | ||
* @returns {Promise<void>} - A promise that never resolves unless there's an error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @returns {Promise<void>} - A promise that never resolves unless there's an error | |
* @returns {Promise<never>} - A promise that never fulfils, might be reject if there's an error |
* This function is meant to be used in a worker thread to expose methods | ||
* to the main thread that has called `makeSync()` on the same shared memory. | ||
* @param {SharedArrayBuffer} data - The shared memory buffer for communication | ||
* @param {object} obj - Object with methods to expose to the main thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @param {object} obj - Object with methods to expose to the main thread | |
* @param {Record<string, Function>} obj - Object with methods to expose to the main thread |
const api = makeSync(buffer); | ||
|
||
// Call a method synchronously - this will block until the worker responds | ||
const result = api.methodName(arg1, arg2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not obvious from this example how the makeSync(...)
binds to the worker. Yes, I see the buffer
is passed as worker data but that's an easily overlooked detail. It's also not clear what happens here if buffer
ends up being passed to more than one worker thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are key problem of this API. I don't think it would be possible to overcome those details - it would not work if passed to multiple workers. Do you think it would be better to add a createSyncAPIWorker(url)
mechanism instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you wrap the SharedArrayBuffer
into a transferable object instead of using the SAB directly then you can likely better control where it goes and limit the number of workers it can go to.
// Create a SharedArrayBuffer for communication | ||
const buffer = new SharedArrayBuffer(1024, { | ||
maxByteLength: 64 * 1024 * 1024, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some explanation about how to select a size for the SharedArrayBuffer would be helpful. What is it used for? What are the limitations? What is the impact if someone passes a buffer that is too small, etc.
const obj = read(data, OFFSET); | ||
|
||
const api = {}; | ||
for (const key of obj) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, why this pattern and not a Proxy
? I know Proxy
objects have some perf downsides but this would seem a tailor made use case for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. I think I would actually significantly simplify this.
} = require('internal/errors'); | ||
|
||
const { read, write } = require('internal/worker/everysync/objects'); | ||
const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A comment explaining what these mean would be helpful.
|
||
const api = makeSync(buffer, { timeout: 100 }); | ||
|
||
assert.throws(() => api.fail(), { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests need to be expanded to include coverage of more error conditions. What happens when a non-serializable value is passed as an arg or a return value? What happens if a method that does not exist is called? What happens if the worker thread is killed in mid call? What happens in the case of a dead lock? That is, I have to assume something like the following is possible...
// in the worker thread
import { workerData, wire, makeSync } from 'node:worker_threads';
const sab = new SharedArrayBuffer(...);
const api = makeSync(sab, { timeout: 1000 });
postMessage(sab);
wire(workerData.data, {
doSomething(arg) {
api.doSomethingElse();
},
});
// in the main thread
const sab2 = new SharedArrayBuffer(...);
const api = makeSync(sab2, { timeout: 1000 });
const worker = new Worker('...', { workerData: sab2 });
worker.onmessage = (message) => {
wire(message.data, {
doSomethingElse() {
api.doSomething();
}
});
}
api.doSomething();
@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only | |||
active handle in the event system. If the worker is already `unref()`ed calling | |||
`unref()` again has no effect. | |||
|
|||
## `worker.makeSync(buffer[, options])` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be added as experimental
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the name makeSync
is a bit of a misnomer. Maybe… asyncToSync
or syncFacade
?
|
||
// Call a method synchronously - this will block until the worker responds | ||
const result = api.methodName(arg1, arg2); | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API here feels a bit awkward. I wonder if some kind of wrapper to hide the SharedArrayBuffer
would be helpful...
// in the main thread
const { Worker, APIProxy } = require('node:worker_threads');
const api = new APIProxy({ min: 1024, max: 64 * 1024 * 1024 });
const worker = new Worker('...', { workerData: api });
// in the worker
const { workerData } = require('node:worker_threads');
workerData.bind({ foo() { });
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it 100% could be. I think we could actually do better:
// in the main thread
const { Worker, SyncAPIProxy } = require('node:worker_threads');
const api = new APIProxy('myworker.js', { min: 1024, max: 64 * 1024 * 1024 });
api.echo()
// in the worker
export async function echo (...args) { return args }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if we can make the {min, max}
part optional and use some sensible defaults which can be overridden by the user if they wish to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any context on this but I am also wondering about the API surface.
About five years ago, before there were features such as waitAsync
, I played around with similar ideas to let synchronous WebAssembly functions receive data from asynchronous inputs through a SharedArrayBuffer
. The API surface in this case was just a (badly designed) synchronized message queue. Ideally, it should be possible to send and receive both synchronously and asynchronously. It'd be interesting to see what API best matches user expectations for this feature.
The key point of adding this utility is to provide a synchronous way of doing I/O so that the new
I concur, however we need the synchronous API relatively quickly and this seems a major design. I think it's better to land something experimental and iterate. |
* 0: writing from worker, reading from main | ||
* @type {number} | ||
*/ | ||
const TO_WORKER = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is for signaling from worker to main, should it be named TO_MAIN
instead of TO_WORKER
? Ditto for the other index name.
If this is about the idea of reimplementing |
@joyeecheung I took a note to do this as the result of the collab summit that this was some kind of blocker for future additions. |
@mcollina Maybe the blocker part applies to the use case where users want to orchestrate the worker themselves? My impression from the summit is that both that and the reimplementation are we'd like to have eventually, but not necessarily urgent (depending on what defines "urgent" - does it have to happen in the next major cycle? Maybe not that urgent, unless someone is actively looking into finishing the reimplementation that soon. Should it happen in the next 5 years? Probably yes on the grounds that we should not let Code-wise from a glance it seems more straight-forward to imagine how to re-implement the async hooks using sync hooks by e.g. just enqueuing an internal sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
why "everysync"?
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations | ||
* @returns {object} - An object with methods that match the ones exposed by the worker | ||
*/ | ||
function makeSync(data, opts = {}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is supposed to be used only on the main thread, I'm wondering if the first thing this should do is throw when it's not in the main thread.
* @returns {object} - An object with methods that match the ones exposed by the worker | ||
*/ | ||
function makeSync(data, opts = {}) { | ||
const timeout = opts.timeout || 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a timeout of 0
valid? If so, ||
should be ??
.
const data = serialize(object); | ||
|
||
if (buffer.byteLength < data.byteLength + 4 + byteOffset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const data = serialize(object); | |
if (buffer.byteLength < data.byteLength + 4 + byteOffset) { | |
const data = serialize(object); | |
const neededSize = data.byteLength + 4 + byteOffset; | |
if (buffer.byteLength < neededSize) { |
if (typeof buffer.grow !== 'function') { | ||
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); | ||
} | ||
buffer.grow(data.byteLength + 4 + byteOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buffer.grow(data.byteLength + 4 + byteOffset); | |
buffer.grow(neededSize); |
This is correct though? Don't you need only the difference between neededSize and current size?
}); | ||
|
||
// Keep the event loop alive | ||
setInterval(() => {}, 100000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 100_000
for grokkability
@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only | |||
active handle in the event system. If the worker is already `unref()`ed calling | |||
`unref()` again has no effect. | |||
|
|||
## `worker.makeSync(buffer[, options])` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the name makeSync
is a bit of a misnomer. Maybe… asyncToSync
or syncFacade
?
As discussed in the collab summit meeting, here is my contribution of the everysync module.
We can 100% decide to change the public to something higher level too.