Skip to content

Commit c451750

Browse files
fix(netwatch): BSD rebind socket on errors (#2913)
also uses `AbortOnDropHandle` to better cleanup tasks Closes #2909 --------- Co-authored-by: Divma <[email protected]>
1 parent e2c3c98 commit c451750

File tree

5 files changed

+69
-46
lines changed

5 files changed

+69
-46
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

net-tools/netwatch/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ socket2 = "0.5.3"
2525
thiserror = "1"
2626
time = "0.3.20"
2727
tokio = { version = "1", features = ["io-util", "macros", "sync", "rt", "net", "fs", "io-std", "signal", "process", "time"] }
28+
tokio-util = { version = "0.7", features = ["rt"] }
2829
tracing = "0.1"
2930

3031
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]

net-tools/netwatch/src/netmon.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22
33
use anyhow::Result;
44
use futures_lite::future::Boxed as BoxFuture;
5-
use tokio::{
6-
sync::{mpsc, oneshot},
7-
task::JoinHandle,
8-
};
5+
use tokio::sync::{mpsc, oneshot};
6+
use tokio_util::task::AbortOnDropHandle;
97

108
mod actor;
119
#[cfg(target_os = "android")]
@@ -30,16 +28,10 @@ use self::actor::{Actor, ActorMessage};
3028
#[derive(Debug)]
3129
pub struct Monitor {
3230
/// Task handle for the monitor task.
33-
handle: JoinHandle<()>,
31+
_handle: AbortOnDropHandle<()>,
3432
actor_tx: mpsc::Sender<ActorMessage>,
3533
}
3634

37-
impl Drop for Monitor {
38-
fn drop(&mut self) {
39-
self.handle.abort();
40-
}
41-
}
42-
4335
impl Monitor {
4436
/// Create a new monitor.
4537
pub async fn new() -> Result<Self> {
@@ -50,7 +42,10 @@ impl Monitor {
5042
actor.run().await;
5143
});
5244

53-
Ok(Monitor { handle, actor_tx })
45+
Ok(Monitor {
46+
_handle: AbortOnDropHandle::new(handle),
47+
actor_tx,
48+
})
5449
}
5550

5651
/// Subscribe to network changes.

net-tools/netwatch/src/netmon/actor.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ impl Actor {
8181
let interface_state = State::new().await;
8282
let wall_time = Instant::now();
8383

84-
// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
8584
let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY);
8685
let route_monitor = RouteMonitor::new(mon_sender)?;
8786
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
@@ -112,6 +111,7 @@ impl Actor {
112111
loop {
113112
tokio::select! {
114113
biased;
114+
115115
_ = debounce_interval.tick() => {
116116
if let Some(time_jumped) = last_event.take() {
117117
if let Err(err) = self.handle_potential_change(time_jumped).await {
@@ -127,29 +127,40 @@ impl Actor {
127127
debounce_interval.reset_immediately();
128128
}
129129
}
130-
Some(_event) = self.mon_receiver.recv() => {
131-
trace!("network activity detected");
132-
last_event.replace(false);
133-
debounce_interval.reset_immediately();
134-
}
135-
Some(msg) = self.actor_receiver.recv() => match msg {
136-
ActorMessage::Subscribe(callback, s) => {
137-
let token = self.next_callback_token();
138-
self.callbacks.insert(token, Arc::new(callback));
139-
s.send(token).ok();
140-
}
141-
ActorMessage::Unsubscribe(token, s) => {
142-
self.callbacks.remove(&token);
143-
s.send(()).ok();
130+
event = self.mon_receiver.recv() => {
131+
match event {
132+
Some(NetworkMessage::Change) => {
133+
trace!("network activity detected");
134+
last_event.replace(false);
135+
debounce_interval.reset_immediately();
136+
}
137+
None => {
138+
debug!("shutting down, network monitor receiver gone");
139+
break;
140+
}
144141
}
145-
ActorMessage::NetworkChange => {
146-
trace!("external network activity detected");
147-
last_event.replace(false);
148-
debounce_interval.reset_immediately();
142+
}
143+
msg = self.actor_receiver.recv() => {
144+
match msg {
145+
Some(ActorMessage::Subscribe(callback, s)) => {
146+
let token = self.next_callback_token();
147+
self.callbacks.insert(token, Arc::new(callback));
148+
s.send(token).ok();
149+
}
150+
Some(ActorMessage::Unsubscribe(token, s)) => {
151+
self.callbacks.remove(&token);
152+
s.send(()).ok();
153+
}
154+
Some(ActorMessage::NetworkChange) => {
155+
trace!("external network activity detected");
156+
last_event.replace(false);
157+
debounce_interval.reset_immediately();
158+
}
159+
None => {
160+
debug!("shutting down, actor receiver gone");
161+
break;
162+
}
149163
}
150-
},
151-
else => {
152-
break;
153164
}
154165
}
155166
}

net-tools/netwatch/src/netmon/bsd.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use anyhow::Result;
22
#[cfg(any(target_os = "macos", target_os = "ios"))]
33
use libc::{RTAX_DST, RTAX_IFP};
4-
use tokio::{io::AsyncReadExt, sync::mpsc, task::JoinHandle};
4+
use tokio::{io::AsyncReadExt, sync::mpsc};
5+
use tokio_util::task::AbortOnDropHandle;
56
use tracing::{trace, warn};
67

78
use super::actor::NetworkMessage;
@@ -11,22 +12,23 @@ use crate::{interfaces::bsd::WireMessage, ip::is_link_local};
1112

1213
#[derive(Debug)]
1314
pub(super) struct RouteMonitor {
14-
handle: JoinHandle<()>,
15+
_handle: AbortOnDropHandle<()>,
1516
}
1617

17-
impl Drop for RouteMonitor {
18-
fn drop(&mut self) {
19-
self.handle.abort();
20-
}
18+
fn create_socket() -> Result<tokio::net::UnixStream> {
19+
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
20+
socket.set_nonblocking(true)?;
21+
let socket_std: std::os::unix::net::UnixStream = socket.into();
22+
let socket: tokio::net::UnixStream = socket_std.try_into()?;
23+
24+
trace!("AF_ROUTE socket bound");
25+
26+
Ok(socket)
2127
}
2228

2329
impl RouteMonitor {
2430
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self> {
25-
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
26-
socket.set_nonblocking(true)?;
27-
let socket_std: std::os::unix::net::UnixStream = socket.into();
28-
let mut socket: tokio::net::UnixStream = socket_std.try_into()?;
29-
31+
let mut socket = create_socket()?;
3032
let handle = tokio::task::spawn(async move {
3133
trace!("AF_ROUTE monitor started");
3234

@@ -52,12 +54,25 @@ impl RouteMonitor {
5254
}
5355
Err(err) => {
5456
warn!("AF_ROUTE: error reading: {:?}", err);
57+
// recreate socket, as it is likely in an invalid state
58+
// TODO: distinguish between different errors?
59+
match create_socket() {
60+
Ok(new_socket) => {
61+
socket = new_socket;
62+
}
63+
Err(err) => {
64+
warn!("AF_ROUTE: unable to bind a new socket: {:?}", err);
65+
// TODO: what to do here?
66+
}
67+
}
5568
}
5669
}
5770
}
5871
});
5972

60-
Ok(RouteMonitor { handle })
73+
Ok(RouteMonitor {
74+
_handle: AbortOnDropHandle::new(handle),
75+
})
6176
}
6277
}
6378

0 commit comments

Comments
 (0)