Skip to content

Commit b85f56d

Browse files
krallincramertj
authored andcommitted
Backport to 0.1: Avoid starvation from FuturesUnordered::poll_next
This backports #2049 to the 0.1 branch. Without this change, polling > 200 futures trough a FuturesUnordered on a Tokio 0.2 executor results in a busy loop in Tokio's cooperative scheduling module. See for a repro of where this breaks: tokio-rs/tokio#2390 Tested by running the reproducer I submitted there. Without this change, it hangs forever (spinning on CPU). With the change, it doesn't.
1 parent a00d35e commit b85f56d

File tree

1 file changed

+30
-0
lines changed

1 file changed

+30
-0
lines changed

src/stream/futures_unordered.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,23 @@ use {task, Stream, Future, Poll, Async};
1515
use executor::{Notify, UnsafeNotify, NotifyHandle};
1616
use task_impl::{self, AtomicTask};
1717

18+
/// Constant used for a `FuturesUnordered` to determine how many times it is
19+
/// allowed to poll underlying futures without yielding.
20+
///
21+
/// A single call to `poll_next` may potentially do a lot of work before
22+
/// yielding. This happens in particular if the underlying futures are awoken
23+
/// frequently but continue to return `Pending`. This is problematic if other
24+
/// tasks are waiting on the executor, since they do not get to run. This value
25+
/// caps the number of calls to `poll` on underlying futures a single call to
26+
/// `poll_next` is allowed to make.
27+
///
28+
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
29+
/// that amortize wakeup and scheduling costs, but low enough that we do not
30+
/// starve other tasks for long.
31+
///
32+
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
33+
const YIELD_EVERY: usize = 32;
34+
1835
/// An unbounded set of futures.
1936
///
2037
/// This "combinator" also serves a special function in this library, providing
@@ -274,6 +291,10 @@ impl<T> Stream for FuturesUnordered<T>
274291
type Error = T::Error;
275292

276293
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
294+
// Keep track of how many child futures we have polled,
295+
// in case we want to forcibly yield.
296+
let mut polled = 0;
297+
277298
// Ensure `parent` is correctly set.
278299
self.inner.parent.register();
279300

@@ -369,12 +390,21 @@ impl<T> Stream for FuturesUnordered<T>
369390
future.poll()
370391
})
371392
};
393+
polled += 1;
372394

373395
let ret = match res {
374396
Ok(Async::NotReady) => {
375397
let node = bomb.node.take().unwrap();
376398
*node.future.get() = Some(future);
377399
bomb.queue.link(node);
400+
401+
if polled == YIELD_EVERY {
402+
// We have polled a large number of futures in a row without yielding.
403+
// To ensure we do not starve other tasks waiting on the executor,
404+
// we yield here, but immediately wake ourselves up to continue.
405+
task_impl::current().notify();
406+
return Ok(Async::NotReady);
407+
}
378408
continue
379409
}
380410
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),

0 commit comments

Comments
 (0)