Skip to content

Commit c15d489

Browse files
authored
Merge 5d5398e into f3e4307
2 parents f3e4307 + 5d5398e commit c15d489

File tree

6 files changed

+170
-14
lines changed

6 files changed

+170
-14
lines changed

iroh-relay/src/protos/relay.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,49 @@ impl<T: AsRef<[u8]>> From<T> for Datagrams {
157157
}
158158

159159
impl Datagrams {
160+
/// Splits the current datagram into at maximum `num_segments` segments, potentially returning
161+
/// the batch with at most `num_segments` and leaving only the rest in `self`.
162+
///
163+
/// Calling this on a datagram batch that only contains a single datagram (`segment_size == None`)
164+
/// will result in returning essentially `Some(self.clone())`, while making `self` empty afterwards.
165+
///
166+
/// Calling this on a datagram batch with e.g. 15 datagrams with `num_segments == 10` will
167+
/// result in returning `Some(datagram_batch)` where that `datagram_batch` contains the first
168+
/// 10 datagrams and `self` contains the remaining 5 datagrams.
169+
///
170+
/// Calling this on a datagram batch that doesn't contain `num_segments` datagrams, but less
171+
/// will result in making `self` empty and returning essentially a clone of `self`.
172+
///
173+
/// Calling this on an empty datagram batch (i.e. one where `contents.is_empty()`) will return `None`.
174+
pub fn take_segments(&mut self, num_segments: usize) -> Option<Datagrams> {
175+
if self.contents.is_empty() {
176+
return None;
177+
}
178+
179+
let Some(segment_size) = self.segment_size else {
180+
let contents = std::mem::take(&mut self.contents);
181+
return Some(Datagrams {
182+
ecn: self.ecn,
183+
segment_size: None,
184+
contents,
185+
});
186+
};
187+
188+
let usize_segment_size = usize::from(u16::from(segment_size));
189+
let max_content_len = num_segments * usize_segment_size;
190+
let contents = self
191+
.contents
192+
.split_to(std::cmp::min(max_content_len, self.contents.len()));
193+
194+
let is_datagram_batch = num_segments > 1 && usize_segment_size < contents.len();
195+
196+
Some(Datagrams {
197+
ecn: self.ecn,
198+
segment_size: is_datagram_batch.then_some(segment_size),
199+
contents,
200+
})
201+
}
202+
160203
fn write_to<O: BufMut>(&self, mut dst: O) -> O {
161204
let ecn = self.ecn.map_or(0, |ecn| ecn as u8);
162205
dst.put_u8(ecn);

iroh/examples/transfer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,9 @@ enum Commands {
153153

154154
#[tokio::main]
155155
async fn main() -> Result<()> {
156-
tracing_subscriber::fmt::init();
156+
tracing_subscriber::fmt()
157+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
158+
.init();
157159
let cli = Cli::parse();
158160
match cli.command {
159161
Commands::Provide {

iroh/src/magicsock.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{
2323
pin::Pin,
2424
sync::{
2525
Arc, Mutex, RwLock,
26-
atomic::{AtomicBool, AtomicU64, Ordering},
26+
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
2727
},
2828
task::{Context, Poll},
2929
};
@@ -1263,6 +1263,7 @@ impl Handle {
12631263

12641264
let my_relay = Watchable::new(None);
12651265
let ipv6_reported = Arc::new(AtomicBool::new(false));
1266+
let max_receive_segments = Arc::new(AtomicUsize::new(1));
12661267

12671268
let relay_transport = RelayTransport::new(RelayActorConfig {
12681269
my_relay: my_relay.clone(),
@@ -1271,6 +1272,7 @@ impl Handle {
12711272
dns_resolver: dns_resolver.clone(),
12721273
proxy_url: proxy_url.clone(),
12731274
ipv6_reported: ipv6_reported.clone(),
1275+
max_receive_segments: max_receive_segments.clone(),
12741276
#[cfg(any(test, feature = "test-utils"))]
12751277
insecure_skip_relay_cert_verify,
12761278
metrics: metrics.magicsock.clone(),
@@ -1282,9 +1284,9 @@ impl Handle {
12821284
let ipv6 = ip_transports.iter().any(|t| t.bind_addr().is_ipv6());
12831285

12841286
#[cfg(not(wasm_browser))]
1285-
let transports = Transports::new(ip_transports, relay_transports);
1287+
let transports = Transports::new(ip_transports, relay_transports, max_receive_segments);
12861288
#[cfg(wasm_browser)]
1287-
let transports = Transports::new(relay_transports);
1289+
let transports = Transports::new(relay_transports, max_receive_segments);
12881290

12891291
let (disco, disco_receiver) = DiscoState::new(secret_encryption_key);
12901292

iroh/src/magicsock/transports.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub(crate) struct Transports {
3232
ip: Vec<IpTransport>,
3333
relay: Vec<RelayTransport>,
3434

35+
max_receive_segments: Arc<AtomicUsize>,
3536
poll_recv_counter: AtomicUsize,
3637
}
3738

@@ -61,11 +62,13 @@ impl Transports {
6162
pub(crate) fn new(
6263
#[cfg(not(wasm_browser))] ip: Vec<IpTransport>,
6364
relay: Vec<RelayTransport>,
65+
max_receive_segments: Arc<AtomicUsize>,
6466
) -> Self {
6567
Self {
6668
#[cfg(not(wasm_browser))]
6769
ip,
6870
relay,
71+
max_receive_segments,
6972
poll_recv_counter: Default::default(),
7073
}
7174
}
@@ -196,6 +199,7 @@ impl Transports {
196199

197200
#[cfg(not(wasm_browser))]
198201
pub(crate) fn max_receive_segments(&self) -> usize {
202+
use std::sync::atomic::Ordering::Relaxed;
199203
// `max_receive_segments` controls the size of the `RecvMeta` buffer
200204
// that quinn creates. Having buffers slightly bigger than necessary
201205
// isn't terrible, and makes sure a single socket can read the maximum
@@ -204,7 +208,9 @@ impl Transports {
204208
// and it's impossible and unnecessary to be refactored that way.
205209

206210
let res = self.ip.iter().map(|t| t.max_receive_segments()).max();
207-
res.unwrap_or(1)
211+
let segments = res.unwrap_or(1);
212+
self.max_receive_segments.store(segments, Relaxed);
213+
segments
208214
}
209215

210216
#[cfg(wasm_browser)]

iroh/src/magicsock/transports/relay.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ impl RelayTransport {
100100
}
101101
};
102102

103+
if buf_out.len() < dm.datagrams.contents.len() {
104+
// Our receive buffer isn't big enough to process this datagram.
105+
// Continuing would cause a panic.
106+
warn!(
107+
quinn_buf_len = buf_out.len(),
108+
datagram_len = dm.datagrams.contents.len(),
109+
segment_size = ?dm.datagrams.segment_size,
110+
"dropping received datagram: quinn buffer too small"
111+
);
112+
break;
113+
// In theory we could put some logic in here to fragment the datagram in case
114+
// we still have enough room in our `buf_out` left to fit a couple of
115+
// `dm.datagrams.segment_size`es, but we *should* have cut those datagrams
116+
// to appropriate sizes earlier in the pipeline (just before we put them
117+
// into the `relay_datagram_recv_queue` in the `ActiveRelayActor`).
118+
// So the only case in which this happens is we receive a datagram via the relay
119+
// that's essentially bigger than our configured `max_udp_payload_size`.
120+
// In that case we drop it and let MTU discovery take over.
121+
}
103122
buf_out[..dm.datagrams.contents.len()].copy_from_slice(&dm.datagrams.contents);
104123
meta_out.len = dm.datagrams.contents.len();
105124
meta_out.stride = dm

iroh/src/magicsock/transports/relay/actor.rs

Lines changed: 93 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use std::{
3333
pin::{Pin, pin},
3434
sync::{
3535
Arc,
36-
atomic::{AtomicBool, Ordering},
36+
atomic::{AtomicBool, AtomicUsize, Ordering},
3737
},
3838
};
3939

@@ -149,6 +149,8 @@ struct ActiveRelayActor {
149149
/// last datagram sent to the relay, received datagrams will trigger QUIC ACKs which is
150150
/// sufficient to keep active connections open.
151151
inactive_timeout: Pin<Box<time::Sleep>>,
152+
/// The last known value for the magic socket's `AsyncUdpSocket::max_receive_segments`.
153+
max_receive_segments: Arc<AtomicUsize>,
152154
/// Token indicating the [`ActiveRelayActor`] should stop.
153155
stop_token: CancellationToken,
154156
metrics: Arc<MagicsockMetrics>,
@@ -193,6 +195,7 @@ struct ActiveRelayActorOptions {
193195
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
194196
relay_datagrams_recv: mpsc::Sender<RelayRecvDatagram>,
195197
connection_opts: RelayConnectionOptions,
198+
max_receive_segments: Arc<AtomicUsize>,
196199
stop_token: CancellationToken,
197200
metrics: Arc<MagicsockMetrics>,
198201
}
@@ -276,6 +279,7 @@ impl ActiveRelayActor {
276279
relay_datagrams_send,
277280
relay_datagrams_recv,
278281
connection_opts,
282+
max_receive_segments,
279283
stop_token,
280284
metrics,
281285
} = opts;
@@ -289,6 +293,7 @@ impl ActiveRelayActor {
289293
relay_client_builder,
290294
is_home_relay: false,
291295
inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)),
296+
max_receive_segments,
292297
stop_token,
293298
metrics,
294299
}
@@ -624,7 +629,7 @@ impl ActiveRelayActor {
624629
metrics.send_relay.inc_by(item.datagrams.contents.len() as _);
625630
Ok(ClientToRelayMsg::Datagrams {
626631
dst_node_id: item.remote_node,
627-
datagrams: item.datagrams
632+
datagrams: item.datagrams,
628633
})
629634
});
630635
let mut packet_stream = n0_future::stream::iter(packet_iter);
@@ -678,12 +683,28 @@ impl ActiveRelayActor {
678683
state.last_packet_src = Some(remote_node_id);
679684
state.nodes_present.insert(remote_node_id);
680685
}
681-
if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
682-
url: self.url.clone(),
683-
src: remote_node_id,
686+
687+
let max_segments = self
688+
.max_receive_segments
689+
.load(std::sync::atomic::Ordering::Relaxed);
690+
// We might receive a datagram batch that's bigger than our magic socket's
691+
// `AsyncUdpSocket::max_receive_segments`, if the other endpoint behind the relay
692+
// has a higher `AsyncUdpSocket::max_transmit_segments` than we do.
693+
// This happens e.g. when a linux machine (max transmit segments is usually 64)
694+
// talks to a windows machine or a macos machine (max transmit segments is usually 1).
695+
let re_batched = DatagramReBatcher {
696+
max_segments,
684697
datagrams,
685-
}) {
686-
warn!("Dropping received relay packet: {err:#}");
698+
};
699+
for datagrams in re_batched {
700+
if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
701+
url: self.url.clone(),
702+
src: remote_node_id,
703+
datagrams,
704+
}) {
705+
warn!("Dropping received relay packet: {err:#}");
706+
break; // No need to hot-loop in that case.
707+
}
687708
}
688709
}
689710
RelayToClientMsg::NodeGone(node_id) => {
@@ -859,6 +880,8 @@ pub struct Config {
859880
pub proxy_url: Option<Url>,
860881
/// If the last net_report report, reports IPv6 to be available.
861882
pub ipv6_reported: Arc<AtomicBool>,
883+
/// The last known return value of the magic socket's `AsyncUdpSocket::max_receive_segments`.
884+
pub max_receive_segments: Arc<AtomicUsize>,
862885
#[cfg(any(test, feature = "test-utils"))]
863886
pub insecure_skip_relay_cert_verify: bool,
864887
pub metrics: Arc<MagicsockMetrics>,
@@ -1114,6 +1137,7 @@ impl RelayActor {
11141137
relay_datagrams_send: send_datagram_rx,
11151138
relay_datagrams_recv: self.relay_datagram_recv_queue.clone(),
11161139
connection_opts,
1140+
max_receive_segments: self.config.max_receive_segments.clone(),
11171141
stop_token: self.cancel_token.child_token(),
11181142
metrics: self.config.metrics.clone(),
11191143
};
@@ -1224,13 +1248,35 @@ pub(crate) struct RelayRecvDatagram {
12241248
pub(crate) src: NodeId,
12251249
pub(crate) datagrams: Datagrams,
12261250
}
1251+
1252+
/// Turns a datagram batch into multiple datagram batches of maximum `max_segments` size.
///
/// If the given datagram isn't batched, it just returns that datagram once.
struct DatagramReBatcher {
    // Upper bound on the number of segments per yielded batch.
    max_segments: usize,
    // Remaining datagrams; drained from the front as iteration proceeds.
    datagrams: Datagrams,
}
1259+
1260+
impl Iterator for DatagramReBatcher {
    type Item = Datagrams;

    fn next(&mut self) -> Option<Self::Item> {
        // `take_segments` splits off up to `max_segments` segments per call
        // and returns `None` once `datagrams` is empty, ending iteration.
        self.datagrams.take_segments(self.max_segments)
    }
}
1267+
12271268
#[cfg(test)]
12281269
mod tests {
12291270
use std::{
1230-
sync::{Arc, atomic::AtomicBool},
1271+
num::NonZeroU16,
1272+
sync::{
1273+
Arc,
1274+
atomic::{AtomicBool, AtomicUsize},
1275+
},
12311276
time::Duration,
12321277
};
12331278

1279+
use bytes::Bytes;
12341280
use iroh_base::{NodeId, RelayUrl, SecretKey};
12351281
use iroh_relay::{PingTracker, protos::relay::Datagrams};
12361282
use n0_snafu::{Error, Result, ResultExt};
@@ -1244,7 +1290,9 @@ mod tests {
12441290
RELAY_INACTIVE_CLEANUP_TIME, RelayConnectionOptions, RelayRecvDatagram, RelaySendItem,
12451291
UNDELIVERABLE_DATAGRAM_TIMEOUT,
12461292
};
1247-
use crate::{dns::DnsResolver, test_utils};
1293+
use crate::{
1294+
dns::DnsResolver, magicsock::transports::relay::actor::DatagramReBatcher, test_utils,
1295+
};
12481296

12491297
/// Starts a new [`ActiveRelayActor`].
12501298
#[allow(clippy::too_many_arguments)]
@@ -1271,6 +1319,7 @@ mod tests {
12711319
prefer_ipv6: Arc::new(AtomicBool::new(true)),
12721320
insecure_skip_cert_verify: true,
12731321
},
1322+
max_receive_segments: Arc::new(AtomicUsize::new(1)),
12741323
stop_token,
12751324
metrics: Default::default(),
12761325
};
@@ -1569,4 +1618,39 @@ mod tests {
15691618
let res = tokio::time::timeout(Duration::from_secs(10), tracker.timeout()).await;
15701619
assert!(res.is_err(), "ping timeout should only happen once");
15711620
}
1621+
1622+
fn run_datagram_re_batcher(max_segments: usize, expected_lengths: Vec<usize>) {
1623+
let contents = Bytes::from_static(
1624+
b"Hello world! There's lots of stuff to talk about when you need a big buffer.",
1625+
);
1626+
let datagrams = Datagrams {
1627+
contents: contents.clone(),
1628+
ecn: None,
1629+
segment_size: NonZeroU16::new(10),
1630+
};
1631+
1632+
let re_batched_lengths = DatagramReBatcher {
1633+
datagrams,
1634+
max_segments,
1635+
}
1636+
.map(|d| d.contents.len())
1637+
.collect::<Vec<_>>();
1638+
1639+
assert_eq!(expected_lengths, re_batched_lengths);
1640+
}
1641+
1642+
#[test]
fn test_datagram_re_batcher_small_batches() {
    // 8 segments total (7×10 + 6 bytes); max 3 per batch → 30 + 30 + 16.
    run_datagram_re_batcher(3, vec![30, 30, 16]);
}

#[test]
fn test_datagram_re_batcher_batch_full() {
    // All 8 segments fit within the max of 10 → one batch of all 76 bytes.
    run_datagram_re_batcher(10, vec![76]);
}

#[test]
fn test_datagram_re_batcher_unbatch() {
    // Max 1 segment per batch → each segment is yielded individually.
    run_datagram_re_batcher(1, vec![10, 10, 10, 10, 10, 10, 10, 6]);
}
15721656
}

0 commit comments

Comments
 (0)