Commit dd65c89

Move re-batching logic to the receiving side
1 parent ce1c82f · commit dd65c89

File tree: 1 file changed, +33 / -24 lines

iroh/src/magicsock/transports/relay/actor.rs

Lines changed: 33 additions & 24 deletions

@@ -618,28 +618,18 @@ impl ActiveRelayActor {
                 &mut send_datagrams_buf,
                 SEND_DATAGRAM_BATCH_SIZE,
             ) => {
-                use std::sync::atomic::Ordering::Relaxed;
-
                 if count == 0 {
                     warn!("Datagram inbox closed, shutdown");
                     break Ok(());
                 };
                 self.reset_inactive_timeout();
                 // TODO(frando): can we avoid the clone here?
                 let metrics = self.metrics.clone();
-                let max_segments = self.max_receive_segments.load(Relaxed);
-                let packet_iter = send_datagrams_buf.drain(..).flat_map(|item| {
-                    let metrics = metrics.clone();
-                    let dst_node_id = item.remote_node;
-                    DatagramFragmenter {
-                        max_segments,
-                        datagram: item.datagrams,
-                    }.map(move |datagrams| {
-                        metrics.send_relay.inc_by(datagrams.contents.len() as _);
-                        Ok(ClientToRelayMsg::Datagrams {
-                            dst_node_id,
-                            datagrams
-                        })
+                let packet_iter = send_datagrams_buf.drain(..).map(|item| {
+                    metrics.send_relay.inc_by(item.datagrams.contents.len() as _);
+                    Ok(ClientToRelayMsg::Datagrams {
+                        dst_node_id: item.remote_node,
+                        datagrams: item.datagrams,
                     })
                 });
                 let mut packet_stream = n0_future::stream::iter(packet_iter);
@@ -693,12 +683,28 @@ impl ActiveRelayActor {
                     state.last_packet_src = Some(remote_node_id);
                     state.nodes_present.insert(remote_node_id);
                 }
-                if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
-                    url: self.url.clone(),
-                    src: remote_node_id,
+
+                let max_segments = self
+                    .max_receive_segments
+                    .load(std::sync::atomic::Ordering::Relaxed);
+                // We might receive a datagram batch that's bigger than our magic socket's
+                // `AsyncUdpSocket::max_receive_segments`, if the other endpoint behind the relay
+                // has a higher `AsyncUdpSocket::max_transmit_segments` than we do.
+                // This happens e.g. when a linux machine (max transmit segments is usually 64)
+                // talks to a windows machine or a macos machine (max transmit segments is usually 1).
+                let re_batched = DatagramReBatcher {
+                    max_segments,
                     datagrams,
-                }) {
-                    warn!("Dropping received relay packet: {err:#}");
+                };
+                for datagrams in re_batched {
+                    if let Err(err) = self.relay_datagrams_recv.try_send(RelayRecvDatagram {
+                        url: self.url.clone(),
+                        src: remote_node_id,
+                        datagrams,
+                    }) {
+                        warn!("Dropping received relay packet: {err:#}");
+                        break; // No need to hot-loop in that case.
+                    }
                 }
             }
             RelayToClientMsg::NodeGone(node_id) => {
@@ -1243,16 +1249,19 @@ pub(crate) struct RelayRecvDatagram {
     pub(crate) datagrams: Datagrams,
 }
 
-struct DatagramFragmenter {
+/// Turns a datagrams batch into multiple datagram batches of maximum `max_segments` size.
+///
+/// If the given datagram isn't batched, it just returns that datagram once.
+struct DatagramReBatcher {
     max_segments: usize,
-    datagram: Datagrams,
+    datagrams: Datagrams,
 }
 
-impl Iterator for DatagramFragmenter {
+impl Iterator for DatagramReBatcher {
     type Item = Datagrams;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.datagram.take_segments(self.max_segments)
+        self.datagrams.take_segments(self.max_segments)
     }
 }
 
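To make the effect of the change concrete, below is a minimal, self-contained sketch of the re-batching pattern that now runs on the receiving side. The `Batch` type and its `take_segments` method are simplified stand-ins, not iroh's real `Datagrams` API; only the iterator shape mirrors `DatagramReBatcher` from the diff above, and the 64-vs-1 segment counts are taken from the comment in the second hunk.

// Sketch only: `Batch` and `take_segments` are simplified stand-ins for iroh's
// `Datagrams`; the iterator shape mirrors `DatagramReBatcher` from the diff above.

/// A batch of datagram segments (stand-in for `Datagrams`).
struct Batch {
    segments: Vec<Vec<u8>>,
}

impl Batch {
    /// Splits off up to `n` segments from the front; returns `None` once the batch is empty.
    fn take_segments(&mut self, n: usize) -> Option<Batch> {
        if self.segments.is_empty() {
            return None;
        }
        let n = n.min(self.segments.len());
        Some(Batch {
            segments: self.segments.drain(..n).collect(),
        })
    }
}

/// Yields sub-batches of at most `max_segments` segments until the input is exhausted.
/// A batch that already fits is yielded once.
struct ReBatcher {
    max_segments: usize,
    batch: Batch,
}

impl Iterator for ReBatcher {
    type Item = Batch;

    fn next(&mut self) -> Option<Self::Item> {
        self.batch.take_segments(self.max_segments)
    }
}

fn main() {
    // E.g. a 64-segment batch from a linux sender, received by an endpoint whose
    // socket accepts only 1 segment per receive (the windows/macos case from the comment).
    let incoming = Batch {
        segments: vec![vec![0u8; 1200]; 64],
    };
    let re_batched = ReBatcher {
        max_segments: 1,
        batch: incoming,
    };
    assert_eq!(re_batched.count(), 64);
}

As the commit title suggests, doing the split on the receiving side lets the sender forward full batches toward the relay unchanged, while each receiver only splits as far as its own `max_receive_segments` requires.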