Skip to content

Commit 39ba90c

Browse files
authored
Merge 340e277 into 80a40c0
2 parents 80a40c0 + 340e277 commit 39ba90c

File tree

1 file changed

+118
-52
lines changed

1 file changed

+118
-52
lines changed

iroh/src/magicsock.rs

Lines changed: 118 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{
2222
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
2323
pin::Pin,
2424
sync::{
25-
atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
25+
atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
2626
Arc, RwLock,
2727
},
2828
task::{Context, Poll, Waker},
@@ -186,6 +186,8 @@ pub(crate) struct MagicSock {
186186
/// Stores wakers, to be called when relay_recv_ch receives new data.
187187
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
188188
network_send_wakers: Arc<parking_lot::Mutex<Option<Waker>>>,
189+
/// Counter for ordering of [`MagicSock::poll_recv`] polling order.
190+
poll_recv_counter: AtomicUsize,
189191

190192
/// The DNS resolver to be used in this magicsock.
191193
dns_resolver: DnsResolver,
@@ -650,27 +652,86 @@ impl MagicSock {
650652
bufs: &mut [io::IoSliceMut<'_>],
651653
metas: &mut [quinn_udp::RecvMeta],
652654
) -> Poll<io::Result<usize>> {
653-
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
654655
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
655656
if self.is_closed() {
656657
return Poll::Pending;
657658
}
658659

659-
// order of polling is: UDPv4, UDPv6, relay
660-
let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? {
661-
Poll::Pending | Poll::Ready(0) => match &self.pconn6 {
662-
Some(conn) => match conn.poll_recv(cx, bufs, metas)? {
663-
Poll::Pending | Poll::Ready(0) => {
664-
return self.poll_recv_relay(cx, bufs, metas);
660+
// Three macros to help polling: they return if they get a result, execution
661+
// continues if they were Pending and we need to poll others (or finally return
662+
// Pending).
663+
macro_rules! poll_ipv4 {
664+
() => {
665+
match self.pconn4.poll_recv(cx, bufs, metas)? {
666+
Poll::Pending | Poll::Ready(0) => {}
667+
Poll::Ready(n) => {
668+
self.process_udp_datagrams(true, &mut bufs[..n], &mut metas[..n]);
669+
return Poll::Ready(Ok(n));
665670
}
666-
Poll::Ready(n) => (n, false),
667-
},
668-
None => {
669-
return self.poll_recv_relay(cx, bufs, metas);
670671
}
671-
},
672-
Poll::Ready(n) => (n, true),
673-
};
672+
};
673+
}
674+
macro_rules! poll_ipv6 {
675+
() => {
676+
if let Some(ref pconn) = self.pconn6 {
677+
match pconn.poll_recv(cx, bufs, metas)? {
678+
Poll::Pending | Poll::Ready(0) => {}
679+
Poll::Ready(n) => {
680+
self.process_udp_datagrams(false, &mut bufs[..n], &mut metas[..n]);
681+
return Poll::Ready(Ok(n));
682+
}
683+
}
684+
}
685+
};
686+
}
687+
macro_rules! poll_relay {
688+
() => {
689+
match self.poll_recv_relay(cx, bufs, metas) {
690+
Poll::Pending => {}
691+
Poll::Ready(n) => return Poll::Ready(n),
692+
}
693+
};
694+
}
695+
696+
let counter = self.poll_recv_counter.fetch_add(1, Ordering::Relaxed);
697+
match counter % 3 {
698+
0 => {
699+
// order of polling: UDPv4, UDPv6, relay
700+
poll_ipv4!();
701+
poll_ipv6!();
702+
poll_relay!();
703+
Poll::Pending
704+
}
705+
1 => {
706+
// order of polling: UDPv6, relay, UDPv4
707+
poll_ipv6!();
708+
poll_relay!();
709+
poll_ipv4!();
710+
Poll::Pending
711+
}
712+
_ => {
713+
// order of polling: relay, UDPv4, UDPv6
714+
poll_relay!();
715+
poll_ipv4!();
716+
poll_ipv6!();
717+
Poll::Pending
718+
}
719+
}
720+
}
721+
722+
/// Process datagrams received from UDP sockets.
723+
///
724+
/// All the `bufs` and `metas` should have initialized packets in them.
725+
///
726+
/// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO
727+
/// packets, processing them inside the magic socket.
728+
fn process_udp_datagrams(
729+
&self,
730+
from_ipv4: bool,
731+
bufs: &mut [io::IoSliceMut<'_>],
732+
metas: &mut [quinn_udp::RecvMeta],
733+
) {
734+
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
674735

675736
// Adding the IP address we received something on results in Quinn using this
676737
// address on the send path to send from. However we let Quinn use a
@@ -692,77 +753,83 @@ impl MagicSock {
692753

693754
let mut quic_packets_total = 0;
694755

695-
for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) {
696-
let mut is_quic = false;
697-
let mut quic_packets_count = 0;
756+
for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()) {
757+
let mut buf_contains_quic_datagrams = false;
758+
let mut quic_datagram_count = 0;
698759
if meta.len > meta.stride {
699760
trace!(%meta.len, %meta.stride, "GRO datagram received");
700761
inc!(MagicsockMetrics, recv_gro_datagrams);
701762
}
702763

703-
// find disco and stun packets and forward them to the actor
704-
for packet in buf[..meta.len].chunks_mut(meta.stride) {
705-
if packet.len() < meta.stride {
764+
// Chunk through the datagrams in this GRO payload to find disco and stun
765+
// packets and forward them to the actor
766+
for datagram in buf[..meta.len].chunks_mut(meta.stride) {
767+
if datagram.len() < meta.stride {
706768
trace!(
707-
len = %packet.len(),
769+
len = %datagram.len(),
708770
%meta.stride,
709771
"Last GRO datagram smaller than stride",
710772
);
711773
}
712774

713-
let packet_is_quic = if stun::is(packet) {
775+
// Detect DISCO and STUN datagrams and process them. Overwrite the first
776+
// byte of those packets with zero to make Quinn ignore the packet. This
777+
// relies on quinn::EndpointConfig::grease_quic_bit being set to `false`,
778+
// which we do in Endpoint::bind.
779+
if stun::is(datagram) {
714780
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet");
715-
let packet2 = Bytes::copy_from_slice(packet);
781+
let packet2 = Bytes::copy_from_slice(datagram);
716782
self.net_reporter.receive_stun_packet(packet2, meta.addr);
717-
false
718-
} else if let Some((sender, sealed_box)) = disco::source_and_box(packet) {
719-
// Disco?
783+
datagram[0] = 0u8;
784+
} else if let Some((sender, sealed_box)) = disco::source_and_box(datagram) {
720785
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet");
721786
self.handle_disco_message(
722787
sender,
723788
sealed_box,
724789
DiscoMessageSource::Udp(meta.addr),
725790
);
726-
false
791+
datagram[0] = 0u8;
727792
} else {
728793
trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet");
729794
if from_ipv4 {
730-
inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _);
795+
inc_by!(MagicsockMetrics, recv_data_ipv4, datagram.len() as _);
731796
} else {
732-
inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _);
797+
inc_by!(MagicsockMetrics, recv_data_ipv6, datagram.len() as _);
733798
}
734-
true
799+
quic_datagram_count += 1;
800+
buf_contains_quic_datagrams = true;
735801
};
736-
737-
if packet_is_quic {
738-
quic_packets_count += 1;
739-
is_quic = true;
740-
} else {
741-
// overwrite the first byte of the packets with zero.
742-
// this makes quinn reliably and quickly ignore the packet as long as
743-
// [`quinn::EndpointConfig::grease_quic_bit`] is set to `false`
744-
// (which we always do in Endpoint::bind).
745-
packet[0] = 0u8;
746-
}
747802
}
748803

749-
if is_quic {
750-
// remap addr
804+
if buf_contains_quic_datagrams {
805+
// Update the NodeMap and remap RecvMeta to the QuicMappedAddr.
751806
match self.node_map.receive_udp(meta.addr) {
752807
None => {
753-
warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping");
754-
// if we have no node state for the from addr, set len to 0 to make quinn skip the buf completely.
808+
warn!(
809+
src = ?meta.addr,
810+
count = %quic_datagram_count,
811+
len = meta.len,
812+
"UDP recv quic packets: no node state found, skipping",
813+
);
814+
// If we have no node state for the from addr, set len to 0 to make
815+
// quinn skip the buf completely.
755816
meta.len = 0;
756817
}
757818
Some((node_id, quic_mapped_addr)) => {
758-
trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets");
759-
quic_packets_total += quic_packets_count;
819+
trace!(
820+
src = ?meta.addr,
821+
node = %node_id.fmt_short(),
822+
count = %quic_datagram_count,
823+
len = meta.len,
824+
"UDP recv quic packets",
825+
);
826+
quic_packets_total += quic_datagram_count;
760827
meta.addr = quic_mapped_addr.0;
761828
}
762829
}
763830
} else {
764-
// if there is no non-stun,non-disco packet in the chunk, set len to zero to make
765-
// quinn skip the buf completely.
831+
// If all datagrams in this buf are DISCO or STUN, set len to zero to make
832+
// Quinn skip the buf completely.
766833
meta.len = 0;
767834
}
768835
// Normalize local_ip
@@ -773,8 +840,6 @@ impl MagicSock {
773840
inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _);
774841
trace!("UDP recv: {} packets", quic_packets_total);
775842
}
776-
777-
Poll::Ready(Ok(msgs))
778843
}
779844

780845
#[instrument(skip_all)]
@@ -1398,6 +1463,7 @@ impl Handle {
13981463
relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver),
13991464
network_recv_wakers: parking_lot::Mutex::new(None),
14001465
network_send_wakers: Arc::new(parking_lot::Mutex::new(None)),
1466+
poll_recv_counter: AtomicUsize::new(0),
14011467
actor_sender: actor_sender.clone(),
14021468
ipv6_reported: Arc::new(AtomicBool::new(false)),
14031469
relay_map,

0 commit comments

Comments
 (0)