Skip to content

Commit 1eb8b62

Browse files
TennyZhuangst1pageBugenZhao
authored
RFC: The WatermarkFilter and StreamSort operator (#2)
* add template for watermark-filter Signed-off-by: TennyZhuang <[email protected]> * add the RFC content Signed-off-by: TennyZhuang <[email protected]> * rename the RFC file Signed-off-by: TennyZhuang <[email protected]> * rename the RFC file Signed-off-by: TennyZhuang <[email protected]> * add watermark_strategy_expression (#9) * add watermark_strategy_expression * fix typo * fix mistake * Update rfcs/0002-watermark-filter.md Co-authored-by: Bugen Zhao <[email protected]> * Update rfcs/0002-watermark-filter.md Co-authored-by: Bugen Zhao <[email protected]> Co-authored-by: Bugen Zhao <[email protected]> --------- Signed-off-by: TennyZhuang <[email protected]> Co-authored-by: stonepage <[email protected]> Co-authored-by: Bugen Zhao <[email protected]>
1 parent 27b842f commit 1eb8b62

File tree

1 file changed

+299
-0
lines changed

1 file changed

+299
-0
lines changed

rfcs/0002-watermark-filter.md

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
---
2+
feature: watermark_filter
3+
authors:
4+
- "TennyZhuang"
5+
- "st1page"
6+
- "BugenZhao"
7+
start_date: "2022/10/20"
8+
---
9+
10+
# The WatermarkFilter and StreamSort operator
11+
12+
## Summary
13+
14+
We will introduce the watermark strategy in the doc. The main changes are two new oprators, `WatermarkFilter` and `StreamSort`.
15+
16+
## Motivation
17+
18+
We have the following purposes:
19+
20+
* Support some operators which can only accept ordered input.
21+
* SessionWindow
22+
* OverAgg
23+
* MatchRecognize
24+
* Support sink a complete result set to some append-only warehouse.
25+
* State cleaning
26+
27+
## Design
28+
29+
### The watermark message
30+
31+
```rust
32+
pub enum Message {
33+
Chunk(StreamChunk),
34+
Barrier(Barrier),
35+
Watermark(col_idx, Timestamp),
36+
}
37+
```
38+
39+
**Term 1.1: Every record has multiple watermark columns with the timestamp type.**
40+
41+
**Term 1.2: If the order of a data record *d* in the stream is after another watermark record *w*, then *d[watermark_col] > w.val*.**
42+
43+
Based on Term 1.2, we can get a noteworthy property:
44+
45+
**Prop 1.1: We can postpone or remove any watermark record in a valid stream.**
46+
47+
The next question: Where does the watermark message come from?
48+
49+
### The WatermarkFilter operator
50+
51+
For simplicity, we chose to support only the append-only stream as the input, we can discuss the retractable stream later.
52+
53+
We will introduce a new operator, `WatermarkFilter`, which will filter the outdated records and output some `Watermark` messages **over one column**.
54+
55+
The design is highly different from [[Deprecated] RFC: Watermark in RisingWave (WaterMark part I)](https://www.notion.so/Deprecated-RFC-Watermark-in-RisingWave-WaterMark-part-I-b97ed46b310549ae921abe61e1f30226), we will not buffer the out-of-order records, and no orders are guaranteed on the output stream.
56+
57+
```python
58+
watermark = recovered.or(0)
59+
watermark_col: int = user_defined()
60+
def process(msg):
61+
match msg:
62+
# We use a row-level representation instead of the Chunk in the pseudo-code.
63+
case Record(rec):
64+
rec_event_time = rec[watermark_col]
65+
if rec_event_time < watermark:
66+
# Filter the outdated records.
67+
return;
68+
if rec_event_time - timeout > watermark:
69+
watermark = rec_event_time - timeout
70+
# Send a watermark message, optionally
71+
emit(Watermark(watermark))
72+
emit(Chunk(chunk))
73+
case Barrier(barrier):
74+
if barrier.checkpoint:
75+
# Watermark should be checkpointed.
76+
checkpoint(watermark)
77+
emit(Barrier(barrier))
78+
case Watermark(upper_col_idx, w):
79+
# Watermark from the upstream
80+
emit(Watermark(upper_col_idx, w))
81+
```
82+
83+
The WatermarkFilter mainly does the following two things:
84+
85+
1. Insert some watermarks to a message stream, which satisfies Term 1.2.
86+
2. Filtered out some outdated records.
87+
88+
The following graph shows a simple example; 3 and 7 are filtered due to outdated.
89+
90+
And multiple watermarks are produced in the output stream (Note, we can omit some of them for performance).
91+
92+
![https://viewer.diagrams.net/?border=0&tags=%7B%7D&highlight=0000ff&edit=_blank&layers=1&nav=1&title=watermark-1.drawio&lightbox=1&chrome=0#R5VdNc5swEP01nkkP6fBhsHN0bCc9tDOd8XTSnjoKyKBGICJEwP31XSHJIAP5bi%2FNRehptSvt27dyZv46a645KtIvLMZ05jlxM%2FM3M89z514Ig0QOClnM5wpIOIm1UQfsyG%2BsQUejFYlxaRkKxqgghQ1GLM9xJCwMcc5q22zPqB21QAkeALsI0SF6Q2KRKnTpLTr8EyZJaiK74YVayZAx1jcpUxSzugf525m%2F5owJ9ZU1a0xl8kxe1L6ridXjwTjOxXM2OM0dC%2FNdk9xef7svdouidn%2BeB8rLA6KVvrA%2BrDiYDOAYEqKnjIuUJSxHdNuhl5xVeYxlGAdmnc1nxgoAXQB%2FYSEOml1UCQZQKjKqV%2FcsF2tGGW8j%2BldXDvwBPryivnXJKh7hR%2B6l7eTZext1Yq4xy7DgBzDgmCJBHmyyka6Z5Gh33PqVETgKxFQWQajJ1dU9dxzbhUA8wULv6siBj94xOqil7AX0uSP0hVTolFo8hvcVMwvnZcvECgzcoGjaTJt1%2BErkeIME5hnid9sGR8AYPxMkw6ySXluHjv%2FBBIOzq3hqq4FvuUFq40ztXAyLjFJQtCymOiUC7wrU0ltDU7FLZbIkHjAXuHmUbL3qLW3SfENi3QkcmFRY2hP30pmuD4vZl9LoPUOFebyS7QxmEUVlSSI7L7YGITn88L0%2F%2BSEnHwMz3TT9xc1Bzyazq%2Br4HQTXy3EwkmKDvVGXR%2FqmdKkayECXA0fzE0f%2Bxb8VuD%2BoDMgFuHI8NYRqOFF9dOylnar9%2Fb7tqgOh%2B5Mqlp7nagjUsFSDfJzfGnYxGfak8kHVwq51REmSSyFAlWIIeCm1T%2BDNXumFjMSxepkwdDp027qS9V1InlrmgstZsJG%2BoLWpbti6LgVnd9i8RTnLpZc9ofQEeoc25DonpbUMBm3IHWtD%2Ft9qQ%2BFTr8kowY4ThmME35w50w9EW0WvreTpiI88SZPF%2FLaIwRN3fIFSpoMspoP8J2I5%2FaE1JpbRB%2BUVYoFp96tcNfLufxt%2F%2Bwc%3D](https://user-images.githubusercontent.com/9161438/197964182-b9b78e8c-1940-43c4-b819-ff999bb16796.png)
93+
94+
### The Sort operator in Stream
95+
96+
We now have a bounded out-of-order stream, but some operators can’t handle an unordered stream so we may need the order property on stream:
97+
98+
* OverAgg (SQL Window Function)
99+
* MatchRecognize
100+
* SortAgg
101+
102+
To support these operators, we can introduce the Sort operator.
103+
104+
The Sort operator will buffer all records before the watermark that it received recently and output the ordered stream. **In a database context, the Sort operator is also similar to a Filter, which will filter some incomplete records out. (We can see the records in the future, but at this point, they are absent).**
105+
106+
The following graph shows a simple example of the Sort operator.
107+
108+
Since the output of the Sort operator always outputs an ordered stream, every record can pair with a watermark, but we may still omit some watermarks for performance.
109+
110+
![https://viewer.diagrams.net/?border=0&tags=%7B%7D&highlight=0000ff&edit=_blank&layers=1&nav=1&title=Untitled.drawio&lightbox=1&chrome=0#R7VZLj5swEP41SMlhK4KB0GMeu%2B1hV6oarWiPTnDAXYOpMXn013eMDcQh2Ye27akne76xx3jmm884aJEfPglcZg88Iczx3OTgoKXjeRPfC2FQyFEjU9%2FXQCpoYhb1wIr%2BIgZ0DVrThFTWQsk5k7S0wQ0vCrKRFoaF4Ht72ZYz%2B9QSp2QArDaYDdGYJjLTaORNe%2FwzoWnWnjwJP2pPjtvF5iZVhhO%2BP4HQrYMWgnOpZ%2FlhQZhKXpsXve%2Fuirf7MEEK%2BZoN60f09dtTIOOZeLjbPdbzHb2%2FMdXZYVabC5uPlcc2AySBhBiTC5nxlBeY3fboXPC6SIg6xgWrX3PPeQngBMAfRMqjqS6uJQcokzkz3i0vpHFOArD1N6iDr97VQBWvxYY8c0GvyzRQlPCcSHGEfYIwLOnOjo8NV9JuXbf1C6dwsucaXoeuKaphtd%2FabQiJRUqk2dUXBSYnn9FDTaneUDbvQtlCJk0qrfqFP2vFsLk1S834%2FBbluKmausxgwSQoDxcDraDkbTC4jo5nn7EWLbKut1sCltsc5TrBPHK8hQoP86We7rEkIsfiqVs2HQa6euI5fxkDsVA83WdUklWJG8LsQa9sFpqcEiHJ4XneDflkNqDQ5kVn73vt8FquZCe6EbnXKWiR561MQa9o8CKZKaUEa8NwVdGNnRe7vc97FTIjjt%2BMszG%2BK%2BND51seTp3Lo7Fe3%2BO6l97e4yc5Dy6kvMXeKQXIe0EKtEQNpGAQyPfPuBP8W03xB0yJR%2B7Y9KYePD2EeohHyLh9PQR6iFp3MO462yDT8YB90GzS5htmNC0UGYEIoBRorlqSwpM8M46cJol%2BeAioE143oRSnSpWbJlvBXIkJxIK3pjJsBbOSgj%2BRBWcc4i4LXqgoW8rYOXTxQXqXNJzRBEXBQBnQJWVAf0sZopfrHY8mY6vy8cizSx6P%2FLFV%2FL7qHUvC8f%2FyD%2F4YLtY%2F%2BjP1B7P%2FrdTy0P%2Bco9vf](https://user-images.githubusercontent.com/9161438/197971481-56e25377-914f-41fb-9c3f-7818a9eabec3.png)
111+
112+
### Watermark derivation
113+
114+
We need to derive the watermark during some operators, e.g., for the tumbling window:
115+
116+
For a Tumble(time_col) with a watermark on time_col, we will have the following derivation:
117+
118+
```plain!
119+
W(time_col, t) → W(time_col, t), W(window_start, tumble_start(t)), W(window_end, tumble_end(t))
120+
```
121+
122+
We can get three watermarks over three columns after the projection.
123+
124+
The following operators may have some special watermark derivation rules, which we will discuss in another doc:
125+
126+
* Tumble Window
127+
* Hop Window
128+
* Session Window
129+
* Project with expressions that keep the order (e.g., EXTRACT)
130+
131+
A tumble window example:
132+
133+
Watermark(10:40, order_time) → Tumble(30 minutes)
134+
135+
1. Watermark(10:40, order_time)
136+
2. Watermark(10:30, window_start)
137+
3. Watermark(11:00, window_end)
138+
139+
We will also need to derive the watermark for some operators which have multiple inputs; the typical examples are **Join and Union**. The solution is very easy enough; **the output watermark should be the smaller value of watermarks from upstreams.**
140+
141+
### State cleaning
142+
143+
We can use the watermark to clean the states in many operators, which we will also discuss in another doc, but we can give a simple example here:
144+
145+
```sql
146+
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
147+
FROM TUMBLE(
148+
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
149+
order_time,
150+
INTERVAL '1' HOUR)
151+
GROUP BY window_end, customer_id;
152+
```
153+
154+
In the Aggregation operator, we will get two watermarks, `order_time` and `window_end`, and `window_end` matches the prefix of the grouping key, so when we receive `Watermark(window_end, w)`, we can confirm that we will not receive records whose `window_start` is smaller than `w`, then we can clean the states before that.
155+
156+
Note: we may need Range-Delete here.
157+
158+
### Exchange over watermarks
159+
160+
We need to handle the `Watermark` message correctly, and the strategy is very hard, and @Bugen Zhao will introduce that in another doc.
161+
162+
The core idea of Exchange is **Prop 1.1.** We need also to align the watermarks, but we can achieve that by delaying the watermarks.
163+
164+
## Unresolved questions
165+
166+
* Are there some questions that haven't been resolved in the RFC?
167+
* Can they be resolved in some future RFCs?
168+
* Move some meaningful comments to here.
169+
170+
## Syntax
171+
172+
We can discuss the syntax later, but now we can implement that as a table function and expose the watermarkFilter’s behavior directly.
173+
174+
```plain
175+
WATERMARK(orders, time_column, watermark_strategy_expression)
176+
```
177+
178+
And the concept `watermark_strategy_expression` refers [FlinkSQL](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#watermark). The expression is evaluated for every record and update the watermark if the result greater than the current watermark.
179+
The new syntax is more friendly for flinkSQL user and offers user methods defining their strategy.
180+
181+
```sql
182+
CREATE SOURCE `orders` (
183+
`id` BIGINT,
184+
`order_time` TIMESTAMP,
185+
`price` DECIMAL,
186+
`customer_id` BIGINT,
187+
) WITH (
188+
'connector' = ...,
189+
);
190+
191+
-- normal streaming plan, just for state cleaning
192+
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
193+
FROM TUMBLE(
194+
WATERMARK(orders, order_time, order_time - INTERVAL '1' MINUTE),
195+
order_time,
196+
INTERVAL '1' HOUR)
197+
GROUP BY window_end, customer_id;
198+
199+
-- Using the option `EMIT ON WINDOW CLOSE` on agg to make sink append-only
200+
CREAT SINK AS
201+
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
202+
FROM TUMBLE(
203+
WATERMARK(orders, order_time, order_time - INTERVAL '1' MINUTE),
204+
order_time,
205+
INTERVAL '1' HOUR)
206+
GROUP BY window_end, customer_id EMIT ON WINDOW CLOSE;
207+
208+
-- session window
209+
-- `EMIT ON WINDOW CLOSE` is necessary for `SESSION` due to implementation.
210+
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
211+
FROM SESSION(
212+
WATERMARK(orders, order_time, order_time - INTERVAL '1' MINUTE),
213+
order_time,
214+
customer_id,
215+
INTERVAL '1' HOUR)
216+
GROUP BY window_end, customer_id EMIT ON WINDOW CLOSE;
217+
218+
-- SQL WINDOW FUNCTION, which only use the Watermark table function but not the time window.
219+
-- We allow window functions to be defined with the time attribute column of a stream.
220+
-- In brief, the event time column with a watermark or the omitted processing time.
221+
-- Fraud Detection, more details:
222+
-- https://singularity-data.quip.com/8BJoAulRYblq/Flink-Demo-Fraud-Detection-in-Online-Shopping-Assignee-
223+
create materialized view ALERTS as
224+
SELECT
225+
customer_id,
226+
price,
227+
order_time,
228+
lead(price, 1) over w as next_price,
229+
lead(order_time, 1) over w as next_order_time
230+
FROM
231+
WATERMARK(orders, order_time, order_time - INTERVAL '1' MINUTE),
232+
WHERE
233+
abs(next_price - price) > 10000
234+
AND next_order_time - order_time < INTERVAL '30' SECOND
235+
WINDOW w AS (
236+
PARTITION BY customer_id
237+
ORDER BY order_time
238+
)
239+
```
240+
241+
And `watermark_strategy_expression` offers potential to make user define their own strategy. Some following case is fancy and not very well-defined. You can treat them as a pseudocode which we might implement in future.
242+
243+
```sql
244+
CREATE SOURCE `orders` (
245+
`id` BIGINT,
246+
`order_time` TIMESTAMP,
247+
`price` DECIMAL,
248+
`customer_id` BIGINT,
249+
) WITH (
250+
'connector' = ...,
251+
);
252+
253+
-- normal timeout watermark
254+
with watermarked_orders as (
255+
WATERMARK(orders, order_time, order_time - INTERVAL '1' MINUTE),
256+
)
257+
258+
-- normal timeout watermark with simple check
259+
with watermarked_orders as (
260+
WATERMARK(
261+
orders,
262+
order_time,
263+
CASE
264+
-- we have not determined the design about `PROC_TIME()`
265+
WHEN order_time > PROC_TIME() THEN Null
266+
ELSE order_time - INTERVAL '1' MINUTE
267+
END),
268+
269+
-- normal timeout watermark with removing outliers
270+
with watermarked_orders as (
271+
WATERMARK(
272+
(
273+
-- we have not determined the window funtion on unordered stream
274+
select *,
275+
max(order_time) as win_max_time
276+
OVER(BETWEEN 10 PRECEDING AND CURRENT ROW)
277+
from orders
278+
),
279+
order_time,
280+
CASE
281+
WHEN order_time == win_max_time THEN Null
282+
ELSE order_time - INTERVAL '1' MINUTE
283+
END),
284+
)
285+
```
286+
287+
## Future possibilities
288+
289+
### Should we support extra columns in Sort?
290+
291+
Yes, it definitely works, we can also sort columns prefixed by a watermarked column, and it may be useful for some optimization such as SortAgg and SortMergeJoin.
292+
293+
### GroupWatermark over Source
294+
295+
Flink has a watermark strategy that is aware of the partitioning. The typical use case is that the partitions upstream are skewed heavily but records in every partition are almost ordered. We can introduce a `GroupWatermark` operator for the scenario. We can discuss it later.
296+
297+
### The type of watermark column
298+
299+
In the doc, we require the type of watermark column to be `Timestamp`, but it’s not necessary. The only limitation is `Ord + Sub`. At least, we have to support `Timestamp`, `TimestampZ`, and `int64` (for unix epoch). We can even support `Decimal` easily, but I don’t think it’s meaningful.

0 commit comments

Comments
 (0)