Skip to content

Commit 8857a02

Browse files
xx01cyx, github-actions[bot], mergify[bot]
authored
feat(streaming): introduce sort operator based on watermark (#6085)
* introduce sort executor based on watermark
* update comments
* Update src/stream/src/executor/sort.rs
  Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
* Update src/stream/src/from_proto/sort.rs
  Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
* insert to state table and commit only when barrier is checkpoint
* update in-memory buffer when scaling occurs
* support failover
* update comment
* apply suggestions from PR comments
* use data chunk builder instead of stream chunk builder
* update dashboard proto
* update comments and remove dead code

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent ffa96ba commit 8857a02

File tree

9 files changed

+691
-3
lines changed

9 files changed

+691
-3
lines changed

dashboard/proto/gen/stream_plan.ts

Lines changed: 49 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/stream_plan.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,14 @@ message ProjectSetNode {
395395
repeated expr.ProjectSetSelectItem select_list = 1;
396396
}
397397

398+
// Plan node for the sort operator: sorts its input and outputs ordered data,
// driven by the watermark.
399+
message SortNode {
400+
// State table that persists the data above the watermark.
401+
catalog.Table state_table = 1;
402+
// Index of the watermark column that the sorting is performed on.
403+
uint32 sort_column_index = 2;
404+
}
405+
398406
message StreamNode {
399407
oneof node_body {
400408
SourceNode source = 100;
@@ -422,6 +430,7 @@ message StreamNode {
422430
DynamicFilterNode dynamic_filter = 122;
423431
ProjectSetNode project_set = 123;
424432
GroupTopNNode group_top_n = 124;
433+
SortNode sort = 125;
425434
}
426435
// The id for the operator. This is local per mview.
427436
// TODO: should better be a uint32.

src/common/src/buffer/bitmap.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,21 @@ impl Bitmap {
264264
num_bits: self.num_bits,
265265
}
266266
}
267+
268+
/// Performs bitwise saturate subtract on two equal-length bitmaps.
269+
///
270+
/// For example, lhs = [01110] and rhs = [00111], then
271+
/// `bit_saturate_subtract(lhs, rhs)` results in [01000]
272+
pub fn bit_saturate_subtract(lhs: &Bitmap, rhs: &Bitmap) -> Bitmap {
273+
assert_eq!(lhs.num_bits, rhs.num_bits);
274+
let bits = lhs
275+
.bits
276+
.iter()
277+
.zip_eq(rhs.bits.iter())
278+
.map(|(&a, &b)| (!(a & b)) & a)
279+
.collect();
280+
Bitmap::from_bytes_with_num_bits(bits, lhs.num_bits)
281+
}
267282
}
268283

269284
impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap {
@@ -464,6 +479,16 @@ mod tests {
464479
);
465480
}
466481

482+
#[test]
fn test_bitwise_saturate_subtract() {
    // Only the 0b00100000 bit is set in the minuend and clear in the subtrahend.
    let minuend = Bitmap::from_bytes(Bytes::from_static(&[0b01101010]));
    let subtrahend = Bitmap::from_bytes(Bytes::from_static(&[0b01001110]));
    let expected = Bitmap::from_bytes(Bytes::from_static(&[0b00100000]));
    assert_eq!(
        expected,
        Bitmap::bit_saturate_subtract(&minuend, &subtrahend)
    );
}
491+
467492
#[test]
468493
fn test_bitmap_is_set() {
469494
let bitmap = Bitmap::from_bytes(Bytes::from_static(&[0b01001010]));

src/storage/src/table/streaming_table/state_table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ impl<S: StateStore> StateTable<S> {
350350
}
351351

352352
/// Get the vnode value with given (prefix of) primary key
353-
fn compute_vnode(&self, pk_prefix: &Row) -> VirtualNode {
353+
pub fn compute_vnode(&self, pk_prefix: &Row) -> VirtualNode {
354354
let prefix_len = pk_prefix.0.len();
355355
if let Some(vnode_col_idx_in_pk) = self.vnode_col_idx_in_pk {
356356
let vnode = pk_prefix.0.get(vnode_col_idx_in_pk).unwrap();
@@ -371,6 +371,10 @@ impl<S: StateStore> StateTable<S> {
371371
&self.pk_serde
372372
}
373373

374+
/// Returns a shared reference to the `vnodes` bitmap held by this state table.
pub fn vnode_bitmap(&self) -> &Bitmap {
375+
&self.vnodes
376+
}
377+
374378
pub fn is_dirty(&self) -> bool {
375379
self.mem_table.is_dirty()
376380
}

src/stream/src/executor/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ mod rearranged_chain;
7474
mod receiver;
7575
mod simple;
7676
mod sink;
77+
mod sort;
7778
pub mod source;
7879
pub mod subtask;
7980
mod top_n;
@@ -111,6 +112,7 @@ pub use receiver::ReceiverExecutor;
111112
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
112113
use simple::{SimpleExecutor, SimpleExecutorWrapper};
113114
pub use sink::SinkExecutor;
115+
pub use sort::SortExecutor;
114116
pub use source::*;
115117
pub use top_n::{AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor};
116118
pub use union::UnionExecutor;

0 commit comments

Comments
 (0)