Skip to content

Commit 83b01ad

Browse files
fix(iroh): use two stage accept from quic-rpc (#2416)
## Description fix(iroh): use two stage accept from quic-rpc to make the accept process cancel-safe Needs n0-computer/quic-rpc#87 ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --------- Co-authored-by: dignifiedquire <[email protected]>
1 parent 38e8ce0 commit 83b01ad

File tree

6 files changed

+11
-11
lines changed

6 files changed

+11
-11
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ parking_lot = "0.12.1"
4545
pkarr = { version = "1.1.5", default-features = false }
4646
portable-atomic = "1"
4747
postcard = "1.0.8"
48-
quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] }
48+
quic-rpc = { version = "0.11", features = ["flume-transport", "quinn-transport"] }
4949
rand = "0.8.5"
5050
ratatui = "0.26.2"
5151
reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] }

iroh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" }
3737
iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" }
3838
parking_lot = "0.12.1"
3939
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
40-
quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] }
40+
quic-rpc = { version = "0.11", default-features = false, features = ["flume-transport", "quinn-transport"] }
4141
quinn = { package = "iroh-quinn", version = "0.10" }
4242
rand = "0.8"
4343
serde = { version = "1", features = ["derive"] }

iroh/src/node.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
273273
// accept is just a pending future.
274274
request = external_rpc.accept() => {
275275
match request {
276-
Ok((msg, chan)) => {
277-
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
276+
Ok(accepting) => {
277+
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
278278
}
279279
Err(e) => {
280280
info!("rpc request error: {:?}", e);
@@ -284,8 +284,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
284284
// handle internal rpc requests.
285285
request = internal_rpc.accept() => {
286286
match request {
287-
Ok((msg, chan)) => {
288-
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, msg, chan);
287+
Ok(accepting) => {
288+
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
289289
}
290290
Err(e) => {
291291
info!("internal rpc request error: {:?}", e);

iroh/src/node/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ where
494494
.await?;
495495

496496
// Initialize the internal RPC connection.
497-
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
497+
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
498498
// box the controller. Boxing has a special case for the flume channel that avoids allocations,
499499
// so this has zero overhead.
500500
let controller = quic_rpc::transport::boxed::Connection::new(controller);

iroh/src/node/rpc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ impl<D: BaoStore> Handler<D> {
109109
pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
110110
inner: Arc<NodeInner<D>>,
111111
join_set: &mut JoinSet<anyhow::Result<()>>,
112-
msg: Request,
113-
chan: RpcChannel<RpcService, E>,
112+
accepting: quic_rpc::server::Accepting<RpcService, E>,
114113
) {
115114
let handler = Self::new(inner);
116115
join_set.spawn(async move {
116+
let (msg, chan) = accepting.read_first().await?;
117117
if let Err(err) = handler.handle_rpc_request(msg, chan).await {
118118
warn!("rpc request handler error: {err:?}");
119119
}

0 commit comments

Comments
 (0)