From 6541a6fcd8379990a86d02dfedaf90add7d555b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Mon, 3 Mar 2025 00:41:34 -0500 Subject: [PATCH 1/3] feat: add `DiscoveryItem::user_data` method and adjust `locally-discovered-nodes` example --- iroh/examples/locally-discovered-nodes.rs | 115 ++++++++++++---------- iroh/src/discovery.rs | 5 + 2 files changed, 68 insertions(+), 52 deletions(-) diff --git a/iroh/examples/locally-discovered-nodes.rs b/iroh/examples/locally-discovered-nodes.rs index 6c2798b49ca..9ec7e51a768 100644 --- a/iroh/examples/locally-discovered-nodes.rs +++ b/iroh/examples/locally-discovered-nodes.rs @@ -5,66 +5,77 @@ //! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`LocalSwarmDiscovery`] enabled, it may discover those nodes as well. use std::time::Duration; -use iroh::{ - discovery::local_swarm_discovery::LocalSwarmDiscovery, endpoint::Source, Endpoint, SecretKey, -}; +use anyhow::Result; +use iroh::{node_info::UserData, Endpoint, NodeId}; +use n0_future::StreamExt; +use tokio::task::JoinSet; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { tracing_subscriber::fmt::init(); - println!("locally discovered nodes example!\n"); - let mut rng = rand::rngs::OsRng; - let key = SecretKey::generate(&mut rng); - let id = key.public(); - println!("creating endpoint {id:?}\n"); - let ep = Endpoint::builder() - .secret_key(key) - .discovery(Box::new(LocalSwarmDiscovery::new(id)?)) - .bind() - .await?; + println!("Discovering Local Nodes Example!"); - let node_count = 5; - println!("creating {node_count} additional endpoints to discover locally:"); - let mut discoverable_eps = Vec::with_capacity(node_count); - for _ in 0..node_count { - let key = SecretKey::generate(&mut rng); - let id = key.public(); - println!("\t{id:?}"); - let ep = Endpoint::builder() - .secret_key(key) - .discovery(Box::new(LocalSwarmDiscovery::new(id)?)) - .bind() - .await?; - discoverable_eps.push(ep); - } + let ep = Endpoint::builder().discovery_local_network().bind().await?; + let node_id = ep.node_id(); + println!("Created endpoint {}", node_id.fmt_short()); - let duration = Duration::from_secs(3); - println!("\nwaiting {duration:?} to allow discovery to occur...\n"); - tokio::time::sleep(duration).await; + let user_data = UserData::try_from(String::from("local-nodes-example"))?; - // get an iterator of all the remote nodes this endpoint knows about - let remotes = ep.remote_info_iter(); - // filter that list down to the nodes that have a `Source::Discovery` with - // the `service` name [`iroh::discovery::local_swarm_discovery::NAME`] - // If you have a long running node and want to only get the nodes that were - // discovered recently, you can also filter on the `Duration` of the source, - // which indicates how long ago we got information from that source. - let locally_discovered: Vec<_> = remotes - .filter(|remote| { - remote.sources().iter().any(|(source, _duration)| { - if let Source::Discovery { name } = source { - name == iroh::discovery::local_swarm_discovery::NAME - } else { - false + let mut discovery_stream = ep.discovery_stream(); + + let ud = user_data.clone(); + let discovery_stream_task = tokio::spawn(async move { + let mut discovered_nodes: Vec = vec![]; + while let Some(item) = discovery_stream.next().await { + match item { + Err(e) => { + tracing::error!("{e}"); + return; } - }) - }) - .map(|remote| remote.node_id) - .collect(); + Ok(item) => { + // if there is no user data, or the user data + // does not indicate that the discovered node + // is a part of the example, ignore it + match item.node_info().data.user_data() { + Some(user_data) if &ud == user_data => {} + _ => { + tracing::error!("found node with unexpected user data, ignoring it"); + continue; + } + } - println!("found:"); - for id in locally_discovered { - println!("\t{id:?}"); + // if we've already found this node, ignore it + // otherwise announce that we have found a new node + if discovered_nodes.contains(&item.node_id()) { + continue; + } else { + discovered_nodes.push(item.node_id()); + println!("Found node {}!", item.node_id().fmt_short()); + } + } + }; + } + }); + + let mut set = JoinSet::new(); + let node_count = 5; + for _ in 0..node_count { + let ud = user_data.clone(); + set.spawn(async move { + let ep = Endpoint::builder().discovery_local_network().bind().await?; + ep.set_user_data_for_discovery(Some(ud)); + tokio::time::sleep(Duration::from_secs(3)).await; + ep.close().await; + anyhow::Ok(()) + }); } + + set.join_all().await.iter().for_each(|res| { + if let Err(e) = res { + tracing::error!("{e}"); + } + }); + ep.close().await; + discovery_stream_task.abort(); Ok(()) } diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index deaf7cbcd09..c1471563b3a 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -264,6 +264,11 @@ impl DiscoveryItem { pub fn into_node_addr(self) -> NodeAddr { self.node_info.into_node_addr() } + + /// Returns any user-defined data. + pub fn user_data(&self) -> Option { + self.node_info().data.user_data().cloned() + } } impl std::ops::Deref for DiscoveryItem { From 40a196fa27424ea557d841d4dd9825eb932e2546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Wed, 5 Mar 2025 16:21:29 -0500 Subject: [PATCH 2/3] clippy --- iroh/src/discovery/local_swarm_discovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/discovery/local_swarm_discovery.rs b/iroh/src/discovery/local_swarm_discovery.rs index 8e224efef74..a97716f5024 100644 --- a/iroh/src/discovery/local_swarm_discovery.rs +++ b/iroh/src/discovery/local_swarm_discovery.rs @@ -482,8 +482,8 @@ mod tests { let mut got_ids = BTreeSet::new(); while got_ids.len() != num_nodes { if let Some(item) = events.next().await { - if node_ids.contains(&(item.node_id(), item.user_data().cloned())) { - got_ids.insert((item.node_id(), item.user_data().cloned())); + if node_ids.contains(&(item.node_id(), item.user_data())) { + got_ids.insert((item.node_id(), item.user_data())); } } else { anyhow::bail!( From 11280b50e56dbbae0514d511aa9efdd190273cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cramfox=E2=80=9D?= <“kasey@n0.computer”> Date: Sun, 9 Mar 2025 16:57:47 -0400 Subject: [PATCH 3/3] rename `LocalSwarmDiscovery` to `MdnsDiscovery` --- iroh/examples/locally-discovered-nodes.rs | 4 +- iroh/src/discovery.rs | 14 ++--- .../{local_swarm_discovery.rs => mdns.rs} | 56 +++++++++---------- iroh/src/endpoint.rs | 14 ++--- 4 files changed, 42 insertions(+), 46 deletions(-) rename iroh/src/discovery/{local_swarm_discovery.rs => mdns.rs} (90%) diff --git a/iroh/examples/locally-discovered-nodes.rs b/iroh/examples/locally-discovered-nodes.rs index 9ec7e51a768..aca9da3b0b1 100644 --- a/iroh/examples/locally-discovered-nodes.rs +++ b/iroh/examples/locally-discovered-nodes.rs @@ -1,8 +1,8 @@ -//! A small example showing how to get a list of nodes that were discovered via [`iroh::discovery::LocalSwarmDiscovery`]. LocalSwarmDiscovery uses [`swarm-discovery`](https://crates.io/crates/swarm-discovery) to discover other nodes in the local network ala mDNS. +//! A small example showing how to get a list of nodes that were discovered via [`iroh::discovery::MdnsDiscovery`]. MdnsDiscovery uses [`swarm-discovery`](https://crates.io/crates/swarm-discovery), an opinionated implementation of mDNS to discover other nodes in the local network. //! //! This example creates an iroh endpoint, a few additional iroh endpoints to discover, waits a few seconds, and reports all of the iroh NodeIds (also called `[iroh::key::PublicKey]`s) it has discovered. //! -//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`LocalSwarmDiscovery`] enabled, it may discover those nodes as well. +//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well. use std::time::Duration; use anyhow::Result; diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index c1471563b3a..2cff17f6800 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -34,8 +34,8 @@ //! - The [`PkarrResolver`] which can perform lookups from designated [pkarr relay servers] //! using HTTP. //! -//! - [`LocalSwarmDiscovery`]: local_swarm_discovery::LocalSwarmDiscovery which is an mDNS -//! implementation. +//! - [`MdnsDiscovery`]: mdns::MdnsDiscovery which uses the crate `swarm-discovery`, an +//! opinionated mDNS implementation, to discover nodes on the local network. //! //! - The [`DhtDiscovery`] also uses the [`pkarr`] system but can also publish and lookup //! records to/from the Mainline DHT. @@ -69,14 +69,14 @@ //! # } //! ``` //! -//! To also enable [`LocalSwarmDiscovery`] it can be added as another service in the +//! To also enable [`MdnsDiscovery`] it can be added as another service in the //! [`ConcurrentDiscovery`]: //! //! ```no_run //! # #[cfg(feature = "discovery-local-network")] //! # { //! # use iroh::discovery::dns::DnsDiscovery; -//! # use iroh::discovery::local_swarm_discovery::LocalSwarmDiscovery; +//! # use iroh::discovery::mdns::MdnsDiscovery; //! # use iroh::discovery::pkarr::PkarrPublisher; //! # use iroh::discovery::ConcurrentDiscovery; //! # use iroh::SecretKey; @@ -86,7 +86,7 @@ //! let discovery = ConcurrentDiscovery::from_services(vec![ //! Box::new(PkarrPublisher::n0_dns(secret_key.clone())), //! Box::new(DnsDiscovery::n0_dns()), -//! Box::new(LocalSwarmDiscovery::new(secret_key.public())?), +//! Box::new(MdnsDiscovery::new(secret_key.public())?), //! ]); //! # Ok(()) //! # } @@ -102,7 +102,7 @@ //! [`PkarrPublisher`]: pkarr::PkarrPublisher //! [`DhtDiscovery`]: pkarr::dht::DhtDiscovery //! [pkarr relay servers]: https://pkarr.org/#servers -//! [`LocalSwarmDiscovery`]: local_swarm_discovery::LocalSwarmDiscovery +//! [`MdnsDiscovery`]: mdns::MdnsDiscovery //! [`StaticProvider`]: static_provider::StaticProvider use std::sync::Arc; @@ -126,7 +126,7 @@ use crate::Endpoint; pub mod dns; #[cfg(feature = "discovery-local-network")] -pub mod local_swarm_discovery; +pub mod mdns; pub mod pkarr; pub mod static_provider; diff --git a/iroh/src/discovery/local_swarm_discovery.rs b/iroh/src/discovery/mdns.rs similarity index 90% rename from iroh/src/discovery/local_swarm_discovery.rs rename to iroh/src/discovery/mdns.rs index a97716f5024..32e30364458 100644 --- a/iroh/src/discovery/local_swarm_discovery.rs +++ b/iroh/src/discovery/mdns.rs @@ -3,7 +3,7 @@ //! This allows you to use an mdns-like swarm discovery service to find address information about nodes that are on your local network, no relay or outside internet needed. //! See the [`swarm-discovery`](https://crates.io/crates/swarm-discovery) crate for more details. //! -//! When [`LocalSwarmDiscovery`] is enabled, it's possible to get a list of the locally discovered nodes by filtering a list of `RemoteInfo`s. +//! When [`MdnsDiscovery`] is enabled, it's possible to get a list of the locally discovered nodes by filtering a list of `RemoteInfo`s. //! //! ``` //! use std::time::Duration; @@ -20,7 +20,7 @@ //! .filter(|remote| { //! remote.sources().iter().any(|(source, duration)| { //! if let Source::Discovery { name } = source { -//! name == iroh::discovery::local_swarm_discovery::NAME && *duration <= recent +//! name == iroh::discovery::mdns::NAME && *duration <= recent //! } else { //! false //! } @@ -72,7 +72,7 @@ const DISCOVERY_DURATION: Duration = Duration::from_secs(10); /// Discovery using `swarm-discovery`, a variation on mdns #[derive(Debug)] -pub struct LocalSwarmDiscovery { +pub struct MdnsDiscovery { #[allow(dead_code)] handle: AbortOnDropHandle<()>, sender: mpsc::Sender, @@ -128,8 +128,8 @@ impl Subscribers { } } -impl LocalSwarmDiscovery { - /// Create a new [`LocalSwarmDiscovery`] Service. +impl MdnsDiscovery { + /// Create a new [`MdnsDiscovery`] Service. /// /// This starts a [`Discoverer`] that broadcasts your addresses and receives addresses from other nodes in your local network. /// @@ -139,16 +139,12 @@ impl LocalSwarmDiscovery { /// # Panics /// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime. pub fn new(node_id: NodeId) -> Result { - debug!("Creating new LocalSwarmDiscovery service"); + debug!("Creating new MdnsDiscovery service"); let (send, mut recv) = mpsc::channel(64); let task_sender = send.clone(); let rt = tokio::runtime::Handle::current(); - let discovery = LocalSwarmDiscovery::spawn_discoverer( - node_id, - task_sender.clone(), - BTreeSet::new(), - &rt, - )?; + let discovery = + MdnsDiscovery::spawn_discoverer(node_id, task_sender.clone(), BTreeSet::new(), &rt)?; let local_addrs: Watchable> = Watchable::default(); let mut addrs_change = local_addrs.watch(); @@ -162,16 +158,16 @@ impl LocalSwarmDiscovery { > = HashMap::default(); let mut timeouts = JoinSet::new(); loop { - trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick"); + trace!(?node_addrs, "MdnsDiscovery Service loop tick"); let msg = tokio::select! { msg = recv.recv() => { msg } Ok(Some(data)) = addrs_change.updated() => { - tracing::trace!(?data, "LocalSwarmDiscovery address changed"); + tracing::trace!(?data, "MdnsDiscovery address changed"); discovery.remove_all(); let addrs = - LocalSwarmDiscovery::socketaddrs_to_addrs(data.direct_addresses()); + MdnsDiscovery::socketaddrs_to_addrs(data.direct_addresses()); for addr in addrs { discovery.add(addr.0, addr.1) } @@ -185,8 +181,8 @@ impl LocalSwarmDiscovery { }; let msg = match msg { None => { - error!("LocalSwarmDiscovery channel closed"); - error!("closing LocalSwarmDiscovery"); + error!("MdnsDiscovery channel closed"); + error!("closing MdnsDiscovery"); timeouts.abort_all(); return; } @@ -197,7 +193,7 @@ impl LocalSwarmDiscovery { trace!( ?discovered_node_id, ?peer_info, - "LocalSwarmDiscovery Message::Discovery" + "MdnsDiscovery Message::Discovery" ); let discovered_node_id = match PublicKey::from_str(&discovered_node_id) { Ok(node_id) => node_id, @@ -217,7 +213,7 @@ impl LocalSwarmDiscovery { if peer_info.is_expiry() { trace!( ?discovered_node_id, - "removing node from LocalSwarmDiscovery address book" + "removing node from MdnsDiscovery address book" ); node_addrs.remove(&discovered_node_id); continue; @@ -234,7 +230,7 @@ impl LocalSwarmDiscovery { debug!( ?discovered_node_id, ?peer_info, - "adding node to LocalSwarmDiscovery address book" + "adding node to MdnsDiscovery address book" ); let mut resolved = false; @@ -258,7 +254,7 @@ impl LocalSwarmDiscovery { Message::Resolve(node_id, sender) => { let id = last_id + 1; last_id = id; - trace!(?node_id, "LocalSwarmDiscovery Message::SendAddrs"); + trace!(?node_id, "MdnsDiscovery Message::SendAddrs"); if let Some(peer_info) = node_addrs.get(&node_id) { let item = peer_to_discovery_item(peer_info, &node_id); debug!(?item, "sending DiscoveryItem"); @@ -282,7 +278,7 @@ impl LocalSwarmDiscovery { }); } Message::Timeout(node_id, id) => { - trace!(?node_id, "LocalSwarmDiscovery Message::Timeout"); + trace!(?node_id, "MdnsDiscovery Message::Timeout"); if let Some(senders_for_node_id) = senders.get_mut(&node_id) { senders_for_node_id.remove(&id); if senders_for_node_id.is_empty() { @@ -291,7 +287,7 @@ impl LocalSwarmDiscovery { } } Message::Subscribe(subscriber) => { - trace!("LocalSwarmDiscovery Message::Subscribe"); + trace!("MdnsDiscovery Message::Subscribe"); subscribers.push(subscriber); } } @@ -316,7 +312,7 @@ impl LocalSwarmDiscovery { trace!( node_id, ?peer, - "Received peer information from LocalSwarmDiscovery" + "Received peer information from MdnsDiscovery" ); let sender = sender.clone(); @@ -326,7 +322,7 @@ impl LocalSwarmDiscovery { sender.send(Message::Discovery(node_id, peer)).await.ok(); }); }; - let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(&socketaddrs); + let addrs = MdnsDiscovery::socketaddrs_to_addrs(&socketaddrs); let node_id_str = data_encoding::BASE32_NOPAD .encode(node_id.as_bytes()) .to_ascii_lowercase(); @@ -376,7 +372,7 @@ fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem { DiscoveryItem::new(node_info, NAME, None) } -impl Discovery for LocalSwarmDiscovery { +impl Discovery for MdnsDiscovery { fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option>> { use futures_util::FutureExt; @@ -425,7 +421,7 @@ mod tests { #[tokio::test] #[traced_test] - async fn local_swarm_discovery_publish_resolve() -> TestResult { + async fn mdns_publish_resolve() -> TestResult { let (_, discovery_a) = make_discoverer()?; let (node_id_b, discovery_b) = make_discoverer()?; @@ -459,7 +455,7 @@ mod tests { #[tokio::test] #[traced_test] - async fn local_swarm_discovery_subscribe() -> TestResult { + async fn mdns_subscribe() -> TestResult { let num_nodes = 5; let mut node_ids = BTreeSet::new(); let mut discoverers = vec![]; @@ -499,9 +495,9 @@ mod tests { Ok(()) } - fn make_discoverer() -> Result<(PublicKey, LocalSwarmDiscovery)> { + fn make_discoverer() -> Result<(PublicKey, MdnsDiscovery)> { let node_id = SecretKey::generate(rand::thread_rng()).public(); - Ok((node_id, LocalSwarmDiscovery::new(node_id)?)) + Ok((node_id, MdnsDiscovery::new(node_id)?)) } } } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index b780f720e01..739b3d694c1 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -385,14 +385,14 @@ impl Builder { #[cfg(feature = "discovery-local-network")] /// Configures the endpoint to also use local network discovery. /// - /// This is equivalent to adding a [`crate::discovery::local_swarm_discovery::LocalSwarmDiscovery`] - /// with default settings. Note that LocalSwarmDiscovery has various more advanced + /// This is equivalent to adding a [`crate::discovery::mdns::MdnsDiscovery`] + /// with default settings. Note that MdnsDiscovery has various more advanced /// configuration options. If you need any of those, you should manually - /// create a LocalSwarmDiscovery and add it with [`Builder::add_discovery`]. + /// create a MdnsDiscovery and add it with [`Builder::add_discovery`]. pub fn discovery_local_network(mut self) -> Self { - use crate::discovery::local_swarm_discovery::LocalSwarmDiscovery; + use crate::discovery::mdns::MdnsDiscovery; self.discovery.push(Box::new(|secret_key| { - LocalSwarmDiscovery::new(secret_key.public()) + MdnsDiscovery::new(secret_key.public()) .map(|x| Box::new(x) as _) .ok() })); @@ -1011,7 +1011,7 @@ impl Endpoint { /// through [`Discovery::resolve`], which is invoked automatically when calling /// [`Endpoint::connect`] for a [`NodeId`] unknown to the endpoint. It also includes /// nodes that the endpoint discovers passively from discovery services that implement - /// [`Discovery::subscribe`], which e.g. [`LocalSwarmDiscovery`] does. + /// [`Discovery::subscribe`], which e.g. [`MdnsDiscovery`] does. /// /// The stream does not yield information about nodes that are added manually to the endpoint's /// addressbook by calling [`Endpoint::add_node_addr`] or by supplying a full [`NodeAddr`] to @@ -1026,7 +1026,7 @@ impl Endpoint { /// See also [`Endpoint::remote_info_iter`], which returns an iterator over all remotes /// the endpoint knows about at a specific point in time. /// - /// [`LocalSwarmDiscovery`]: crate::discovery::local_swarm_discovery::LocalSwarmDiscovery + /// [`MdnsDiscovery`]: crate::discovery::mdns::MdnsDiscovery /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider pub fn discovery_stream(&self) -> impl Stream> { self.msock.discovery_subscribers().subscribe()