Skip to content

Commit f5270fb

Browse files
Merge 23b4eb7 into 4abfd61
2 parents 4abfd61 + 23b4eb7 commit f5270fb

File tree

8 files changed

+953
-187
lines changed

8 files changed

+953
-187
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-net-report/src/reportgen/hairpin.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl Actor {
121121
.context("net_report actor gone")?;
122122
msg_response_rx.await.context("net_report actor died")?;
123123

124-
if let Err(err) = socket.send_to(&stun::request(txn), dst).await {
124+
if let Err(err) = socket.send_to(&stun::request(txn), dst.into()).await {
125125
warn!(%dst, "failed to send hairpin check");
126126
return Err(err.into());
127127
}

iroh-net/src/magicsock.rs

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use futures_util::stream::BoxStream;
3636
use iroh_base::key::NodeId;
3737
use iroh_metrics::{inc, inc_by};
3838
use iroh_relay::protos::stun;
39-
use netwatch::{interfaces, ip::LocalAddresses, netmon};
39+
use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket};
4040
use quinn::AsyncUdpSocket;
4141
use rand::{seq::SliceRandom, Rng, SeedableRng};
4242
use smallvec::{smallvec, SmallVec};
@@ -441,11 +441,8 @@ impl MagicSock {
441441
// Right now however we have one single poller behaving the same for each
442442
// connection. It checks all paths and returns Poll::Ready as soon as any path is
443443
// ready.
444-
let ipv4_poller = Arc::new(self.pconn4.clone()).create_io_poller();
445-
let ipv6_poller = self
446-
.pconn6
447-
.as_ref()
448-
.map(|sock| Arc::new(sock.clone()).create_io_poller());
444+
let ipv4_poller = self.pconn4.create_io_poller();
445+
let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller());
449446
let relay_sender = self.relay_actor_sender.clone();
450447
Box::pin(IoPoller {
451448
ipv4_poller,
@@ -1091,10 +1088,9 @@ impl MagicSock {
10911088
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
10921089
// This is the socket .try_send_disco_message_udp used.
10931090
let sock = self.conn_for_addr(dst)?;
1094-
let sock = Arc::new(sock.clone());
1095-
let mut poller = sock.create_io_poller();
1096-
match poller.as_mut().poll_writable(cx)? {
1097-
Poll::Ready(()) => continue,
1091+
match sock.as_socket_ref().poll_writable(cx) {
1092+
Poll::Ready(Ok(())) => continue,
1093+
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
10981094
Poll::Pending => return Poll::Pending,
10991095
}
11001096
}
@@ -1408,6 +1404,9 @@ impl Handle {
14081404
let net_reporter =
14091405
net_report::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?;
14101406

1407+
let pconn4_sock = pconn4.as_socket();
1408+
let pconn6_sock = pconn6.as_ref().map(|p| p.as_socket());
1409+
14111410
let (actor_sender, actor_receiver) = mpsc::channel(256);
14121411
let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256);
14131412
let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256);
@@ -1431,9 +1430,9 @@ impl Handle {
14311430
ipv6_reported: Arc::new(AtomicBool::new(false)),
14321431
relay_map,
14331432
my_relay: Default::default(),
1434-
pconn4: pconn4.clone(),
1435-
pconn6: pconn6.clone(),
14361433
net_reporter: net_reporter.addr(),
1434+
pconn4,
1435+
pconn6,
14371436
disco_secrets: DiscoSecrets::default(),
14381437
node_map,
14391438
relay_actor_sender: relay_actor_sender.clone(),
@@ -1481,8 +1480,8 @@ impl Handle {
14811480
periodic_re_stun_timer: new_re_stun_timer(false),
14821481
net_info_last: None,
14831482
port_mapper,
1484-
pconn4,
1485-
pconn6,
1483+
pconn4: pconn4_sock,
1484+
pconn6: pconn6_sock,
14861485
no_v4_send: false,
14871486
net_reporter,
14881487
network_monitor,
@@ -1720,8 +1719,8 @@ struct Actor {
17201719
net_info_last: Option<NetInfo>,
17211720

17221721
// The underlying UDP sockets used to send/rcv packets.
1723-
pconn4: UdpConn,
1724-
pconn6: Option<UdpConn>,
1722+
pconn4: Arc<UdpSocket>,
1723+
pconn6: Option<Arc<UdpSocket>>,
17251724

17261725
/// The NAT-PMP/PCP/UPnP prober/client, for requesting port mappings from NAT devices.
17271726
port_mapper: portmapper::Client,
@@ -1861,6 +1860,14 @@ impl Actor {
18611860
debug!("link change detected: major? {}", is_major);
18621861

18631862
if is_major {
1863+
if let Err(err) = self.pconn4.rebind() {
1864+
warn!("failed to rebind Udp IPv4 socket: {:?}", err);
1865+
};
1866+
if let Some(ref pconn6) = self.pconn6 {
1867+
if let Err(err) = pconn6.rebind() {
1868+
warn!("failed to rebind Udp IPv6 socket: {:?}", err);
1869+
};
1870+
}
18641871
self.msock.dns_resolver.clear_cache();
18651872
self.msock.re_stun("link-change-major");
18661873
self.close_stale_relay_connections().await;
@@ -1893,14 +1900,6 @@ impl Actor {
18931900
self.port_mapper.deactivate();
18941901
self.relay_actor_cancel_token.cancel();
18951902

1896-
// Ignore errors from pconnN
1897-
// They will frequently have been closed already by a call to connBind.Close.
1898-
debug!("stopping connections");
1899-
if let Some(ref conn) = self.pconn6 {
1900-
conn.close().await.ok();
1901-
}
1902-
self.pconn4.close().await.ok();
1903-
19041903
debug!("shutdown complete");
19051904
return true;
19061905
}
@@ -2206,8 +2205,8 @@ impl Actor {
22062205
}
22072206

22082207
let relay_map = self.msock.relay_map.clone();
2209-
let pconn4 = Some(self.pconn4.as_socket());
2210-
let pconn6 = self.pconn6.as_ref().map(|p| p.as_socket());
2208+
let pconn4 = Some(self.pconn4.clone());
2209+
let pconn6 = self.pconn6.clone();
22112210

22122211
debug!("requesting net_report report");
22132212
match self
@@ -3099,6 +3098,45 @@ mod tests {
30993098
Ok(())
31003099
}
31013100

3101+
#[tokio::test]
3102+
async fn test_regression_network_change_rebind_wakes_connection_driver(
3103+
) -> testresult::TestResult {
3104+
let _ = iroh_test::logging::setup();
3105+
let m1 = MagicStack::new(RelayMode::Disabled).await?;
3106+
let m2 = MagicStack::new(RelayMode::Disabled).await?;
3107+
3108+
println!("Net change");
3109+
m1.endpoint.magic_sock().force_network_change(true).await;
3110+
tokio::time::sleep(Duration::from_secs(1)).await; // wait for socket rebinding
3111+
3112+
let _guard = mesh_stacks(vec![m1.clone(), m2.clone()]).await?;
3113+
3114+
let _handle = AbortOnDropHandle::new(tokio::spawn({
3115+
let endpoint = m2.endpoint.clone();
3116+
async move {
3117+
while let Some(incoming) = endpoint.accept().await {
3118+
println!("Incoming first conn!");
3119+
let conn = incoming.await?;
3120+
conn.closed().await;
3121+
}
3122+
3123+
testresult::TestResult::Ok(())
3124+
}
3125+
}));
3126+
3127+
println!("first conn!");
3128+
let conn = m1
3129+
.endpoint
3130+
.connect(m2.endpoint.node_addr().await?, ALPN)
3131+
.await?;
3132+
println!("Closing first conn");
3133+
conn.close(0u32.into(), b"bye lolz");
3134+
conn.closed().await;
3135+
println!("Closed first conn");
3136+
3137+
Ok(())
3138+
}
3139+
31023140
#[tokio::test(flavor = "multi_thread")]
31033141
async fn test_two_devices_roundtrip_network_change() -> Result<()> {
31043142
time::timeout(

iroh-net/src/magicsock/udp_conn.rs

Lines changed: 25 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,57 @@
11
use std::{
22
fmt::Debug,
3-
future::Future,
43
io,
54
net::SocketAddr,
65
pin::Pin,
76
sync::Arc,
8-
task::{ready, Context, Poll},
7+
task::{Context, Poll},
98
};
109

1110
use anyhow::{bail, Context as _};
1211
use netwatch::UdpSocket;
1312
use quinn::AsyncUdpSocket;
14-
use quinn_udp::{Transmit, UdpSockRef};
15-
use tokio::io::Interest;
16-
use tracing::{debug, trace};
13+
use quinn_udp::Transmit;
14+
use tracing::debug;
1715

1816
/// A UDP socket implementing Quinn's [`AsyncUdpSocket`].
19-
#[derive(Clone, Debug)]
17+
#[derive(Debug, Clone)]
2018
pub struct UdpConn {
2119
io: Arc<UdpSocket>,
22-
inner: Arc<quinn_udp::UdpSocketState>,
2320
}
2421

2522
impl UdpConn {
2623
pub(super) fn as_socket(&self) -> Arc<UdpSocket> {
2724
self.io.clone()
2825
}
2926

27+
pub(super) fn as_socket_ref(&self) -> &UdpSocket {
28+
&self.io
29+
}
30+
3031
pub(super) fn bind(addr: SocketAddr) -> anyhow::Result<Self> {
3132
let sock = bind(addr)?;
32-
let state = quinn_udp::UdpSocketState::new(quinn_udp::UdpSockRef::from(&sock))?;
33-
Ok(Self {
34-
io: Arc::new(sock),
35-
inner: Arc::new(state),
36-
})
33+
34+
Ok(Self { io: Arc::new(sock) })
3735
}
3836

3937
pub fn port(&self) -> u16 {
4038
self.local_addr().map(|p| p.port()).unwrap_or_default()
4139
}
4240

43-
#[allow(clippy::unused_async)]
44-
pub async fn close(&self) -> Result<(), io::Error> {
45-
// Nothing to do atm
46-
Ok(())
41+
pub(super) fn create_io_poller(&self) -> Pin<Box<dyn quinn::UdpPoller>> {
42+
Box::pin(IoPoller {
43+
io: self.io.clone(),
44+
})
4745
}
4846
}
4947

5048
impl AsyncUdpSocket for UdpConn {
5149
fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn quinn::UdpPoller>> {
52-
let sock = self.io.clone();
53-
Box::pin(IoPoller {
54-
next_waiter: move || {
55-
let sock = sock.clone();
56-
async move { sock.writable().await }
57-
},
58-
waiter: None,
59-
})
50+
(*self).create_io_poller()
6051
}
6152

6253
fn try_send(&self, transmit: &Transmit<'_>) -> io::Result<()> {
63-
self.io.try_io(Interest::WRITABLE, || {
64-
let sock_ref = UdpSockRef::from(&self.io);
65-
self.inner.send(sock_ref, transmit)
66-
})
54+
self.io.try_send_quinn(transmit)
6755
}
6856

6957
fn poll_recv(
@@ -72,40 +60,23 @@ impl AsyncUdpSocket for UdpConn {
7260
bufs: &mut [io::IoSliceMut<'_>],
7361
meta: &mut [quinn_udp::RecvMeta],
7462
) -> Poll<io::Result<usize>> {
75-
loop {
76-
ready!(self.io.poll_recv_ready(cx))?;
77-
if let Ok(res) = self.io.try_io(Interest::READABLE, || {
78-
self.inner.recv(Arc::as_ref(&self.io).into(), bufs, meta)
79-
}) {
80-
for meta in meta.iter().take(res) {
81-
trace!(
82-
src = %meta.addr,
83-
len = meta.len,
84-
count = meta.len / meta.stride,
85-
dst = %meta.dst_ip.map(|x| x.to_string()).unwrap_or_default(),
86-
"UDP recv"
87-
);
88-
}
89-
90-
return Poll::Ready(Ok(res));
91-
}
92-
}
63+
self.io.poll_recv_quinn(cx, bufs, meta)
9364
}
9465

9566
fn local_addr(&self) -> io::Result<SocketAddr> {
9667
self.io.local_addr()
9768
}
9869

9970
fn may_fragment(&self) -> bool {
100-
self.inner.may_fragment()
71+
self.io.may_fragment()
10172
}
10273

10374
fn max_transmit_segments(&self) -> usize {
104-
self.inner.max_gso_segments()
75+
self.io.max_gso_segments()
10576
}
10677

10778
fn max_receive_segments(&self) -> usize {
108-
self.inner.gro_segments()
79+
self.io.gro_segments()
10980
}
11081
}
11182

@@ -147,49 +118,14 @@ fn bind(mut addr: SocketAddr) -> anyhow::Result<UdpSocket> {
147118
}
148119

149120
/// Poller for when the socket is writable.
150-
///
151-
/// The tricky part is that we only have `tokio::net::UdpSocket::writable()` to create the
152-
/// waiter we need, which does not return a named future type. In order to be able to store
153-
/// this waiter in a struct without boxing we need to specify the future itself as a type
154-
/// parameter, which we can only do if we introduce a second type parameter which returns
155-
/// the future. So we end up with a function which we do not need, but it makes the types
156-
/// work.
157-
#[derive(derive_more::Debug)]
158-
#[pin_project::pin_project]
159-
struct IoPoller<F, Fut>
160-
where
161-
F: Fn() -> Fut + Send + Sync + 'static,
162-
Fut: Future<Output = io::Result<()>> + Send + Sync + 'static,
163-
{
164-
/// Function which can create a new waiter if there is none.
165-
#[debug("next_waiter")]
166-
next_waiter: F,
167-
/// The waiter which tells us when the socket is writable.
168-
#[debug("waiter")]
169-
#[pin]
170-
waiter: Option<Fut>,
121+
#[derive(Debug)]
122+
struct IoPoller {
123+
io: Arc<UdpSocket>,
171124
}
172125

173-
impl<F, Fut> quinn::UdpPoller for IoPoller<F, Fut>
174-
where
175-
F: Fn() -> Fut + Send + Sync + 'static,
176-
Fut: Future<Output = io::Result<()>> + Send + Sync + 'static,
177-
{
126+
impl quinn::UdpPoller for IoPoller {
178127
fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
179-
let mut this = self.project();
180-
if this.waiter.is_none() {
181-
this.waiter.set(Some((this.next_waiter)()));
182-
}
183-
let result = this
184-
.waiter
185-
.as_mut()
186-
.as_pin_mut()
187-
.expect("just set")
188-
.poll(cx);
189-
if result.is_ready() {
190-
this.waiter.set(None);
191-
}
192-
result
128+
self.io.poll_writable(cx)
193129
}
194130
}
195131

0 commit comments

Comments
 (0)