@@ -31,8 +31,9 @@ use std::{
31
31
32
32
use anyhow:: { anyhow, Context as _, Result } ;
33
33
use bytes:: Bytes ;
34
+ use concurrent_queue:: ConcurrentQueue ;
34
35
use futures_lite:: { FutureExt , Stream , StreamExt } ;
35
- use futures_util:: stream:: BoxStream ;
36
+ use futures_util:: { stream:: BoxStream , task :: AtomicWaker } ;
36
37
use iroh_base:: key:: NodeId ;
37
38
use iroh_metrics:: { inc, inc_by} ;
38
39
use iroh_relay:: protos:: stun;
@@ -180,12 +181,12 @@ pub(crate) struct MagicSock {
180
181
me : String ,
181
182
/// Proxy
182
183
proxy_url : Option < Url > ,
183
- /// Channel to receive datagrams from relays for [`AsyncUdpSocket::poll_recv`].
184
+ /// Queue to receive datagrams from relays for [`AsyncUdpSocket::poll_recv`].
184
185
///
185
- /// QUIC datagrams received by relays are put on this channel and consumed by
186
- /// [`AsyncUdpSocket`]. This channel takes care of the wakers needed by
186
+ /// Relay datagrams received by relays are put into this queue and consumed by
187
+ /// [`AsyncUdpSocket`]. This queue takes care of the wakers needed by
187
188
/// [`AsyncUdpSocket::poll_recv`].
188
- relay_recv_channel : RelayRecvReceiver ,
189
+ relay_datagrams_queue : Arc < RelayDatagramsQueue > ,
189
190
190
191
network_send_wakers : Arc < parking_lot:: Mutex < Option < Waker > > > ,
191
192
/// Counter for ordering of [`MagicSock::poll_recv`] polling order.
@@ -860,7 +861,7 @@ impl MagicSock {
860
861
// For each output buffer keep polling the datagrams from the relay until one is
861
862
// a QUIC datagram to be placed into the output buffer. Or the channel is empty.
862
863
loop {
863
- let recv = match self . relay_recv_channel . poll_recv ( cx) {
864
+ let recv = match self . relay_datagrams_queue . poll_recv ( cx) {
864
865
Poll :: Ready ( Ok ( recv) ) => recv,
865
866
Poll :: Ready ( Err ( err) ) => {
866
867
error ! ( "relay_recv_channel closed: {err:#}" ) ;
@@ -1510,7 +1511,7 @@ impl Handle {
1510
1511
insecure_skip_relay_cert_verify,
1511
1512
} = opts;
1512
1513
1513
- let ( relay_recv_tx , relay_recv_rx ) = relay_recv_channel ( ) ;
1514
+ let relay_datagrams_queue = Arc :: new ( RelayDatagramsQueue :: new ( ) ) ;
1514
1515
1515
1516
let ( pconn4, pconn6) = bind ( addr_v4, addr_v6) ?;
1516
1517
let port = pconn4. port ( ) ;
@@ -1547,7 +1548,7 @@ impl Handle {
1547
1548
local_addrs : std:: sync:: RwLock :: new ( ( ipv4_addr, ipv6_addr) ) ,
1548
1549
closing : AtomicBool :: new ( false ) ,
1549
1550
closed : AtomicBool :: new ( false ) ,
1550
- relay_recv_channel : relay_recv_rx ,
1551
+ relay_datagrams_queue : relay_datagrams_queue . clone ( ) ,
1551
1552
network_send_wakers : Arc :: new ( parking_lot:: Mutex :: new ( None ) ) ,
1552
1553
poll_recv_counter : AtomicUsize :: new ( 0 ) ,
1553
1554
actor_sender : actor_sender. clone ( ) ,
@@ -1572,7 +1573,7 @@ impl Handle {
1572
1573
1573
1574
let mut actor_tasks = JoinSet :: default ( ) ;
1574
1575
1575
- let relay_actor = RelayActor :: new ( inner. clone ( ) , relay_recv_tx ) ;
1576
+ let relay_actor = RelayActor :: new ( inner. clone ( ) , relay_datagrams_queue ) ;
1576
1577
let relay_actor_cancel_token = relay_actor. cancel_token ( ) ;
1577
1578
actor_tasks. spawn (
1578
1579
async move {
@@ -1712,64 +1713,74 @@ enum DiscoBoxError {
1712
1713
Parse ( anyhow:: Error ) ,
1713
1714
}
1714
1715
1715
- /// Channel for [`MagicSock::poll_recv_relay`] to receive datagrams from relays.
1716
+ /// A queue holding [`RelayRecvDatagram`]s that can be polled in async
1717
+ /// contexts, and wakes up tasks when something adds items using [`try_send`].
1716
1718
///
1717
- /// The sender and receiver will take care of the required wakers needed for
1718
- /// [`AsyncUdpSocket::poll_recv`].
1719
- // TODO: This channel should possibly be implemented with concurrent-queue and atomic-waker.
1720
- // Or maybe async-channel.
1721
- fn relay_recv_channel ( ) -> ( RelayRecvSender , RelayRecvReceiver ) {
1722
- let ( tx, rx) = mpsc:: channel ( 128 ) ;
1723
- let waker = Arc :: new ( parking_lot:: Mutex :: new ( None ) ) ;
1724
- let sender = RelayRecvSender {
1725
- sender : tx,
1726
- waker : waker. clone ( ) ,
1727
- } ;
1728
- let receiver = RelayRecvReceiver {
1729
- receiver : parking_lot:: Mutex :: new ( rx) ,
1730
- waker,
1731
- } ;
1732
- ( sender, receiver)
1719
+ /// This is used to transfer relay datagrams between the [`RelayActor`]
1720
+ /// and [`MagicSock`].
1721
+ ///
1722
+ /// [`try_send`]: Self::try_send
1723
+ /// [`RelayActor`]: crate::magicsock::RelayActor
1724
+ /// [`MagicSock`]: crate::magicsock::MagicSock
1725
+ #[ derive( Debug ) ]
1726
+ struct RelayDatagramsQueue {
1727
+ queue : ConcurrentQueue < RelayRecvDatagram > ,
1728
+ waker : AtomicWaker ,
1733
1729
}
1734
1730
1735
- #[ derive( Debug , Clone ) ]
1736
- struct RelayRecvSender {
1737
- sender : mpsc:: Sender < RelayRecvDatagram > ,
1738
- waker : Arc < parking_lot:: Mutex < Option < Waker > > > ,
1739
- }
1731
+ impl RelayDatagramsQueue {
1732
+ /// Creates a new, empty queue with a fixed size bound of 128 items.
1733
+ fn new ( ) -> Self {
1734
+ Self {
1735
+ queue : ConcurrentQueue :: bounded ( 128 ) ,
1736
+ waker : AtomicWaker :: new ( ) ,
1737
+ }
1738
+ }
1740
1739
1741
- impl RelayRecvSender {
1740
+ /// Sends an item into this queue and wakes a potential task
1741
+ /// that's registered its waker with a [`poll_recv`] call.
1742
+ ///
1743
+ /// [`poll_recv`]: Self::poll_recv
1742
1744
fn try_send (
1743
1745
& self ,
1744
1746
item : RelayRecvDatagram ,
1745
- ) -> Result < ( ) , mpsc:: error:: TrySendError < RelayRecvDatagram > > {
1746
- self . sender . try_send ( item) . inspect ( |_| {
1747
- if let Some ( waker) = self . waker . lock ( ) . take ( ) {
1748
- waker. wake ( ) ;
1749
- }
1747
+ ) -> Result < ( ) , concurrent_queue:: PushError < RelayRecvDatagram > > {
1748
+ self . queue . push ( item) . inspect ( |_| {
1749
+ self . waker . wake ( ) ;
1750
1750
} )
1751
1751
}
1752
- }
1753
1752
1754
- #[ derive( Debug ) ]
1755
- struct RelayRecvReceiver {
1756
- receiver : parking_lot:: Mutex < mpsc:: Receiver < RelayRecvDatagram > > ,
1757
- waker : Arc < parking_lot:: Mutex < Option < Waker > > > ,
1758
- }
1759
-
1760
- impl RelayRecvReceiver {
1753
+ /// Polls for new items in the queue.
1754
+ ///
1755
+ /// Although this method is available from `&self`, it must not be
1756
+ /// polled concurrently between tasks.
1757
+ ///
1758
+ /// Calling this will replace the current waker used. So if another task
1759
+ /// waits for this, that task's waker will be replaced and it won't be
1760
+ /// woken up for new items.
1761
+ ///
1762
+ /// The reason this method is made available as `&self` is because
1763
+ /// the interface for quinn's [`AsyncUdpSocket::poll_recv`] requires us
1764
+ /// to be able to poll from `&self`.
1761
1765
fn poll_recv ( & self , cx : & mut Context ) -> Poll < Result < RelayRecvDatagram > > {
1762
- let mut receiver = self . receiver . lock ( ) ;
1763
- self . waker . lock ( ) . replace ( cx. waker ( ) . clone ( ) ) ;
1764
- match receiver. try_recv ( ) {
1765
- Ok ( item) => {
1766
- self . waker . lock ( ) . take ( ) ;
1767
- Poll :: Ready ( Ok ( item) )
1768
- }
1769
- Err ( mpsc:: error:: TryRecvError :: Empty ) => Poll :: Pending ,
1770
- Err ( mpsc:: error:: TryRecvError :: Disconnected ) => {
1771
- Poll :: Ready ( Err ( anyhow ! ( "All RelayRecvSenders disconnected" ) ) )
1766
+ match self . queue . pop ( ) {
1767
+ Ok ( value) => Poll :: Ready ( Ok ( value) ) ,
1768
+ Err ( concurrent_queue:: PopError :: Empty ) => {
1769
+ self . waker . register ( cx. waker ( ) ) ;
1770
+
1771
+ match self . queue . pop ( ) {
1772
+ Ok ( value) => {
1773
+ self . waker . take ( ) ;
1774
+ Poll :: Ready ( Ok ( value) )
1775
+ }
1776
+ Err ( concurrent_queue:: PopError :: Empty ) => Poll :: Pending ,
1777
+ Err ( concurrent_queue:: PopError :: Closed ) => {
1778
+ self . waker . take ( ) ;
1779
+ Poll :: Ready ( Err ( anyhow ! ( "Queue closed" ) ) )
1780
+ }
1781
+ }
1772
1782
}
1783
+ Err ( concurrent_queue:: PopError :: Closed ) => Poll :: Ready ( Err ( anyhow ! ( "Queue closed" ) ) ) ,
1773
1784
}
1774
1785
}
1775
1786
}
@@ -2857,7 +2868,10 @@ mod tests {
2857
2868
use tokio_util:: task:: AbortOnDropHandle ;
2858
2869
2859
2870
use super :: * ;
2860
- use crate :: { defaults:: staging:: EU_RELAY_HOSTNAME , tls, Endpoint , RelayMode } ;
2871
+ use crate :: {
2872
+ defaults:: staging:: { self , EU_RELAY_HOSTNAME } ,
2873
+ tls, Endpoint , RelayMode ,
2874
+ } ;
2861
2875
2862
2876
const ALPN : & [ u8 ] = b"n0/test/1" ;
2863
2877
@@ -4020,4 +4034,57 @@ mod tests {
4020
4034
// TODO: could remove the addresses again, send, add it back and see it recover.
4021
4035
// But we don't have that much private access to the NodeMap. This will do for now.
4022
4036
}
4037
+
4038
+ #[ tokio:: test( flavor = "multi_thread" ) ]
4039
+ async fn test_relay_datagram_queue ( ) {
4040
+ let queue = Arc :: new ( RelayDatagramsQueue :: new ( ) ) ;
4041
+ let url = staging:: default_na_relay_node ( ) . url ;
4042
+ let capacity = queue. queue . capacity ( ) . unwrap ( ) ;
4043
+
4044
+ let mut tasks = JoinSet :: new ( ) ;
4045
+
4046
+ tasks. spawn ( {
4047
+ let queue = queue. clone ( ) ;
4048
+ async move {
4049
+ let mut expected_msgs = vec ! [ false ; capacity] ;
4050
+
4051
+ while let Ok ( datagram) = tokio:: time:: timeout (
4052
+ Duration :: from_millis ( 100 ) ,
4053
+ futures_lite:: future:: poll_fn ( |cx| {
4054
+ queue. poll_recv ( cx) . map ( |result| result. unwrap ( ) )
4055
+ } ) ,
4056
+ )
4057
+ . await
4058
+ {
4059
+ let msg_num = usize:: from_le_bytes ( datagram. buf . as_ref ( ) . try_into ( ) . unwrap ( ) ) ;
4060
+
4061
+ if expected_msgs[ msg_num] {
4062
+ panic ! ( "Received message number {msg_num} more than once (duplicated)" ) ;
4063
+ }
4064
+
4065
+ expected_msgs[ msg_num] = true ;
4066
+ }
4067
+
4068
+ assert ! ( expected_msgs. into_iter( ) . all( |is_set| is_set) ) ;
4069
+ }
4070
+ } ) ;
4071
+
4072
+ for i in 0 ..capacity {
4073
+ tasks. spawn ( {
4074
+ let queue = queue. clone ( ) ;
4075
+ let url = url. clone ( ) ;
4076
+ async move {
4077
+ queue
4078
+ . try_send ( RelayRecvDatagram {
4079
+ url,
4080
+ src : PublicKey :: from_bytes ( & [ 0u8 ; 32 ] ) . unwrap ( ) ,
4081
+ buf : Bytes :: copy_from_slice ( & i. to_le_bytes ( ) ) ,
4082
+ } )
4083
+ . unwrap ( ) ;
4084
+ }
4085
+ } ) ;
4086
+ }
4087
+
4088
+ tasks. join_all ( ) . await ;
4089
+ }
4023
4090
}
0 commit comments