Skip to content

Commit 4b3837d

Browse files
Merge f8e92a4 into c650ea8
2 parents c650ea8 + f8e92a4 commit 4b3837d

File tree

2 files changed

+94
-58
lines changed

2 files changed

+94
-58
lines changed

iroh-relay/src/server/client.rs

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub(super) struct Config {
4848
pub(super) struct Client {
4949
/// Identity of the connected peer.
5050
node_id: NodeId,
51+
/// Connection identifier.
52+
connection_id: u64,
5153
/// Used to close the connection loop.
5254
done: CancellationToken,
5355
/// Actor handle.
@@ -64,7 +66,7 @@ impl Client {
6466
/// Creates a client from a connection & starts a read and write loop to handle io to and from
6567
/// the client.
6668
/// Call [`Client::shutdown`] to close the read and write loops before dropping the [`Client`]
67-
pub(super) fn new(config: Config, clients: &Clients) -> Client {
69+
pub(super) fn new(config: Config, connection_id: u64, clients: &Clients) -> Client {
6870
let Config {
6971
node_id,
7072
stream: io,
@@ -98,29 +100,21 @@ impl Client {
98100
disco_send_queue: disco_send_queue_r,
99101
node_gone: peer_gone_r,
100102
node_id,
103+
connection_id,
101104
clients: clients.clone(),
102105
};
103106

104107
// start io loop
105108
let io_done = done.clone();
106-
let handle = tokio::task::spawn(
107-
async move {
108-
match actor.run(io_done).await {
109-
Err(e) => {
110-
warn!("writer closed in error {e:#?}");
111-
}
112-
Ok(()) => {
113-
debug!("writer closed");
114-
}
115-
}
116-
}
117-
.instrument(
118-
tracing::info_span!("client connection actor", remote_node = %node_id.fmt_short()),
119-
),
120-
);
109+
let handle = tokio::task::spawn(actor.run(io_done).instrument(tracing::info_span!(
110+
"client connection actor",
111+
remote_node = %node_id.fmt_short(),
112+
connection_id = connection_id
113+
)));
121114

122115
Client {
123116
node_id,
117+
connection_id,
124118
handle: AbortOnDropHandle::new(handle),
125119
done,
126120
send_queue: send_queue_s,
@@ -129,11 +123,15 @@ impl Client {
129123
}
130124
}
131125

126+
pub(super) fn connection_id(&self) -> u64 {
127+
self.connection_id
128+
}
129+
132130
/// Shutdown the reader and writer loops and closes the connection.
133131
///
134132
/// Any shutdown errors will be logged as warnings.
135133
pub(super) async fn shutdown(self) {
136-
self.done.cancel();
134+
self.start_shutdown();
137135
if let Err(e) = self.handle.await {
138136
warn!(
139137
remote_node = %self.node_id.fmt_short(),
@@ -142,6 +140,11 @@ impl Client {
142140
};
143141
}
144142

143+
/// Starts the process of shutdown.
144+
pub(super) fn start_shutdown(&self) {
145+
self.done.cancel();
146+
}
147+
145148
pub(super) fn try_send_packet(
146149
&self,
147150
src: NodeId,
@@ -194,12 +197,29 @@ struct Actor {
194197
node_gone: mpsc::Receiver<NodeId>,
195198
/// [`NodeId`] of this client
196199
node_id: NodeId,
200+
/// Connection identifier.
201+
connection_id: u64,
197202
/// Reference to the other connected clients.
198203
clients: Clients,
199204
}
200205

201206
impl Actor {
202-
async fn run(mut self, done: CancellationToken) -> Result<()> {
207+
async fn run(mut self, done: CancellationToken) {
208+
match self.run_inner(done).await {
209+
Err(e) => {
210+
warn!("actor errored {e:#?}, exiting");
211+
}
212+
Ok(()) => {
213+
debug!("actor finished, exiting");
214+
}
215+
}
216+
217+
self.clients
218+
.unregister(self.connection_id, self.node_id)
219+
.await;
220+
}
221+
222+
async fn run_inner(&mut self, done: CancellationToken) -> Result<()> {
203223
let jitter = Duration::from_secs(5);
204224
let mut keep_alive = tokio::time::interval(KEEP_ALIVE + jitter);
205225
// ticks immediately
@@ -304,7 +324,7 @@ impl Actor {
304324
match frame {
305325
Frame::SendPacket { dst_key, packet } => {
306326
let packet_len = packet.len();
307-
self.handle_frame_send_packet(dst_key, packet).await?;
327+
self.handle_frame_send_packet(dst_key, packet)?;
308328
inc_by!(Metrics, bytes_recv, packet_len as u64);
309329
}
310330
Frame::Ping { data } => {
@@ -323,15 +343,13 @@ impl Actor {
323343
Ok(())
324344
}
325345

326-
async fn handle_frame_send_packet(&self, dst: NodeId, data: Bytes) -> Result<()> {
346+
fn handle_frame_send_packet(&self, dst: NodeId, data: Bytes) -> Result<()> {
327347
if disco::looks_like_disco_wrapper(&data) {
328348
inc!(Metrics, disco_packets_recv);
329-
self.clients
330-
.send_disco_packet(dst, data, self.node_id)
331-
.await?;
349+
self.clients.send_disco_packet(dst, data, self.node_id)?;
332350
} else {
333351
inc!(Metrics, send_packets_recv);
334-
self.clients.send_packet(dst, data, self.node_id).await?;
352+
self.clients.send_packet(dst, data, self.node_id)?;
335353
}
336354
Ok(())
337355
}
@@ -546,6 +564,7 @@ mod tests {
546564
send_queue: send_queue_r,
547565
disco_send_queue: disco_send_queue_r,
548566
node_gone: peer_gone_r,
567+
connection_id: 0,
549568
node_id,
550569
clients: clients.clone(),
551570
};
@@ -630,7 +649,7 @@ mod tests {
630649
.await?;
631650

632651
done.cancel();
633-
handle.await??;
652+
handle.await?;
634653
Ok(())
635654
}
636655

iroh-relay/src/server/clients.rs

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
//! The "Server" side of the client. Uses the `ClientConnManager`.
22
// Based on tailscale/derp/derp_server.go
33

4-
use std::{collections::HashSet, sync::Arc};
4+
use std::{
5+
collections::HashSet,
6+
sync::{
7+
atomic::{AtomicU64, Ordering},
8+
Arc,
9+
},
10+
};
511

612
use anyhow::{bail, Result};
713
use bytes::Bytes;
@@ -24,6 +30,8 @@ struct Inner {
2430
clients: DashMap<NodeId, Client>,
2531
/// Map of which client has sent where
2632
sent_to: DashMap<NodeId, HashSet<NodeId>>,
33+
/// Counter that supplies the next connection ID.
34+
next_connection_id: AtomicU64,
2735
}
2836

2937
impl Clients {
@@ -41,9 +49,10 @@ impl Clients {
4149
/// Builds the client handler and starts the read & write loops for the connection.
4250
pub async fn register(&self, client_config: Config) {
4351
let node_id = client_config.node_id;
52+
let connection_id = self.get_connection_id();
4453
trace!(remote_node = node_id.fmt_short(), "registering client");
4554

46-
let client = Client::new(client_config, self);
55+
let client = Client::new(client_config, connection_id, self);
4756
if let Some(old_client) = self.0.clients.insert(node_id, client) {
4857
debug!(
4958
remote_node = node_id.fmt_short(),
@@ -53,20 +62,27 @@ impl Clients {
5362
}
5463
}
5564

65+
fn get_connection_id(&self) -> u64 {
66+
self.0.next_connection_id.fetch_add(1, Ordering::Relaxed)
67+
}
68+
5669
/// Removes the client from the map of clients, & sends a notification
5770
/// to each client that the peer has sent data to, to let them know that
5871
/// peer is gone from the network.
5972
///
60-
/// Explicitly drops the reference to the client to avoid deadlock.
61-
async fn unregister<'a>(
62-
&self,
63-
client: dashmap::mapref::one::Ref<'a, iroh_base::PublicKey, Client>,
64-
node_id: NodeId,
65-
) {
66-
drop(client); // avoid deadlock
67-
trace!(node_id = node_id.fmt_short(), "unregistering client");
68-
69-
if let Some((_, client)) = self.0.clients.remove(&node_id) {
73+
/// The client is only removed if its connection ID equals the given `connection_id`.
74+
pub(super) async fn unregister<'a>(&self, connection_id: u64, node_id: NodeId) {
75+
trace!(
76+
node_id = node_id.fmt_short(),
77+
connection_id,
78+
"unregistering client"
79+
);
80+
81+
if let Some((_, client)) = self
82+
.0
83+
.clients
84+
.remove_if(&node_id, |_, c| c.connection_id() == connection_id)
85+
{
7086
if let Some((_, sent_to)) = self.0.sent_to.remove(&node_id) {
7187
for key in sent_to {
7288
match client.try_send_peer_gone(key) {
@@ -91,7 +107,7 @@ impl Clients {
91107
}
92108

93109
/// Attempt to send a packet to client with [`NodeId`] `dst`.
94-
pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> {
110+
pub(super) fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> {
95111
let Some(client) = self.0.clients.get(&dst) else {
96112
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
97113
inc!(Metrics, send_packets_dropped);
@@ -115,19 +131,14 @@ impl Clients {
115131
dst = dst.fmt_short(),
116132
"can no longer write to client, dropping message and pruning connection"
117133
);
118-
self.unregister(client, dst).await;
134+
client.start_shutdown();
119135
bail!("failed to send message: gone");
120136
}
121137
}
122138
}
123139

124140
/// Attempt to send a disco packet to client with [`NodeId`] `dst`.
125-
pub(super) async fn send_disco_packet(
126-
&self,
127-
dst: NodeId,
128-
data: Bytes,
129-
src: NodeId,
130-
) -> Result<()> {
141+
pub(super) fn send_disco_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> {
131142
let Some(client) = self.0.clients.get(&dst) else {
132143
debug!(
133144
dst = dst.fmt_short(),
@@ -154,7 +165,7 @@ impl Clients {
154165
dst = dst.fmt_short(),
155166
"can no longer write to client, dropping disco message and pruning connection"
156167
);
157-
self.unregister(client, dst).await;
168+
client.start_shutdown();
158169
bail!("failed to send message: gone");
159170
}
160171
}
@@ -205,9 +216,7 @@ mod tests {
205216

206217
// send packet
207218
let data = b"hello world!";
208-
clients
209-
.send_packet(a_key, Bytes::from(&data[..]), b_key)
210-
.await?;
219+
clients.send_packet(a_key, Bytes::from(&data[..]), b_key)?;
211220
let frame = recv_frame(FrameType::RecvPacket, &mut a_rw).await?;
212221
assert_eq!(
213222
frame,
@@ -218,9 +227,7 @@ mod tests {
218227
);
219228

220229
// send disco packet
221-
clients
222-
.send_disco_packet(a_key, Bytes::from(&data[..]), b_key)
223-
.await?;
230+
clients.send_disco_packet(a_key, Bytes::from(&data[..]), b_key)?;
224231
let frame = recv_frame(FrameType::RecvPacket, &mut a_rw).await?;
225232
assert_eq!(
226233
frame,
@@ -230,13 +237,23 @@ mod tests {
230237
}
231238
);
232239

233-
let client = clients.0.clients.get(&a_key).unwrap();
234-
235-
// send peer_gone. Also, tests that we do not get a deadlock
236-
// when unregistering.
237-
clients.unregister(client, a_key).await;
240+
{
241+
let client = clients.0.clients.get(&a_key).unwrap();
242+
// shut down client a; this should trigger its removal from the clients list
243+
client.start_shutdown();
244+
}
238245

239-
assert!(!clients.0.clients.contains_key(&a_key));
246+
// need to wait a moment for the removal to be processed
247+
let c = clients.clone();
248+
tokio::time::timeout(Duration::from_secs(1), async move {
249+
loop {
250+
if !c.0.clients.contains_key(&a_key) {
251+
break;
252+
}
253+
tokio::time::sleep(Duration::from_millis(100)).await;
254+
}
255+
})
256+
.await?;
240257
clients.shutdown().await;
241258

242259
Ok(())

0 commit comments

Comments
 (0)