-
Notifications
You must be signed in to change notification settings - Fork 31.3k
worker_threads: add everysync #57749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1527,6 +1527,79 @@ Calling `unref()` on a worker allows the thread to exit if this is the only | |||||
active handle in the event system. If the worker is already `unref()`ed calling | ||||||
`unref()` again has no effect. | ||||||
|
||||||
## `worker.makeSync(buffer[, options])` | ||||||
|
||||||
<!-- YAML | ||||||
added: REPLACEME | ||||||
--> | ||||||
|
||||||
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication. | ||||||
* `options` {Object} | ||||||
* `timeout` {number} The timeout in milliseconds for synchronous calls. **Default:** `5000`. | ||||||
* `expandable` {boolean} Whether the buffer can be resized. **Default:** `true` if the buffer | ||||||
supports `growable` option. | ||||||
* Returns: {Object} An object with synchronous methods mirroring those exposed through [`worker.wire()`][]. | ||||||
|
||||||
Creates a synchronous API facade that communicates with a worker thread over a shared memory buffer. | ||||||
The worker thread must call [`worker.wire()`][] on the same buffer to register the methods that can be called. | ||||||
|
||||||
This function enables making synchronous calls to a worker thread, which is particularly useful | ||||||
when code requires blocking operations but still wants to benefit from the worker thread's isolation. | ||||||
|
||||||
```js | ||||||
const { Worker, makeSync } = require('node:worker_threads'); | ||||||
|
||||||
// Create a SharedArrayBuffer for communication | ||||||
const buffer = new SharedArrayBuffer(1024, { | ||||||
maxByteLength: 64 * 1024 * 1024, | ||||||
}); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some explanation about how to select a size for the SharedArrayBuffer would be helpful. What is it used for? What are the limitations? What is the impact if someone passes a buffer that is too small, etc. |
||||||
|
||||||
// Create a worker, passing the buffer | ||||||
const worker = new Worker('worker-script.js', { | ||||||
workerData: { buffer }, | ||||||
}); | ||||||
|
||||||
// Create a synchronous API facade | ||||||
const api = makeSync(buffer); | ||||||
|
||||||
// Call a method synchronously - this will block until the worker responds | ||||||
const result = api.methodName(arg1, arg2); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not obvious from this example how the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those are key problem of this API. I don't think it would be possible to overcome those details - it would not work if passed to multiple workers. Do you think it would be better to add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you wrap the |
||||||
``` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The API here feels a bit awkward. I wonder if some kind of wrapper to hide the // in the main thread
const { Worker, APIProxy } = require('node:worker_threads');
const api = new APIProxy({ min: 1024, max: 64 * 1024 * 1024 });
const worker = new Worker('...', { workerData: api });
// in the worker
const { workerData } = require('node:worker_threads');
workerData.bind({ foo() { }); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it 100% could be. I think we could actually do better: // in the main thread
const { Worker, SyncAPIProxy } = require('node:worker_threads');
const api = new APIProxy('myworker.js', { min: 1024, max: 64 * 1024 * 1024 });
api.echo()
// in the worker
export async function echo (...args) { return args } There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if we can make the |
||||||
|
||||||
## `worker.wire(buffer, methods)` | ||||||
|
||||||
<!-- YAML | ||||||
added: REPLACEME | ||||||
--> | ||||||
|
||||||
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication. | ||||||
* `methods` {Object} An object whose properties are methods to expose to the main thread. | ||||||
|
||||||
Exposes methods to the main thread that can be called synchronously using [`worker.makeSync()`][]. | ||||||
The methods can be async functions or return promises, and the main thread will wait | ||||||
for the promise to resolve or reject. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Can we clarify what's the behavior if the promise never settles? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will time out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant in the docs, I can tell by reading the code it's going to timeout, it can't by reading the docs. |
||||||
|
||||||
```js | ||||||
const { workerData, wire } = require('node:worker_threads'); | ||||||
|
||||||
// Expose methods synchronously to the main thread | ||||||
wire(workerData.buffer, { | ||||||
async methodName(arg1, arg2) { | ||||||
// Do work asynchronously | ||||||
return result; | ||||||
}, | ||||||
|
||||||
syncMethod(arg) { | ||||||
// Do synchronous work | ||||||
return result; | ||||||
}, | ||||||
}); | ||||||
``` | ||||||
|
||||||
The `wire()` function should be called early in the worker's lifecycle to register | ||||||
the methods before the main thread attempts to call them. Any values returned by | ||||||
these methods are serialized and passed back to the main thread. | ||||||
Comment on lines
+1599
to
+1601
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some questions to maybe answer in the documentation:
|
||||||
|
||||||
## Notes | ||||||
|
||||||
### Synchronous blocking of stdio | ||||||
|
@@ -1633,10 +1706,12 @@ thread spawned will spawn another until the application crashes. | |||||
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions | ||||||
[`vm`]: vm.md | ||||||
[`worker.SHARE_ENV`]: #workershare_env | ||||||
[`worker.makeSync()`]: #workermakesyncbuffer-options | ||||||
[`worker.on('message')`]: #event-message_1 | ||||||
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist | ||||||
[`worker.terminate()`]: #workerterminate | ||||||
[`worker.threadId`]: #workerthreadid_1 | ||||||
[`worker.wire()`]: #workerwirebuffer-methods | ||||||
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool | ||||||
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort | ||||||
[child processes]: child_process.md | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,100 @@ | ||||||
'use strict'; | ||||||
|
||||||
const { | ||||||
AtomicsNotify, | ||||||
AtomicsStore, | ||||||
AtomicsWait, | ||||||
AtomicsWaitAsync, | ||||||
Int32Array, | ||||||
ObjectKeys, | ||||||
} = primordials; | ||||||
|
||||||
const { | ||||||
codes: { | ||||||
ERR_WORKER_MESSAGING_TIMEOUT, | ||||||
}, | ||||||
} = require('internal/errors'); | ||||||
|
||||||
const { read, write } = require('internal/worker/everysync/objects'); | ||||||
const { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment explaining what these mean would be helpful. |
||||||
OFFSET, | ||||||
TO_MAIN, | ||||||
TO_WORKER, | ||||||
} = require('internal/worker/everysync/indexes'); | ||||||
|
||||||
/** | ||||||
* Creates a synchronous API facade from a shared memory buffer. | ||||||
* This function is meant to be used in the main thread to communicate with | ||||||
* a worker thread that has called `wire()` on the same shared memory. | ||||||
* @param {SharedArrayBuffer} data - The shared memory buffer for communication | ||||||
* @param {object} [opts={}] - Options object | ||||||
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations | ||||||
* @returns {object} - An object with methods that match the ones exposed by the worker | ||||||
*/ | ||||||
function makeSync(data, opts = {}) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: I wonder if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is supposed to be used only on the main thread, I'm wondering if the first thing this should do is throw when it's not in the main thread. |
||||||
const timeout = opts.timeout || 1000; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is a timeout of |
||||||
const metaView = new Int32Array(data); | ||||||
|
||||||
const res = AtomicsWait(metaView, TO_WORKER, 0, timeout); | ||||||
AtomicsStore(metaView, TO_WORKER, 0); | ||||||
|
||||||
if (res === 'ok') { | ||||||
const obj = read(data, OFFSET); | ||||||
|
||||||
const api = {}; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
for (const key of obj) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, why this pattern and not a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I think I would actually significantly simplify this. |
||||||
api[key] = (...args) => { | ||||||
write(data, { key, args }, OFFSET); | ||||||
AtomicsStore(metaView, TO_MAIN, 1); | ||||||
AtomicsNotify(metaView, TO_MAIN, 1); | ||||||
const res = AtomicsWait(metaView, TO_WORKER, 0, timeout); | ||||||
AtomicsStore(metaView, TO_WORKER, 0); | ||||||
if (res === 'ok') { | ||||||
const obj = read(data, OFFSET); | ||||||
return obj; | ||||||
} | ||||||
throw new ERR_WORKER_MESSAGING_TIMEOUT(); | ||||||
}; | ||||||
} | ||||||
|
||||||
return api; | ||||||
} | ||||||
throw new ERR_WORKER_MESSAGING_TIMEOUT(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Wires up a shared memory buffer to invoke methods on an object. | ||||||
* This function is meant to be used in a worker thread to expose methods | ||||||
* to the main thread that has called `makeSync()` on the same shared memory. | ||||||
* @param {SharedArrayBuffer} data - The shared memory buffer for communication | ||||||
* @param {object} obj - Object with methods to expose to the main thread | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* @returns {Promise<void>} - A promise that never resolves unless there's an error | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
*/ | ||||||
async function wire(data, obj) { | ||||||
write(data, ObjectKeys(obj), OFFSET); | ||||||
|
||||||
const metaView = new Int32Array(data); | ||||||
|
||||||
AtomicsStore(metaView, TO_WORKER, 1); | ||||||
AtomicsNotify(metaView, TO_WORKER); | ||||||
|
||||||
while (true) { | ||||||
const waitAsync = AtomicsWaitAsync(metaView, TO_MAIN, 0); | ||||||
const res = await waitAsync.value; | ||||||
AtomicsStore(metaView, TO_MAIN, 0); | ||||||
|
||||||
if (res === 'ok') { | ||||||
const { key, args } = read(data, OFFSET); | ||||||
// This is where the magic happens - invoke the requested method | ||||||
const result = await obj[key](...args); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your example in the docs show calling |
||||||
write(data, result, OFFSET); | ||||||
AtomicsStore(metaView, TO_WORKER, 1); | ||||||
AtomicsNotify(metaView, TO_WORKER, 1); | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
module.exports = { | ||||||
makeSync, | ||||||
wire, | ||||||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
'use strict'; | ||
|
||
/** | ||
* Byte offset where the actual data begins in the shared memory | ||
* @type {number} | ||
*/ | ||
const OFFSET = 64; | ||
|
||
/** | ||
* Index in the Int32Array for signaling from worker to main thread | ||
* 0: writing from worker, reading from main | ||
* @type {number} | ||
*/ | ||
const TO_WORKER = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is for signaling from worker to main, should it be named |
||
|
||
/** | ||
* Index in the Int32Array for signaling from main to worker thread | ||
* 1: writing from main, reading from worker | ||
* @type {number} | ||
*/ | ||
const TO_MAIN = 1; | ||
|
||
module.exports = { | ||
OFFSET, | ||
TO_WORKER, | ||
TO_MAIN, | ||
}; |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,55 @@ | ||||||||||||||||||||||||||||||||||||||||||||
'use strict'; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const { | ||||||||||||||||||||||||||||||||||||||||||||
DataView, | ||||||||||||||||||||||||||||||||||||||||||||
Uint8Array, | ||||||||||||||||||||||||||||||||||||||||||||
} = primordials; | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const { | ||||||||||||||||||||||||||||||||||||||||||||
codes: { | ||||||||||||||||||||||||||||||||||||||||||||
ERR_INVALID_BUFFER_SIZE, | ||||||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||||||
} = require('internal/errors'); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const { serialize, deserialize } = require('v8'); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Reads an object from a shared memory buffer | ||||||||||||||||||||||||||||||||||||||||||||
* @param {SharedArrayBuffer} buffer - The shared memory buffer containing serialized data | ||||||||||||||||||||||||||||||||||||||||||||
* @param {number} [byteOffset=0] - Byte offset where the data begins | ||||||||||||||||||||||||||||||||||||||||||||
* @returns {any} - The deserialized object | ||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||
function read(buffer, byteOffset = 0) { | ||||||||||||||||||||||||||||||||||||||||||||
const view = new DataView(buffer, byteOffset); | ||||||||||||||||||||||||||||||||||||||||||||
const length = view.getUint32(0, true); | ||||||||||||||||||||||||||||||||||||||||||||
const object = deserialize(new Uint8Array(buffer, byteOffset + 4, length)); | ||||||||||||||||||||||||||||||||||||||||||||
return object; | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||
* Writes an object to a shared memory buffer | ||||||||||||||||||||||||||||||||||||||||||||
* @param {SharedArrayBuffer} buffer - The shared memory buffer to write to | ||||||||||||||||||||||||||||||||||||||||||||
* @param {any} object - The object to serialize and write | ||||||||||||||||||||||||||||||||||||||||||||
* @param {number} [byteOffset=0] - Byte offset where to write the data | ||||||||||||||||||||||||||||||||||||||||||||
* @throws {Error} If the buffer is too small and not growable | ||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||
function write(buffer, object, byteOffset = 0) { | ||||||||||||||||||||||||||||||||||||||||||||
const data = serialize(object); | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
if (buffer.byteLength < data.byteLength + 4 + byteOffset) { | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+37
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype) | ||||||||||||||||||||||||||||||||||||||||||||
if (typeof buffer.grow !== 'function') { | ||||||||||||||||||||||||||||||||||||||||||||
throw new ERR_INVALID_BUFFER_SIZE('Buffer is too small and not growable'); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
buffer.grow(data.byteLength + 4 + byteOffset); | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This is correct though? Don't you need only the difference between neededSize and current size? |
||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
const view = new DataView(buffer, byteOffset); | ||||||||||||||||||||||||||||||||||||||||||||
view.setUint32(0, data.byteLength, true); | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+39
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should call the getter only once
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
new Uint8Array(buffer, byteOffset + 4).set(data); | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
module.exports = { | ||||||||||||||||||||||||||||||||||||||||||||
read, | ||||||||||||||||||||||||||||||||||||||||||||
write, | ||||||||||||||||||||||||||||||||||||||||||||
}; |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,10 @@ | ||||||||
import { workerData, wire } from 'node:worker_threads'; | ||||||||
|
||||||||
wire(workerData.data, { | ||||||||
async echo(arg) { | ||||||||
return arg; | ||||||||
}, | ||||||||
}); | ||||||||
|
||||||||
// Keep the event loop alive | ||||||||
setInterval(() => {}, 100000); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,9 @@ | ||||||||||||||||||||||
import { workerData, wire } from 'node:worker_threads'; | ||||||||||||||||||||||
|
||||||||||||||||||||||
wire(workerData.data, { | ||||||||||||||||||||||
fail(arg) { | ||||||||||||||||||||||
return new Promise((resolve, reject) => { | ||||||||||||||||||||||
// nothing to do here, we will fail | ||||||||||||||||||||||
}); | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
}); | ||||||||||||||||||||||
Comment on lines
+5
to
+9
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be added as experimental
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the name
makeSync
is a bit of a misnomer. Maybe…asyncToSync
orsyncFacade
?