Skip to content

Commit c96c913

Browse files
authored
fix(batch): fix distributed batch query not properly throwing errors to PG client (#18812)
1 parent 20699d4 commit c96c913

File tree

1 file changed

+17
-3
lines changed
  • src/frontend/src/scheduler/distributed

1 file changed

+17
-3
lines changed

src/frontend/src/scheduler/distributed/stage.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ impl StageRunner {
655655

656656
let shutdown_rx0 = shutdown_rx.clone();
657657

658-
expr_context_scope(expr_context, async {
658+
let result = expr_context_scope(expr_context, async {
659659
let executor = executor.build().await?;
660660
let chunk_stream = executor.execute();
661661
let cancelled = pin!(shutdown_rx.cancelled());
@@ -682,7 +682,19 @@ impl StageRunner {
682682
}
683683
}
684684
Ok(())
685-
}).await?;
685+
}).await;
686+
687+
if let Err(err) = &result {
688+
// If we encountered error when executing root stage locally, we have to notify the result fetcher, which is
689+
// returned by `distribute_execute` and being listened by the FE handler task. Otherwise the FE handler cannot
690+
// properly throw the error to the PG client.
691+
if let Err(_e) = result_tx
692+
.send(Err(TaskExecutionError(err.to_report_string())))
693+
.await
694+
{
695+
warn!("Send task execution failed");
696+
}
697+
}
686698

687699
// Terminated by other tasks execution error, so no need to return error here.
688700
match shutdown_rx0.message() {
@@ -701,7 +713,9 @@ impl StageRunner {
701713
self.stage.id
702714
);
703715

704-
Ok(())
716+
// We still have to throw the error in this current task, so that `StageRunner::run` can further
717+
// send `Failed` event to stop other stages.
718+
result.map(|_| ())
705719
}
706720

707721
async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {

0 commit comments

Comments
 (0)