Skip to content

Commit dda6ca9

Browse files
mcollinaaduh95
authored andcommitted
worker: add worker.getHeapStatistics()
Adds worker.getHeapStatistics() so that the heap usage of the worker could be observer from the parent thread. Signed-off-by: Matteo Collina <[email protected]> PR-URL: #57888 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Chengzhong Wu <[email protected]> Reviewed-By: Darshan Sen <[email protected]> Reviewed-By: Stephen Belanger <[email protected]>
1 parent f606352 commit dda6ca9

File tree

9 files changed

+219
-0
lines changed

9 files changed

+219
-0
lines changed

doc/api/worker_threads.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,18 @@ If the Worker thread is no longer running, which may occur before the
13201320
[`'exit'` event][] is emitted, the returned `Promise` is rejected
13211321
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.
13221322

1323+
### `worker.getHeapStatistics()`
1324+
1325+
<!-- YAML
1326+
added: REPLACEME
1327+
-->
1328+
1329+
* Returns: {Promise}
1330+
1331+
This method returns a `Promise` that will resolve to an object identical to [`v8.getHeapStatistics()`][],
1332+
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
1333+
This methods allows the statistics to be observed from outside the actual thread.
1334+
13231335
### `worker.performance`
13241336

13251337
<!-- YAML
@@ -1614,6 +1626,7 @@ thread spawned will spawn another until the application crashes.
16141626
[`require('node:worker_threads').workerData`]: #workerworkerdata
16151627
[`trace_events`]: tracing.md
16161628
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
1629+
[`v8.getHeapStatistics()`]: v8.md#v8getheapstatistics
16171630
[`vm`]: vm.md
16181631
[`worker.SHARE_ENV`]: #workershare_env
16191632
[`worker.on('message')`]: #event-message_1

lib/internal/worker.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,17 @@ class Worker extends EventEmitter {
459459
};
460460
});
461461
}
462+
463+
getHeapStatistics() {
464+
const taker = this[kHandle]?.getHeapStatistics();
465+
466+
return new Promise((resolve, reject) => {
467+
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
468+
taker.ondone = (handle) => {
469+
resolve(handle);
470+
};
471+
});
472+
}
462473
}
463474

464475
/**

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ namespace node {
7979
V(SIGINTWATCHDOG) \
8080
V(WORKER) \
8181
V(WORKERHEAPSNAPSHOT) \
82+
V(WORKERHEAPSTATISTICS) \
8283
V(WRITEWRAP) \
8384
V(ZLIB)
8485

src/env_properties.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@
443443
V(tty_constructor_template, v8::FunctionTemplate) \
444444
V(write_wrap_template, v8::ObjectTemplate) \
445445
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
446+
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
446447
V(x509_constructor_template, v8::FunctionTemplate)
447448

448449
#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \

src/node_worker.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,116 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
811811
}
812812
}
813813

814+
class WorkerHeapStatisticsTaker : public AsyncWrap {
815+
public:
816+
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
817+
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSTATISTICS) {}
818+
819+
SET_NO_MEMORY_INFO()
820+
SET_MEMORY_INFO_NAME(WorkerHeapStatisticsTaker)
821+
SET_SELF_SIZE(WorkerHeapStatisticsTaker)
822+
};
823+
824+
void Worker::GetHeapStatistics(const FunctionCallbackInfo<Value>& args) {
825+
Worker* w;
826+
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
827+
828+
Environment* env = w->env();
829+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
830+
Local<Object> wrap;
831+
if (!env->worker_heap_statistics_taker_template()
832+
->NewInstance(env->context())
833+
.ToLocal(&wrap)) {
834+
return;
835+
}
836+
837+
// The created WorkerHeapStatisticsTaker is an object owned by main
838+
// thread's Isolate, it can not be accessed by worker thread
839+
std::unique_ptr<BaseObjectPtr<WorkerHeapStatisticsTaker>> taker =
840+
std::make_unique<BaseObjectPtr<WorkerHeapStatisticsTaker>>(
841+
MakeDetachedBaseObject<WorkerHeapStatisticsTaker>(env, wrap));
842+
843+
// Interrupt the worker thread and take a snapshot, then schedule a call
844+
// on the parent thread that turns that snapshot into a readable stream.
845+
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
846+
env](Environment* worker_env) mutable {
847+
// We create a unique pointer to HeapStatistics so that the actual object
848+
// it's not copied in the lambda, but only the pointer is.
849+
auto heap_stats = std::make_unique<v8::HeapStatistics>();
850+
worker_env->isolate()->GetHeapStatistics(heap_stats.get());
851+
852+
// Here, the worker thread temporarily owns the WorkerHeapStatisticsTaker
853+
// object.
854+
855+
env->SetImmediateThreadsafe(
856+
[taker = std::move(taker),
857+
heap_stats = std::move(heap_stats)](Environment* env) mutable {
858+
Isolate* isolate = env->isolate();
859+
HandleScope handle_scope(isolate);
860+
Context::Scope context_scope(env->context());
861+
862+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
863+
864+
Local<v8::Name> heap_stats_names[] = {
865+
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size"),
866+
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size_executable"),
867+
FIXED_ONE_BYTE_STRING(isolate, "total_physical_size"),
868+
FIXED_ONE_BYTE_STRING(isolate, "total_available_size"),
869+
FIXED_ONE_BYTE_STRING(isolate, "used_heap_size"),
870+
FIXED_ONE_BYTE_STRING(isolate, "heap_size_limit"),
871+
FIXED_ONE_BYTE_STRING(isolate, "malloced_memory"),
872+
FIXED_ONE_BYTE_STRING(isolate, "peak_malloced_memory"),
873+
FIXED_ONE_BYTE_STRING(isolate, "does_zap_garbage"),
874+
FIXED_ONE_BYTE_STRING(isolate, "number_of_native_contexts"),
875+
FIXED_ONE_BYTE_STRING(isolate, "number_of_detached_contexts"),
876+
FIXED_ONE_BYTE_STRING(isolate, "total_global_handles_size"),
877+
FIXED_ONE_BYTE_STRING(isolate, "used_global_handles_size"),
878+
FIXED_ONE_BYTE_STRING(isolate, "external_memory")};
879+
880+
// Define an array of property values
881+
Local<Value> heap_stats_values[] = {
882+
Number::New(isolate, heap_stats->total_heap_size()),
883+
Number::New(isolate, heap_stats->total_heap_size_executable()),
884+
Number::New(isolate, heap_stats->total_physical_size()),
885+
Number::New(isolate, heap_stats->total_available_size()),
886+
Number::New(isolate, heap_stats->used_heap_size()),
887+
Number::New(isolate, heap_stats->heap_size_limit()),
888+
Number::New(isolate, heap_stats->malloced_memory()),
889+
Number::New(isolate, heap_stats->peak_malloced_memory()),
890+
Boolean::New(isolate, heap_stats->does_zap_garbage()),
891+
Number::New(isolate, heap_stats->number_of_native_contexts()),
892+
Number::New(isolate, heap_stats->number_of_detached_contexts()),
893+
Number::New(isolate, heap_stats->total_global_handles_size()),
894+
Number::New(isolate, heap_stats->used_global_handles_size()),
895+
Number::New(isolate, heap_stats->external_memory())};
896+
897+
DCHECK_EQ(arraysize(heap_stats_names), arraysize(heap_stats_values));
898+
899+
// Create the object with the property names and values
900+
Local<Object> stats = Object::New(isolate,
901+
Null(isolate),
902+
heap_stats_names,
903+
heap_stats_values,
904+
arraysize(heap_stats_names));
905+
906+
Local<Value> args[] = {stats};
907+
taker->get()->MakeCallback(
908+
env->ondone_string(), arraysize(args), args);
909+
// implicitly delete `taker`
910+
},
911+
CallbackFlags::kUnrefed);
912+
913+
// Now, the lambda is delivered to the main thread, as a result, the
914+
// WorkerHeapStatisticsTaker object is delivered to the main thread, too.
915+
});
916+
917+
if (scheduled) {
918+
args.GetReturnValue().Set(wrap);
919+
} else {
920+
args.GetReturnValue().Set(Local<Object>());
921+
}
922+
}
923+
814924
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
815925
Worker* w;
816926
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
@@ -991,6 +1101,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
9911101
SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
9921102
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
9931103
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
1104+
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
9941105

9951106
SetConstructorFunction(isolate, target, "Worker", w);
9961107
}
@@ -1009,6 +1120,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
10091120
wst->InstanceTemplate());
10101121
}
10111122

1123+
{
1124+
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
1125+
1126+
wst->InstanceTemplate()->SetInternalFieldCount(
1127+
WorkerHeapSnapshotTaker::kInternalFieldCount);
1128+
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
1129+
1130+
Local<String> wst_string =
1131+
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapStatisticsTaker");
1132+
wst->SetClassName(wst_string);
1133+
isolate_data->set_worker_heap_statistics_taker_template(
1134+
wst->InstanceTemplate());
1135+
}
1136+
10121137
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
10131138
}
10141139

@@ -1074,6 +1199,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
10741199
registry->Register(Worker::TakeHeapSnapshot);
10751200
registry->Register(Worker::LoopIdleTime);
10761201
registry->Register(Worker::LoopStartTime);
1202+
registry->Register(Worker::GetHeapStatistics);
10771203
}
10781204

10791205
} // anonymous namespace

src/node_worker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class Worker : public AsyncWrap {
7878
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
7979
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
8080
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
81+
static void GetHeapStatistics(
82+
const v8::FunctionCallbackInfo<v8::Value>& args);
8183

8284
private:
8385
bool CreateEnvMessagePort(Environment* env);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fixtures = require('../common/fixtures');
5+
6+
common.skipIfInspectorDisabled();
7+
8+
const {
9+
Worker,
10+
isMainThread,
11+
} = require('worker_threads');
12+
13+
if (!isMainThread) {
14+
common.skip('This test only works on a main thread');
15+
}
16+
17+
// Ensures that worker.getHeapStatistics() returns valid data
18+
19+
const assert = require('assert');
20+
21+
if (isMainThread) {
22+
const name = 'Hello Thread';
23+
const worker = new Worker(fixtures.path('worker-name.js'), {
24+
name,
25+
});
26+
worker.once('message', common.mustCall(async (message) => {
27+
const stats = await worker.getHeapStatistics();
28+
const keys = [
29+
`total_heap_size`,
30+
`total_heap_size_executable`,
31+
`total_physical_size`,
32+
`total_available_size`,
33+
`used_heap_size`,
34+
`heap_size_limit`,
35+
`malloced_memory`,
36+
`peak_malloced_memory`,
37+
`does_zap_garbage`,
38+
`number_of_native_contexts`,
39+
`number_of_detached_contexts`,
40+
`total_global_handles_size`,
41+
`used_global_handles_size`,
42+
`external_memory`,
43+
].sort();
44+
assert.deepStrictEqual(keys, Object.keys(stats).sort());
45+
for (const key of keys) {
46+
if (key === 'does_zap_garbage') {
47+
assert.strictEqual(typeof stats[key], 'boolean', `Expected ${key} to be a boolean`);
48+
continue;
49+
}
50+
assert.strictEqual(typeof stats[key], 'number', `Expected ${key} to be a number`);
51+
assert.ok(stats[key] >= 0, `Expected ${key} to be >= 0`);
52+
}
53+
54+
worker.postMessage('done');
55+
}));
56+
57+
worker.once('exit', common.mustCall(async (code) => {
58+
assert.strictEqual(code, 0);
59+
await assert.rejects(worker.getHeapStatistics(), {
60+
code: 'ERR_WORKER_NOT_RUNNING'
61+
});
62+
}));
63+
}

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const { getSystemErrorName } = require('util');
6161
delete providers.ELDHISTOGRAM;
6262
delete providers.SIGINTWATCHDOG;
6363
delete providers.WORKERHEAPSNAPSHOT;
64+
delete providers.WORKERHEAPSTATISTICS;
6465
delete providers.BLOBREADER;
6566
delete providers.RANDOMPRIMEREQUEST;
6667
delete providers.CHECKPRIMEREQUEST;

typings/internalBinding/worker.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ declare namespace InternalWorkerBinding {
1515
unref(): void;
1616
getResourceLimits(): Float64Array;
1717
takeHeapSnapshot(): object;
18+
getHeapStatistics(): Promise<object>;
1819
loopIdleTime(): number;
1920
loopStartTime(): number;
2021
}

0 commit comments

Comments
 (0)