@@ -17,7 +17,10 @@ use std::fmt::Display;
17
17
use enum_as_inner:: EnumAsInner ;
18
18
use parse_display:: Display ;
19
19
use risingwave_common:: bail;
20
- use risingwave_common:: types:: DataType ;
20
+ use risingwave_common:: types:: {
21
+ DataType , Datum , ScalarImpl , ScalarRefImpl , Sentinelled , ToDatumRef , ToOwnedDatum , ToText ,
22
+ } ;
23
+ use risingwave_common:: util:: sort_util:: { Direction , OrderType } ;
21
24
use risingwave_pb:: expr:: window_frame:: { PbBound , PbExclusion } ;
22
25
use risingwave_pb:: expr:: { PbWindowFrame , PbWindowFunction } ;
23
26
use FrameBound :: { CurrentRow , Following , Preceding , UnboundedFollowing , UnboundedPreceding } ;
@@ -107,34 +110,40 @@ impl Frame {
107
110
end : Some ( end. to_protobuf ( ) ) ,
108
111
exclusion,
109
112
} ,
113
+ FrameBounds :: Range ( RangeFrameBounds { .. } ) => {
114
+ todo ! ( ) // TODO()
115
+ }
110
116
}
111
117
}
112
118
}
113
119
114
- #[ derive( Display , Debug , Clone , Eq , PartialEq , Hash ) ]
120
+ #[ derive( Display , Debug , Clone , Eq , PartialEq , Hash , EnumAsInner ) ]
115
121
#[ display( "{0}" ) ]
116
122
pub enum FrameBounds {
117
123
Rows ( RowsFrameBounds ) ,
118
124
// Groups(GroupsFrameBounds),
119
- // Range(RangeFrameBounds),
125
+ Range ( RangeFrameBounds ) ,
120
126
}
121
127
122
128
impl FrameBounds {
123
129
pub fn validate ( & self ) -> Result < ( ) > {
124
130
match self {
125
131
Self :: Rows ( bounds) => bounds. validate ( ) ,
132
+ Self :: Range ( bounds) => bounds. validate ( ) ,
126
133
}
127
134
}
128
135
129
136
pub fn start_is_unbounded ( & self ) -> bool {
130
137
match self {
131
138
Self :: Rows ( RowsFrameBounds { start, .. } ) => start. is_unbounded_preceding ( ) ,
139
+ Self :: Range ( RangeFrameBounds { start, .. } ) => start. is_unbounded_preceding ( ) ,
132
140
}
133
141
}
134
142
135
143
pub fn end_is_unbounded ( & self ) -> bool {
136
144
match self {
137
145
Self :: Rows ( RowsFrameBounds { end, .. } ) => end. is_unbounded_following ( ) ,
146
+ Self :: Range ( RangeFrameBounds { end, .. } ) => end. is_unbounded_following ( ) ,
138
147
}
139
148
}
140
149
@@ -152,11 +161,150 @@ pub struct RowsFrameBounds {
152
161
153
162
impl RowsFrameBounds {
154
163
fn validate ( & self ) -> Result < ( ) > {
155
- FrameBound :: validate_bounds ( & self . start , & self . end )
164
+ FrameBound :: validate_bounds ( & self . start , & self . end , |_| Ok ( ( ) ) )
156
165
}
157
166
}
158
167
159
- #[ derive( Display , Debug , Clone , Eq , PartialEq , Hash , EnumAsInner ) ]
168
+ #[ derive( Debug , Clone , Eq , PartialEq , Hash ) ]
169
+ pub struct RangeFrameBounds {
170
+ pub start : FrameBound < ScalarImpl > ,
171
+ pub end : FrameBound < ScalarImpl > ,
172
+ }
173
+
174
+ impl Display for RangeFrameBounds {
175
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
176
+ write ! (
177
+ f,
178
+ "RANGE BETWEEN {} AND {}" ,
179
+ self . start. for_display( ) ,
180
+ self . end. for_display( )
181
+ ) ?;
182
+ Ok ( ( ) )
183
+ }
184
+ }
185
+
186
+ impl RangeFrameBounds {
187
+ fn validate ( & self ) -> Result < ( ) > {
188
+ FrameBound :: validate_bounds ( & self . start , & self . end , |offset| {
189
+ match offset. as_scalar_ref_impl ( ) {
190
+ // TODO(): use decl macro to merge with the following
191
+ ScalarRefImpl :: Int16 ( val) if val < 0 => {
192
+ bail ! ( "frame bound offset should be non-negative, but {} is given" , val) ;
193
+ }
194
+ ScalarRefImpl :: Int32 ( val) if val < 0 => {
195
+ bail ! ( "frame bound offset should be non-negative, but {} is given" , val) ;
196
+ }
197
+ ScalarRefImpl :: Int64 ( val) if val < 0 => {
198
+ bail ! ( "frame bound offset should be non-negative, but {} is given" , val) ;
199
+ }
200
+ // TODO(): datetime types
201
+ _ => unreachable ! ( "other order column data types are not supported and should be banned in frontend" ) ,
202
+ }
203
+ } )
204
+ }
205
+
206
+ /// Get the frame start for a given order column value.
207
+ ///
208
+ /// ## Examples
209
+ ///
210
+ /// For the following frames:
211
+ ///
212
+ /// ```sql
213
+ /// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
214
+ /// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
215
+ /// ```
216
+ ///
217
+ /// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is
218
+ /// represented by [`Sentinelled::Smallest`].
219
+ ///
220
+ /// For the following frame:
221
+ ///
222
+ /// ```sql
223
+ /// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
224
+ /// ```
225
+ ///
226
+ /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`.
227
+ ///
228
+ /// For the following frame:
229
+ ///
230
+ /// ```sql
231
+ /// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
232
+ /// ```
233
+ ///
234
+ /// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`.
235
+ pub fn frame_start_of (
236
+ & self ,
237
+ order_value : impl ToDatumRef ,
238
+ order_type : OrderType ,
239
+ ) -> Sentinelled < Datum > {
240
+ self . start . as_ref ( ) . bound_of ( order_value, order_type)
241
+ }
242
+
243
+ /// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with
244
+ /// everything on the other direction.
245
+ pub fn frame_end_of (
246
+ & self ,
247
+ order_value : impl ToDatumRef ,
248
+ order_type : OrderType ,
249
+ ) -> Sentinelled < Datum > {
250
+ self . end . as_ref ( ) . bound_of ( order_value, order_type)
251
+ }
252
+
253
+ /// Get the order value of the CURRENT ROW of the first-most frame that includes the given order value.
254
+ ///
255
+ /// ## Examples
256
+ ///
257
+ /// For the following frames:
258
+ ///
259
+ /// ```sql
260
+ /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
261
+ /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
262
+ /// ```
263
+ ///
264
+ /// For any given order value, the first CURRENT ROW is always the first-most row, which is
265
+ /// represented by [`Sentinelled::Smallest`].
266
+ ///
267
+ /// For the following frame:
268
+ ///
269
+ /// ```sql
270
+ /// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
271
+ /// ```
272
+ ///
273
+ /// For a given order value `100`, the first CURRENT ROW should have order value `90`.
274
+ ///
275
+ /// For the following frame:
276
+ ///
277
+ /// ```sql
278
+ /// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
279
+ /// ```
280
+ ///
281
+ /// For a given order value `100`, the first CURRENT ROW should have order value `110`.
282
+ pub fn first_curr_of (
283
+ & self ,
284
+ order_value : impl ToDatumRef ,
285
+ order_type : OrderType ,
286
+ ) -> Sentinelled < Datum > {
287
+ self . end
288
+ . as_ref ( )
289
+ . reverse ( )
290
+ . bound_of ( order_value, order_type)
291
+ }
292
+
293
+ /// Get the order value of the CURRENT ROW of the last-most frame that includes the given order value.
294
+ /// It's very similar to `first_curr_of`, just with everything on the other direction.
295
+ pub fn last_curr_of (
296
+ & self ,
297
+ order_value : impl ToDatumRef ,
298
+ order_type : OrderType ,
299
+ ) -> Sentinelled < Datum > {
300
+ self . start
301
+ . as_ref ( )
302
+ . reverse ( )
303
+ . bound_of ( order_value, order_type)
304
+ }
305
+ }
306
+
307
+ #[ derive( Display , Debug , Clone , Copy , Eq , PartialEq , Hash , EnumAsInner ) ]
160
308
#[ display( style = "TITLE CASE" ) ]
161
309
pub enum FrameBound < T > {
162
310
UnboundedPreceding ,
@@ -169,10 +317,23 @@ pub enum FrameBound<T> {
169
317
}
170
318
171
319
impl < T > FrameBound < T > {
172
- fn validate_bounds ( start : & Self , end : & Self ) -> Result < ( ) > {
320
+ fn offset_value ( & self ) -> Option < & T > {
321
+ match self {
322
+ UnboundedPreceding | UnboundedFollowing | CurrentRow => None ,
323
+ Preceding ( offset) | Following ( offset) => Some ( offset) ,
324
+ }
325
+ }
326
+
327
+ fn validate_bounds (
328
+ start : & Self ,
329
+ end : & Self ,
330
+ offset_checker : impl Fn ( & T ) -> Result < ( ) > ,
331
+ ) -> Result < ( ) > {
173
332
match ( start, end) {
174
333
( _, UnboundedPreceding ) => bail ! ( "frame end cannot be UNBOUNDED PRECEDING" ) ,
175
- ( UnboundedFollowing , _) => bail ! ( "frame start cannot be UNBOUNDED FOLLOWING" ) ,
334
+ ( UnboundedFollowing , _) => {
335
+ bail ! ( "frame start cannot be UNBOUNDED FOLLOWING" )
336
+ }
176
337
( Following ( _) , CurrentRow ) | ( Following ( _) , Preceding ( _) ) => {
177
338
bail ! ( "frame starting from following row cannot have preceding rows" )
178
339
}
@@ -181,10 +342,32 @@ impl<T> FrameBound<T> {
181
342
}
182
343
_ => { }
183
344
}
345
+
346
+ for bound in [ start, end] {
347
+ if let Some ( offset) = bound. offset_value ( ) {
348
+ offset_checker ( offset) ?;
349
+ }
350
+ }
351
+
184
352
Ok ( ( ) )
185
353
}
186
354
}
187
355
356
+ impl < T > FrameBound < T >
357
+ where
358
+ FrameBound < T > : Copy ,
359
+ {
360
+ fn reverse ( self ) -> FrameBound < T > {
361
+ match self {
362
+ UnboundedPreceding => UnboundedFollowing ,
363
+ Preceding ( offset) => Following ( offset) ,
364
+ CurrentRow => CurrentRow ,
365
+ Following ( offset) => Preceding ( offset) ,
366
+ UnboundedFollowing => UnboundedPreceding ,
367
+ }
368
+ }
369
+ }
370
+
188
371
impl FrameBound < usize > {
189
372
pub fn from_protobuf ( bound : & PbBound ) -> Result < Self > {
190
373
use risingwave_pb:: expr:: window_frame:: bound:: PbOffset ;
@@ -245,6 +428,85 @@ impl FrameBound<usize> {
245
428
}
246
429
}
247
430
431
+ impl FrameBound < ScalarImpl > {
432
+ fn as_ref ( & self ) -> FrameBound < ScalarRefImpl < ' _ > > {
433
+ match self {
434
+ UnboundedPreceding => UnboundedPreceding ,
435
+ Preceding ( offset) => Preceding ( offset. as_scalar_ref_impl ( ) ) ,
436
+ CurrentRow => CurrentRow ,
437
+ Following ( offset) => Following ( offset. as_scalar_ref_impl ( ) ) ,
438
+ UnboundedFollowing => UnboundedFollowing ,
439
+ }
440
+ }
441
+
442
+ fn for_display ( & self ) -> FrameBound < String > {
443
+ match self {
444
+ UnboundedPreceding => UnboundedPreceding ,
445
+ Preceding ( offset) => Preceding ( offset. as_scalar_ref_impl ( ) . to_text ( ) ) ,
446
+ CurrentRow => CurrentRow ,
447
+ Following ( offset) => Following ( offset. as_scalar_ref_impl ( ) . to_text ( ) ) ,
448
+ UnboundedFollowing => UnboundedFollowing ,
449
+ }
450
+ }
451
+ }
452
+
453
+ impl FrameBound < ScalarRefImpl < ' _ > > {
454
+ fn bound_of ( self , order_value : impl ToDatumRef , order_type : OrderType ) -> Sentinelled < Datum > {
455
+ let order_value = order_value. to_datum_ref ( ) ;
456
+ match ( self , order_type. direction ( ) ) {
457
+ ( UnboundedPreceding , _) => Sentinelled :: Smallest ,
458
+ ( UnboundedFollowing , _) => Sentinelled :: Largest ,
459
+ ( CurrentRow , _) => Sentinelled :: Normal ( order_value. to_owned_datum ( ) ) ,
460
+ ( Preceding ( offset) , Direction :: Ascending )
461
+ | ( Following ( offset) , Direction :: Descending ) => {
462
+ // should SUBTRACT the offset
463
+ if let Some ( value) = order_value {
464
+ let res = match ( value, offset) {
465
+ // TODO(): use decl macro to merge with the following
466
+ ( ScalarRefImpl :: Int16 ( val) , ScalarRefImpl :: Int16 ( off) ) => {
467
+ ScalarImpl :: Int16 ( val - off)
468
+ }
469
+ ( ScalarRefImpl :: Int32 ( val) , ScalarRefImpl :: Int32 ( off) ) => {
470
+ ScalarImpl :: Int32 ( val - off)
471
+ }
472
+ ( ScalarRefImpl :: Int64 ( val) , ScalarRefImpl :: Int64 ( off) ) => {
473
+ ScalarImpl :: Int64 ( val - off)
474
+ }
475
+ // TODO(): datetime types
476
+ _ => unreachable ! ( "other order column data types are not supported and should be banned in frontend" ) ,
477
+ } ;
478
+ Sentinelled :: Normal ( Some ( res) )
479
+ } else {
480
+ Sentinelled :: Normal ( None )
481
+ }
482
+ }
483
+ ( Following ( offset) , Direction :: Ascending )
484
+ | ( Preceding ( offset) , Direction :: Descending ) => {
485
+ // should ADD the offset
486
+ if let Some ( value) = order_value {
487
+ let res = match ( value, offset) {
488
+ // TODO(): use decl macro to merge with the following
489
+ ( ScalarRefImpl :: Int16 ( val) , ScalarRefImpl :: Int16 ( off) ) => {
490
+ ScalarImpl :: Int16 ( val + off)
491
+ }
492
+ ( ScalarRefImpl :: Int32 ( val) , ScalarRefImpl :: Int32 ( off) ) => {
493
+ ScalarImpl :: Int32 ( val + off)
494
+ }
495
+ ( ScalarRefImpl :: Int64 ( val) , ScalarRefImpl :: Int64 ( off) ) => {
496
+ ScalarImpl :: Int64 ( val + off)
497
+ }
498
+ // TODO(): datetime types
499
+ _ => unreachable ! ( "other order column data types are not supported and should be banned in frontend" ) ,
500
+ } ;
501
+ Sentinelled :: Normal ( Some ( res) )
502
+ } else {
503
+ Sentinelled :: Normal ( None )
504
+ }
505
+ }
506
+ }
507
+ }
508
+ }
509
+
248
510
#[ derive( Display , Debug , Copy , Clone , Eq , PartialEq , Hash , Default , EnumAsInner ) ]
249
511
#[ display( "EXCLUDE {}" , style = "TITLE CASE" ) ]
250
512
pub enum FrameExclusion {
0 commit comments