Skip to content

Commit ef97b5a

Browse files
committed
make the RelayActor not block on the datagram send path
This sends as many datagrams as quickly as possible to the right ActiveRelayActor while making sure the RelayActor itself never blocks on the send path. As a result, the RelayActor can still process other inbox items and instruct the ActiveRelayActors appropriately.
1 parent 4f5263e commit ef97b5a

File tree

1 file changed

+97
-66
lines changed

1 file changed

+97
-66
lines changed

iroh/src/magicsock/relay_actor.rs

Lines changed: 97 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use std::net::SocketAddr;
88
use std::{
99
collections::{BTreeMap, BTreeSet},
10+
future::Future,
1011
net::IpAddr,
1112
sync::{
1213
atomic::{AtomicBool, Ordering},
@@ -53,7 +54,7 @@ struct ActiveRelayActor {
5354
/// Queue to send received relay datagrams on.
5455
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
5556
/// Channel on which we receive packets to send to the relay.
56-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
57+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
5758
url: RelayUrl,
5859
/// Whether or not this is the home relay connection.
5960
is_home_relay: bool,
@@ -96,7 +97,7 @@ enum ActiveRelayMessage {
9697
#[derive(Debug)]
9798
struct ActiveRelayActorOptions {
9899
url: RelayUrl,
99-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
100+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
100101
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
101102
connection_opts: RelayConnectionOptions,
102103
}
@@ -205,11 +206,9 @@ impl ActiveRelayActor {
205206
relay_send_fut.as_mut().set_none();
206207
}
207208
// Only poll for new datagrams if relay_send_fut is not busy.
208-
Some(msg) = self.relay_datagrams_send.recv(), if relay_send_fut.is_none() => {
209-
let relay_client = self.relay_client.clone();
210-
let fut = async move {
211-
relay_client.send(msg.node_id, msg.packet).await
212-
};
209+
Some(item) = self.relay_datagrams_send.recv(), if relay_send_fut.is_none() => {
210+
debug_assert_eq!(item.url, self.url);
211+
let fut = Self::send_relay(self.relay_client.clone(), item);
213212
relay_send_fut.as_mut().set_future(fut);
214213
inactive_timeout.reset();
215214

@@ -299,6 +298,24 @@ impl ActiveRelayActor {
299298
}
300299
}
301300

301+
async fn send_relay(relay_client: relay::client::Client, item: RelaySendItem) {
302+
// When Quinn sends a GSO Transmit magicsock::split_packets will make us receive
303+
// more than one packet to send in a single call. We join all packets back together
304+
// and prefix them with a u16 packet size. They then get sent as a single DISCO
305+
// frame. However this might still be multiple packets when otherwise the maximum
306+
// packet size for the relay protocol would be exceeded.
307+
for packet in PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(item.remote_node, item.datagrams) {
308+
let len = packet.len();
309+
match relay_client.send(packet.node_id, packet.payload).await {
310+
Ok(_) => inc_by!(MagicsockMetrics, send_relay, len as _),
311+
Err(err) => {
312+
warn!("send failed: {err:#}");
313+
inc!(MagicsockMetrics, send_relay_error);
314+
}
315+
}
316+
}
317+
}
318+
302319
async fn handle_relay_msg(&mut self, msg: Result<ReceivedMessage, ClientError>) -> ReadResult {
303320
match msg {
304321
Err(err) => {
@@ -407,6 +424,7 @@ pub(super) enum RelayActorMessage {
407424
SetHome { url: RelayUrl },
408425
}
409426

427+
#[derive(Debug, Clone)]
410428
pub(super) struct RelaySendItem {
411429
/// The destination for the datagrams.
412430
pub(super) remote_node: NodeId,
@@ -458,6 +476,10 @@ impl RelayActor {
458476
mut receiver: mpsc::Receiver<RelayActorMessage>,
459477
mut datagram_send_channel: RelayDatagramSendChannelReceiver,
460478
) {
479+
// When this future is present, it is sending pending datagrams to an
480+
// ActiveRelayActor. We can not process further datagrams during this time.
481+
let mut datagram_send_fut = std::pin::pin!(MaybeFuture::none());
482+
461483
loop {
462484
tokio::select! {
463485
biased;
@@ -482,14 +504,21 @@ impl RelayActor {
482504
let cancel_token = self.cancel_token.child_token();
483505
cancel_token.run_until_cancelled(self.handle_msg(msg)).await;
484506
}
485-
item = datagram_send_channel.recv() => {
507+
// Only poll for new datagrams if we are not blocked on sending them.
508+
item = datagram_send_channel.recv(), if datagram_send_fut.is_none() => {
486509
let Some(item) = item else {
487510
debug!("Datagram send channel dropped, shutting down.");
488511
break;
489512
};
490-
let cancel_token = self.cancel_token.child_token();
491-
cancel_token.run_until_cancelled(self.send_relay(item)).await;
513+
let token = self.cancel_token.child_token();
514+
if let Some(Some(fut)) = token.run_until_cancelled(
515+
self.try_send_datagram(item)
516+
).await {
517+
datagram_send_fut.as_mut().set_future(fut);
518+
}
492519
}
520+
// Only poll this future if it is in use.
521+
_ = &mut datagram_send_fut, if datagram_send_fut.is_some() => {}
493522
}
494523
}
495524

@@ -513,34 +542,30 @@ impl RelayActor {
513542
}
514543
}
515544

516-
async fn send_relay(&mut self, item: RelaySendItem) {
517-
let RelaySendItem {
518-
remote_node,
519-
url,
520-
datagrams,
521-
} = item;
522-
let total_bytes = datagrams.iter().map(|c| c.len() as u64).sum::<u64>();
523-
trace!(
524-
%url,
525-
remote_node = %remote_node.fmt_short(),
526-
len = total_bytes,
527-
"sending over relay",
528-
);
529-
let handle = self.active_relay_handle_for_node(&url, &remote_node).await;
530-
531-
// When Quinn sends a GSO Transmit magicsock::split_packets will make us receive
532-
// more than one packet to send in a single call. We join all packets back together
533-
// and prefix them with a u16 packet size. They then get sent as a single DISCO
534-
// frame. However this might still be multiple packets when otherwise the maximum
535-
// packet size for the relay protocol would be exceeded.
536-
for packet in PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(remote_node, datagrams) {
537-
let len = packet.len();
538-
match handle.datagrams_send_queue.send(packet).await {
539-
Ok(_) => inc_by!(MagicsockMetrics, send_relay, len as _),
540-
Err(err) => {
541-
warn!(?url, "send failed: {err:#}");
542-
inc!(MagicsockMetrics, send_relay_error);
543-
}
545+
/// Sends datagrams to the correct [`ActiveRelayActor`], or returns a future.
546+
///
547+
/// If the datagram can not be sent immediately, because the destination channel is
548+
/// full, a future is returned that will complete once the datagrams have been sent to
549+
/// the [`ActiveRelayActor`].
550+
async fn try_send_datagram(&mut self, item: RelaySendItem) -> Option<impl Future<Output = ()>> {
551+
let url = item.url.clone();
552+
let handle = self
553+
.active_relay_handle_for_node(&item.url, &item.remote_node)
554+
.await;
555+
match handle.datagrams_send_queue.try_send(item) {
556+
Ok(()) => None,
557+
Err(mpsc::error::TrySendError::Closed(_)) => {
558+
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
559+
None
560+
}
561+
Err(mpsc::error::TrySendError::Full(item)) => {
562+
let sender = handle.datagrams_send_queue.clone();
563+
let fut = async move {
564+
if sender.send(item).await.is_err() {
565+
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
566+
}
567+
};
568+
Some(fut)
544569
}
545570
}
546571
}
@@ -740,7 +765,7 @@ impl RelayActor {
740765
#[derive(Debug, Clone)]
741766
struct ActiveRelayHandle {
742767
inbox_addr: mpsc::Sender<ActiveRelayMessage>,
743-
datagrams_send_queue: mpsc::Sender<RelaySendPacket>,
768+
datagrams_send_queue: mpsc::Sender<RelaySendItem>,
744769
}
745770

746771
/// A packet to send over the relay.
@@ -752,12 +777,12 @@ struct ActiveRelayHandle {
752777
#[derive(Debug, PartialEq, Eq)]
753778
struct RelaySendPacket {
754779
node_id: NodeId,
755-
packet: Bytes,
780+
payload: Bytes,
756781
}
757782

758783
impl RelaySendPacket {
759784
fn len(&self) -> usize {
760-
self.packet.len()
785+
self.payload.len()
761786
}
762787
}
763788

@@ -826,7 +851,7 @@ where
826851
if !self.buffer.is_empty() {
827852
Some(RelaySendPacket {
828853
node_id: self.node_id,
829-
packet: self.buffer.split().freeze(),
854+
payload: self.buffer.split().freeze(),
830855
})
831856
} else {
832857
None
@@ -889,6 +914,7 @@ impl Iterator for PacketSplitIter {
889914
mod tests {
890915
use futures_lite::future;
891916
use iroh_base::SecretKey;
917+
use smallvec::smallvec;
892918
use testresult::TestResult;
893919
use tokio_util::task::AbortOnDropHandle;
894920

@@ -906,7 +932,10 @@ mod tests {
906932
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(node_id, single_vec);
907933
let result = iter.collect::<Vec<_>>();
908934
assert_eq!(1, result.len());
909-
assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0].packet[..]);
935+
assert_eq!(
936+
&[5, 0, b'H', b'e', b'l', b'l', b'o'],
937+
&result[0].payload[..]
938+
);
910939

911940
let spacer = vec![0u8; MAX_PACKET_SIZE - 10];
912941
let multiple_vec = vec![&b"Hello"[..], &spacer, &b"World"[..]];
@@ -915,17 +944,20 @@ mod tests {
915944
assert_eq!(2, result.len());
916945
assert_eq!(
917946
&[5, 0, b'H', b'e', b'l', b'l', b'o'],
918-
&result[0].packet[..7]
947+
&result[0].payload[..7]
948+
);
949+
assert_eq!(
950+
&[5, 0, b'W', b'o', b'r', b'l', b'd'],
951+
&result[1].payload[..]
919952
);
920-
assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1].packet[..]);
921953
}
922954

923955
/// Starts a new [`ActiveRelayActor`].
924956
fn start_active_relay_actor(
925957
secret_key: SecretKey,
926958
url: RelayUrl,
927959
inbox_rx: mpsc::Receiver<ActiveRelayMessage>,
928-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
960+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
929961
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
930962
) -> AbortOnDropHandle<anyhow::Result<()>> {
931963
let opts = ActiveRelayActorOptions {
@@ -962,27 +994,30 @@ mod tests {
962994
let (inbox_tx, inbox_rx) = mpsc::channel(16);
963995
let actor_task = start_active_relay_actor(
964996
secret_key.clone(),
965-
relay_url,
997+
relay_url.clone(),
966998
inbox_rx,
967999
send_datagram_rx,
9681000
recv_datagram_queue.clone(),
9691001
);
970-
let echo_task = tokio::spawn(
1002+
let echo_task = tokio::spawn({
1003+
let relay_url = relay_url.clone();
9711004
async move {
9721005
loop {
9731006
let datagram = future::poll_fn(|cx| recv_datagram_queue.poll_recv(cx)).await;
9741007
if let Ok(recv) = datagram {
9751008
let RelayRecvDatagram { url: _, src, buf } = recv;
9761009
info!(from = src.fmt_short(), "Received datagram");
977-
let send = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(src, [buf])
978-
.next()
979-
.unwrap();
1010+
let send = RelaySendItem {
1011+
remote_node: src,
1012+
url: relay_url.clone(),
1013+
datagrams: smallvec![buf],
1014+
};
9801015
send_datagram_tx.send(send).await.ok();
9811016
}
9821017
}
9831018
}
984-
.instrument(info_span!("echo-task")),
985-
);
1019+
.instrument(info_span!("echo-task"))
1020+
});
9861021
let echo_task = AbortOnDropHandle::new(echo_task);
9871022
let supervisor_task = tokio::spawn(async move {
9881023
// move the inbox_tx here so it is not dropped, as this stops the actor.
@@ -1009,18 +1044,20 @@ mod tests {
10091044
let (inbox_tx, inbox_rx) = mpsc::channel(16);
10101045
let task = start_active_relay_actor(
10111046
secret_key,
1012-
relay_url,
1047+
relay_url.clone(),
10131048
inbox_rx,
10141049
send_datagram_rx,
10151050
datagram_recv_queue.clone(),
10161051
);
10171052

10181053
// Send a datagram to our echo node.
10191054
info!("first echo");
1020-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1021-
.next()
1022-
.context("no packet")?;
1023-
send_datagram_tx.send(packet).await?;
1055+
let hello_send_item = RelaySendItem {
1056+
remote_node: peer_node,
1057+
url: relay_url.clone(),
1058+
datagrams: smallvec![Bytes::from_static(b"hello")],
1059+
};
1060+
send_datagram_tx.send(hello_send_item.clone()).await?;
10241061

10251062
// Check we get it back
10261063
let RelayRecvDatagram {
@@ -1047,10 +1084,7 @@ mod tests {
10471084

10481085
// Echo should still work.
10491086
info!("second echo");
1050-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1051-
.next()
1052-
.context("no packet")?;
1053-
send_datagram_tx.send(packet).await?;
1087+
send_datagram_tx.send(hello_send_item.clone()).await?;
10541088
let recv = future::poll_fn(|cx| datagram_recv_queue.poll_recv(cx)).await?;
10551089
assert_eq!(recv.buf.as_ref(), b"hello");
10561090

@@ -1066,10 +1100,7 @@ mod tests {
10661100

10671101
// Echo should still work.
10681102
info!("third echo");
1069-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1070-
.next()
1071-
.context("no packet")?;
1072-
send_datagram_tx.send(packet).await?;
1103+
send_datagram_tx.send(hello_send_item).await?;
10731104
let recv = future::poll_fn(|cx| datagram_recv_queue.poll_recv(cx)).await?;
10741105
assert_eq!(recv.buf.as_ref(), b"hello");
10751106

0 commit comments

Comments
 (0)