Skip to content

Commit e7503c0

Browse files
authored
refactor(iroh): Newtype the packet sent over relay servers (#3030)
## Description There is a difference between the datagrams that the MagicSock sends and the packets that contain datagrams that gets sent to relay servers. But this was all just Bytes. This newtypes the packets explicitly and makes it a sibling of the existing RelayRecvDatagram. Now the PacketizeIter at least does not both consume and produce the same type, because they're not the same things. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions This clones the RelayUrl (usually) twice per packet sent now, instead of (usually) once. That's the only thing I don't like about this really. I still wonder whether RelayUrl should not be some sort of interned and cloned thing. Watch out, the base of this PR is `flub/active-relay-again`. ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
1 parent 80bc8a3 commit e7503c0

File tree

1 file changed

+60
-33
lines changed

1 file changed

+60
-33
lines changed

iroh/src/magicsock/relay_actor.rs

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ struct ActiveRelayActor {
4747
/// Queue to send received relay datagrams on.
4848
relay_datagrams_recv: Arc<RelayDatagramsQueue>,
4949
/// Channel on which we receive packets to send to the relay.
50-
relay_datagrams_send: mpsc::Receiver<(NodeId, Bytes)>,
50+
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
5151
url: RelayUrl,
5252
relay_client: relay::client::Client,
5353
relay_client_receiver: relay::client::ClientReceiver,
@@ -79,7 +79,7 @@ impl ActiveRelayActor {
7979
relay_client: relay::client::Client,
8080
relay_client_receiver: relay::client::ClientReceiver,
8181
relay_datagrams_recv: Arc<RelayDatagramsQueue>,
82-
relay_datagrams_send: mpsc::Receiver<(NodeId, Bytes)>,
82+
relay_datagrams_send: mpsc::Receiver<RelaySendPacket>,
8383
) -> Self {
8484
ActiveRelayActor {
8585
last_write: Instant::now(),
@@ -133,7 +133,7 @@ impl ActiveRelayActor {
133133
// Only poll for new datagrams if relay_send_fut is not busy.
134134
Some(msg) = self.relay_datagrams_send.recv(), if relay_send_fut.is_none() => {
135135
relay_send_fut = MaybeFuture::with_future(
136-
Box::pin(relay_client.send(msg.0, msg.1))
136+
Box::pin(relay_client.send(msg.node_id, msg.packet))
137137
);
138138
self.last_write = Instant::now();
139139

@@ -252,17 +252,13 @@ impl ActiveRelayActor {
252252
self.node_present.insert(remote_node_id);
253253
}
254254

255-
for datagram in PacketSplitIter::new(data) {
255+
for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data)
256+
{
256257
let Ok(datagram) = datagram else {
257258
error!("Invalid packet split");
258259
break;
259260
};
260-
let res = RelayRecvDatagram {
261-
url: self.url.clone(),
262-
src: remote_node_id,
263-
buf: datagram,
264-
};
265-
if let Err(err) = self.relay_datagrams_recv.try_send(res) {
261+
if let Err(err) = self.relay_datagrams_recv.try_send(datagram) {
266262
warn!("dropping received relay packet: {err:#}");
267263
}
268264
}
@@ -441,13 +437,9 @@ impl RelayActor {
441437
// and prefix them with a u16 packet size. They then get sent as a single DISCO
442438
// frame. However this might still be multiple packets when otherwise the maximum
443439
// packet size for the relay protocol would be exceeded.
444-
for packet in PacketizeIter::<_, PAYLOAD_SIZE>::new(contents) {
440+
for packet in PacketizeIter::<_, PAYLOAD_SIZE>::new(remote_node, contents) {
445441
let len = packet.len();
446-
match handle
447-
.datagrams_send_queue
448-
.send((remote_node, packet))
449-
.await
450-
{
442+
match handle.datagrams_send_queue.send(packet).await {
451443
Ok(_) => inc_by!(MagicsockMetrics, send_relay, len as _),
452444
Err(err) => {
453445
warn!(?url, "send failed: {err:#}");
@@ -799,10 +791,28 @@ impl RelayActor {
799791
#[derive(Debug)]
800792
struct ActiveRelayHandle {
801793
inbox_addr: mpsc::Sender<ActiveRelayMessage>,
802-
datagrams_send_queue: mpsc::Sender<(NodeId, Bytes)>,
794+
datagrams_send_queue: mpsc::Sender<RelaySendPacket>,
803795
actor_task: JoinHandle<()>,
804796
}
805797

798+
/// A packet to send over the relay.
799+
///
800+
/// This is nothing but a newtype, it should be constructed using [`PacketizeIter`]. This
801+
/// is a packet of one or more datagrams, each prefixed with a u16-be length. This is what
802+
/// the `Frame::SendPacket` of the `DerpCodec` transports and is produced by
803+
/// [`PacketizeIter`] and transformed back into datagrams using [`PacketSplitIter`].
804+
#[derive(Debug, PartialEq, Eq)]
805+
struct RelaySendPacket {
806+
node_id: NodeId,
807+
packet: Bytes,
808+
}
809+
810+
impl RelaySendPacket {
811+
fn len(&self) -> usize {
812+
self.packet.len()
813+
}
814+
}
815+
806816
/// A single datagram received from a relay server.
807817
///
808818
/// This could be either a QUIC or DISCO packet.
@@ -828,16 +838,18 @@ pub(super) enum ReadResult {
828838
///
829839
/// The [`PacketSplitIter`] does the inverse and splits such packets back into individual
830840
/// datagrams.
831-
pub(super) struct PacketizeIter<I: Iterator, const N: usize> {
841+
struct PacketizeIter<I: Iterator, const N: usize> {
842+
node_id: NodeId,
832843
iter: std::iter::Peekable<I>,
833844
buffer: BytesMut,
834845
}
835846

836847
impl<I: Iterator, const N: usize> PacketizeIter<I, N> {
837848
/// Create a new new PacketizeIter from something that can be turned into an
838849
/// iterator of slices, like a `Vec<Bytes>`.
839-
pub(super) fn new(iter: impl IntoIterator<IntoIter = I>) -> Self {
850+
fn new(node_id: NodeId, iter: impl IntoIterator<IntoIter = I>) -> Self {
840851
Self {
852+
node_id,
841853
iter: iter.into_iter().peekable(),
842854
buffer: BytesMut::with_capacity(N),
843855
}
@@ -848,7 +860,7 @@ impl<I: Iterator, const N: usize> Iterator for PacketizeIter<I, N>
848860
where
849861
I::Item: AsRef<[u8]>,
850862
{
851-
type Item = Bytes;
863+
type Item = RelaySendPacket;
852864

853865
fn next(&mut self) -> Option<Self::Item> {
854866
use bytes::BufMut;
@@ -864,7 +876,10 @@ where
864876
self.iter.next();
865877
}
866878
if !self.buffer.is_empty() {
867-
Some(self.buffer.split().freeze())
879+
Some(RelaySendPacket {
880+
node_id: self.node_id,
881+
packet: self.buffer.split().freeze(),
882+
})
868883
} else {
869884
None
870885
}
@@ -877,16 +892,18 @@ where
877892
/// that struct for more details.
878893
#[derive(Debug)]
879894
struct PacketSplitIter {
895+
url: RelayUrl,
896+
src: NodeId,
880897
bytes: Bytes,
881898
}
882899

883900
impl PacketSplitIter {
884901
/// Create a new PacketSplitIter from a packet.
885-
fn new(bytes: Bytes) -> Self {
886-
Self { bytes }
902+
fn new(url: RelayUrl, src: NodeId, bytes: Bytes) -> Self {
903+
Self { url, src, bytes }
887904
}
888905

889-
fn fail(&mut self) -> Option<std::io::Result<Bytes>> {
906+
fn fail(&mut self) -> Option<std::io::Result<RelayRecvDatagram>> {
890907
self.bytes.clear();
891908
Some(Err(std::io::Error::new(
892909
std::io::ErrorKind::UnexpectedEof,
@@ -896,7 +913,7 @@ impl PacketSplitIter {
896913
}
897914

898915
impl Iterator for PacketSplitIter {
899-
type Item = std::io::Result<Bytes>;
916+
type Item = std::io::Result<RelayRecvDatagram>;
900917

901918
fn next(&mut self) -> Option<Self::Item> {
902919
use bytes::Buf;
@@ -908,8 +925,12 @@ impl Iterator for PacketSplitIter {
908925
if self.bytes.remaining() < len {
909926
return self.fail();
910927
}
911-
let item = self.bytes.split_to(len);
912-
Some(Ok(item))
928+
let buf = self.bytes.split_to(len);
929+
Some(Ok(RelayRecvDatagram {
930+
url: self.url.clone(),
931+
src: self.src,
932+
buf,
933+
}))
913934
} else {
914935
None
915936
}
@@ -918,26 +939,32 @@ impl Iterator for PacketSplitIter {
918939

919940
#[cfg(test)]
920941
mod tests {
942+
use iroh_base::SecretKey;
943+
921944
use super::*;
922945

923946
#[test]
924947
fn test_packetize_iter() {
948+
let node_id = SecretKey::generate().public();
925949
let empty_vec: Vec<Bytes> = Vec::new();
926-
let mut iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(empty_vec);
950+
let mut iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(node_id, empty_vec);
927951
assert_eq!(None, iter.next());
928952

929953
let single_vec = vec!["Hello"];
930-
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(single_vec);
954+
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(node_id, single_vec);
931955
let result = iter.collect::<Vec<_>>();
932956
assert_eq!(1, result.len());
933-
assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..]);
957+
assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0].packet[..]);
934958

935959
let spacer = vec![0u8; MAX_PACKET_SIZE - 10];
936960
let multiple_vec = vec![&b"Hello"[..], &spacer, &b"World"[..]];
937-
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(multiple_vec);
961+
let iter = PacketizeIter::<_, MAX_PACKET_SIZE>::new(node_id, multiple_vec);
938962
let result = iter.collect::<Vec<_>>();
939963
assert_eq!(2, result.len());
940-
assert_eq!(&[5, 0, b'H', b'e', b'l', b'l', b'o'], &result[0][..7]);
941-
assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1][..]);
964+
assert_eq!(
965+
&[5, 0, b'H', b'e', b'l', b'l', b'o'],
966+
&result[0].packet[..7]
967+
);
968+
assert_eq!(&[5, 0, b'W', b'o', b'r', b'l', b'd'], &result[1].packet[..]);
942969
}
943970
}

0 commit comments

Comments
 (0)