Skip to content

Commit 014f6db

Browse files
authored
feat: remove the exchange after append_only source (risingwavelabs#8532)
1 parent 19b1be3 commit 014f6db

File tree

27 files changed

+295
-223
lines changed

27 files changed

+295
-223
lines changed

e2e_test/batch/aggregate/sum.slt.part

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ select sum(d) from t;
7777
statement ok
7878
insert into t values (9000000000000000000000000000);
7979

80-
statement error QueryError: Expr error: Numeric out of range
80+
statement error Expr error: Numeric out of range
8181
select sum(d) from t;
8282

8383
statement ok

src/batch/src/executor/insert.rs

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,8 @@ use std::iter::repeat;
1717
use anyhow::Context;
1818
use futures::future::try_join_all;
1919
use futures_async_stream::try_stream;
20-
use risingwave_common::array::{
21-
ArrayBuilder, DataChunk, I64Array, Op, PrimitiveArrayBuilder, StreamChunk,
22-
};
20+
use risingwave_common::array::serial_array::SerialArray;
21+
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
2322
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
2423
use risingwave_common::error::{Result, RwError};
2524
use risingwave_common::types::DataType;
@@ -73,7 +72,7 @@ impl InsertExecutor {
7372
table_schema
7473
} else {
7574
Schema {
76-
fields: vec![Field::unnamed(DataType::Int64)],
75+
fields: vec![Field::unnamed(DataType::Serial)],
7776
}
7877
},
7978
identity,
@@ -123,7 +122,7 @@ impl InsertExecutor {
123122
// If the user does not specify the primary key, then we need to add a column as the
124123
// primary key.
125124
if let Some(row_id_index) = self.row_id_index {
126-
let row_id_col = I64Array::from_iter(repeat(None).take(cap));
125+
let row_id_col = SerialArray::from_iter(repeat(None).take(cap));
127126
columns.insert(row_id_index, row_id_col.into())
128127
}
129128

@@ -244,7 +243,7 @@ mod tests {
244243
// Schema of the table
245244
let mut schema = schema_test_utils::ii();
246245
schema.fields.push(struct_field);
247-
schema.fields.push(Field::unnamed(DataType::Int64)); // row_id column
246+
schema.fields.push(Field::unnamed(DataType::Serial)); // row_id column
248247

249248
let row_id_index = Some(3);
250249

src/batch/src/task/consistent_hash_shuffle_channel.rs

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ use anyhow::anyhow;
2121
use itertools::Itertools;
2222
use risingwave_common::array::DataChunk;
2323
use risingwave_common::buffer::Bitmap;
24-
use risingwave_common::util::hash_util::Crc32FastBuilder;
24+
use risingwave_common::hash::VirtualNode;
2525
use risingwave_pb::batch_plan::exchange_info::ConsistentHashInfo;
2626
use risingwave_pb::batch_plan::*;
2727
use tokio::sync::mpsc;
@@ -54,20 +54,20 @@ fn generate_hash_values(
5454
chunk: &DataChunk,
5555
consistent_hash_info: &ConsistentHashInfo,
5656
) -> BatchResult<Vec<usize>> {
57-
let hasher_builder = Crc32FastBuilder;
58-
59-
let hash_values = chunk
60-
.get_hash_values(
61-
&consistent_hash_info
62-
.key
63-
.iter()
64-
.map(|idx| *idx as usize)
65-
.collect::<Vec<_>>(),
66-
hasher_builder,
67-
)
68-
.iter_mut()
69-
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode().to_index()] as usize)
57+
let vnodes = VirtualNode::compute_chunk(
58+
chunk,
59+
&consistent_hash_info
60+
.key
61+
.iter()
62+
.map(|idx| *idx as usize)
63+
.collect::<Vec<_>>(),
64+
);
65+
66+
let hash_values = vnodes
67+
.iter()
68+
.map(|vnode| consistent_hash_info.vmap[vnode.to_index()] as usize)
7069
.collect::<Vec<_>>();
70+
7171
Ok(hash_values)
7272
}
7373

src/common/src/array/data_chunk.rs

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -479,6 +479,7 @@ pub trait DataChunkTestExt {
479479
/// // f: f32
480480
/// // T: str
481481
/// // TS: Timestamp
482+
/// // SRL: Serial
482483
/// // {i,f}: struct
483484
/// ```
484485
fn from_pretty(s: &str) -> Self;
@@ -504,6 +505,7 @@ impl DataChunkTestExt for DataChunk {
504505
"TS" => DataType::Timestamp,
505506
"TSZ" => DataType::Timestamptz,
506507
"T" => DataType::Varchar,
508+
"SRL" => DataType::Serial,
507509
array if array.starts_with('{') && array.ends_with('}') => {
508510
DataType::Struct(Arc::new(StructType {
509511
fields: array[1..array.len() - 1]
@@ -565,6 +567,12 @@ impl DataChunkTestExt for DataChunk {
565567
))
566568
}
567569
ArrayBuilderImpl::Utf8(_) => ScalarImpl::Utf8(s.into()),
570+
ArrayBuilderImpl::Serial(_) => ScalarImpl::Serial(
571+
s.parse::<i64>()
572+
.map_err(|_| panic!("invalid serial: {s:?}"))
573+
.unwrap()
574+
.into(),
575+
),
568576
ArrayBuilderImpl::Struct(builder) => {
569577
assert!(s.starts_with('{') && s.ends_with('}'));
570578
let fields = s[1..s.len() - 1]
@@ -641,7 +649,6 @@ impl DataChunkTestExt for DataChunk {
641649

642650
#[cfg(test)]
643651
mod tests {
644-
645652
use crate::array::*;
646653
use crate::row::Row;
647654
use crate::{column, column_nonnull};

src/common/src/array/serial_array.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ use postgres_types::{ToSql as _, Type};
1818
use serde::{Serialize, Serializer};
1919

2020
use crate::array::{PrimitiveArray, PrimitiveArrayBuilder};
21+
use crate::util::row_id::RowId;
2122

2223
// Serial is an alias for i64
2324
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
@@ -37,6 +38,11 @@ impl Serial {
3738
pub fn into_inner(self) -> i64 {
3839
self.0
3940
}
41+
42+
#[inline]
43+
pub fn as_row_id(self) -> RowId {
44+
self.0 as RowId
45+
}
4046
}
4147

4248
impl Serialize for Serial {

src/common/src/array/stream_chunk.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -338,6 +338,7 @@ impl StreamChunkTestExt for StreamChunk {
338338
/// // T: str
339339
/// // TS: Timestamp
340340
/// // TSZ: Timestamptz
341+
/// // SRL: Serial
341342
/// // {i,f}: struct
342343
/// ```
343344
fn from_pretty(s: &str) -> Self {

src/common/src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ pub const USER_COLUMN_ID_OFFSET: i32 = ROW_ID_COLUMN_ID.next().get_id();
8383
/// Creates a row ID column (for implicit primary key). It'll always have the ID `0` for now.
8484
pub fn row_id_column_desc() -> ColumnDesc {
8585
ColumnDesc {
86-
data_type: DataType::Int64,
86+
data_type: DataType::Serial,
8787
column_id: ROW_ID_COLUMN_ID,
8888
name: row_id_column_name(),
8989
field_descs: vec![],

src/common/src/hash/consistent_hash/vnode.rs

Lines changed: 81 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,9 +12,15 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use itertools::Itertools;
1516
use parse_display::Display;
1617

18+
use crate::array::{Array, ArrayImpl, DataChunk};
1719
use crate::hash::HashCode;
20+
use crate::row::{Row, RowExt};
21+
use crate::types::ScalarRefImpl;
22+
use crate::util::hash_util::Crc32FastBuilder;
23+
use crate::util::row_id::extract_vnode_id_from_row_id;
1824

1925
/// Parallel unit is the minimal scheduling unit.
2026
// TODO: make it a newtype
@@ -100,3 +106,78 @@ impl VirtualNode {
100106
(0..Self::COUNT).map(Self::from_index)
101107
}
102108
}
109+
110+
impl VirtualNode {
111+
// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
112+
// chunk. When only one column is provided and its type is `Serial`, we consider the column to
113+
// be the one that contains RowId, and use a special method to skip the calculation of Hash
114+
// and directly extract the `VirtualNode` from `RowId`.
115+
pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
116+
if let Ok(idx) = keys.iter().exactly_one() &&
117+
let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref() {
118+
119+
return serial_array.iter()
120+
.map(|serial|extract_vnode_id_from_row_id(serial.unwrap().as_row_id()))
121+
.collect();
122+
}
123+
124+
data_chunk
125+
.get_hash_values(keys, Crc32FastBuilder)
126+
.into_iter()
127+
.map(|hash| hash.into())
128+
.collect()
129+
}
130+
131+
// `compute_row` is used to calculate the `VirtualNode` for the corresponding column in a `Row`.
132+
// Similar to `compute_chunk`, it also contains special handling for serial columns.
133+
pub fn compute_row(row: impl Row, indices: &[usize]) -> VirtualNode {
134+
let project = row.project(indices);
135+
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
136+
return extract_vnode_id_from_row_id(s.as_row_id());
137+
}
138+
139+
project.hash(Crc32FastBuilder).into()
140+
}
141+
}
142+
143+
#[cfg(test)]
144+
mod tests {
145+
use super::*;
146+
use crate::array::DataChunkTestExt;
147+
use crate::row::OwnedRow;
148+
use crate::types::ScalarImpl;
149+
use crate::util::row_id::RowIdGenerator;
150+
151+
#[tokio::test]
152+
async fn test_serial_key_chunk() {
153+
let mut gen = RowIdGenerator::new(100);
154+
let chunk = format!(
155+
"SRL I
156+
{} 1
157+
{} 2",
158+
gen.next().await,
159+
gen.next().await,
160+
);
161+
162+
let chunk = DataChunk::from_pretty(chunk.as_str());
163+
let vnodes = VirtualNode::compute_chunk(&chunk, &[0]);
164+
165+
assert_eq!(
166+
vnodes.as_slice(),
167+
&[VirtualNode::from_index(100), VirtualNode::from_index(100)]
168+
);
169+
}
170+
171+
#[tokio::test]
172+
async fn test_serial_key_row() {
173+
let mut gen = RowIdGenerator::new(100);
174+
let row = OwnedRow::new(vec![
175+
Some(ScalarImpl::Serial(gen.next().await.into())),
176+
Some(ScalarImpl::Int64(12345)),
177+
]);
178+
179+
let vnode = VirtualNode::compute_row(&row, &[0]);
180+
181+
assert_eq!(vnode, VirtualNode::from_index(100));
182+
}
183+
}

src/common/src/hash/key.rs

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@ use crate::array::{
3636
ListRef, StructRef,
3737
};
3838
use crate::collection::estimate_size::EstimateSize;
39-
use crate::hash::VirtualNode;
4039
use crate::row::{OwnedRow, RowDeserializer};
4140
use crate::types::{
4241
DataType, Decimal, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, OrderedF32,
@@ -60,10 +59,6 @@ impl HashCode {
6059
pub fn hash_code(self) -> u64 {
6160
self.0
6261
}
63-
64-
pub fn to_vnode(self) -> VirtualNode {
65-
VirtualNode::from(self)
66-
}
6762
}
6863

6964
pub trait HashKeySerializer {

src/common/src/util/row_id.rs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,8 @@
1414

1515
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1616

17+
use crate::hash::VirtualNode;
18+
1719
const TIMESTAMP_SHIFT_BITS: u8 = 22;
1820
const VNODE_ID_SHIFT_BITS: u8 = 12;
1921
const SEQUENCE_UPPER_BOUND: u16 = 1 << 12;
@@ -40,6 +42,13 @@ pub struct RowIdGenerator {
4042

4143
pub type RowId = i64;
4244

45+
#[inline]
46+
pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode {
47+
let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32;
48+
assert!(vnode_id < VNODE_ID_UPPER_BOUND);
49+
VirtualNode::from_index(vnode_id as usize)
50+
}
51+
4352
impl RowIdGenerator {
4453
pub fn new(vnode_id: u32) -> Self {
4554
assert!(vnode_id < VNODE_ID_UPPER_BOUND);

src/common/src/util/scan_range.rs

Lines changed: 12 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -21,9 +21,7 @@ use risingwave_pb::batch_plan::ScanRange as ScanRangePb;
2121
use super::value_encoding::serialize_datum;
2222
use crate::catalog::get_dist_key_in_pk_indices;
2323
use crate::hash::VirtualNode;
24-
use crate::row::{Row, RowExt};
2524
use crate::types::{Datum, ScalarImpl};
26-
use crate::util::hash_util::Crc32FastBuilder;
2725
use crate::util::value_encoding::serialize_datum_into;
2826

2927
/// See also [`ScanRangePb`]
@@ -107,11 +105,8 @@ impl ScanRange {
107105
return None;
108106
}
109107

110-
let pk_prefix_value = &self.eq_conds;
111-
let vnode = pk_prefix_value
112-
.project(dist_key_in_pk_indices)
113-
.hash(Crc32FastBuilder)
114-
.to_vnode();
108+
let pk_prefix_value: &[_] = &self.eq_conds;
109+
let vnode = VirtualNode::compute_row(pk_prefix_value, dist_key_in_pk_indices);
115110
Some(vnode)
116111
}
117112
}
@@ -193,13 +188,13 @@ mod tests {
193188
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());
194189

195190
scan_range.eq_conds.push(Some(ScalarImpl::from(514)));
196-
let vnode = OwnedRow::new(vec![
191+
let row = OwnedRow::new(vec![
197192
Some(ScalarImpl::from(114)),
198193
Some(ScalarImpl::from(514)),
199-
])
200-
.project(&[0, 1])
201-
.hash(Crc32FastBuilder)
202-
.to_vnode();
194+
]);
195+
196+
let vnode = VirtualNode::compute_row(&row, &[0, 1]);
197+
203198
assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
204199
}
205200

@@ -219,14 +214,14 @@ mod tests {
219214
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());
220215

221216
scan_range.eq_conds.push(Some(ScalarImpl::from(114514)));
222-
let vnode = OwnedRow::new(vec![
217+
let row = OwnedRow::new(vec![
223218
Some(ScalarImpl::from(114)),
224219
Some(ScalarImpl::from(514)),
225220
Some(ScalarImpl::from(114514)),
226-
])
227-
.project(&[2, 1])
228-
.hash(Crc32FastBuilder)
229-
.to_vnode();
221+
]);
222+
223+
let vnode = VirtualNode::compute_row(&row, &[2, 1]);
224+
230225
assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
231226
}
232227
}

0 commit comments

Comments
 (0)