Skip to content

Commit b8535be

Browse files
committed
feat: add EMNAPI_WORKER_POOL_SIZE to limit max parallel async work execution (#8)
1 parent ccd7280 commit b8535be

File tree

9 files changed

+145
-14
lines changed

9 files changed

+145
-14
lines changed

packages/emnapi/include/node_api.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ typedef napi_value (*napi_addon_register_func)(napi_env env,
2525
NAPI_MODULE_INITIALIZER_X(napi_register_wasm_v, NAPI_MODULE_VERSION)
2626
#define NAPI_MODULE(modname, regfunc) \
2727
EXTERN_C_START \
28-
void _emnapi_runtime_init(const char** key, const char*** error_messages); \
28+
void _emnapi_runtime_init(const char** k, const char*** msg, int* size); \
2929
void _emnapi_execute_async_work(napi_async_work work); \
3030
NAPI_MODULE_EXPORT napi_value NAPI_WASM_INITIALIZER(napi_env env, \
3131
napi_value exports) { \
32-
_emnapi_runtime_init(NULL, NULL); \
32+
_emnapi_runtime_init(NULL, NULL, NULL); \
3333
_emnapi_execute_async_work(NULL); \
3434
return regfunc(env, exports); \
3535
} \

packages/emnapi/src/emnapi.c

+11-1
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,16 @@ const char* emnapi_error_messages[] = {
9494
#define EMNAPI_MOD_NAME_X(modname) EMNAPI_MOD_NAME_X_HELPER(modname)
9595

9696
EMSCRIPTEN_KEEPALIVE
97-
void _emnapi_runtime_init(const char** key, const char*** error_messages) {
97+
void _emnapi_runtime_init(const char** key, const char*** error_messages, int* size) {
9898
if (key) {
9999
*key = EMNAPI_MOD_NAME_X(NODE_GYP_MODULE_NAME);
100100
}
101101
if (error_messages) *error_messages = emnapi_error_messages;
102+
#if defined(EMNAPI_WORKER_POOL_SIZE) && EMNAPI_WORKER_POOL_SIZE > 0
103+
if (size) *size = EMNAPI_WORKER_POOL_SIZE;
104+
#else
105+
if (size) *size = 0;
106+
#endif
102107
}
103108

104109
napi_status napi_adjust_external_memory(napi_env env,
@@ -165,6 +170,7 @@ struct napi_async_work__ {
165170
typedef struct worker_count {
166171
int unused;
167172
int running;
173+
int async_work_unused;
168174
} worker_count;
169175

170176
// extern void _emnapi_create_async_work_js(napi_async_work work);
@@ -262,7 +268,11 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
262268

263269
worker_count count;
264270
_emnapi_get_worker_count_js(&count);
271+
#if defined(EMNAPI_WORKER_POOL_SIZE) && EMNAPI_WORKER_POOL_SIZE > 0
272+
if ((count.unused > 0 || count.running == 0) && count.async_work_unused > 0) {
273+
#else
265274
if (count.unused > 0 || count.running == 0) {
275+
#endif
266276
_emnapi_execute_async_work(work);
267277
} else {
268278
_emnapi_queue_async_work_js(work); // queue work

packages/emnapi/src/init.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ declare const __EMNAPI_RUNTIME_INIT__: string
88
// eslint-disable-next-line @typescript-eslint/no-unused-vars
99
declare let napiExtendedErrorInfoPtr: number | undefined
1010
declare function _napi_register_wasm_v1 (env: Ptr, exports: Ptr): napi_value
11-
declare function __emnapi_runtime_init (...args: [Ptr, Ptr]): void
11+
declare function __emnapi_runtime_init (...args: [Ptr, Ptr, Ptr]): void
1212
declare function _free (ptr: Ptr): void
1313

1414
mergeInto(LibraryManager.library, {
@@ -49,6 +49,7 @@ mergeInto(LibraryManager.library, {
4949
'napi_register_wasm_v1',
5050
'_emnapi_runtime_init',
5151
'$napiExtendedErrorInfoPtr',
52+
'$emnapiWorkerPoolSize',
5253
'free'
5354
],
5455
$emnapiInit: function () {
@@ -126,12 +127,13 @@ mergeInto(LibraryManager.library, {
126127
// HEAP.*?\[.*?\]
127128
// @ts-expect-error
128129
// eslint-disable-next-line @typescript-eslint/no-unused-vars
129-
const key_pp = stackAlloc($POINTER_SIZE); const errormessages_pp = stackAlloc($POINTER_SIZE)
130-
__emnapi_runtime_init($to64('key_pp'), $to64('errormessages_pp'))
130+
const key_pp = stackAlloc($POINTER_SIZE); const errormessages_pp = stackAlloc($POINTER_SIZE); const poolSize_p = stackAlloc(4)
131+
__emnapi_runtime_init($to64('key_pp'), $to64('errormessages_pp'), $to64('poolSize_p'))
131132
const key_p = $makeGetValue('key_pp', 0, '*')
132133
exportsKey = (key_p ? UTF8ToString(key_p) : 'emnapiExports') || 'emnapiExports'
133134
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
134135
errorMessagesPtr = $makeGetValue('errormessages_pp', 0, '*') || 0
136+
emnapiWorkerPoolSize = $makeGetValue('poolSize_p', 0, 'i32') as number || 0
135137
})
136138

137139
// Module.emnapiModuleRegister = moduleRegister

packages/emnapi/src/simple-async-operation.ts

+20-7
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,40 @@
22
/* eslint-disable no-unreachable */
33

44
declare const PThread: any
5+
declare let emnapiWorkerPoolSize: number
56
declare const emnapiAsyncWorkerQueue: number[]
7+
declare const emnapiRunningWorkers: Worker[]
68
// declare function __emnapi_execute_async_work (work: number): void
79

810
mergeInto(LibraryManager.library, {
11+
$emnapiWorkerPoolSize: undefined,
12+
$emnapiRunningWorkers: [],
13+
914
$emnapiAsyncWorkerQueue: [],
10-
$emnapiAsyncWorkerQueue__deps: ['$PThread', '_emnapi_execute_async_work'],
11-
$emnapiAsyncWorkerQueue__postset: 'PThread.unusedWorkers.push=function(){' +
12-
'var r=Array.prototype.push.apply(this,arguments);' +
13-
'setTimeout(function(){' +
14-
'if(PThread.unusedWorkers.length>0&&emnapiAsyncWorkerQueue.length>0){' +
15+
$emnapiAsyncWorkerQueue__deps: ['$PThread', '_emnapi_execute_async_work', '$emnapiRunningWorkers', '$emnapiWorkerPoolSize'],
16+
$emnapiAsyncWorkerQueue__postset: 'PThread.unusedWorkers.push = function () {' +
17+
'var r = Array.prototype.push.apply(this, arguments);' +
18+
'var worker = arguments[0]; var index;' +
19+
'if ((index = emnapiRunningWorkers.indexOf(worker)) !== -1) emnapiRunningWorkers.splice(index, 1);' +
20+
'setTimeout(function () {' +
21+
'var canExecute = PThread.unusedWorkers.length > 0 && emnapiAsyncWorkerQueue.length > 0;' +
22+
'if (emnapiWorkerPoolSize > 0) {' +
23+
'canExecute = canExecute && (emnapiWorkerPoolSize - emnapiRunningWorkers.length > 0);' +
24+
'}' +
25+
'if (canExecute) {' +
1526
'__emnapi_execute_async_work(emnapiAsyncWorkerQueue.shift());' +
1627
'}' +
1728
'});' +
1829
'return r;' +
1930
'};',
2031

21-
_emnapi_get_worker_count_js__deps: ['$PThread'],
32+
_emnapi_get_worker_count_js__deps: ['$PThread', '$emnapiWorkerPoolSize', '$emnapiRunningWorkers'],
2233
_emnapi_get_worker_count_js: function (struct: number) {
2334
$from64('struct')
2435
const address = struct >> 2
2536
HEAP32[address] = PThread.unusedWorkers.length
2637
HEAP32[address + 1] = PThread.runningWorkers.length
38+
HEAP32[address + 2] = emnapiWorkerPoolSize - emnapiRunningWorkers.length
2739
},
2840

2941
// _emnapi_delete_async_work_js: function (_work: number) {}
@@ -45,6 +57,7 @@ function _emnapi_queue_async_work_js (work: number): void {
4557
}
4658
const pthreadValue = PThread.pthreads[tid]
4759
const worker = (('worker' in pthreadValue) && ('threadInfoStruct' in pthreadValue)) ? pthreadValue.worker : pthreadValue
60+
emnapiRunningWorkers.push(worker)
4861
if (!worker._emnapiAsyncWorkListener) {
4962
worker._emnapiAsyncWorkListener = function (this: Worker, e: MessageEvent<any>): any {
5063
const data = ENVIRONMENT_IS_NODE ? e : e.data
@@ -115,5 +128,5 @@ function napi_cancel_async_work (env: napi_env, work: number): napi_status {
115128
// #endif
116129
}
117130

118-
emnapiImplement('_emnapi_queue_async_work_js', _emnapi_queue_async_work_js, ['$PThread', '$emnapiAsyncWorkerQueue', '$emnapiGetDynamicCalls'])
131+
emnapiImplement('_emnapi_queue_async_work_js', _emnapi_queue_async_work_js, ['$PThread', '$emnapiAsyncWorkerQueue', '$emnapiGetDynamicCalls', '$emnapiRunningWorkers'])
119132
emnapiImplement('napi_cancel_async_work', napi_cancel_async_work, ['$emnapiAsyncWorkerQueue', '$emnapiGetDynamicCalls', 'napi_set_last_error'])

packages/test/cgen.config.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@ module.exports = function (_options, { isDebug, isEmscripten }) {
3636
exports: ['emnapi']
3737
},
3838
includePaths,
39+
defines: [
40+
...(isEmscripten && pthread ? ['EMNAPI_WORKER_POOL_SIZE=1'] : [])
41+
],
3942
libs: ['testcommon'],
4043
compileOptions: [...compilerFlags, ...(isEmscripten && pthread ? ['-sUSE_PTHREADS=1'] : [])],
4144
// eslint-disable-next-line no-template-curly-in-string
4245
linkOptions: [
4346
...linkerFlags,
4447
...(isEmscripten ? [jsLib] : []),
45-
...(isEmscripten && pthread ? ['-sUSE_PTHREADS=1', '-sPTHREAD_POOL_SIZE=4'] : [])
48+
...(isEmscripten && pthread ? ['-sUSE_PTHREADS=1', '-sPTHREAD_POOL_SIZE=4', '-sPTHREAD_POOL_SIZE_STRICT=2'] : [])
4649
]
4750
})
4851

@@ -93,6 +96,7 @@ module.exports = function (_options, { isDebug, isEmscripten }) {
9396
createTarget('env', ['./env/binding.c']),
9497
createTarget('hello', ['./hello/binding.c']),
9598
...(!(isEmscripten && process.env.MEMORY64) ? [createTarget('async', ['./async/binding.c'], false, true)] : []),
99+
...(!(isEmscripten && process.env.MEMORY64) ? [createTarget('pool', ['./pool/binding.c'], false, true)] : []),
96100
...(isEmscripten && !process.env.MEMORY64 ? [createTarget('tsfn', ['./tsfn/binding.c'], false, true)] : []),
97101
// ...(isEmscripten ? [createTarget('tsfn', ['./tsfn/binding.c'], false, true)] : []),
98102
createTarget('arg', ['./arg/binding.c'], true),

packages/test/pool/binding.c

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#include <pthread.h>
2+
#include <unistd.h>
3+
#include <node_api.h>
4+
#include "../common.h"
5+
6+
typedef struct {
7+
napi_deferred _deferred;
8+
napi_async_work _request;
9+
} carrier;
10+
11+
static void* F(void* data) {
12+
sleep(1);
13+
return NULL;
14+
}
15+
16+
static void some_method() {
17+
pthread_t pid[3];
18+
for (int i = 0; i < 3; ++i) {
19+
pthread_create(pid + i, NULL, F, NULL);
20+
}
21+
for (int i = 0; i < 3; ++i) {
22+
pthread_join(*(pid + i), NULL);
23+
}
24+
}
25+
26+
static void Execute(napi_env env, void* data) {
27+
some_method();
28+
}
29+
30+
static void Complete(napi_env env, napi_status status, void* data) {
31+
carrier* c = (carrier*)(data);
32+
33+
if (status != napi_ok) {
34+
napi_throw_type_error(env, NULL, "Execute callback failed.");
35+
return;
36+
}
37+
38+
napi_value argv;
39+
40+
NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &argv));
41+
NAPI_CALL_RETURN_VOID(env, napi_resolve_deferred(env, c->_deferred, argv));
42+
NAPI_CALL_RETURN_VOID(env, napi_delete_async_work(env, c->_request));
43+
free(c);
44+
}
45+
46+
static napi_value async_method(napi_env env, napi_callback_info info) {
47+
napi_value promise;
48+
carrier* the_carrier = (carrier*) malloc(sizeof(carrier));
49+
NAPI_CALL(env, napi_create_promise(env, &the_carrier->_deferred, &promise));
50+
NAPI_CALL(env, napi_create_async_work(env, NULL, NULL,
51+
Execute, Complete, the_carrier, &the_carrier->_request));
52+
NAPI_CALL(env, napi_queue_async_work(env, the_carrier->_request));
53+
54+
return promise;
55+
}
56+
57+
static napi_value Init(napi_env env, napi_value exports) {
58+
napi_property_descriptor properties[] = {
59+
DECLARE_NAPI_PROPERTY("async_method", async_method),
60+
};
61+
62+
NAPI_CALL(env, napi_define_properties(
63+
env, exports, sizeof(properties) / sizeof(*properties), properties));
64+
65+
return exports;
66+
}
67+
68+
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

packages/test/pool/pool.html

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<meta http-equiv="X-UA-Compatible" content="IE=edge">
6+
<meta name="viewport" content="width=device-width, initial-scale=1.0">
7+
<title>pool</title>
8+
</head>
9+
<body>
10+
<script src="../../runtime/dist/emnapi.min.js"></script>
11+
<script src="../.cgenbuild/Debug/pool.js"></script>
12+
<script>
13+
(async function main () {
14+
const loadPromise = window.pool.default()
15+
const A = (await loadPromise).Module.emnapiExports
16+
17+
await Promise.all([undefined, undefined].map(() => A.async_method()))
18+
console.log('done')
19+
})()
20+
</script>
21+
</body>
22+
</html>

packages/test/pool/pool.test.js

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
'use strict'
2+
const { load } = require('../util')
3+
4+
async function main () {
5+
const loadPromise = load('pool')
6+
const A = await loadPromise
7+
8+
await Promise.all([undefined, undefined].map(() => A.async_method()))
9+
}
10+
11+
module.exports = main()

packages/test/script/test.js

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const files = glob.sync('**/*.test.js', {
1010
: process.env.MEMORY64
1111
? [
1212
'async/**/*',
13+
'pool/**/*',
1314
'tsfn/**/*'
1415
]
1516
: []

0 commit comments

Comments
 (0)