Skip to content

Commit 7db1d47

Browse files
authored
Merge f00014b into 870c76e
2 parents 870c76e + f00014b commit 7db1d47

File tree

1 file changed

+22
-21
lines changed

1 file changed

+22
-21
lines changed

iroh/src/magicsock.rs

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::{
2525
atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
2626
Arc, RwLock,
2727
},
28-
task::{Context, Poll},
28+
task::{Context, Poll, Waker},
2929
time::{Duration, Instant},
3030
};
3131

@@ -1548,7 +1548,7 @@ impl Handle {
15481548

15491549
let (actor_sender, actor_receiver) = mpsc::channel(256);
15501550
let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
1551-
let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_sender();
1551+
let (relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel();
15521552
let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);
15531553

15541554
// load the node data
@@ -1743,27 +1743,17 @@ enum DiscoBoxError {
17431743
///
17441744
/// These includes the waker coordination required to support [`AsyncUdpSocket::try_send`]
17451745
/// and [`quinn::UdpPoller::poll_writable`].
1746-
///
1747-
/// Note that this implementation has several bugs in them, but they have existed for rather
1748-
/// a while:
1749-
///
1750-
/// - There can be multiple senders, which all have to be woken if they were blocked. But
1751-
/// only the last sender to install the waker is unblocked.
1752-
///
1753-
/// - poll_writable may return blocking when it doesn't need to. Leaving the sender stuck
1754-
/// until another recv is called (which hopefully would happen soon given that the channel
1755-
/// is probably still rather full, but still).
1756-
fn relay_datagram_sender() -> (
1746+
fn relay_datagram_send_channel() -> (
17571747
RelayDatagramSendChannelSender,
17581748
RelayDatagramSendChannelReceiver,
17591749
) {
17601750
let (sender, receiver) = mpsc::channel(256);
1761-
let waker = Arc::new(AtomicWaker::new());
1751+
let wakers = Arc::new(std::sync::Mutex::new(Vec::new()));
17621752
let tx = RelayDatagramSendChannelSender {
17631753
sender,
1764-
waker: waker.clone(),
1754+
wakers: wakers.clone(),
17651755
};
1766-
let rx = RelayDatagramSendChannelReceiver { receiver, waker };
1756+
let rx = RelayDatagramSendChannelReceiver { receiver, wakers };
17671757
(tx, rx)
17681758
}
17691759

@@ -1774,7 +1764,7 @@ fn relay_datagram_sender() -> (
17741764
#[derive(Debug, Clone)]
17751765
struct RelayDatagramSendChannelSender {
17761766
sender: mpsc::Sender<RelaySendItem>,
1777-
waker: Arc<AtomicWaker>,
1767+
wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
17781768
}
17791769

17801770
impl RelayDatagramSendChannelSender {
@@ -1788,8 +1778,18 @@ impl RelayDatagramSendChannelSender {
17881778
fn poll_writable(&self, cx: &mut Context) -> Poll<io::Result<()>> {
17891779
match self.sender.capacity() {
17901780
0 => {
1791-
self.waker.register(cx.waker());
1792-
Poll::Pending
1781+
let mut wakers = self.wakers.lock().expect("poisoned");
1782+
if !wakers.iter().any(|waker| waker.will_wake(cx.waker())) {
1783+
wakers.push(cx.waker().clone());
1784+
}
1785+
drop(wakers);
1786+
if self.sender.capacity() != 0 {
1787+
// We "risk" a spurious wake-up in this case, but rather that
1788+
// than potentially skipping a receive.
1789+
Poll::Ready(Ok(()))
1790+
} else {
1791+
Poll::Pending
1792+
}
17931793
}
17941794
_ => Poll::Ready(Ok(())),
17951795
}
@@ -1803,13 +1803,14 @@ impl RelayDatagramSendChannelSender {
18031803
#[derive(Debug)]
18041804
struct RelayDatagramSendChannelReceiver {
18051805
receiver: mpsc::Receiver<RelaySendItem>,
1806-
waker: Arc<AtomicWaker>,
1806+
wakers: Arc<std::sync::Mutex<Vec<Waker>>>,
18071807
}
18081808

18091809
impl RelayDatagramSendChannelReceiver {
18101810
async fn recv(&mut self) -> Option<RelaySendItem> {
18111811
let item = self.receiver.recv().await;
1812-
self.waker.wake();
1812+
let mut wakers = self.wakers.lock().expect("poisoned");
1813+
wakers.drain(..).for_each(Waker::wake);
18131814
item
18141815
}
18151816
}

0 commit comments

Comments
 (0)