fix(kad): enforce a timeout for inbound substreams #6009

Open
wants to merge 6 commits into
base: master
from

Conversation

teor2345

Description

In the kad substream handler, outbound substreams have a 10s timeout, but inbound substreams don't have any timeout.

This results in large numbers of warnings under specific heavy load conditions, which we have encountered in subspace:

2025-04-08T06:24:27.293722Z WARN Consensus: libp2p_kad::handler: New inbound substream to peer exceeds inbound substream limit. No older substream waiting to be reused. Dropping new substream. peer=PeerId("12D3KooWN6kFp2Ev181UGq3BUDfk1jfjaNu6sDTqxCZUBpmp8kRQ")

Fixes #5981

Notes & open questions

Should the substream be closed on a timeout?
The existing code doesn't close them on (most) substream errors, so this PR handles timeouts the same way.

I have been unable to replicate the specific conditions leading to this bug in a test or my local subspace node, but we've confirmed the fix works on multiple nodes in the subspace network.

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

@teor2345 teor2345 changed the title fix(kad): enforce an inbound substream timeout in the kad substream handler fix(kad): enforce a timeout for inbound substreams Apr 25, 2025
@teor2345 teor2345 force-pushed the kad-in-subst-timeout-upstream branch from 925b8ac to 36ce9b5 Compare April 28, 2025 23:05
@teor2345
Author

teor2345 commented Apr 28, 2025

I'm getting an incorrect CI error:

Patch version has been bumped even though minor isn't released yet.
https://github.com/libp2p/rust-libp2p/actions/runs/14656299889/job/41308047380?pr=6009#step:13:54

We need a new patch version, because https://crates.io/crates/libp2p-kad is at 0.47.0.
It looks like there might be a missing libp2p-kad-v0.47.0 tag in this repository, which is producing the incorrect CI result.

I've updated Cargo.lock with the new patch version.

@elenaf9
Member

elenaf9 commented Apr 30, 2025

Thank you for the PR @teor2345!

For outbound substreams, the timeout is implemented by using the futures_bounded::FuturesTupleSet (or just futures_bounded::FuturesSet) that limits the number of futures in the set and implements a timeout for each individual future.
Can we not just use that for inbound substreams as well?
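To make the idea concrete, here is a minimal std-only model of what such a bounded set provides: a capacity limit plus a deadline per entry. `BoundedSet`, `try_push` and `take_expired` are hypothetical names for illustration only and are not the futures_bounded API, which stores futures and is driven by timers rather than an explicit `now` argument.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified model of a bounded set with per-entry timeouts.
/// This is NOT the futures_bounded API; it only illustrates the two limits.
struct BoundedSet<T> {
    capacity: usize,
    timeout: Duration,
    /// Each entry is stored together with the instant at which it expires.
    entries: Vec<(Instant, T)>,
}

impl<T> BoundedSet<T> {
    fn new(timeout: Duration, capacity: usize) -> Self {
        BoundedSet { capacity, timeout, entries: Vec::new() }
    }

    /// Adds `item` unless the set is full; a rejected item is handed back.
    fn try_push(&mut self, item: T, now: Instant) -> Result<(), T> {
        if self.entries.len() >= self.capacity {
            return Err(item);
        }
        self.entries.push((now + self.timeout, item));
        Ok(())
    }

    /// Removes and returns every entry whose deadline has passed.
    fn take_expired(&mut self, now: Instant) -> Vec<T> {
        let (expired, live): (Vec<_>, Vec<_>) = std::mem::take(&mut self.entries)
            .into_iter()
            .partition(|(deadline, _)| *deadline <= now);
        self.entries = live;
        expired.into_iter().map(|(_, item)| item).collect()
    }
}
```

Because expired entries free their slots, a full set cannot stay full forever, which is the property inbound substreams currently lack.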

That would also match the implementation in other protocols, as you already stated in #5981:

Here is how other protocols implement matching inbound and outbound timeouts:

inbound_workers: futures_bounded::FuturesSet::new(

@teor2345
Author

teor2345 commented May 1, 2025

> For outbound substreams, the timeout is implemented by using the futures_bounded::FuturesTupleSet (or just futures_bounded::FuturesSet) that limits the number of futures in the set and implements a timeout for each individual future. Can we not just use that for inbound substreams as well?

Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:

impl futures::Stream for InboundSubstreamState {
    type Item = ConnectionHandlerEvent<ProtocolConfig, (), HandlerEvent>;

It would be possible to hold the streams in a FuturesTupleSet by calling StreamExt::into_future() on them.

But we can't iterate through a FuturesTupleSet to find reusable inbound substreams, which makes this inbound substream reuse code difficult to implement:

if self.inbound_substreams.len() == MAX_NUM_STREAMS {
    if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
        matches!(
            s,
            // An inbound substream waiting to be reused.
            InboundSubstreamState::WaitingMessage {
                first_request_timeout: None,
                ..
            }
        )
    }) {
        *s = InboundSubstreamState::Cancelled;
        tracing::debug!(
            peer=?self.remote_peer_id,
            "New inbound substream to peer exceeds inbound substream limit. \
             Removed older substream waiting to be reused."
        )
    } else {
        tracing::warn!(
            peer=?self.remote_peer_id,
            "New inbound substream to peer exceeds inbound substream limit. \
             No older substream waiting to be reused. Dropping new substream."
        );
        return;
    }
}

So the only solution I could find is to add a timeout field to some of the inbound substream states. (Cancelled and Poisoned never need timeouts, and WaitingMessage only needs a timeout for the first request. So we save a bit on timers there.)
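As a rough illustration of that approach, here is a std-only sketch of a state enum that carries its own deadline. `InboundState`, `Busy`, `deadline` and `enforce_timeout` are hypothetical names; the real `InboundSubstreamState` has more variants and holds the substream itself. Moving to `Cancelled` on expiry is an assumption of this sketch, not a statement of what the PR does.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for kad's inbound substream state.
#[derive(Debug, PartialEq)]
enum InboundState {
    /// Waiting for a request. The deadline is `Some` only until the first
    /// request arrives; `None` marks an idle substream that can be reused.
    WaitingMessage { first_request_deadline: Option<Instant> },
    /// Any state that is actively handling a request (assumed to always
    /// carry a deadline in this sketch).
    Busy { deadline: Instant },
    /// Terminal state; never needs a timer.
    Cancelled,
}

impl InboundState {
    /// A fresh substream must deliver its first request before the timeout.
    fn new(now: Instant, timeout: Duration) -> Self {
        InboundState::WaitingMessage { first_request_deadline: Some(now + timeout) }
    }

    /// The deadline that currently applies, if any.
    fn deadline(&self) -> Option<Instant> {
        match self {
            InboundState::WaitingMessage { first_request_deadline } => *first_request_deadline,
            InboundState::Busy { deadline } => Some(*deadline),
            InboundState::Cancelled => None,
        }
    }

    /// Mirrors the `first_request_timeout: None` check in the reuse code.
    fn is_reusable(&self) -> bool {
        matches!(self, InboundState::WaitingMessage { first_request_deadline: None })
    }

    /// Cancels the substream if its deadline has passed; returns whether it did.
    fn enforce_timeout(&mut self, now: Instant) -> bool {
        match self.deadline() {
            Some(deadline) if deadline <= now => {
                *self = InboundState::Cancelled;
                true
            }
            _ => false,
        }
    }
}
```

Only states with a `Some` deadline would need a timer, which is where the saving on timers comes from.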

The underlying issue is that the code couples the state of the substream with the stream of items from it. A refactor could put KadInStreamSink::into_future() into a FuturesTupleSet, and store the rest of the state as the associated data. We'd also need a separate list of substreams which can/can't be reused for the substream reuse check. And a way of dropping a substream from the FuturesTupleSet when it gets reused, probably via a oneshot.

Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?

@dariusc93
Member

dariusc93 commented May 1, 2025

> Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:

Could we not use StreamSet from futures-bounded?

EDIT: Or maybe poll the stream in the future for the set?

@teor2345
Author

teor2345 commented May 2, 2025

> > Unfortunately not (or not without a larger refactor). FuturesTupleSet holds Futures, but inbound substreams are implemented as Streams in kad:
>
> Could we not use StreamSet from futures-bounded?
>
> EDIT: Or maybe poll the stream in the future for the set?

Sure, but that doesn't deal with substream reuse, because the futures_bounded types do not allow iteration to find reusable substreams (they're a wrapper for SelectAll).

There are two pieces of functionality that this code needs:

  • work out when a substream becomes available for reuse
  • when a new substream is created, and the substream limit has been reached, replace a reusable substream with the new substream (or drop the new substream if no substreams are reusable)

Here's one possible way to implement that:

  • use a futures_bounded::StreamSet to poll the next item from each substream
  • on a timeout in the WaitingMessage state, mark the substream as available for reuse using a oneshot (in any other state, the substream can't be used further, so just end the stream and it will get dropped)
  • when a new substream is created, find a substream that's marked for reuse, and replace the original substream with the new substream

This will involve some quite weird types, like Oneshot<Oneshot<InboundSubstreamState>>. The outer Oneshot is for substream reuse availability, and the inner is for sending back the replacement substream. But it should work, and the changes might be less complicated than the existing PR.
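A rough, synchronous sketch of that nested-channel handshake follows. It uses `std::sync::mpsc` channels as stand-ins for oneshots, and `Substream`, `Worker`, `ReuseOffer` and `admit` are hypothetical names for illustration. A real implementation would be async and would need to handle wakeups.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for an inbound substream.
#[derive(Debug, PartialEq)]
struct Substream(u32);

/// The "outer" message: an idle worker advertises that it can be reused by
/// sending the handler a sender for the "inner" replacement substream.
type ReuseOffer = Sender<Substream>;

/// Hypothetical per-substream task held inside the stream set.
struct Worker {
    current: Substream,
    replacement_rx: Option<Receiver<Substream>>,
}

impl Worker {
    /// Called when the worker times out while idle: mark it as reusable.
    fn offer_reuse(&mut self, offers: &Sender<ReuseOffer>) {
        let (tx, rx) = channel();
        self.replacement_rx = Some(rx);
        // If the handler is gone there is nobody to reuse us; ignore the error.
        let _ = offers.send(tx);
    }

    /// Swaps in a replacement substream if the handler sent one.
    fn poll_replacement(&mut self) -> bool {
        let received = self
            .replacement_rx
            .as_ref()
            .and_then(|rx| rx.try_recv().ok());
        match received {
            Some(new) => {
                self.current = new;
                self.replacement_rx = None;
                true
            }
            None => false,
        }
    }
}

/// Handler side, called once the substream limit has been reached: hand the
/// new substream to the first live reusable worker, or return it so the
/// caller can drop it.
fn admit(mut new: Substream, offers: &Receiver<ReuseOffer>) -> Result<(), Substream> {
    while let Ok(tx) = offers.try_recv() {
        match tx.send(new) {
            Ok(()) => return Ok(()),
            // That worker has already gone away; try the next offer.
            Err(err) => new = err.0,
        }
    }
    Err(new)
}
```

The handler never iterates over the set here; it only drains the queue of offers, which is what works around the lack of iteration in the futures_bounded types.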

Is this a change that would be acceptable in a bug fix? Particularly one that other users might want backported?

I also can't guarantee I'll have time to work on this any time soon, because the current PR code works, and fixes our downstream bug.

@elenaf9
Member

elenaf9 commented May 5, 2025

Thank you for the follow-ups and detailed explanation @teor2345.

> Sure, but that doesn't deal with substream reuse, because the futures_bounded types do not allow iteration to find reusable substreams (they're a wrapper for SelectAll).

Opened thomaseizinger/rust-futures-bounded#8 to see if we can implement iterator for StreamSet. If that PR won't be merged then I'd go with your current solution.

Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

kad exceeds substream limit due to outbound timeout, but no inbound timeout
3 participants