Skip to content

Commit 918dbc4

Browse files
authored
Merge ba9c167 into ba07bcc
2 parents ba07bcc + ba9c167 commit 918dbc4

File tree

3 files changed

+300
-58
lines changed

3 files changed

+300
-58
lines changed

iroh/src/disco.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424
};
2525

2626
use anyhow::{anyhow, bail, ensure, Context, Result};
27+
use bytes::Bytes;
2728
use data_encoding::HEXLOWER;
2829
use iroh_base::{PublicKey, RelayUrl};
2930
use serde::{Deserialize, Serialize};
@@ -102,6 +103,19 @@ pub fn source_and_box(p: &[u8]) -> Option<(PublicKey, &[u8])> {
102103
Some((sender, sealed_box))
103104
}
104105

106+
/// If `p` looks like a disco message it returns the slice of `p` that represents the disco public key source,
107+
/// and the part that is the box.
108+
pub fn source_and_box_bytes(p: &Bytes) -> Option<(PublicKey, Bytes)> {
109+
if !looks_like_disco_wrapper(p) {
110+
return None;
111+
}
112+
113+
let source = &p[MAGIC_LEN..MAGIC_LEN + KEY_LEN];
114+
let sender = PublicKey::try_from(source).ok()?;
115+
let sealed_box = p.slice(MAGIC_LEN + KEY_LEN..);
116+
Some((sender, sealed_box))
117+
}
118+
105119
/// A discovery message.
106120
#[derive(Debug, Clone, PartialEq, Eq)]
107121
pub enum Message {

iroh/src/magicsock.rs

Lines changed: 81 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,11 +1094,6 @@ impl MagicSock {
10941094
return None;
10951095
}
10961096

1097-
if self.handle_relay_disco_message(&dm.buf, &dm.url, dm.src) {
1098-
// DISCO messages are handled internally in the MagicSock, do not pass to Quinn.
1099-
return None;
1100-
}
1101-
11021097
let quic_mapped_addr = self.node_map.receive_relay(&dm.url, dm.src);
11031098

11041099
// Normalize local_ip
@@ -1119,32 +1114,6 @@ impl MagicSock {
11191114
Some((dm.src, meta, dm.buf))
11201115
}
11211116

1122-
fn handle_relay_disco_message(
1123-
&self,
1124-
msg: &[u8],
1125-
url: &RelayUrl,
1126-
relay_node_src: PublicKey,
1127-
) -> bool {
1128-
match disco::source_and_box(msg) {
1129-
Some((source, sealed_box)) => {
1130-
if relay_node_src != source {
1131-
// TODO: return here?
1132-
warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short());
1133-
}
1134-
self.handle_disco_message(
1135-
source,
1136-
sealed_box,
1137-
DiscoMessageSource::Relay {
1138-
url: url.clone(),
1139-
key: relay_node_src,
1140-
},
1141-
);
1142-
true
1143-
}
1144-
None => false,
1145-
}
1146-
}
1147-
11481117
/// Handles a discovery message.
11491118
#[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))]
11501119
fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) {
@@ -1827,7 +1796,13 @@ impl Handle {
18271796

18281797
let mut actor_tasks = JoinSet::default();
18291798

1830-
let relay_actor = RelayActor::new(msock.clone(), relay_datagram_recv_queue, relay_protocol);
1799+
let (relay_disco_recv_tx, mut relay_disco_recv_rx) = tokio::sync::mpsc::channel(1024);
1800+
let relay_actor = RelayActor::new(
1801+
msock.clone(),
1802+
relay_datagram_recv_queue,
1803+
relay_disco_recv_tx,
1804+
relay_protocol,
1805+
);
18311806
let relay_actor_cancel_token = relay_actor.cancel_token();
18321807
actor_tasks.spawn(
18331808
async move {
@@ -1837,6 +1812,23 @@ impl Handle {
18371812
}
18381813
.instrument(info_span!("relay-actor")),
18391814
);
1815+
actor_tasks.spawn({
1816+
let msock = msock.clone();
1817+
async move {
1818+
while let Some(message) = relay_disco_recv_rx.recv().await {
1819+
msock.handle_disco_message(
1820+
message.source,
1821+
&message.sealed_box,
1822+
DiscoMessageSource::Relay {
1823+
url: message.relay_url,
1824+
key: message.relay_remote_node_id,
1825+
},
1826+
);
1827+
}
1828+
debug!("relay-disco-recv actor closed");
1829+
}
1830+
.instrument(info_span!("relay-disco-recv"))
1831+
});
18401832

18411833
#[cfg(not(wasm_browser))]
18421834
let _ = actor_tasks.spawn({
@@ -2123,15 +2115,17 @@ impl RelayDatagramSendChannelReceiver {
21232115
#[derive(Debug)]
21242116
struct RelayDatagramRecvQueue {
21252117
queue: ConcurrentQueue<RelayRecvDatagram>,
2126-
waker: AtomicWaker,
2118+
recv_waker: AtomicWaker,
2119+
send_wakers: ConcurrentQueue<Waker>,
21272120
}
21282121

21292122
impl RelayDatagramRecvQueue {
21302123
/// Creates a new, empty queue with a fixed size bound of 512 items.
21312124
fn new() -> Self {
21322125
Self {
21332126
queue: ConcurrentQueue::bounded(512),
2134-
waker: AtomicWaker::new(),
2127+
recv_waker: AtomicWaker::new(),
2128+
send_wakers: ConcurrentQueue::unbounded(),
21352129
}
21362130
}
21372131

@@ -2144,10 +2138,49 @@ impl RelayDatagramRecvQueue {
21442138
item: RelayRecvDatagram,
21452139
) -> Result<(), concurrent_queue::PushError<RelayRecvDatagram>> {
21462140
self.queue.push(item).inspect(|_| {
2147-
self.waker.wake();
2141+
self.recv_waker.wake();
21482142
})
21492143
}
21502144

2145+
/// Polls for whether the queue has free slots for sending items.
2146+
///
2147+
/// If the queue has free slots, this returns [`Poll::Ready`].
2148+
/// If the queue is full, [`Poll::Pending`] is returned and the waker
2149+
/// is stored and woken once the queue has free slots.
2150+
///
2151+
/// This can be called from multiple tasks concurrently. If a slot becomes
2152+
/// available, all stored wakers will be woken simultaneously.
2153+
/// This also means that even if [`Poll::Ready`] is returned, it is not
2154+
/// guaranteed that [`Self::try_send`] will return `Ok` on the next call,
2155+
/// because another send task could have used the slot already.
2156+
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
2157+
if self.queue.is_closed() {
2158+
Poll::Ready(Err(anyhow!("Queue closed")))
2159+
} else if !self.queue.is_full() {
2160+
Poll::Ready(Ok(()))
2161+
} else {
2162+
match self.send_wakers.push(cx.waker().clone()) {
2163+
Ok(()) => Poll::Pending,
2164+
Err(concurrent_queue::PushError::Full(_)) => {
2165+
unreachable!("Send waker queue is unbounded")
2166+
}
2167+
Err(concurrent_queue::PushError::Closed(_)) => {
2168+
Poll::Ready(Err(anyhow!("Queue closed")))
2169+
}
2170+
}
2171+
}
2172+
}
2173+
2174+
async fn send_ready(&self) -> Result<()> {
2175+
std::future::poll_fn(|cx| self.poll_send_ready(cx)).await
2176+
}
2177+
2178+
fn wake_senders(&self) {
2179+
while let Ok(waker) = self.send_wakers.pop() {
2180+
waker.wake();
2181+
}
2182+
}
2183+
21512184
/// Polls for new items in the queue.
21522185
///
21532186
/// Although this method is available from `&self`, it must not be
@@ -2162,23 +2195,31 @@ impl RelayDatagramRecvQueue {
21622195
/// to be able to poll from `&self`.
21632196
fn poll_recv(&self, cx: &mut Context) -> Poll<Result<RelayRecvDatagram>> {
21642197
match self.queue.pop() {
2165-
Ok(value) => Poll::Ready(Ok(value)),
2198+
Ok(value) => {
2199+
self.wake_senders();
2200+
Poll::Ready(Ok(value))
2201+
}
21662202
Err(concurrent_queue::PopError::Empty) => {
2167-
self.waker.register(cx.waker());
2203+
self.recv_waker.register(cx.waker());
21682204

21692205
match self.queue.pop() {
21702206
Ok(value) => {
2171-
self.waker.take();
2207+
self.recv_waker.take();
2208+
self.wake_senders();
21722209
Poll::Ready(Ok(value))
21732210
}
21742211
Err(concurrent_queue::PopError::Empty) => Poll::Pending,
21752212
Err(concurrent_queue::PopError::Closed) => {
2176-
self.waker.take();
2213+
self.recv_waker.take();
2214+
self.wake_senders();
21772215
Poll::Ready(Err(anyhow!("Queue closed")))
21782216
}
21792217
}
21802218
}
2181-
Err(concurrent_queue::PopError::Closed) => Poll::Ready(Err(anyhow!("Queue closed"))),
2219+
Err(concurrent_queue::PopError::Closed) => {
2220+
self.wake_senders();
2221+
Poll::Ready(Err(anyhow!("Queue closed")))
2222+
}
21822223
}
21832224
}
21842225
}

0 commit comments

Comments
 (0)