Skip to content

Commit 49ea439

Browse files
committed
just passed a quick manual e2e test
Signed-off-by: Richard Chien <[email protected]> add comment Signed-off-by: Richard Chien <[email protected]> order type little things Signed-off-by: Richard Chien <[email protected]> `fnd_affected_ranges` seems to work now Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> simplify frame start/end calculation Signed-off-by: Richard Chien <[email protected]> minor Signed-off-by: Richard Chien <[email protected]> check range frame bounds Signed-off-by: Richard Chien <[email protected]> add comment Signed-off-by: Richard Chien <[email protected]> display RangeFrameBounds Signed-off-by: Richard Chien <[email protected]>
1 parent c74d719 commit 49ea439

File tree

8 files changed

+1351
-42
lines changed

8 files changed

+1351
-42
lines changed

src/expr/core/src/window_function/call.rs

+283-20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use std::fmt::Display;
1717
use enum_as_inner::EnumAsInner;
1818
use parse_display::Display;
1919
use risingwave_common::bail;
20-
use risingwave_common::types::DataType;
20+
use risingwave_common::types::{
21+
DataType, Datum, ScalarImpl, ScalarRefImpl, Sentinelled, ToDatumRef, ToOwnedDatum, ToText,
22+
};
23+
use risingwave_common::util::sort_util::{Direction, OrderType};
2124
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
2225
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
2326
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
@@ -107,6 +110,25 @@ impl Frame {
107110
end: Some(end.to_protobuf()),
108111
exclusion,
109112
},
113+
FrameBounds::Range(RangeFrameBounds { .. }) => {
114+
todo!() // TODO()
115+
}
116+
}
117+
}
118+
}
119+
120+
#[derive(Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
121+
pub enum FrameBounds {
122+
Rows(RowsFrameBounds),
123+
// Groups(GroupsFrameBounds),
124+
Range(RangeFrameBounds),
125+
}
126+
127+
impl Display for FrameBounds {
128+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129+
match self {
130+
Self::Rows(bounds) => bounds.fmt(f),
131+
Self::Range(bounds) => bounds.fmt(f),
110132
}
111133
}
112134
}
@@ -115,18 +137,21 @@ impl FrameBounds {
115137
pub fn validate(&self) -> Result<()> {
116138
match self {
117139
Self::Rows(bounds) => bounds.validate(),
140+
Self::Range(bounds) => bounds.validate(),
118141
}
119142
}
120143

121144
pub fn start_is_unbounded(&self) -> bool {
122145
match self {
123146
Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
147+
Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
124148
}
125149
}
126150

127151
pub fn end_is_unbounded(&self) -> bool {
128152
match self {
129153
Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
154+
Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
130155
}
131156
}
132157

@@ -135,21 +160,6 @@ impl FrameBounds {
135160
}
136161
}
137162

138-
impl Display for FrameBounds {
139-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140-
match self {
141-
Self::Rows(bounds) => bounds.fmt(f),
142-
}
143-
}
144-
}
145-
146-
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
147-
pub enum FrameBounds {
148-
Rows(RowsFrameBounds),
149-
// Groups(GroupsFrameBounds),
150-
// Range(RangeFrameBounds),
151-
}
152-
153163
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)]
154164
#[display("ROWS BETWEEN {start} AND {end}")]
155165
pub struct RowsFrameBounds {
@@ -159,11 +169,150 @@ pub struct RowsFrameBounds {
159169

160170
impl RowsFrameBounds {
161171
fn validate(&self) -> Result<()> {
162-
FrameBound::validate_bounds(&self.start, &self.end)
172+
FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(()))
173+
}
174+
}
175+
176+
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
177+
pub struct RangeFrameBounds {
178+
pub start: FrameBound<ScalarImpl>,
179+
pub end: FrameBound<ScalarImpl>,
180+
}
181+
182+
impl Display for RangeFrameBounds {
183+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184+
write!(
185+
f,
186+
"RANGE BETWEEN {} AND {}",
187+
self.start.for_display(),
188+
self.end.for_display()
189+
)?;
190+
Ok(())
191+
}
192+
}
193+
194+
impl RangeFrameBounds {
195+
fn validate(&self) -> Result<()> {
196+
FrameBound::validate_bounds(&self.start, &self.end, |offset| {
197+
match offset.as_scalar_ref_impl() {
198+
// TODO(): use decl macro to merge with the following
199+
ScalarRefImpl::Int16(val) if val < 0 => {
200+
bail!("frame bound offset should be non-negative, but {} is given", val);
201+
}
202+
ScalarRefImpl::Int32(val) if val < 0 => {
203+
bail!("frame bound offset should be non-negative, but {} is given", val);
204+
}
205+
ScalarRefImpl::Int64(val) if val < 0 => {
206+
bail!("frame bound offset should be non-negative, but {} is given", val);
207+
}
208+
// TODO(): datetime types
209+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
210+
}
211+
})
212+
}
213+
214+
/// Get the frame start for a given order column value.
215+
///
216+
/// ## Examples
217+
///
218+
/// For the following frames:
219+
///
220+
/// ```sql
221+
/// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
222+
/// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
223+
/// ```
224+
///
225+
/// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is
226+
/// represented by [`Sentinelled::Smallest`].
227+
///
228+
/// For the following frame:
229+
///
230+
/// ```sql
231+
/// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
232+
/// ```
233+
///
234+
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`.
235+
///
236+
/// For the following frame:
237+
///
238+
/// ```sql
239+
/// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
240+
/// ```
241+
///
242+
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`.
243+
pub fn frame_start_of(
244+
&self,
245+
order_value: impl ToDatumRef,
246+
order_type: OrderType,
247+
) -> Sentinelled<Datum> {
248+
self.start.as_ref().bound_of(order_value, order_type)
249+
}
250+
251+
/// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with
252+
/// everything on the other direction.
253+
pub fn frame_end_of(
254+
&self,
255+
order_value: impl ToDatumRef,
256+
order_type: OrderType,
257+
) -> Sentinelled<Datum> {
258+
self.end.as_ref().bound_of(order_value, order_type)
259+
}
260+
261+
/// Get the order value of the CURRENT ROW of the first-most frame that includes the given order value.
262+
///
263+
/// ## Examples
264+
///
265+
/// For the following frames:
266+
///
267+
/// ```sql
268+
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
269+
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
270+
/// ```
271+
///
272+
/// For any given order value, the first CURRENT ROW is always the first-most row, which is
273+
/// represented by [`Sentinelled::Smallest`].
274+
///
275+
/// For the following frame:
276+
///
277+
/// ```sql
278+
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
279+
/// ```
280+
///
281+
/// For a given order value `100`, the first CURRENT ROW should have order value `90`.
282+
///
283+
/// For the following frame:
284+
///
285+
/// ```sql
286+
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
287+
/// ```
288+
///
289+
/// For a given order value `100`, the first CURRENT ROW should have order value `110`.
290+
pub fn first_curr_of(
291+
&self,
292+
order_value: impl ToDatumRef,
293+
order_type: OrderType,
294+
) -> Sentinelled<Datum> {
295+
self.end
296+
.as_ref()
297+
.reverse()
298+
.bound_of(order_value, order_type)
299+
}
300+
301+
/// Get the order value of the CURRENT ROW of the last-most frame that includes the given order value.
302+
/// It's very similar to `first_curr_of`, just with everything on the other direction.
303+
pub fn last_curr_of(
304+
&self,
305+
order_value: impl ToDatumRef,
306+
order_type: OrderType,
307+
) -> Sentinelled<Datum> {
308+
self.start
309+
.as_ref()
310+
.reverse()
311+
.bound_of(order_value, order_type)
163312
}
164313
}
165314

166-
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
315+
#[derive(Display, Debug, Clone, Copy, Eq, PartialEq, Hash, EnumAsInner)]
167316
#[display(style = "TITLE CASE")]
168317
pub enum FrameBound<T> {
169318
UnboundedPreceding,
@@ -176,10 +325,23 @@ pub enum FrameBound<T> {
176325
}
177326

178327
impl<T> FrameBound<T> {
179-
fn validate_bounds(start: &Self, end: &Self) -> Result<()> {
328+
fn offset_value(&self) -> Option<&T> {
329+
match self {
330+
UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
331+
Preceding(offset) | Following(offset) => Some(offset),
332+
}
333+
}
334+
335+
fn validate_bounds(
336+
start: &Self,
337+
end: &Self,
338+
offset_checker: impl Fn(&T) -> Result<()>,
339+
) -> Result<()> {
180340
match (start, end) {
181341
(_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
182-
(UnboundedFollowing, _) => bail!("frame start cannot be UNBOUNDED FOLLOWING"),
342+
(UnboundedFollowing, _) => {
343+
bail!("frame start cannot be UNBOUNDED FOLLOWING")
344+
}
183345
(Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
184346
bail!("frame starting from following row cannot have preceding rows")
185347
}
@@ -188,10 +350,32 @@ impl<T> FrameBound<T> {
188350
}
189351
_ => {}
190352
}
353+
354+
for bound in [start, end] {
355+
if let Some(offset) = bound.offset_value() {
356+
offset_checker(offset)?;
357+
}
358+
}
359+
191360
Ok(())
192361
}
193362
}
194363

364+
impl<T> FrameBound<T>
365+
where
366+
FrameBound<T>: Copy,
367+
{
368+
fn reverse(self) -> FrameBound<T> {
369+
match self {
370+
UnboundedPreceding => UnboundedFollowing,
371+
Preceding(offset) => Following(offset),
372+
CurrentRow => CurrentRow,
373+
Following(offset) => Preceding(offset),
374+
UnboundedFollowing => UnboundedPreceding,
375+
}
376+
}
377+
}
378+
195379
impl FrameBound<usize> {
196380
pub fn from_protobuf(bound: &PbBound) -> Result<Self> {
197381
use risingwave_pb::expr::window_frame::bound::PbOffset;
@@ -252,6 +436,85 @@ impl FrameBound<usize> {
252436
}
253437
}
254438

439+
impl FrameBound<ScalarImpl> {
440+
fn as_ref(&self) -> FrameBound<ScalarRefImpl<'_>> {
441+
match self {
442+
UnboundedPreceding => UnboundedPreceding,
443+
Preceding(offset) => Preceding(offset.as_scalar_ref_impl()),
444+
CurrentRow => CurrentRow,
445+
Following(offset) => Following(offset.as_scalar_ref_impl()),
446+
UnboundedFollowing => UnboundedFollowing,
447+
}
448+
}
449+
450+
fn for_display(&self) -> FrameBound<String> {
451+
match self {
452+
UnboundedPreceding => UnboundedPreceding,
453+
Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
454+
CurrentRow => CurrentRow,
455+
Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
456+
UnboundedFollowing => UnboundedFollowing,
457+
}
458+
}
459+
}
460+
461+
impl FrameBound<ScalarRefImpl<'_>> {
462+
fn bound_of(self, order_value: impl ToDatumRef, order_type: OrderType) -> Sentinelled<Datum> {
463+
let order_value = order_value.to_datum_ref();
464+
match (self, order_type.direction()) {
465+
(UnboundedPreceding, _) => Sentinelled::Smallest,
466+
(UnboundedFollowing, _) => Sentinelled::Largest,
467+
(CurrentRow, _) => Sentinelled::Normal(order_value.to_owned_datum()),
468+
(Preceding(offset), Direction::Ascending)
469+
| (Following(offset), Direction::Descending) => {
470+
// should SUBTRACT the offset
471+
if let Some(value) = order_value {
472+
let res = match (value, offset) {
473+
// TODO(): use decl macro to merge with the following
474+
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
475+
ScalarImpl::Int16(val - off)
476+
}
477+
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
478+
ScalarImpl::Int32(val - off)
479+
}
480+
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
481+
ScalarImpl::Int64(val - off)
482+
}
483+
// TODO(): datetime types
484+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
485+
};
486+
Sentinelled::Normal(Some(res))
487+
} else {
488+
Sentinelled::Normal(None)
489+
}
490+
}
491+
(Following(offset), Direction::Ascending)
492+
| (Preceding(offset), Direction::Descending) => {
493+
// should ADD the offset
494+
if let Some(value) = order_value {
495+
let res = match (value, offset) {
496+
// TODO(): use decl macro to merge with the following
497+
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
498+
ScalarImpl::Int16(val + off)
499+
}
500+
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
501+
ScalarImpl::Int32(val + off)
502+
}
503+
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
504+
ScalarImpl::Int64(val + off)
505+
}
506+
// TODO(): datetime types
507+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
508+
};
509+
Sentinelled::Normal(Some(res))
510+
} else {
511+
Sentinelled::Normal(None)
512+
}
513+
}
514+
}
515+
}
516+
}
517+
255518
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
256519
pub enum FrameExclusion {
257520
CurrentRow,

0 commit comments

Comments
 (0)