Skip to content

Commit 5e49912

Browse files
committed
feat: support user data in local swarm discovery
1 parent e35e7a2 commit 5e49912

File tree

4 files changed

+45
-24
lines changed

4 files changed

+45
-24
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l
4141

4242
[workspace.lints.clippy]
4343
unused-async = "warn"
44+
45+
[patch.crates-io]
46+
swarm-discovery = { git = "https://github.com/Frando/swarm-discovery.git", branch = "feat/txt" }

iroh/src/discovery.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
195195
impl<T: Discovery> Discovery for Arc<T> {}
196196

197197
/// The information that can be published about a node in discovery services.
198-
#[derive(Debug, Clone, Default)]
198+
#[derive(Debug, Clone, Default, Eq, PartialEq)]
199199
pub struct DiscoveryData {
200200
relay_url: Option<RelayUrl>,
201201
direct_addresses: BTreeSet<SocketAddr>,
@@ -248,6 +248,11 @@ impl DiscoveryData {
248248
direct_addresses: self.direct_addresses,
249249
}
250250
}
251+
252+
/// Returns the optional user-defined data.
253+
pub fn user_data(&self) -> Option<&UserData> {
254+
self.user_data.as_ref()
255+
}
251256
}
252257

253258
impl From<NodeAddr> for DiscoveryData {

iroh/src/discovery/local_swarm_discovery.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{
3737

3838
use anyhow::Result;
3939
use derive_more::FromStr;
40-
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl};
40+
use iroh_base::{NodeAddr, NodeId, PublicKey};
4141
use n0_future::{
4242
boxed::BoxStream,
4343
task::{self, AbortOnDropHandle, JoinSet},
@@ -65,6 +65,8 @@ const N0_LOCAL_SWARM: &str = "iroh.local.swarm";
6565
/// Used in the [`crate::endpoint::Source::Discovery`] enum variant as the `name`.
6666
pub const NAME: &str = "local.swarm.discovery";
6767

68+
const USER_DATA_ATTRIBUTE: &str = "user-data";
69+
6870
/// How long we will wait before we stop sending discovery items
6971
const DISCOVERY_DURATION: Duration = Duration::from_secs(10);
7072

@@ -75,7 +77,7 @@ pub struct LocalSwarmDiscovery {
7577
handle: AbortOnDropHandle<()>,
7678
sender: mpsc::Sender<Message>,
7779
/// When `local_addrs` changes, we re-publish our info.
78-
local_addrs: Watchable<Option<(Option<RelayUrl>, BTreeSet<SocketAddr>)>>,
80+
local_addrs: Watchable<Option<DiscoveryData>>,
7981
}
8082

8183
#[derive(Debug)]
@@ -148,8 +150,7 @@ impl LocalSwarmDiscovery {
148150
&rt,
149151
)?;
150152

151-
let local_addrs: Watchable<Option<(Option<RelayUrl>, BTreeSet<SocketAddr>)>> =
152-
Watchable::default();
153+
let local_addrs: Watchable<Option<DiscoveryData>> = Watchable::default();
153154
let mut addrs_change = local_addrs.watch();
154155
let discovery_fut = async move {
155156
let mut node_addrs: HashMap<PublicKey, Peer> = HashMap::default();
@@ -166,14 +167,20 @@ impl LocalSwarmDiscovery {
166167
msg = recv.recv() => {
167168
msg
168169
}
169-
Ok(Some((_url, addrs))) = addrs_change.updated() => {
170-
tracing::trace!(?addrs, "LocalSwarmDiscovery address changed");
170+
Ok(Some(data)) = addrs_change.updated() => {
171+
println!("PUBLISH {data:?}");
172+
tracing::trace!(?data, "LocalSwarmDiscovery address changed");
171173
discovery.remove_all();
172174
let addrs =
173-
LocalSwarmDiscovery::socketaddrs_to_addrs(addrs);
175+
LocalSwarmDiscovery::socketaddrs_to_addrs(data.direct_addrs());
174176
for addr in addrs {
175177
discovery.add(addr.0, addr.1)
176178
}
179+
if let Some(user_data) = data.user_data() {
180+
if let Err(err) = discovery.set_txt_attribute(USER_DATA_ATTRIBUTE.to_string(), Some(user_data.to_string())) {
181+
warn!("Failed to set user data in local swarm discovery: {err:?}");
182+
}
183+
}
177184
continue;
178185
}
179186
};
@@ -320,7 +327,7 @@ impl LocalSwarmDiscovery {
320327
sender.send(Message::Discovery(node_id, peer)).await.ok();
321328
});
322329
};
323-
let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(socketaddrs);
330+
let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(&socketaddrs);
324331
let node_id_str = data_encoding::BASE32_NOPAD
325332
.encode(node_id.as_bytes())
326333
.to_ascii_lowercase();
@@ -333,7 +340,7 @@ impl LocalSwarmDiscovery {
333340
discoverer.spawn(rt)
334341
}
335342

336-
fn socketaddrs_to_addrs(socketaddrs: BTreeSet<SocketAddr>) -> HashMap<u16, Vec<IpAddr>> {
343+
fn socketaddrs_to_addrs(socketaddrs: &BTreeSet<SocketAddr>) -> HashMap<u16, Vec<IpAddr>> {
337344
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
338345
for socketaddr in socketaddrs {
339346
addrs
@@ -351,6 +358,10 @@ fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem {
351358
.iter()
352359
.map(|(ip, port)| SocketAddr::new(*ip, *port))
353360
.collect();
361+
let user_data = match peer.txt_attribute(USER_DATA_ATTRIBUTE) {
362+
Some(Some(user_data)) => user_data.parse().ok(),
363+
_ => None,
364+
};
354365
DiscoveryItem {
355366
node_addr: NodeAddr {
356367
node_id: *node_id,
@@ -359,7 +370,7 @@ fn peer_to_discovery_item(peer: &Peer, node_id: &NodeId) -> DiscoveryItem {
359370
},
360371
provenance: NAME,
361372
last_updated: None,
362-
user_data: None,
373+
user_data,
363374
}
364375
}
365376

@@ -380,12 +391,7 @@ impl Discovery for LocalSwarmDiscovery {
380391
}
381392

382393
fn publish(&self, data: &DiscoveryData) {
383-
self.local_addrs
384-
.set(Some((
385-
data.relay_url().cloned(),
386-
data.direct_addrs().clone(),
387-
)))
388-
.ok();
394+
self.local_addrs.set(Some(data.clone())).ok();
389395
}
390396

391397
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
@@ -413,6 +419,7 @@ mod tests {
413419
use tracing_test::traced_test;
414420

415421
use super::super::*;
422+
use crate::discovery::UserData;
416423

417424
#[tokio::test]
418425
#[traced_test]
@@ -421,7 +428,10 @@ mod tests {
421428
let (node_id_b, discovery_b) = make_discoverer()?;
422429

423430
// make addr info for discoverer b
424-
let addr_info = DiscoveryData::new(None, BTreeSet::from(["0.0.0.0:11111".parse()?]));
431+
let user_data: UserData = "foobar".parse()?;
432+
let addr_info = DiscoveryData::new(None, BTreeSet::from(["0.0.0.0:11111".parse()?]))
433+
.with_user_data(user_data.clone());
434+
println!("info {addr_info:?}");
425435

426436
// pass in endpoint, this is never used
427437
let ep = crate::endpoint::Builder::default().bind().await?;
@@ -441,8 +451,10 @@ mod tests {
441451
.unwrap()?;
442452
assert_eq!(s1_res.node_addr.relay_url.as_ref(), addr_info.relay_url());
443453
assert_eq!(&s1_res.node_addr.direct_addresses, addr_info.direct_addrs());
454+
assert_eq!(s1_res.user_data.as_ref(), Some(&user_data));
444455
assert_eq!(s2_res.node_addr.relay_url.as_ref(), addr_info.relay_url());
445456
assert_eq!(&s2_res.node_addr.direct_addresses, addr_info.direct_addrs());
457+
assert_eq!(s2_res.user_data.as_ref(), Some(&user_data));
446458

447459
Ok(())
448460
}
@@ -457,9 +469,11 @@ mod tests {
457469
let (_, discovery) = make_discoverer()?;
458470
let addr_info = DiscoveryData::new(None, BTreeSet::from(["0.0.0.0:11111".parse()?]));
459471

460-
for _ in 0..num_nodes {
472+
for i in 0..num_nodes {
461473
let (node_id, discovery) = make_discoverer()?;
462-
node_ids.insert(node_id);
474+
let user_data: UserData = format!("node{i}").parse()?;
475+
let addr_info = addr_info.clone().with_user_data(user_data.clone());
476+
node_ids.insert((node_id, Some(user_data)));
463477
discovery.publish(&addr_info);
464478
discoverers.push(discovery);
465479
}
@@ -470,8 +484,8 @@ mod tests {
470484
let mut got_ids = BTreeSet::new();
471485
while got_ids.len() != num_nodes {
472486
if let Some(item) = events.next().await {
473-
if node_ids.contains(&item.node_addr.node_id) {
474-
got_ids.insert(item.node_addr.node_id);
487+
if node_ids.contains(&(item.node_addr.node_id, item.user_data.clone())) {
488+
got_ids.insert((item.node_addr.node_id, item.user_data));
475489
}
476490
} else {
477491
anyhow::bail!(

0 commit comments

Comments
 (0)