@@ -2868,7 +2868,10 @@ mod tests {
2868
2868
use tokio_util:: task:: AbortOnDropHandle ;
2869
2869
2870
2870
use super :: * ;
2871
- use crate :: { defaults:: staging:: EU_RELAY_HOSTNAME , tls, Endpoint , RelayMode } ;
2871
+ use crate :: {
2872
+ defaults:: staging:: { self , EU_RELAY_HOSTNAME } ,
2873
+ tls, Endpoint , RelayMode ,
2874
+ } ;
2872
2875
2873
2876
const ALPN : & [ u8 ] = b"n0/test/1" ;
2874
2877
@@ -3999,4 +4002,57 @@ mod tests {
3999
4002
// TODO: could remove the addresses again, send, add it back and see it recover.
4000
4003
// But we don't have that much private access to the NodeMap. This will do for now.
4001
4004
}
4005
+
4006
+ #[ tokio:: test( flavor = "multi_thread" ) ]
4007
+ async fn test_relay_datagram_queue ( ) {
4008
+ let queue = Arc :: new ( RelayDatagramsQueue :: new ( ) ) ;
4009
+ let url = staging:: default_na_relay_node ( ) . url ;
4010
+ let capacity = queue. queue . capacity ( ) . unwrap ( ) ;
4011
+
4012
+ let mut tasks = JoinSet :: new ( ) ;
4013
+
4014
+ tasks. spawn ( {
4015
+ let queue = queue. clone ( ) ;
4016
+ async move {
4017
+ let mut expected_msgs = vec ! [ false ; capacity] ;
4018
+
4019
+ while let Ok ( datagram) = tokio:: time:: timeout (
4020
+ Duration :: from_millis ( 100 ) ,
4021
+ futures_lite:: future:: poll_fn ( |cx| {
4022
+ queue. poll_recv ( cx) . map ( |result| result. unwrap ( ) )
4023
+ } ) ,
4024
+ )
4025
+ . await
4026
+ {
4027
+ let msg_num = usize:: from_le_bytes ( datagram. buf . as_ref ( ) . try_into ( ) . unwrap ( ) ) ;
4028
+
4029
+ if expected_msgs[ msg_num] {
4030
+ panic ! ( "Received message number {msg_num} more than once (duplicated)" ) ;
4031
+ }
4032
+
4033
+ expected_msgs[ msg_num] = true ;
4034
+ }
4035
+
4036
+ assert ! ( expected_msgs. into_iter( ) . all( |is_set| is_set) ) ;
4037
+ }
4038
+ } ) ;
4039
+
4040
+ for i in 0 ..capacity {
4041
+ tasks. spawn ( {
4042
+ let queue = queue. clone ( ) ;
4043
+ let url = url. clone ( ) ;
4044
+ async move {
4045
+ queue
4046
+ . try_send ( RelayRecvDatagram {
4047
+ url,
4048
+ src : PublicKey :: from_bytes ( & [ 0u8 ; 32 ] ) . unwrap ( ) ,
4049
+ buf : Bytes :: copy_from_slice ( & i. to_le_bytes ( ) ) ,
4050
+ } )
4051
+ . unwrap ( ) ;
4052
+ }
4053
+ } ) ;
4054
+ }
4055
+
4056
+ tasks. join_all ( ) . await ;
4057
+ }
4002
4058
}
0 commit comments