Skip to content

fix(iroh): Poll all AsyncUdpSocket sources fairly #2996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 118 additions & 52 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
task::{Context, Poll, Waker},
Expand Down Expand Up @@ -186,6 +186,8 @@ pub(crate) struct MagicSock {
/// Stores wakers, to be called when relay_recv_ch receives new data.
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
network_send_wakers: Arc<parking_lot::Mutex<Option<Waker>>>,
/// Counter for ordering of [`MagicSock::poll_recv`] polling order.
poll_recv_counter: AtomicUsize,

/// The DNS resolver to be used in this magicsock.
dns_resolver: DnsResolver,
Expand Down Expand Up @@ -650,27 +652,86 @@ impl MagicSock {
bufs: &mut [io::IoSliceMut<'_>],
metas: &mut [quinn_udp::RecvMeta],
) -> Poll<io::Result<usize>> {
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
if self.is_closed() {
return Poll::Pending;
}

// order of polling is: UDPv4, UDPv6, relay
let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => match &self.pconn6 {
Some(conn) => match conn.poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => {
return self.poll_recv_relay(cx, bufs, metas);
// Three macros to help polling: they return if they get a result, execution
// continues if they were Pending and we need to poll others (or finally return
// Pending).
macro_rules! poll_ipv4 {
() => {
match self.pconn4.poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => {}
Poll::Ready(n) => {
self.process_udp_datagrams(true, &mut bufs[..n], &mut metas[..n]);
return Poll::Ready(Ok(n));
}
Poll::Ready(n) => (n, false),
},
None => {
return self.poll_recv_relay(cx, bufs, metas);
}
},
Poll::Ready(n) => (n, true),
};
};
}
macro_rules! poll_ipv6 {
() => {
if let Some(ref pconn) = self.pconn6 {
match pconn.poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => {}
Poll::Ready(n) => {
self.process_udp_datagrams(false, &mut bufs[..n], &mut metas[..n]);
return Poll::Ready(Ok(n));
}
}
}
};
}
macro_rules! poll_relay {
() => {
match self.poll_recv_relay(cx, bufs, metas) {
Poll::Pending => {}
Poll::Ready(n) => return Poll::Ready(n),
}
};
}

let counter = self.poll_recv_counter.fetch_add(1, Ordering::Relaxed);
match counter % 3 {
0 => {
// order of polling: UDPv4, UDPv6, relay
poll_ipv4!();
poll_ipv6!();
poll_relay!();
Poll::Pending
}
1 => {
// order of polling: UDPv6, relay, UDPv4
poll_ipv6!();
poll_relay!();
poll_ipv4!();
Poll::Pending
}
_ => {
// order of polling: relay, UDPv4, UDPv6
poll_relay!();
poll_ipv4!();
poll_ipv6!();
Poll::Pending
}
}
}

/// Process datagrams received from UDP sockets.
///
/// All the `bufs` and `metas` should have initialized packets in them.
///
/// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO
/// packets, processing them inside the magic socket.
fn process_udp_datagrams(
&self,
from_ipv4: bool,
bufs: &mut [io::IoSliceMut<'_>],
metas: &mut [quinn_udp::RecvMeta],
) {
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");

// Adding the IP address we received something on results in Quinn using this
// address on the send path to send from. However we let Quinn use a
Expand All @@ -692,77 +753,83 @@ impl MagicSock {

let mut quic_packets_total = 0;

for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) {
let mut is_quic = false;
let mut quic_packets_count = 0;
for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()) {
let mut buf_contains_quic_datagrams = false;
let mut quic_datagram_count = 0;
if meta.len > meta.stride {
trace!(%meta.len, %meta.stride, "GRO datagram received");
inc!(MagicsockMetrics, recv_gro_datagrams);
}

// find disco and stun packets and forward them to the actor
for packet in buf[..meta.len].chunks_mut(meta.stride) {
if packet.len() < meta.stride {
// Chunk through the datagrams in this GRO payload to find disco and stun
// packets and forward them to the actor
for datagram in buf[..meta.len].chunks_mut(meta.stride) {
if datagram.len() < meta.stride {
trace!(
len = %packet.len(),
len = %datagram.len(),
%meta.stride,
"Last GRO datagram smaller than stride",
);
}

let packet_is_quic = if stun::is(packet) {
// Detect DISCO and STUN datagrams and process them. Overwrite the first
// byte of those packets with zero to make Quinn ignore the packet. This
// relies on quinn::EndpointConfig::grease_quic_bit being set to `false`,
// which we do in Endpoint::bind.
if stun::is(datagram) {
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet");
let packet2 = Bytes::copy_from_slice(packet);
let packet2 = Bytes::copy_from_slice(datagram);
self.net_reporter.receive_stun_packet(packet2, meta.addr);
false
} else if let Some((sender, sealed_box)) = disco::source_and_box(packet) {
// Disco?
datagram[0] = 0u8;
} else if let Some((sender, sealed_box)) = disco::source_and_box(datagram) {
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet");
self.handle_disco_message(
sender,
sealed_box,
DiscoMessageSource::Udp(meta.addr),
);
false
datagram[0] = 0u8;
} else {
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet");
if from_ipv4 {
inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _);
inc_by!(MagicsockMetrics, recv_data_ipv4, datagram.len() as _);
} else {
inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _);
inc_by!(MagicsockMetrics, recv_data_ipv6, datagram.len() as _);
}
true
quic_datagram_count += 1;
buf_contains_quic_datagrams = true;
};

if packet_is_quic {
quic_packets_count += 1;
is_quic = true;
} else {
// overwrite the first byte of the packets with zero.
// this makes quinn reliably and quickly ignore the packet as long as
// [`quinn::EndpointConfig::grease_quic_bit`] is set to `false`
// (which we always do in Endpoint::bind).
packet[0] = 0u8;
}
}

if is_quic {
// remap addr
if buf_contains_quic_datagrams {
// Update the NodeMap and remap RecvMeta to the QuicMappedAddr.
match self.node_map.receive_udp(meta.addr) {
None => {
warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping");
// if we have no node state for the from addr, set len to 0 to make quinn skip the buf completely.
warn!(
src = ?meta.addr,
count = %quic_datagram_count,
len = meta.len,
"UDP recv quic packets: no node state found, skipping",
);
// If we have no node state for the from addr, set len to 0 to make
// quinn skip the buf completely.
meta.len = 0;
}
Some((node_id, quic_mapped_addr)) => {
trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets");
quic_packets_total += quic_packets_count;
trace!(
src = ?meta.addr,
node = %node_id.fmt_short(),
count = %quic_datagram_count,
len = meta.len,
"UDP recv quic packets",
);
quic_packets_total += quic_datagram_count;
meta.addr = quic_mapped_addr.0;
}
}
} else {
// if there is no non-stun,non-disco packet in the chunk, set len to zero to make
// quinn skip the buf completely.
// If all datagrams in this buf are DISCO or STUN, set len to zero to make
// Quinn skip the buf completely.
meta.len = 0;
}
// Normalize local_ip
Expand All @@ -773,8 +840,6 @@ impl MagicSock {
inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _);
trace!("UDP recv: {} packets", quic_packets_total);
}

Poll::Ready(Ok(msgs))
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -1398,6 +1463,7 @@ impl Handle {
relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver),
network_recv_wakers: parking_lot::Mutex::new(None),
network_send_wakers: Arc::new(parking_lot::Mutex::new(None)),
poll_recv_counter: AtomicUsize::new(0),
actor_sender: actor_sender.clone(),
ipv6_reported: Arc::new(AtomicBool::new(false)),
relay_map,
Expand Down
Loading