Skip to content

Commit 18863e0

Browse files
authored
refactor(frontend): replace dist_key_indices with dist_key_in_pk_indices in frontend (risingwavelabs#8617)
1 parent 961e342 commit 18863e0

File tree

15 files changed

+75
-51
lines changed

15 files changed

+75
-51
lines changed

dashboard/proto/gen/plan_common.ts

Lines changed: 12 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/plan_common.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ message StorageTableDesc {
3838
repeated ColumnDesc columns = 2;
3939
// TODO: may refactor primary key representations
4040
repeated common.ColumnOrder pk = 3;
41-
repeated uint32 dist_key_indices = 4;
41+
repeated uint32 dist_key_in_pk_indices = 4;
4242
uint32 retention_seconds = 5;
4343
repeated uint32 value_indices = 6;
4444
uint32 read_prefix_len_hint = 7;

src/batch/src/executor/join/distributed_lookup_join.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,13 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
195195
.map(|k| k.column_index as usize)
196196
.collect_vec();
197197

198-
let dist_key_indices = table_desc
199-
.dist_key_indices
198+
let dist_key_in_pk_indices = table_desc
199+
.dist_key_in_pk_indices
200200
.iter()
201201
.map(|&k| k as usize)
202202
.collect_vec();
203203
// Lookup Join always contains distribution key, so we don't need vnode bitmap
204-
let distribution = Distribution::all_vnodes(dist_key_indices);
204+
let distribution = Distribution::all_vnodes(dist_key_in_pk_indices);
205205
let table_option = TableOption {
206206
retention_seconds: if table_desc.retention_seconds > 0 {
207207
Some(table_desc.retention_seconds)

src/batch/src/executor/join/local_lookup_join.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,15 @@ pub type BoxedLookupExecutorBuilder = Box<dyn LookupExecutorBuilder>;
7777
impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
7878
/// Gets the virtual node based on the given `scan_range`
7979
fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
80-
let dist_keys = self
80+
let dist_key_in_pk_indices = self
8181
.table_desc
82-
.dist_key_indices
82+
.dist_key_in_pk_indices
8383
.iter()
8484
.map(|&k| k as usize)
8585
.collect_vec();
86-
let pk_indices = self
87-
.table_desc
88-
.pk
89-
.iter()
90-
.map(|col| col.column_index as usize)
91-
.collect_vec();
9286

93-
let virtual_node = scan_range.try_compute_vnode(&dist_keys, &pk_indices);
87+
let virtual_node =
88+
scan_range.try_compute_vnode_with_dist_key_in_pk_indices(&dist_key_in_pk_indices);
9489
virtual_node.ok_or_else(|| internal_error("Could not compute vnode for lookup join"))
9590
}
9691

src/batch/src/executor/row_seq_scan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,19 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
202202
.map(|k| k.column_index as usize)
203203
.collect_vec();
204204

205-
let dist_key_indices = table_desc
206-
.dist_key_indices
205+
let dist_key_in_pk_indices = table_desc
206+
.dist_key_in_pk_indices
207207
.iter()
208208
.map(|&k| k as usize)
209209
.collect_vec();
210210
let distribution = match &seq_scan_node.vnode_bitmap {
211211
Some(vnodes) => Distribution {
212212
vnodes: Bitmap::from(vnodes).into(),
213-
dist_key_indices,
213+
dist_key_in_pk_indices,
214214
},
215215
// This is possible for dml. vnode_bitmap is not filled by scheduler.
216216
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
217-
None => Distribution::all_vnodes(dist_key_indices),
217+
None => Distribution::all_vnodes(dist_key_in_pk_indices),
218218
};
219219

220220
let table_option = TableOption {

src/common/src/catalog/physical_table.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::HashMap;
1616

1717
use fixedbitset::FixedBitSet;
18+
use itertools::Itertools;
1819
use risingwave_pb::common::PbColumnOrder;
1920
use risingwave_pb::plan_common::StorageTableDesc;
2021

@@ -77,11 +78,32 @@ impl TableDesc {
7778
}
7879

7980
pub fn to_protobuf(&self) -> StorageTableDesc {
81+
let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
82+
let pk_indices: Vec<u32> = self
83+
.pk
84+
.iter()
85+
.map(|v| v.to_protobuf().column_index)
86+
.collect();
87+
let dist_key_in_pk_indices = dist_key_indices
88+
.iter()
89+
.map(|&di| {
90+
pk_indices
91+
.iter()
92+
.position(|&pi| di == pi)
93+
.unwrap_or_else(|| {
94+
panic!(
95+
"distribution key {:?} must be a subset of primary key {:?}",
96+
dist_key_indices, pk_indices
97+
)
98+
})
99+
})
100+
.map(|d| d as u32)
101+
.collect_vec();
80102
StorageTableDesc {
81103
table_id: self.table_id.into(),
82104
columns: self.columns.iter().map(Into::into).collect(),
83105
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
84-
dist_key_indices: self.distribution_key.iter().map(|&k| k as u32).collect(),
106+
dist_key_in_pk_indices,
85107
retention_seconds: self.retention_seconds,
86108
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
87109
read_prefix_len_hint: self.read_prefix_len_hint as u32,

src/common/src/util/scan_range.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,21 @@ impl ScanRange {
9595
}
9696

9797
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(dist_key_indices, pk_indices);
98+
self.try_compute_vnode_with_dist_key_in_pk_indices(&dist_key_in_pk_indices)
99+
}
100+
101+
pub fn try_compute_vnode_with_dist_key_in_pk_indices(
102+
&self,
103+
dist_key_in_pk_indices: &[usize],
104+
) -> Option<VirtualNode> {
98105
let pk_prefix_len = self.eq_conds.len();
99106
if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) {
100107
return None;
101108
}
102109

103110
let pk_prefix_value = &self.eq_conds;
104111
let vnode = pk_prefix_value
105-
.project(&dist_key_in_pk_indices)
112+
.project(dist_key_in_pk_indices)
106113
.hash(Crc32FastBuilder)
107114
.to_vnode();
108115
Some(vnode)

src/frontend/src/scheduler/distributed/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ pub(crate) mod tests {
541541
field_descs: vec![],
542542
},
543543
],
544-
distribution_key: vec![2],
544+
distribution_key: vec![],
545545
append_only: false,
546546
retention_seconds: TABLE_OPTION_DUMMY_RETENTION_SECOND,
547547
value_indices: vec![0, 1, 2],

src/storage/src/table/batch_table/storage_table.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use futures::{Stream, StreamExt};
2525
use futures_async_stream::try_stream;
2626
use itertools::{Either, Itertools};
2727
use risingwave_common::buffer::Bitmap;
28-
use risingwave_common::catalog::{
29-
get_dist_key_in_pk_indices, ColumnDesc, ColumnId, Schema, TableId, TableOption,
30-
};
28+
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
3129
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
3230
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
3331
use risingwave_common::util::ordered::*;
@@ -183,7 +181,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
183181
order_types: Vec<OrderType>,
184182
pk_indices: Vec<usize>,
185183
Distribution {
186-
dist_key_indices,
184+
dist_key_in_pk_indices,
187185
vnodes,
188186
}: Distribution,
189187
table_option: TableOption,
@@ -244,7 +242,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
244242
}
245243
};
246244

247-
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices);
248245
let key_output_indices = match key_output_indices.is_empty() {
249246
true => None,
250247
false => Some(key_output_indices),

src/storage/src/table/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO;
3333
#[derive(Debug)]
3434
pub struct Distribution {
3535
/// Indices of distribution key for computing vnode, based on the all columns of the table.
36-
pub dist_key_indices: Vec<usize>,
36+
pub dist_key_in_pk_indices: Vec<usize>,
3737

3838
/// Virtual nodes that the table is partitioned into.
3939
pub vnodes: Arc<Bitmap>,
@@ -49,7 +49,7 @@ impl Distribution {
4949
vnodes.finish().into()
5050
});
5151
Self {
52-
dist_key_indices: vec![],
52+
dist_key_in_pk_indices: vec![],
5353
vnodes: FALLBACK_VNODES.clone(),
5454
}
5555
}
@@ -66,12 +66,12 @@ impl Distribution {
6666
}
6767

6868
/// Distribution that accesses all vnodes, mainly used for tests.
69-
pub fn all_vnodes(dist_key_indices: Vec<usize>) -> Self {
69+
pub fn all_vnodes(dist_key_in_pk_indices: Vec<usize>) -> Self {
7070
/// A bitmap that all vnodes are set.
7171
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
7272
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
7373
Self {
74-
dist_key_indices,
74+
dist_key_in_pk_indices,
7575
vnodes: ALL_VNODES.clone(),
7676
}
7777
}

src/stream/src/common/table/state_table.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ where
400400
order_types: Vec<OrderType>,
401401
pk_indices: Vec<usize>,
402402
Distribution {
403-
dist_key_indices,
403+
dist_key_in_pk_indices,
404404
vnodes,
405405
}: Distribution,
406406
value_indices: Option<Vec<usize>>,
@@ -439,7 +439,6 @@ where
439439
.collect_vec(),
440440
None => table_columns.iter().map(|c| c.column_id).collect_vec(),
441441
};
442-
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices);
443442
Self {
444443
table_id,
445444
local_store: local_state_store,

src/stream/src/from_proto/batch_query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ impl ExecutorBuilder for BatchQueryExecutorBuilder {
6666
.map(|k| k.column_index as usize)
6767
.collect_vec();
6868

69-
let dist_key_indices = table_desc
70-
.dist_key_indices
69+
let dist_key_in_pk_indices = table_desc
70+
.dist_key_in_pk_indices
7171
.iter()
7272
.map(|&k| k as usize)
7373
.collect_vec();
7474
let distribution = match params.vnode_bitmap {
7575
Some(vnodes) => Distribution {
76-
dist_key_indices,
76+
dist_key_in_pk_indices,
7777
vnodes: vnodes.into(),
7878
},
7979
None => Distribution::fallback(),

src/stream/src/from_proto/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,14 @@ impl ExecutorBuilder for ChainExecutorBuilder {
105105
.map(|k| k.column_index as usize)
106106
.collect_vec();
107107

108-
let dist_key_indices = table_desc
109-
.dist_key_indices
108+
let dist_key_in_pk_indices = table_desc
109+
.dist_key_in_pk_indices
110110
.iter()
111111
.map(|&k| k as usize)
112112
.collect_vec();
113113
let distribution = match params.vnode_bitmap {
114114
Some(vnodes) => Distribution {
115-
dist_key_indices,
115+
dist_key_in_pk_indices,
116116
vnodes: vnodes.into(),
117117
},
118118
None => Distribution::fallback(),

src/stream/src/from_proto/lookup.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ impl ExecutorBuilder for LookupExecutorBuilder {
8282
.map(|k| k.column_index as usize)
8383
.collect_vec();
8484

85-
let dist_key_indices = table_desc
86-
.dist_key_indices
85+
let dist_key_in_pk_indices = table_desc
86+
.dist_key_in_pk_indices
8787
.iter()
8888
.map(|&k| k as usize)
8989
.collect_vec();
9090
let distribution = match params.vnode_bitmap {
9191
Some(vnodes) => Distribution {
92-
dist_key_indices,
92+
dist_key_in_pk_indices,
9393
vnodes: vnodes.into(),
9494
},
9595
None => Distribution::fallback(),

src/stream/src/from_proto/temporal_join.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder {
6464
.map(|k| k.column_index as usize)
6565
.collect_vec();
6666

67-
let dist_key_indices = table_desc
68-
.dist_key_indices
67+
let dist_key_in_pk_indices = table_desc
68+
.dist_key_in_pk_indices
6969
.iter()
7070
.map(|&k| k as usize)
7171
.collect_vec();
7272
let distribution = match params.vnode_bitmap.clone() {
7373
Some(vnodes) => Distribution {
74-
dist_key_indices,
74+
dist_key_in_pk_indices,
7575
vnodes: vnodes.into(),
7676
},
7777
None => Distribution::fallback(),

0 commit comments

Comments
 (0)