Skip to content

Commit ccfc700

Browse files
authored
fix: remove problematic usage of else branches in tokio::selects (#2940)
## Description Fixed some more cases of `else` branches in `tokio::select!`s that would never actually run. I noticed because the relay actor would start logging `recv error Receive(shut down)` when I dropped the `Endpoint`. ## Notes & open questions There's 2 cases where the `else` branch still would never run but I wasn't sure of what the intended behaviour was, so I left a todo comment. In the `RelayActor` it seems like it would make sense to exit in both cases - the `ping_tasks` `JoinSet` being empty and the `receiver` being closed. The magicsock actor I'm much less sure of - I suspect the `else` branch could just be removed? ## Change checklist - [ ] Self-review. - [ ] Tests if relevant.
1 parent 8edaee9 commit ccfc700

File tree

5 files changed

+76
-32
lines changed

5 files changed

+76
-32
lines changed

iroh-net/src/magicsock.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,10 +1767,22 @@ impl Actor {
17671767
discovery_events = events;
17681768
}
17691769
}
1770+
1771+
let mut receiver_closed = false;
1772+
let mut portmap_watcher_closed = false;
1773+
let mut link_change_closed = false;
17701774
loop {
17711775
inc!(Metrics, actor_tick_main);
17721776
tokio::select! {
1773-
Some(msg) = self.msg_receiver.recv() => {
1777+
msg = self.msg_receiver.recv(), if !receiver_closed => {
1778+
let Some(msg) = msg else {
1779+
trace!("tick: magicsock receiver closed");
1780+
inc!(Metrics, actor_tick_other);
1781+
1782+
receiver_closed = true;
1783+
continue;
1784+
};
1785+
17741786
trace!(?msg, "tick: msg");
17751787
inc!(Metrics, actor_tick_msg);
17761788
if self.handle_actor_message(msg).await {
@@ -1782,7 +1794,15 @@ impl Actor {
17821794
inc!(Metrics, actor_tick_re_stun);
17831795
self.msock.re_stun("periodic");
17841796
}
1785-
Ok(()) = portmap_watcher.changed() => {
1797+
change = portmap_watcher.changed(), if !portmap_watcher_closed => {
1798+
if change.is_err() {
1799+
trace!("tick: portmap watcher closed");
1800+
inc!(Metrics, actor_tick_other);
1801+
1802+
portmap_watcher_closed = true;
1803+
continue;
1804+
}
1805+
17861806
trace!("tick: portmap changed");
17871807
inc!(Metrics, actor_tick_portmap_changed);
17881808
let new_external_address = *portmap_watcher.borrow();
@@ -1809,22 +1829,29 @@ impl Actor {
18091829
self.refresh_direct_addrs(reason).await;
18101830
}
18111831
}
1812-
Some(is_major) = link_change_r.recv() => {
1832+
is_major = link_change_r.recv(), if !link_change_closed => {
1833+
let Some(is_major) = is_major else {
1834+
trace!("tick: link change receiver closed");
1835+
inc!(Metrics, actor_tick_other);
1836+
1837+
link_change_closed = true;
1838+
continue;
1839+
};
1840+
18131841
trace!("tick: link change {}", is_major);
18141842
inc!(Metrics, actor_link_change);
18151843
self.handle_network_change(is_major).await;
18161844
}
1845+
// Even if `discovery_events` yields `None`, it could begin to yield
1846+
// `Some` again in the future, so we don't want to disable this branch
1847+
// forever like we do with the other branches that yield `Option`s
18171848
Some(discovery_item) = discovery_events.next() => {
18181849
trace!("tick: discovery event, address discovered: {discovery_item:?}");
18191850
let node_addr = NodeAddr {node_id: discovery_item.node_id, info: discovery_item.addr_info};
18201851
if let Err(e) = self.msock.add_node_addr(node_addr.clone(), Source::Discovery { name: discovery_item.provenance.into() }) {
18211852
warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}");
18221853
}
18231854
}
1824-
else => {
1825-
trace!("tick: other");
1826-
inc!(Metrics, actor_tick_other);
1827-
}
18281855
}
18291856
}
18301857
}

iroh-net/src/magicsock/relay_actor.rs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,12 @@ impl ActiveRelay {
111111
self.relay_client.connect().await.context("keepalive")?;
112112
}
113113
tokio::select! {
114-
Some(msg) = inbox.recv() => {
114+
msg = inbox.recv() => {
115+
let Some(msg) = msg else {
116+
debug!("all clients closed");
117+
break;
118+
};
119+
115120
trace!("tick: inbox: {:?}", msg);
116121
match msg {
117122
ActiveRelayMessage::GetLastWrite(r) => {
@@ -144,6 +149,7 @@ impl ActiveRelay {
144149
}
145150
}
146151
}
152+
147153
msg = self.relay_client_receiver.recv() => {
148154
trace!("tick: relay_client_receiver");
149155
if let Some(msg) = msg {
@@ -153,10 +159,6 @@ impl ActiveRelay {
153159
}
154160
}
155161
}
156-
else => {
157-
debug!("all clients closed");
158-
break;
159-
}
160162
}
161163
}
162164
debug!("exiting");
@@ -301,25 +303,37 @@ impl RelayActor {
301303
trace!("shutting down");
302304
break;
303305
}
304-
Some(Ok((url, ping_success))) = self.ping_tasks.join_next() => {
305-
if !ping_success {
306-
with_cancel(
307-
self.cancel_token.child_token(),
308-
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
309-
).await;
306+
// `ping_tasks` being empty is a normal situation - in fact it starts empty
307+
// until a `MaybeCloseRelaysOnRebind` message is received.
308+
Some(task_result) = self.ping_tasks.join_next() => {
309+
match task_result {
310+
Ok((url, ping_success)) => {
311+
if !ping_success {
312+
with_cancel(
313+
self.cancel_token.child_token(),
314+
self.close_or_reconnect_relay(&url, "rebind-ping-fail")
315+
).await;
316+
}
317+
}
318+
319+
Err(err) => {
320+
warn!("ping task error: {:?}", err);
321+
}
310322
}
311323
}
312-
Some(msg) = receiver.recv() => {
324+
325+
msg = receiver.recv() => {
326+
let Some(msg) = msg else {
327+
trace!("shutting down relay recv loop");
328+
break;
329+
};
330+
313331
with_cancel(self.cancel_token.child_token(), self.handle_msg(msg)).await;
314332
}
315333
_ = cleanup_timer.tick() => {
316334
trace!("tick: cleanup");
317335
with_cancel(self.cancel_token.child_token(), self.clean_stale_relay()).await;
318336
}
319-
else => {
320-
trace!("shutting down relay recv loop");
321-
break;
322-
}
323337
}
324338
}
325339

iroh-relay/src/client.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,13 @@ impl Actor {
506506
}
507507
msg_sender.send(res).await.ok();
508508
}
509-
Some(msg) = inbox.recv() => {
509+
msg = inbox.recv() => {
510+
let Some(msg) = msg else {
511+
// Shutting down
512+
self.close().await;
513+
break;
514+
};
515+
510516
match msg {
511517
ActorMessage::Connect(s) => {
512518
let res = self.connect("actor msg").await.map(|(client, _)| (client));
@@ -546,11 +552,6 @@ impl Actor {
546552
},
547553
}
548554
}
549-
else => {
550-
// Shutting down
551-
self.close().await;
552-
break;
553-
}
554555
}
555556
}
556557
}

iroh-router/src/router.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ impl RouterBuilder {
117117
break;
118118
},
119119
// handle incoming p2p connections.
120-
Some(incoming) = endpoint.accept() => {
120+
incoming = endpoint.accept() => {
121+
let Some(incoming) = incoming else {
122+
break;
123+
};
124+
121125
let protocols = protocols.clone();
122126
join_set.spawn(async move {
123127
handle_connection(incoming, protocols).await;
@@ -144,7 +148,6 @@ impl RouterBuilder {
144148
_ => {}
145149
}
146150
},
147-
else => break,
148151
}
149152
}
150153

iroh/src/node.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
433433
_ => {}
434434
}
435435
},
436-
else => break,
437436
}
438437
}
439438

0 commit comments

Comments
 (0)