Skip to content

Commit 7a78054

Browse files
committed
rename the RFC file
Signed-off-by: TennyZhuang <[email protected]>
1 parent 18f7833 commit 7a78054

File tree

2 files changed

+236
-236
lines changed

2 files changed

+236
-236
lines changed

0000-template.md

Lines changed: 11 additions & 225 deletions
Original file line numberDiff line numberDiff line change
@@ -1,250 +1,36 @@
11
---
2-
feature: watermark_filter
2+
feature: my_excited_feature
33
authors:
44
- "TennyZhuang"
5-
- "st1page"
6-
- "BugenZhao"
7-
start_date: "2022/10/20"
5+
start_date: "2022/10/24"
86
---
97

10-
# The WatermarkFilter and StreamSort operator
8+
# My Excited Feature
9+
10+
Please feel free to add or remove sections.
1111

1212
## Summary
1313

14-
We will introduce the watermark strategy in the doc. The main changes are two new oprators, `WatermarkFilter` and `StreamSort`.
14+
Explain the feature in short.
1515

1616
## Motivation
1717

18-
We have the following purposes:
19-
20-
* Support some operators which can only accept ordered input.
21-
* SessionWindow
22-
* OverAgg
23-
* MatchRecognize
24-
* Support sink a complete result set to some append-only warehouse.
25-
* State cleaning
18+
Why are you want to introduce the feature?
2619

2720
## Design
2821

29-
### The watermark message
30-
31-
```rust
32-
pub enum Message {
33-
Chunk(StreamChunk),
34-
Barrier(Barrier),
35-
Watermark(col_idx, Timestamp),
36-
}
37-
```
38-
39-
**Term 1.1: Every record has multiple watermark columns with the timestamp type.**
40-
41-
**Term 1.2: If the order of a data record *d* in the stream is after another watermark record *w*, then *d[watermark_col] > w.val*.**
42-
43-
Based on Term 1.2, we can get a noteworthy property:
44-
45-
**Prop 1.1: We can postpone or remove any watermark record in a valid stream.**
46-
47-
The next question: Where does the watermark message come from?
48-
49-
### The WatermarkFilter operator
50-
51-
For simplicity, we chose to support only the append-only stream as the input, we can discuss the retractable stream later.
52-
53-
We will introduce a new operator, `WatermarkFilter`, which will filter the outdated records and output some `Watermark` messages **over one column**.
54-
55-
The design is highly different from [[Deprecated] RFC: Watermark in RisingWave (WaterMark part I)](https://www.notion.so/Deprecated-RFC-Watermark-in-RisingWave-WaterMark-part-I-b97ed46b310549ae921abe61e1f30226), we will not buffer the out-of-order records, and no orders are guaranteed on the output stream.
56-
57-
```python
58-
watermark = recovered.or(0)
59-
watermark_col: int = user_defined()
60-
def process(msg):
61-
match msg:
62-
# We use a row-level representation instead of the Chunk in the pseudo-code.
63-
case Record(rec):
64-
rec_event_time = rec[watermark_col]
65-
if rec_event_time < watermark:
66-
# Filter the outdated records.
67-
return;
68-
if rec_event_time - timeout > watermark:
69-
watermark = rec_event_time - timeout
70-
# Send a watermark message, optionally
71-
emit(Watermark(watermark))
72-
emit(Chunk(chunk))
73-
case Barrier(barrier):
74-
if barrier.checkpoint:
75-
# Watermark should be checkpointed.
76-
checkpoint(watermark)
77-
emit(Barrier(barrier))
78-
case Watermark(upper_col_idx, w):
79-
# Watermark from the upstream
80-
emit(Watermark(upper_col_idx, w))
81-
```
82-
83-
The WatermarkFilter mainly does the following two things:
84-
85-
1. Insert some watermarks to a message stream, which satisfies Term 1.2.
86-
2. Filtered out some outdated records.
87-
88-
The following graph shows a simple example; 3 and 7 are filtered due to outdated.
89-
90-
And multiple watermarks are produced in the output stream (Note, we can omit some of them for performance).
91-
92-
![https://viewer.diagrams.net/?border=0&tags=%7B%7D&highlight=0000ff&edit=_blank&layers=1&nav=1&title=watermark-1.drawio&lightbox=1&chrome=0#R5VdNc5swEP01nkkP6fBhsHN0bCc9tDOd8XTSnjoKyKBGICJEwP31XSHJIAP5bi%2FNRehptSvt27dyZv46a645KtIvLMZ05jlxM%2FM3M89z514Ig0QOClnM5wpIOIm1UQfsyG%2BsQUejFYlxaRkKxqgghQ1GLM9xJCwMcc5q22zPqB21QAkeALsI0SF6Q2KRKnTpLTr8EyZJaiK74YVayZAx1jcpUxSzugf525m%2F5owJ9ZU1a0xl8kxe1L6ridXjwTjOxXM2OM0dC%2FNdk9xef7svdouidn%2BeB8rLA6KVvrA%2BrDiYDOAYEqKnjIuUJSxHdNuhl5xVeYxlGAdmnc1nxgoAXQB%2FYSEOml1UCQZQKjKqV%2FcsF2tGGW8j%2BldXDvwBPryivnXJKh7hR%2B6l7eTZext1Yq4xy7DgBzDgmCJBHmyyka6Z5Gh33PqVETgKxFQWQajJ1dU9dxzbhUA8wULv6siBj94xOqil7AX0uSP0hVTolFo8hvcVMwvnZcvECgzcoGjaTJt1%2BErkeIME5hnid9sGR8AYPxMkw6ySXluHjv%2FBBIOzq3hqq4FvuUFq40ztXAyLjFJQtCymOiUC7wrU0ltDU7FLZbIkHjAXuHmUbL3qLW3SfENi3QkcmFRY2hP30pmuD4vZl9LoPUOFebyS7QxmEUVlSSI7L7YGITn88L0%2F%2BSEnHwMz3TT9xc1Bzyazq%2Br4HQTXy3EwkmKDvVGXR%2FqmdKkayECXA0fzE0f%2Bxb8VuD%2BoDMgFuHI8NYRqOFF9dOylnar9%2Fb7tqgOh%2B5Mqlp7nagjUsFSDfJzfGnYxGfak8kHVwq51REmSSyFAlWIIeCm1T%2BDNXumFjMSxepkwdDp027qS9V1InlrmgstZsJG%2BoLWpbti6LgVnd9i8RTnLpZc9ofQEeoc25DonpbUMBm3IHWtD%2Ft9qQ%2BFTr8kowY4ThmME35w50w9EW0WvreTpiI88SZPF%2FLaIwRN3fIFSpoMspoP8J2I5%2FaE1JpbRB%2BUVYoFp96tcNfLufxt%2F%2Bwc%3D](https://user-images.githubusercontent.com/9161438/197964182-b9b78e8c-1940-43c4-b819-ff999bb16796.png)
93-
94-
### The Sort operator in Stream
95-
96-
We now have a bounded out-of-order stream, but some operators can’t handle an unordered stream so we may need the order property on stream:
97-
98-
* OverAgg (SQL Window Function)
99-
* MatchRecognize
100-
* SortAgg
101-
102-
To support these operators, we can introduce the Sort operator.
103-
104-
The Sort operator will buffer all records before the watermark that it received recently and output the ordered stream. **In a database context, the Sort operator is also similar to a Filter, which will filter some incomplete records out. (We can see the records in the future, but at this point, they are absent).**
105-
106-
The following graph shows a simple example of the Sort operator.
107-
108-
Since the output of the Sort operator always outputs an ordered stream, every record can pair with a watermark, but we may still omit some watermarks for performance.
109-
110-
![https://viewer.diagrams.net/?border=0&tags=%7B%7D&highlight=0000ff&edit=_blank&layers=1&nav=1&title=Untitled.drawio&lightbox=1&chrome=0#R7VZLj5swEP41SMlhK4KB0GMeu%2B1hV6oarWiPTnDAXYOpMXn013eMDcQh2Ye27akne76xx3jmm884aJEfPglcZg88Iczx3OTgoKXjeRPfC2FQyFEjU9%2FXQCpoYhb1wIr%2BIgZ0DVrThFTWQsk5k7S0wQ0vCrKRFoaF4Ht72ZYz%2B9QSp2QArDaYDdGYJjLTaORNe%2FwzoWnWnjwJP2pPjtvF5iZVhhO%2BP4HQrYMWgnOpZ%2FlhQZhKXpsXve%2Fuirf7MEEK%2BZoN60f09dtTIOOZeLjbPdbzHb2%2FMdXZYVabC5uPlcc2AySBhBiTC5nxlBeY3fboXPC6SIg6xgWrX3PPeQngBMAfRMqjqS6uJQcokzkz3i0vpHFOArD1N6iDr97VQBWvxYY8c0GvyzRQlPCcSHGEfYIwLOnOjo8NV9JuXbf1C6dwsucaXoeuKaphtd%2FabQiJRUqk2dUXBSYnn9FDTaneUDbvQtlCJk0qrfqFP2vFsLk1S834%2FBbluKmausxgwSQoDxcDraDkbTC4jo5nn7EWLbKut1sCltsc5TrBPHK8hQoP86We7rEkIsfiqVs2HQa6euI5fxkDsVA83WdUklWJG8LsQa9sFpqcEiHJ4XneDflkNqDQ5kVn73vt8FquZCe6EbnXKWiR561MQa9o8CKZKaUEa8NwVdGNnRe7vc97FTIjjt%2BMszG%2BK%2BND51seTp3Lo7Fe3%2BO6l97e4yc5Dy6kvMXeKQXIe0EKtEQNpGAQyPfPuBP8W03xB0yJR%2B7Y9KYePD2EeohHyLh9PQR6iFp3MO462yDT8YB90GzS5htmNC0UGYEIoBRorlqSwpM8M46cJol%2BeAioE143oRSnSpWbJlvBXIkJxIK3pjJsBbOSgj%2BRBWcc4i4LXqgoW8rYOXTxQXqXNJzRBEXBQBnQJWVAf0sZopfrHY8mY6vy8cizSx6P%2FLFV%2FL7qHUvC8f%2FyD%2F4YLtY%2F%2BjP1B7P%2FrdTy0P%2Bco9vf](https://user-images.githubusercontent.com/9161438/197971481-56e25377-914f-41fb-9c3f-7818a9eabec3.png)
111-
112-
### Watermark derivation
113-
114-
We need to derive the watermark during some operators, e.g., for the tumbling window:
115-
116-
For a Tumble(time_col) with a watermark on time_col, we will have the following derivation:
117-
118-
```plain!
119-
W(time_col, t) → W(time_col, t), W(window_start, tumble_start(t)), W(window_end, tumble_end(t))
120-
```
121-
122-
We can get three watermarks over three columns after the projection.
123-
124-
The following operators may have some special watermark derivation rules, which we will discuss in another doc:
125-
126-
* Tumble Window
127-
* Hop Window
128-
* Session Window
129-
* Project with expressions that keep the order (e.g., EXTRACT)
130-
131-
A tumble window example:
132-
133-
Watermark(10:40, order_time) → Tumble(30 minutes)
134-
135-
1. Watermark(10:40, order_time)
136-
2. Watermark(10:30, window_start)
137-
3. Watermark(11:00, window_end)
138-
139-
We will also need to derive the watermark for some operators which have multiple inputs; the typical examples are **Join and Union**. The solution is very easy enough; **the output watermark should be the smaller value of watermarks from upstreams.**
140-
141-
### State cleaning
142-
143-
We can use the watermark to clean the states in many operators, which we will also discuss in another doc, but we can give a simple example here:
144-
145-
```sql
146-
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
147-
FROM TUMBLE(
148-
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
149-
order_time,
150-
INTERVAL '1' HOUR)
151-
GROUP BY window_end, customer_id;
152-
```
153-
154-
In the Aggregation operator, we will get two watermarks, `order_time` and `window_end`, and `window_end` matches the prefix of the grouping key, so when we receive `Watermark(window_end, w)`, we can confirm that we will not receive records whose `window_start` is smaller than `w`, then we can clean the states before that.
155-
156-
Note: we may need Range-Delete here.
157-
158-
### Exchange over watermarks
159-
160-
We need to handle the `Watermark` message correctly, and the strategy is very hard, and @Bugen Zhao will introduce that in another doc.
161-
162-
The core idea of Exchange is **Prop 1.1.** We need also to align the watermarks, but we can achieve that by delaying the watermarks.
22+
Explain the feature in detail.
16323

16424
## Unresolved questions
16525

16626
* Are there some questions that haven't been resolved in the RFC?
16727
* Can they be resolved in some future RFCs?
16828
* Move some meaningful comments to here.
16929

170-
## Syntax
171-
172-
We can discuss the syntax later, but now we can implement that as a table function and expose the watermarkFilter’s behavior directly.
173-
174-
```plain
175-
WATERMARK(orders, time_column, timeout: interval)
176-
```
177-
178-
```sql
179-
CREATE SOURCE `orders` (
180-
`id` BIGINT,
181-
`order_time` TIMESTAMP,
182-
`price` DECIMAL,
183-
`customer_id` BIGINT,
184-
) WITH (
185-
'connector' = ...,
186-
);
187-
188-
-- normal streaming plan, just for state cleaning
189-
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
190-
FROM TUMBLE(
191-
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
192-
order_time,
193-
INTERVAL '1' HOUR)
194-
GROUP BY window_end, customer_id;
30+
## Alternatives
19531

196-
-- Using the option `EMIT ON WINDOW CLOSE` on agg to make sink append-only
197-
CREAT SINK AS
198-
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
199-
FROM TUMBLE(
200-
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
201-
order_time,
202-
INTERVAL '1' HOUR)
203-
GROUP BY window_end, customer_id EMIT ON WINDOW CLOSE;
204-
205-
-- session window
206-
-- `EMIT ON WINDOW CLOSE` is necessary for `SESSION` due to implementation.
207-
SELECT customer_id, window_end as order_hour, SUM(price) as sum_price
208-
FROM SESSION(
209-
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
210-
order_time,
211-
customer_id,
212-
INTERVAL '1' HOUR)
213-
GROUP BY window_end, customer_id EMIT ON WINDOW CLOSE;
214-
215-
-- SQL WINDOW FUNCTION, which only use the Watermark table function but not the time window.
216-
-- We allow window functions to be defined with the time attribute column of a stream.
217-
-- In brief, the event time column with a watermark or the omitted processing time.
218-
-- Fraud Detection, more details:
219-
-- https://singularity-data.quip.com/8BJoAulRYblq/Flink-Demo-Fraud-Detection-in-Online-Shopping-Assignee-
220-
create materialized view ALERTS as
221-
SELECT
222-
customer_id,
223-
price,
224-
order_time,
225-
lead(price, 1) over w as next_price,
226-
lead(order_time, 1) over w as next_order_time
227-
FROM
228-
WATERMARK(orders, order_time, INTERVAL '1' MINUTE),
229-
WHERE
230-
abs(next_price - price) > 10000
231-
AND next_order_time - order_time < INTERVAL '30' SECOND
232-
WINDOW w AS (
233-
PARTITION BY customer_id
234-
ORDER BY order_time
235-
)
236-
```
32+
What other designs have been considered and what is the rationale for not choosing them?
23733

23834
## Future possibilities
23935

240-
### Should we support extra columns in Sort?
241-
242-
Yes, it definitely works, we can also sort columns prefixed by a watermarked column, and it may be useful for some optimization such as SortAgg and SortMergeJoin.
243-
244-
### GroupWatermark over Source
245-
246-
Flink has a watermark strategy that is aware of the partitioning. The typical use case is that the partitions upstream are skewed heavily but records in every partition are almost ordered. We can introduce a `GroupWatermark` operator for the scenario. We can discuss it later.
247-
248-
### The type of watermark column
249-
250-
In the doc, we require the type of watermark column to be `Timestamp`, but it’s not necessary. The only limitation is `Ord + Sub`. At least, we have to support `Timestamp`, `TimestampZ`, and `int64` (for unix epoch). We can even support `Decimal` easily, but I don’t think it’s meaningful.
36+
Some potential extensions or optimizations can be done in the future based on the RFC.

0 commit comments

Comments
 (0)