Skip to content

Commit cd12da3

Browse files
dignifiedquire and flub
authored
feat(iroh-relay): send regular pings to check the connection (#3113)
## Description Because of issues with TCP, we now send regular ping-pongs from client to server and vice versa. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --------- Co-authored-by: Floris Bruynooghe <[email protected]>
1 parent a5bb926 commit cd12da3

File tree

5 files changed

+159
-73
lines changed

5 files changed

+159
-73
lines changed

iroh-relay/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub mod quic;
3838
#[cfg(feature = "server")]
3939
pub mod server;
4040

41+
mod ping_tracker;
42+
4143
mod key_cache;
4244
mod relay_map;
4345
pub(crate) use key_cache::KeyCache;
@@ -47,4 +49,7 @@ mod dns;
4749

4850
pub use protos::relay::MAX_PACKET_SIZE;
4951

50-
pub use self::relay_map::{RelayMap, RelayNode, RelayQuicConfig};
52+
pub use self::{
53+
ping_tracker::PingTracker,
54+
relay_map::{RelayMap, RelayNode, RelayQuicConfig},
55+
};

iroh-relay/src/ping_tracker.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::time::{Duration, Instant};
2+
3+
use tracing::debug;
4+
5+
/// Maximum time for a ping response in the relay protocol.
6+
pub const PING_TIMEOUT: Duration = Duration::from_secs(5);
7+
8+
/// Tracks pings on a single relay connection.
9+
///
10+
/// Only the last ping is useful; any previously sent ping is forgotten and ignored.
11+
#[derive(Debug)]
12+
pub struct PingTracker {
13+
inner: Option<PingInner>,
14+
default_timeout: Duration,
15+
}
16+
17+
#[derive(Debug)]
18+
struct PingInner {
19+
data: [u8; 8],
20+
deadline: Instant,
21+
}
22+
23+
impl Default for PingTracker {
24+
fn default() -> Self {
25+
Self::new(PING_TIMEOUT)
26+
}
27+
}
28+
29+
impl PingTracker {
30+
/// Creates a new ping tracker, setting the ping timeout for pings.
31+
pub fn new(default_timeout: Duration) -> Self {
32+
Self {
33+
inner: None,
34+
default_timeout,
35+
}
36+
}
37+
38+
/// Returns the current timeout set for pings.
39+
pub fn default_timeout(&self) -> Duration {
40+
self.default_timeout
41+
}
42+
43+
/// Starts a new ping.
44+
pub fn new_ping(&mut self) -> [u8; 8] {
45+
let ping_data = rand::random();
46+
debug!(data = ?ping_data, "Sending ping to relay server.");
47+
self.inner = Some(PingInner {
48+
data: ping_data,
49+
deadline: Instant::now() + self.default_timeout,
50+
});
51+
ping_data
52+
}
53+
54+
/// Updates the ping tracker with a received pong.
55+
///
56+
/// Only the pong matching the most recent ping has any effect. Feeding any other
57+
/// pong is harmless: it is simply ignored.
58+
pub fn pong_received(&mut self, data: [u8; 8]) {
59+
if self.inner.as_ref().map(|inner| inner.data) == Some(data) {
60+
debug!(?data, "Pong received from relay server");
61+
self.inner = None;
62+
}
63+
}
64+
65+
/// Cancel-safe waiting for a ping timeout.
66+
///
67+
/// Unless the most recently sent ping times out, this will never return.
68+
pub async fn timeout(&mut self) {
69+
match self.inner {
70+
Some(PingInner { deadline, data }) => {
71+
tokio::time::sleep_until(deadline.into()).await;
72+
debug!(?data, "Ping timeout.");
73+
self.inner = None;
74+
}
75+
None => std::future::pending().await,
76+
}
77+
}
78+
}

iroh-relay/src/protos/relay.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,13 @@ const MAX_FRAME_SIZE: usize = 1024 * 1024;
4141
/// The Relay magic number, sent in the FrameType::ClientInfo frame upon initial connection.
4242
const MAGIC: &str = "RELAY🔑";
4343

44+
/// Interval at which the relay server pings a connected client to ensure the connection is alive.
45+
///
46+
/// The default QUIC max_idle_timeout is 30s, so setting this to half that time gives some
47+
/// chance of recovering.
4448
#[cfg(feature = "server")]
45-
pub(crate) const KEEP_ALIVE: Duration = Duration::from_secs(60);
49+
pub(crate) const PING_INTERVAL: Duration = Duration::from_secs(15);
50+
4651
/// The number of packets buffered for sending per client
4752
#[cfg(feature = "server")]
4853
pub(crate) const PER_CLIENT_SEND_QUEUE_DEPTH: usize = 512; //32;

iroh-relay/src/server/client.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,21 @@ use futures_sink::Sink;
99
use futures_util::{SinkExt, Stream, StreamExt};
1010
use iroh_base::NodeId;
1111
use iroh_metrics::{inc, inc_by};
12-
use tokio::sync::mpsc::{self, error::TrySendError};
12+
use rand::Rng;
13+
use tokio::{
14+
sync::mpsc::{self, error::TrySendError},
15+
time::MissedTickBehavior,
16+
};
1317
use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
1418
use tracing::{debug, error, instrument, trace, warn, Instrument};
1519

1620
use crate::{
1721
protos::{
1822
disco,
19-
relay::{write_frame, Frame, KEEP_ALIVE},
23+
relay::{write_frame, Frame, PING_INTERVAL},
2024
},
2125
server::{clients::Clients, metrics::Metrics, streams::RelayedStream, ClientRateLimit},
26+
PingTracker,
2227
};
2328

2429
/// A request to write a dataframe to a Client
@@ -102,6 +107,7 @@ impl Client {
102107
node_id,
103108
connection_id,
104109
clients: clients.clone(),
110+
ping_tracker: PingTracker::default(),
105111
};
106112

107113
// start io loop
@@ -201,6 +207,7 @@ struct Actor {
201207
connection_id: u64,
202208
/// Reference to the other connected clients.
203209
clients: Clients,
210+
ping_tracker: PingTracker,
204211
}
205212

206213
impl Actor {
@@ -220,10 +227,16 @@ impl Actor {
220227
}
221228

222229
async fn run_inner(&mut self, done: CancellationToken) -> Result<()> {
223-
let jitter = Duration::from_secs(5);
224-
let mut keep_alive = tokio::time::interval(KEEP_ALIVE + jitter);
230+
// Add some jitter to ping pong interactions, to avoid all pings being sent at the same time
231+
let next_interval = || {
232+
let random_secs = rand::rngs::OsRng.gen_range(1..=5);
233+
Duration::from_secs(random_secs) + PING_INTERVAL
234+
};
235+
236+
let mut ping_interval = tokio::time::interval(next_interval());
225237
// ticks immediately
226-
keep_alive.tick().await;
238+
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
239+
ping_interval.tick().await;
227240

228241
loop {
229242
tokio::select! {
@@ -237,6 +250,8 @@ impl Actor {
237250
}
238251
maybe_frame = self.stream.next() => {
239252
self.handle_frame(maybe_frame).await.context("handle read")?;
253+
// reset the ping interval, we just received a message
254+
ping_interval.reset();
240255
}
241256
// First priority, disco packets
242257
packet = self.disco_send_queue.recv() => {
@@ -254,11 +269,19 @@ impl Actor {
254269
trace!("node_id gone: {:?}", node_id);
255270
self.write_frame(Frame::NodeGone { node_id }).await?;
256271
}
257-
_ = keep_alive.tick() => {
258-
trace!("keep alive");
259-
self.write_frame(Frame::KeepAlive).await?;
272+
_ = self.ping_tracker.timeout() => {
273+
trace!("pong timed out");
274+
break;
275+
}
276+
_ = ping_interval.tick() => {
277+
trace!("keep alive ping");
278+
// new interval
279+
ping_interval.reset_after(next_interval());
280+
let data = self.ping_tracker.new_ping();
281+
self.write_frame(Frame::Ping { data }).await?;
260282
}
261283
}
284+
262285
self.stream.flush().await.context("tick flush")?;
263286
}
264287
Ok(())
@@ -321,6 +344,7 @@ impl Actor {
321344
Some(frame) => frame?,
322345
None => anyhow::bail!("stream terminated"),
323346
};
347+
324348
match frame {
325349
Frame::SendPacket { dst_key, packet } => {
326350
let packet_len = packet.len();
@@ -333,6 +357,9 @@ impl Actor {
333357
self.write_frame(Frame::Pong { data }).await?;
334358
inc!(Metrics, sent_pong);
335359
}
360+
Frame::Pong { data } => {
361+
self.ping_tracker.pong_received(data);
362+
}
336363
Frame::Health { problem } => {
337364
bail!("server issue: {:?}", problem);
338365
}
@@ -567,6 +594,7 @@ mod tests {
567594
connection_id: 0,
568595
node_id,
569596
clients: clients.clone(),
597+
ping_tracker: PingTracker::default(),
570598
};
571599

572600
let done = CancellationToken::new();

0 commit comments

Comments
 (0)