Skip to content

fix(netwatch): BSD rebind socket on errors #2913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions net-tools/netwatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ socket2 = "0.5.3"
thiserror = "1"
time = "0.3.20"
tokio = { version = "1", features = ["io-util", "macros", "sync", "rt", "net", "fs", "io-std", "signal", "process", "time"] }
tokio-util = { version = "0.7", features = ["rt"] }
tracing = "0.1"

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
Expand Down
19 changes: 7 additions & 12 deletions net-tools/netwatch/src/netmon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
use anyhow::Result;
use futures_lite::future::Boxed as BoxFuture;
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;

mod actor;
#[cfg(target_os = "android")]
Expand All @@ -30,16 +28,10 @@ use self::actor::{Actor, ActorMessage};
#[derive(Debug)]
pub struct Monitor {
/// Task handle for the monitor task.
handle: JoinHandle<()>,
_handle: AbortOnDropHandle<()>,
actor_tx: mpsc::Sender<ActorMessage>,
}

impl Drop for Monitor {
fn drop(&mut self) {
self.handle.abort();
}
}

impl Monitor {
/// Create a new monitor.
pub async fn new() -> Result<Self> {
Expand All @@ -50,7 +42,10 @@ impl Monitor {
actor.run().await;
});

Ok(Monitor { handle, actor_tx })
Ok(Monitor {
_handle: AbortOnDropHandle::new(handle),
actor_tx,
})
}

/// Subscribe to network changes.
Expand Down
55 changes: 33 additions & 22 deletions net-tools/netwatch/src/netmon/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl Actor {
let interface_state = State::new().await;
let wall_time = Instant::now();

// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY);
let route_monitor = RouteMonitor::new(mon_sender)?;
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
Expand Down Expand Up @@ -112,6 +111,7 @@ impl Actor {
loop {
tokio::select! {
biased;

_ = debounce_interval.tick() => {
if let Some(time_jumped) = last_event.take() {
if let Err(err) = self.handle_potential_change(time_jumped).await {
Expand All @@ -127,29 +127,40 @@ impl Actor {
debounce_interval.reset_immediately();
}
}
Some(_event) = self.mon_receiver.recv() => {
trace!("network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
}
Some(msg) = self.actor_receiver.recv() => match msg {
ActorMessage::Subscribe(callback, s) => {
let token = self.next_callback_token();
self.callbacks.insert(token, Arc::new(callback));
s.send(token).ok();
}
ActorMessage::Unsubscribe(token, s) => {
self.callbacks.remove(&token);
s.send(()).ok();
event = self.mon_receiver.recv() => {
match event {
Some(NetworkMessage::Change) => {
trace!("network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
}
None => {
debug!("shutting down, network monitor receiver gone");
break;
}
}
ActorMessage::NetworkChange => {
trace!("external network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
}
msg = self.actor_receiver.recv() => {
match msg {
Some(ActorMessage::Subscribe(callback, s)) => {
let token = self.next_callback_token();
self.callbacks.insert(token, Arc::new(callback));
s.send(token).ok();
}
Some(ActorMessage::Unsubscribe(token, s)) => {
self.callbacks.remove(&token);
s.send(()).ok();
}
Some(ActorMessage::NetworkChange) => {
trace!("external network activity detected");
last_event.replace(false);
debounce_interval.reset_immediately();
}
None => {
debug!("shutting down, actor receiver gone");
break;
}
}
},
else => {
break;
}
}
}
Expand Down
39 changes: 27 additions & 12 deletions net-tools/netwatch/src/netmon/bsd.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::Result;
#[cfg(any(target_os = "macos", target_os = "ios"))]
use libc::{RTAX_DST, RTAX_IFP};
use tokio::{io::AsyncReadExt, sync::mpsc, task::JoinHandle};
use tokio::{io::AsyncReadExt, sync::mpsc};
use tokio_util::task::AbortOnDropHandle;
use tracing::{trace, warn};

use super::actor::NetworkMessage;
Expand All @@ -11,22 +12,23 @@ use crate::{interfaces::bsd::WireMessage, ip::is_link_local};

#[derive(Debug)]
pub(super) struct RouteMonitor {
handle: JoinHandle<()>,
_handle: AbortOnDropHandle<()>,
}

impl Drop for RouteMonitor {
fn drop(&mut self) {
self.handle.abort();
}
/// Opens a raw `AF_ROUTE` socket, switches it to non-blocking mode and wraps
/// it in a tokio `UnixStream`.
///
/// # Errors
///
/// Returns an error if the socket cannot be created, cannot be switched to
/// non-blocking mode, or cannot be converted into a tokio stream.
fn create_socket() -> Result<tokio::net::UnixStream> {
    let raw = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
    // Set non-blocking mode before the socket is converted into a tokio stream.
    raw.set_nonblocking(true)?;

    // socket2 -> std -> tokio, the same conversion chain as before, spelled
    // with explicit `from` / `try_from` calls instead of annotated bindings.
    let stream =
        tokio::net::UnixStream::try_from(std::os::unix::net::UnixStream::from(raw))?;
    trace!("AF_ROUTE socket bound");

    Ok(stream)
}

impl RouteMonitor {
pub(super) fn new(sender: mpsc::Sender<NetworkMessage>) -> Result<Self> {
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
socket.set_nonblocking(true)?;
let socket_std: std::os::unix::net::UnixStream = socket.into();
let mut socket: tokio::net::UnixStream = socket_std.try_into()?;

let mut socket = create_socket()?;
let handle = tokio::task::spawn(async move {
trace!("AF_ROUTE monitor started");

Expand All @@ -52,12 +54,25 @@ impl RouteMonitor {
}
Err(err) => {
warn!("AF_ROUTE: error reading: {:?}", err);
// recreate socket, as it is likely in an invalid state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a small delay in case this is hot-looping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear. The reported error doesn't need one, because the socket is already dead.

// TODO: distinguish between different errors?
match create_socket() {
Ok(new_socket) => {
socket = new_socket;
}
Err(err) => {
warn!("AF_ROUTE: unable to bind a new socket: {:?}", err);
// TODO: what to do here?
}
}
}
}
}
});

Ok(RouteMonitor { handle })
Ok(RouteMonitor {
_handle: AbortOnDropHandle::new(handle),
})
}
}

Expand Down
Loading