@@ -564,20 +564,19 @@ void WebSocket::dispatchOpen(jsg::Lock& js) {
564
564
void WebSocket::ensurePumping (jsg::Lock& js) {
565
565
auto & native = *farNative;
566
566
if (!native.isPumping ) {
567
- native.isPumping = true ;
568
567
auto & context = IoContext::current ();
569
568
auto & accepted = KJ_ASSERT_NONNULL (native.state .tryGet <Accepted>());
570
569
auto promise = kj::evalNow ([&]() {
571
- return accepted.canceler .wrap (pump (context, *outgoingMessages, *accepted.ws ));
570
+ return accepted.canceler .wrap (pump (context, *outgoingMessages, *accepted.ws , native ));
572
571
});
572
+
573
573
// TODO(cleanup): We use awaitIoLegacy() here because we don't want this to count as a pending
574
574
// event if this is a WebSocketPair with the other end being handled in the same isolate.
575
575
// In that case, the pump can hang if accept() is never called on the other end. Ideally,
576
576
// this scenario would be handled in-isolate using jsg::Promise, but that would take some
577
577
// refactoring.
578
578
context.awaitIoLegacy (kj::mv (promise)).then (js, [this , thisHandle = JSG_THIS](jsg::Lock& js) {
579
579
auto & native = *farNative;
580
- native.isPumping = false ;
581
580
if (native.outgoingAborted ) {
582
581
// Apparently, the peer stopped accepting messages (probably, disconnected entirely), but
583
582
// this didn't cause our writes to fail, maybe due to timing. Let's set the error now.
@@ -588,26 +587,11 @@ void WebSocket::ensurePumping(jsg::Lock& js) {
588
587
native.state .init <Released>();
589
588
}
590
589
}, [this ](jsg::Lock& js, jsg::Value&& exception) mutable {
591
- farNative->isPumping = false ;
592
- outgoingMessages->clear ();
593
590
reportError (js, kj::mv (exception));
594
591
});
595
592
}
596
593
}
597
594
598
- kj::Promise<void > WebSocket::pump (
599
- IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws) {
600
- if (outgoingMessages.size () == 0 ) {
601
- return kj::READY_NOW;
602
- } else KJ_IF_MAYBE (promise, outgoingMessages.ordered ().begin ()->outputLock ) {
603
- return promise->then ([&context, &outgoingMessages, &ws]() mutable {
604
- return pumpAfterFrontOutputLock (context, outgoingMessages, ws);
605
- });
606
- } else {
607
- return pumpAfterFrontOutputLock (context, outgoingMessages, ws);
608
- }
609
- }
610
-
611
595
namespace {
612
596
size_t countBytesFromMessage (const kj::WebSocket::Message& message) {
613
597
// This does not count the extra data of the RPC frame or the savings from any compression.
@@ -633,37 +617,48 @@ size_t countBytesFromMessage(const kj::WebSocket::Message& message) {
633
617
}
634
618
}
635
619
636
- kj::Promise<void > WebSocket::pumpAfterFrontOutputLock (
637
- IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws) {
638
- GatedMessage gatedMessage =
639
- outgoingMessages.release (*outgoingMessages.ordered ().begin ());
640
- auto size = countBytesFromMessage (gatedMessage.message );
620
+ kj::Promise<void > WebSocket::pump (
621
+ IoContext& context, OutgoingMessagesMap& outgoingMessages, kj::WebSocket& ws, Native& native) {
622
+ KJ_ASSERT (!native.isPumping );
623
+ native.isPumping = true ;
624
+ KJ_DEFER ({
625
+ // We use a KJ_DEFER to set native.isPumping = false to ensure that it happens -- we had a bug
626
+ // in the past where this was handled by the caller of WebSocket::pump() and it allowed for
627
+ // messages to get stuck in `outgoingMessages` until the pump task was restarted.
628
+ native.isPumping = false ;
629
+
630
+ // Either we were already through all our outgoing messages or we experienced failure/
631
+ // cancellation and cannot send these anyway.
632
+ outgoingMessages.clear ();
633
+ });
634
+
635
+ while (outgoingMessages.size () > 0 ) {
636
+ GatedMessage gatedMessage = outgoingMessages.release (*outgoingMessages.ordered ().begin ());
637
+ KJ_IF_MAYBE (promise, gatedMessage.outputLock ) {
638
+ co_await *promise;
639
+ }
640
+
641
+ auto size = countBytesFromMessage (gatedMessage.message );
641
642
642
- kj::Promise<void > promise = nullptr ;
643
- {
644
643
KJ_SWITCH_ONEOF (gatedMessage.message ) {
645
644
KJ_CASE_ONEOF (text, kj::String) {
646
- promise = ws.send (text);
645
+ co_await ws.send (text);
647
646
break ;
648
647
}
649
648
KJ_CASE_ONEOF (data, kj::Array<byte>) {
650
- promise = ws.send (data);
649
+ co_await ws.send (data);
651
650
break ;
652
651
}
653
652
KJ_CASE_ONEOF (close, kj::WebSocket::Close) {
654
- promise = ws.close (close.code , close.reason );
653
+ co_await ws.close (close.code , close.reason );
655
654
break ;
656
655
}
657
656
}
658
- }
659
657
660
- return promise.attach (kj::mv (gatedMessage.message ))
661
- .then ([&context, &outgoingMessages, &ws, size]() {
662
658
KJ_IF_MAYBE (a, context.getActor ()) {
663
659
a->getMetrics ().sentWebSocketMessage (size);
664
660
}
665
- return pump (context, outgoingMessages, ws);
666
- });
661
+ }
667
662
}
668
663
669
664
kj::Promise<void > WebSocket::readLoop (kj::WebSocket& ws) {
0 commit comments