Skip to content

fix(core): Fix head-of-line blocking in concurrent tasks #5941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 3, 2025
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 43 additions & 61 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::task::Poll;
use std::{
collections::VecDeque,
sync::atomic::Ordering,
sync::{atomic::AtomicUsize, Arc},
};

use futures::poll;
use futures::FutureExt;

use crate::*;

Expand Down Expand Up @@ -85,6 +88,10 @@ pub struct ConcurrentTasks<I, O> {
/// `results` stores the successful results.
results: VecDeque<O>,

/// The maximum number of concurrent tasks.
concurrent: usize,
/// `completed` is the counter of completed tasks.
completed: Arc<AtomicUsize>,
/// hitting the last unrecoverable error.
///
/// If concurrent tasks hit an unrecoverable error, it will stop executing new tasks and return
Expand All @@ -107,6 +114,8 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {

tasks: VecDeque::with_capacity(concurrent),
results: VecDeque::with_capacity(concurrent),
concurrent,
completed: Arc::default(),
errored: false,
}
}
Expand All @@ -128,7 +137,7 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
/// Check if there is remaining space to push new tasks.
#[inline]
pub fn has_remaining(&self) -> bool {
self.tasks.len() < self.tasks.capacity()
self.tasks.len() < self.concurrent + self.completed.load(Ordering::Relaxed)
}

/// Check if there are remaining results to fetch.
Expand All @@ -137,6 +146,17 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
!self.results.is_empty()
}

/// Create a task from the given input and submit it to the executor.
///
/// The returned task future resolves to the original input paired with the
/// task's result (`(I, Result<O>)`), so the caller can re-submit the same
/// input if the task fails with a temporary error.
pub fn create_task(&self, input: I) -> Task<(I, Result<O>)> {
// Clone the shared counter so the spawned future can record its completion.
let completed = self.completed.clone();

// `inspect` increments `completed` as soon as the future resolves,
// regardless of whether it produced `Ok` or `Err` (the closure ignores
// the output via `|_|`).
let fut = (self.factory)(input).inspect(move |_| {
completed.fetch_add(1, Ordering::Relaxed);
});

self.executor.execute(fut)
}

/// Execute the task with given input.
///
/// - Execute the task in the current thread if is not concurrent.
Expand All @@ -163,66 +183,31 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
};
}

loop {
// Try poll once to see if there is any ready task.
if let Some(task) = self.tasks.front_mut() {
if let Poll::Ready((i, o)) = poll!(task) {
match o {
Ok(o) => {
let _ = self.tasks.pop_front();
self.results.push_back(o)
}
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
}
return Err(err);
}
}
}
}

// Try to push a new task if there is available space.
if self.tasks.len() < self.tasks.capacity() {
self.tasks
.push_back(self.executor.execute((self.factory)(input)));
return Ok(());
}

// Wait for the next task to be ready.
let task = self
// Try to push a new task if there is available space.
if !self.has_remaining() {
let (i, o) = self
.tasks
.front_mut()
.expect("tasks must have at least one task");
let (i, o) = task.await;
.pop_front()
.expect("tasks must be available")
.await;
self.completed.fetch_sub(1, Ordering::Relaxed);
match o {
Ok(o) => {
let _ = self.tasks.pop_front();
self.results.push_back(o);
continue;
}
Ok(o) => self.results.push_back(o),
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
self.tasks.push_front(self.create_task(i));
} else {
self.clear();
self.errored = true;
return Err(err);
}
return Err(err);
}
}
}

self.tasks.push_back(self.create_task(input));
Ok(())
}

/// Fetch the successful result from the result queue.
Expand All @@ -235,23 +220,19 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
}

if let Some(result) = self.results.pop_front() {
self.completed.fetch_sub(1, Ordering::Relaxed);
return Some(Ok(result));
}

if let Some(task) = self.tasks.front_mut() {
if let Some(task) = self.tasks.pop_front() {
let (i, o) = task.await;
self.completed.fetch_sub(1, Ordering::Relaxed);
return match o {
Ok(o) => {
let _ = self.tasks.pop_front();
Some(Ok(o))
}
Ok(o) => Some(Ok(o)),
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
self.tasks.push_front(self.create_task(i));
} else {
self.clear();
self.errored = true;
Expand All @@ -273,6 +254,7 @@ mod tests {
use tokio::time::sleep;

use super::*;
use pretty_assertions::assert_eq;

#[tokio::test]
async fn test_concurrent_tasks() {
Expand Down
Loading