Skip to content

Commit cce88e0

Browse files
committed
feat: support lazy initializing ports
Signed-off-by: Daeyeon Jeong <[email protected]>
1 parent 389a098 commit cce88e0

17 files changed

+129
-106
lines changed

deps/message-port/CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,3 @@ add_library(nd-message-port async-uv.cc message-port.cc debug-mem-trace.cc)
99
target_link_libraries(nd-message-port PUBLIC libuv::libuv nd-utils)
1010
target_include_directories(nd-message-port PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
1111
target_compile_definitions(nd-message-port PUBLIC MSG_PORTS_DEFS)
12-
if(BUILD_SHARED_LIBS)
13-
target_compile_options(nd-message-port PRIVATE -fvisibility=hidden)
14-
endif()

deps/message-port/VERSION

Lines changed: 0 additions & 1 deletion
This file was deleted.

deps/message-port/async-uv.cc

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,22 @@ AsyncUV::~AsyncUV() {
4545
});
4646
}
4747

48-
void AsyncUV::Send(uv_loop_t* loop, Task task) {
48+
bool AsyncUV::Send(uv_loop_t* loop, Task task) {
4949
if (loop == nullptr) {
50-
EnqueueTask(task);
51-
return;
50+
return false;
5251
}
53-
(new AsyncUV(loop, task))->Send();
52+
return (new AsyncUV(loop, task))->Send();
5453
}
5554

56-
void AsyncUV::EnqueueTask(Task task) {
55+
size_t AsyncUV::EnqueueTask(Task task) {
5756
std::lock_guard<std::mutex> lock(queue_mutex_);
5857
queue_.push(task);
58+
return queue_.size();
5959
}
6060

6161
bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) {
6262
std::lock_guard<std::mutex> lock(queue_mutex_);
63-
TRACE(MSGPORT, "drain pending queue", queue_.size());
63+
TRACE(MSGPORT, "drain pending tasks", queue_.size());
6464

6565
if (loop == nullptr) {
6666
return false;
@@ -73,6 +73,20 @@ bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) {
7373
return true;
7474
}
7575

76+
void AsyncUV::DeletePendingTasks() {
77+
std::lock_guard<std::mutex> lock(queue_mutex_);
78+
TRACE(MSGPORT, "delete pending tasks", queue_.size());
79+
if (!queue_.empty()) {
80+
std::queue<Task> empty;
81+
std::swap(queue_, empty);
82+
}
83+
}
84+
85+
bool AsyncUV::IsPendingTasksEmpty() {
86+
std::lock_guard<std::mutex> lock(queue_mutex_);
87+
return queue_.empty();
88+
}
89+
7690
void AsyncUV::Init(uv_loop_t* loop, Task task) {
7791
task_ = task;
7892

@@ -88,9 +102,10 @@ void AsyncUV::Init(uv_loop_t* loop, Task task) {
88102
TRACE_ADD(ASYNC_UV, uv_h_);
89103
}
90104

91-
void AsyncUV::Send() {
105+
bool AsyncUV::Send() {
92106
if (!uv_h_) {
93-
return;
107+
return false;
94108
}
95109
uv_async_send(uv_h_);
110+
return true;
96111
}

deps/message-port/async-uv.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,17 @@ class EXPORT_API AsyncUV {
3333

3434
// If loop is nullptr, the task is enqueued to the pending queue.
3535
// When a valid loop is provided later, all pending tasks will be executed.
36-
static void Send(uv_loop_t* loop, Task task);
36+
static bool Send(uv_loop_t* loop, Task task);
3737
static bool DrainPendingTasks(uv_loop_t* loop);
38-
static void EnqueueTask(Task task);
38+
static size_t EnqueueTask(Task task);
39+
static void DeletePendingTasks();
40+
static bool IsPendingTasksEmpty();
3941

4042
AsyncUV(uv_loop_t* loop = nullptr, Task task = nullptr);
4143
~AsyncUV();
4244

4345
void Init(uv_loop_t* loop, Task task);
44-
void Send();
46+
bool Send();
4547

4648
private:
4749
uv_async_t* uv_h_;

deps/message-port/channel.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818

1919
#include <future>
20+
#include <memory>
2021

2122
#ifndef EXPORT_API
2223
#define EXPORT_API __attribute__((visibility("default")))
@@ -36,6 +37,8 @@ struct EXPORT_API Channel {
3637
static Channel New(std::shared_future<uv_loop_t*> loop,
3738
const char* origin = nullptr);
3839
static void DrainPendingMessages(uv_loop_t* loop);
40+
// This deletes messages if any exists in the pending queue.
41+
static void DeletePendingMessages();
3942

4043
void Reset();
4144
};

deps/message-port/message-port.cc

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ Port::Result Port::PostMessage(std::shared_ptr<MessageEvent> event) {
102102
}
103103

104104
// Check if the sink has a receiver.
105-
if (sink->internal_->callback == nullptr) {
105+
if (internal_->loop != nullptr && sink->internal_->callback == nullptr) {
106+
// TODO: Enqueue events if the number of the events in queues is acceptable.
106107
TRACE(MSGPORT, "sink has no callback.");
107108
return Result::NoOnMessage;
108109
}
@@ -134,21 +135,24 @@ Port::Result Port::PostMessage(std::shared_ptr<MessageEvent> event) {
134135
}
135136
}
136137

137-
// Sends a task immediately if the loop handle is valid. If the loop handle is
138-
// nullptr, the task is enqueued to the pending queue and will be executed
139-
// later when a valid loop handle is provided.
140-
AsyncUV::Send(internal_->loop,
141-
[event = event, sink_weak = internal_->sink](uv_async_t*) {
142-
// event: shared_ptr, sink_weak: weak_ptr
143-
auto sink = sink_weak.lock();
144-
if (sink && sink->internal_->callback) {
145-
// Since sink is locked, event->target() is always valid
146-
// inside the callback.
147-
sink->internal_->callback(event.get());
148-
} else {
149-
TRACE(MSGPORT, "sink port released, or no callback");
150-
}
151-
});
138+
auto task = [event = event, sink_weak = internal_->sink](uv_async_t*) {
139+
// event: shared_ptr, sink_weak: weak_ptr
140+
auto sink = sink_weak.lock();
141+
if (sink && sink->internal_->callback) {
142+
// Since sink is locked, event->target() is always valid
143+
// inside the callback.
144+
sink->internal_->callback(event.get());
145+
} else {
146+
TRACE(MSGPORT, "sink port released, or no callback");
147+
}
148+
};
149+
150+
if (internal_->loop == nullptr) {
151+
AsyncUV::EnqueueTask(std::move(task));
152+
return Result::MessageEventQueued;
153+
}
154+
155+
AsyncUV::Send(internal_->loop, std::move(task));
152156
return Result::NoError;
153157
}
154158

@@ -191,6 +195,10 @@ void Channel::DrainPendingMessages(uv_loop_t* loop) {
191195
AsyncUV::DrainPendingTasks(loop);
192196
}
193197

198+
void Channel::DeletePendingMessages() {
199+
AsyncUV::DeletePendingTasks();
200+
}
201+
194202
void Channel::Reset() {
195203
port1.reset();
196204
port2.reset();

deps/message-port/message-port.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class EXPORT_API Port {
5252
using OnMessageCallback = std::function<void(const MessageEvent*)>;
5353
enum Result {
5454
NoError = 0,
55+
MessageEventQueued,
5556
NoSink,
5657
NoOnMessage,
5758
InvalidMessageEvent,

deps/node/src/env.cc

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
#ifdef LWNODE
2828
#include "lwnode.h"
29-
#include <nd-vm-message-channel.h>
29+
#include <nd-vm-main-message-port.h>
3030
#include <uv-loop-holder.h>
3131
#endif
3232

@@ -359,10 +359,12 @@ Environment::Environment(IsolateData* isolate_data,
359359
}
360360

361361
#if defined(LWNODE)
362-
message_channel_ = new MessageChannel();
362+
channel_ = Channel::New(uv_promise_.get_future(), "embedder");
363+
main_message_port_ =
364+
new MainMessagePort(channel_.port2, std::move(uv_promise_));
363365
loop_holder_ = new LoopHolderUV(isolate_data->event_loop());
364-
LWNode::InitMessageChannel(
365-
context, message_channel_, loop_holder_, isolate_data->event_loop());
366+
LWNode::InitMainMessagePort(
367+
context, main_message_port_, loop_holder_, isolate_data->event_loop());
366368
#endif
367369

368370
#if HAVE_INSPECTOR
@@ -495,8 +497,8 @@ Environment::~Environment() {
495497

496498
#if defined(LWNODE)
497499
// @lwnode
498-
delete message_channel_;
499-
message_channel_ = nullptr;
500+
delete main_message_port_;
501+
main_message_port_ = nullptr;
500502

501503
delete loop_holder_;
502504
loop_holder_ = nullptr;
@@ -1103,11 +1105,11 @@ void Environment::RunWeakRefCleanup() {
11031105

11041106
#if defined(LWNODE)
11051107
std::shared_ptr<Port> Environment::GetPort() {
1106-
return message_channel_->port1();
1108+
return channel_.port1;
11071109
}
11081110

1109-
MessageChannel* Environment::message_channel() {
1110-
return message_channel_;
1111+
MainMessagePort* Environment::main_message_port() {
1112+
return main_message_port_;
11111113
}
11121114
#endif
11131115

deps/node/src/env.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@
5151
#include <vector>
5252

5353
// @lwnode
54-
class MessageChannel;
54+
#include <future>
55+
#include <channel.h>
56+
class MainMessagePort;
5557
class Port;
5658
class LoopHolderUV;
59+
struct Channel;
5760
// @lwnode
5861

5962
namespace node {
@@ -1406,11 +1409,13 @@ class Environment : public MemoryRetainer {
14061409
// Port
14071410
public:
14081411
std::shared_ptr<Port> GetPort();
1409-
MessageChannel* message_channel();
1412+
MainMessagePort* main_message_port();
14101413

14111414
private:
1412-
MessageChannel* message_channel_;
1413-
LoopHolderUV* loop_holder_;
1415+
std::promise<uv_loop_t*> uv_promise_;
1416+
Channel channel_;
1417+
MainMessagePort* main_message_port_ = nullptr;
1418+
LoopHolderUV* loop_holder_ = nullptr;
14141419
#endif
14151420
};
14161421

escargotshim.gyp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
'src/lwnode/lwnode-gc-strategy.cc',
7878
'src/lwnode/nd-mod-base.cc',
7979
'src/lwnode/nd-mod-message-port.cc',
80-
'src/lwnode/nd-vm-message-channel.cc',
80+
'src/lwnode/nd-vm-main-message-port.cc',
8181
],
8282
'defines': ['V8_PROMISE_INTERNAL_FIELD_COUNT=1',
8383
'LWNODE_ENABLE_EXPERIMENTAL_SERIALIZATION=1',

include/lwnode/lwnode.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ContextRef;
2828
class ValueRef;
2929
} // namespace Escargot
3030

31-
class MessageChannel;
31+
class MainMessagePort;
3232
class Port;
3333
class LoopHolderUV;
3434

@@ -38,17 +38,17 @@ namespace LWNode {
3838

3939
enum ContextEmbedderIndex {
4040
// Others are listed in deps/node/src/node_context_data.h.
41-
kMessageChannel = 90,
41+
kMainMessagePort = 90,
4242
kLoopHolder = 91,
4343
};
4444

4545
void InitializeProcessMethods(v8::Local<v8::Object> target,
4646
v8::Local<v8::Context> context);
4747

48-
void InitMessageChannel(v8::Local<v8::Context> context,
49-
MessageChannel* channel,
50-
LoopHolderUV* loop_holder,
51-
uv_loop_t* loop);
48+
void InitMainMessagePort(v8::Local<v8::Context> context,
49+
MainMessagePort* port,
50+
LoopHolderUV* loop_holder,
51+
uv_loop_t* loop);
5252

5353
void IdleGC(v8::Isolate* isolate = nullptr);
5454
void initDebugger();

include/lwnode/message-port.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class EXPORT_API Port {
5252
using OnMessageCallback = std::function<void(const MessageEvent*)>;
5353
enum Result {
5454
NoError = 0,
55+
MessageEventQueued,
5556
NoSink,
5657
NoOnMessage,
5758
InvalidMessageEvent,

include/lwnode/nd-vm-message-channel.h renamed to include/lwnode/nd-vm-main-message-port.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <future>
1920
#include <memory>
2021

2122
namespace Escargot {
@@ -26,25 +27,22 @@ class FunctionObjectRef;
2627
class Port;
2728
using uv_loop_t = struct uv_loop_s;
2829

29-
class MessageChannel {
30+
class MainMessagePort {
3031
public:
31-
MessageChannel();
32-
~MessageChannel();
32+
MainMessagePort(std::shared_ptr<Port> port,
33+
std::promise<uv_loop_t*>&& promise);
34+
~MainMessagePort();
3335

3436
void Init(Escargot::ContextRef* context, uv_loop_t* loop);
35-
void Start();
3637

37-
std::shared_ptr<Port> port1() { return port1_; }
38-
std::shared_ptr<Port> port2() { return port2_; }
38+
std::shared_ptr<Port> port() { return port_; }
3939
Escargot::ContextRef* context() { return context_; }
4040

4141
void SetMessageEventClass(Escargot::FunctionObjectRef* klass);
4242
Escargot::FunctionObjectRef* MessageEventClass();
4343

4444
private:
45-
// In runtime perspective, port1 is a sink.
46-
std::shared_ptr<Port> port1_;
47-
std::shared_ptr<Port> port2_;
45+
std::shared_ptr<Port> port_;
4846
Escargot::ContextRef* context_;
4947
uv_loop_t* uv_loop_;
5048

src/lwnode/lwnode.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#include "lwnode/lwnode.h"
1818
#include <EscargotPublic.h>
1919
#include <malloc.h> // for malloc_trim
20-
#include <nd-vm-message-channel.h>
20+
#include <nd-vm-main-message-port.h>
2121
#include <uv-loop-holder.h>
2222
#include <codecvt>
2323
#include <fstream>
@@ -240,15 +240,15 @@ static ValueRef* Unref(ExecutionStateRef* state,
240240
return ValueRef::create(loop_holder->ref_count());
241241
}
242242

243-
void InitMessageChannel(Local<Context> context,
244-
MessageChannel* channel,
243+
void InitMainMessagePort(Local<Context> context,
244+
MainMessagePort* main_port,
245245
LoopHolderUV* loop_holder,
246246
uv_loop_t* loop) {
247247
auto lwContext = CVAL(*context)->context();
248248
auto esContext = lwContext->get();
249-
channel->Init(esContext, loop);
249+
main_port->Init(esContext, loop);
250250

251-
lwContext->SetAlignedPointerInEmbedderData(kMessageChannel, channel);
251+
lwContext->SetAlignedPointerInEmbedderData(kMainMessagePort, main_port);
252252
lwContext->SetAlignedPointerInEmbedderData(kLoopHolder, loop_holder);
253253
}
254254

0 commit comments

Comments
 (0)