Skip to content

Commit e767803

Browse files
jonhoocramertj
authored andcommitted
Avoid starvation from FuturesUnordered::poll_next
1 parent 3f0e90c commit e767803

File tree

1 file changed

+30
-0
lines changed
  • futures-util/src/stream/futures_unordered

1 file changed

+30
-0
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
3939
/// without running out of ram.
4040
const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();
4141

42+
/// Constant used for a `FuturesUnordered` to determine how many times it is
43+
/// allowed to poll underlying futures without yielding.
44+
///
45+
/// A single call to `poll_next` may potentially do a lot of work before
46+
/// yielding. This happens in particular if the underlying futures are awoken
47+
/// frequently but continue to return `Pending`. This is problematic if other
48+
/// tasks are waiting on the executor, since they do not get to run. This value
49+
/// caps the number of calls to `poll` on underlying futures a single call to
50+
/// `poll_next` is allowed to make.
51+
///
52+
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
53+
/// that amortize wakeup and scheduling costs, but low enough that we do not
54+
/// starve other tasks for long.
55+
///
56+
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
57+
const YIELD_EVERY: usize = 32;
58+
4259
/// A set of futures which may complete in any order.
4360
///
4461
/// This structure is optimized to manage a large number of futures.
@@ -313,6 +330,10 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
313330
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
314331
-> Poll<Option<Self::Item>>
315332
{
333+
// Keep track of how many child futures we have polled,
334+
// in case we want to forcibly yield.
335+
let mut polled = 0;
336+
316337
// Ensure `parent` is correctly set.
317338
self.ready_to_run_queue.waker.register(cx.waker());
318339

@@ -433,11 +454,20 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
433454

434455
future.poll(&mut cx)
435456
};
457+
polled += 1;
436458

437459
match res {
438460
Poll::Pending => {
439461
let task = bomb.task.take().unwrap();
440462
bomb.queue.link(task);
463+
464+
if polled == YIELD_EVERY {
465+
// We have polled a large number of futures in a row without yielding.
466+
// To ensure we do not starve other tasks waiting on the executor,
467+
// we yield here, but immediately wake ourselves up to continue.
468+
cx.waker().wake_by_ref();
469+
return Poll::Pending;
470+
}
441471
continue
442472
}
443473
Poll::Ready(output) => {

0 commit comments

Comments
 (0)