Skip to content

Commit c999770

Browse files
authored
feat(iroh-relay): Rate-limit client connections (#2961)
## Description Add a rate limit to the incoming data from client connections. ## Breaking Changes If not configured there is now a default rate limit for incoming data from client connections: 4KiB/s steady-stream and 16MiB burst capacity. ## Notes & open questions - The choice here is made to rate-limit the incoming bytes, regardless of what they are. The benefit is that the incoming stream is slowed down, pushing back to the client over the TCP connection. The downside is that someone who is rate-limited will get a fairly bad experience since all DISCO traffic is also delayed. - Only rate-limiting non-disco traffic is an option, but it would not push back on the TCP stream, which is worse as then you'd still have to swallow all the incoming traffic. Also it would be open to abuse fairly easy as the detection of disco packets is based on a magic number which could easily be spoofed. - Maybe the `RateLimitedRelayedStream` should live in `stream.rs` next to the `RelayedStream`? Not really sure. ### TODO - [x] Allow rate-limit configuration in the config file. - [x] Test config file loading. - [x] Set a sensible default rate-limit. - [x] Improve tests to more fully test the rate limiting. - [x] Metrics when rate limits are hit. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent 0a5379b commit c999770

File tree

13 files changed

+488
-101
lines changed

13 files changed

+488
-101
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-relay/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ clap = { version = "4", features = ["derive"] }
9898
crypto_box = { version = "0.9.1", features = ["serde", "chacha20"] }
9999
proptest = "1.2.0"
100100
rand_chacha = "0.3.1"
101+
testresult = "0.4.0"
101102
tokio = { version = "1", features = [
102103
"io-util",
103104
"sync",

iroh-relay/src/client/conn.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
510510
};
511511
if let Some(rate_limiter) = rate_limiter {
512512
if rate_limiter.check_n(frame.len()).is_err() {
513-
tracing::warn!("dropping send: rate limit reached");
513+
tracing::debug!("dropping send: rate limit reached");
514514
return Ok(());
515515
}
516516
}
@@ -521,12 +521,7 @@ pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
521521
}
522522

523523
pub(crate) struct RateLimiter {
524-
inner: governor::RateLimiter<
525-
governor::state::direct::NotKeyed,
526-
governor::state::InMemoryState,
527-
governor::clock::DefaultClock,
528-
governor::middleware::NoOpMiddleware,
529-
>,
524+
inner: governor::DefaultDirectRateLimiter,
530525
}
531526

532527
impl RateLimiter {

iroh-relay/src/main.rs

Lines changed: 105 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use std::{
88
path::{Path, PathBuf},
99
};
1010

11-
use anyhow::{anyhow, bail, Context as _, Result};
11+
use anyhow::{bail, Context as _, Result};
1212
use clap::Parser;
1313
use iroh_relay::{
1414
defaults::{DEFAULT_HTTPS_PORT, DEFAULT_HTTP_PORT, DEFAULT_METRICS_PORT, DEFAULT_STUN_PORT},
15-
server as relay,
15+
server::{self as relay, ClientConnRateLimit},
1616
};
1717
use serde::{Deserialize, Serialize};
1818
use tokio_rustls_acme::{caches::DirCache, AcmeConfig};
@@ -282,6 +282,29 @@ struct Limits {
282282
accept_conn_limit: Option<f64>,
283283
/// Burst limit for accepting new connection. Unlimited if not set.
284284
accept_conn_burst: Option<usize>,
285+
/// Rate limiting configuration per client.
286+
client: Option<PerClientRateLimitConfig>,
287+
}
288+
289+
/// Rate limit configuration for each connected client.
290+
///
291+
/// The rate limiting uses a token-bucket style algorithm:
292+
///
293+
/// - The base rate limit uses a steady-stream rate of bytes allowed.
294+
/// - Additionally a burst quota allows sending bytes over this steady-stream rate
295+
/// limit, as long as the maximum burst quota is not exceeded.
296+
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
297+
struct PerClientRateLimitConfig {
298+
/// Rate limit configuration for the incoming data from the client.
299+
rx: Option<RateLimitConfig>,
300+
}
301+
302+
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
303+
struct RateLimitConfig {
304+
/// Maximum number of bytes per second.
305+
bytes_per_second: Option<u32>,
306+
/// Maximum number of bytes to read in a single burst.
307+
max_burst_bytes: Option<u32>,
285308
}
286309

287310
impl Config {
@@ -295,41 +318,22 @@ impl Config {
295318
if config_path.exists() {
296319
Self::read_from_file(&config_path).await
297320
} else {
298-
let config = Config::default();
299-
config.write_to_file(&config_path).await?;
300-
301-
Ok(config)
321+
Ok(Config::default())
302322
}
303323
}
304324

325+
fn from_str(config: &str) -> Result<Self> {
326+
toml::from_str(config).context("config must be valid toml")
327+
}
328+
305329
async fn read_from_file(path: impl AsRef<Path>) -> Result<Self> {
306330
if !path.as_ref().is_file() {
307331
bail!("config-path must be a file");
308332
}
309333
let config_ser = tokio::fs::read_to_string(&path)
310334
.await
311335
.context("unable to read config")?;
312-
let config: Self = toml::from_str(&config_ser).context("config file must be valid toml")?;
313-
314-
Ok(config)
315-
}
316-
317-
/// Write the content of this configuration to the provided path.
318-
async fn write_to_file(&self, path: impl AsRef<Path>) -> Result<()> {
319-
let p = path
320-
.as_ref()
321-
.parent()
322-
.ok_or_else(|| anyhow!("invalid config file path, no parent"))?;
323-
// TODO: correct permissions (0777 for dir, 0600 for file)
324-
tokio::fs::create_dir_all(p)
325-
.await
326-
.with_context(|| format!("unable to create config-path dir: {}", p.display()))?;
327-
let config_ser = toml::to_string(self).context("unable to serialize configuration")?;
328-
tokio::fs::write(path, config_ser)
329-
.await
330-
.context("unable to write config file")?;
331-
332-
Ok(())
336+
Self::from_str(&config_ser)
333337
}
334338
}
335339

@@ -402,17 +406,37 @@ async fn build_relay_config(cfg: Config) -> Result<relay::ServerConfig<std::io::
402406
}
403407
None => None,
404408
};
405-
let limits = relay::Limits {
406-
accept_conn_limit: cfg
407-
.limits
408-
.as_ref()
409-
.map(|l| l.accept_conn_limit)
410-
.unwrap_or_default(),
411-
accept_conn_burst: cfg
412-
.limits
413-
.as_ref()
414-
.map(|l| l.accept_conn_burst)
415-
.unwrap_or_default(),
409+
let limits = match cfg.limits {
410+
Some(ref limits) => {
411+
let client_rx = match &limits.client {
412+
Some(PerClientRateLimitConfig { rx: Some(rx) }) => {
413+
if rx.bytes_per_second.is_none() && rx.max_burst_bytes.is_some() {
414+
bail!("bytes_per_seconds must be specified to enable the rate-limiter");
415+
}
416+
match rx.bytes_per_second {
417+
Some(bps) => Some(ClientConnRateLimit {
418+
bytes_per_second: bps
419+
.try_into()
420+
.context("bytes_per_second must be non-zero u32")?,
421+
max_burst_bytes: rx
422+
.max_burst_bytes
423+
.map(|v| {
424+
v.try_into().context("max_burst_bytes must be non-zero u32")
425+
})
426+
.transpose()?,
427+
}),
428+
None => None,
429+
}
430+
}
431+
Some(PerClientRateLimitConfig { rx: None }) | None => None,
432+
};
433+
relay::Limits {
434+
accept_conn_limit: limits.accept_conn_limit,
435+
accept_conn_burst: limits.accept_conn_burst,
436+
client_rx,
437+
}
438+
}
439+
None => Default::default(),
416440
};
417441
let relay_config = relay::RelayConfig {
418442
http_bind_addr: cfg.http_bind_addr(),
@@ -477,3 +501,46 @@ mod metrics {
477501
}
478502
}
479503
}
504+
505+
#[cfg(test)]
506+
mod tests {
507+
use std::num::NonZeroU32;
508+
509+
use testresult::TestResult;
510+
511+
use super::*;
512+
513+
#[tokio::test]
514+
async fn test_rate_limit_config() -> TestResult {
515+
let config = "
516+
[limits.client.rx]
517+
bytes_per_second = 400
518+
max_burst_bytes = 800
519+
";
520+
let config = Config::from_str(config)?;
521+
let relay_config = build_relay_config(config).await?;
522+
523+
let relay = relay_config.relay.expect("no relay config");
524+
assert_eq!(
525+
relay.limits.client_rx.expect("ratelimit").bytes_per_second,
526+
NonZeroU32::try_from(400).unwrap()
527+
);
528+
assert_eq!(
529+
relay.limits.client_rx.expect("ratelimit").max_burst_bytes,
530+
Some(NonZeroU32::try_from(800).unwrap())
531+
);
532+
533+
Ok(())
534+
}
535+
536+
#[tokio::test]
537+
async fn test_rate_limit_default() -> TestResult {
538+
let config = Config::from_str("")?;
539+
let relay_config = build_relay_config(config).await?;
540+
541+
let relay = relay_config.relay.expect("no relay config");
542+
assert!(relay.limits.client_rx.is_none());
543+
544+
Ok(())
545+
}
546+
}

iroh-relay/src/protos/disco.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub(crate) const MAGIC_LEN: usize = MAGIC.as_bytes().len();
1010
pub(crate) const KEY_LEN: usize = 32;
1111

1212
const MESSAGE_HEADER_LEN: usize = MAGIC_LEN + KEY_LEN;
13+
1314
/// Reports whether p looks like it's a packet containing an encrypted disco message.
1415
pub fn looks_like_disco_wrapper(p: &[u8]) -> bool {
1516
if p.len() < MESSAGE_HEADER_LEN {

iroh-relay/src/protos/relay.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use tokio_util::codec::{Decoder, Encoder};
1818
/// including its on-wire framing overhead)
1919
pub const MAX_PACKET_SIZE: usize = 64 * 1024;
2020

21+
/// The maximum frame size.
22+
///
23+
/// This is also the minimum burst size that a rate-limiter has to accept.
2124
const MAX_FRAME_SIZE: usize = 1024 * 1024;
2225

2326
/// The Relay magic number, sent in the FrameType::ClientInfo frame upon initial connection.
@@ -200,9 +203,14 @@ pub(crate) async fn recv_client_key<S: Stream<Item = anyhow::Result<Frame>> + Un
200203
}
201204
}
202205

206+
/// The protocol for the relay server.
207+
///
208+
/// This is a framed protocol, using [`tokio_util::codec`] to turn the streams of bytes into
209+
/// [`Frame`]s.
203210
#[derive(Debug, Default, Clone)]
204211
pub(crate) struct DerpCodec;
205212

213+
/// The frames in the [`DerpCodec`].
206214
#[derive(Debug, Clone, PartialEq, Eq)]
207215
pub(crate) enum Frame {
208216
ClientInfo {
@@ -279,6 +287,12 @@ impl Frame {
279287
}
280288
}
281289

290+
/// Serialized length with frame header.
291+
#[cfg(feature = "server")]
292+
pub(crate) fn len_with_header(&self) -> usize {
293+
self.len() + HEADER_LEN
294+
}
295+
282296
/// Tries to decode a frame received over websockets.
283297
///
284298
/// Specifically, bytes received from a binary websocket message frame.

iroh-relay/src/server.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
//! - HTTPS `/generate_204`: Used for net_report probes.
1717
//! - STUN: UDP port for STUN requests/responses.
1818
19-
use std::{fmt, future::Future, net::SocketAddr, pin::Pin, sync::Arc};
19+
use std::{fmt, future::Future, net::SocketAddr, num::NonZeroU32, pin::Pin, sync::Arc};
2020

2121
use anyhow::{anyhow, bail, Context, Result};
2222
use futures_lite::StreamExt;
@@ -140,12 +140,24 @@ pub struct TlsConfig<EC: fmt::Debug, EA: fmt::Debug = EC> {
140140
}
141141

142142
/// Rate limits.
143+
// TODO: accept_conn_limit and accept_conn_burst are not currently implemented.
143144
#[derive(Debug, Default)]
144145
pub struct Limits {
145146
/// Rate limit for accepting new connection. Unlimited if not set.
146147
pub accept_conn_limit: Option<f64>,
147148
/// Burst limit for accepting new connection. Unlimited if not set.
148149
pub accept_conn_burst: Option<usize>,
150+
/// Rate limits for incoming traffic from a client connection.
151+
pub client_rx: Option<ClientConnRateLimit>,
152+
}
153+
154+
/// Per-client rate limit configuration.
155+
#[derive(Debug, Copy, Clone)]
156+
pub struct ClientConnRateLimit {
157+
/// Max number of bytes per second to read from the client connection.
158+
pub bytes_per_second: NonZeroU32,
159+
/// Max number of bytes to read in a single burst.
160+
pub max_burst_bytes: Option<NonZeroU32>,
149161
}
150162

151163
/// TLS certificate configuration.
@@ -260,6 +272,9 @@ impl Server {
260272
.request_handler(Method::GET, "/index.html", Box::new(root_handler))
261273
.request_handler(Method::GET, RELAY_PROBE_PATH, Box::new(probe_handler))
262274
.request_handler(Method::GET, "/robots.txt", Box::new(robots_handler));
275+
if let Some(cfg) = relay_config.limits.client_rx {
276+
builder = builder.client_rx_ratelimit(cfg);
277+
}
263278
let http_addr = match relay_config.tls {
264279
Some(tls_config) => {
265280
let server_config = rustls::ServerConfig::builder_with_provider(Arc::new(

iroh-relay/src/server/actor.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,8 @@ impl Actor {
183183
}
184184
Message::CreateClient(client_builder) => {
185185
inc!(Metrics, accepts);
186-
187-
trace!(
188-
node_id = client_builder.node_id.fmt_short(),
189-
"create client"
190-
);
191186
let node_id = client_builder.node_id;
187+
trace!(node_id = node_id.fmt_short(), "create client");
192188

193189
// build and register client, starting up read & write loops for the client
194190
// connection
@@ -272,6 +268,7 @@ mod tests {
272268
stream: RelayedStream::Derp(Framed::new(MaybeTlsStream::Test(io), DerpCodec)),
273269
write_timeout: Duration::from_secs(1),
274270
channel_capacity: 10,
271+
rate_limit: None,
275272
server_channel,
276273
},
277274
Framed::new(test_io, DerpCodec),

0 commit comments

Comments
 (0)