Skip to content

Commit a4bd877

Browse files
authored
chore: remove dist_key_indices in state table and storage table (risingwavelabs#8601)
1 parent c683098 commit a4bd877

File tree

4 files changed

+46
-34
lines changed

4 files changed

+46
-34
lines changed

src/storage/src/table/batch_table/storage_table.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,6 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
8989
// FIXME: revisit constructions and usages.
9090
pk_indices: Vec<usize>,
9191

92-
/// Indices of distribution key for computing vnode.
93-
/// Note that the index is based on the all columns of the table, instead of the output ones.
94-
// FIXME: revisit constructions and usages.
95-
dist_key_indices: Vec<usize>,
96-
9792
/// Indices of distribution key for computing vnode.
9893
/// Note that the index is based on the primary key columns by `pk_indices`.
9994
dist_key_in_pk_indices: Vec<usize>,
@@ -266,7 +261,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
266261
mapping: Arc::new(mapping),
267262
row_serde: Arc::new(row_serde),
268263
pk_indices,
269-
dist_key_indices,
270264
dist_key_in_pk_indices,
271265
vnodes,
272266
table_option,
@@ -592,23 +586,21 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
592586
Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
593587
} else {
594588
trace!(
595-
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}",
589+
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
596590
self.table_id,
597591
pk_prefix,
598-
self.dist_key_indices,
599592
pk_prefix_indices
600593
);
601594
None
602595
};
603596

604597
trace!(
605-
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" ,
598+
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}" ,
606599
self.table_id,
607600
prefix_hint,
608601
start_key,
609602
end_key,
610603
pk_prefix,
611-
self.dist_key_indices,
612604
pk_prefix_indices
613605
);
614606

src/storage/src/table/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod batch_table;
1616

1717
use std::sync::{Arc, LazyLock};
1818

19+
use itertools::Itertools;
1920
use risingwave_common::array::DataChunk;
2021
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
2122
use risingwave_common::catalog::Schema;
@@ -53,6 +54,17 @@ impl Distribution {
5354
}
5455
}
5556

57+
pub fn fallback_vnodes() -> Arc<Bitmap> {
58+
/// A bitmap that only the default vnode is set.
59+
static FALLBACK_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
60+
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
61+
vnodes.set(DEFAULT_VNODE.to_index(), true);
62+
vnodes.finish().into()
63+
});
64+
65+
FALLBACK_VNODES.clone()
66+
}
67+
5668
/// Distribution that accesses all vnodes, mainly used for tests.
5769
pub fn all_vnodes(dist_key_indices: Vec<usize>) -> Self {
5870
/// A bitmap that all vnodes are set.
@@ -124,14 +136,19 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
124136
/// Get vnode values with `indices` on the given `chunk`.
125137
pub fn compute_chunk_vnode(
126138
chunk: &DataChunk,
127-
indices: &[usize],
139+
dist_key_in_pk_indices: &[usize],
140+
pk_indices: &[usize],
128141
vnodes: &Bitmap,
129142
) -> Vec<VirtualNode> {
130-
if indices.is_empty() {
143+
if dist_key_in_pk_indices.is_empty() {
131144
vec![DEFAULT_VNODE; chunk.capacity()]
132145
} else {
146+
let dist_key_indices = dist_key_in_pk_indices
147+
.iter()
148+
.map(|idx| pk_indices[*idx])
149+
.collect_vec();
133150
chunk
134-
.get_hash_values(indices, Crc32FastBuilder)
151+
.get_hash_values(&dist_key_indices, Crc32FastBuilder)
135152
.into_iter()
136153
.zip_eq_fast(chunk.vis().iter())
137154
.map(|(h, vis)| {

src/stream/src/common/table/state_table.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub struct StateTableInner<
8484
/// Indices of distribution key for computing vnode.
8585
/// Note that the index is based on the all columns of the table, instead of the output ones.
8686
// FIXME: revisit constructions and usages.
87-
dist_key_indices: Vec<usize>,
87+
// dist_key_indices: Vec<usize>,
8888

8989
/// Indices of distribution key for computing vnode.
9090
/// Note that the index is based on the primary key columns by `pk_indices`.
@@ -198,15 +198,10 @@ where
198198
.collect();
199199
let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);
200200

201-
let Distribution {
202-
dist_key_indices,
203-
vnodes,
204-
} = match vnodes {
205-
Some(vnodes) => Distribution {
206-
dist_key_indices,
207-
vnodes,
208-
},
209-
None => Distribution::fallback(),
201+
let vnodes = match vnodes {
202+
Some(vnodes) => vnodes,
203+
204+
None => Distribution::fallback_vnodes(),
210205
};
211206
let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
212207
let vnode_col_idx = *idx as usize;
@@ -251,7 +246,6 @@ where
251246
pk_serde,
252247
row_serde,
253248
pk_indices: pk_indices.to_vec(),
254-
dist_key_indices,
255249
dist_key_in_pk_indices,
256250
prefix_hint_len,
257251
vnodes,
@@ -452,7 +446,6 @@ where
452446
pk_serde,
453447
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
454448
pk_indices,
455-
dist_key_indices,
456449
dist_key_in_pk_indices,
457450
prefix_hint_len,
458451
vnodes,
@@ -475,7 +468,7 @@ where
475468
if self.vnode_col_idx_in_pk.is_some() {
476469
false
477470
} else {
478-
self.dist_key_indices.is_empty()
471+
self.dist_key_in_pk_indices.is_empty()
479472
}
480473
}
481474

@@ -503,8 +496,13 @@ where
503496
}
504497

505498
/// Get the vnode value of the given row
506-
pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
507-
compute_vnode(row, &self.dist_key_indices, &self.vnodes)
499+
// pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
500+
// compute_vnode(row, &self.dist_key_indices, &self.vnodes)
501+
// }
502+
503+
/// Get the vnode value of the given row
504+
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
505+
compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes)
508506
}
509507

510508
// TODO: remove, should not be exposed to user
@@ -516,9 +514,9 @@ where
516514
&self.pk_serde
517515
}
518516

519-
pub fn dist_key_indices(&self) -> &[usize] {
520-
&self.dist_key_indices
521-
}
517+
// pub fn dist_key_indices(&self) -> &[usize] {
518+
// &self.dist_key_indices
519+
// }
522520

523521
pub fn vnodes(&self) -> &Arc<Bitmap> {
524522
&self.vnodes
@@ -724,7 +722,12 @@ where
724722
pub fn write_chunk(&mut self, chunk: StreamChunk) {
725723
let (chunk, op) = chunk.into_parts();
726724

727-
let vnodes = compute_chunk_vnode(&chunk, &self.dist_key_indices, &self.vnodes);
725+
let vnodes = compute_chunk_vnode(
726+
&chunk,
727+
&self.dist_key_in_pk_indices,
728+
&self.pk_indices,
729+
&self.vnodes,
730+
);
728731

729732
let value_chunk = if let Some(ref value_indices) = self.value_indices {
730733
chunk.clone().reorder_columns(value_indices)
@@ -984,7 +987,7 @@ where
984987
trace!(
985988
table_id = %self.table_id(),
986989
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
987-
dist_key_indices = ?self.dist_key_indices, ?pk_prefix_indices,
990+
?pk_prefix_indices,
988991
"storage_iter_with_prefix"
989992
);
990993

src/stream/src/executor/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl<S: StateStore> SortExecutor<S> {
247247
let no_longer_owned_vnodes =
248248
Bitmap::bit_saturate_subtract(prev_vnode_bitmap, curr_vnode_bitmap);
249249
self.buffer.retain(|(_, pk), _| {
250-
let vnode = self.state_table.compute_vnode(pk);
250+
let vnode = self.state_table.compute_vnode_by_pk(pk);
251251
!no_longer_owned_vnodes.is_set(vnode.to_index())
252252
});
253253
}

0 commit comments

Comments
 (0)