Skip to content

Commit e461cca

Browse files
feat(iroh): improve shutdown interactions (#2980)
## Description - ties the cancellation tokens between `Router` and `Endpoint` closer together - closing `Endpoint` is now idempotent ## Breaking Changes - added `iroh::Endpoint::close` that takes no arguments anymore, it default to using code `0` and an empty message - added `iroh::Endpoint::is_closed` ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent 574337a commit e461cca

File tree

10 files changed

+65
-41
lines changed

10 files changed

+65
-41
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ let response = recv.read_to_end(1000).await?;
7777
assert_eq!(&response, b"Hello, world!");
7878

7979
// Close the endpoint and all its connections
80-
endpoint.close(0u32.into(), b"bye!").await?;
80+
endpoint.close().await?;
8181
```
8282

8383
And on the accepting side:

iroh/bench/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl EndpointSelector {
8787
pub async fn close(self) -> Result<()> {
8888
match self {
8989
EndpointSelector::Iroh(endpoint) => {
90-
endpoint.close(0u32.into(), b"").await?;
90+
endpoint.close().await?;
9191
}
9292
#[cfg(not(any(target_os = "freebsd", target_os = "openbsd", target_os = "netbsd")))]
9393
EndpointSelector::Quinn(endpoint) => {

iroh/examples/connect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,6 @@ async fn main() -> anyhow::Result<()> {
9090

9191
// We received the last message: close all connections and allow for the close
9292
// message to be sent.
93-
endpoint.close(0u8.into(), b"bye").await?;
93+
endpoint.close().await?;
9494
Ok(())
9595
}

iroh/examples/echo.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ async fn connect_side(addr: NodeAddr) -> Result<()> {
5353
let response = recv.read_to_end(1000).await?;
5454
assert_eq!(&response, b"Hello, world!");
5555

56-
// Close the endpoint (and all its connections) in one:
57-
endpoint.close(0u32.into(), b"bye!").await?;
58-
5956
Ok(())
6057
}
6158

iroh/examples/transfer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async fn fetch(ticket: &str, relay_url: Option<String>) -> anyhow::Result<()> {
206206
// We received the last message: close all connections and allow for the close
207207
// message to be sent.
208208
tokio::time::timeout(Duration::from_secs(3), async move {
209-
let res = endpoint.close(0u8.into(), b"bye").await;
209+
let res = endpoint.close().await;
210210
if res.is_err() {
211211
println!("failed to close connection: {res:#?}");
212212
}

iroh/src/discovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ impl DiscoveryTask {
405405
debug!("discovery: start");
406406
loop {
407407
let next = tokio::select! {
408-
_ = ep.cancelled() => break,
408+
_ = ep.cancel_token().cancelled() => break,
409409
next = stream.next() => next
410410
};
411411
match next {

iroh/src/endpoint.rs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use derive_more::Debug;
2626
use futures_lite::{Stream, StreamExt};
2727
use iroh_base::relay_map::RelayMap;
2828
use pin_project::pin_project;
29-
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
29+
use tokio_util::sync::CancellationToken;
3030
use tracing::{debug, instrument, trace, warn};
3131
use url::Url;
3232

@@ -954,38 +954,46 @@ impl Endpoint {
954954

955955
/// Closes the QUIC endpoint and the magic socket.
956956
///
957-
/// This will close all open QUIC connections with the provided error_code and
958-
/// reason. See [`quinn::Connection`] for details on how these are interpreted.
957+
/// This will close any remaining open [`Connection`]s with an error code
958+
/// of `0` and an empty reason. Though it is best practice to close those
959+
/// explicitly before with a custom error code and reason.
959960
///
960-
/// It will then wait for all connections to actually be shutdown, and afterwards close
961-
/// the magic socket. Be aware however that the underlying UDP sockets are only closed
961+
/// It will then make a best effort to wait for all close notifications to be
962+
/// acknowledged by the peers, re-transmitting them if needed. This ensures the
963+
/// peers are aware of the closed connections instead of having to wait for a timeout
964+
/// on the connection. Once all connections are closed or timed out, the magic socket is closed.
965+
///
966+
/// Be aware however that the underlying UDP sockets are only closed
962967
/// on [`Drop`], bearing in mind the [`Endpoint`] is only dropped once all the clones
963968
/// are dropped.
964969
///
965970
/// Returns an error if closing the magic socket failed.
966971
/// TODO: Document error cases.
967-
pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> {
968-
let Endpoint {
969-
msock,
970-
endpoint,
971-
cancel_token,
972-
..
973-
} = self;
974-
cancel_token.cancel();
972+
pub async fn close(&self) -> Result<()> {
973+
if self.is_closed() {
974+
return Ok(());
975+
}
976+
977+
self.cancel_token.cancel();
975978
tracing::debug!("Closing connections");
976-
endpoint.close(error_code, reason);
977-
endpoint.wait_idle().await;
979+
self.endpoint.close(0u16.into(), b"");
980+
self.endpoint.wait_idle().await;
978981

979982
tracing::debug!("Connections closed");
980-
981-
msock.close().await?;
983+
self.msock.close().await?;
982984
Ok(())
983985
}
984986

987+
/// Check if this endpoint is still alive, or already closed.
988+
pub fn is_closed(&self) -> bool {
989+
self.cancel_token.is_cancelled() && self.msock.is_closed()
990+
}
991+
985992
// # Remaining private methods
986993

987-
pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
988-
self.cancel_token.cancelled()
994+
/// Expose the internal [`CancellationToken`] to link shutdowns.
995+
pub(crate) fn cancel_token(&self) -> &CancellationToken {
996+
&self.cancel_token
989997
}
990998

991999
/// Return the quic mapped address for this `node_id` and possibly start discovery
@@ -1600,7 +1608,7 @@ mod tests {
16001608

16011609
info!("closing endpoint");
16021610
// close the endpoint and restart it
1603-
endpoint.close(0u32.into(), b"done").await.unwrap();
1611+
endpoint.close().await.unwrap();
16041612

16051613
info!("restarting endpoint");
16061614
// now restart it and check the addressing info of the peer
@@ -1699,7 +1707,7 @@ mod tests {
16991707
send.stopped().await.unwrap();
17001708
recv.read_to_end(0).await.unwrap();
17011709
info!("client finished");
1702-
ep.close(0u32.into(), &[]).await.unwrap();
1710+
ep.close().await.unwrap();
17031711
info!("client closed");
17041712
}
17051713
.instrument(error_span!("client", %i))

iroh/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@
173173
//!
174174
//! // Gracefully close the connection and endpoint.
175175
//! conn.close(1u8.into(), b"done");
176-
//! ep.close(0u8.into(), b"ep closing").await?;
176+
//! ep.close().await?;
177177
//! println!("Client closed");
178178
//! Ok(())
179179
//! }
@@ -202,7 +202,7 @@
202202
//!
203203
//! // Wait for the client to close the connection and gracefully close the endpoint.
204204
//! conn.closed().await;
205-
//! ep.close(0u8.into(), b"ep closing").await?;
205+
//! ep.close().await?;
206206
//! Ok(())
207207
//! }
208208
//! ```

iroh/src/magicsock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl MagicSock {
274274
self.closing.load(Ordering::Relaxed)
275275
}
276276

277-
fn is_closed(&self) -> bool {
277+
pub(crate) fn is_closed(&self) -> bool {
278278
self.closed.load(Ordering::SeqCst)
279279
}
280280

@@ -3238,8 +3238,8 @@ mod tests {
32383238
println!("closing endpoints");
32393239
let msock1 = m1.endpoint.magic_sock();
32403240
let msock2 = m2.endpoint.magic_sock();
3241-
m1.endpoint.close(0u32.into(), b"done").await?;
3242-
m2.endpoint.close(0u32.into(), b"done").await?;
3241+
m1.endpoint.close().await?;
3242+
m2.endpoint.close().await?;
32433243

32443244
assert!(msock1.msock.is_closed());
32453245
assert!(msock2.msock.is_closed());

iroh/src/protocol.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,10 @@ impl RouterBuilder {
268268
let mut join_set = JoinSet::new();
269269
let endpoint = self.endpoint.clone();
270270
let protos = protocols.clone();
271-
let cancel = CancellationToken::new();
271+
272+
// We use a child token of the endpoint, to ensure that this is shutdown
273+
// when the endpoint is shutdown, but that we can shutdown ourselves independently.
274+
let cancel = endpoint.cancel_token().child_token();
272275
let cancel_token = cancel.clone();
273276

274277
let run_loop_fut = async move {
@@ -341,15 +344,10 @@ impl RouterBuilder {
341344

342345
/// Shutdown the different parts of the router concurrently.
343346
async fn shutdown(endpoint: &Endpoint, protocols: Arc<ProtocolMap>) {
344-
let error_code = 1u16;
345-
346347
// We ignore all errors during shutdown.
347348
let _ = tokio::join!(
348349
// Close the endpoint.
349-
// Closing the Endpoint is the equivalent of calling Connection::close on all
350-
// connections: Operations will immediately fail with ConnectionError::LocallyClosed.
351-
// All streams are interrupted, this is not graceful.
352-
endpoint.close(error_code.into(), b"provider terminating"),
350+
endpoint.close(),
353351
// Shutdown protocol handlers.
354352
protocols.shutdown(),
355353
);
@@ -378,3 +376,24 @@ async fn handle_connection(incoming: crate::endpoint::Incoming, protocols: Arc<P
378376
warn!("Handling incoming connection ended with error: {err}");
379377
}
380378
}
379+
380+
#[cfg(test)]
381+
mod tests {
382+
use super::*;
383+
384+
#[tokio::test]
385+
async fn test_shutdown() -> Result<()> {
386+
let endpoint = Endpoint::builder().bind().await?;
387+
let router = Router::builder(endpoint.clone()).spawn().await?;
388+
389+
assert!(!router.is_shutdown());
390+
assert!(!endpoint.is_closed());
391+
392+
router.shutdown().await?;
393+
394+
assert!(router.is_shutdown());
395+
assert!(endpoint.is_closed());
396+
397+
Ok(())
398+
}
399+
}

0 commit comments

Comments
 (0)