Skip to content

Commit 0abb367

Browse files
committed
make the RelayActor not block on the datagram send path
This sends as many datagrams as quickly as possible to the right ActiveRelayActor without blocking the RelayActor itself. The RelayActor can therefore still process other inbox items and instruct the ActiveRelayActors appropriately.
1 parent 66557ee commit 0abb367

File tree

1 file changed

+97
-66
lines changed

1 file changed

+97
-66
lines changed

iroh/src/magicsock/relay_actor.rs

Lines changed: 97 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use std::net::SocketAddr;
88
use std::{
99
collections::{BTreeMap, BTreeSet},
10+
future::Future,
1011
net::IpAddr,
1112
sync::{
1213
atomic::{AtomicBool, Ordering},
@@ -53,7 +54,7 @@ struct ActiveRelayActor {
5354
/// Queue to send received relay datagrams on.
5455
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
5556
/// Channel on which we receive packets to send to the relay.
56-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
57+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
5758
url: RelayUrl,
5859
/// Whether or not this is the home relay connection.
5960
is_home_relay: bool,
@@ -96,7 +97,7 @@ enum ActiveRelayMessage {
9697
#[derive(Debug)]
9798
struct ActiveRelayActorOptions {
9899
url: RelayUrl,
99-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
100+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
100101
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
101102
connection_opts: RelayConnectionOptions,
102103
}
@@ -204,11 +205,9 @@ impl ActiveRelayActor {
204205
relay_send_fut.as_mut().set_none();
205206
}
206207
// Only poll for new datagrams if relay_send_fut is not busy.
207-
Some(msg) = self.relay_datagrams_send.recv(), if relay_send_fut.is_none() => {
208-
let relay_client = self.relay_client.clone();
209-
let fut = async move {
210-
relay_client.send(msg.node_id, msg.packet).await
211-
};
208+
Some(item) = self.relay_datagrams_send.recv(), if relay_send_fut.is_none() => {
209+
debug_assert_eq!(item.url, self.url);
210+
let fut = Self::send_relay(self.relay_client.clone(), item);
212211
relay_send_fut.as_mut().set_future(fut);
213212
inactive_timeout.as_mut().reset(Instant::now() + RELAY_INACTIVE_CLEANUP_TIME);
214213

@@ -298,6 +297,24 @@ impl ActiveRelayActor {
298297
}
299298
}
300299

300+
async fn send_relay(relay_client: relay::client::Client, item: RelaySendItem) {
301+
// When Quinn sends a GSO Transmit magicsock::split_packets will make us receive
302+
// more than one packet to send in a single call. We join all packets back together
303+
// and prefix them with a u16 packet size. They then get sent as a single DISCO
304+
// frame. However this might still be multiple packets when otherwise the maximum
305+
// packet size for the relay protocol would be exceeded.
306+
for packet in PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(item.remote_node, item.datagrams) {
307+
let len = packet.len();
308+
match relay_client.send(packet.node_id, packet.payload).await {
309+
Ok(_) => inc_by!(MagicsockMetrics, send_relay, len as _),
310+
Err(err) => {
311+
warn!("send failed: {err:#}");
312+
inc!(MagicsockMetrics, send_relay_error);
313+
}
314+
}
315+
}
316+
}
317+
301318
async fn handle_relay_msg(&mut self, msg: Result<ReceivedMessage, ClientError>) -> ReadResult {
302319
match msg {
303320
Err(err) => {
@@ -406,6 +423,7 @@ pub(super) enum RelayActorMessage {
406423
SetHome { url: RelayUrl },
407424
}
408425

426+
#[derive(Debug, Clone)]
409427
pub(super) struct RelaySendItem {
410428
/// The destination for the datagrams.
411429
pub(super) remote_node: NodeId,
@@ -457,6 +475,10 @@ impl RelayActor {
457475
mut receiver: mpsc::Receiver<RelayActorMessage>,
458476
mut datagram_send_channel: RelayDatagramSendChannelReceiver,
459477
) {
478+
// When this future is present, it is sending pending datagrams to an
479+
// ActiveRelayActor. We cannot process further datagrams during this time.
480+
let mut datagram_send_fut = std::pin::pin!(MaybeFuture::none());
481+
460482
loop {
461483
tokio::select! {
462484
biased;
@@ -481,14 +503,21 @@ impl RelayActor {
481503
let cancel_token = self.cancel_token.child_token();
482504
cancel_token.run_until_cancelled(self.handle_msg(msg)).await;
483505
}
484-
item = datagram_send_channel.recv() => {
506+
// Only poll for new datagrams if we are not blocked on sending them.
507+
item = datagram_send_channel.recv(), if datagram_send_fut.is_none() => {
485508
let Some(item) = item else {
486509
debug!("Datagram send channel dropped, shutting down.");
487510
break;
488511
};
489-
let cancel_token = self.cancel_token.child_token();
490-
cancel_token.run_until_cancelled(self.send_relay(item)).await;
512+
let token = self.cancel_token.child_token();
513+
if let Some(Some(fut)) = token.run_until_cancelled(
514+
self.try_send_datagram(item)
515+
).await {
516+
datagram_send_fut.as_mut().set_future(fut);
517+
}
491518
}
519+
// Only poll this future if it is in use.
520+
_ = &mut datagram_send_fut, if datagram_send_fut.is_some() => {}
492521
}
493522
}
494523

@@ -512,34 +541,30 @@ impl RelayActor {
512541
}
513542
}
514543

515-
async fn send_relay(&mut self, item: RelaySendItem) {
516-
let RelaySendItem {
517-
remote_node,
518-
url,
519-
datagrams,
520-
} = item;
521-
let total_bytes = datagrams.iter().map(|c| c.len() as u64).sum::<u64>();
522-
trace!(
523-
%url,
524-
remote_node = %remote_node.fmt_short(),
525-
len = total_bytes,
526-
"sending over relay",
527-
);
528-
let handle = self.active_relay_handle_for_node(&url, &remote_node).await;
529-
530-
// When Quinn sends a GSO Transmit magicsock::split_packets will make us receive
531-
// more than one packet to send in a single call. We join all packets back together
532-
// and prefix them with a u16 packet size. They then get sent as a single DISCO
533-
// frame. However this might still be multiple packets when otherwise the maximum
534-
// packet size for the relay protocol would be exceeded.
535-
for packet in PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(remote_node, datagrams) {
536-
let len = packet.len();
537-
match handle.datagrams_send_queue.send(packet).await {
538-
Ok(_) => inc_by!(MagicsockMetrics, send_relay, len as _),
539-
Err(err) => {
540-
warn!(?url, "send failed: {err:#}");
541-
inc!(MagicsockMetrics, send_relay_error);
542-
}
544+
/// Sends datagrams to the correct [`ActiveRelayActor`], or returns a future.
545+
///
546+
/// If the datagrams cannot be sent immediately because the destination channel is
547+
/// full, a future is returned that will complete once the datagrams have been sent to
548+
/// the [`ActiveRelayActor`].
549+
async fn try_send_datagram(&mut self, item: RelaySendItem) -> Option<impl Future<Output = ()>> {
550+
let url = item.url.clone();
551+
let handle = self
552+
.active_relay_handle_for_node(&item.url, &item.remote_node)
553+
.await;
554+
match handle.datagrams_send_queue.try_send(item) {
555+
Ok(()) => None,
556+
Err(mpsc::error::TrySendError::Closed(_)) => {
557+
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
558+
None
559+
}
560+
Err(mpsc::error::TrySendError::Full(item)) => {
561+
let sender = handle.datagrams_send_queue.clone();
562+
let fut = async move {
563+
if sender.send(item).await.is_err() {
564+
warn!(?url, "Dropped datagram(s): ActiveRelayActor closed.");
565+
}
566+
};
567+
Some(fut)
543568
}
544569
}
545570
}
@@ -738,7 +763,7 @@ impl RelayActor {
738763
#[derive(Debug, Clone)]
739764
struct ActiveRelayHandle {
740765
inbox_addr: mpsc::Sender<ActiveRelayMessage>,
741-
datagrams_send_queue: mpsc::Sender<RelaySendPacket>,
766+
datagrams_send_queue: mpsc::Sender<RelaySendItem>,
742767
}
743768

744769
/// A packet to send over the relay.
@@ -750,12 +775,12 @@ struct ActiveRelayHandle {
750775
#[derive(Debug, PartialEq, Eq)]
751776
struct RelaySendPacket {
752777
node_id: NodeId,
753-
packet: Bytes,
778+
payload: Bytes,
754779
}
755780

756781
impl RelaySendPacket {
757782
fn len(&self) -> usize {
758-
self.packet.len()
783+
self.payload.len()
759784
}
760785
}
761786

@@ -824,7 +849,7 @@ where
824849
if !self.buffer.is_empty() {
825850
Some(RelaySendPacket {
826851
node_id: self.node_id,
827-
packet: self.buffer.split().freeze(),
852+
payload: self.buffer.split().freeze(),
828853
})
829854
} else {
830855
None
@@ -887,6 +912,7 @@ impl Iterator for PacketSplitIter {
887912
mod tests {
888913
use futures_lite::future;
889914
use iroh_base::SecretKey;
915+
use smallvec::smallvec;
890916
use testresult::TestResult;
891917
use tokio_util::task::AbortOnDropHandle;
892918

@@ -904,7 +930,10 @@ mod tests {
904930
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(node_id, single_vec);
905931
let result = iter.collect::<Vec<_>>();
906932
assert_eq!(1, result.len());
907-
assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0].packet[..]);
933+
assert_eq!(
934+
&[5, 0, b'H', b'e', b'l', b'l', b'o'],
935+
&result[0].payload[..]
936+
);
908937

909938
let spacer = vec![0u8; MAX_PACKET_SIZE - 10];
910939
let multiple_vec = vec![&b"Hello"[..], &spacer, &b"World"[..]];
@@ -913,17 +942,20 @@ mod tests {
913942
assert_eq!(2, result.len());
914943
assert_eq!(
915944
&[5, 0, b'H', b'e', b'l', b'l', b'o'],
916-
&result[0].packet[..7]
945+
&result[0].payload[..7]
946+
);
947+
assert_eq!(
948+
&[5, 0, b'W', b'o', b'r', b'l', b'd'],
949+
&result[1].payload[..]
917950
);
918-
assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1].packet[..]);
919951
}
920952

921953
/// Starts a new [`ActiveRelayActor`].
922954
fn start_active_relay_actor(
923955
secret_key: SecretKey,
924956
url: RelayUrl,
925957
inbox_rx: mpsc::Receiver<ActiveRelayMessage>,
926-
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
958+
relay_datagrams_send: mpsc::Receiver<RelaySendItem>,
927959
relay_datagrams_recv: Arc<RelayDatagramRecvQueue>,
928960
) -> AbortOnDropHandle<anyhow::Result<()>> {
929961
let opts = ActiveRelayActorOptions {
@@ -960,27 +992,30 @@ mod tests {
960992
let (inbox_tx, inbox_rx) = mpsc::channel(16);
961993
let actor_task = start_active_relay_actor(
962994
secret_key.clone(),
963-
relay_url,
995+
relay_url.clone(),
964996
inbox_rx,
965997
send_datagram_rx,
966998
recv_datagram_queue.clone(),
967999
);
968-
let echo_task = tokio::spawn(
1000+
let echo_task = tokio::spawn({
1001+
let relay_url = relay_url.clone();
9691002
async move {
9701003
loop {
9711004
let datagram = future::poll_fn(|cx| recv_datagram_queue.poll_recv(cx)).await;
9721005
if let Ok(recv) = datagram {
9731006
let RelayRecvDatagram { url: _, src, buf } = recv;
9741007
info!(from = src.fmt_short(), "Received datagram");
975-
let send = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(src, [buf])
976-
.next()
977-
.unwrap();
1008+
let send = RelaySendItem {
1009+
remote_node: src,
1010+
url: relay_url.clone(),
1011+
datagrams: smallvec![buf],
1012+
};
9781013
send_datagram_tx.send(send).await.ok();
9791014
}
9801015
}
9811016
}
982-
.instrument(info_span!("echo-task")),
983-
);
1017+
.instrument(info_span!("echo-task"))
1018+
});
9841019
let echo_task = AbortOnDropHandle::new(echo_task);
9851020
let supervisor_task = tokio::spawn(async move {
9861021
// move the inbox_tx here so it is not dropped, as this stops the actor.
@@ -1007,18 +1042,20 @@ mod tests {
10071042
let (inbox_tx, inbox_rx) = mpsc::channel(16);
10081043
let task = start_active_relay_actor(
10091044
secret_key,
1010-
relay_url,
1045+
relay_url.clone(),
10111046
inbox_rx,
10121047
send_datagram_rx,
10131048
datagram_recv_queue.clone(),
10141049
);
10151050

10161051
// Send a datagram to our echo node.
10171052
info!("first echo");
1018-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1019-
.next()
1020-
.context("no packet")?;
1021-
send_datagram_tx.send(packet).await?;
1053+
let hello_send_item = RelaySendItem {
1054+
remote_node: peer_node,
1055+
url: relay_url.clone(),
1056+
datagrams: smallvec![Bytes::from_static(b"hello")],
1057+
};
1058+
send_datagram_tx.send(hello_send_item.clone()).await?;
10221059

10231060
// Check we get it back
10241061
let RelayRecvDatagram {
@@ -1045,10 +1082,7 @@ mod tests {
10451082

10461083
// Echo should still work.
10471084
info!("second echo");
1048-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1049-
.next()
1050-
.context("no packet")?;
1051-
send_datagram_tx.send(packet).await?;
1085+
send_datagram_tx.send(hello_send_item.clone()).await?;
10521086
let recv = future::poll_fn(|cx| datagram_recv_queue.poll_recv(cx)).await?;
10531087
assert_eq!(recv.buf.as_ref(), b"hello");
10541088

@@ -1064,10 +1098,7 @@ mod tests {
10641098

10651099
// Echo should still work.
10661100
info!("third echo");
1067-
let packet = PacketizeIter::<_, MAX_PAYLOAD_SIZE>::new(peer_node, [b"hello"])
1068-
.next()
1069-
.context("no packet")?;
1070-
send_datagram_tx.send(packet).await?;
1101+
send_datagram_tx.send(hello_send_item).await?;
10711102
let recv = future::poll_fn(|cx| datagram_recv_queue.poll_recv(cx)).await?;
10721103
assert_eq!(recv.buf.as_ref(), b"hello");
10731104

0 commit comments

Comments
 (0)