Skip to content

Commit c336d40

Browse files
authored
Merge pull request #379 from cloudflare/bcaimano/web-socket-pump
Reset WebSocket outgoing message status in a single continuation
2 parents c417d75 + 2b857f8 commit c336d40

File tree

2 files changed

+44
-51
lines changed

2 files changed

+44
-51
lines changed

src/workerd/api/web-socket.c++

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -437,22 +437,22 @@ void WebSocket::send(jsg::Lock& js, kj::OneOf<kj::Array<byte>, kj::String> messa
437437
JSG_REQUIRE(native.state.is<Accepted>(), TypeError,
438438
"You must call accept() on this WebSocket before sending messages.");
439439

440-
KJ_SWITCH_ONEOF(message) {
441-
KJ_CASE_ONEOF(text, kj::String) {
442-
outgoingMessages->insert(GatedMessage{
443-
IoContext::current().waitForOutputLocksIfNecessary(),
444-
kj::mv(text),
445-
});
446-
break;
447-
}
448-
KJ_CASE_ONEOF(data, kj::Array<byte>) {
449-
outgoingMessages->insert(GatedMessage{
450-
IoContext::current().waitForOutputLocksIfNecessary(),
451-
kj::mv(data),
452-
});
453-
break;
440+
auto maybeOutputLock = IoContext::current().waitForOutputLocksIfNecessary();
441+
auto msg = [&]() -> kj::WebSocket::Message {
442+
KJ_SWITCH_ONEOF(message) {
443+
KJ_CASE_ONEOF(text, kj::String) {
444+
return kj::mv(text);
445+
break;
446+
}
447+
KJ_CASE_ONEOF(data, kj::Array<byte>) {
448+
return kj::mv(data);
449+
break;
450+
}
454451
}
455-
}
452+
453+
KJ_UNREACHABLE;
454+
}();
455+
outgoingMessages->insert(GatedMessage{kj::mv(maybeOutputLock), kj::mv(msg)});
456456

457457
ensurePumping(js);
458458
}
@@ -564,20 +564,19 @@ void WebSocket::dispatchOpen(jsg::Lock& js) {
564564
void WebSocket::ensurePumping(jsg::Lock& js) {
565565
auto& native = *farNative;
566566
if (!native.isPumping) {
567-
native.isPumping = true;
568567
auto& context = IoContext::current();
569568
auto& accepted = KJ_ASSERT_NONNULL(native.state.tryGet<Accepted>());
570569
auto promise = kj::evalNow([&]() {
571-
return accepted.canceler.wrap(pump(context, *outgoingMessages, *accepted.ws));
570+
return accepted.canceler.wrap(pump(context, *outgoingMessages, *accepted.ws, native));
572571
});
572+
573573
// TODO(cleanup): We use awaitIoLegacy() here because we don't want this to count as a pending
574574
// event if this is a WebSocketPair with the other end being handled in the same isolate.
575575
// In that case, the pump can hang if accept() is never called on the other end. Ideally,
576576
// this scenario would be handled in-isolate using jsg::Promise, but that would take some
577577
// refactoring.
578578
context.awaitIoLegacy(kj::mv(promise)).then(js, [this, thisHandle = JSG_THIS](jsg::Lock& js) {
579579
auto& native = *farNative;
580-
native.isPumping = false;
581580
if (native.outgoingAborted) {
582581
// Apparently, the peer stopped accepting messages (probably, disconnected entirely), but
583582
// this didn't cause our writes to fail, maybe due to timing. Let's set the error now.
@@ -588,26 +587,11 @@ void WebSocket::ensurePumping(jsg::Lock& js) {
588587
native.state.init<Released>();
589588
}
590589
}, [this](jsg::Lock& js, jsg::Value&& exception) mutable {
591-
farNative->isPumping = false;
592-
outgoingMessages->clear();
593590
reportError(js, kj::mv(exception));
594591
});
595592
}
596593
}
597594

598-
kj::Promise<void> WebSocket::pump(
599-
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws) {
600-
if (outgoingMessages.size() == 0) {
601-
return kj::READY_NOW;
602-
} else KJ_IF_MAYBE(promise, outgoingMessages.ordered().begin()->outputLock) {
603-
return promise->then([&context, &outgoingMessages, &ws]() mutable {
604-
return pumpAfterFrontOutputLock(context, outgoingMessages, ws);
605-
});
606-
} else {
607-
return pumpAfterFrontOutputLock(context, outgoingMessages, ws);
608-
}
609-
}
610-
611595
namespace {
612596
size_t countBytesFromMessage(const kj::WebSocket::Message& message) {
613597
// This does not count the extra data of the RPC frame or the savings from any compression.
@@ -633,37 +617,48 @@ size_t countBytesFromMessage(const kj::WebSocket::Message& message) {
633617
}
634618
}
635619

636-
kj::Promise<void> WebSocket::pumpAfterFrontOutputLock(
637-
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws) {
638-
GatedMessage gatedMessage =
639-
outgoingMessages.release(*outgoingMessages.ordered().begin());
640-
auto size = countBytesFromMessage(gatedMessage.message);
620+
kj::Promise<void> WebSocket::pump(
621+
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws, Native& native) {
622+
KJ_ASSERT(!native.isPumping);
623+
native.isPumping = true;
624+
KJ_DEFER({
625+
// We use a KJ_DEFER to set native.isPumping = false to ensure that it happens -- we had a bug
626+
// in the past where this was handled by the caller of WebSocket::pump() and it allowed for
627+
// messages to get stuck in `outgoingMessages` until the pump task was restarted.
628+
native.isPumping = false;
629+
630+
// Either we were already through all our outgoing messages or we experienced failure/
631+
// cancellation and cannot send these anyway.
632+
outgoingMessages.clear();
633+
});
634+
635+
while (outgoingMessages.size() > 0) {
636+
GatedMessage gatedMessage = outgoingMessages.release(*outgoingMessages.ordered().begin());
637+
KJ_IF_MAYBE(promise, gatedMessage.outputLock) {
638+
co_await *promise;
639+
}
640+
641+
auto size = countBytesFromMessage(gatedMessage.message);
641642

642-
kj::Promise<void> promise = nullptr;
643-
{
644643
KJ_SWITCH_ONEOF(gatedMessage.message) {
645644
KJ_CASE_ONEOF(text, kj::String) {
646-
promise = ws.send(text);
645+
co_await ws.send(text);
647646
break;
648647
}
649648
KJ_CASE_ONEOF(data, kj::Array<byte>) {
650-
promise = ws.send(data);
649+
co_await ws.send(data);
651650
break;
652651
}
653652
KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
654-
promise = ws.close(close.code, close.reason);
653+
co_await ws.close(close.code, close.reason);
655654
break;
656655
}
657656
}
658-
}
659657

660-
return promise.attach(kj::mv(gatedMessage.message))
661-
.then([&context, &outgoingMessages, &ws, size]() {
662658
KJ_IF_MAYBE(a, context.getActor()) {
663659
a->getMetrics().sentWebSocketMessage(size);
664660
}
665-
return pump(context, outgoingMessages, ws);
666-
});
661+
}
667662
}
668663

669664
kj::Promise<void> WebSocket::readLoop(kj::WebSocket& ws) {

src/workerd/api/web-socket.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,9 +346,7 @@ class WebSocket: public EventTarget {
346346
void ensurePumping(jsg::Lock& js);
347347

348348
static kj::Promise<void> pump(
349-
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws);
350-
static kj::Promise<void> pumpAfterFrontOutputLock(
351-
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws);
349+
IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws, Native& native);
352350
// Write messages from `outgoingMessages` into `ws`.
353351
//
354352
// These are not necessarily called under isolate lock, but they are called on the given

0 commit comments

Comments
 (0)