Skip to content

Commit 0d06320

Browse files
authored
fix(iroh, iroh-relay)!: Bypass magicsock::Actor for datagrams from the relay (#2986)
## Description Datagrams received via the relay used to be sent to the `magicsock::Actor` for processing (handling DISCO packets and adding RecvMeta). After processing they were put on a channel from which `AsyncUdpSocket::poll_recv` would read them. The major problem was that the relay actor did not handle the backpressure well: the channel it was placing the processed datagrams on is smaller than its inbox and it did not want to drop packets (which is good, dropping packets at this point is not helpful). So the entire actor blocked, which has negative effects all around. This change connects a channel directly between the `ConnectedRelay` actor which receives the packets and the `AsyncUdpSocket::poll_recv`. The processing which was happening in the `magicsock::Actor` now happens inside `poll_recv`. This is fine because: - The processing needs access to the `MagicSock` inners, like the `NodeMap`. This required accessing the same mutexes that `AsyncUdpSocket::poll_recv` was already accessing anyway. So this path is now slowed down. - The alternative was moving the processing to the `ConnectedRelay` actor, which would have added potential more conention on the `MagicSock` mutexted-inners. To support this there are a few additional changes: - The channel between the `AsyncUdpSocket::poll_recv(_relay)` and the `ConnectedRelay` actor has been abstracted into the `RelayRecvChannel`. This takes care of the wakers, which now do not need to be separate fields on the `MagicSock`, reducing the `MagicSock` state and risk of using the wakers wrongly. - The `ActiveRelay` actor is renamed to `ConnectedRelay`. This improves readability. - The `PacketSplitIter` is moved into the `relay_actor` module, this is where the corresponding `PacketizeIter` lives and makes the two much better logically linked. - ReceviedMessage::ReceivedPacket::source is renamed into remote_node_id. ## Breaking Changes ### iroh-relay - `ReceivedMessage::ReceivedPacket::source` is renamed to `ReceivedMessage::ReceivedPacket::remote_node_id`. ## Notes & open questions Closes #2982. I'd be happy to separate out some more changes from this: - PacketSplitIter move can easily be its own independent PR. - ActiveRelay -> ConnectedRelay rename can easily be independent too. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent cd9c188 commit 0d06320

File tree

5 files changed

+425
-290
lines changed

5 files changed

+425
-290
lines changed

iroh-relay/src/client/conn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ fn process_incoming_frame(frame: Frame) -> Result<ReceivedMessage> {
179179
Frame::NodeGone { node_id } => Ok(ReceivedMessage::NodeGone(node_id)),
180180
Frame::RecvPacket { src_key, content } => {
181181
let packet = ReceivedMessage::ReceivedPacket {
182-
source: src_key,
182+
remote_node_id: src_key,
183183
data: content,
184184
};
185185
Ok(packet)
@@ -451,7 +451,7 @@ pub enum ReceivedMessage {
451451
/// Represents an incoming packet.
452452
ReceivedPacket {
453453
/// The [`NodeId`] of the packet sender.
454-
source: NodeId,
454+
remote_node_id: NodeId,
455455
/// The received packet bytes.
456456
#[debug(skip)]
457457
data: Bytes, // TODO: ref

iroh-relay/src/server.rs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -914,8 +914,12 @@ mod tests {
914914
client_a.send(b_key, msg.clone()).await.unwrap();
915915

916916
let res = client_b_receiver.recv().await.unwrap().unwrap();
917-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
918-
assert_eq!(a_key, source);
917+
if let ReceivedMessage::ReceivedPacket {
918+
remote_node_id,
919+
data,
920+
} = res
921+
{
922+
assert_eq!(a_key, remote_node_id);
919923
assert_eq!(msg, data);
920924
} else {
921925
panic!("client_b received unexpected message {res:?}");
@@ -926,8 +930,12 @@ mod tests {
926930
client_b.send(a_key, msg.clone()).await.unwrap();
927931

928932
let res = client_a_receiver.recv().await.unwrap().unwrap();
929-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
930-
assert_eq!(b_key, source);
933+
if let ReceivedMessage::ReceivedPacket {
934+
remote_node_id,
935+
data,
936+
} = res
937+
{
938+
assert_eq!(b_key, remote_node_id);
931939
assert_eq!(msg, data);
932940
} else {
933941
panic!("client_a received unexpected message {res:?}");
@@ -982,8 +990,12 @@ mod tests {
982990
client_a.send(b_key, msg.clone()).await.unwrap();
983991

984992
let res = client_b_receiver.recv().await.unwrap().unwrap();
985-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
986-
assert_eq!(a_key, source);
993+
if let ReceivedMessage::ReceivedPacket {
994+
remote_node_id,
995+
data,
996+
} = res
997+
{
998+
assert_eq!(a_key, remote_node_id);
987999
assert_eq!(msg, data);
9881000
} else {
9891001
panic!("client_b received unexpected message {res:?}");
@@ -994,8 +1006,12 @@ mod tests {
9941006
client_b.send(a_key, msg.clone()).await.unwrap();
9951007

9961008
let res = client_a_receiver.recv().await.unwrap().unwrap();
997-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
998-
assert_eq!(b_key, source);
1009+
if let ReceivedMessage::ReceivedPacket {
1010+
remote_node_id,
1011+
data,
1012+
} = res
1013+
{
1014+
assert_eq!(b_key, remote_node_id);
9991015
assert_eq!(msg, data);
10001016
} else {
10011017
panic!("client_a received unexpected message {res:?}");
@@ -1049,8 +1065,12 @@ mod tests {
10491065
client_a.send(b_key, msg.clone()).await.unwrap();
10501066

10511067
let res = client_b_receiver.recv().await.unwrap().unwrap();
1052-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
1053-
assert_eq!(a_key, source);
1068+
if let ReceivedMessage::ReceivedPacket {
1069+
remote_node_id,
1070+
data,
1071+
} = res
1072+
{
1073+
assert_eq!(a_key, remote_node_id);
10541074
assert_eq!(msg, data);
10551075
} else {
10561076
panic!("client_b received unexpected message {res:?}");
@@ -1061,8 +1081,12 @@ mod tests {
10611081
client_b.send(a_key, msg.clone()).await.unwrap();
10621082

10631083
let res = client_a_receiver.recv().await.unwrap().unwrap();
1064-
if let ReceivedMessage::ReceivedPacket { source, data } = res {
1065-
assert_eq!(b_key, source);
1084+
if let ReceivedMessage::ReceivedPacket {
1085+
remote_node_id,
1086+
data,
1087+
} = res
1088+
{
1089+
assert_eq!(b_key, remote_node_id);
10661090
assert_eq!(msg, data);
10671091
} else {
10681092
panic!("client_a received unexpected message {res:?}");

iroh-relay/src/server/http_server.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,11 @@ mod tests {
799799
}
800800
Some(Ok(msg)) => {
801801
info!("got message on {:?}: {msg:?}", key.public());
802-
if let ReceivedMessage::ReceivedPacket { source, data } = msg {
802+
if let ReceivedMessage::ReceivedPacket {
803+
remote_node_id: source,
804+
data,
805+
} = msg
806+
{
803807
received_msg_s
804808
.send((source, data))
805809
.await
@@ -947,8 +951,11 @@ mod tests {
947951
let msg = Bytes::from_static(b"hello client b!!");
948952
client_a.send(public_key_b, msg.clone()).await?;
949953
match client_receiver_b.recv().await? {
950-
ReceivedMessage::ReceivedPacket { source, data } => {
951-
assert_eq!(public_key_a, source);
954+
ReceivedMessage::ReceivedPacket {
955+
remote_node_id,
956+
data,
957+
} => {
958+
assert_eq!(public_key_a, remote_node_id);
952959
assert_eq!(&msg[..], data);
953960
}
954961
msg => {
@@ -960,8 +967,11 @@ mod tests {
960967
let msg = Bytes::from_static(b"nice to meet you client a!!");
961968
client_b.send(public_key_a, msg.clone()).await?;
962969
match client_receiver_a.recv().await? {
963-
ReceivedMessage::ReceivedPacket { source, data } => {
964-
assert_eq!(public_key_b, source);
970+
ReceivedMessage::ReceivedPacket {
971+
remote_node_id,
972+
data,
973+
} => {
974+
assert_eq!(public_key_b, remote_node_id);
965975
assert_eq!(&msg[..], data);
966976
}
967977
msg => {
@@ -1027,8 +1037,11 @@ mod tests {
10271037
let msg = Bytes::from_static(b"hello client b!!");
10281038
client_a.send(public_key_b, msg.clone()).await?;
10291039
match client_receiver_b.recv().await? {
1030-
ReceivedMessage::ReceivedPacket { source, data } => {
1031-
assert_eq!(public_key_a, source);
1040+
ReceivedMessage::ReceivedPacket {
1041+
remote_node_id,
1042+
data,
1043+
} => {
1044+
assert_eq!(public_key_a, remote_node_id);
10321045
assert_eq!(&msg[..], data);
10331046
}
10341047
msg => {
@@ -1040,8 +1053,11 @@ mod tests {
10401053
let msg = Bytes::from_static(b"nice to meet you client a!!");
10411054
client_b.send(public_key_a, msg.clone()).await?;
10421055
match client_receiver_a.recv().await? {
1043-
ReceivedMessage::ReceivedPacket { source, data } => {
1044-
assert_eq!(public_key_b, source);
1056+
ReceivedMessage::ReceivedPacket {
1057+
remote_node_id,
1058+
data,
1059+
} => {
1060+
assert_eq!(public_key_b, remote_node_id);
10451061
assert_eq!(&msg[..], data);
10461062
}
10471063
msg => {
@@ -1065,8 +1081,11 @@ mod tests {
10651081
let msg = Bytes::from_static(b"are you still there, b?!");
10661082
client_a.send(public_key_b, msg.clone()).await?;
10671083
match new_client_receiver_b.recv().await? {
1068-
ReceivedMessage::ReceivedPacket { source, data } => {
1069-
assert_eq!(public_key_a, source);
1084+
ReceivedMessage::ReceivedPacket {
1085+
remote_node_id,
1086+
data,
1087+
} => {
1088+
assert_eq!(public_key_a, remote_node_id);
10701089
assert_eq!(&msg[..], data);
10711090
}
10721091
msg => {
@@ -1078,8 +1097,11 @@ mod tests {
10781097
let msg = Bytes::from_static(b"just had a spot of trouble but I'm back now,a!!");
10791098
new_client_b.send(public_key_a, msg.clone()).await?;
10801099
match client_receiver_a.recv().await? {
1081-
ReceivedMessage::ReceivedPacket { source, data } => {
1082-
assert_eq!(public_key_b, source);
1100+
ReceivedMessage::ReceivedPacket {
1101+
remote_node_id,
1102+
data,
1103+
} => {
1104+
assert_eq!(public_key_b, remote_node_id);
10831105
assert_eq!(&msg[..], data);
10841106
}
10851107
msg => {

0 commit comments

Comments
 (0)