Skip to content

Commit 35e9873

Browse files
authored
refactor!: Migrate to tokio's AbortOnDropHandle (#2701)
## Description This migrates us to tokio's AbortOnDropHandle from the various versions we had for this ourselves. Only for iroh::node::Node we keep a local version around since it is both Clone *and* awaits the tasks. Generally it's probably better to not make this kind of design though, if possible. ## Breaking Changes Removes the `iroh_net::util` module. This only contained utilities that should have been internal in the first place. If you were using one of the tools to abort tasks on drop, we recommend migrating to `tokio_util::task::AbortOnDropHandle`. ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - ~~[ ] Tests if relevant.~~ - [x] All breaking changes documented.
1 parent 2e5188a commit 35e9873

File tree

26 files changed

+165
-219
lines changed

26 files changed

+165
-219
lines changed

Cargo.lock

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ tempfile = "3.10.1"
6161
thiserror = "1.0.58"
6262
time = { version = "0.3", features = ["formatting"] }
6363
tokio = { version = "1.36.0", features = ["full"] }
64+
tokio-util = { version = "0.7.12", features = ["rt"] }
6465
toml = { version = "0.8.12", features = ["preserve_order"] }
6566
tracing = "0.1.40"
6667
tracing-appender = "0.2.3"

iroh-cli/src/commands/doctor.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use iroh::{
3535
netcheck, portmapper,
3636
relay::{RelayMap, RelayMode, RelayUrl},
3737
ticket::NodeTicket,
38-
util::CancelOnDrop,
3938
Endpoint, NodeAddr, NodeId,
4039
},
4140
util::{path::IrohPaths, progress::ProgressWriter},
@@ -44,6 +43,7 @@ use portable_atomic::AtomicU64;
4443
use postcard::experimental::max_size::MaxSize;
4544
use serde::{Deserialize, Serialize};
4645
use tokio::{io::AsyncWriteExt, sync};
46+
use tokio_util::task::AbortOnDropHandle;
4747

4848
use iroh::net::metrics::MagicsockMetrics;
4949
use iroh_metrics::core::Core;
@@ -372,7 +372,7 @@ struct Gui {
372372
recv_pb: ProgressBar,
373373
echo_pb: ProgressBar,
374374
#[allow(dead_code)]
375-
counter_task: Option<CancelOnDrop>,
375+
counter_task: Option<AbortOnDropHandle<()>>,
376376
}
377377

378378
impl Gui {
@@ -412,10 +412,7 @@ impl Gui {
412412
send_pb,
413413
recv_pb,
414414
echo_pb,
415-
counter_task: Some(CancelOnDrop::new(
416-
"counter_task",
417-
counter_task.abort_handle(),
418-
)),
415+
counter_task: Some(AbortOnDropHandle::new(counter_task)),
419416
}
420417
}
421418

iroh-docs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ tempfile = { version = "3.4" }
4444
thiserror = "1"
4545
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
4646
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
47-
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
47+
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "io-util", "io", "rt"] }
4848
tracing = "0.1"
4949

5050
[dev-dependencies]

iroh-docs/src/engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use futures_lite::{Stream, StreamExt};
1414
use iroh_blobs::downloader::Downloader;
1515
use iroh_blobs::{store::EntryStatus, Hash};
1616
use iroh_gossip::net::Gossip;
17-
use iroh_net::util::SharedAbortingJoinHandle;
1817
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
1918
use serde::{Deserialize, Serialize};
2019
use tokio::sync::{mpsc, oneshot};
20+
use tokio_util::task::AbortOnDropHandle;
2121
use tracing::{error, error_span, Instrument};
2222

2323
use crate::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId};
@@ -49,7 +49,7 @@ pub struct Engine {
4949
pub default_author: Arc<DefaultAuthor>,
5050
to_live_actor: mpsc::Sender<ToLiveActor>,
5151
#[allow(dead_code)]
52-
actor_handle: SharedAbortingJoinHandle<()>,
52+
actor_handle: Arc<AbortOnDropHandle<()>>,
5353
#[debug("ContentStatusCallback")]
5454
content_status_cb: ContentStatusCallback,
5555
}
@@ -108,7 +108,7 @@ impl Engine {
108108
endpoint,
109109
sync,
110110
to_live_actor: live_actor_tx,
111-
actor_handle: actor_handle.into(),
111+
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
112112
content_status_cb,
113113
default_author: Arc::new(default_author),
114114
})

iroh-gossip/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ futures-concurrency = { version = "7.6.1", optional = true }
3535
futures-util = { version = "0.3.30", optional = true }
3636
iroh-net = { path = "../iroh-net", version = "0.24.0", optional = true, default-features = false }
3737
tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] }
38-
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
38+
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "rt"] }
3939
tracing = "0.1"
4040

4141
[dev-dependencies]

iroh-gossip/src/net.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,19 @@ use iroh_net::{
1313
dialer::Dialer,
1414
endpoint::{get_remote_node_id, Connection},
1515
key::PublicKey,
16-
util::SharedAbortingJoinHandle,
1716
AddrInfo, Endpoint, NodeAddr, NodeId,
1817
};
1918
use rand::rngs::StdRng;
2019
use rand_core::SeedableRng;
2120
use std::{
2221
collections::{BTreeSet, HashMap, HashSet, VecDeque},
2322
pin::Pin,
23+
sync::Arc,
2424
task::{Context, Poll},
2525
time::Instant,
2626
};
2727
use tokio::{sync::mpsc, task::JoinSet};
28+
use tokio_util::task::AbortOnDropHandle;
2829
use tracing::{debug, error_span, trace, warn, Instrument};
2930

3031
use self::util::{read_message, write_message, Timers};
@@ -90,7 +91,7 @@ type ProtoMessage = proto::Message<PublicKey>;
9091
pub struct Gossip {
9192
to_actor_tx: mpsc::Sender<ToActor>,
9293
on_direct_addrs_tx: mpsc::Sender<Vec<iroh_net::endpoint::DirectAddr>>,
93-
_actor_handle: SharedAbortingJoinHandle<()>,
94+
_actor_handle: Arc<AbortOnDropHandle<()>>,
9495
max_message_size: usize,
9596
}
9697

@@ -138,7 +139,7 @@ impl Gossip {
138139
Self {
139140
to_actor_tx,
140141
on_direct_addrs_tx: on_endpoints_tx,
141-
_actor_handle: actor_handle.into(),
142+
_actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
142143
max_message_size,
143144
}
144145
}

iroh-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ tokio = { version = "1", features = ["io-util", "macros", "sync", "rt", "net", "
6666
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
6767
tokio-tungstenite = "0.21"
6868
tokio-tungstenite-wasm = "0.3"
69-
tokio-util = { version = "0.7", features = ["io-util", "io", "codec"] }
69+
tokio-util = { version = "0.7.12", features = ["io-util", "io", "codec", "rt"] }
7070
tracing = "0.1"
7171
tungstenite = "0.21"
7272
url = { version = "2.4", features = ["serde"] }

iroh-net/src/discovery.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,9 @@ mod tests {
343343

344344
use parking_lot::Mutex;
345345
use rand::Rng;
346+
use tokio_util::task::AbortOnDropHandle;
346347

347-
use crate::{key::SecretKey, relay::RelayMode, util::AbortingJoinHandle};
348+
use crate::{key::SecretKey, relay::RelayMode};
348349

349350
use super::*;
350351

@@ -588,7 +589,7 @@ mod tests {
588589
async fn new_endpoint(
589590
secret: SecretKey,
590591
disco: impl Discovery + 'static,
591-
) -> (Endpoint, AbortingJoinHandle<anyhow::Result<()>>) {
592+
) -> (Endpoint, AbortOnDropHandle<anyhow::Result<()>>) {
592593
let ep = Endpoint::builder()
593594
.secret_key(secret)
594595
.discovery(Box::new(disco))
@@ -611,7 +612,7 @@ mod tests {
611612
}
612613
});
613614

614-
(ep, AbortingJoinHandle::from(handle))
615+
(ep, AbortOnDropHandle::new(handle))
615616
}
616617

617618
fn system_time_now() -> u64 {
@@ -632,6 +633,7 @@ mod test_dns_pkarr {
632633

633634
use anyhow::Result;
634635
use iroh_base::key::SecretKey;
636+
use tokio_util::task::AbortOnDropHandle;
635637

636638
use crate::{
637639
discovery::pkarr::PkarrPublisher,
@@ -642,7 +644,6 @@ mod test_dns_pkarr {
642644
pkarr_dns_state::State,
643645
run_relay_server, DnsPkarrServer,
644646
},
645-
util::AbortingJoinHandle,
646647
AddrInfo, Endpoint, NodeAddr,
647648
};
648649

@@ -753,7 +754,7 @@ mod test_dns_pkarr {
753754
async fn ep_with_discovery(
754755
relay_map: &RelayMap,
755756
dns_pkarr_server: &DnsPkarrServer,
756-
) -> Result<(Endpoint, AbortingJoinHandle<Result<()>>)> {
757+
) -> Result<(Endpoint, AbortOnDropHandle<Result<()>>)> {
757758
let secret_key = SecretKey::generate();
758759
let ep = Endpoint::builder()
759760
.relay_mode(RelayMode::Custom(relay_map.clone()))
@@ -778,6 +779,6 @@ mod test_dns_pkarr {
778779
}
779780
});
780781

781-
Ok((ep, AbortingJoinHandle::from(handle)))
782+
Ok((ep, AbortOnDropHandle::new(handle)))
782783
}
783784
}

iroh-net/src/discovery/local_swarm_discovery.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use tracing::{debug, error, trace, warn};
1717
use iroh_base::key::PublicKey;
1818
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
1919
use tokio::{sync::mpsc, task::JoinSet};
20+
use tokio_util::task::AbortOnDropHandle;
2021

2122
use crate::{
2223
discovery::{Discovery, DiscoveryItem},
23-
util::AbortingJoinHandle,
2424
AddrInfo, Endpoint, NodeId,
2525
};
2626

@@ -37,7 +37,7 @@ const DISCOVERY_DURATION: Duration = Duration::from_secs(10);
3737
#[derive(Debug)]
3838
pub struct LocalSwarmDiscovery {
3939
#[allow(dead_code)]
40-
handle: AbortingJoinHandle<()>,
40+
handle: AbortOnDropHandle<()>,
4141
sender: mpsc::Sender<Message>,
4242
}
4343

@@ -192,7 +192,7 @@ impl LocalSwarmDiscovery {
192192
}
193193
});
194194
Ok(Self {
195-
handle: handle.into(),
195+
handle: AbortOnDropHandle::new(handle),
196196
sender: send,
197197
})
198198
}

iroh-net/src/discovery/pkarr/dht.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use crate::{
1818
},
1919
dns::node_info::NodeInfo,
2020
key::SecretKey,
21-
util::AbortingJoinHandle,
2221
AddrInfo, Endpoint, NodeId,
2322
};
2423
use futures_lite::StreamExt;
@@ -27,6 +26,7 @@ use pkarr::{
2726
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
2827
RelaySettings, SignedPacket,
2928
};
29+
use tokio_util::task::AbortOnDropHandle;
3030
use url::Url;
3131

3232
/// Republish delay for the DHT. This is only for when the info does not change.
@@ -59,8 +59,9 @@ struct Inner {
5959
#[debug("Option<PkarrRelayClientAsync>")]
6060
pkarr_relay: Option<PkarrRelayClientAsync>,
6161
/// The background task that periodically publishes the node address.
62-
/// Due to AbortingJoinHandle, this will be aborted when the discovery is dropped.
63-
task: Mutex<Option<AbortingJoinHandle<()>>>,
62+
///
63+
/// Due to [`AbortOnDropHandle`], this will be aborted when the discovery is dropped.
64+
task: Mutex<Option<AbortOnDropHandle<()>>>,
6465
/// Optional keypair for signing the DNS packets.
6566
///
6667
/// If this is None, the node will not publish its address to the DHT.
@@ -370,7 +371,7 @@ impl Discovery for DhtDiscovery {
370371
let this = self.clone();
371372
let curr = tokio::spawn(this.publish_loop(keypair.clone(), signed_packet));
372373
let mut task = self.0.task.lock().unwrap();
373-
*task = Some(curr.into());
374+
*task = Some(AbortOnDropHandle::new(curr));
374375
}
375376

376377
fn resolve(

iroh-net/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub mod relay;
133133
pub mod stun;
134134
pub mod ticket;
135135
pub mod tls;
136-
pub mod util;
136+
pub(crate) mod util;
137137

138138
pub use endpoint::{AddrInfo, Endpoint, NodeAddr};
139139

iroh-net/src/magicsock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2816,11 +2816,11 @@ mod tests {
28162816
use anyhow::Context;
28172817
use iroh_test::CallOnDrop;
28182818
use rand::RngCore;
2819+
use tokio_util::task::AbortOnDropHandle;
28192820

28202821
use crate::defaults::staging::EU_RELAY_HOSTNAME;
28212822
use crate::relay::RelayMode;
28222823
use crate::tls;
2823-
use crate::util::AbortingJoinHandle;
28242824
use crate::Endpoint;
28252825

28262826
use super::*;
@@ -3761,7 +3761,7 @@ mod tests {
37613761
}
37623762
.instrument(info_span!("ep2.accept, me = node_id_2.fmt_short()"))
37633763
});
3764-
let _accept_task = AbortingJoinHandle::from(accept_task);
3764+
let _accept_task = AbortOnDropHandle::new(accept_task);
37653765

37663766
let node_addr_2 = NodeAddr {
37673767
node_id: node_id_2,
@@ -3826,7 +3826,7 @@ mod tests {
38263826
}
38273827
.instrument(info_span!("ep2.accept", me = node_id_2.fmt_short()))
38283828
});
3829-
let _accept_task = AbortingJoinHandle::from(accept_task);
3829+
let _accept_task = AbortOnDropHandle::new(accept_task);
38303830

38313831
// Add an empty entry in the NodeMap of ep_1
38323832
msock_1.node_map.add_node_addr(

iroh-net/src/netcheck.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ use iroh_metrics::inc;
1717
use tokio::sync::{self, mpsc, oneshot};
1818
use tokio::time::{Duration, Instant};
1919
use tokio_util::sync::CancellationToken;
20+
use tokio_util::task::AbortOnDropHandle;
2021
use tracing::{debug, error, info_span, trace, warn, Instrument};
2122

2223
use crate::dns::DnsResolver;
2324
use crate::net::{IpFamily, UdpSocket};
2425
use crate::relay::RelayUrl;
25-
use crate::util::CancelOnDrop;
2626

2727
use super::portmapper;
2828
use super::relay::RelayMap;
@@ -174,7 +174,7 @@ pub struct Client {
174174
/// the actor will terminate.
175175
addr: Addr,
176176
/// Ensures the actor is terminated when the client is dropped.
177-
_drop_guard: Arc<CancelOnDrop>,
177+
_drop_guard: Arc<AbortOnDropHandle<()>>,
178178
}
179179

180180
#[derive(Debug)]
@@ -210,7 +210,7 @@ impl Client {
210210
let addr = actor.addr();
211211
let task =
212212
tokio::spawn(async move { actor.run().await }.instrument(info_span!("netcheck.actor")));
213-
let drop_guard = CancelOnDrop::new("netcheck actor", task.abort_handle());
213+
let drop_guard = AbortOnDropHandle::new(task);
214214
Ok(Client {
215215
addr,
216216
_drop_guard: Arc::new(drop_guard),

0 commit comments

Comments
 (0)