Skip to content

Commit 21c7c04

Browse files
start dealing with dns, again
1 parent 03eb8ca commit 21c7c04

File tree

3 files changed

+263
-22
lines changed

3 files changed

+263
-22
lines changed

iroh-netcheck/src/dns.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use std::fmt::Write;
2+
use std::net::IpAddr;
3+
4+
use anyhow::Result;
5+
use futures_lite::{Future, StreamExt};
6+
use hickory_resolver::{IntoName, TokioAsyncResolver};
7+
8+
use crate::defaults::timeouts::DNS_TIMEOUT;
9+
10+
/// Delays, in milliseconds, after which the additional staggered dns queries are started.
11+
const DNS_STAGGERING_MS: &[u64] = &[200, 300];
12+
13+
/// Extension trait to [`TokioAsyncResolver`].
14+
pub trait ResolverExt {
15+
/// Perform an ipv4 lookup with a timeout.
16+
fn lookup_ipv4<N: IntoName>(
17+
&self,
18+
host: N,
19+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
20+
21+
/// Perform an ipv6 lookup with a timeout.
22+
fn lookup_ipv6<N: IntoName>(
23+
&self,
24+
host: N,
25+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
26+
27+
/// Race an ipv4 and ipv6 lookup with a timeout.
28+
fn lookup_ipv4_ipv6<N: IntoName + Clone>(
29+
&self,
30+
host: N,
31+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
32+
33+
/// Perform an ipv4 lookup with a timeout in a staggered fashion.
34+
///
35+
/// From the moment this function is called, each lookup is scheduled after the delays in
36+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
37+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The
38+
/// result of the first successful call is returned, or a summary of all errors otherwise.
39+
fn lookup_ipv4_staggered<N: IntoName + Clone>(
40+
&self,
41+
host: N,
42+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
43+
44+
/// Perform an ipv6 lookup with a timeout in a staggered fashion.
45+
///
46+
/// From the moment this function is called, each lookup is scheduled after the delays in
47+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
48+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The
49+
/// result of the first successful call is returned, or a summary of all errors otherwise.
50+
fn lookup_ipv6_staggered<N: IntoName + Clone>(
51+
&self,
52+
host: N,
53+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
54+
55+
/// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion.
56+
///
57+
/// From the moment this function is called, each lookup is scheduled after the delays in
58+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
59+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied as stated in
60+
/// [`Self::lookup_ipv4_ipv6`]. The result of the first successful call is returned, or a
61+
/// summary of all errors otherwise.
62+
// TODO(@divma): adjust docs
63+
fn lookup_ipv4_ipv6_staggered<N: IntoName + Clone>(
64+
&self,
65+
host: N,
66+
) -> impl Future<Output = Result<impl Iterator<Item = IpAddr>>>;
67+
}
68+
69+
impl ResolverExt for TokioAsyncResolver {
70+
async fn lookup_ipv4<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
71+
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv4_lookup(host)).await??;
72+
Ok(addrs.into_iter().map(|ip| IpAddr::V4(ip.0)))
73+
}
74+
75+
async fn lookup_ipv6<N: IntoName>(&self, host: N) -> Result<impl Iterator<Item = IpAddr>> {
76+
let addrs = tokio::time::timeout(DNS_TIMEOUT, self.ipv6_lookup(host)).await??;
77+
Ok(addrs.into_iter().map(|ip| IpAddr::V6(ip.0)))
78+
}
79+
80+
/// Resolve IPv4 and IPv6 in parallel.
81+
///
82+
/// `LookupIpStrategy::Ipv4AndIpv6` will wait for ipv6 resolution timeout, even if it is
83+
/// not usable on the stack, so we manually query both lookups concurrently and time them out
84+
/// individually.
85+
async fn lookup_ipv4_ipv6<N: IntoName + Clone>(
86+
&self,
87+
host: N,
88+
) -> Result<impl Iterator<Item = IpAddr>> {
89+
let res = tokio::join!(self.lookup_ipv4(host.clone()), self.lookup_ipv6(host));
90+
91+
match res {
92+
(Ok(ipv4), Ok(ipv6)) => Ok(LookupIter::Both(ipv4.chain(ipv6))),
93+
(Ok(ipv4), Err(_)) => Ok(LookupIter::Ipv4(ipv4)),
94+
(Err(_), Ok(ipv6)) => Ok(LookupIter::Ipv6(ipv6)),
95+
(Err(ipv4_err), Err(ipv6_err)) => {
96+
anyhow::bail!("Ipv4: {:?}, Ipv6: {:?}", ipv4_err, ipv6_err)
97+
}
98+
}
99+
}
100+
101+
/// Perform an ipv4 lookup with a timeout in a staggered fashion.
102+
///
103+
/// From the moment this function is called, each lookup is scheduled after the delays in
104+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
105+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The
106+
/// result of the first successful call is returned, or a summary of all errors otherwise.
107+
async fn lookup_ipv4_staggered<N: IntoName + Clone>(
108+
&self,
109+
host: N,
110+
) -> Result<impl Iterator<Item = IpAddr>> {
111+
let f = || self.lookup_ipv4(host.clone());
112+
stagger_call(f, DNS_STAGGERING_MS).await
113+
}
114+
115+
/// Perform an ipv6 lookup with a timeout in a staggered fashion.
116+
///
117+
/// From the moment this function is called, each lookup is scheduled after the delays in
118+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
119+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied to each call individually. The
120+
/// result of the first successful call is returned, or a summary of all errors otherwise.
121+
async fn lookup_ipv6_staggered<N: IntoName + Clone>(
122+
&self,
123+
host: N,
124+
) -> Result<impl Iterator<Item = IpAddr>> {
125+
let f = || self.lookup_ipv6(host.clone());
126+
stagger_call(f, DNS_STAGGERING_MS).await
127+
}
128+
129+
/// Race an ipv4 and ipv6 lookup with a timeout in a staggered fashion.
130+
///
131+
/// From the moment this function is called, each lookup is scheduled after the delays in
132+
/// `delays_ms` with the first call being done immediately. `[200ms, 300ms]` results in calls
133+
/// at T+0ms, T+200ms and T+300ms. The `timeout` is applied as stated in
134+
/// [`Self::lookup_ipv4_ipv6`]. The result of the first successful call is returned, or a
135+
/// summary of all errors otherwise.
136+
async fn lookup_ipv4_ipv6_staggered<N: IntoName + Clone>(
137+
&self,
138+
host: N,
139+
) -> Result<impl Iterator<Item = IpAddr>> {
140+
let f = || self.lookup_ipv4_ipv6(host.clone());
141+
stagger_call(f, DNS_STAGGERING_MS).await
142+
}
143+
}
144+
145+
/// Helper enum to give a unified type to the iterators of [`ResolverExt::lookup_ipv4_ipv6`].
///
/// `A` is the iterator over the ipv4 results and `B` the iterator over the ipv6 results.
enum LookupIter<A, B> {
    /// Only the ipv4 lookup succeeded.
    Ipv4(A),
    /// Only the ipv6 lookup succeeded.
    Ipv6(B),
    /// Both lookups succeeded; ipv4 addresses are yielded before ipv6 ones.
    Both(std::iter::Chain<A, B>),
}

impl<A: Iterator<Item = IpAddr>, B: Iterator<Item = IpAddr>> Iterator for LookupIter<A, B> {
    type Item = IpAddr;

    /// Yields the next address of whichever iterator is wrapped.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            LookupIter::Ipv4(iter) => iter.next(),
            LookupIter::Ipv6(iter) => iter.next(),
            LookupIter::Both(iter) => iter.next(),
        }
    }

    /// Forwards the wrapped iterator's bounds.
    ///
    /// Without this the default `(0, None)` is reported, which prevents consumers that
    /// `collect()` the addresses from reserving capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            LookupIter::Ipv4(iter) => iter.size_hint(),
            LookupIter::Ipv6(iter) => iter.size_hint(),
            LookupIter::Both(iter) => iter.size_hint(),
        }
    }
}
163+
164+
/// Staggers calls to the future F with the given delays.
165+
///
166+
/// The first call is performed immediately. The first call to succeed generates an Ok result
167+
/// ignoring any previous error. If all calls fail, an error summarizing all errors is returned.
168+
async fn stagger_call<T, F: Fn() -> Fut, Fut: Future<Output = Result<T>>>(
169+
f: F,
170+
delays_ms: &[u64],
171+
) -> Result<T> {
172+
let mut calls = futures_buffered::FuturesUnorderedBounded::new(delays_ms.len() + 1);
173+
// NOTE: we add the 0 delay here to have a uniform set of futures. This is more performant than
174+
// using alternatives that allow futures of different types.
175+
for delay in std::iter::once(&0u64).chain(delays_ms) {
176+
let delay = std::time::Duration::from_millis(*delay);
177+
let fut = f();
178+
let staggered_fut = async move {
179+
tokio::time::sleep(delay).await;
180+
fut.await
181+
};
182+
calls.push(staggered_fut)
183+
}
184+
185+
let mut errors = vec![];
186+
while let Some(call_result) = calls.next().await {
187+
match call_result {
188+
Ok(t) => return Ok(t),
189+
Err(e) => errors.push(e),
190+
}
191+
}
192+
193+
anyhow::bail!(
194+
"no calls succeed: [ {}]",
195+
errors.into_iter().fold(String::new(), |mut summary, e| {
196+
write!(summary, "{e} ").expect("infallible");
197+
summary
198+
})
199+
)
200+
}
201+
202+
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Checks that `stagger_call` returns the first successful attempt in completion order,
    /// not in scheduling order.
    #[tokio::test]
    async fn stagger_basic() {
        let _logging = iroh_test::logging::setup();
        // Result handed out to the n-th invocation of `f`.
        const CALL_RESULTS: &[Result<u8, u8>] = &[Err(2), Ok(3), Ok(5), Ok(7)];
        static DONE_CALL: AtomicUsize = AtomicUsize::new(0);
        let f = || {
            // The position is claimed when `f` is invoked, not when the future is polled, so
            // results are assigned in scheduling order.
            let r_pos = DONE_CALL.fetch_add(1, Ordering::Relaxed);
            async move {
                tracing::info!(r_pos, "call");
                CALL_RESULTS[r_pos].map_err(|e| anyhow::anyhow!("{e}"))
            }
        };

        // Attempt 0 runs immediately and fails with 2, attempt 1 (Ok(3)) is delayed by 1000ms
        // and attempt 2 (Ok(5)) by 15ms, so attempt 2 is the first one to succeed.
        let delays = [1000, 15];
        let result = stagger_call(f, &delays).await.unwrap();
        assert_eq!(result, 5)
    }
}

iroh-netcheck/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use tracing::{debug, error, info_span, trace, warn, Instrument};
2929
pub(crate) use relay_map::{RelayMap, RelayMode, RelayNode, RelayUrl};
3030

3131
pub mod defaults;
32+
mod dns;
3233
mod metrics;
3334
mod ping;
3435
mod relay_map;

iroh-netcheck/src/reportgen.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ use std::{
2121
net::{IpAddr, SocketAddr},
2222
pin::Pin,
2323
sync::Arc,
24+
task::{Context, Poll},
2425
time::Duration,
2526
};
2627

27-
use anyhow::{anyhow, bail, Context, Result};
28+
use anyhow::{anyhow, bail, Context as _, Result};
2829
use iroh_metrics::inc;
2930
use iroh_relay::{http::RELAY_PROBE_PATH, protos::stun};
3031
use netwatch::{interfaces, UdpSocket};
@@ -42,13 +43,9 @@ use super::NetcheckMetrics;
4243
use crate::{
4344
self as netcheck,
4445
defaults::DEFAULT_STUN_PORT,
45-
// dns::{DnsResolver, ResolverExt},
46+
dns::ResolverExt,
4647
ping::{PingError, Pinger},
47-
util::MaybeFuture,
48-
RelayMap,
49-
RelayNode,
50-
RelayUrl,
51-
Report,
48+
RelayMap, RelayNode, RelayUrl, Report,
5249
};
5350
use hickory_resolver::TokioAsyncResolver as DnsResolver;
5451

@@ -58,15 +55,11 @@ mod probes;
5855
use probes::{Probe, ProbePlan, ProbeProto};
5956

6057
use crate::defaults::timeouts::{
61-
CAPTIVE_PORTAL_DELAY, CAPTIVE_PORTAL_TIMEOUT, DNS_TIMEOUT, OVERALL_REPORT_TIMEOUT,
62-
PROBES_TIMEOUT,
58+
CAPTIVE_PORTAL_DELAY, CAPTIVE_PORTAL_TIMEOUT, OVERALL_REPORT_TIMEOUT, PROBES_TIMEOUT,
6359
};
6460

6561
const ENOUGH_NODES: usize = 3;
6662

67-
/// Delay used to perform staggered dns queries.
68-
const DNS_STAGGERING_MS: &[u64] = &[200, 300];
69-
7063
/// Holds the state for a single invocation of [`netcheck::Client::get_report`].
7164
///
7265
/// Dropping this will cancel the actor and stop the report generation.
@@ -900,7 +893,7 @@ async fn check_captive_portal(
900893
// Ideally we would try to resolve **both** IPv4 and IPv6 rather than purely race
901894
// them. But our resolver doesn't support that yet.
902895
let addrs: Vec<_> = dns_resolver
903-
.lookup_ipv4_ipv6_staggered(domain, DNS_TIMEOUT, DNS_STAGGERING_MS)
896+
.lookup_ipv4_ipv6_staggered(domain)
904897
.await?
905898
.map(|ipaddr| SocketAddr::new(ipaddr, 0))
906899
.collect();
@@ -963,10 +956,7 @@ async fn get_relay_addr(
963956
ProbeProto::StunIpv4 | ProbeProto::IcmpV4 => match relay_node.url.host() {
964957
Some(url::Host::Domain(hostname)) => {
965958
debug!(?proto, %hostname, "Performing DNS A lookup for relay addr");
966-
match dns_resolver
967-
.lookup_ipv4_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
968-
.await
969-
{
959+
match dns_resolver.lookup_ipv4_staggered(hostname).await {
970960
Ok(mut addrs) => addrs
971961
.next()
972962
.map(|ip| ip.to_canonical())
@@ -983,10 +973,7 @@ async fn get_relay_addr(
983973
ProbeProto::StunIpv6 | ProbeProto::IcmpV6 => match relay_node.url.host() {
984974
Some(url::Host::Domain(hostname)) => {
985975
debug!(?proto, %hostname, "Performing DNS AAAA lookup for relay addr");
986-
match dns_resolver
987-
.lookup_ipv6_staggered(hostname, DNS_TIMEOUT, DNS_STAGGERING_MS)
988-
.await
989-
{
976+
match dns_resolver.lookup_ipv6_staggered(hostname).await {
990977
Ok(mut addrs) => addrs
991978
.next()
992979
.map(|ip| ip.to_canonical())
@@ -1071,7 +1058,7 @@ async fn measure_https_latency(
10711058
// but staggered for reliability. Ideally this tries to resolve **both** IPv4 and
10721059
// IPv6 though. But our resolver does not have a function for that yet.
10731060
let addrs: Vec<_> = dns_resolver
1074-
.lookup_ipv4_ipv6_staggered(domain, DNS_TIMEOUT, DNS_STAGGERING_MS)
1061+
.lookup_ipv4_ipv6_staggered(domain)
10751062
.await?
10761063
.map(|ipaddr| SocketAddr::new(ipaddr, 0))
10771064
.collect();
@@ -1175,6 +1162,31 @@ fn update_report(report: &mut Report, probe_report: ProbeReport) {
11751162
.or(probe_report.icmpv6);
11761163
}
11771164

1165+
/// A future that may or may not be set.
///
/// Resolves to pending if the inner is `None`. Once the inner future has completed it is
/// dropped and `inner` is reset to `None`, so polling again yields pending instead of polling
/// a future that already finished.
#[derive(Debug)]
pub(crate) struct MaybeFuture<T> {
    /// Future to be polled.
    pub inner: Option<T>,
}

// NOTE: explicit implementation to bypass derive unnecessary bounds
impl<T> Default for MaybeFuture<T> {
    /// Creates a `MaybeFuture` without an inner future.
    fn default() -> Self {
        MaybeFuture { inner: None }
    }
}

impl<T: Future + Unpin> Future for MaybeFuture<T> {
    type Output = T::Output;

    /// Polls the inner future if there is one, returns `Poll::Pending` otherwise.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.as_mut().map(|fut| Pin::new(fut).poll(cx)) {
            Some(Poll::Ready(output)) => {
                // Polling a completed future again is not allowed by the `Future` contract
                // and may panic, so drop it as soon as it has produced its output.
                self.inner = None;
                Poll::Ready(output)
            }
            Some(Poll::Pending) | None => Poll::Pending,
        }
    }
}
1188+
}
1189+
11781190
#[cfg(test)]
11791191
mod tests {
11801192
use std::net::{Ipv4Addr, Ipv6Addr};

0 commit comments

Comments
 (0)