Skip to content

Commit da57717

Browse files
authored
fix(streaming): drop actor on another blocking thread (risingwavelabs#8624)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent f111bfb commit da57717

File tree

3 files changed

+40
-25
lines changed

3 files changed

+40
-25
lines changed

src/stream/src/executor/actor.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
use std::sync::atomic::{AtomicUsize, Ordering};
1616
use std::sync::Arc;
1717

18+
use anyhow::anyhow;
1819
use await_tree::InstrumentAwait;
1920
use futures::future::join_all;
20-
use futures::pin_mut;
2121
use hytra::TrAdder;
2222
use minitrace::prelude::*;
2323
use parking_lot::Mutex;
@@ -173,33 +173,33 @@ where
173173
};
174174

175175
let mut last_epoch: Option<EpochPair> = None;
176-
177-
let stream = Box::new(self.consumer).execute();
178-
pin_mut!(stream);
176+
let mut stream = Box::pin(Box::new(self.consumer).execute());
179177

180178
// Drive the streaming task with an infinite loop
181-
while let Some(barrier) = stream
182-
.next()
183-
.in_span(span)
184-
.instrument_await(
185-
last_epoch.map_or("Epoch <initial>".into(), |e| format!("Epoch {}", e.curr)),
186-
)
187-
.await
188-
.transpose()?
189-
{
190-
last_epoch = Some(barrier.epoch);
179+
let result = loop {
180+
let barrier = match stream
181+
.try_next()
182+
.in_span(span)
183+
.instrument_await(
184+
last_epoch.map_or("Epoch <initial>".into(), |e| format!("Epoch {}", e.curr)),
185+
)
186+
.await
187+
{
188+
Ok(Some(barrier)) => barrier,
189+
Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
190+
Err(err) => break Err(err),
191+
};
191192

192193
// Collect barriers to local barrier manager
193194
self.context.lock_barrier_manager().collect(id, &barrier);
194195

195196
// Then stop this actor if asked
196-
let to_stop = barrier.is_stop_or_update_drop_actor(id);
197-
if to_stop {
198-
tracing::trace!(actor_id = id, "actor exit");
199-
return Ok(());
197+
if barrier.is_stop_or_update_drop_actor(id) {
198+
break Ok(());
200199
}
201200

202201
// Tracing related work
202+
last_epoch = Some(barrier.epoch);
203203
span = {
204204
let mut span = Span::enter_with_local_parent("actor_poll");
205205
span.add_property(|| ("otel.name", span_name.to_string()));
@@ -208,8 +208,24 @@ where
208208
span.add_property(|| ("epoch", barrier.epoch.curr.to_string()));
209209
span
210210
};
211-
}
211+
};
212+
213+
spawn_blocking_drop_stream(stream).await;
212214

213-
Ok(())
215+
tracing::trace!(actor_id = id, "actor exit");
216+
result
214217
}
215218
}
219+
220+
/// Drop the stream in a blocking task to avoid interfering with other actors.
221+
///
222+
/// Logically the actor is dropped after we send the barrier with `Drop` mutation to the
223+
/// downstream, thus making the `drop`'s progress asynchronous. However, there might be a
224+
/// considerable amount of data in the executors' in-memory cache; dropping these structures might
225+
/// be a CPU-intensive task. This may lead to the runtime being unable to schedule other actors if
226+
/// the `drop` is called on the current thread.
227+
pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
228+
let _ = tokio::task::spawn_blocking(move || drop(stream))
229+
.instrument_await("drop_stream")
230+
.await;
231+
}

src/stream/src/executor/dispatch.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,15 +262,12 @@ impl StreamConsumer for DispatchExecutor {
262262
#[for_await]
263263
for msg in input {
264264
let msg: Message = msg?;
265-
let (barrier, message) = match msg {
265+
let (barrier, span) = match msg {
266266
Message::Chunk(_) => (None, "dispatch_chunk"),
267267
Message::Barrier(ref barrier) => (Some(barrier.clone()), "dispatch_barrier"),
268268
Message::Watermark(_) => (None, "dispatch_watermark"),
269269
};
270-
self.inner
271-
.dispatch(msg)
272-
.verbose_instrument_await(message)
273-
.await?;
270+
self.inner.dispatch(msg).instrument_await(span).await?;
274271
if let Some(barrier) = barrier {
275272
yield barrier;
276273
}

src/stream/src/executor/subtask.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio::sync::mpsc;
1818
use tokio::sync::mpsc::error::SendError;
1919
use tokio_stream::wrappers::ReceiverStream;
2020

21+
use super::actor::spawn_blocking_drop_stream;
2122
use super::{BoxedExecutor, Executor, ExecutorInfo, MessageStreamItem};
2223

2324
/// Handle used to drive the subtask.
@@ -85,6 +86,7 @@ pub fn wrap(input: BoxedExecutor) -> (SubtaskHandle, SubtaskRxExecutor) {
8586
break;
8687
}
8788
}
89+
spawn_blocking_drop_stream(input).await;
8890
}
8991
.instrument_await("Subtask");
9092

0 commit comments

Comments
 (0)