Skip to content

Commit ff245cd

Browse files
authored
Merge 9f5e0c5 into 5aba17e
2 parents 5aba17e + 9f5e0c5 commit ff245cd

File tree

3 files changed

+26
-45
lines changed

3 files changed

+26
-45
lines changed

iroh/src/discovery.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ use std::{collections::BTreeSet, net::SocketAddr, time::Duration};
116116
use anyhow::{anyhow, ensure, Result};
117117
use futures_lite::stream::{Boxed as BoxStream, StreamExt};
118118
use iroh_base::{NodeAddr, NodeId, RelayUrl};
119-
use tokio::{sync::oneshot, task::JoinHandle};
119+
use tokio::sync::oneshot;
120+
use tokio_util::task::AbortOnDropHandle;
120121
use tracing::{debug, error_span, warn, Instrument};
121122

122123
use crate::Endpoint;
@@ -285,7 +286,7 @@ const MAX_AGE: Duration = Duration::from_secs(10);
285286
/// A wrapper around a tokio task which runs a node discovery.
286287
pub(super) struct DiscoveryTask {
287288
on_first_rx: oneshot::Receiver<Result<()>>,
288-
task: JoinHandle<()>,
289+
task: AbortOnDropHandle<()>,
289290
}
290291

291292
impl DiscoveryTask {
@@ -299,7 +300,10 @@ impl DiscoveryTask {
299300
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
300301
),
301302
);
302-
Ok(Self { task, on_first_rx })
303+
Ok(Self {
304+
task: AbortOnDropHandle::new(task),
305+
on_first_rx,
306+
})
303307
}
304308

305309
/// Starts a discovery task after a delay and only if no path to the node was recently active.
@@ -340,7 +344,10 @@ impl DiscoveryTask {
340344
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
341345
),
342346
);
343-
Ok(Some(Self { task, on_first_rx }))
347+
Ok(Some(Self {
348+
task: AbortOnDropHandle::new(task),
349+
on_first_rx,
350+
}))
344351
}
345352

346353
/// Waits until the discovery task produced at least one result.
@@ -350,11 +357,6 @@ impl DiscoveryTask {
350357
Ok(())
351358
}
352359

353-
/// Cancels the discovery task.
354-
pub(super) fn cancel(&self) {
355-
self.task.abort();
356-
}
357-
358360
fn create_stream(ep: &Endpoint, node_id: NodeId) -> Result<BoxStream<Result<DiscoveryItem>>> {
359361
let discovery = ep
360362
.discovery()
@@ -400,11 +402,7 @@ impl DiscoveryTask {
400402
let mut on_first_tx = Some(on_first_tx);
401403
debug!("discovery: start");
402404
loop {
403-
let next = tokio::select! {
404-
_ = ep.cancel_token().cancelled() => break,
405-
next = stream.next() => next
406-
};
407-
match next {
405+
match stream.next().await {
408406
Some(Ok(r)) => {
409407
if r.node_addr.is_empty() {
410408
debug!(provenance = %r.provenance, "discovery: empty address found");

iroh/src/endpoint.rs

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ use std::{
2323
};
2424

2525
use anyhow::{bail, Context, Result};
26-
use derive_more::Debug;
2726
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey};
2827
use iroh_relay::RelayMap;
2928
use pin_project::pin_project;
30-
use tokio_util::sync::CancellationToken;
3129
use tracing::{debug, instrument, trace, warn};
3230
use url::Url;
3331

@@ -92,7 +90,7 @@ pub enum PathSelection {
9290
/// new [`NodeId`].
9391
///
9492
/// To create the [`Endpoint`] call [`Builder::bind`].
95-
#[derive(Debug)]
93+
#[derive(derive_more::Debug)]
9694
pub struct Builder {
9795
secret_key: Option<SecretKey>,
9896
relay_mode: RelayMode,
@@ -510,7 +508,6 @@ pub struct Endpoint {
510508
msock: Handle,
511509
endpoint: quinn::Endpoint,
512510
rtt_actor: Arc<rtt_actor::RttHandle>,
513-
cancel_token: CancellationToken,
514511
static_config: Arc<StaticConfig>,
515512
}
516513

@@ -561,7 +558,6 @@ impl Endpoint {
561558
msock,
562559
endpoint,
563560
rtt_actor: Arc::new(rtt_actor::RttHandle::new()),
564-
cancel_token: CancellationToken::new(),
565561
static_config: Arc::new(static_config),
566562
})
567563
}
@@ -618,10 +614,11 @@ impl Endpoint {
618614
let node_id = node_addr.node_id;
619615
let direct_addresses = node_addr.direct_addresses.clone();
620616

621-
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
622-
// Start discovery for this node if it's enabled and we have no valid or verified
623-
// address information for this node.
624-
let (addr, discovery) = self
617+
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
618+
// address. Start discovery for this node if it's enabled and we have no valid or
619+
// verified address information for this node. Dropping the discovery cancels any
620+
// still running task.
621+
let (addr, _discovery_drop_guard) = self
625622
.get_mapping_addr_and_maybe_start_discovery(node_addr)
626623
.await
627624
.with_context(|| {
@@ -636,16 +633,9 @@ impl Endpoint {
636633
node_id, addr, direct_addresses
637634
);
638635

639-
// Start connecting via quinn. This will time out after 10 seconds if no reachable address
640-
// is available.
641-
let conn = self.connect_quinn(node_id, alpn, addr).await;
642-
643-
// Cancel the node discovery task (if still running).
644-
if let Some(discovery) = discovery {
645-
discovery.cancel();
646-
}
647-
648-
conn
636+
// Start connecting via quinn. This will time out after 10 seconds if no reachable
637+
// address is available.
638+
self.connect_quinn(node_id, alpn, addr).await
649639
}
650640

651641
#[instrument(
@@ -990,7 +980,6 @@ impl Endpoint {
990980
return Ok(());
991981
}
992982

993-
self.cancel_token.cancel();
994983
tracing::debug!("Closing connections");
995984
self.endpoint.close(0u16.into(), b"");
996985
self.endpoint.wait_idle().await;
@@ -1002,16 +991,11 @@ impl Endpoint {
1002991

1003992
/// Check if this endpoint is still alive, or already closed.
1004993
pub fn is_closed(&self) -> bool {
1005-
self.cancel_token.is_cancelled() && self.msock.is_closed()
994+
self.msock.is_closed()
1006995
}
1007996

1008997
// # Remaining private methods
1009998

1010-
/// Expose the internal [`CancellationToken`] to link shutdowns.
1011-
pub(crate) fn cancel_token(&self) -> &CancellationToken {
1012-
&self.cancel_token
1013-
}
1014-
1015999
/// Return the quic mapped address for this `node_id` and possibly start discovery
10161000
/// services if discovery is enabled on this magic endpoint.
10171001
///
@@ -1085,7 +1069,7 @@ impl Endpoint {
10851069
}
10861070

10871071
/// Future produced by [`Endpoint::accept`].
1088-
#[derive(Debug)]
1072+
#[derive(derive_more::Debug)]
10891073
#[pin_project]
10901074
pub struct Accept<'a> {
10911075
#[pin]

iroh/src/protocol.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,8 @@ impl RouterBuilder {
248248
let mut join_set = JoinSet::new();
249249
let endpoint = self.endpoint.clone();
250250

251-
// We use a child token of the endpoint, to ensure that this is shutdown
252-
// when the endpoint is shutdown, but that we can shutdown ourselves independently.
253-
let cancel = endpoint.cancel_token().child_token();
251+
// Our own shutdown works with a cancellation token.
252+
let cancel = CancellationToken::new();
254253
let cancel_token = cancel.clone();
255254

256255
let run_loop_fut = async move {
@@ -289,7 +288,7 @@ impl RouterBuilder {
289288
// handle incoming p2p connections.
290289
incoming = endpoint.accept() => {
291290
let Some(incoming) = incoming else {
292-
break;
291+
break; // Endpoint is closed.
293292
};
294293

295294
let protocols = protocols.clone();

0 commit comments

Comments
 (0)