Skip to content

Commit ded73af

Browse files
authored
fix(batch): fix sequential exchange (#14924)
1 parent 6b3a319 commit ded73af

File tree

3 files changed

+6
-3
lines changed

3 files changed

+6
-3
lines changed

proto/batch_plan.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ message ExchangeSource {
241241

242242
message ExchangeNode {
243243
repeated ExchangeSource sources = 1;
244+
// sequential means each tasks of the exchange node will be executed sequentially.
244245
bool sequential = 2;
245246
repeated plan_common.Field input_schema = 3;
246247
}

src/frontend/src/optimizer/plan_node/batch_exchange.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order
3030
pub struct BatchExchange {
3131
pub base: PlanBase<Batch>,
3232
input: PlanRef,
33+
// sequential means each tasks of the exchange node will be executed sequentially.
34+
// Currently, it is used to avoid spawn too many tasks for limit operator.
3335
sequential: bool,
3436
}
3537

src/frontend/src/scheduler/distributed/stage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -935,12 +935,12 @@ impl StageRunner {
935935
let exchange_sources = child_stage.all_exchange_sources_for(task_id);
936936

937937
match &execution_plan_node.node {
938-
NodeBody::Exchange(_exchange_node) => PlanNodePb {
938+
NodeBody::Exchange(exchange_node) => PlanNodePb {
939939
children: vec![],
940940
identity,
941941
node_body: Some(NodeBody::Exchange(ExchangeNode {
942942
sources: exchange_sources,
943-
sequential: true,
943+
sequential: exchange_node.sequential,
944944
input_schema: execution_plan_node.schema.clone(),
945945
})),
946946
},
@@ -950,7 +950,7 @@ impl StageRunner {
950950
node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
951951
exchange: Some(ExchangeNode {
952952
sources: exchange_sources,
953-
sequential: true,
953+
sequential: false,
954954
input_schema: execution_plan_node.schema.clone(),
955955
}),
956956
column_orders: sort_merge_exchange_node.column_orders.clone(),

0 commit comments

Comments
 (0)