Skip to content

Commit 738c773

Browse files
refactor(iroh): remove genawaiter usage from dht discovery (#3048)
## Description One less dependency in the core of iroh ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent 4a774f1 commit 738c773

File tree

4 files changed

+91
-171
lines changed

4 files changed

+91
-171
lines changed

Cargo.lock

Lines changed: 0 additions & 75 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deny.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,5 @@ license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]
2424

2525
[advisories]
2626
ignore = [
27-
"RUSTSEC-2024-0370", # unmaintained, no upgrade available
2827
"RUSTSEC-2024-0384", # unmaintained, no upgrade available
2928
]

iroh/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ iroh-metrics = { version = "0.29", default-features = false }
115115
# local-swarm-discovery
116116
swarm-discovery = { version = "0.3.0-alpha.1", optional = true }
117117

118-
# dht_discovery
119-
genawaiter = { version = "0.99", features = ["futures03"], optional = true }
120-
121118
# Examples
122119
clap = { version = "4", features = ["derive"], optional = true }
123120
tracing-subscriber = { version = "0.3", features = [
@@ -179,7 +176,7 @@ default = ["metrics", "discovery-pkarr-dht"]
179176
metrics = ["iroh-metrics/metrics", "iroh-relay/metrics", "net-report/metrics", "portmapper/metrics"]
180177
test-utils = ["iroh-relay/test-utils", "iroh-relay/server", "dep:axum"]
181178
discovery-local-network = ["dep:swarm-discovery"]
182-
discovery-pkarr-dht = ["pkarr/dht", "dep:genawaiter"]
179+
discovery-pkarr-dht = ["pkarr/dht"]
183180
examples = [
184181
"dep:clap",
185182
"dep:tracing-subscriber",

iroh/src/discovery/pkarr/dht.rs

Lines changed: 90 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ use std::{
1212
time::Duration,
1313
};
1414

15-
use futures_lite::{stream::Boxed, StreamExt};
16-
use genawaiter::sync::{Co, Gen};
15+
use anyhow::Result;
16+
use futures_lite::{
17+
stream::{Boxed, StreamExt},
18+
FutureExt,
19+
};
1720
use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey};
1821
use pkarr::{
19-
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
20-
RelaySettings, SignedPacket,
22+
PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, RelaySettings,
23+
SignedPacket,
2124
};
2225
use tokio_util::task::AbortOnDropHandle;
2326
use url::Url;
@@ -89,6 +92,73 @@ struct Inner {
8992
republish_delay: Duration,
9093
}
9194

95+
impl Inner {
96+
async fn resolve_relay(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
97+
tracing::info!("resolving {} from relay {:?}", key.to_z32(), self.relay_url);
98+
99+
let maybe_packet = self
100+
.pkarr_relay
101+
.as_ref()
102+
.expect("checked")
103+
.resolve(&key)
104+
.await;
105+
match maybe_packet {
106+
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
107+
Ok(node_info) => {
108+
let node_addr: NodeAddr = node_info.into();
109+
110+
tracing::info!("discovered node info from relay {:?}", node_addr);
111+
Some(Ok(DiscoveryItem {
112+
node_addr,
113+
provenance: "relay",
114+
last_updated: None,
115+
}))
116+
}
117+
Err(_err) => {
118+
tracing::debug!("failed to parse signed packet as node info");
119+
None
120+
}
121+
},
122+
Ok(None) => {
123+
tracing::debug!("no signed packet found in relay");
124+
None
125+
}
126+
Err(err) => {
127+
tracing::debug!("failed to get signed packet from relay: {}", err);
128+
Some(Err(err.into()))
129+
}
130+
}
131+
}
132+
async fn resolve_dht(&self, key: pkarr::PublicKey) -> Option<Result<DiscoveryItem>> {
133+
tracing::info!("resolving {} from DHT", key.to_z32());
134+
135+
let maybe_packet = self.pkarr.resolve(&key).await;
136+
match maybe_packet {
137+
Ok(Some(signed_packet)) => match NodeInfo::from_pkarr_signed_packet(&signed_packet) {
138+
Ok(node_info) => {
139+
let node_addr: NodeAddr = node_info.into();
140+
tracing::info!("discovered node info from DHT {:?}", node_addr);
141+
Some(Ok(DiscoveryItem {
142+
node_addr,
143+
provenance: "mainline",
144+
last_updated: None,
145+
}))
146+
}
147+
Err(_err) => {
148+
tracing::debug!("failed to parse signed packet as node info");
149+
None
150+
}
151+
},
152+
Ok(None) => {
153+
// nothing to do
154+
tracing::debug!("no signed packet found in DHT");
155+
None
156+
}
157+
Err(err) => Some(Err(err.into())),
158+
}
159+
}
160+
}
161+
92162
/// Builder for [`DhtDiscovery`].
93163
///
94164
/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
@@ -179,7 +249,7 @@ impl Builder {
179249
}
180250

181251
/// Builds the discovery mechanism.
182-
pub fn build(self) -> anyhow::Result<DhtDiscovery> {
252+
pub fn build(self) -> Result<DhtDiscovery> {
183253
let pkarr = match self.client {
184254
Some(client) => client,
185255
None => PkarrClient::new(Default::default())?,
@@ -278,90 +348,6 @@ impl DhtDiscovery {
278348
tokio::time::sleep(this.0.republish_delay).await;
279349
}
280350
}
281-
282-
async fn resolve_relay(
283-
&self,
284-
pkarr_public_key: PublicKey,
285-
co: &Co<anyhow::Result<DiscoveryItem>>,
286-
) {
287-
let Some(relay) = &self.0.pkarr_relay else {
288-
return;
289-
};
290-
tracing::info!(
291-
"resolving {} from relay {:?}",
292-
pkarr_public_key.to_z32(),
293-
self.0.relay_url
294-
);
295-
let response = relay.resolve(&pkarr_public_key).await;
296-
match response {
297-
Ok(Some(signed_packet)) => {
298-
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
299-
let node_addr: NodeAddr = node_info.into();
300-
301-
tracing::info!("discovered node info from relay {:?}", node_addr);
302-
co.yield_(Ok(DiscoveryItem {
303-
node_addr,
304-
provenance: "relay",
305-
last_updated: None,
306-
}))
307-
.await;
308-
} else {
309-
tracing::debug!("failed to parse signed packet as node info");
310-
}
311-
}
312-
Ok(None) => {
313-
tracing::debug!("no signed packet found in relay");
314-
}
315-
Err(e) => {
316-
tracing::debug!("failed to get signed packet from relay: {}", e);
317-
co.yield_(Err(e.into())).await;
318-
}
319-
}
320-
}
321-
322-
/// Resolves a node id from the DHT.
323-
async fn resolve_dht(
324-
&self,
325-
pkarr_public_key: PublicKey,
326-
co: &Co<anyhow::Result<DiscoveryItem>>,
327-
) {
328-
if !self.0.dht {
329-
return;
330-
};
331-
tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());
332-
let response = match self.0.pkarr.resolve(&pkarr_public_key).await {
333-
Ok(r) => r,
334-
Err(e) => {
335-
co.yield_(Err(e.into())).await;
336-
return;
337-
}
338-
};
339-
let Some(signed_packet) = response else {
340-
tracing::debug!("no signed packet found in DHT");
341-
return;
342-
};
343-
if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
344-
let node_addr: NodeAddr = node_info.into();
345-
tracing::info!("discovered node info from DHT {:?}", node_addr);
346-
co.yield_(Ok(DiscoveryItem {
347-
node_addr,
348-
provenance: "mainline",
349-
last_updated: None,
350-
}))
351-
.await;
352-
} else {
353-
tracing::debug!("failed to parse signed packet as node info");
354-
}
355-
}
356-
357-
async fn gen_resolve(self, node_id: NodeId, co: Co<anyhow::Result<DiscoveryItem>>) {
358-
let pkarr_public_key =
359-
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
360-
tokio::join!(
361-
self.resolve_dht(pkarr_public_key.clone(), &co),
362-
self.resolve_relay(pkarr_public_key, &co)
363-
);
364-
}
365351
}
366352

367353
impl Discovery for DhtDiscovery {
@@ -395,11 +381,24 @@ impl Discovery for DhtDiscovery {
395381
_endpoint: Endpoint,
396382
node_id: NodeId,
397383
) -> Option<Boxed<anyhow::Result<DiscoveryItem>>> {
398-
let this = self.clone();
399384
let pkarr_public_key =
400385
pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
401386
tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32());
402-
Some(Gen::new(|co| async move { this.gen_resolve(node_id, co).await }).boxed())
387+
388+
let mut stream = futures_buffered::FuturesUnorderedBounded::new(2);
389+
if self.0.pkarr_relay.is_some() {
390+
let key = pkarr_public_key.clone();
391+
let discovery = self.0.clone();
392+
stream.push(async move { discovery.resolve_relay(key).await }.boxed());
393+
}
394+
395+
if self.0.dht {
396+
let key = pkarr_public_key.clone();
397+
let discovery = self.0.clone();
398+
stream.push(async move { discovery.resolve_dht(key).await }.boxed());
399+
}
400+
401+
Some(stream.filter_map(|t| t).boxed())
403402
}
404403
}
405404

0 commit comments

Comments
 (0)