Skip to content

Commit 2d79894

Browse files
authored
feat(optimizer): index accelerating TopN (#7726)
Optimizes an applicable `LogicalTopN-LogicalScan` pattern into `LogicalLimit-LogicalScan` via a new rule named `top_n_on_index` that matches the above structure. Please explain **IN DETAIL** what the changes are in this PR and why they are needed: - Summarize your change (**mandatory**) `LogicalTopN` will be replaced with `LogicalLimit` if there is an applicable index or primary key. On index: ```SQL dev=> explain select v1 from t order by v1 limit 1; QUERY PLAN -------------------------------------------------------- BatchLimit { limit: 1, offset: 0 } └─BatchExchange { order: [idx1.v1 ASC], dist: Single } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: idx1, columns: [v1] } (4 rows) ``` On primary key: ```SQL dev=> explain select * from t order by k limit 1; QUERY PLAN ---------------------------------------------------- BatchLimit { limit: 1, offset: 0 } └─BatchExchange { order: [t.k ASC], dist: Single } └─BatchLimit { limit: 1, offset: 0 } └─BatchScan { table: t, columns: [k, cnt] } (4 rows) ``` Approved-By: chenzl25 Approved-By: BugenZhao Co-Authored-By: Eurekaaw <[email protected]> Co-Authored-By: Clearlove <[email protected]>
1 parent 7b71a15 commit 2d79894

File tree

18 files changed

+339
-22
lines changed

18 files changed

+339
-22
lines changed

dashboard/proto/gen/batch_plan.ts

Lines changed: 42 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
statement ok
2+
SET RW_IMPLICIT_FLUSH TO true;
3+
4+
statement ok
5+
create table t(x int, y int);
6+
7+
statement ok
8+
create index idx on t(x);
9+
10+
statement ok
11+
insert into t values (100, 3), (1, 0), (2, 3), (3, 4), (5, 4);
12+
13+
query II
14+
select * from t order by x limit 1;
15+
----
16+
1 0
17+
18+
query II
19+
select * from t order by x limit 3;
20+
----
21+
1 0
22+
2 3
23+
3 4
24+
25+
statement ok
26+
create table t1(x int primary key, y int);
27+
28+
statement ok
29+
insert into t1 values (100, 3), (1, 0), (2, 3), (3, 4), (5, 4);
30+
31+
query II
32+
select * from t1 order by x limit 1;
33+
----
34+
1 0
35+
36+
query II
37+
select * from t1 order by x limit 3;
38+
----
39+
1 0
40+
2 3
41+
3 4
42+
43+
statement ok
44+
drop table t;
45+
46+
statement ok
47+
drop table t1;

proto/batch_plan.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ message RowSeqScanNode {
2424
common.Buffer vnode_bitmap = 4;
2525
// Whether the order on output columns should be preserved.
2626
bool ordered = 5;
27+
28+
message ChunkSize {
29+
uint32 chunk_size = 1;
30+
}
31+
// If the scan is pushed down along with a `batch_limit`, `chunk_size` will be set.
32+
ChunkSize chunk_size = 6;
2733
}
2834

2935
message SysRowSeqScanNode {

src/batch/src/executor/join/local_lookup_join.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
115115
scan_ranges,
116116
ordered: false,
117117
vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
118+
chunk_size: None,
118119
});
119120

120121
Ok(row_seq_scan_node)

src/batch/src/executor/row_seq_scan.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,13 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
241241
let ordered = seq_scan_node.ordered;
242242

243243
let epoch = source.epoch.clone();
244-
let chunk_size = source.context.get_config().developer.batch_chunk_size;
244+
let chunk_size = if let Some(chunk_size_) = &seq_scan_node.chunk_size {
245+
chunk_size_
246+
.get_chunk_size()
247+
.min(source.context.get_config().developer.batch_chunk_size as u32)
248+
} else {
249+
source.context.get_config().developer.batch_chunk_size as u32
250+
};
245251
let metrics = source.context().task_metrics();
246252

247253
dispatch_state_store!(source.context().state_store(), state_store, {
@@ -262,7 +268,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
262268
scan_ranges,
263269
ordered,
264270
epoch,
265-
chunk_size,
271+
chunk_size as usize,
266272
source.plan_node().get_identity().clone(),
267273
metrics,
268274
)))

src/frontend/planner_test/tests/testdata/explain.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
"stages": {
5252
"0": {
5353
"root": {
54-
"plan_node_id": 28,
54+
"plan_node_id": 29,
5555
"plan_node_type": "BatchProject",
5656
"schema": [
5757
{
@@ -64,7 +64,7 @@
6464
],
6565
"children": [
6666
{
67-
"plan_node_id": 26,
67+
"plan_node_id": 27,
6868
"plan_node_type": "BatchValues",
6969
"schema": [],
7070
"children": [],

src/frontend/planner_test/tests/testdata/index_selection.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,3 +578,23 @@
578578
batch_plan: |
579579
BatchExchange { order: [], dist: Single }
580580
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b, idx1.c], scan_ranges: [idx1.a = Int32(1)], distribution: UpstreamHashShard(idx1.a) }
581+
- name: topn on index
582+
sql: |
583+
create table t1 (a int, b int);
584+
create index idx1 on t1(a);
585+
select * from t1 order by a limit 1
586+
batch_plan: |
587+
BatchLimit { limit: 1, offset: 0 }
588+
└─BatchExchange { order: [idx1.a ASC], dist: Single }
589+
└─BatchLimit { limit: 1, offset: 0 }
590+
└─BatchScan { table: idx1, columns: [idx1.a, idx1.b], distribution: UpstreamHashShard(idx1.a) }
591+
- name: topn on primary key
592+
sql: |
593+
create table t1 (a int primary key, b int);
594+
create index idx1 on t1(a);
595+
select * from t1 order by a limit 1
596+
batch_plan: |
597+
BatchLimit { limit: 1, offset: 0 }
598+
└─BatchExchange { order: [t1.a ASC], dist: Single }
599+
└─BatchLimit { limit: 1, offset: 0 }
600+
└─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: UpstreamHashShard(t1.a) }

src/frontend/planner_test/tests/testdata/nexmark_source.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@
350350
| └─StreamExchange { dist: HashShard(auction, window_start) }
351351
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
352352
| └─StreamProject { exprs: [auction, date_time, _row_id] }
353-
| └─StreamShare { id = 744 }
353+
| └─StreamShare { id = 764 }
354354
| └─StreamProject { exprs: [auction, date_time, _row_id] }
355355
| └─StreamRowIdGen { row_id_index: 7 }
356356
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -362,7 +362,7 @@
362362
└─StreamExchange { dist: HashShard(auction, window_start) }
363363
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
364364
└─StreamProject { exprs: [auction, date_time, _row_id] }
365-
└─StreamShare { id = 744 }
365+
└─StreamShare { id = 764 }
366366
└─StreamProject { exprs: [auction, date_time, _row_id] }
367367
└─StreamRowIdGen { row_id_index: 7 }
368368
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -480,7 +480,7 @@
480480
└─StreamHashJoin { type: Inner, predicate: price = max(price), output: all }
481481
├─StreamExchange { dist: HashShard(price) }
482482
| └─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
483-
| └─StreamShare { id = 434 }
483+
| └─StreamShare { id = 446 }
484484
| └─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
485485
| └─StreamRowIdGen { row_id_index: 7 }
486486
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -489,7 +489,7 @@
489489
└─StreamAppendOnlyHashAgg { group_key: [(TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval)], aggs: [count, max(price)] }
490490
└─StreamExchange { dist: HashShard((TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval)) }
491491
└─StreamProject { exprs: [(TumbleStart(date_time, '00:00:10':Interval) + '00:00:10':Interval), price, _row_id] }
492-
└─StreamShare { id = 434 }
492+
└─StreamShare { id = 446 }
493493
└─StreamProject { exprs: [auction, bidder, price, date_time, _row_id] }
494494
└─StreamRowIdGen { row_id_index: 7 }
495495
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -1331,7 +1331,7 @@
13311331
| | └─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
13321332
| └─StreamExchange { dist: HashShard(auction) }
13331333
| └─StreamProject { exprs: [auction, _row_id] }
1334-
| └─StreamShare { id = 553 }
1334+
| └─StreamShare { id = 571 }
13351335
| └─StreamProject { exprs: [auction, _row_id] }
13361336
| └─StreamRowIdGen { row_id_index: 7 }
13371337
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
@@ -1343,7 +1343,7 @@
13431343
└─StreamAppendOnlyHashAgg { group_key: [auction], aggs: [count, count] }
13441344
└─StreamExchange { dist: HashShard(auction) }
13451345
└─StreamProject { exprs: [auction, _row_id] }
1346-
└─StreamShare { id = 553 }
1346+
└─StreamShare { id = 571 }
13471347
└─StreamProject { exprs: [auction, _row_id] }
13481348
└─StreamRowIdGen { row_id_index: 7 }
13491349
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }

src/frontend/planner_test/tests/testdata/share.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
| └─StreamProject { exprs: [id, _row_id] }
4949
| └─StreamFilter { predicate: (initial_bid = 1:Int32) }
5050
| └─StreamProject { exprs: [id, initial_bid, _row_id] }
51-
| └─StreamShare { id = 519 }
51+
| └─StreamShare { id = 539 }
5252
| └─StreamProject { exprs: [id, initial_bid, _row_id] }
5353
| └─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
5454
| └─StreamRowIdGen { row_id_index: 9 }
@@ -57,7 +57,7 @@
5757
└─StreamProject { exprs: [id, _row_id] }
5858
└─StreamFilter { predicate: (initial_bid = 2:Int32) }
5959
└─StreamProject { exprs: [id, initial_bid, _row_id] }
60-
└─StreamShare { id = 519 }
60+
└─StreamShare { id = 539 }
6161
└─StreamProject { exprs: [id, initial_bid, _row_id] }
6262
└─StreamFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
6363
└─StreamRowIdGen { row_id_index: 9 }
@@ -125,7 +125,7 @@
125125
| └─StreamExchange { dist: HashShard(auction, window_start) }
126126
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
127127
| └─StreamProject { exprs: [auction, date_time, _row_id] }
128-
| └─StreamShare { id = 744 }
128+
| └─StreamShare { id = 764 }
129129
| └─StreamProject { exprs: [auction, date_time, _row_id] }
130130
| └─StreamRowIdGen { row_id_index: 4 }
131131
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
@@ -137,7 +137,7 @@
137137
└─StreamExchange { dist: HashShard(auction, window_start) }
138138
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
139139
└─StreamProject { exprs: [auction, date_time, _row_id] }
140-
└─StreamShare { id = 744 }
140+
└─StreamShare { id = 764 }
141141
└─StreamProject { exprs: [auction, date_time, _row_id] }
142142
└─StreamRowIdGen { row_id_index: 4 }
143143
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }

src/frontend/planner_test/tests/testdata/shared_views.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
└─StreamHashJoin { type: Inner, predicate: (t1.x + t1.y) = (t1.x * (t1.x + t1.y)), output: [(t1.x + t1.y), (t1.x * (t1.x + t1.y)), (t1.y * (t1.x + t1.y)), t1._row_id, t1._row_id, t1._row_id, t1.x, (t1.x + t1.y)] }
2727
├─StreamExchange { dist: HashShard((t1.x + t1.y)) }
2828
| └─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
29-
| └─StreamShare { id = 207 }
29+
| └─StreamShare { id = 212 }
3030
| └─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
3131
| └─StreamFilter { predicate: (t1.y > 0:Int32) }
3232
| └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
@@ -37,7 +37,7 @@
3737
| └─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
3838
└─StreamExchange { dist: HashShard((t1.x + t1.y)) }
3939
└─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
40-
└─StreamShare { id = 207 }
40+
└─StreamShare { id = 212 }
4141
└─StreamProject { exprs: [(t1.x + t1.y), t1._row_id] }
4242
└─StreamFilter { predicate: (t1.y > 0:Int32) }
4343
└─StreamTableScan { table: t1, columns: [t1.x, t1.y, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }

src/frontend/planner_test/tests/testdata/tpch.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2573,7 +2573,7 @@
25732573
| ├─StreamExchange { dist: HashShard(supplier.s_suppkey) }
25742574
| | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) }
25752575
| └─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
2576-
| └─StreamShare { id = 900 }
2576+
| └─StreamShare { id = 921 }
25772577
| └─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
25782578
| └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [count, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
25792579
| └─StreamExchange { dist: HashShard(lineitem.l_suppkey) }
@@ -2587,7 +2587,7 @@
25872587
└─StreamHashAgg { group_key: [Vnode(lineitem.l_suppkey)], aggs: [count, max(sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount))))] }
25882588
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount))), Vnode(lineitem.l_suppkey)] }
25892589
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
2590-
└─StreamShare { id = 900 }
2590+
└─StreamShare { id = 921 }
25912591
└─StreamProject { exprs: [lineitem.l_suppkey, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
25922592
└─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [count, sum((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)))] }
25932593
└─StreamExchange { dist: HashShard(lineitem.l_suppkey) }

src/frontend/src/optimizer/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,13 @@ impl PlanRoot {
429429
ApplyOrder::TopDown,
430430
);
431431

432+
plan = self.optimize_by_rules(
433+
plan,
434+
"Agg on Index".to_string(),
435+
vec![TopNOnIndexRule::create()],
436+
ApplyOrder::TopDown,
437+
);
438+
432439
#[cfg(debug_assertions)]
433440
InputRefValidator.validate(plan.clone());
434441

src/frontend/src/optimizer/plan_node/batch_limit.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,15 @@ impl BatchLimit {
4949
let new_offset = 0;
5050
let logical_partial_limit = LogicalLimit::new(input, new_limit, new_offset);
5151
let batch_partial_limit = Self::new(logical_partial_limit);
52-
let ensure_single_dist = RequiredDist::single()
53-
.enforce_if_not_satisfies(batch_partial_limit.into(), &Order::any())?;
52+
let any_order = Order::any();
53+
let ensure_single_dist = RequiredDist::single().enforce_if_not_satisfies(
54+
batch_partial_limit.into(),
55+
if self.order().field_order.is_empty() {
56+
&any_order
57+
} else {
58+
self.order()
59+
},
60+
)?;
5461
let batch_global_limit = self.clone_with_input(ensure_single_dist);
5562
Ok(batch_global_limit.into())
5663
}

src/frontend/src/optimizer/plan_node/batch_seq_scan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use risingwave_common::error::Result;
2020
use risingwave_common::types::ScalarImpl;
2121
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
2222
use risingwave_pb::batch_plan::plan_node::NodeBody;
23+
use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
2324
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
2425
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
2526

@@ -240,6 +241,10 @@ impl ToBatchProst for BatchSeqScan {
240241
// To be filled by the scheduler.
241242
vnode_bitmap: None,
242243
ordered: !self.order().is_any(),
244+
chunk_size: self
245+
.logical
246+
.chunk_size()
247+
.map(|chunk_size| ChunkSize { chunk_size }),
243248
})
244249
}
245250
}

src/frontend/src/optimizer/plan_node/generic/scan.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub struct Scan {
3737
pub indexes: Vec<Rc<IndexCatalog>>,
3838
/// The pushed down predicates. It refers to column indexes of the table.
3939
pub predicate: Condition,
40+
/// Help RowSeqScan executor use a better chunk size
41+
pub chunk_size: Option<u32>,
4042
}
4143

4244
impl Scan {

0 commit comments

Comments
 (0)