Skip to content

Commit 84b2e41

Browse files
committed
just passed a quick manual e2e test
Signed-off-by: Richard Chien <[email protected]> add comment Signed-off-by: Richard Chien <[email protected]> order type little things Signed-off-by: Richard Chien <[email protected]> `fnd_affected_ranges` seems to work now Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> typo Signed-off-by: Richard Chien <[email protected]> simplify frame start/end calculation Signed-off-by: Richard Chien <[email protected]> minor Signed-off-by: Richard Chien <[email protected]> check range frame bounds Signed-off-by: Richard Chien <[email protected]> add comment Signed-off-by: Richard Chien <[email protected]> display RangeFrameBounds Signed-off-by: Richard Chien <[email protected]>
1 parent 6a4c33b commit 84b2e41

File tree

8 files changed

+1337
-29
lines changed

8 files changed

+1337
-29
lines changed

src/expr/core/src/window_function/call.rs

+269-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ use std::fmt::Display;
1717
use enum_as_inner::EnumAsInner;
1818
use parse_display::Display;
1919
use risingwave_common::bail;
20-
use risingwave_common::types::DataType;
20+
use risingwave_common::types::{
21+
DataType, Datum, ScalarImpl, ScalarRefImpl, Sentinelled, ToDatumRef, ToOwnedDatum, ToText,
22+
};
23+
use risingwave_common::util::sort_util::{Direction, OrderType};
2124
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
2225
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
2326
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
@@ -107,34 +110,40 @@ impl Frame {
107110
end: Some(end.to_protobuf()),
108111
exclusion,
109112
},
113+
FrameBounds::Range(RangeFrameBounds { .. }) => {
114+
todo!() // TODO()
115+
}
110116
}
111117
}
112118
}
113119

114-
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)]
120+
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
115121
#[display("{0}")]
116122
pub enum FrameBounds {
117123
Rows(RowsFrameBounds),
118124
// Groups(GroupsFrameBounds),
119-
// Range(RangeFrameBounds),
125+
Range(RangeFrameBounds),
120126
}
121127

122128
impl FrameBounds {
123129
pub fn validate(&self) -> Result<()> {
124130
match self {
125131
Self::Rows(bounds) => bounds.validate(),
132+
Self::Range(bounds) => bounds.validate(),
126133
}
127134
}
128135

129136
pub fn start_is_unbounded(&self) -> bool {
130137
match self {
131138
Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
139+
Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
132140
}
133141
}
134142

135143
pub fn end_is_unbounded(&self) -> bool {
136144
match self {
137145
Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
146+
Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
138147
}
139148
}
140149

@@ -152,11 +161,150 @@ pub struct RowsFrameBounds {
152161

153162
impl RowsFrameBounds {
154163
fn validate(&self) -> Result<()> {
155-
FrameBound::validate_bounds(&self.start, &self.end)
164+
FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(()))
156165
}
157166
}
158167

159-
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
168+
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
169+
pub struct RangeFrameBounds {
170+
pub start: FrameBound<ScalarImpl>,
171+
pub end: FrameBound<ScalarImpl>,
172+
}
173+
174+
impl Display for RangeFrameBounds {
175+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176+
write!(
177+
f,
178+
"RANGE BETWEEN {} AND {}",
179+
self.start.for_display(),
180+
self.end.for_display()
181+
)?;
182+
Ok(())
183+
}
184+
}
185+
186+
impl RangeFrameBounds {
187+
fn validate(&self) -> Result<()> {
188+
FrameBound::validate_bounds(&self.start, &self.end, |offset| {
189+
match offset.as_scalar_ref_impl() {
190+
// TODO(): use decl macro to merge with the following
191+
ScalarRefImpl::Int16(val) if val < 0 => {
192+
bail!("frame bound offset should be non-negative, but {} is given", val);
193+
}
194+
ScalarRefImpl::Int32(val) if val < 0 => {
195+
bail!("frame bound offset should be non-negative, but {} is given", val);
196+
}
197+
ScalarRefImpl::Int64(val) if val < 0 => {
198+
bail!("frame bound offset should be non-negative, but {} is given", val);
199+
}
200+
// TODO(): datetime types
201+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
202+
}
203+
})
204+
}
205+
206+
/// Get the frame start for a given order column value.
207+
///
208+
/// ## Examples
209+
///
210+
/// For the following frames:
211+
///
212+
/// ```sql
213+
/// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
214+
/// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
215+
/// ```
216+
///
217+
/// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is
218+
/// represented by [`Sentinelled::Smallest`].
219+
///
220+
/// For the following frame:
221+
///
222+
/// ```sql
223+
/// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
224+
/// ```
225+
///
226+
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`.
227+
///
228+
/// For the following frame:
229+
///
230+
/// ```sql
231+
/// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
232+
/// ```
233+
///
234+
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`.
235+
pub fn frame_start_of(
236+
&self,
237+
order_value: impl ToDatumRef,
238+
order_type: OrderType,
239+
) -> Sentinelled<Datum> {
240+
self.start.as_ref().bound_of(order_value, order_type)
241+
}
242+
243+
/// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with
244+
/// everything on the other direction.
245+
pub fn frame_end_of(
246+
&self,
247+
order_value: impl ToDatumRef,
248+
order_type: OrderType,
249+
) -> Sentinelled<Datum> {
250+
self.end.as_ref().bound_of(order_value, order_type)
251+
}
252+
253+
/// Get the order value of the CURRENT ROW of the first-most frame that includes the given order value.
254+
///
255+
/// ## Examples
256+
///
257+
/// For the following frames:
258+
///
259+
/// ```sql
260+
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
261+
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
262+
/// ```
263+
///
264+
/// For any given order value, the first CURRENT ROW is always the first-most row, which is
265+
/// represented by [`Sentinelled::Smallest`].
266+
///
267+
/// For the following frame:
268+
///
269+
/// ```sql
270+
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
271+
/// ```
272+
///
273+
/// For a given order value `100`, the first CURRENT ROW should have order value `90`.
274+
///
275+
/// For the following frame:
276+
///
277+
/// ```sql
278+
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
279+
/// ```
280+
///
281+
/// For a given order value `100`, the first CURRENT ROW should have order value `110`.
282+
pub fn first_curr_of(
283+
&self,
284+
order_value: impl ToDatumRef,
285+
order_type: OrderType,
286+
) -> Sentinelled<Datum> {
287+
self.end
288+
.as_ref()
289+
.reverse()
290+
.bound_of(order_value, order_type)
291+
}
292+
293+
/// Get the order value of the CURRENT ROW of the last-most frame that includes the given order value.
294+
/// It's very similar to `first_curr_of`, just with everything on the other direction.
295+
pub fn last_curr_of(
296+
&self,
297+
order_value: impl ToDatumRef,
298+
order_type: OrderType,
299+
) -> Sentinelled<Datum> {
300+
self.start
301+
.as_ref()
302+
.reverse()
303+
.bound_of(order_value, order_type)
304+
}
305+
}
306+
307+
#[derive(Display, Debug, Clone, Copy, Eq, PartialEq, Hash, EnumAsInner)]
160308
#[display(style = "TITLE CASE")]
161309
pub enum FrameBound<T> {
162310
UnboundedPreceding,
@@ -169,10 +317,23 @@ pub enum FrameBound<T> {
169317
}
170318

171319
impl<T> FrameBound<T> {
172-
fn validate_bounds(start: &Self, end: &Self) -> Result<()> {
320+
fn offset_value(&self) -> Option<&T> {
321+
match self {
322+
UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
323+
Preceding(offset) | Following(offset) => Some(offset),
324+
}
325+
}
326+
327+
fn validate_bounds(
328+
start: &Self,
329+
end: &Self,
330+
offset_checker: impl Fn(&T) -> Result<()>,
331+
) -> Result<()> {
173332
match (start, end) {
174333
(_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
175-
(UnboundedFollowing, _) => bail!("frame start cannot be UNBOUNDED FOLLOWING"),
334+
(UnboundedFollowing, _) => {
335+
bail!("frame start cannot be UNBOUNDED FOLLOWING")
336+
}
176337
(Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
177338
bail!("frame starting from following row cannot have preceding rows")
178339
}
@@ -181,10 +342,32 @@ impl<T> FrameBound<T> {
181342
}
182343
_ => {}
183344
}
345+
346+
for bound in [start, end] {
347+
if let Some(offset) = bound.offset_value() {
348+
offset_checker(offset)?;
349+
}
350+
}
351+
184352
Ok(())
185353
}
186354
}
187355

356+
impl<T> FrameBound<T>
357+
where
358+
FrameBound<T>: Copy,
359+
{
360+
fn reverse(self) -> FrameBound<T> {
361+
match self {
362+
UnboundedPreceding => UnboundedFollowing,
363+
Preceding(offset) => Following(offset),
364+
CurrentRow => CurrentRow,
365+
Following(offset) => Preceding(offset),
366+
UnboundedFollowing => UnboundedPreceding,
367+
}
368+
}
369+
}
370+
188371
impl FrameBound<usize> {
189372
pub fn from_protobuf(bound: &PbBound) -> Result<Self> {
190373
use risingwave_pb::expr::window_frame::bound::PbOffset;
@@ -245,6 +428,85 @@ impl FrameBound<usize> {
245428
}
246429
}
247430

431+
impl FrameBound<ScalarImpl> {
432+
fn as_ref(&self) -> FrameBound<ScalarRefImpl<'_>> {
433+
match self {
434+
UnboundedPreceding => UnboundedPreceding,
435+
Preceding(offset) => Preceding(offset.as_scalar_ref_impl()),
436+
CurrentRow => CurrentRow,
437+
Following(offset) => Following(offset.as_scalar_ref_impl()),
438+
UnboundedFollowing => UnboundedFollowing,
439+
}
440+
}
441+
442+
fn for_display(&self) -> FrameBound<String> {
443+
match self {
444+
UnboundedPreceding => UnboundedPreceding,
445+
Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
446+
CurrentRow => CurrentRow,
447+
Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
448+
UnboundedFollowing => UnboundedFollowing,
449+
}
450+
}
451+
}
452+
453+
impl FrameBound<ScalarRefImpl<'_>> {
454+
fn bound_of(self, order_value: impl ToDatumRef, order_type: OrderType) -> Sentinelled<Datum> {
455+
let order_value = order_value.to_datum_ref();
456+
match (self, order_type.direction()) {
457+
(UnboundedPreceding, _) => Sentinelled::Smallest,
458+
(UnboundedFollowing, _) => Sentinelled::Largest,
459+
(CurrentRow, _) => Sentinelled::Normal(order_value.to_owned_datum()),
460+
(Preceding(offset), Direction::Ascending)
461+
| (Following(offset), Direction::Descending) => {
462+
// should SUBTRACT the offset
463+
if let Some(value) = order_value {
464+
let res = match (value, offset) {
465+
// TODO(): use decl macro to merge with the following
466+
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
467+
ScalarImpl::Int16(val - off)
468+
}
469+
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
470+
ScalarImpl::Int32(val - off)
471+
}
472+
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
473+
ScalarImpl::Int64(val - off)
474+
}
475+
// TODO(): datetime types
476+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
477+
};
478+
Sentinelled::Normal(Some(res))
479+
} else {
480+
Sentinelled::Normal(None)
481+
}
482+
}
483+
(Following(offset), Direction::Ascending)
484+
| (Preceding(offset), Direction::Descending) => {
485+
// should ADD the offset
486+
if let Some(value) = order_value {
487+
let res = match (value, offset) {
488+
// TODO(): use decl macro to merge with the following
489+
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
490+
ScalarImpl::Int16(val + off)
491+
}
492+
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
493+
ScalarImpl::Int32(val + off)
494+
}
495+
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
496+
ScalarImpl::Int64(val + off)
497+
}
498+
// TODO(): datetime types
499+
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
500+
};
501+
Sentinelled::Normal(Some(res))
502+
} else {
503+
Sentinelled::Normal(None)
504+
}
505+
}
506+
}
507+
}
508+
}
509+
248510
#[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
249511
#[display("EXCLUDE {}", style = "TITLE CASE")]
250512
pub enum FrameExclusion {

0 commit comments

Comments
 (0)