Skip to content

Commit 9cef520

Browse files
authored
refactor(iroh): Remove CancellationToken from Endpoint (#3101)
## Description The internal CancellationToke was used to know by other parts of the code when the endpoint is shut down. But those bits of code already have mechanisms to do so. This bit of API makes is a bit of extra complexity that is not needed. ## Breaking Changes None, this is internal. ## Notes & open questions Closes #3096. Closes #3098 (replaces). Maybe not directly but now there's an example of how to write an accept loop without having to rely on the CancellationToken. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent c650ea8 commit 9cef520

File tree

3 files changed

+26
-45
lines changed

3 files changed

+26
-45
lines changed

iroh/src/discovery.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ use std::{collections::BTreeSet, net::SocketAddr, time::Duration};
116116
use anyhow::{anyhow, ensure, Result};
117117
use futures_lite::stream::{Boxed as BoxStream, StreamExt};
118118
use iroh_base::{NodeAddr, NodeId, RelayUrl};
119-
use tokio::{sync::oneshot, task::JoinHandle};
119+
use tokio::sync::oneshot;
120+
use tokio_util::task::AbortOnDropHandle;
120121
use tracing::{debug, error_span, warn, Instrument};
121122

122123
use crate::Endpoint;
@@ -285,7 +286,7 @@ const MAX_AGE: Duration = Duration::from_secs(10);
285286
/// A wrapper around a tokio task which runs a node discovery.
286287
pub(super) struct DiscoveryTask {
287288
on_first_rx: oneshot::Receiver<Result<()>>,
288-
task: JoinHandle<()>,
289+
task: AbortOnDropHandle<()>,
289290
}
290291

291292
impl DiscoveryTask {
@@ -299,7 +300,10 @@ impl DiscoveryTask {
299300
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
300301
),
301302
);
302-
Ok(Self { task, on_first_rx })
303+
Ok(Self {
304+
task: AbortOnDropHandle::new(task),
305+
on_first_rx,
306+
})
303307
}
304308

305309
/// Starts a discovery task after a delay and only if no path to the node was recently active.
@@ -340,7 +344,10 @@ impl DiscoveryTask {
340344
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
341345
),
342346
);
343-
Ok(Some(Self { task, on_first_rx }))
347+
Ok(Some(Self {
348+
task: AbortOnDropHandle::new(task),
349+
on_first_rx,
350+
}))
344351
}
345352

346353
/// Waits until the discovery task produced at least one result.
@@ -350,11 +357,6 @@ impl DiscoveryTask {
350357
Ok(())
351358
}
352359

353-
/// Cancels the discovery task.
354-
pub(super) fn cancel(&self) {
355-
self.task.abort();
356-
}
357-
358360
fn create_stream(ep: &Endpoint, node_id: NodeId) -> Result<BoxStream<Result<DiscoveryItem>>> {
359361
let discovery = ep
360362
.discovery()
@@ -400,11 +402,7 @@ impl DiscoveryTask {
400402
let mut on_first_tx = Some(on_first_tx);
401403
debug!("discovery: start");
402404
loop {
403-
let next = tokio::select! {
404-
_ = ep.cancel_token().cancelled() => break,
405-
next = stream.next() => next
406-
};
407-
match next {
405+
match stream.next().await {
408406
Some(Ok(r)) => {
409407
if r.node_addr.is_empty() {
410408
debug!(provenance = %r.provenance, "discovery: empty address found");

iroh/src/endpoint.rs

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ use std::{
2323
};
2424

2525
use anyhow::{bail, Context, Result};
26-
use derive_more::Debug;
2726
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey};
2827
use iroh_relay::RelayMap;
2928
use pin_project::pin_project;
30-
use tokio_util::sync::CancellationToken;
3129
use tracing::{debug, instrument, trace, warn};
3230
use url::Url;
3331

@@ -92,7 +90,7 @@ pub enum PathSelection {
9290
/// new [`NodeId`].
9391
///
9492
/// To create the [`Endpoint`] call [`Builder::bind`].
95-
#[derive(Debug)]
93+
#[derive(derive_more::Debug)]
9694
pub struct Builder {
9795
secret_key: Option<SecretKey>,
9896
relay_mode: RelayMode,
@@ -510,7 +508,6 @@ pub struct Endpoint {
510508
msock: Handle,
511509
endpoint: quinn::Endpoint,
512510
rtt_actor: Arc<rtt_actor::RttHandle>,
513-
cancel_token: CancellationToken,
514511
static_config: Arc<StaticConfig>,
515512
}
516513

@@ -561,7 +558,6 @@ impl Endpoint {
561558
msock,
562559
endpoint,
563560
rtt_actor: Arc::new(rtt_actor::RttHandle::new()),
564-
cancel_token: CancellationToken::new(),
565561
static_config: Arc::new(static_config),
566562
})
567563
}
@@ -618,10 +614,11 @@ impl Endpoint {
618614
let node_id = node_addr.node_id;
619615
let direct_addresses = node_addr.direct_addresses.clone();
620616

621-
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
622-
// Start discovery for this node if it's enabled and we have no valid or verified
623-
// address information for this node.
624-
let (addr, discovery) = self
617+
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
618+
// address. Start discovery for this node if it's enabled and we have no valid or
619+
// verified address information for this node. Dropping the discovery cancels any
620+
// still running task.
621+
let (addr, _discovery_drop_guard) = self
625622
.get_mapping_addr_and_maybe_start_discovery(node_addr)
626623
.await
627624
.with_context(|| {
@@ -636,16 +633,9 @@ impl Endpoint {
636633
node_id, addr, direct_addresses
637634
);
638635

639-
// Start connecting via quinn. This will time out after 10 seconds if no reachable address
640-
// is available.
641-
let conn = self.connect_quinn(node_id, alpn, addr).await;
642-
643-
// Cancel the node discovery task (if still running).
644-
if let Some(discovery) = discovery {
645-
discovery.cancel();
646-
}
647-
648-
conn
636+
// Start connecting via quinn. This will time out after 10 seconds if no reachable
637+
// address is available.
638+
self.connect_quinn(node_id, alpn, addr).await
649639
}
650640

651641
#[instrument(
@@ -990,7 +980,6 @@ impl Endpoint {
990980
return Ok(());
991981
}
992982

993-
self.cancel_token.cancel();
994983
tracing::debug!("Closing connections");
995984
self.endpoint.close(0u16.into(), b"");
996985
self.endpoint.wait_idle().await;
@@ -1002,16 +991,11 @@ impl Endpoint {
1002991

1003992
/// Check if this endpoint is still alive, or already closed.
1004993
pub fn is_closed(&self) -> bool {
1005-
self.cancel_token.is_cancelled() && self.msock.is_closed()
994+
self.msock.is_closed()
1006995
}
1007996

1008997
// # Remaining private methods
1009998

1010-
/// Expose the internal [`CancellationToken`] to link shutdowns.
1011-
pub(crate) fn cancel_token(&self) -> &CancellationToken {
1012-
&self.cancel_token
1013-
}
1014-
1015999
/// Return the quic mapped address for this `node_id` and possibly start discovery
10161000
/// services if discovery is enabled on this magic endpoint.
10171001
///
@@ -1085,7 +1069,7 @@ impl Endpoint {
10851069
}
10861070

10871071
/// Future produced by [`Endpoint::accept`].
1088-
#[derive(Debug)]
1072+
#[derive(derive_more::Debug)]
10891073
#[pin_project]
10901074
pub struct Accept<'a> {
10911075
#[pin]

iroh/src/protocol.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,8 @@ impl RouterBuilder {
248248
let mut join_set = JoinSet::new();
249249
let endpoint = self.endpoint.clone();
250250

251-
// We use a child token of the endpoint, to ensure that this is shutdown
252-
// when the endpoint is shutdown, but that we can shutdown ourselves independently.
253-
let cancel = endpoint.cancel_token().child_token();
251+
// Our own shutdown works with a cancellation token.
252+
let cancel = CancellationToken::new();
254253
let cancel_token = cancel.clone();
255254

256255
let run_loop_fut = async move {
@@ -289,7 +288,7 @@ impl RouterBuilder {
289288
// handle incoming p2p connections.
290289
incoming = endpoint.accept() => {
291290
let Some(incoming) = incoming else {
292-
break;
291+
break; // Endpoint is closed.
293292
};
294293

295294
let protocols = protocols.clone();

0 commit comments

Comments
 (0)