Skip to content

Commit c3e988b

Browse files
authored
perf(over window): locality-based range cache for general over window executor (#11576)
Signed-off-by: Richard Chien <[email protected]>
1 parent 448d281 commit c3e988b

File tree

16 files changed

+2006
-606
lines changed

16 files changed

+2006
-606
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/stream_plan.proto

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,12 +591,20 @@ message EowcOverWindowNode {
591591
catalog.Table state_table = 4;
592592
}
593593

594-
// Note this is not exactly the same as EowcOverWindowNode in terms of future extension, so better to split as 2 different nodes.
594+
enum OverWindowCachePolicy {
595+
OVER_WINDOW_CACHE_POLICY_UNSPECIFIED = 0;
596+
OVER_WINDOW_CACHE_POLICY_FULL = 1;
597+
OVER_WINDOW_CACHE_POLICY_RECENT = 2;
598+
OVER_WINDOW_CACHE_POLICY_RECENT_FIRST_N = 3;
599+
OVER_WINDOW_CACHE_POLICY_RECENT_LAST_N = 4;
600+
}
601+
595602
message OverWindowNode {
596603
repeated expr.WindowFunction calls = 1;
597604
repeated uint32 partition_by = 2;
598605
repeated common.ColumnOrder order_by = 3;
599606
catalog.Table state_table = 4;
607+
OverWindowCachePolicy cache_policy = 5;
600608
}
601609

602610
message StreamNode {

src/common/src/row/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,16 @@ pub trait Row: Sized + std::fmt::Debug + PartialEq + Eq {
124124
/// Determines whether the datums of this row are equal to those of another.
125125
#[inline]
126126
fn eq(this: &Self, other: impl Row) -> bool {
127-
this.iter().eq(other.iter())
127+
if this.len() != other.len() {
128+
return false;
129+
}
130+
for i in (0..this.len()).rev() {
131+
// compare from the end to the start, as it's more likely to have same prefix
132+
if this.datum_at(i) != other.datum_at(i) {
133+
return false;
134+
}
135+
}
136+
true
128137
}
129138
}
130139

src/common/src/session_config/mod.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod over_window;
1516
mod query_mode;
1617
mod search_path;
1718
mod transaction_isolation_level;
@@ -23,6 +24,7 @@ use std::ops::Deref;
2324
use chrono_tz::Tz;
2425
use educe::{self, Educe};
2526
use itertools::Itertools;
27+
pub use over_window::OverWindowCachePolicy;
2628
pub use query_mode::QueryMode;
2729
pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
2830
use tracing::info;
@@ -34,7 +36,7 @@ use crate::util::epoch::Epoch;
3436

3537
// This is a hack, &'static str is not allowed as a const generics argument.
3638
// TODO: refine this using the adt_const_params feature.
37-
const CONFIG_KEYS: [&str; 36] = [
39+
const CONFIG_KEYS: [&str; 37] = [
3840
"RW_IMPLICIT_FLUSH",
3941
"CREATE_COMPACTION_GROUP_FOR_MV",
4042
"QUERY_MODE",
@@ -71,6 +73,7 @@ const CONFIG_KEYS: [&str; 36] = [
7173
"STANDARD_CONFORMING_STRINGS",
7274
"RW_STREAMING_RATE_LIMIT",
7375
"CDC_BACKFILL",
76+
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
7477
];
7578

7679
// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -111,6 +114,7 @@ const ROW_SECURITY: usize = 32;
111114
const STANDARD_CONFORMING_STRINGS: usize = 33;
112115
const RW_STREAMING_RATE_LIMIT: usize = 34;
113116
const CDC_BACKFILL: usize = 35;
117+
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
114118

115119
trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
116120
fn entry_name() -> &'static str;
@@ -478,6 +482,10 @@ pub struct ConfigMap {
478482
streaming_rate_limit: StreamingRateLimit,
479483

480484
cdc_backfill: CdcBackfill,
485+
486+
/// Cache policy for partition cache in streaming over window.
487+
/// Can be "full", "recent", "recent_first_n" or "recent_last_n".
488+
streaming_over_window_cache_policy: OverWindowCachePolicy,
481489
}
482490

483491
impl ConfigMap {
@@ -593,6 +601,8 @@ impl ConfigMap {
593601
self.streaming_rate_limit = val.as_slice().try_into()?;
594602
} else if key.eq_ignore_ascii_case(CdcBackfill::entry_name()) {
595603
self.cdc_backfill = val.as_slice().try_into()?
604+
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
605+
self.streaming_over_window_cache_policy = val.as_slice().try_into()?;
596606
} else {
597607
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
598608
}
@@ -678,6 +688,8 @@ impl ConfigMap {
678688
Ok(self.streaming_rate_limit.to_string())
679689
} else if key.eq_ignore_ascii_case(CdcBackfill::entry_name()) {
680690
Ok(self.cdc_backfill.to_string())
691+
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
692+
Ok(self.streaming_over_window_cache_policy.to_string())
681693
} else {
682694
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
683695
}
@@ -864,7 +876,12 @@ impl ConfigMap {
864876
name: CdcBackfill::entry_name().to_lowercase(),
865877
setting: self.cdc_backfill.to_string(),
866878
description: String::from("Enable backfill for CDC table to allow lock-free and incremental snapshot"),
867-
}
879+
},
880+
VariableInfo{
881+
name: OverWindowCachePolicy::entry_name().to_lowercase(),
882+
setting: self.streaming_over_window_cache_policy.to_string(),
883+
description: String::from(r#"Cache policy for partition cache in streaming over window. Can be "full", "recent", "recent_first_n" or "recent_last_n"."#),
884+
},
868885
]
869886
}
870887

@@ -999,4 +1016,8 @@ impl ConfigMap {
9991016
pub fn get_cdc_backfill(&self) -> bool {
10001017
self.cdc_backfill.0
10011018
}
1019+
1020+
pub fn get_streaming_over_window_cache_policy(&self) -> OverWindowCachePolicy {
1021+
self.streaming_over_window_cache_policy
1022+
}
10021023
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::str::FromStr;
16+
17+
use enum_as_inner::EnumAsInner;
18+
use parse_display::{Display, FromStr};
19+
use risingwave_pb::stream_plan::PbOverWindowCachePolicy;
20+
21+
use super::{ConfigEntry, CONFIG_KEYS, STREAMING_OVER_WINDOW_CACHE_POLICY};
22+
use crate::error::ErrorCode::{self, InvalidConfigValue};
23+
use crate::error::RwError;
24+
25+
#[derive(Copy, Default, Debug, Clone, PartialEq, Eq, FromStr, Display, EnumAsInner)]
26+
#[display(style = "snake_case")]
27+
pub enum OverWindowCachePolicy {
28+
/// Cache all entries.
29+
#[default]
30+
Full,
31+
/// Cache only recently accessed range of entries.
32+
Recent,
33+
/// Cache only the first N entries in recently accessed range.
34+
RecentFirstN,
35+
/// Cache only the last N entries in recently accessed range.
36+
RecentLastN,
37+
}
38+
39+
impl TryFrom<&[&str]> for OverWindowCachePolicy {
40+
type Error = RwError;
41+
42+
fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
43+
if value.len() != 1 {
44+
return Err(ErrorCode::InternalError(format!(
45+
"SET {} takes only one argument",
46+
Self::entry_name()
47+
))
48+
.into());
49+
}
50+
51+
let s = value[0].to_ascii_lowercase().replace('-', "_");
52+
OverWindowCachePolicy::from_str(&s).map_err(|_| {
53+
InvalidConfigValue {
54+
config_entry: Self::entry_name().to_string(),
55+
config_value: s.to_string(),
56+
}
57+
.into()
58+
})
59+
}
60+
}
61+
62+
impl ConfigEntry for OverWindowCachePolicy {
63+
fn entry_name() -> &'static str {
64+
CONFIG_KEYS[STREAMING_OVER_WINDOW_CACHE_POLICY]
65+
}
66+
}
67+
68+
impl OverWindowCachePolicy {
69+
pub fn to_protobuf(self) -> PbOverWindowCachePolicy {
70+
match self {
71+
Self::Full => PbOverWindowCachePolicy::Full,
72+
Self::Recent => PbOverWindowCachePolicy::Recent,
73+
Self::RecentFirstN => PbOverWindowCachePolicy::RecentFirstN,
74+
Self::RecentLastN => PbOverWindowCachePolicy::RecentLastN,
75+
}
76+
}
77+
78+
pub fn from_protobuf(pb: PbOverWindowCachePolicy) -> Self {
79+
match pb {
80+
PbOverWindowCachePolicy::Unspecified => Self::default(),
81+
PbOverWindowCachePolicy::Full => Self::Full,
82+
PbOverWindowCachePolicy::Recent => Self::Recent,
83+
PbOverWindowCachePolicy::RecentFirstN => Self::RecentFirstN,
84+
PbOverWindowCachePolicy::RecentLastN => Self::RecentLastN,
85+
}
86+
}
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use super::*;
92+
93+
#[test]
94+
fn parse_over_window_cache_policy() {
95+
assert_eq!(
96+
OverWindowCachePolicy::try_from(["full"].as_slice()).unwrap(),
97+
OverWindowCachePolicy::Full
98+
);
99+
assert_eq!(
100+
OverWindowCachePolicy::try_from(["recent"].as_slice()).unwrap(),
101+
OverWindowCachePolicy::Recent
102+
);
103+
assert_eq!(
104+
OverWindowCachePolicy::try_from(["RECENT"].as_slice()).unwrap(),
105+
OverWindowCachePolicy::Recent
106+
);
107+
assert_eq!(
108+
OverWindowCachePolicy::try_from(["recent_first_n"].as_slice()).unwrap(),
109+
OverWindowCachePolicy::RecentFirstN
110+
);
111+
assert_eq!(
112+
OverWindowCachePolicy::try_from(["recent_last_n"].as_slice()).unwrap(),
113+
OverWindowCachePolicy::RecentLastN
114+
);
115+
assert_eq!(
116+
OverWindowCachePolicy::try_from(["recent-last-n"].as_slice()).unwrap(),
117+
OverWindowCachePolicy::RecentLastN
118+
);
119+
assert_eq!(
120+
OverWindowCachePolicy::try_from(["recent_last_N"].as_slice()).unwrap(),
121+
OverWindowCachePolicy::RecentLastN
122+
);
123+
assert!(OverWindowCachePolicy::try_from(["foo"].as_slice()).is_err());
124+
}
125+
}

src/expr/src/window_function/call.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ impl Frame {
7878
exclusion,
7979
}
8080
}
81+
82+
pub fn is_unbounded(&self) -> bool {
83+
self.bounds.is_unbounded()
84+
}
8185
}
8286

8387
impl Frame {
@@ -127,6 +131,10 @@ impl FrameBounds {
127131
Self::Rows(_, end) => matches!(end, FrameBound::UnboundedFollowing),
128132
}
129133
}
134+
135+
pub fn is_unbounded(&self) -> bool {
136+
self.start_is_unbounded() || self.end_is_unbounded()
137+
}
130138
}
131139

132140
impl Display for FrameBounds {

src/frontend/src/optimizer/plan_node/stream_over_window.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,19 @@ impl StreamOverWindow {
5858

5959
let mut order_cols = HashSet::new();
6060
for idx in self.logical.partition_key_indices() {
61-
if !order_cols.contains(&idx) {
61+
if order_cols.insert(idx) {
6262
tbl_builder.add_order_column(idx, OrderType::ascending());
63-
order_cols.insert(idx);
6463
}
6564
}
6665
let read_prefix_len_hint = tbl_builder.get_current_pk_len();
6766
for o in self.logical.order_key() {
68-
if !order_cols.contains(&o.column_index) {
67+
if order_cols.insert(o.column_index) {
6968
tbl_builder.add_order_column(o.column_index, o.order_type);
70-
order_cols.insert(o.column_index);
7169
}
7270
}
73-
for idx in self.logical.input.logical_pk() {
74-
if !order_cols.contains(idx) {
75-
tbl_builder.add_order_column(*idx, OrderType::ascending());
76-
order_cols.insert(*idx);
71+
for &idx in self.logical.input.logical_pk() {
72+
if order_cols.insert(idx) {
73+
tbl_builder.add_order_column(idx, OrderType::ascending());
7774
}
7875
}
7976

@@ -123,12 +120,19 @@ impl StreamNode for StreamOverWindow {
123120
.infer_state_table()
124121
.with_id(state.gen_table_id_wrapped())
125122
.to_internal_table_prost();
123+
let cache_policy = self
124+
.base
125+
.ctx
126+
.session_ctx()
127+
.config()
128+
.get_streaming_over_window_cache_policy();
126129

127130
PbNodeBody::OverWindow(OverWindowNode {
128131
calls,
129132
partition_by,
130133
order_by,
131134
state_table: Some(state_table),
135+
cache_policy: cache_policy.to_protobuf() as _,
132136
})
133137
}
134138
}

src/stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ memcomparable = "0.2"
3838
multimap = "0.8"
3939
num-traits = "0.2"
4040
parking_lot = "0.12"
41+
parse-display = "0.8"
4142
pin-project = "1"
4243
prometheus = { version = "0.13", features = ["process"] }
4344
prost = "0.11"

0 commit comments

Comments
 (0)