Skip to content

Commit 5576c2c

Browse files
authored
feat: support sequential exchange (#14795)
1 parent 4793c51 commit 5576c2c

File tree

12 files changed

+95
-43
lines changed

12 files changed

+95
-43
lines changed

proto/batch_plan.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ message ExchangeSource {
241241

242242
message ExchangeNode {
243243
repeated ExchangeSource sources = 1;
244+
bool sequential = 2;
244245
repeated plan_common.Field input_schema = 3;
245246
}
246247

src/batch/src/executor/generic_exchange.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use futures::StreamExt;
1616
use futures_async_stream::try_stream;
17-
use itertools::Itertools;
1817
use risingwave_common::array::DataChunk;
1918
use risingwave_common::catalog::{Field, Schema};
2019
use risingwave_common::util::iter_util::ZipEqFast;
@@ -39,6 +38,7 @@ pub struct GenericExchangeExecutor<CS, C> {
3938
proto_sources: Vec<PbExchangeSource>,
4039
/// Mock-able CreateSource.
4140
source_creators: Vec<CS>,
41+
sequential: bool,
4242
context: C,
4343

4444
schema: Schema,
@@ -126,6 +126,8 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
126126
NodeBody::Exchange
127127
)?;
128128

129+
let sequential = node.get_sequential();
130+
129131
ensure!(!node.get_sources().is_empty());
130132
let proto_sources: Vec<PbExchangeSource> = node.get_sources().to_vec();
131133
let source_creators =
@@ -136,6 +138,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
136138
Ok(Box::new(ExchangeExecutor::<C> {
137139
proto_sources,
138140
source_creators,
141+
sequential,
139142
context: source.context().clone(),
140143
schema: Schema { fields },
141144
task_id: source.task_id.clone(),
@@ -164,26 +167,33 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> Executor
164167
impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExecutor<CS, C> {
165168
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
166169
async fn do_execute(self: Box<Self>) {
167-
let mut stream = select_all(
168-
self.proto_sources
169-
.into_iter()
170-
.zip_eq_fast(self.source_creators)
171-
.map(|(prost_source, source_creator)| {
172-
Self::data_chunk_stream(
173-
prost_source,
174-
source_creator,
175-
self.context.clone(),
176-
self.metrics.clone(),
177-
self.identity.clone(),
178-
)
179-
})
180-
.collect_vec(),
181-
)
182-
.boxed();
183-
184-
while let Some(data_chunk) = stream.next().await {
185-
let data_chunk = data_chunk?;
186-
yield data_chunk
170+
let streams = self
171+
.proto_sources
172+
.into_iter()
173+
.zip_eq_fast(self.source_creators)
174+
.map(|(prost_source, source_creator)| {
175+
Self::data_chunk_stream(
176+
prost_source,
177+
source_creator,
178+
self.context.clone(),
179+
self.metrics.clone(),
180+
self.identity.clone(),
181+
)
182+
});
183+
184+
if self.sequential {
185+
for mut stream in streams {
186+
while let Some(data_chunk) = stream.next().await {
187+
let data_chunk = data_chunk?;
188+
yield data_chunk
189+
}
190+
}
191+
} else {
192+
let mut stream = select_all(streams).boxed();
193+
while let Some(data_chunk) = stream.next().await {
194+
let data_chunk = data_chunk?;
195+
yield data_chunk
196+
}
187197
}
188198
}
189199

@@ -262,6 +272,7 @@ mod tests {
262272
metrics: None,
263273
proto_sources,
264274
source_creators,
275+
sequential: false,
265276
context,
266277
schema: Schema {
267278
fields: vec![Field::unnamed(DataType::Int32)],

src/batch/src/executor/join/local_lookup_join.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
224224

225225
let exchange_node = NodeBody::Exchange(ExchangeNode {
226226
sources,
227+
sequential: true,
227228
input_schema: self.inner_side_schema.to_prost(),
228229
});
229230

src/frontend/planner_test/tests/testdata/output/basic_query.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@
189189
select * from t limit 1
190190
batch_plan: |-
191191
BatchLimit { limit: 1, offset: 0 }
192-
└─BatchExchange { order: [], dist: Single }
192+
└─BatchExchange { order: [], dist: Single, sequential: true }
193193
└─BatchLimit { limit: 1, offset: 0 }
194194
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 1, distribution: SomeShard }
195195
- sql: |

src/frontend/planner_test/tests/testdata/output/except.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@
220220
└─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
221221
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
222222
│ └─BatchLimit { limit: 1, offset: 0 }
223-
│ └─BatchExchange { order: [], dist: Single }
223+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
224224
│ └─BatchLimit { limit: 1, offset: 0 }
225225
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
226226
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
227227
└─BatchLimit { limit: 1, offset: 0 }
228-
└─BatchExchange { order: [], dist: Single }
228+
└─BatchExchange { order: [], dist: Single, sequential: true }
229229
└─BatchLimit { limit: 1, offset: 0 }
230230
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
231231
- sql: |
@@ -247,12 +247,12 @@
247247
└─BatchHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
248248
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
249249
│ └─BatchLimit { limit: 1, offset: 0 }
250-
│ └─BatchExchange { order: [], dist: Single }
250+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
251251
│ └─BatchLimit { limit: 1, offset: 0 }
252252
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
253253
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
254254
└─BatchLimit { limit: 1, offset: 0 }
255-
└─BatchExchange { order: [], dist: Single }
255+
└─BatchExchange { order: [], dist: Single, sequential: true }
256256
└─BatchLimit { limit: 1, offset: 0 }
257257
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
258258
- sql: |

src/frontend/planner_test/tests/testdata/output/intersect.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@
220220
└─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
221221
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
222222
│ └─BatchLimit { limit: 1, offset: 0 }
223-
│ └─BatchExchange { order: [], dist: Single }
223+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
224224
│ └─BatchLimit { limit: 1, offset: 0 }
225225
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
226226
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
227227
└─BatchLimit { limit: 1, offset: 0 }
228-
└─BatchExchange { order: [], dist: Single }
228+
└─BatchExchange { order: [], dist: Single, sequential: true }
229229
└─BatchLimit { limit: 1, offset: 0 }
230230
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
231231
- sql: |
@@ -247,12 +247,12 @@
247247
└─BatchHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
248248
├─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
249249
│ └─BatchLimit { limit: 1, offset: 0 }
250-
│ └─BatchExchange { order: [], dist: Single }
250+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
251251
│ └─BatchLimit { limit: 1, offset: 0 }
252252
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
253253
└─BatchExchange { order: [], dist: HashShard(t2.a, t2.b, t2.c) }
254254
└─BatchLimit { limit: 1, offset: 0 }
255-
└─BatchExchange { order: [], dist: Single }
255+
└─BatchExchange { order: [], dist: Single, sequential: true }
256256
└─BatchLimit { limit: 1, offset: 0 }
257257
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
258258
- sql: |

src/frontend/planner_test/tests/testdata/output/limit.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@
158158
select * from t limit 1;
159159
batch_plan: |-
160160
BatchLimit { limit: 1, offset: 0 }
161-
└─BatchExchange { order: [], dist: Single }
161+
└─BatchExchange { order: [], dist: Single, sequential: true }
162162
└─BatchLimit { limit: 1, offset: 0 }
163163
└─BatchScan { table: t, columns: [t.a], limit: 1, distribution: UpstreamHashShard(t.a) }
164164
stream_plan: |-

src/frontend/planner_test/tests/testdata/output/order_by.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@
9696
select * from t limit 3 offset 4;
9797
batch_plan: |-
9898
BatchLimit { limit: 3, offset: 4 }
99-
└─BatchExchange { order: [], dist: Single }
99+
└─BatchExchange { order: [], dist: Single, sequential: true }
100100
└─BatchLimit { limit: 7, offset: 0 }
101101
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 7, distribution: SomeShard }
102102
- sql: |
103103
create table t (v1 bigint, v2 double precision);
104104
select * from t limit 5;
105105
batch_plan: |-
106106
BatchLimit { limit: 5, offset: 0 }
107-
└─BatchExchange { order: [], dist: Single }
107+
└─BatchExchange { order: [], dist: Single, sequential: true }
108108
└─BatchLimit { limit: 5, offset: 0 }
109109
└─BatchScan { table: t, columns: [t.v1, t.v2], limit: 5, distribution: SomeShard }
110110
- sql: |

src/frontend/planner_test/tests/testdata/output/union.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@
252252
└─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
253253
└─BatchUnion { all: true }
254254
├─BatchLimit { limit: 1, offset: 0 }
255-
│ └─BatchExchange { order: [], dist: Single }
255+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
256256
│ └─BatchLimit { limit: 1, offset: 0 }
257257
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
258258
└─BatchLimit { limit: 1, offset: 0 }
259-
└─BatchExchange { order: [], dist: Single }
259+
└─BatchExchange { order: [], dist: Single, sequential: true }
260260
└─BatchLimit { limit: 1, offset: 0 }
261261
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
262262
- sql: |
@@ -278,11 +278,11 @@
278278
└─BatchExchange { order: [], dist: HashShard(t1.a, t1.b, t1.c) }
279279
└─BatchUnion { all: true }
280280
├─BatchLimit { limit: 1, offset: 0 }
281-
│ └─BatchExchange { order: [], dist: Single }
281+
│ └─BatchExchange { order: [], dist: Single, sequential: true }
282282
│ └─BatchLimit { limit: 1, offset: 0 }
283283
│ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], limit: 1, distribution: SomeShard }
284284
└─BatchLimit { limit: 1, offset: 0 }
285-
└─BatchExchange { order: [], dist: Single }
285+
└─BatchExchange { order: [], dist: Single, sequential: true }
286286
└─BatchLimit { limit: 1, offset: 0 }
287287
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], limit: 1, distribution: SomeShard }
288288
- sql: |

src/frontend/src/optimizer/plan_node/batch_exchange.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,27 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order
3030
pub struct BatchExchange {
3131
pub base: PlanBase<Batch>,
3232
input: PlanRef,
33+
sequential: bool,
3334
}
3435

3536
impl BatchExchange {
3637
pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
38+
Self::new_inner(input, order, dist, false)
39+
}
40+
41+
pub fn new_with_sequential(input: PlanRef, order: Order, dist: Distribution) -> Self {
42+
Self::new_inner(input, order, dist, true)
43+
}
44+
45+
fn new_inner(input: PlanRef, order: Order, dist: Distribution, sequential: bool) -> Self {
3746
let ctx = input.ctx();
3847
let schema = input.schema().clone();
3948
let base = PlanBase::new_batch(ctx, schema, dist, order);
40-
BatchExchange { base, input }
49+
BatchExchange {
50+
base,
51+
input,
52+
sequential,
53+
}
4154
}
4255
}
4356

@@ -53,7 +66,11 @@ impl Distill for BatchExchange {
5366
distribution: self.base.distribution(),
5467
input_schema,
5568
});
56-
childless_record("BatchExchange", vec![("order", order), ("dist", dist)])
69+
let mut fields = vec![("order", order), ("dist", dist)];
70+
if self.sequential {
71+
fields.push(("sequential", Pretty::display(&true)));
72+
}
73+
childless_record("BatchExchange", fields)
5774
}
5875
}
5976

@@ -63,7 +80,12 @@ impl PlanTreeNodeUnary for BatchExchange {
6380
}
6481

6582
fn clone_with_input(&self, input: PlanRef) -> Self {
66-
Self::new(input, self.order().clone(), self.distribution().clone())
83+
Self::new_inner(
84+
input,
85+
self.order().clone(),
86+
self.distribution().clone(),
87+
self.sequential,
88+
)
6789
}
6890
}
6991
impl_plan_tree_node_for_unary! {BatchExchange}
@@ -80,12 +102,15 @@ impl ToBatchPb for BatchExchange {
80102
if self.base.order().is_any() {
81103
NodeBody::Exchange(ExchangeNode {
82104
sources: vec![],
105+
sequential: self.sequential,
83106
input_schema: self.base.schema().to_prost(),
84107
})
85108
} else {
109+
assert!(!self.sequential);
86110
NodeBody::MergeSortExchange(MergeSortExchangeNode {
87111
exchange: Some(ExchangeNode {
88112
sources: vec![],
113+
sequential: self.sequential,
89114
input_schema: self.base.schema().to_prost(),
90115
}),
91116
column_orders: self.base.order().to_protobuf(),

src/frontend/src/optimizer/plan_node/batch_limit.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use super::{
2323
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
2424
};
2525
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26-
use crate::optimizer::plan_node::ToLocalBatch;
27-
use crate::optimizer::property::{Order, RequiredDist};
26+
use crate::optimizer::plan_node::{BatchExchange, ToLocalBatch};
27+
use crate::optimizer::property::{Distribution, Order, RequiredDist};
2828

2929
/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
3030
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -33,6 +33,8 @@ pub struct BatchLimit {
3333
core: generic::Limit<PlanRef>,
3434
}
3535

36+
const LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD: u64 = 1024;
37+
3638
impl BatchLimit {
3739
pub fn new(core: generic::Limit<PlanRef>) -> Self {
3840
let base = PlanBase::new_batch_with_core(
@@ -52,7 +54,17 @@ impl BatchLimit {
5254

5355
let single_dist = RequiredDist::single();
5456
let ensure_single_dist = if !batch_partial_limit.distribution().satisfies(&single_dist) {
55-
single_dist.enforce_if_not_satisfies(batch_partial_limit.into(), &any_order)?
57+
if new_limit < LIMIT_SEQUENTIAL_EXCHANGE_THRESHOLD {
58+
BatchExchange::new_with_sequential(
59+
batch_partial_limit.into(),
60+
any_order,
61+
Distribution::Single,
62+
)
63+
.into()
64+
} else {
65+
BatchExchange::new(batch_partial_limit.into(), any_order, Distribution::Single)
66+
.into()
67+
}
5668
} else {
5769
// The input's distribution is singleton, so use one phase limit is enough.
5870
return Ok(self.clone_with_input(new_input).into());

src/frontend/src/scheduler/distributed/stage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,7 @@ impl StageRunner {
940940
identity,
941941
node_body: Some(NodeBody::Exchange(ExchangeNode {
942942
sources: exchange_sources,
943+
sequential: true,
943944
input_schema: execution_plan_node.schema.clone(),
944945
})),
945946
},
@@ -949,6 +950,7 @@ impl StageRunner {
949950
node_body: Some(NodeBody::MergeSortExchange(MergeSortExchangeNode {
950951
exchange: Some(ExchangeNode {
951952
sources: exchange_sources,
953+
sequential: true,
952954
input_schema: execution_plan_node.schema.clone(),
953955
}),
954956
column_orders: sort_merge_exchange_node.column_orders.clone(),

0 commit comments

Comments (0)