Skip to content

Commit 1a79a19

Browse files
flubmatheus23
andauthored
feat(iroh-net): Add a Watchable struct for use in the Endpoint API (#2806)
## Description This implements a `Watchable` struct and a `Watcher` which provides access to the watched value in several ways, including some streams. ## Breaking Changes - `iroh::Endpoint::conn_type_stream` is renamed to `iroh::Endpoint::conn_type` and returns `Result<Watcher<ConnectionType>>` instead of `Result<ConnectionTypeStream>` To migrate, use `endpoint.conn_type()?.stream()` instead of `endpoint.conn_type_stream()?`. - `iroh::Endpoint::home_relay` now returns `Watcher<Option<RelayUrl>>` instead of `Option<RelayUrl>`. To migrate, use `endpoint.home_relay().get()?` instead of `endpoint.home_relay()`. - removed `iroh::Endpoint::watch_home_relay` To migrate, use `endpoint.home_relay().initialized().await?` instead of `endpoint.watch_home_relay().next().await` and use `endpoint.home_relay().stream()` instead of `endpoint.watch_home_relay().next().await`. - removed `DirectAddrsStream` and `ConnTypeStream`. Use `iroh::watchable::WatcherStream` for as named types instead. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --------- Co-authored-by: Philipp Krüger <[email protected]>
1 parent 3be22f3 commit 1a79a19

19 files changed

+730
-324
lines changed

Cargo.lock

Lines changed: 0 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ missing_debug_implementations = "warn"
3737
# require a feature enabled when using `--cfg docsrs` which we can not
3838
# do. To enable for a crate set `#![cfg_attr(iroh_docsrs,
3939
# feature(doc_auto_cfg))]` in the crate.
40-
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)"] }
40+
# We also have our own `iroh_loom` cfg to enable tokio-rs/loom testing.
41+
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_loom)"] }
4142

4243
[workspace.lints.clippy]
4344
unused-async = "warn"

iroh/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ tokio-tungstenite-wasm = "0.3"
102102
tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] }
103103
tracing = "0.1"
104104
url = { version = "2.5", features = ["serde"] }
105-
watchable = "1.1.2"
106105
webpki = { package = "rustls-webpki", version = "0.102" }
107106
webpki-roots = "0.26"
108107
x509-parser = "0.16"

iroh/bench/src/iroh.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::{
55

66
use anyhow::{Context, Result};
77
use bytes::Bytes;
8-
use futures_lite::StreamExt as _;
98
use iroh::{
109
endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig},
1110
Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl,
@@ -45,7 +44,7 @@ pub fn server_endpoint(
4544
.unwrap();
4645

4746
if relay_url.is_some() {
48-
ep.watch_home_relay().next().await;
47+
ep.home_relay().initialized().await.unwrap();
4948
}
5049

5150
let addr = ep.bound_sockets();
@@ -101,7 +100,7 @@ pub async fn connect_client(
101100
.unwrap();
102101

103102
if relay_url.is_some() {
104-
endpoint.watch_home_relay().next().await;
103+
endpoint.home_relay().initialized().await?;
105104
}
106105

107106
// TODO: We don't support passing client transport config currently

iroh/examples/connect-unreliable.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
//! Run the `listen-unreliable` example first (`iroh/examples/listen-unreliable.rs`), which will give you instructions on how to run this example to watch two nodes connect and exchange bytes.
88
use std::net::SocketAddr;
99

10-
use anyhow::Context;
1110
use clap::Parser;
12-
use futures_lite::StreamExt;
1311
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
1412
use tracing::info;
1513

@@ -52,21 +50,17 @@ async fn main() -> anyhow::Result<()> {
5250
.bind()
5351
.await?;
5452

55-
let me = endpoint.node_id();
53+
let node_addr = endpoint.node_addr().await?;
54+
let me = node_addr.node_id;
5655
println!("node id: {me}");
5756
println!("node listening addresses:");
58-
for local_endpoint in endpoint
59-
.direct_addresses()
60-
.next()
61-
.await
62-
.context("no endpoints")?
63-
{
64-
println!("\t{}", local_endpoint.addr)
65-
}
66-
67-
let relay_url = endpoint
68-
.home_relay()
69-
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
57+
node_addr
58+
.direct_addresses
59+
.iter()
60+
.for_each(|addr| println!("\t{addr}"));
61+
let relay_url = node_addr
62+
.relay_url
63+
.expect("Should have a relay URL, assuming a default endpoint setup.");
7064
println!("node relay server url: {relay_url}\n");
7165
// Build a `NodeAddr` from the node_id, relay url, and UDP addresses.
7266
let addr = NodeAddr::from_parts(args.node_id, Some(args.relay_url), args.addrs);

iroh/examples/connect.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::net::SocketAddr;
99

1010
use anyhow::Context;
1111
use clap::Parser;
12-
use futures_lite::StreamExt;
1312
use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey};
1413
use tracing::info;
1514

@@ -57,15 +56,17 @@ async fn main() -> anyhow::Result<()> {
5756
println!("node listening addresses:");
5857
for local_endpoint in endpoint
5958
.direct_addresses()
60-
.next()
59+
.initialized()
6160
.await
62-
.context("no endpoints")?
61+
.context("no direct addresses")?
6362
{
6463
println!("\t{}", local_endpoint.addr)
6564
}
6665

6766
let relay_url = endpoint
6867
.home_relay()
68+
.get()
69+
.unwrap()
6970
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
7071
println!("node relay server url: {relay_url}\n");
7172
// Build a `NodeAddr` from the node_id, relay url, and UDP addresses.

iroh/examples/listen-unreliable.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
//! This example uses the default relay servers to attempt to holepunch, and will use that relay server to relay packets if the two devices cannot establish a direct UDP connection.
44
//! run this example from the project root:
55
//! $ cargo run --example listen-unreliable
6-
use anyhow::Context;
7-
use futures_lite::StreamExt;
86
use iroh::{Endpoint, RelayMode, SecretKey};
97
use tracing::{info, warn};
108

@@ -37,23 +35,20 @@ async fn main() -> anyhow::Result<()> {
3735
println!("node id: {me}");
3836
println!("node listening addresses:");
3937

40-
let local_addrs = endpoint
41-
.direct_addresses()
42-
.next()
43-
.await
44-
.context("no endpoints")?
38+
let node_addr = endpoint.node_addr().await?;
39+
let local_addrs = node_addr
40+
.direct_addresses
4541
.into_iter()
46-
.map(|endpoint| {
47-
let addr = endpoint.addr.to_string();
42+
.map(|addr| {
43+
let addr = addr.to_string();
4844
println!("\t{addr}");
4945
addr
5046
})
5147
.collect::<Vec<_>>()
5248
.join(" ");
53-
54-
let relay_url = endpoint
55-
.home_relay()
56-
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
49+
let relay_url = node_addr
50+
.relay_url
51+
.expect("Should have a relay URL, assuming a default endpoint setup.");
5752
println!("node relay server url: {relay_url}");
5853
println!("\nin a separate terminal run:");
5954

iroh/examples/listen.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
//! $ cargo run --example listen
66
use std::time::Duration;
77

8-
use anyhow::Context;
9-
use futures_lite::StreamExt;
108
use iroh::{endpoint::ConnectionError, Endpoint, RelayMode, SecretKey};
119
use tracing::{debug, info, warn};
1210

@@ -39,23 +37,20 @@ async fn main() -> anyhow::Result<()> {
3937
println!("node id: {me}");
4038
println!("node listening addresses:");
4139

42-
let local_addrs = endpoint
43-
.direct_addresses()
44-
.next()
45-
.await
46-
.context("no endpoints")?
40+
let node_addr = endpoint.node_addr().await?;
41+
let local_addrs = node_addr
42+
.direct_addresses
4743
.into_iter()
48-
.map(|endpoint| {
49-
let addr = endpoint.addr.to_string();
44+
.map(|addr| {
45+
let addr = addr.to_string();
5046
println!("\t{addr}");
5147
addr
5248
})
5349
.collect::<Vec<_>>()
5450
.join(" ");
55-
56-
let relay_url = endpoint
57-
.home_relay()
58-
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
51+
let relay_url = node_addr
52+
.relay_url
53+
.expect("Should have a relay URL, assuming a default endpoint setup.");
5954
println!("node relay server url: {relay_url}");
6055
println!("\nin a separate terminal run:");
6156

iroh/examples/transfer.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::{
66
use anyhow::{Context, Result};
77
use bytes::Bytes;
88
use clap::{Parser, Subcommand};
9-
use futures_lite::StreamExt;
109
use indicatif::HumanBytes;
1110
use iroh::{
1211
endpoint::ConnectionError, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey,
@@ -71,23 +70,18 @@ async fn provide(size: u64, relay_url: Option<String>) -> anyhow::Result<()> {
7170

7271
let node_id = endpoint.node_id();
7372

74-
for local_endpoint in endpoint
75-
.direct_addresses()
76-
.next()
77-
.await
78-
.context("no endpoints")?
79-
{
73+
for local_endpoint in endpoint.direct_addresses().initialized().await? {
8074
println!("\t{}", local_endpoint.addr)
8175
}
8276

8377
let relay_url = endpoint
8478
.home_relay()
79+
.get()?
8580
.expect("should be connected to a relay server");
8681
let local_addrs = endpoint
8782
.direct_addresses()
88-
.next()
89-
.await
90-
.context("no endpoints")?
83+
.initialized()
84+
.await?
9185
.into_iter()
9286
.map(|endpoint| endpoint.addr)
9387
.collect::<Vec<_>>();
@@ -171,17 +165,13 @@ async fn fetch(ticket: &str, relay_url: Option<String>) -> anyhow::Result<()> {
171165
let me = endpoint.node_id();
172166
println!("node id: {me}");
173167
println!("node listening addresses:");
174-
for local_endpoint in endpoint
175-
.direct_addresses()
176-
.next()
177-
.await
178-
.context("no endpoints")?
179-
{
168+
for local_endpoint in endpoint.direct_addresses().initialized().await? {
180169
println!("\t{}", local_endpoint.addr)
181170
}
182171

183172
let relay_url = endpoint
184173
.home_relay()
174+
.get()?
185175
.expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server");
186176
println!("node relay server url: {relay_url}\n");
187177

iroh/src/discovery.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ mod tests {
480480
}
481481
}
482482
}
483+
483484
#[derive(Debug)]
484485
struct TestDiscovery {
485486
node_id: NodeId,
@@ -578,7 +579,7 @@ mod tests {
578579
new_endpoint(secret, disco).await
579580
};
580581
let ep1_addr = NodeAddr::new(ep1.node_id());
581-
// wait for out address to be updated and thus published at least once
582+
// wait for our address to be updated and thus published at least once
582583
ep1.node_addr().await?;
583584
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
584585
Ok(())
@@ -833,27 +834,6 @@ mod test_dns_pkarr {
833834
Ok(())
834835
}
835836

836-
#[tokio::test]
837-
async fn pkarr_publish_dns_discover_empty_node_addr() -> Result<()> {
838-
let _logging_guard = iroh_test::logging::setup();
839-
840-
let dns_pkarr_server = DnsPkarrServer::run().await?;
841-
let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;
842-
843-
let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
844-
let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
845-
846-
// wait until our shared state received the update from pkarr publishing
847-
dns_pkarr_server
848-
.on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
849-
.await?;
850-
851-
// we connect only by node id!
852-
let res = ep2.connect(ep1.node_id(), TEST_ALPN).await;
853-
assert!(res.is_ok(), "connection established");
854-
Ok(())
855-
}
856-
857837
async fn ep_with_discovery(
858838
relay_map: &RelayMap,
859839
dns_pkarr_server: &DnsPkarrServer,

iroh/src/discovery/local_swarm_discovery.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ use tokio::{
5151
};
5252
use tokio_util::task::AbortOnDropHandle;
5353
use tracing::{debug, error, info_span, trace, warn, Instrument};
54-
use watchable::Watchable;
5554

5655
use crate::{
5756
discovery::{Discovery, DiscoveryItem},
57+
watchable::Watchable,
5858
Endpoint,
5959
};
6060

@@ -152,8 +152,8 @@ impl LocalSwarmDiscovery {
152152
)?;
153153

154154
let local_addrs: Watchable<Option<(Option<RelayUrl>, BTreeSet<SocketAddr>)>> =
155-
Watchable::new(None);
156-
let addrs_change = local_addrs.watch();
155+
Watchable::default();
156+
let mut addrs_change = local_addrs.watch();
157157
let discovery_fut = async move {
158158
let mut node_addrs: HashMap<PublicKey, Peer> = HashMap::default();
159159
let mut subscribers = Subscribers::new();
@@ -169,7 +169,7 @@ impl LocalSwarmDiscovery {
169169
msg = recv.recv() => {
170170
msg
171171
}
172-
Ok(Some((_url, addrs)))= addrs_change.next_value_async() => {
172+
Ok(Some((_url, addrs))) = addrs_change.updated() => {
173173
tracing::trace!(?addrs, "LocalSwarmDiscovery address changed");
174174
discovery.remove_all();
175175
let addrs =
@@ -379,7 +379,8 @@ impl Discovery for LocalSwarmDiscovery {
379379

380380
fn publish(&self, url: Option<&RelayUrl>, addrs: &BTreeSet<SocketAddr>) {
381381
self.local_addrs
382-
.replace(Some((url.cloned(), addrs.clone())));
382+
.set(Some((url.cloned(), addrs.clone())))
383+
.ok();
383384
}
384385

385386
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {

0 commit comments

Comments
 (0)