Skip to content

feat: port libuv threadpool #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/emnapi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ if(EMNAPI_INSTALL_SRC)
install(FILES
${EMNAPI_SRC}
${CMAKE_CURRENT_SOURCE_DIR}/src/queue.h
${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.c
DESTINATION "src/${PROJECT_NAME}")
endif()
24 changes: 24 additions & 0 deletions packages/emnapi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,30 @@ Output code can run in recent version modern browsers and Node.js latest LTS. IE

If a JS error is thrown during runtime initialization, the Node.js process will exit. You can use `-sNODEJS_CATCH_EXIT=0` and add an `uncaughtException` handler yourself to avoid this. Alternatively, you can use the `Module.onEmnapiInitialized` callback to catch the error.

## Preprocess Macro Options

### `-DEMNAPI_WORKER_POOL_SIZE=4`

This is equivalent to [`UV_THREADPOOL_SIZE`](http://docs.libuv.org/en/v1.x/threadpool.html?highlight=UV_THREADPOOL_SIZE), default is `4`.
It means that at most `EMNAPI_WORKER_POOL_SIZE` async works (`napi_queue_async_work`) can be executed in parallel.

You can generally set both `PTHREAD_POOL_SIZE` and `EMNAPI_WORKER_POOL_SIZE` to the `number of CPU cores`.
If you use another library function which may create `N` child threads in async work,
then you need to set `PTHREAD_POOL_SIZE` to `EMNAPI_WORKER_POOL_SIZE * (N + 1)`.

This option only has effect if you use `-sUSE_PTHREADS`.
Emnapi will create `EMNAPI_WORKER_POOL_SIZE` threads during initialization,
and an error will be thrown if `PTHREAD_POOL_SIZE < EMNAPI_WORKER_POOL_SIZE && PTHREAD_POOL_SIZE_STRICT == 2`.

See [Issue #8](https://github.com/toyobayashi/emnapi/issues/8) and [PR #9](https://github.com/toyobayashi/emnapi/pull/9) for more detail.

### `-DEMNAPI_ASYNC_SEND_TYPE=0`

This option only has effect if you use `-sUSE_PTHREADS`. Default is `0`.

- `0`: Use `setImmediate()` (`MessageChannel` and `postMessage`) to send async work.
- `1`: Use `Promise.resolve().then()` to send async work.

## Emnapi Runtime

Most APIs are implemented in JavaScript and depend on the runtime code shipped in the `library_napi.js` library file. So if you are building multiple wasm targets, the same runtime code will be linked into each wasm glue js file. This is problematic when passing JavaScript objects across wasm bindings in the same web page; we need to share emnapi's runtime code between multiple wasms like this:
Expand Down
4 changes: 1 addition & 3 deletions packages/emnapi/include/node_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ typedef napi_value (*napi_addon_register_func)(napi_env env,
NAPI_MODULE_INITIALIZER_X(napi_register_wasm_v, NAPI_MODULE_VERSION)
#define NAPI_MODULE(modname, regfunc) \
EXTERN_C_START \
void _emnapi_runtime_init(const char** key, const char*** error_messages); \
void _emnapi_execute_async_work(napi_async_work work); \
void _emnapi_runtime_init(const char** k, const char*** msg); \
NAPI_MODULE_EXPORT napi_value NAPI_WASM_INITIALIZER(napi_env env, \
napi_value exports) { \
_emnapi_runtime_init(NULL, NULL); \
_emnapi_execute_async_work(NULL); \
return regfunc(env, exports); \
} \
EXTERN_C_END
Expand Down
225 changes: 170 additions & 55 deletions packages/emnapi/src/emnapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#ifdef __EMSCRIPTEN_PTHREADS__
#include <pthread.h>
// #include <emscripten/threading.h>
#endif

#include <emscripten.h>
Expand Down Expand Up @@ -45,6 +46,8 @@

#define CHECK_NOT_NULL(val) CHECK((val) != NULL)

#define CHECK_EQ(a, b) CHECK((a) == (b))

EXTERN_C_START

extern napi_status napi_set_last_error(napi_env env,
Expand Down Expand Up @@ -154,69 +157,163 @@ emnapi_get_emscripten_version(napi_env env,

#ifdef __EMSCRIPTEN_PTHREADS__

// #ifdef __wasm64__
// #define __EMNAPI_ASYNC_SEND_CALLBACK_SIG \
// (EM_FUNC_SIG_RETURN_VALUE_V | \
// EM_FUNC_SIG_WITH_N_PARAMETERS(1) | \
// EM_FUNC_SIG_SET_PARAM(0, EM_FUNC_SIG_PARAM_I64))
// #else
// #define __EMNAPI_ASYNC_SEND_CALLBACK_SIG EM_FUNC_SIG_VI
// #endif

#ifndef EMNAPI_ASYNC_SEND_TYPE
#define EMNAPI_ASYNC_SEND_TYPE 0
#endif

#if EMNAPI_ASYNC_SEND_TYPE == 0
extern void _emnapi_set_immediate(void (*callback)(void*), void* data);
#define NEXT_TICK(callback, data) _emnapi_set_immediate((callback), (data))
#elif EMNAPI_ASYNC_SEND_TYPE == 1
extern void _emnapi_next_tick(void (*callback)(void*), void* data);
#define NEXT_TICK(callback, data) _emnapi_next_tick((callback), (data))
#else
#error "Invalid EMNAPI_ASYNC_SEND_TYPE"
#endif

extern void _emnapi_async_send_js(int type,
void (*callback)(void*),
void* data);

static void _emnapi_async_send(void (*callback)(void*), void* data) {
// TODO(?): need help
// Neither emscripten_dispatch_to_thread_async nor MAIN_THREAD_ASYNC_EM_ASM
// invoke the async complete callback if there is a printf() in worker thread.
// This breaks "packages/test/pool" tests.
// Not sure what happens, maybe has deadlock,
// and not sure whether this is Emscripten bug or my incorrect usage.
// BTW emscripten_dispatch_to_thread_async seems
// not support __wasm64__ V_I64 signature yet

// pthread_t main_thread = emscripten_main_browser_thread_id();
// if (pthread_equal(main_thread, pthread_self())) {
// NEXT_TICK(callback, data);
// } else {
// emscripten_dispatch_to_thread_async(main_thread,
// __EMNAPI_ASYNC_SEND_CALLBACK_SIG,
// callback,
// NULL,
// data);
// // or
// // MAIN_THREAD_ASYNC_EM_ASM({
// // emnapiGetDynamicCalls.call_vp($0, $1);
// // }, callback, data);
// }
Comment on lines +188 to +210
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RReverser I left comments here. If you prefer that emnapi should not depend on PThread object, I would appreciate it if you could open another PR to provide help. Sorry for my broken English, forget it if there is some strange or offensive expression.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RReverser c3a3cdb1 At last I found emscripten_proxy_async and em_proxying_queue_create works well, but using emscripten_proxy_get_system_queue() will also stuck if call printf in child thread. I could not figure out why.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you prefer that emnapi should not depend on PThread object, I would appreciate it if you could open another PR to provide help

Heh unfortunately I'm finding a bit hard to follow how threading works in this library, so not sure I can submit a PR for that yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW one other reason I want it to have its own pool of workers is that upstream there are talks of making it possible to use pthreads without a pthread pool, and I now have a working implementation of that at least for Node.js.

However, it seems that emnapi won't work with that approach in the current form since it manipulates internal arrays and waits until unusedWorkers is not empty. But, if Emscripten will use pthreads without a pthread pool, it will be empty at the start so emnapi will wait forever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like I had a slightly outdated dist version and you already removed some more usages. This is looking promising, going to give it a try with the pool-less version of Emscripten.


// Currently still use JavaScript to send work
// it's simple and clear
_emnapi_async_send_js(EMNAPI_ASYNC_SEND_TYPE, callback, data);
}

#include "threadpool.c"

typedef void (*_emnapi_call_into_module_callback)(napi_env env, void* args);
extern void _emnapi_call_into_module(napi_env env, _emnapi_call_into_module_callback callback, void* args);

struct napi_async_work__ {
napi_env env;
void* data;
napi_async_execute_callback execute;
napi_async_complete_callback complete;
void* data;
pthread_t tid;
uv_work_t work_req_;
};

// Snapshot of the Emscripten pthread-pool state as reported by the JS side
// (filled in by _emnapi_get_worker_count_js). NOTE(review): field semantics
// inferred from the old caller `if (count.unused > 0 || count.running == 0)`
// — presumably idle vs. busy worker threads; confirm against the JS library.
typedef struct worker_count {
int unused;   // workers available to take new work — TODO confirm
int running;  // workers currently executing work — TODO confirm
} worker_count;

// extern void _emnapi_create_async_work_js(napi_async_work work);
// extern void _emnapi_delete_async_work_js(napi_async_work work);
extern void _emnapi_queue_async_work_js(napi_async_work work);
extern void _emnapi_on_execute_async_work_js(napi_async_work work);
extern int _emnapi_get_worker_count_js(worker_count* count);

static napi_async_work _emnapi_async_work_init(
static napi_async_work async_work_init(
napi_env env,
napi_async_execute_callback execute,
napi_async_complete_callback complete,
void* data
) {
napi_async_work work = (napi_async_work)malloc(sizeof(struct napi_async_work__));
napi_async_work work = (napi_async_work)calloc(1, sizeof(struct napi_async_work__));
if (work == NULL) return NULL;
work->env = env;
work->execute = execute;
work->complete = complete;
work->data = data;
work->tid = NULL;
EMNAPI_KEEPALIVE_PUSH();
return work;
}

static void _emnapi_async_work_destroy(napi_async_work work) {
if (work != NULL) {
free(work);
EMNAPI_KEEPALIVE_POP();
}
static void async_work_delete(napi_async_work work) {
free(work);
}

static void* _emnapi_on_execute_async_work(void* arg) {
napi_async_work work = (napi_async_work) arg;
static void async_work_do_thread_pool_work(napi_async_work work) {
work->execute(work->env, work->data);
_emnapi_on_execute_async_work_js(work); // postMessage to main thread
return NULL;
}
#endif

EMSCRIPTEN_KEEPALIVE
void _emnapi_execute_async_work(napi_async_work work) {
if (!work) return;
#ifdef __EMSCRIPTEN_PTHREADS__
pthread_t t;
pthread_create(&t, NULL, _emnapi_on_execute_async_work, work);
work->tid = t;
_emnapi_queue_async_work_js(work); // listen complete event
pthread_detach(t);
#endif
// Heap-allocated envelope that carries a finished work item and its libuv
// status from async_work_after_thread_pool_work() into the module context
// (via _emnapi_call_into_module); freed by async_work_on_complete().
typedef struct complete_wrap_s {
int status;            // raw uv status (0, EINVAL, ECANCELED, ...), mapped by convert_error_code()
napi_async_work work;  // the async work whose complete callback should run
} complete_wrap_t;

// Map a libuv/errno-style status code to the corresponding napi_status.
// Unrecognized codes collapse to napi_generic_failure.
static napi_status convert_error_code(int code) {
  if (code == 0) return napi_ok;
  if (code == EINVAL) return napi_invalid_arg;
  if (code == ECANCELED) return napi_cancelled;
  return napi_generic_failure;
}

// Runs inside the module context (scheduled through _emnapi_call_into_module)
// and invokes the user's complete callback for a finished async work.
// `args` is a heap-allocated complete_wrap_t owned (and freed) here.
static void async_work_on_complete(napi_env env, void* args) {
  complete_wrap_t* wrap = (complete_wrap_t*) args;
  // Copy everything out of the wrapper BEFORE freeing it: the original code
  // called free(wrap) and then dereferenced wrap->work->complete, which is a
  // use-after-free.
  napi_async_work work = wrap->work;
  napi_status status = convert_error_code(wrap->status);
  free(wrap);
  work->complete(work->env, status, work->data);
}

// Called on the loop thread after the worker finished (or was cancelled).
// Opens a handle scope and dispatches the user's complete callback into the
// module context; a no-op when no complete callback was supplied.
static void async_work_after_thread_pool_work(napi_async_work work, int status) {
  if (work->complete == NULL) return;
  napi_handle_scope scope;
  napi_open_handle_scope(work->env, &scope);
  complete_wrap_t* wrap = (complete_wrap_t*) malloc(sizeof(complete_wrap_t));
  // The original code dereferenced the allocation unconditionally; fail fast
  // on OOM instead of corrupting memory (matches this file's CHECK style).
  CHECK_NOT_NULL(wrap);
  wrap->status = status;
  wrap->work = work;
  _emnapi_call_into_module(work->env, async_work_on_complete, wrap);
  napi_close_handle_scope(work->env, scope);
}

// Threadpool "execute" trampoline: recover the owning napi_async_work from
// the embedded uv_work_t and run the user's execute callback on the worker.
static void async_work_schedule_work_on_execute(uv_work_t* req) {
  napi_async_work work =
      container_of(req, struct napi_async_work__, work_req_);
  async_work_do_thread_pool_work(work);
}

// Threadpool "after work" trampoline: drop the keepalive taken in
// async_work_schedule_work(), then run the completion path on the loop side.
static void async_work_schedule_work_on_complete(uv_work_t* req, int status) {
  napi_async_work work =
      container_of(req, struct napi_async_work__, work_req_);
  EMNAPI_KEEPALIVE_POP();
  async_work_after_thread_pool_work(work, status);
}

// Queue the work item on the ported-libuv threadpool. Takes a runtime
// keepalive reference that is released in the on_complete trampoline.
static void async_work_schedule_work(napi_async_work work) {
  EMNAPI_KEEPALIVE_PUSH();
  int err = uv_queue_work(&loop,
                          &work->work_req_,
                          async_work_schedule_work_on_execute,
                          async_work_schedule_work_on_complete);
  CHECK_EQ(err, 0);
}

// Ask libuv to cancel a queued work item. Returns 0 on success, or a uv
// error code (e.g. EBUSY-style failure when the work already started).
static int async_work_cancel_work(napi_async_work work) {
  uv_req_t* req = (uv_req_t*) &work->work_req_;
  return uv_cancel(req);
}

#endif

napi_status napi_create_async_work(napi_env env,
napi_value async_resource,
napi_value async_resource_name,
Expand All @@ -229,11 +326,14 @@ napi_status napi_create_async_work(napi_env env,
CHECK_ARG(env, execute);
CHECK_ARG(env, result);

napi_async_work work = _emnapi_async_work_init(env, execute, complete, data);
napi_async_work work = async_work_init(env,
execute,
complete,
data);
if (work == NULL) {
return napi_set_last_error(env, napi_generic_failure, 0, NULL);
}
// _emnapi_create_async_work_js(work); // listen complete event

*result = work;

return napi_clear_last_error(env);
Expand All @@ -247,8 +347,8 @@ napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
CHECK_ENV(env);
CHECK_ARG(env, work);

// _emnapi_delete_async_work_js(work); // clean listeners
_emnapi_async_work_destroy(work);
async_work_delete(work);

return napi_clear_last_error(env);
#else
return napi_set_last_error(env, napi_generic_failure, 0, NULL);
Expand All @@ -260,12 +360,32 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
CHECK_ENV(env);
CHECK_ARG(env, work);

worker_count count;
_emnapi_get_worker_count_js(&count);
if (count.unused > 0 || count.running == 0) {
_emnapi_execute_async_work(work);
} else {
_emnapi_queue_async_work_js(work); // queue work
async_work_schedule_work(work);

return napi_clear_last_error(env);
#else
return napi_set_last_error(env, napi_generic_failure, 0, NULL);
#endif
}

// Evaluate a uv call; on failure, set the env's last error (mapped through
// convert_error_code) and return it from the enclosing function.
// Fixes a dead copy from Node.js C++ sources: `uvimpl::ConvertUVErrorCode`
// is a C++ namespace reference that cannot compile in C, and
// napi_set_last_error takes 4 arguments everywhere else in this file.
#define CALL_UV(env, condition)                                         \
  do {                                                                  \
    int result = (condition);                                           \
    napi_status status = convert_error_code(result);                    \
    if (status != napi_ok) {                                            \
      return napi_set_last_error(env, status, result, NULL);            \
    }                                                                   \
  } while (0)

napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
#ifdef __EMSCRIPTEN_PTHREADS__
CHECK_ENV(env);
CHECK_ARG(env, work);

int result = async_work_cancel_work(work);
napi_status status = convert_error_code(async_work_cancel_work(work));
if (status != napi_ok) {
return napi_set_last_error(env, status, result, NULL);
}

return napi_clear_last_error(env);
Expand Down Expand Up @@ -316,12 +436,7 @@ struct napi_threadsafe_function__ {
bool async_ref;
};

typedef void (*_emnapi_call_into_module_callback)(napi_env env, void* args);

extern void _emnapi_tsfn_send_js(void (*callback)(void*), void* data);
extern void _emnapi_tsfn_dispatch_one_js(napi_env env, napi_ref ref, napi_threadsafe_function_call_js call_js_cb, void* context, void* data);
extern void _emnapi_call_into_module(napi_env env, _emnapi_call_into_module_callback callback, void* args);
extern void _emnapi_set_timeout(void (*callback)(void*), void* data, int delay);

static void _emnapi_tsfn_default_call_js(napi_env env, napi_value cb, void* context, void* data) {
if (!(env == NULL || cb == NULL)) {
Expand Down Expand Up @@ -428,7 +543,7 @@ static napi_status _emnapi_tsfn_init(napi_threadsafe_function func) {
if (func->max_queue_size == 0 || func->cond) {
return napi_ok;
}
_emnapi_set_timeout(_emnapi_tsfn_do_destroy, func, 0);
NEXT_TICK(_emnapi_tsfn_do_destroy, func);
return napi_generic_failure;
}

Expand Down Expand Up @@ -485,7 +600,7 @@ static void _emnapi_tsfn_close_handles_and_maybe_delete(
}
func->handles_closing = true;

_emnapi_set_timeout(_emnapi_tsfn_do_finalize, func, 0);
NEXT_TICK(_emnapi_tsfn_do_finalize, func);

napi_close_handle_scope(func->env, scope);
}
Expand Down Expand Up @@ -549,7 +664,7 @@ static void _emnapi_tsfn_send(napi_threadsafe_function func) {
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return;
}
_emnapi_tsfn_send_js(_emnapi_tsfn_async_cb, func);
_emnapi_async_send(_emnapi_tsfn_async_cb, func);
}

// only main thread
Expand Down
1 change: 0 additions & 1 deletion packages/emnapi/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ mergeInto(LibraryManager.library, {

delete Module._napi_register_wasm_v1
delete Module.__emnapi_runtime_init
delete Module.__emnapi_execute_async_work

callInStack(() => {
// HEAP.*?\[.*?\]
Expand Down
Loading