Commit bcedad3: add emit_on_window_close (#30), authored by Eric Fu. 2 files changed, +131 −0.

rfcs/0030-emit-on-window-close.md

---
feature: emit_on_window_close
authors:
  - "Eric Fu"
start_date: "2022/12/16"
---
# The Semantics of EMIT ON WINDOW CLOSE

## Motivation

Let’s clarify the problem we are solving with an example. Given a 1-hour window aggregation, the two behaviors below are both sensible:

- **Emit on updates.** The last window is incomplete and contains partial results.
- **Emit on window close.** All output rows are complete windows, which also means a result may not appear until (window_size + watermark_delay) has elapsed.

This concept doesn’t have consistent terminology yet, but most streaming systems, including Spark and Kafka Streams, support it natively, while Flink provides several workarounds to achieve equivalent results.
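The two behaviors can be sketched with a toy simulation: a 1-minute tumbling-window count over event-time timestamps, with a watermark that lags the maximum event time by 10 seconds. All names, sizes, and delays here are illustrative assumptions, not RisingWave APIs.

```python
from collections import Counter

WINDOW = 60  # tumbling window size, in seconds of event time
DELAY = 10   # watermark delay, in seconds

def window_start(ts):
    return ts - ts % WINDOW

def emit_on_updates(events):
    """Emit a full snapshot after every event; the last window is
    incomplete and contains partial results."""
    counts = Counter()
    snapshots = []
    for ts in events:
        counts[window_start(ts)] += 1
        snapshots.append(dict(counts))
    return snapshots

def emit_on_window_close(events):
    """Emit a window only once the watermark passes its upper bound;
    the output is append-only and contains complete windows only."""
    counts = Counter()
    emitted = []
    watermark = float("-inf")
    for ts in events:
        counts[window_start(ts)] += 1
        watermark = max(watermark, ts - DELAY)
        for start in sorted(counts):
            if watermark > start + WINDOW:  # window has closed
                emitted.append((start, counts.pop(start)))
    return emitted

events = [5, 20, 59, 61, 130]
print(emit_on_updates(events)[-1])   # {0: 3, 60: 1, 120: 1}: partial windows shown
print(emit_on_window_close(events))  # [(0, 3)]: only [0, 60) has closed
```

Note how the second mode withholds the windows starting at 60 and 120: the watermark (120) has not strictly passed their upper bounds yet.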

Currently, we are going to support:

- **Time-window TVF** without early-fire
- **Over Aggregation**, i.e., `OVER WINDOW` with `ORDER BY`
- **Deduplication** (N-th event)
- **Session Window**
- **Pattern Recognition**

Therefore, we must define clear semantics for `emit on window close`.
28+
29+
## Design
30+
31+
Modern streaming systems follow the data model proposed (summarized) by [The Dataflow Model](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf) paper. The core contribution includes
32+
33+
- A **windowing model** to support unaligned windows on unordered event time
34+
- The **watermark** to deal with unbounded late events
35+
- A **triggering** model to define when (in processing time) to emit the results
36+
37+
However, the dataflow model was defined on programming API. As more and more systems embrace SQL as their user interface, we need a complete definition for streaming SQL, especially for triggering i.e. `emit on window close`.
38+
39+
As we know, SQL is a declarative query language on a **static dataset** aka. relations, based on the theory of relational algebra. SQL has **well-defined** **semantics**, so you can always **determine** the query result semantically and uniquely, no matter using nested-loop join, hash join, or paper and pen.
40+
41+
<aside>
☝ All occurrences of the word “operator” refer to logical operators, since we are talking about semantics rather than implementation.
</aside>

**Definition 1. All events above the watermark from Source constitute the input dataset of Source, denoted as *INPUT* here.**

Like most streaming systems, we assume the watermark is associated with the Source rather than anywhere else.

With this definition, the **static dataset** that SQL runs against is now well-defined. In practice, we could place a Materialize Executor right after the Source & Watermark Executor to obtain the ***INPUT*** dataset.
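Definition 1 can be read as a filter on the source’s events: anything that falls below the running watermark is late and excluded from ***INPUT***. A minimal sketch, assuming an event-time watermark of `max(ts) - delay` and illustrative names throughout:

```python
# Sketch of Definition 1: INPUT consists of the events from the source
# that are above the watermark; late events are filtered out.

def compute_input(events, delay):
    """Return the INPUT dataset: events that are not below the running
    watermark when they arrive. Illustrative, not a real executor."""
    watermark = float("-inf")
    accepted = []
    for ts in events:
        if ts >= watermark:      # on-time event: part of INPUT
            accepted.append(ts)
        # otherwise: late event, dropped
        watermark = max(watermark, ts - delay)
    return accepted

# Event 40 arrives after the watermark has advanced past it, so it is dropped.
print(compute_input([10, 100, 40, 120], delay=30))  # [10, 100, 120]
```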
**Definition 2. For `emit on updates` streaming queries, at any time, the streaming results should be the same as the batch SQL query on *INPUT*, i.e., *Q(INPUT)*.**

This is the consistency model we have been following since day one.

In theory, **all** queries, including the ones with time-window TVF, over window, or session window, must comply with this rule. That is, they must be able to output a reasonable result set when evaluated as a batch SQL query.

Of course, some cases may not be sensible. For example, pattern recognition functions should operate on a full session, but that is the fault of the aggregation function rather than the SQL operators. Or, some cases may be too expensive, so we disallow users from running them.
**Definition 3. For streaming queries with `emit on window close`, the result set becomes append-only, and it will eventually be consistent with *Q(INPUT)*.**

This rule follows the behavior of Flink, the de facto standard of streaming SQL, as well as other well-known systems like Spark Structured Streaming.

Given Definitions 1, 2, and 3, can we assume our streaming SQL has well-defined semantics? The answer is no. Take this simple window aggregation as an example: if ***Q(INPUT)*** is
```
window_start        count
2022-12-16 00:00    1000
2022-12-16 00:01    1000
2022-12-16 00:02    1000
2022-12-16 00:03    500
```

Then, any subset of ***Q(INPUT)*** could be a legitimate output, like

```
window_start        count
2022-12-16 00:00    1000
2022-12-16 00:01    1000
2022-12-16 00:02    1000
```

```
window_start        count
2022-12-16 00:00    1000
2022-12-16 00:01    1000
```

This shows that the trigger condition for window closing must also be well-defined.

**Definition 4. For streaming queries with `emit on window close`, there should be a deterministic trigger condition for those (logical) operators.**

*To formally define a trigger condition: a function that tells whether a row in **Q(INPUT)** should be present in the result set or not, given the current **INPUT**.*

In particular, the `emit on window close` clause alters the behavior of the (logical) operators inside the query. It is a property of (logical) operators rather than of the whole query; we define it at the query level only to make it more understandable to users.

![](./images/0030-emit-on-window-close/altered-operator.svg)

Here are some examples of deterministic trigger conditions.

| Feature | Trigger of window closing |
| --- | --- |
| Time-window TVF | Watermark > window’s upper bound |
| Over Aggregation | Watermark > frame’s upper bound |
| Deduplication | Watermark > timestamp of N-th event |
| Session Window | Watermark > last_event_time + max_wait |
| Pattern Recognition | Watermark > last_event_time |

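For instance, the first row of the table reduces to a pure predicate on the current watermark; a sketch with illustrative names, not RisingWave code:

```python
# The time-window TVF row as a deterministic trigger condition:
# a window [start, start + size) closes once Watermark > window's upper bound.

def window_closed(window_start, window_size, watermark):
    return watermark > window_start + window_size

# With a watermark of 120, the window [0, 60) is closed...
print(window_closed(0, 60, 120))    # True
# ...but [60, 120) is not, since 120 > 120 is false.
print(window_closed(60, 60, 120))   # False
```

Being a pure function of the window bounds and the watermark, the condition is deterministic: any two evaluators agree on which rows have been emitted.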
Note that the operators’ behaviors are altered equivalently for batch queries:

- The watermark in a batch query can be determined by substituting `MAX(time_column)` into the watermark expression, e.g., `MAX(time_column) - interval '5 minutes'`.
- Besides, late events are already filtered out of ***INPUT*** by Definition 1.

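The substitution in the first bullet can be sketched directly. The 5-minute delay mirrors the example expression above; the function name and rows are illustrative:

```python
# Batch watermark per the note above: substitute MAX(time_column) into
# the watermark expression, e.g. MAX(time_column) - interval '5 minutes'.
from datetime import datetime, timedelta

def batch_watermark(time_column, delay=timedelta(minutes=5)):
    """Watermark of a static dataset = MAX(time_column) - delay."""
    return max(time_column) - delay

rows = [
    datetime(2022, 12, 16, 0, 1),
    datetime(2022, 12, 16, 0, 58),
    datetime(2022, 12, 16, 1, 2),
]
print(batch_watermark(rows))  # 2022-12-16 00:57:00
```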
Therefore, Definition 2, that the streaming results should be the same as ***Q(INPUT)***, also holds for `emit on window close` queries.

**Definition 5. For `emit on window close` streaming queries, at any time, the streaming results should be exactly the same as the SQL query on *INPUT*, i.e., *Q’(INPUT)*, where *Q’* is *Q* with its operators altered by `emit on window close`.**

As a result, we should be able to run an `emit on window close` query with a simple batch SELECT, and the result is guaranteed to be exactly the same as the materialized view. Note that this holds only in theory; whether to do it in practice depends on the workload and is beyond the scope of this document.

## Notes

- As for implementation, our current design seems to be fully compatible with this proposal; the changes only apply at the semantics (syntax) level.
- According to our definition, the watermark should **not** be a TVF as proposed in [RFC: The WatermarkFilter and StreamSort operator # Syntax](https://github.com/risingwavelabs/rfcs/blob/005f086e68569bbc054a5eac7d6ff0c20c58a633/rfcs/0002-watermark-filter.md#syntax). Instead, it should be associated with the Source, just like in Flink.
- By the way, inspired by the structure of Flink’s documentation, it might be better to describe our features in terms of user-facing features like over aggregation, deduplication, window deduplication, etc., rather than SQL syntax. https://github.com/risingwavelabs/rfcs/pull/8

## Discussion

- There may be a better name for `EMIT ON WINDOW CLOSE`. Alternatives: `FINALIZED` (Snowflake), `APPEND ONLY` (Spark SQL), `SUPPRESSED` (Kafka Streams).
