Skip to content

Commit b2b070f

Browse files
authored
refactor(iroh): Remove unused rate limiter (#3007)
## Description This was a rate limiter which seems like the intention was to slow down how fast it sends to a particular relay server. This is the wrong place to rate-limit. If the server limits that it should read slower from the TCP stream (it does this now!) and that blocks up the TCP-stream and in turn slows down the relay client. In no world does a socket internally rate-limit to one of it's destinations, that's not how sockets work. And this client exists for the MagicSock. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent 566d7eb commit b2b070f

File tree

3 files changed

+9
-52
lines changed

3 files changed

+9
-52
lines changed

iroh-relay/src/client/conn.rs

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
55
use std::{
66
net::SocketAddr,
7-
num::NonZeroU32,
87
pin::Pin,
98
sync::Arc,
109
task::{Context, Poll},
1110
time::Duration,
1211
};
1312

14-
use anyhow::{anyhow, bail, ensure, Context as _, Result};
13+
use anyhow::{anyhow, bail, ensure, Result};
1514
use bytes::Bytes;
1615
use futures_lite::Stream;
1716
use futures_sink::Sink;
@@ -229,15 +228,14 @@ enum ConnWriterMessage {
229228
struct ConnWriterTasks {
230229
recv_msgs: mpsc::Receiver<ConnWriterMessage>,
231230
writer: ConnWriter,
232-
rate_limiter: Option<RateLimiter>,
233231
}
234232

235233
impl ConnWriterTasks {
236234
async fn run(mut self) -> Result<()> {
237235
while let Some(msg) = self.recv_msgs.recv().await {
238236
match msg {
239237
ConnWriterMessage::Packet((key, bytes)) => {
240-
send_packet(&mut self.writer, &self.rate_limiter, key, bytes).await?;
238+
send_packet(&mut self.writer, key, bytes).await?;
241239
}
242240
ConnWriterMessage::Pong(data) => {
243241
write_frame(&mut self.writer, Frame::Pong { data }, None).await?;
@@ -360,7 +358,7 @@ impl ConnBuilder {
360358
}
361359
}
362360

363-
async fn server_handshake(&mut self) -> Result<Option<RateLimiter>> {
361+
async fn server_handshake(&mut self) -> Result<()> {
364362
debug!("server_handshake: started");
365363
let client_info = ClientInfo {
366364
version: PROTOCOL_VERSION,
@@ -369,22 +367,18 @@ impl ConnBuilder {
369367
crate::protos::relay::send_client_key(&mut self.writer, &self.secret_key, &client_info)
370368
.await?;
371369

372-
// TODO: add some actual configuration
373-
let rate_limiter = RateLimiter::new(0, 0)?;
374-
375370
debug!("server_handshake: done");
376-
Ok(rate_limiter)
371+
Ok(())
377372
}
378373

379374
pub async fn build(mut self) -> Result<(Conn, ConnReceiver)> {
380375
// exchange information with the server
381-
let rate_limiter = self.server_handshake().await?;
376+
self.server_handshake().await?;
382377

383378
// create task to handle writing to the server
384379
let (writer_sender, writer_recv) = mpsc::channel(PER_CLIENT_SEND_QUEUE_DEPTH);
385380
let writer_task = tokio::task::spawn(
386381
ConnWriterTasks {
387-
rate_limiter,
388382
writer: self.writer,
389383
recv_msgs: writer_recv,
390384
}
@@ -494,7 +488,6 @@ pub enum ReceivedMessage {
494488

495489
pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
496490
mut writer: S,
497-
rate_limiter: &Option<RateLimiter>,
498491
dst: NodeId,
499492
packet: Bytes,
500493
) -> Result<()> {
@@ -508,43 +501,8 @@ pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
508501
dst_key: dst,
509502
packet,
510503
};
511-
if let Some(rate_limiter) = rate_limiter {
512-
if rate_limiter.check_n(frame.len()).is_err() {
513-
tracing::debug!("dropping send: rate limit reached");
514-
return Ok(());
515-
}
516-
}
517504
writer.send(frame).await?;
518505
writer.flush().await?;
519506

520507
Ok(())
521508
}
522-
523-
pub(crate) struct RateLimiter {
524-
inner: governor::DefaultDirectRateLimiter,
525-
}
526-
527-
impl RateLimiter {
528-
pub(crate) fn new(bytes_per_second: usize, bytes_burst: usize) -> Result<Option<Self>> {
529-
if bytes_per_second == 0 || bytes_burst == 0 {
530-
return Ok(None);
531-
}
532-
let bytes_per_second = NonZeroU32::new(u32::try_from(bytes_per_second)?)
533-
.context("bytes_per_second not non-zero")?;
534-
let bytes_burst =
535-
NonZeroU32::new(u32::try_from(bytes_burst)?).context("bytes_burst not non-zero")?;
536-
Ok(Some(Self {
537-
inner: governor::RateLimiter::direct(
538-
governor::Quota::per_second(bytes_per_second).allow_burst(bytes_burst),
539-
),
540-
}))
541-
}
542-
543-
pub(crate) fn check_n(&self, n: usize) -> Result<()> {
544-
let n = NonZeroU32::new(u32::try_from(n)?).context("n not non-zero")?;
545-
match self.inner.check_n(n) {
546-
Ok(_) => Ok(()),
547-
Err(_) => bail!("batch cannot go through"),
548-
}
549-
}
550-
}

iroh-relay/src/server/actor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,7 @@ mod tests {
308308

309309
// write message from b to a
310310
let msg = b"hello world!";
311-
crate::client::conn::send_packet(&mut b_io, &None, node_id_a, Bytes::from_static(msg))
312-
.await?;
311+
crate::client::conn::send_packet(&mut b_io, node_id_a, Bytes::from_static(msg)).await?;
313312

314313
// get message on a's reader
315314
let frame = recv_frame(FrameType::RecvPacket, &mut a_io).await?;

iroh-relay/src/server/client_conn.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ mod tests {
617617
// send packet
618618
println!(" send packet");
619619
let data = b"hello world!";
620-
conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?;
620+
conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?;
621621
let msg = server_channel_r.recv().await.unwrap();
622622
match msg {
623623
actor::Message::SendPacket {
@@ -640,7 +640,7 @@ mod tests {
640640
let mut disco_data = disco::MAGIC.as_bytes().to_vec();
641641
disco_data.extend_from_slice(target.as_bytes());
642642
disco_data.extend_from_slice(data);
643-
conn::send_packet(&mut io_rw, &None, target, disco_data.clone().into()).await?;
643+
conn::send_packet(&mut io_rw, target, disco_data.clone().into()).await?;
644644
let msg = server_channel_r.recv().await.unwrap();
645645
match msg {
646646
actor::Message::SendDiscoPacket {
@@ -698,7 +698,7 @@ mod tests {
698698
let data = b"hello world!";
699699
let target = SecretKey::generate().public();
700700

701-
conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?;
701+
conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?;
702702
let msg = server_channel_r.recv().await.unwrap();
703703
match msg {
704704
actor::Message::SendPacket {

0 commit comments

Comments
 (0)