1
1
//! Actor which coordinates the congestion controller for the magic socket
2
2
3
- use std:: collections :: HashMap ;
3
+ use std:: { pin :: Pin , task :: Poll } ;
4
4
5
- use futures_concurrency :: stream :: stream_group ;
6
- use futures_lite:: StreamExt ;
5
+ use futures_buffered :: MergeUnbounded ;
6
+ use futures_lite:: { Stream , StreamExt } ;
7
7
use iroh_base:: NodeId ;
8
8
use iroh_metrics:: inc;
9
- use tokio:: {
10
- sync:: { mpsc, Notify } ,
11
- time:: Duration ,
12
- } ;
9
+ use tokio:: sync:: mpsc;
13
10
use tokio_util:: task:: AbortOnDropHandle ;
14
- use tracing:: { debug, error , info_span, trace , Instrument } ;
11
+ use tracing:: { debug, info_span, Instrument } ;
15
12
16
13
use crate :: { magicsock:: ConnectionType , metrics:: MagicsockMetrics , watchable:: WatcherStream } ;
17
14
@@ -25,9 +22,7 @@ pub(super) struct RttHandle {
25
22
impl RttHandle {
26
23
pub ( super ) fn new ( ) -> Self {
27
24
let mut actor = RttActor {
28
- connection_events : stream_group:: StreamGroup :: new ( ) . keyed ( ) ,
29
- connections : HashMap :: new ( ) ,
30
- tick : Notify :: new ( ) ,
25
+ connection_events : Default :: default ( ) ,
31
26
} ;
32
27
let ( msg_tx, msg_rx) = mpsc:: channel ( 16 ) ;
33
28
let handle = tokio:: spawn (
@@ -61,30 +56,64 @@ pub(super) enum RttMessage {
61
56
///
62
57
/// The magic socket can change the underlying network path, between two nodes. If we can
63
58
/// inform the QUIC congestion controller of this event it will work much more efficiently.
64
- #[ derive( Debug ) ]
59
+ #[ derive( derive_more :: Debug ) ]
65
60
struct RttActor {
66
61
/// Stream of connection type changes.
67
- connection_events : stream_group:: Keyed < WatcherStream < ConnectionType > > ,
68
- /// References to the connections.
69
- ///
70
- /// These are weak references so not to keep the connections alive. The key allows
71
- /// removing the corresponding stream from `conn_type_changes`.
72
- /// The boolean is an indiciator of whether this connection was direct before.
62
+ #[ debug( "MergeUnbounded<WatcherStream<ConnectionType>>" ) ]
63
+ connection_events : MergeUnbounded < MappedStream > ,
64
+ }
65
+
66
+ #[ derive( Debug ) ]
67
+ struct MappedStream {
68
+ stream : WatcherStream < ConnectionType > ,
69
+ node_id : NodeId ,
70
+ /// Reference to the connection.
71
+ connection : quinn:: WeakConnectionHandle ,
72
+ /// This an indiciator of whether this connection was direct before.
73
73
/// This helps establish metrics on number of connections that became direct.
74
- connections : HashMap < stream_group:: Key , ( quinn:: WeakConnectionHandle , NodeId , bool ) > ,
75
- /// A way to notify the main actor loop to run over.
74
+ was_direct_before : bool ,
75
+ }
76
+
77
+ impl Stream for MappedStream {
78
+ type Item = ConnectionType ;
79
+
80
+ /// Performs the congestion controller reset for a magic socket path change.
76
81
///
77
- /// E.g. when a new stream was added.
78
- tick : Notify ,
82
+ /// Regardless of which kind of path we are changed to, the congestion controller needs
83
+ /// resetting. Even when switching to mixed we should reset the state as e.g. switching
84
+ /// from direct to mixed back to direct should be a rare exception and is a bug if this
85
+ /// happens commonly.
86
+ fn poll_next (
87
+ mut self : Pin < & mut Self > ,
88
+ cx : & mut std:: task:: Context < ' _ > ,
89
+ ) -> Poll < Option < Self :: Item > > {
90
+ match Pin :: new ( & mut self . stream ) . poll_next ( cx) {
91
+ Poll :: Ready ( Some ( new_conn_type) ) => {
92
+ if self . connection . network_path_changed ( ) {
93
+ debug ! (
94
+ node_id = %self . node_id. fmt_short( ) ,
95
+ new_type = ?new_conn_type,
96
+ "Congestion controller state reset" ,
97
+ ) ;
98
+ if !self . was_direct_before && matches ! ( new_conn_type, ConnectionType :: Direct ( _) )
99
+ {
100
+ self . was_direct_before = true ;
101
+ inc ! ( MagicsockMetrics , connection_became_direct) ;
102
+ }
103
+ }
104
+ Poll :: Ready ( Some ( new_conn_type) )
105
+ }
106
+ Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
107
+ Poll :: Pending => Poll :: Pending ,
108
+ }
109
+ }
79
110
}
80
111
81
112
impl RttActor {
82
113
/// Runs the actor main loop.
83
114
///
84
115
/// The main loop will finish when the sender is dropped.
85
116
async fn run ( & mut self , mut msg_rx : mpsc:: Receiver < RttMessage > ) {
86
- let mut cleanup_interval = tokio:: time:: interval ( Duration :: from_secs ( 5 ) ) ;
87
- cleanup_interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
88
117
loop {
89
118
tokio:: select! {
90
119
biased;
@@ -94,11 +123,7 @@ impl RttActor {
94
123
None => break ,
95
124
}
96
125
}
97
- item = self . connection_events. next( ) , if !self . connection_events. is_empty( ) => {
98
- self . do_reset_rtt( item) ;
99
- }
100
- _ = cleanup_interval. tick( ) => self . do_connections_cleanup( ) ,
101
- ( ) = self . tick. notified( ) => continue ,
126
+ _item = self . connection_events. next( ) , if !self . connection_events. is_empty( ) => { }
102
127
}
103
128
}
104
129
debug ! ( "rtt-actor finished" ) ;
@@ -124,82 +149,12 @@ impl RttActor {
124
149
conn_type_changes : WatcherStream < ConnectionType > ,
125
150
node_id : NodeId ,
126
151
) {
127
- let key = self . connection_events . insert ( conn_type_changes) ;
128
- self . connections . insert ( key, ( connection, node_id, false ) ) ;
129
- self . tick . notify_one ( ) ;
130
- inc ! ( MagicsockMetrics , connection_handshake_success) ;
131
- }
132
-
133
- /// Performs the congestion controller reset for a magic socket path change.
134
- ///
135
- /// Regardless of which kind of path we are changed to, the congestion controller needs
136
- /// resetting. Even when switching to mixed we should reset the state as e.g. switching
137
- /// from direct to mixed back to direct should be a rare exception and is a bug if this
138
- /// happens commonly.
139
- fn do_reset_rtt ( & mut self , item : Option < ( stream_group:: Key , ConnectionType ) > ) {
140
- match item {
141
- Some ( ( key, new_conn_type) ) => match self . connections . get_mut ( & key) {
142
- Some ( ( handle, node_id, was_direct_before) ) => {
143
- if handle. network_path_changed ( ) {
144
- debug ! (
145
- node_id = %node_id. fmt_short( ) ,
146
- new_type = ?new_conn_type,
147
- "Congestion controller state reset" ,
148
- ) ;
149
- if !* was_direct_before && matches ! ( new_conn_type, ConnectionType :: Direct ( _) )
150
- {
151
- * was_direct_before = true ;
152
- inc ! ( MagicsockMetrics , connection_became_direct) ;
153
- }
154
- } else {
155
- debug ! (
156
- node_id = %node_id. fmt_short( ) ,
157
- "removing dropped connection" ,
158
- ) ;
159
- self . connection_events . remove ( key) ;
160
- }
161
- }
162
- None => error ! ( "No connection found for stream item" ) ,
163
- } ,
164
- None => {
165
- trace ! ( "No more connections" ) ;
166
- }
167
- }
168
- }
169
-
170
- /// Performs cleanup for closed connection.
171
- fn do_connections_cleanup ( & mut self ) {
172
- for ( key, ( handle, node_id, _) ) in self . connections . iter ( ) {
173
- if !handle. is_alive ( ) {
174
- trace ! ( node_id = %node_id. fmt_short( ) , "removing stale connection" ) ;
175
- self . connection_events . remove ( * key) ;
176
- }
177
- }
178
- }
179
- }
180
-
181
- #[ cfg( test) ]
182
- mod tests {
183
- use super :: * ;
184
-
185
- #[ tokio:: test]
186
- async fn test_actor_mspc_close ( ) {
187
- let mut actor = RttActor {
188
- connection_events : stream_group:: StreamGroup :: new ( ) . keyed ( ) ,
189
- connections : HashMap :: new ( ) ,
190
- tick : Notify :: new ( ) ,
191
- } ;
192
- let ( msg_tx, msg_rx) = mpsc:: channel ( 16 ) ;
193
- let handle = tokio:: spawn ( async move {
194
- actor. run ( msg_rx) . await ;
152
+ self . connection_events . push ( MappedStream {
153
+ stream : conn_type_changes,
154
+ connection,
155
+ node_id,
156
+ was_direct_before : false ,
195
157
} ) ;
196
-
197
- // Dropping the msg_tx should stop the actor
198
- drop ( msg_tx) ;
199
-
200
- let task_res = tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , handle)
201
- . await
202
- . expect ( "timeout - actor did not finish" ) ;
203
- assert ! ( task_res. is_ok( ) ) ;
158
+ inc ! ( MagicsockMetrics , connection_handshake_success) ;
204
159
}
205
160
}
0 commit comments