Skip to content

Commit 9ab494c

Browse files
author
“ramfox”
committed
feat: add DiscoveryItem::user_data method and adjust locally-discovered-nodes example
1 parent 4930837 commit 9ab494c

File tree

2 files changed

+68
-52
lines changed

2 files changed

+68
-52
lines changed

iroh/examples/locally-discovered-nodes.rs

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,66 +5,77 @@
55
//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`LocalSwarmDiscovery`] enabled, it may discover those nodes as well.
66
use std::time::Duration;
77

8-
use iroh::{
9-
discovery::local_swarm_discovery::LocalSwarmDiscovery, endpoint::Source, Endpoint, SecretKey,
10-
};
8+
use anyhow::Result;
9+
use iroh::{node_info::UserData, Endpoint, NodeId};
10+
use n0_future::StreamExt;
11+
use tokio::task::JoinSet;
1112

1213
#[tokio::main]
13-
async fn main() -> anyhow::Result<()> {
14+
async fn main() -> Result<()> {
1415
tracing_subscriber::fmt::init();
15-
println!("locally discovered nodes example!\n");
16-
let mut rng = rand::rngs::OsRng;
17-
let key = SecretKey::generate(&mut rng);
18-
let id = key.public();
19-
println!("creating endpoint {id:?}\n");
20-
let ep = Endpoint::builder()
21-
.secret_key(key)
22-
.discovery(Box::new(LocalSwarmDiscovery::new(id)?))
23-
.bind()
24-
.await?;
16+
println!("Discovering Local Nodes Example!");
2517

26-
let node_count = 5;
27-
println!("creating {node_count} additional endpoints to discover locally:");
28-
let mut discoverable_eps = Vec::with_capacity(node_count);
29-
for _ in 0..node_count {
30-
let key = SecretKey::generate(&mut rng);
31-
let id = key.public();
32-
println!("\t{id:?}");
33-
let ep = Endpoint::builder()
34-
.secret_key(key)
35-
.discovery(Box::new(LocalSwarmDiscovery::new(id)?))
36-
.bind()
37-
.await?;
38-
discoverable_eps.push(ep);
39-
}
18+
let ep = Endpoint::builder().discovery_local_network().bind().await?;
19+
let node_id = ep.node_id();
20+
println!("Created endpoint {}", node_id.fmt_short());
4021

41-
let duration = Duration::from_secs(3);
42-
println!("\nwaiting {duration:?} to allow discovery to occur...\n");
43-
tokio::time::sleep(duration).await;
22+
let user_data = UserData::try_from(String::from("local-nodes-example"))?;
4423

45-
// get an iterator of all the remote nodes this endpoint knows about
46-
let remotes = ep.remote_info_iter();
47-
// filter that list down to the nodes that have a `Source::Discovery` with
48-
// the `service` name [`iroh::discovery::local_swarm_discovery::NAME`]
49-
// If you have a long running node and want to only get the nodes that were
50-
// discovered recently, you can also filter on the `Duration` of the source,
51-
// which indicates how long ago we got information from that source.
52-
let locally_discovered: Vec<_> = remotes
53-
.filter(|remote| {
54-
remote.sources().iter().any(|(source, _duration)| {
55-
if let Source::Discovery { name } = source {
56-
name == iroh::discovery::local_swarm_discovery::NAME
57-
} else {
58-
false
24+
let mut discovery_stream = ep.discovery_stream();
25+
26+
let ud = user_data.clone();
27+
let discovery_stream_task = tokio::spawn(async move {
28+
let mut discovered_nodes: Vec<NodeId> = vec![];
29+
while let Some(item) = discovery_stream.next().await {
30+
match item {
31+
Err(e) => {
32+
tracing::error!("{e}");
33+
return;
5934
}
60-
})
61-
})
62-
.map(|remote| remote.node_id)
63-
.collect();
35+
Ok(item) => {
36+
// if there is no user data, or the user data
37+
// does not indicate that the discovered node
38+
// is a part of the example, ignore it
39+
match item.node_info().data.user_data() {
40+
Some(user_data) if &ud == user_data => {}
41+
_ => {
42+
tracing::error!("found node with unexpected user data, ignoring it");
43+
continue;
44+
}
45+
}
6446

65-
println!("found:");
66-
for id in locally_discovered {
67-
println!("\t{id:?}");
47+
// if we've already found this node, ignore it
48+
// otherwise announce that we have found a new node
49+
if discovered_nodes.contains(&item.node_id()) {
50+
continue;
51+
} else {
52+
discovered_nodes.push(item.node_id());
53+
println!("Found node {}!", item.node_id().fmt_short());
54+
}
55+
}
56+
};
57+
}
58+
});
59+
60+
let mut set = JoinSet::new();
61+
let node_count = 5;
62+
for _ in 0..node_count {
63+
let ud = user_data.clone();
64+
set.spawn(async move {
65+
let ep = Endpoint::builder().discovery_local_network().bind().await?;
66+
ep.set_user_data_for_discovery(Some(ud));
67+
tokio::time::sleep(Duration::from_secs(3)).await;
68+
ep.close().await;
69+
anyhow::Ok(())
70+
});
6871
}
72+
73+
set.join_all().await.iter().for_each(|res| {
74+
if let Err(e) = res {
75+
tracing::error!("{e}");
76+
}
77+
});
78+
ep.close().await;
79+
discovery_stream_task.abort();
6980
Ok(())
7081
}

iroh/src/discovery.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ impl DiscoveryItem {
264264
pub fn into_node_addr(self) -> NodeAddr {
265265
self.node_info.into_node_addr()
266266
}
267+
268+
/// Returns any user-defined data.
269+
pub fn user_data(&self) -> Option<UserData> {
270+
self.node_info().data.user_data().cloned()
271+
}
267272
}
268273

269274
impl std::ops::Deref for DiscoveryItem {

0 commit comments

Comments
 (0)