Skip to content

Commit ad7e21b

Browse files
authored
refractor(optimizer): replace StreamIndexScan by StreamTableScan on logical index scan (risingwavelabs#8567)
Signed-off-by: Clearlove <[email protected]>
1 parent 5efe089 commit ad7e21b

File tree

6 files changed

+21
-265
lines changed

6 files changed

+21
-265
lines changed

src/frontend/planner_test/tests/testdata/delta_join.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_conflict: "no check" }
1212
└─StreamExchange { dist: HashShard(i_a1.a1, i_a1.a._row_id, i_b1.b._row_id) }
1313
└─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
14-
├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
15-
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
14+
├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
15+
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
1616
- sql: |
1717
set rw_streaming_enable_delta_join = true;
1818
create table a (a1 int primary key, a2 int);
@@ -25,7 +25,7 @@
2525
└─StreamExchange { dist: HashShard(a.a1, i_b1.b._row_id) }
2626
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
2727
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
28-
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
28+
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
2929
- sql: |
3030
set rw_streaming_enable_delta_join = true;
3131
create table a (a1 int primary key, a2 int);

src/frontend/planner_test/tests/testdata/distribution_derive.yaml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
2121
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
2222
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
23-
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
24-
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
23+
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
24+
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
2525
stream_dist_plan: |
2626
Fragment 0
2727
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
@@ -34,12 +34,12 @@
3434
StreamExchange Hash([2, 4, 3]) from 5
3535
3636
Fragment 2
37-
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
37+
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
3838
Upstream
3939
BatchPlanNode
4040
4141
Fragment 3
42-
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
42+
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
4343
Upstream
4444
BatchPlanNode
4545
@@ -68,7 +68,7 @@
6868
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
6969
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
7070
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
71-
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
71+
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
7272
stream_dist_plan: |
7373
Fragment 0
7474
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
@@ -86,7 +86,7 @@
8686
BatchPlanNode
8787
8888
Fragment 3
89-
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
89+
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
9090
Upstream
9191
BatchPlanNode
9292
@@ -114,7 +114,7 @@
114114
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
115115
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
116116
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
117-
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
117+
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
118118
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
119119
stream_dist_plan: |
120120
Fragment 0
@@ -128,7 +128,7 @@
128128
StreamExchange Hash([2, 4, 3]) from 5
129129
130130
Fragment 2
131-
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
131+
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
132132
Upstream
133133
BatchPlanNode
134134

src/frontend/src/optimizer/plan_node/mod.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -524,9 +524,6 @@ impl dyn PlanNode {
524524
if let Some(stream_table_scan) = self.as_stream_table_scan() {
525525
return stream_table_scan.adhoc_to_stream_prost();
526526
}
527-
if let Some(stream_index_scan) = self.as_stream_index_scan() {
528-
return stream_index_scan.adhoc_to_stream_prost();
529-
}
530527
if let Some(stream_share) = self.as_stream_share() {
531528
return stream_share.adhoc_to_stream_prost(state);
532529
}
@@ -660,7 +657,6 @@ mod stream_group_topn;
660657
mod stream_hash_agg;
661658
mod stream_hash_join;
662659
mod stream_hop_window;
663-
mod stream_index_scan;
664660
mod stream_local_simple_agg;
665661
mod stream_materialize;
666662
mod stream_now;
@@ -736,7 +732,6 @@ pub use stream_group_topn::StreamGroupTopN;
736732
pub use stream_hash_agg::StreamHashAgg;
737733
pub use stream_hash_join::StreamHashJoin;
738734
pub use stream_hop_window::StreamHopWindow;
739-
pub use stream_index_scan::StreamIndexScan;
740735
pub use stream_local_simple_agg::StreamLocalSimpleAgg;
741736
pub use stream_materialize::StreamMaterialize;
742737
pub use stream_now::StreamNow;
@@ -835,7 +830,6 @@ macro_rules! for_all_plan_nodes {
835830
, { Stream, TopN }
836831
, { Stream, HopWindow }
837832
, { Stream, DeltaJoin }
838-
, { Stream, IndexScan }
839833
, { Stream, Expand }
840834
, { Stream, DynamicFilter }
841835
, { Stream, ProjectSet }
@@ -936,7 +930,6 @@ macro_rules! for_stream_plan_nodes {
936930
, { Stream, TopN }
937931
, { Stream, HopWindow }
938932
, { Stream, DeltaJoin }
939-
, { Stream, IndexScan }
940933
, { Stream, Expand }
941934
, { Stream, DynamicFilter }
942935
, { Stream, ProjectSet }

src/frontend/src/optimizer/plan_node/stream_delta_join.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,13 @@ impl StreamNode for StreamDeltaJoin {
156156
let left = self.left();
157157
let right = self.right();
158158

159-
let left_table = if let Some(stream_index_scan) = left.as_stream_index_scan() {
160-
stream_index_scan.logical()
161-
} else if let Some(stream_table_scan) = left.as_stream_table_scan() {
159+
let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
162160
stream_table_scan.logical()
163161
} else {
164162
unreachable!();
165163
};
166164
let left_table_desc = left_table.table_desc();
167-
let right_table = if let Some(stream_index_scan) = right.as_stream_index_scan() {
168-
stream_index_scan.logical()
169-
} else if let Some(stream_table_scan) = right.as_stream_table_scan() {
165+
let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
170166
stream_table_scan.logical()
171167
} else {
172168
unreachable!();

src/frontend/src/optimizer/plan_node/stream_index_scan.rs

Lines changed: 0 additions & 233 deletions
This file was deleted.

0 commit comments

Comments
 (0)