fix(iroh): Re-batch received relay datagram batches in case they exceed max_receive_segments #3414


Merged · 6 commits · Jul 29, 2025
43 changes: 43 additions & 0 deletions iroh-relay/src/protos/relay.rs
@@ -157,6 +157,49 @@ impl<T: AsRef<[u8]>> From<T> for Datagrams {
}

impl Datagrams {
/// Splits off at most `num_segments` segments from the front of this datagram batch,
/// returning them as a new batch and leaving the rest in `self`.
///
/// Calling this on a datagram batch that only contains a single datagram (`segment_size == None`)
/// essentially returns `Some(self.clone())` and leaves `self` empty afterwards.
///
/// Calling this on a datagram batch of e.g. 15 datagrams with `num_segments == 10` returns
/// `Some(datagram_batch)` where `datagram_batch` contains the first 10 datagrams and `self`
/// keeps the remaining 5 datagrams.
///
/// Calling this on a datagram batch that contains fewer than `num_segments` datagrams leaves
/// `self` empty and returns essentially a clone of the original batch.
///
/// Calling this on an empty datagram batch (i.e. one where `contents.is_empty()`) will return `None`.
pub fn take_segments(&mut self, num_segments: usize) -> Option<Datagrams> {
Contributor:
Goodness I love Bytes.

if self.contents.is_empty() {
return None;
}

let Some(segment_size) = self.segment_size else {
let contents = std::mem::take(&mut self.contents);
return Some(Datagrams {
ecn: self.ecn,
segment_size: None,
contents,
});
};

let usize_segment_size = usize::from(u16::from(segment_size));
let max_content_len = num_segments * usize_segment_size;
let contents = self
.contents
.split_to(std::cmp::min(max_content_len, self.contents.len()));

let is_datagram_batch = num_segments > 1 && usize_segment_size < contents.len();

Some(Datagrams {
ecn: self.ecn,
segment_size: is_datagram_batch.then_some(segment_size),
contents,
})
}

fn write_to<O: BufMut>(&self, mut dst: O) -> O {
let ecn = self.ecn.map_or(0, |ecn| ecn as u8);
dst.put_u8(ecn);
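For reference, a minimal sketch of how `take_segments` behaves, mirroring the doc comment above and the `run_datagram_re_batcher` tests added later in this PR (the 76-byte payload and the segment size of 10 are illustrative values, not anything mandated by the API):

```rust
use std::num::NonZeroU16;

use bytes::Bytes;
use iroh_relay::protos::relay::Datagrams;

fn main() {
    // A GSO-style batch: 76 bytes of payload in 10-byte segments
    // (7 full segments plus a 6-byte tail).
    let mut batch = Datagrams {
        ecn: None,
        segment_size: NonZeroU16::new(10),
        contents: Bytes::from(vec![0u8; 76]),
    };

    // Drain the batch in chunks of at most 3 segments (30 bytes) each.
    // Expected chunk sizes: 30, 30, 16, then `None` once the batch is empty.
    while let Some(chunk) = batch.take_segments(3) {
        println!("chunk of {} bytes", chunk.contents.len());
    }
}
```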
4 changes: 3 additions & 1 deletion iroh/examples/transfer.rs
@@ -153,7 +153,9 @@ enum Commands {

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let cli = Cli::parse();
match cli.command {
Commands::Provide {
8 changes: 5 additions & 3 deletions iroh/src/magicsock.rs
@@ -23,7 +23,7 @@ use std::{
pin::Pin,
sync::{
Arc, Mutex, RwLock,
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
},
task::{Context, Poll},
};
@@ -1263,6 +1263,7 @@ impl Handle {

let my_relay = Watchable::new(None);
let ipv6_reported = Arc::new(AtomicBool::new(false));
let max_receive_segments = Arc::new(AtomicUsize::new(1));

let relay_transport = RelayTransport::new(RelayActorConfig {
my_relay: my_relay.clone(),
@@ -1271,6 +1272,7 @@
dns_resolver: dns_resolver.clone(),
proxy_url: proxy_url.clone(),
ipv6_reported: ipv6_reported.clone(),
max_receive_segments: max_receive_segments.clone(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify,
metrics: metrics.magicsock.clone(),
@@ -1282,9 +1284,9 @@
let ipv6 = ip_transports.iter().any(|t| t.bind_addr().is_ipv6());

#[cfg(not(wasm_browser))]
let transports = Transports::new(ip_transports, relay_transports);
let transports = Transports::new(ip_transports, relay_transports, max_receive_segments);
#[cfg(wasm_browser)]
let transports = Transports::new(relay_transports);
let transports = Transports::new(relay_transports, max_receive_segments);

let (disco, disco_receiver) = DiscoState::new(secret_encryption_key);

8 changes: 7 additions & 1 deletion iroh/src/magicsock/transports.rs
@@ -32,6 +32,7 @@ pub(crate) struct Transports {
ip: Vec<IpTransport>,
relay: Vec<RelayTransport>,

max_receive_segments: Arc<AtomicUsize>,
poll_recv_counter: AtomicUsize,
}

@@ -61,11 +62,13 @@ impl Transports {
pub(crate) fn new(
#[cfg(not(wasm_browser))] ip: Vec<IpTransport>,
relay: Vec<RelayTransport>,
max_receive_segments: Arc<AtomicUsize>,
) -> Self {
Self {
#[cfg(not(wasm_browser))]
ip,
relay,
max_receive_segments,
poll_recv_counter: Default::default(),
}
}
@@ -196,6 +199,7 @@

#[cfg(not(wasm_browser))]
pub(crate) fn max_receive_segments(&self) -> usize {
use std::sync::atomic::Ordering::Relaxed;
// `max_receive_segments` controls the size of the `RecvMeta` buffer
// that quinn creates. Having buffers slightly bigger than necessary
// isn't terrible, and makes sure a single socket can read the maximum
@@ -204,7 +208,9 @@
// and it's impossible and unnecessary to be refactored that way.

let res = self.ip.iter().map(|t| t.max_receive_segments()).max();
res.unwrap_or(1)
let segments = res.unwrap_or(1);
self.max_receive_segments.store(segments, Relaxed);
segments
}

#[cfg(wasm_browser)]
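The shared counter introduced here is the piece of plumbing discussed in the review below: the quinn-facing `max_receive_segments()` publishes its result, and the relay actor later reads it. A simplified sketch of that handoff (the struct and function names here are illustrative, not the actual iroh types):

```rust
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};

/// Simplified stand-in for the quinn-facing transports struct.
struct QuinnFacingSide {
    max_receive_segments: Arc<AtomicUsize>,
}

impl QuinnFacingSide {
    /// Mirrors `Transports::max_receive_segments`: take the max over the IP
    /// transports and publish it for the relay side to read.
    fn max_receive_segments(&self, per_socket: &[usize]) -> usize {
        let segments = per_socket.iter().copied().max().unwrap_or(1);
        self.max_receive_segments.store(segments, Ordering::Relaxed);
        segments
    }
}

fn main() {
    let shared = Arc::new(AtomicUsize::new(1));
    let quinn_side = QuinnFacingSide {
        max_receive_segments: shared.clone(),
    };

    // quinn asks how many segments a single recv may carry...
    assert_eq!(quinn_side.max_receive_segments(&[64, 1]), 64);
    // ...and the relay actor later reads the last published value before
    // splitting received batches into chunks of at most that many segments.
    assert_eq!(shared.load(Ordering::Relaxed), 64);
}
```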
19 changes: 19 additions & 0 deletions iroh/src/magicsock/transports/relay.rs
@@ -100,6 +100,25 @@ impl RelayTransport {
}
};

if buf_out.len() < dm.datagrams.contents.len() {
// Our receive buffer isn't big enough to process this datagram.
// Continuing would cause a panic.
warn!(
quinn_buf_len = buf_out.len(),
datagram_len = dm.datagrams.contents.len(),
segment_size = ?dm.datagrams.segment_size,
"dropping received datagram: quinn buffer too small"
);
break;
Contributor:
One, in my opinion large, downside of your approach is that you have to plumb the signal of how large this can be all the way through to the ActiveRelayActor. I find all that plumbing really unfortunate and complex.

Have you considered having an Option<Datagrams> on the RelayTransport itself? I think that way you could store the split-off ones here locally. When you get called next, you first consume that and then continue reading off the channel. Would that not be a lot simpler?

Member Author:
Didn't consider that.
But thinking about how I'd do this, I'm somewhat afraid of messing up the waking logic in that case.
I'd prefer not to have to mess with wakers :S

Member Author:
FWIW I think this is very similar to the ipv6_reported: Arc<AtomicBool> we already use. I tried to move the max_receive_segments declarations close to that if possible.

Contributor:
Do you really need to mess with wakers? When you receive something that doesn't fit, you return Poll::Ready and no waker is expected to be installed. The next time it gets called, it again returns Poll::Ready because there's something in the Option. When the Option is empty, you poll the channel and things behave as before.

Member Author:
Yeah, I think you're right. I'll look into following up this PR with a little bit of a cleanup.
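A rough sketch of the alternative the reviewer describes above, keeping the leftover split-off datagrams on the transport itself instead of plumbing `max_receive_segments` into the actor (the struct and method here are hypothetical, not the actual `RelayTransport` implementation):

```rust
use iroh_relay::protos::relay::Datagrams;

/// Hypothetical sketch, not the real `RelayTransport`: segments left over from
/// a batch that didn't fit are stashed on the transport and drained first on
/// the next poll, before reading from the channel again.
struct RelayTransportSketch {
    pending: Option<Datagrams>,
    // ...plus the actual channel receiver, omitted here.
}

impl RelayTransportSketch {
    fn next_chunk(&mut self, max_segments: usize) -> Option<Datagrams> {
        if let Some(pending) = self.pending.as_mut() {
            let chunk = pending.take_segments(max_segments);
            if pending.contents.is_empty() {
                self.pending = None;
            }
            // Handing out a stored chunk is a `Poll::Ready` path, so no waker
            // needs to be registered for this case.
            return chunk;
        }
        // Otherwise poll the channel as before (not shown); any received batch
        // larger than `max_segments` gets split here and the remainder stashed
        // in `self.pending` for the next call.
        None
    }
}
```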

// In theory we could put some logic in here to fragment the datagram in case
// we still have enough room in our `buf_out` left to fit a couple of
// `dm.datagrams.segment_size`es, but we *should* have cut those datagrams
// to appropriate sizes earlier in the pipeline (just before we put them
// into the `relay_datagram_recv_queue` in the `ActiveRelayActor`).
// So the only case in which this happens is we receive a datagram via the relay
// that's essentially bigger than our configured `max_udp_payload_size`.
// In that case we drop it and let MTU discovery take over.
}
buf_out[..dm.datagrams.contents.len()].copy_from_slice(&dm.datagrams.contents);
meta_out.len = dm.datagrams.contents.len();
meta_out.stride = dm
102 changes: 93 additions & 9 deletions iroh/src/magicsock/transports/relay/actor.rs
@@ -33,7 +33,7 @@ use std::{
pin::{Pin, pin},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
},
};

Expand Down Expand Up @@ -149,6 +149,8 @@ struct ActiveRelayActor {
/// last datagram sent to the relay, received datagrams will trigger QUIC ACKs which is
/// sufficient to keep active connections open.
inactive_timeout: Pin<Box<time::Sleep>>,
/// The last known value for the magic socket's `AsyncUdpSocket::max_receive_segments`.
max_receive_segments: Arc<AtomicUsize>,
/// Token indicating the [`ActiveRelayActor`] should stop.
stop_token: CancellationToken,
metrics: Arc<MagicsockMetrics>,
@@ -193,6 +195,7 @@ struct ActiveRelayActorOptions {
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
relay_datagrams_recv: mpsc::Sender<RelayRecvDatagram>,
connection_opts: RelayConnectionOptions,
max_receive_segments: Arc<AtomicUsize>,
stop_token: CancellationToken,
metrics: Arc<MagicsockMetrics>,
}
@@ -276,6 +279,7 @@ impl ActiveRelayActor {
relay_datagrams_send,
relay_datagrams_recv,
connection_opts,
max_receive_segments,
stop_token,
metrics,
} = opts;
@@ -289,6 +293,7 @@
relay_client_builder,
is_home_relay: false,
inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)),
max_receive_segments,
stop_token,
metrics,
}
@@ -624,7 +629,7 @@ impl ActiveRelayActor {
metrics.send_relay.inc_by(item.datagrams.contents.len() as _);
Ok(ClientToRelayMsg::Datagrams {
dst_node_id: item.remote_node,
datagrams: item.datagrams
datagrams: item.datagrams,
})
});
let mut packet_stream = n0_future::stream::iter(packet_iter);
@@ -678,12 +683,28 @@ impl ActiveRelayActor {
state.last_packet_src = Some(remote_node_id);
state.nodes_present.insert(remote_node_id);
}
if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
url: self.url.clone(),
src: remote_node_id,

let max_segments = self
.max_receive_segments
.load(std::sync::atomic::Ordering::Relaxed);
// We might receive a datagram batch that's bigger than our magic socket's
// `AsyncUdpSocket::max_receive_segments`, if the other endpoint behind the relay
// has a higher `AsyncUdpSocket::max_transmit_segments` than we do.
// This happens e.g. when a Linux machine (max transmit segments is usually 64)
// talks to a Windows or macOS machine (max transmit segments is usually 1).
let re_batched = DatagramReBatcher {
max_segments,
datagrams,
}) {
warn!("Dropping received relay packet: {err:#}");
};
for datagrams in re_batched {
if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
url: self.url.clone(),
src: remote_node_id,
datagrams,
}) {
warn!("Dropping received relay packet: {err:#}");
break; // No need to hot-loop in that case.
}
}
}
RelayToClientMsg::NodeGone(node_id) => {
Expand Down Expand Up @@ -859,6 +880,8 @@ pub struct Config {
pub proxy_url: Option<Url>,
/// If the last net_report report reports IPv6 to be available.
pub ipv6_reported: Arc<AtomicBool>,
/// The last known return value of the magic socket's `AsyncUdpSocket::max_receive_segments`.
pub max_receive_segments: Arc<AtomicUsize>,
#[cfg(any(test, feature = "test-utils"))]
pub insecure_skip_relay_cert_verify: bool,
pub metrics: Arc<MagicsockMetrics>,
@@ -1114,6 +1137,7 @@ impl RelayActor {
relay_datagrams_send: send_datagram_rx,
relay_datagrams_recv: self.relay_datagram_recv_queue.clone(),
connection_opts,
max_receive_segments: self.config.max_receive_segments.clone(),
stop_token: self.cancel_token.child_token(),
metrics: self.config.metrics.clone(),
};
@@ -1224,13 +1248,35 @@ pub(crate) struct RelayRecvDatagram {
pub(crate) src: NodeId,
pub(crate) datagrams: Datagrams,
}

/// Turns a datagram batch into multiple datagram batches of at most `max_segments` segments each.
///
/// If the given datagram isn't batched, it just returns that datagram once.
struct DatagramReBatcher {
max_segments: usize,
datagrams: Datagrams,
}

impl Iterator for DatagramReBatcher {
type Item = Datagrams;

fn next(&mut self) -> Option<Self::Item> {
self.datagrams.take_segments(self.max_segments)
}
}

#[cfg(test)]
mod tests {
use std::{
sync::{Arc, atomic::AtomicBool},
num::NonZeroU16,
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize},
},
time::Duration,
};

use bytes::Bytes;
use iroh_base::{NodeId, RelayUrl, SecretKey};
use iroh_relay::{PingTracker, protos::relay::Datagrams};
use n0_snafu::{Error, Result, ResultExt};
@@ -1244,7 +1290,9 @@ mod tests {
RELAY_INACTIVE_CLEANUP_TIME, RelayConnectionOptions, RelayRecvDatagram, RelaySendItem,
UNDELIVERABLE_DATAGRAM_TIMEOUT,
};
use crate::{dns::DnsResolver, test_utils};
use crate::{
dns::DnsResolver, magicsock::transports::relay::actor::DatagramReBatcher, test_utils,
};

/// Starts a new [`ActiveRelayActor`].
#[allow(clippy::too_many_arguments)]
@@ -1271,6 +1319,7 @@
prefer_ipv6: Arc::new(AtomicBool::new(true)),
insecure_skip_cert_verify: true,
},
max_receive_segments: Arc::new(AtomicUsize::new(1)),
stop_token,
metrics: Default::default(),
};
@@ -1569,4 +1618,39 @@ mod tests {
let res = tokio::time::timeout(Duration::from_secs(10), tracker.timeout()).await;
assert!(res.is_err(), "ping timeout should only happen once");
}

fn run_datagram_re_batcher(max_segments: usize, expected_lengths: Vec<usize>) {
let contents = Bytes::from_static(
b"Hello world! There's lots of stuff to talk about when you need a big buffer.",
);
let datagrams = Datagrams {
contents: contents.clone(),
ecn: None,
segment_size: NonZeroU16::new(10),
};

let re_batched_lengths = DatagramReBatcher {
datagrams,
max_segments,
}
.map(|d| d.contents.len())
.collect::<Vec<_>>();

assert_eq!(expected_lengths, re_batched_lengths);
}

#[test]
fn test_datagram_re_batcher_small_batches() {
run_datagram_re_batcher(3, vec![30, 30, 16]);
}

#[test]
fn test_datagram_re_batcher_batch_full() {
run_datagram_re_batcher(10, vec![76]);
}

#[test]
fn test_datagram_re_batcher_unbatch() {
run_datagram_re_batcher(1, vec![10, 10, 10, 10, 10, 10, 10, 6]);
}
}