Skip to content

Commit 16c9708

Browse files
authored
fix(streaming): drop subtask on another blocking thread (risingwavelabs#8672)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 83057e5 commit 16c9708

File tree

3 files changed

+35
-15
lines changed

3 files changed

+35
-15
lines changed

src/compute/src/rpc/service/stream_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl StreamService for StreamServiceImpl {
108108
) -> std::result::Result<Response<DropActorsResponse>, Status> {
109109
let req = request.into_inner();
110110
let actors = req.actor_ids;
111-
self.mgr.drop_actor(&actors).await?;
111+
self.mgr.drop_actors(&actors).await?;
112112
Ok(Response::new(DropActorsResponse {
113113
request_id: req.request_id,
114114
status: None,

src/stream/src/executor/subtask.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use tokio::sync::mpsc::error::SendError;
1919
use tokio_stream::wrappers::ReceiverStream;
2020

2121
use super::actor::spawn_blocking_drop_stream;
22-
use super::{BoxedExecutor, Executor, ExecutorInfo, MessageStreamItem};
22+
use super::{BoxedExecutor, Executor, ExecutorInfo, Message, MessageStreamItem};
23+
use crate::task::ActorId;
2324

2425
/// Handle used to drive the subtask.
2526
pub type SubtaskHandle = impl Future<Output = ()> + Send + 'static;
@@ -60,7 +61,7 @@ impl Executor for SubtaskRxExecutor {
6061
/// Used when there're multiple stateful executors in an actor. These subtasks can be concurrently
6162
/// executed to improve the I/O performance, while the computing resource can be still bounded to a
6263
/// single thread.
63-
pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
64+
pub fn wrap(input: BoxedExecutor, actor_id: ActorId) -> (SubtaskHandle, SubtaskRxExecutor) {
6465
let (tx, rx) = mpsc::channel(1);
6566
let rx_executor = SubtaskRxExecutor {
6667
info: ExecutorInfo {
@@ -72,7 +73,18 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
7273

7374
let handle = async move {
7475
let mut input = input.execute();
76+
7577
while let Some(item) = input.next().await {
78+
// Decide whether to stop the subtask. We explicitly do this instead of relying on the
79+
// termination of the input stream, because we don't want to exhaust the stream, which
80+
// causes the stream dropped in the scope of the current async task and blocks other
81+
// actors. See `spawn_blocking_drop_stream` for more details.
82+
let to_stop = match &item {
83+
Ok(Message::Barrier(barrier)) => barrier.is_stop_or_update_drop_actor(actor_id),
84+
Ok(_) => false,
85+
Err(_) => true,
86+
};
87+
7688
// It's possible that the downstream itself yields an error (e.g. from remote input) and
7789
// finishes, so we may fail to send the message. In this case, we can simply ignore the
7890
// send error and exit as well. If the message itself is another error, log it.
@@ -85,7 +97,12 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
8597
}
8698
break;
8799
}
100+
101+
if to_stop {
102+
break;
103+
}
88104
}
105+
89106
spawn_blocking_drop_stream(input).await;
90107
}
91108
.instrument_await("Subtask");

src/stream/src/task/stream_manager.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -310,18 +310,19 @@ impl LocalStreamManager {
310310
Ok(())
311311
}
312312

313-
pub async fn drop_actor(&self, actors: &[ActorId]) -> StreamResult<()> {
313+
/// Drop the resources of the given actors.
314+
pub async fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> {
314315
let mut core = self.core.lock().await;
315-
for id in actors {
316-
core.drop_actor(*id);
316+
for &id in actors {
317+
core.drop_actor(id);
317318
}
318319
tracing::debug!(actors = ?actors, "drop actors");
319320
Ok(())
320321
}
321322

322-
/// Force stop all actors on this worker.
323+
/// Force stop all actors on this worker, and then drop their resources.
323324
pub async fn stop_all_actors(&self) -> StreamResult<()> {
324-
self.core.lock().await.drop_all_actors().await;
325+
self.core.lock().await.stop_all_actors().await;
325326
// Clear shared buffer in storage to release memory
326327
self.clear_storage_buffer().await;
327328
self.clear_all_senders_and_collect_rx();
@@ -557,7 +558,7 @@ impl LocalStreamManagerCore {
557558

558559
// If there're multiple stateful executors in this actor, we will wrap it into a subtask.
559560
let executor = if has_stateful && is_stateful {
560-
let (subtask, executor) = subtask::wrap(executor);
561+
let (subtask, executor) = subtask::wrap(executor, actor_context.id);
561562
subtasks.push(subtask);
562563
executor.boxed()
563564
} else {
@@ -781,14 +782,16 @@ impl LocalStreamManagerCore {
781782
.inspect(|handle| handle.abort());
782783
self.context.actor_infos.write().remove(&actor_id);
783784
self.actors.remove(&actor_id);
784-
// Task should have already stopped when this method is invoked.
785-
self.handles
786-
.remove(&actor_id)
787-
.inspect(|handle| handle.abort());
785+
786+
// Task should have already stopped when this method is invoked. There might be some
787+
// clean-up work left (like dropping in-memory data structures), but we don't have to wait
788+
// for them to finish, in order to make this request non-blocking.
789+
self.handles.remove(&actor_id);
788790
}
789791

790-
/// `drop_all_actors` is invoked by meta node via RPC for recovery purpose.
791-
async fn drop_all_actors(&mut self) {
792+
/// `stop_all_actors` is invoked by meta node via RPC for recovery purpose. Different from the
793+
/// `drop_actor`, the execution of the actors will be aborted.
794+
async fn stop_all_actors(&mut self) {
792795
for (actor_id, handle) in &self.handles {
793796
tracing::debug!("force stopping actor {}", actor_id);
794797
handle.abort();

0 commit comments

Comments (0)