@@ -11,7 +11,7 @@ use iroh_metrics::inc;
11
11
use tokio:: sync:: mpsc:: error:: TrySendError ;
12
12
use tracing:: { debug, trace} ;
13
13
14
- use super :: client:: { Client , Config , Packet } ;
14
+ use super :: client:: { Client , Config } ;
15
15
use crate :: server:: metrics:: Metrics ;
16
16
17
17
/// Manages the connections to all currently connected clients.
@@ -56,7 +56,14 @@ impl Clients {
56
56
/// Removes the client from the map of clients, & sends a notification
57
57
/// to each client that peers has sent data to, to let them know that
58
58
/// peer is gone from the network.
59
- async fn unregister ( & self , node_id : NodeId ) {
59
+ ///
60
+ /// Explicitly drops the reference to the client to avoid deadlock.
61
+ async fn unregister < ' a > (
62
+ & self ,
63
+ client : dashmap:: mapref:: one:: Ref < ' a , iroh_base:: PublicKey , Client > ,
64
+ node_id : NodeId ,
65
+ ) {
66
+ drop ( client) ; // avoid deadlock
60
67
trace ! ( node_id = node_id. fmt_short( ) , "unregistering client" ) ;
61
68
62
69
if let Some ( ( _, client) ) = self . 0 . clients . remove ( & node_id) {
@@ -83,42 +90,53 @@ impl Clients {
83
90
}
84
91
}
85
92
86
- /// Attempt to send a packet to client with [`NodeId`] `dst`
93
+ /// Attempt to send a packet to client with [`NodeId`] `dst`.
87
94
pub ( super ) async fn send_packet ( & self , dst : NodeId , data : Bytes , src : NodeId ) -> Result < ( ) > {
88
- if let Some ( client) = self . 0 . clients . get ( & dst) {
89
- let res = client. try_send_packet ( src, data) ;
90
- return self . process_result ( src, dst, res) . await ;
95
+ let Some ( client) = self . 0 . clients . get ( & dst) else {
96
+ debug ! ( dst = dst. fmt_short( ) , "no connected client, dropped packet" ) ;
97
+ inc ! ( Metrics , send_packets_dropped) ;
98
+ return Ok ( ( ) ) ;
99
+ } ;
100
+ match client. try_send_packet ( src, data) {
101
+ Ok ( _) => {
102
+ // Record sent_to relationship
103
+ self . 0 . sent_to . entry ( src) . or_default ( ) . insert ( dst) ;
104
+ Ok ( ( ) )
105
+ }
106
+ Err ( TrySendError :: Full ( _) ) => {
107
+ debug ! (
108
+ dst = dst. fmt_short( ) ,
109
+ "client too busy to receive packet, dropping packet"
110
+ ) ;
111
+ bail ! ( "failed to send message: full" ) ;
112
+ }
113
+ Err ( TrySendError :: Closed ( _) ) => {
114
+ debug ! (
115
+ dst = dst. fmt_short( ) ,
116
+ "can no longer write to client, dropping message and pruning connection"
117
+ ) ;
118
+ self . unregister ( client, dst) . await ;
119
+ bail ! ( "failed to send message: gone" ) ;
120
+ }
91
121
}
92
- debug ! ( dst = dst. fmt_short( ) , "no connected client, dropped packet" ) ;
93
- inc ! ( Metrics , send_packets_dropped) ;
94
- Ok ( ( ) )
95
122
}
96
123
124
+ /// Attempt to send a disco packet to client with [`NodeId`] `dst`.
97
125
pub ( super ) async fn send_disco_packet (
98
126
& self ,
99
127
dst : NodeId ,
100
128
data : Bytes ,
101
129
src : NodeId ,
102
130
) -> Result < ( ) > {
103
- if let Some ( client) = self . 0 . clients . get ( & dst) {
104
- let res = client. try_send_disco_packet ( src, data) ;
105
- return self . process_result ( src, dst, res) . await ;
106
- }
107
- debug ! (
108
- dst = dst. fmt_short( ) ,
109
- "no connected client, dropped disco packet"
110
- ) ;
111
- inc ! ( Metrics , disco_packets_dropped) ;
112
- Ok ( ( ) )
113
- }
114
-
115
- async fn process_result (
116
- & self ,
117
- src : NodeId ,
118
- dst : NodeId ,
119
- res : Result < ( ) , TrySendError < Packet > > ,
120
- ) -> Result < ( ) > {
121
- match res {
131
+ let Some ( client) = self . 0 . clients . get ( & dst) else {
132
+ debug ! (
133
+ dst = dst. fmt_short( ) ,
134
+ "no connected client, dropped disco packet"
135
+ ) ;
136
+ inc ! ( Metrics , disco_packets_dropped) ;
137
+ return Ok ( ( ) ) ;
138
+ } ;
139
+ match client. try_send_disco_packet ( src, data) {
122
140
Ok ( _) => {
123
141
// Record sent_to relationship
124
142
self . 0 . sent_to . entry ( src) . or_default ( ) . insert ( dst) ;
@@ -127,17 +145,17 @@ impl Clients {
127
145
Err ( TrySendError :: Full ( _) ) => {
128
146
debug ! (
129
147
dst = dst. fmt_short( ) ,
130
- "client too busy to receive packet, dropping packet"
148
+ "client too busy to receive disco packet, dropping packet"
131
149
) ;
132
- bail ! ( "failed to send message" ) ;
150
+ bail ! ( "failed to send message: full " ) ;
133
151
}
134
152
Err ( TrySendError :: Closed ( _) ) => {
135
153
debug ! (
136
154
dst = dst. fmt_short( ) ,
137
- "can no longer write to client, dropping message and pruning connection"
155
+ "can no longer write to client, dropping disco message and pruning connection"
138
156
) ;
139
- self . unregister ( dst) . await ;
140
- bail ! ( "failed to send message" ) ;
157
+ self . unregister ( client , dst) . await ;
158
+ bail ! ( "failed to send message: gone " ) ;
141
159
}
142
160
}
143
161
}
@@ -212,8 +230,11 @@ mod tests {
212
230
}
213
231
) ;
214
232
215
- // send peer_gone
216
- clients. unregister ( a_key) . await ;
233
+ let client = clients. 0 . clients . get ( & a_key) . unwrap ( ) ;
234
+
235
+ // send peer_gone. Also, tests that we do not get a deadlock
236
+ // when unregistering.
237
+ clients. unregister ( client, a_key) . await ;
217
238
218
239
assert ! ( !clients. 0 . clients. contains_key( & a_key) ) ;
219
240
clients. shutdown ( ) . await ;
0 commit comments