Skip to content

Commit 347ea10

Browse files
addaleaxjasnell
authored andcommitted
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. Originally landed in the QUIC repo Original review metadata: ``` PR-URL: nodejs/quic#165 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Daniel Bevenius <[email protected]> ``` Signed-off-by: James M Snell <[email protected]>
1 parent a0c3c4d commit 347ea10

File tree

3 files changed

+261
-59
lines changed

3 files changed

+261
-59
lines changed

lib/dgram.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
231231
this.on('listening', onListening);
232232
}
233233

234-
if (port instanceof UDP) {
234+
if (port !== null &&
235+
typeof port === 'object' &&
236+
typeof port.recvStart === 'function') {
235237
replaceHandle(this, port);
236238
startListening(this);
237239
return this;

src/udp_wrap.cc

Lines changed: 149 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
99+
100+
UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
101+
CHECK_GT(obj->InternalFieldCount(), UDPWrap::kUDPWrapBaseField);
102+
return static_cast<UDPWrapBase*>(
103+
obj->GetAlignedPointerFromInternalField(UDPWrap::kUDPWrapBaseField));
104+
}
105+
106+
void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
107+
env->SetProtoMethod(t, "recvStart", RecvStart);
108+
env->SetProtoMethod(t, "recvStop", RecvStop);
109+
}
76110

77111
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78112
: HandleWrap(env,
79113
object,
80114
reinterpret_cast<uv_handle_t*>(&handle_),
81115
AsyncWrap::PROVIDER_UDPWRAP) {
116+
object->SetAlignedPointerInInternalField(
117+
kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
118+
82119
int r = uv_udp_init(env->event_loop(), &handle_);
83120
CHECK_EQ(r, 0); // can't fail anyway
121+
122+
set_listener(this);
84123
}
85124

86125

@@ -112,6 +151,7 @@ void UDPWrap::Initialize(Local<Object> target,
112151
Local<FunctionTemplate>(),
113152
attributes);
114153

154+
UDPWrapBase::AddMethods(env, t);
115155
env->SetProtoMethod(t, "open", Open);
116156
env->SetProtoMethod(t, "bind", Bind);
117157
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +160,6 @@ void UDPWrap::Initialize(Local<Object> target,
120160
env->SetProtoMethod(t, "connect6", Connect6);
121161
env->SetProtoMethod(t, "send6", Send6);
122162
env->SetProtoMethod(t, "disconnect", Disconnect);
123-
env->SetProtoMethod(t, "recvStart", RecvStart);
124-
env->SetProtoMethod(t, "recvStop", RecvStop);
125163
env->SetProtoMethod(t, "getpeername",
126164
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
127165
env->SetProtoMethod(t, "getsockname",
@@ -220,6 +258,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
220258
flags);
221259
}
222260

261+
if (err == 0)
262+
wrap->listener()->OnAfterBind();
263+
223264
args.GetReturnValue().Set(err);
224265
}
225266

@@ -464,14 +505,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
464505
CHECK(args[3]->IsBoolean());
465506
}
466507

467-
Local<Object> req_wrap_obj = args[0].As<Object>();
468508
Local<Array> chunks = args[1].As<Array>();
469509
// it is faster to fetch the length of the
470510
// array in js-land
471511
size_t count = args[2].As<Uint32>()->Value();
472-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
473-
474-
size_t msg_size = 0;
475512

476513
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
477514

@@ -482,7 +519,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
482519
size_t length = Buffer::Length(chunk);
483520

484521
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
485-
msg_size += length;
486522
}
487523

488524
int err = 0;
@@ -492,14 +528,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
492528
const unsigned short port = args[3].As<Uint32>()->Value();
493529
node::Utf8Value address(env->isolate(), args[4]);
494530
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
495-
if (err == 0) {
531+
if (err == 0)
496532
addr = reinterpret_cast<sockaddr*>(&addr_storage);
497-
}
498533
}
499534

500-
uv_buf_t* bufs_ptr = *bufs;
501-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
502-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
535+
if (err == 0) {
536+
wrap->current_send_req_wrap_ = args[0].As<Object>();
537+
wrap->current_send_has_callback_ =
538+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
539+
540+
err = wrap->Send(*bufs, count, addr);
541+
542+
wrap->current_send_req_wrap_.Clear();
543+
wrap->current_send_has_callback_ = false;
544+
}
545+
546+
args.GetReturnValue().Set(err);
547+
}
548+
549+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
550+
size_t count,
551+
const sockaddr* addr) {
552+
if (IsHandleClosing()) return UV_EBADF;
553+
554+
size_t msg_size = 0;
555+
for (size_t i = 0; i < count; i++)
556+
msg_size += bufs_ptr[i].len;
557+
558+
int err = 0;
559+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
560+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
503561
if (err == UV_ENOSYS || err == UV_EAGAIN) {
504562
err = 0;
505563
} else if (err >= 0) {
@@ -517,28 +575,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
517575
CHECK_EQ(static_cast<size_t>(err), msg_size);
518576
// + 1 so that the JS side can distinguish 0-length async sends from
519577
// 0-length sync sends.
520-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
521-
return;
578+
return msg_size + 1;
522579
}
523580
}
524581
}
525582

526583
if (err == 0) {
527-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
528-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
529-
req_wrap->msg_size = msg_size;
530-
531-
err = req_wrap->Dispatch(uv_udp_send,
532-
&wrap->handle_,
533-
bufs_ptr,
534-
count,
535-
addr,
536-
OnSend);
584+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
585+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
586+
if (req_wrap == nullptr) return UV_ENOSYS;
587+
588+
err = req_wrap->Dispatch(
589+
uv_udp_send,
590+
&handle_,
591+
bufs_ptr,
592+
count,
593+
addr,
594+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
595+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
596+
self->listener()->OnSendDone(
597+
ReqWrap<uv_udp_send_t>::from_req(req), status);
598+
}});
537599
if (err)
538600
delete req_wrap;
539601
}
540602

541-
args.GetReturnValue().Set(err);
603+
return err;
604+
}
605+
606+
607+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
608+
SendWrap* req_wrap = new SendWrap(env(),
609+
current_send_req_wrap_,
610+
current_send_has_callback_);
611+
req_wrap->msg_size = msg_size;
612+
return req_wrap;
542613
}
543614

544615

@@ -552,31 +623,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
552623
}
553624

554625

555-
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
556-
UDPWrap* wrap;
557-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
558-
args.Holder(),
559-
args.GetReturnValue().Set(UV_EBADF));
560-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
626+
AsyncWrap* UDPWrap::GetAsyncWrap() {
627+
return this;
628+
}
629+
630+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
631+
return uv_udp_getpeername(&handle_, name, namelen);
632+
}
633+
634+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
635+
return uv_udp_getsockname(&handle_, name, namelen);
636+
}
637+
638+
void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
639+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
640+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
641+
}
642+
643+
int UDPWrap::RecvStart() {
644+
if (IsHandleClosing()) return UV_EBADF;
645+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
561646
// UV_EALREADY means that the socket is already bound but that's okay
562647
if (err == UV_EALREADY)
563648
err = 0;
564-
args.GetReturnValue().Set(err);
649+
return err;
565650
}
566651

567652

568-
void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
569-
UDPWrap* wrap;
570-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
571-
args.Holder(),
572-
args.GetReturnValue().Set(UV_EBADF));
573-
int r = uv_udp_recv_stop(&wrap->handle_);
574-
args.GetReturnValue().Set(r);
653+
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
654+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
655+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
656+
}
657+
658+
int UDPWrap::RecvStop() {
659+
if (IsHandleClosing()) return UV_EBADF;
660+
return uv_udp_recv_stop(&handle_);
575661
}
576662

577663

578-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
579-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
664+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
665+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
580666
if (req_wrap->have_callback()) {
581667
Environment* env = req_wrap->env();
582668
HandleScope handle_scope(env->isolate());
@@ -593,43 +679,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
593679
void UDPWrap::OnAlloc(uv_handle_t* handle,
594680
size_t suggested_size,
595681
uv_buf_t* buf) {
596-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
597-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
682+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
683+
reinterpret_cast<uv_udp_t*>(handle));
684+
*buf = wrap->listener()->OnAlloc(suggested_size);
685+
}
686+
687+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
688+
return env()->AllocateManaged(suggested_size).release();
598689
}
599690

600691
void UDPWrap::OnRecv(uv_udp_t* handle,
601692
ssize_t nread,
602-
const uv_buf_t* buf_,
603-
const struct sockaddr* addr,
693+
const uv_buf_t* buf,
694+
const sockaddr* addr,
604695
unsigned int flags) {
605-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
606-
Environment* env = wrap->env();
696+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
697+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
698+
}
607699

608-
AllocatedBuffer buf(env, *buf_);
700+
void UDPWrap::OnRecv(ssize_t nread,
701+
const uv_buf_t& buf_,
702+
const sockaddr* addr,
703+
unsigned int flags) {
704+
Environment* env = this->env();
705+
AllocatedBuffer buf(env, buf_);
609706
if (nread == 0 && addr == nullptr) {
610707
return;
611708
}
612709

613710
HandleScope handle_scope(env->isolate());
614711
Context::Scope context_scope(env->context());
615712

616-
Local<Object> wrap_obj = wrap->object();
617713
Local<Value> argv[] = {
618714
Integer::New(env->isolate(), nread),
619-
wrap_obj,
715+
object(),
620716
Undefined(env->isolate()),
621717
Undefined(env->isolate())
622718
};
623719

624720
if (nread < 0) {
625-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
721+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
626722
return;
627723
}
628724

629725
buf.Resize(nread);
630726
argv[2] = buf.ToBuffer().ToLocalChecked();
631727
argv[3] = AddressToJS(env, addr);
632-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
728+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
633729
}
634730

635731
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

0 commit comments

Comments
 (0)