Skip to content

Commit 213da6a

Browse files
refactor(streaming): top_n managed state use impl Row2 (#6480)
* top_n insert row_ref * top_n insert impl Row2 * ready for review * remove to_owned_row() in top_n * git Merge branch 'main' of https://github.com/singularity-data/risingwave into zehua/reduce_meta_snapshot_lock Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent d772092 commit 213da6a

File tree

6 files changed

+39
-51
lines changed

6 files changed

+39
-51
lines changed

src/stream/src/executor/lookup/cache.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,12 @@ impl LookupCache {
4141
let key = row.row_by_indices(arrange_join_keys);
4242
if let Some(values) = self.data.get_mut(&key) {
4343
// the item is in cache, update it
44-
let value = row.to_owned_row();
4544
match op {
4645
Op::Insert | Op::UpdateInsert => {
47-
values.insert(value);
46+
values.insert(row.into());
4847
}
4948
Op::Delete | Op::UpdateDelete => {
50-
values.remove(&value);
49+
values.remove(&row.into());
5150
}
5251
}
5352
}

src/stream/src/executor/managed_state/top_n/top_n_state.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use futures::{pin_mut, StreamExt};
16-
use risingwave_common::row::Row;
16+
use risingwave_common::row::{Row, Row2};
1717
use risingwave_common::util::epoch::EpochPair;
1818
use risingwave_common::util::ordered::OrderedRowSerde;
1919
use risingwave_common::util::sort_util::OrderType;
@@ -75,11 +75,11 @@ impl<S: StateStore> ManagedTopNState<S> {
7575
}
7676
}
7777

78-
pub fn insert(&mut self, value: Row) {
78+
pub fn insert(&mut self, value: impl Row2) {
7979
self.state_table.insert(value);
8080
}
8181

82-
pub fn delete(&mut self, value: Row) {
82+
pub fn delete(&mut self, value: impl Row2) {
8383
self.state_table.delete(value);
8484
}
8585

@@ -354,7 +354,6 @@ mod tests {
354354
let row4_bytes = serialize_row_to_cache_key(row4.clone(), 1, &cache_key_serde);
355355
let rows = vec![row1, row2, row3, row4];
356356
let ordered_rows = vec![row1_bytes, row2_bytes, row3_bytes, row4_bytes];
357-
358357
managed_state.insert(rows[3].clone());
359358

360359
// now ("ab", 4)

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use async_trait::async_trait;
1919
use risingwave_common::array::{Op, StreamChunk};
2020
use risingwave_common::buffer::Bitmap;
2121
use risingwave_common::catalog::Schema;
22-
use risingwave_common::row::Row;
2322
use risingwave_common::types::Datum;
2423
use risingwave_common::util::epoch::EpochPair;
2524
use risingwave_common::util::ordered::OrderedRowSerde;
@@ -262,40 +261,35 @@ where
262261
let cache_key =
263262
serialize_pk_to_cache_key(pk_row, self.order_by_len, &self.cache_key_serde);
264263

265-
let row = row_ref.to_owned_row();
266-
267-
let mut group_key = Vec::with_capacity(self.group_by.len());
268-
for &col_id in &self.group_by {
269-
group_key.push(row[col_id].clone());
270-
}
271-
let pk_prefix = Row::new(group_key.clone());
264+
let group_key = row_ref.row_by_indices(&self.group_by);
265+
let pk_prefix = group_key.clone();
272266

273267
// If 'self.caches' does not already have a cache for the current group, create a new
274268
// cache for it and insert it into `self.caches`
275-
if !self.caches.contains(&group_key) {
269+
if !self.caches.contains(&group_key.0) {
276270
let mut topn_cache = TopNCache::new(self.offset, self.limit, self.order_by_len);
277271
self.managed_state
278272
.init_topn_cache(Some(&pk_prefix), &mut topn_cache, self.order_by_len)
279273
.await?;
280-
self.caches.insert(group_key, topn_cache);
274+
self.caches.insert(group_key.0, topn_cache);
281275
}
282276
let cache = self.caches.get_mut(&pk_prefix.0).unwrap();
283277

284278
// apply the chunk to state table
285279
match op {
286280
Op::Insert | Op::UpdateInsert => {
287-
self.managed_state.insert(row.clone());
288-
cache.insert(cache_key, row, &mut res_ops, &mut res_rows);
281+
self.managed_state.insert(row_ref.clone());
282+
cache.insert(cache_key, row_ref, &mut res_ops, &mut res_rows);
289283
}
290284

291285
Op::Delete | Op::UpdateDelete => {
292-
self.managed_state.delete(row.clone());
286+
self.managed_state.delete(row_ref.clone());
293287
cache
294288
.delete(
295289
Some(&pk_prefix),
296290
&mut self.managed_state,
297291
cache_key,
298-
row,
292+
row_ref,
299293
&mut res_ops,
300294
&mut res_rows,
301295
)

src/stream/src/executor/top_n/top_n_appendonly.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,9 @@ where
205205
let pk_row = row_ref.row_by_indices(&self.internal_key_indices);
206206
let cache_key =
207207
serialize_pk_to_cache_key(pk_row, self.order_by_len, &self.cache_key_serde);
208-
let row = row_ref.to_owned_row();
209208
self.cache.insert(
210209
cache_key,
211-
row,
210+
row_ref,
212211
&mut res_ops,
213212
&mut res_rows,
214213
&mut self.managed_state,

src/stream/src/executor/top_n/top_n_cache.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use std::cmp::Ordering;
1616
use std::collections::BTreeMap;
1717

1818
use async_trait::async_trait;
19-
use risingwave_common::array::{Op, RowDeserializer};
20-
use risingwave_common::row::{CompactedRow, Row};
19+
use risingwave_common::array::{Op, RowDeserializer, RowRef};
20+
use risingwave_common::row::{CompactedRow, Row, Row2};
2121
use risingwave_storage::StateStore;
2222

2323
use crate::executor::error::StreamExecutorResult;
@@ -73,7 +73,7 @@ pub trait TopNCacheTrait {
7373
fn insert(
7474
&mut self,
7575
cache_key: CacheKey,
76-
row: Row,
76+
row: impl Row2,
7777
res_ops: &mut Vec<Op>,
7878
res_rows: &mut Vec<CompactedRow>,
7979
);
@@ -92,7 +92,7 @@ pub trait TopNCacheTrait {
9292
group_key: Option<&Row>,
9393
managed_state: &mut ManagedTopNState<S>,
9494
cache_key: CacheKey,
95-
row: Row,
95+
row: impl Row2 + Send,
9696
res_ops: &mut Vec<Op>,
9797
res_rows: &mut Vec<CompactedRow>,
9898
) -> StreamExecutorResult<()>;
@@ -170,7 +170,7 @@ impl TopNCacheTrait for TopNCache<false> {
170170
fn insert(
171171
&mut self,
172172
cache_key: CacheKey,
173-
row: Row,
173+
row: impl Row2,
174174
res_ops: &mut Vec<Op>,
175175
res_rows: &mut Vec<CompactedRow>,
176176
) {
@@ -236,7 +236,7 @@ impl TopNCacheTrait for TopNCache<false> {
236236
group_key: Option<&Row>,
237237
managed_state: &mut ManagedTopNState<S>,
238238
cache_key: CacheKey,
239-
row: Row,
239+
row: impl Row2 + Send,
240240
res_ops: &mut Vec<Op>,
241241
res_rows: &mut Vec<CompactedRow>,
242242
) -> StreamExecutorResult<()> {
@@ -314,7 +314,7 @@ impl TopNCacheTrait for TopNCache<true> {
314314
fn insert(
315315
&mut self,
316316
cache_key: CacheKey,
317-
row: Row,
317+
row: impl Row2,
318318
res_ops: &mut Vec<Op>,
319319
res_rows: &mut Vec<CompactedRow>,
320320
) {
@@ -412,7 +412,7 @@ impl TopNCacheTrait for TopNCache<true> {
412412
group_key: Option<&Row>,
413413
managed_state: &mut ManagedTopNState<S>,
414414
cache_key: CacheKey,
415-
row: Row,
415+
row: impl Row2 + Send,
416416
res_ops: &mut Vec<Op>,
417417
res_rows: &mut Vec<CompactedRow>,
418418
) -> StreamExecutorResult<()> {
@@ -490,7 +490,7 @@ pub trait AppendOnlyTopNCacheTrait {
490490
fn insert<S: StateStore>(
491491
&mut self,
492492
cache_key: CacheKey,
493-
row: Row,
493+
row_ref: RowRef<'_>,
494494
res_ops: &mut Vec<Op>,
495495
res_rows: &mut Vec<CompactedRow>,
496496
managed_state: &mut ManagedTopNState<S>,
@@ -503,7 +503,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache<false> {
503503
fn insert<S: StateStore>(
504504
&mut self,
505505
cache_key: CacheKey,
506-
row: Row,
506+
row_ref: RowRef<'_>,
507507
res_ops: &mut Vec<Op>,
508508
res_rows: &mut Vec<CompactedRow>,
509509
managed_state: &mut ManagedTopNState<S>,
@@ -512,11 +512,11 @@ impl AppendOnlyTopNCacheTrait for TopNCache<false> {
512512
if self.is_middle_cache_full() && &cache_key >= self.middle.last_key_value().unwrap().0 {
513513
return Ok(());
514514
}
515-
managed_state.insert(row.clone());
515+
managed_state.insert(row_ref.clone());
516516

517517
// Then insert input row to corresponding cache range according to its order key
518518
if !self.is_low_cache_full() {
519-
self.low.insert(cache_key, (&row).into());
519+
self.low.insert(cache_key, row_ref.into());
520520
return Ok(());
521521
}
522522

@@ -525,10 +525,10 @@ impl AppendOnlyTopNCacheTrait for TopNCache<false> {
525525
&& &cache_key <= low_last.key() {
526526
// Take the last element of `cache.low` and insert input row to it.
527527
let low_last = low_last.remove_entry();
528-
self.low.insert(cache_key, (&row).into());
528+
self.low.insert(cache_key, row_ref.into());
529529
low_last
530530
} else {
531-
(cache_key, (&row).into())
531+
(cache_key, row_ref.into())
532532
};
533533

534534
if !self.is_middle_cache_full() {
@@ -566,7 +566,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache<true> {
566566
fn insert<S: StateStore>(
567567
&mut self,
568568
cache_key: CacheKey,
569-
row: Row,
569+
row_ref: RowRef<'_>,
570570
res_ops: &mut Vec<Op>,
571571
res_rows: &mut Vec<CompactedRow>,
572572
managed_state: &mut ManagedTopNState<S>,
@@ -576,17 +576,15 @@ impl AppendOnlyTopNCacheTrait for TopNCache<true> {
576576
self.low.is_empty(),
577577
"Offset is not supported yet for WITH TIES, so low cache should be empty"
578578
);
579-
580-
let elem_to_compare_with_middle = (cache_key, row);
579+
let elem_to_compare_with_middle = (cache_key, row_ref);
581580

582581
if !self.is_middle_cache_full() {
583-
managed_state.insert(elem_to_compare_with_middle.1.clone());
584-
self.middle.insert(
585-
elem_to_compare_with_middle.0.clone(),
586-
(&elem_to_compare_with_middle.1).into(),
587-
);
582+
let row: CompactedRow = elem_to_compare_with_middle.1.clone().into();
583+
managed_state.insert(elem_to_compare_with_middle.1);
584+
self.middle
585+
.insert(elem_to_compare_with_middle.0.clone(), row.clone());
588586
res_ops.push(Op::Insert);
589-
res_rows.push((&elem_to_compare_with_middle.1).into());
587+
res_rows.push(row);
590588
return Ok(());
591589
}
592590

src/stream/src/executor/top_n/top_n_plain.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,24 +235,23 @@ where
235235
let pk_row = row_ref.row_by_indices(&self.internal_key_indices);
236236
let cache_key =
237237
serialize_pk_to_cache_key(pk_row, self.order_by_len, &self.cache_key_serde);
238-
let row = row_ref.to_owned_row();
239238
match op {
240239
Op::Insert | Op::UpdateInsert => {
241240
// First insert input row to state store
242-
self.managed_state.insert(row.clone());
241+
self.managed_state.insert(row_ref.clone());
243242
self.cache
244-
.insert(cache_key, row, &mut res_ops, &mut res_rows)
243+
.insert(cache_key, row_ref, &mut res_ops, &mut res_rows)
245244
}
246245

247246
Op::Delete | Op::UpdateDelete => {
248247
// First remove the row from state store
249-
self.managed_state.delete(row.clone());
248+
self.managed_state.delete(row_ref.clone());
250249
self.cache
251250
.delete(
252251
None,
253252
&mut self.managed_state,
254253
cache_key,
255-
row,
254+
row_ref,
256255
&mut res_ops,
257256
&mut res_rows,
258257
)

0 commit comments

Comments
 (0)