Skip to content

Commit a008df5

Browse files
committed
Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_intra_picker
2 parents de1fac3 + f1672f7 commit a008df5

File tree

12 files changed

+127
-21
lines changed

12 files changed

+127
-21
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
query I
2+
select array_max(array[1, 2, 3]);
3+
----
4+
3
5+
6+
query I
7+
select array_max(array[2, 3, 5, 2, 4]);
8+
----
9+
5
10+
11+
query I
12+
select array_max(array[114514, 114513]);
13+
----
14+
114514
15+
16+
query I
17+
select array_max(array['a', 'b', 'c', 'a']);
18+
----
19+
c
20+
21+
query I
22+
select array_max(array['e💩a', 'f🤔️b', 'c🥵c', 'd🥳d', 'e💩e']);
23+
----
24+
f🤔️b
25+
26+
query I
27+
select array_max(array['2c😅🤔😅️c2', '114🥵514', '30🤣🥳03', '5🥵💩💩🥵5']);
28+
----
29+
5🥵💩💩🥵5
30+
31+
query error invalid digit found in string
32+
select array_max(array['a', 'b', 'c', 114514]);
33+
34+
query error invalid digit found in string
35+
select array_max(array[114514, 'a', 'b', 'c']);
36+
37+
# i32::MIN & i32::MIN - 1 & i32::MAX
38+
query I
39+
select array_max(array[-2147483648, 2147483647, -2147483649]);
40+
----
41+
2147483647
42+
43+
# i64::MIN & i64::MIN - 1 & i64::MAX
44+
query I
45+
select array_max(array[-9223372036854775808, 9223372036854775807, -9223372036854775809]);
46+
----
47+
9223372036854775807
48+
49+
query I
50+
select array_max(array['a', '', 'c']);
51+
----
52+
c
53+
54+
query I
55+
select array_max(array[3.14, 1.14, 1.14514]);
56+
----
57+
3.14
58+
59+
query I
60+
select array_max(array[3.1415926, 191.14, 114514, 1313.1414]);
61+
----
62+
114514
63+
64+
query I
65+
select array_max(array[1e-4, 1.14514e5, 1.14514e-5]);
66+
----
67+
114514
68+
69+
query I
70+
select array_max(array[date'2002-10-30', date'2023-09-06', date'2017-06-18']);
71+
----
72+
2023-09-06

e2e_test/streaming/watermark.slt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ statement ok
1212
create materialized view mv as select * from t emit on window close;
1313

1414
statement ok
15-
insert into t values ('2023-05-06 16:51:00', 1);
15+
insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3);
1616

1717
statement ok
1818
insert into t values ('2023-05-06 16:56:01', 1);
@@ -25,6 +25,8 @@ query TI
2525
select * from mv;
2626
----
2727
2023-05-06 16:51:00 1
28+
2023-05-06 16:51:00 2
29+
2023-05-06 16:51:00 3
2830

2931
statement ok
3032
drop materialized view mv;

proto/expr.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ message ExprNode {
198198
ARRAY_DIMS = 544;
199199
ARRAY_TRANSFORM = 545;
200200
ARRAY_MIN = 546;
201+
ARRAY_MAX = 547;
201202

202203
// Int256 functions
203204
HEX_TO_INT256 = 560;

src/expr/src/sig/func.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ mod tests {
202202
ArrayMin: [
203203
"array_min(list) -> bytea/varchar/timestamptz/timestamp/time/date/int256/serial/decimal/float32/float64/int16/int32/int64",
204204
],
205+
ArrayMax: [
206+
"array_max(list) -> bytea/varchar/timestamptz/timestamp/time/date/int256/serial/decimal/float32/float64/int16/int32/int64",
207+
],
205208
}
206209
"#]];
207210
expected.assert_debug_eq(&duplicated);

src/expr/src/vector_op/array_min.rs renamed to src/expr/src/vector_op/array_min_max.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use risingwave_expr_macro::function;
1818

1919
use crate::Result;
2020

21+
/// FIXME: #[`function("array_min(list`) -> any")] supports
22+
/// In this way we could avoid manual macro expansion
2123
#[function("array_min(list) -> *int")]
2224
#[function("array_min(list) -> *float")]
2325
#[function("array_min(list) -> decimal")]
@@ -36,3 +38,22 @@ pub fn array_min<T: Scalar>(list: ListRef<'_>) -> Result<Option<T>> {
3638
None => Ok(None),
3739
}
3840
}
41+
42+
#[function("array_max(list) -> *int")]
43+
#[function("array_max(list) -> *float")]
44+
#[function("array_max(list) -> decimal")]
45+
#[function("array_max(list) -> serial")]
46+
#[function("array_max(list) -> int256")]
47+
#[function("array_max(list) -> date")]
48+
#[function("array_max(list) -> time")]
49+
#[function("array_max(list) -> timestamp")]
50+
#[function("array_max(list) -> timestamptz")]
51+
#[function("array_max(list) -> varchar")]
52+
#[function("array_max(list) -> bytea")]
53+
pub fn array_max<T: Scalar>(list: ListRef<'_>) -> Result<Option<T>> {
54+
let max_value = list.iter().flatten().map(DefaultOrdered).max();
55+
match max_value.map(|v| v.0).to_owned_datum() {
56+
Some(s) => Ok(Some(s.try_into()?)),
57+
None => Ok(None),
58+
}
59+
}

src/expr/src/vector_op/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub mod arithmetic_op;
1616
pub mod array_access;
1717
pub mod array_distinct;
1818
pub mod array_length;
19-
pub mod array_min;
19+
pub mod array_min_max;
2020
pub mod array_positions;
2121
pub mod array_range_access;
2222
pub mod array_remove;

src/frontend/src/binder/expr/function.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,7 @@ impl Binder {
793793
("cardinality", raw_call(ExprType::Cardinality)),
794794
("array_remove", raw_call(ExprType::ArrayRemove)),
795795
("array_replace", raw_call(ExprType::ArrayReplace)),
796+
("array_max", raw_call(ExprType::ArrayMax)),
796797
("array_position", raw_call(ExprType::ArrayPosition)),
797798
("array_positions", raw_call(ExprType::ArrayPositions)),
798799
("trim_array", raw_call(ExprType::TrimArray)),

src/frontend/src/expr/pure.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ impl ExprVisitor<bool> for ImpureAnalyzer {
153153
| expr_node::Type::Row
154154
| expr_node::Type::ArrayToString
155155
| expr_node::Type::ArrayCat
156+
| expr_node::Type::ArrayMax
156157
| expr_node::Type::ArrayAppend
157158
| expr_node::Type::ArrayPrepend
158159
| expr_node::Type::FormatType

src/frontend/src/expr/type_inference/func.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,12 @@ fn infer_type_for_special(
613613
}
614614
Ok(Some(DataType::Varchar))
615615
}
616+
ExprType::ArrayMax => {
617+
ensure_arity!("array_max", | inputs | == 1);
618+
inputs[0].ensure_array_type()?;
619+
620+
Ok(Some(inputs[0].return_type().as_list().clone()))
621+
}
616622
ExprType::StringToArray => {
617623
ensure_arity!("string_to_array", 2 <= | inputs | <= 3);
618624

src/stream/src/executor/sort_buffer.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,19 @@ impl<S: StateStore> SortBuffer<S> {
141141
watermark: ScalarImpl,
142142
buffer_table: &'a mut StateTable<S>,
143143
) {
144-
let mut last_timestamp = None;
144+
let mut last_table_pk = None;
145145
loop {
146146
if !self.cache.is_synced() {
147147
// Refill the cache, then consume from the cache, to ensure strong row ordering
148148
// and prefetch for the next watermark.
149-
self.refill_cache(last_timestamp.take(), buffer_table)
149+
self.refill_cache(last_table_pk.take(), buffer_table)
150150
.await?;
151151
}
152152

153153
#[for_await]
154154
for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) {
155-
let ((timestamp_val, _), row) = res?;
156-
last_timestamp = Some(timestamp_val.into_inner());
155+
let row = res?;
156+
last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row());
157157
yield row;
158158
}
159159

@@ -169,15 +169,15 @@ impl<S: StateStore> SortBuffer<S> {
169169
buffer_table.update_watermark(watermark, true);
170170
}
171171

172-
#[try_stream(ok = (CacheKey, OwnedRow), error = StreamExecutorError)]
172+
#[try_stream(ok = OwnedRow, error = StreamExecutorError)]
173173
async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) {
174174
while self.cache.is_synced() {
175175
let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else {
176176
break;
177177
};
178178
if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() {
179179
let row = self.cache.delete(&key).unwrap();
180-
yield (key, row);
180+
yield row;
181181
} else {
182182
break;
183183
}
@@ -187,15 +187,14 @@ impl<S: StateStore> SortBuffer<S> {
187187
/// Clear the cache and refill it with the current content of the buffer table.
188188
pub async fn refill_cache(
189189
&mut self,
190-
last_timestamp: Option<ScalarImpl>,
190+
last_table_pk: Option<OwnedRow>,
191191
buffer_table: &StateTable<S>,
192192
) -> StreamExecutorResult<()> {
193193
let mut filler = self.cache.begin_syncing();
194194

195195
let pk_range = (
196-
last_timestamp
197-
.as_ref()
198-
.map(|v| Bound::Excluded([Some(v.as_scalar_ref_impl())]))
196+
last_table_pk
197+
.map(Bound::Excluded)
199198
.unwrap_or(Bound::Unbounded),
200199
Bound::<row::Empty>::Unbounded,
201200
);

src/tests/e2e_extended_mode/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl TestSuite {
187187
);
188188
}
189189

190-
let timestamptz = DateTime::<Utc>::from_utc(
190+
let timestamptz = DateTime::<Utc>::from_naive_utc_and_offset(
191191
NaiveDate::from_ymd_opt(2022, 1, 1)
192192
.unwrap()
193193
.and_hms_opt(10, 0, 0)

0 commit comments

Comments
 (0)