From fbba1ca5998fb7c0a3e4e7e332ec7ac5878989ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 10 Jan 2025 14:14:19 +0100 Subject: [PATCH 1/2] fix: Try improving `relay_datagram_send_channel()` --- iroh/src/magicsock.rs | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 0bef9bd7b7b..6ffc509c56e 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -25,7 +25,7 @@ use std::{ atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, - task::{Context, Poll}, + task::{Context, Poll, Waker}, time::{Duration, Instant}, }; @@ -1548,7 +1548,7 @@ impl Handle { let (actor_sender, actor_receiver) = mpsc::channel(256); let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256); - let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_sender(); + let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel(); let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); // load the node data @@ -1743,27 +1743,17 @@ enum DiscoBoxError { /// /// These includes the waker coordination required to support [`AsyncUdpSocket::try_send`] /// and [`quinn::UdpPoller::poll_writable`]. -/// -/// Note that this implementation has several bugs in them, but they have existed for rather -/// a while: -/// -/// - There can be multiple senders, which all have to be woken if they were blocked. But -/// only the last sender to install the waker is unblocked. -/// -/// - poll_writable may return blocking when it doesn't need to. Leaving the sender stuck -/// until another recv is called (which hopefully would happen soon given that the channel -/// is probably still rather full, but still). -fn relay_datagram_sender() -> ( +fn relay_datagram_send_channel() -> ( RelayDatagramSendChannelSender, RelayDatagramSendChannelReceiver, ) { let (sender, receiver) = mpsc::channel(256); - let waker = Arc::new(AtomicWaker::new()); + let wakers = Arc::new(std::sync::Mutex::new(Vec::new())); let tx = RelayDatagramSendChannelSender { sender, - waker: waker.clone(), + wakers: wakers.clone(), }; - let rx = RelayDatagramSendChannelReceiver { receiver, waker }; + let rx = RelayDatagramSendChannelReceiver { receiver, wakers }; (tx, rx) } @@ -1774,7 +1764,7 @@ fn relay_datagram_sender() -> ( #[derive(Debug, Clone)] struct RelayDatagramSendChannelSender { sender: mpsc::Sender, - waker: Arc, + wakers: Arc>>, } impl RelayDatagramSendChannelSender { @@ -1788,8 +1778,18 @@ impl RelayDatagramSendChannelSender { fn poll_writable(&self, cx: &mut Context) -> Poll> { match self.sender.capacity() { 0 => { - self.waker.register(cx.waker()); - Poll::Pending + let mut wakers = self.wakers.lock().expect("poisoned"); + if !wakers.iter().any(|waker| waker.will_wake(cx.waker())) { + wakers.push(cx.waker().clone()); + } + drop(wakers); + if self.sender.capacity() != 0 { + // We "risk" a suprious wake-up in this case, but rather that + // than potentially skipping a receive. + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } _ => Poll::Ready(Ok(())), } @@ -1803,13 +1803,14 @@ impl RelayDatagramSendChannelSender { #[derive(Debug)] struct RelayDatagramSendChannelReceiver { receiver: mpsc::Receiver, - waker: Arc, + wakers: Arc>>, } impl RelayDatagramSendChannelReceiver { async fn recv(&mut self) -> Option { let item = self.receiver.recv().await; - self.waker.wake(); + let mut wakers = self.wakers.lock().expect("poisoned"); + wakers.drain(..).for_each(Waker::wake); item } } From f00014bfbc106dfcc7f4e40090513ab11547c57f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 10 Jan 2025 14:39:09 +0100 Subject: [PATCH 2/2] typo --- iroh/src/magicsock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 6ffc509c56e..66c1c0c4fff 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -1784,7 +1784,7 @@ impl RelayDatagramSendChannelSender { } drop(wakers); if self.sender.capacity() != 0 { - // We "risk" a suprious wake-up in this case, but rather that + // We "risk" a spurious wake-up in this case, but rather that // than potentially skipping a receive. Poll::Ready(Ok(())) } else {