Skip to content

Commit 79e0a4b

Browse files
committed
watermark_strategy_expression
1 parent 7a78054 commit 79e0a4b

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

rfcs/0002-watermark-filter.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,69 @@ WINDOW w AS (
235235
)
236236
```
237237

238+
### watermark_strategy_expression
239+
240+
> the section is updated on 31 Oct
241+
242+
the old synatx is
243+
244+
```plain
245+
WATERMARK(orders, time_column, timeout: interval)
246+
```
247+
248+
and we will change it to
249+
250+
```plain
251+
WATERMARK(orders, time_column, watermark_strategy_expression)
252+
```
253+
254+
And the concept `watermark_strategy_expression` refers [FlinkSQL](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark). The expression is evaluated for every record and update the watermark if the result greater than the current watermark.
255+
The new synatx is more friendly for flinkSQL user and offer potential to make user define their strategy
256+
257+
```sql
258+
CREATE SOURCE `orders` (
259+
`id` BIGINT,
260+
`order_time` TIMESTAMP,
261+
`price` DECIMAL,
262+
`customer_id` BIGINT,
263+
) WITH (
264+
'connector' = ...,
265+
);
266+
267+
-- normal timeout watermark
268+
with watermarked_orders as (
269+
WATERMARK(orders, order_time, order - INTERVAL '1' MINUTE),
270+
)
271+
272+
-- normal timeout watermark with simple check
273+
with watermarked_orders as (
274+
WATERMARK(
275+
orders,
276+
order_time,
277+
CASE
278+
-- we have not determined the design about `PROC_TIME()`
279+
WHEN order_time > PROC_TIME() THEN Null
280+
ELSE order - INTERVAL '1' MINUTE
281+
END),
282+
283+
-- normal timeout watermark with removing outliers
284+
with watermarked_orders as (
285+
WATERMARK(
286+
(
287+
-- we have not determined the window funtion on unordered stream
288+
select *,
289+
max(order_time) as win_max_time
290+
OVER(BETWEEN 10 PRECEDING AND CURRENT ROW)
291+
from orders
292+
),
293+
order_time,
294+
CASE
295+
WHEN order_time == win_max_time THEN Null
296+
ELSE order - INTERVAL '1' MINUTE
297+
END),
298+
)
299+
```
300+
238301
## Future possibilities
239302

240303
### Should we support extra columns in Sort?

0 commit comments

Comments
 (0)