Skip to content

Commit c96b032

Browse files
authored
fix(iroh-net): Make sure the rtt-actor is shutdown correctly (#2914)
## Description When the endpoint is dropped the rtt-actor should be shut down. This makes sure this actually happens. Fixes #2911 ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions Also make sure the task is aborted on drop. We should always do this anyway. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent bf603e8 commit c96b032

File tree

1 file changed

+45
-10
lines changed

1 file changed

+45
-10
lines changed

iroh-net/src/endpoint/rtt_actor.rs

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ use iroh_base::key::NodeId;
88
use iroh_metrics::inc;
99
use tokio::{
1010
sync::{mpsc, Notify},
11-
task::JoinHandle,
1211
time::Duration,
1312
};
14-
use tracing::{debug, error, info_span, trace, warn, Instrument};
13+
use tokio_util::task::AbortOnDropHandle;
14+
use tracing::{debug, error, info_span, trace, Instrument};
1515

1616
use crate::{
1717
magicsock::{ConnectionType, ConnectionTypeStream},
@@ -21,7 +21,7 @@ use crate::{
2121
#[derive(Debug)]
2222
pub(super) struct RttHandle {
2323
// We should and some point use this to propagate panics and errors.
24-
pub(super) _handle: JoinHandle<()>,
24+
pub(super) _handle: AbortOnDropHandle<()>,
2525
pub(super) msg_tx: mpsc::Sender<RttMessage>,
2626
}
2727

@@ -33,13 +33,16 @@ impl RttHandle {
3333
tick: Notify::new(),
3434
};
3535
let (msg_tx, msg_rx) = mpsc::channel(16);
36-
let _handle = tokio::spawn(
36+
let handle = tokio::spawn(
3737
async move {
3838
actor.run(msg_rx).await;
3939
}
4040
.instrument(info_span!("rtt-actor")),
4141
);
42-
Self { _handle, msg_tx }
42+
Self {
43+
_handle: AbortOnDropHandle::new(handle),
44+
msg_tx,
45+
}
4346
}
4447
}
4548

@@ -87,12 +90,18 @@ impl RttActor {
8790
cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
8891
loop {
8992
tokio::select! {
90-
Some(msg) = msg_rx.recv() => self.handle_msg(msg),
91-
item = self.connection_events.next(),
92-
if !self.connection_events.is_empty() => self.do_reset_rtt(item),
93+
biased;
94+
msg = msg_rx.recv() => {
95+
match msg {
96+
Some(msg) => self.handle_msg(msg),
97+
None => break,
98+
}
99+
}
100+
item = self.connection_events.next(), if !self.connection_events.is_empty() => {
101+
self.do_reset_rtt(item);
102+
}
93103
_ = cleanup_interval.tick() => self.do_connections_cleanup(),
94104
() = self.tick.notified() => continue,
95-
else => break,
96105
}
97106
}
98107
debug!("rtt-actor finished");
@@ -156,7 +165,7 @@ impl RttActor {
156165
None => error!("No connection found for stream item"),
157166
},
158167
None => {
159-
warn!("self.conn_type_changes is empty but was polled");
168+
trace!("No more connections");
160169
}
161170
}
162171
}
@@ -171,3 +180,29 @@ impl RttActor {
171180
}
172181
}
173182
}
183+
184+
#[cfg(test)]
185+
mod tests {
186+
use super::*;
187+
188+
#[tokio::test]
189+
async fn test_actor_mspc_close() {
190+
let mut actor = RttActor {
191+
connection_events: stream_group::StreamGroup::new().keyed(),
192+
connections: HashMap::new(),
193+
tick: Notify::new(),
194+
};
195+
let (msg_tx, msg_rx) = mpsc::channel(16);
196+
let handle = tokio::spawn(async move {
197+
actor.run(msg_rx).await;
198+
});
199+
200+
// Dropping the msg_tx should stop the actor
201+
drop(msg_tx);
202+
203+
let task_res = tokio::time::timeout(Duration::from_secs(5), handle)
204+
.await
205+
.expect("timeout - actor did not finish");
206+
assert!(task_res.is_ok());
207+
}
208+
}

0 commit comments

Comments
 (0)