feat(streaming): Watermark in Project Executor #6813

Closed
Tracked by #6042
jon-chuang opened this issue Dec 9, 2022 · 2 comments · Fixed by #7279
Labels
type/feature Type: New feature.

Comments

@jon-chuang
Contributor

jon-chuang commented Dec 9, 2022

The Project executor needs to handle watermarks in a special way:

  • Forward a new, modified watermark if an expression is an offset (± a literal) of a watermark column.
    • We can only guarantee that the watermark stays monotonic if the expression is a monotonic function. For simplicity, we only allow offsets, which are quite common for time-related columns.
  • Do not forward a watermark if its column is not included in any project expression.

For instance, if there is a project expression Project { [t, t + offset] }, we would forward two watermarks given a single incoming watermark on input column t:
Watermark { col: 0, evict_before: s }, Watermark { col: 1, evict_before: s + offset }

We do not know which of the columns would be pruned away in a future operator, so it is better to include both watermarks.
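
The derivation described above can be sketched roughly as follows. This is a minimal illustration, not the actual executor code: `ProjectExpr`, `derive_watermarks`, and the simplified `Watermark` struct (with `i64` values) are hypothetical names introduced only for this example.

```rust
/// Simplified stand-in for the watermark message described in this issue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Watermark {
    pub col: usize,
    pub evict_before: i64,
}

/// Hypothetical, simplified projection expression. Only the shapes that
/// matter for watermark derivation are modelled.
#[derive(Debug, Clone)]
pub enum ProjectExpr {
    /// Plain reference to an input column, e.g. `t`.
    InputRef(usize),
    /// `input_col + offset`, where `offset` is a literal (negative for `-`).
    Offset { input_col: usize, offset: i64 },
    /// Any other expression; no watermark is derived for it.
    Other,
}

/// For one incoming watermark, emit one output watermark per projected
/// column that is the watermark column itself or an offset of it.
pub fn derive_watermarks(exprs: &[ProjectExpr], input: &Watermark) -> Vec<Watermark> {
    exprs
        .iter()
        .enumerate()
        .filter_map(|(out_col, expr)| match expr {
            ProjectExpr::InputRef(c) if *c == input.col => Some(Watermark {
                col: out_col,
                evict_before: input.evict_before,
            }),
            ProjectExpr::Offset { input_col, offset } if *input_col == input.col => input
                .evict_before
                // Assumption: drop the watermark rather than wrap on overflow.
                .checked_add(*offset)
                .map(|v| Watermark { col: out_col, evict_before: v }),
            _ => None,
        })
        .collect()
}
```

With `Project { [t, t + offset] }` and an incoming watermark on column 0, this yields the two watermarks listed above; a watermark on a column that no expression references yields an empty vector.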


Note: Project is a SimpleExecutor. We may want to make it non-simple, or else define an interface in SimpleExecutor for handling watermarks. That interface may not be so simple anymore: each incoming watermark can yield anywhere from 0 to num_output_cols watermarks, so it must return a Vec<Watermark>.
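
One possible shape for such an interface is sketched below. The trait `SimpleExecutorSketch`, its method `map_watermark`, and both implementing types are hypothetical and are not the real SimpleExecutor API; the sketch only illustrates why the return type needs to be a `Vec<Watermark>`.

```rust
/// Simplified stand-in for the watermark message described in this issue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Watermark {
    pub col: usize,
    pub evict_before: i64,
}

/// Hypothetical watermark hook for simple executors (not the real trait).
pub trait SimpleExecutorSketch {
    /// Maps one incoming watermark to zero or more outgoing watermarks.
    fn map_watermark(&self, watermark: Watermark) -> Vec<Watermark>;
}

/// An executor that does not change columns can forward the watermark as is.
pub struct PassThrough;

impl SimpleExecutorSketch for PassThrough {
    fn map_watermark(&self, watermark: Watermark) -> Vec<Watermark> {
        vec![watermark]
    }
}

/// A projection described only by its watermark derivations: for each output
/// column, `Some((input_col, offset))` if it is an offset of an input column.
pub struct OffsetProject {
    pub outputs: Vec<Option<(usize, i64)>>,
}

impl SimpleExecutorSketch for OffsetProject {
    fn map_watermark(&self, watermark: Watermark) -> Vec<Watermark> {
        self.outputs
            .iter()
            .enumerate()
            .filter_map(|(out_col, derivation)| {
                let (input_col, offset) = (*derivation)?;
                if input_col != watermark.col {
                    return None;
                }
                Some(Watermark {
                    col: out_col,
                    // Assumption: skip the watermark if the offset overflows.
                    evict_before: watermark.evict_before.checked_add(offset)?,
                })
            })
            .collect()
    }
}
```

Returning a vector lets a pass-through executor emit exactly one watermark while a projection emits none, one, or several, at the cost of an allocation per watermark.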

Related (handling delta expressions with now and watermark from now executor): #6699

@st1page
Contributor

st1page commented Dec 19, 2022

The frontend must derive the Project's output watermark from the input. It can therefore determine whether an expression's watermark can be calculated from an input watermark, add that information to the proto, and send it to the compute node.
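
A planner-side analysis along these lines might look like the sketch below. The `Expr` tree, `Derivation`, `derive_offset`, and `derive_for_project` are all hypothetical; the real expression types and proto fields are not shown in this thread. The sketch assumes only `+ literal` and `- literal` chains over a single input column are recognised.

```rust
/// Hypothetical, minimal expression tree for illustration.
#[derive(Debug, Clone)]
pub enum Expr {
    InputRef(usize),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
    Sub(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
}

/// What the frontend could record per output column: the output equals
/// `input_col + offset`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Derivation {
    pub input_col: usize,
    pub offset: i64,
}

/// Returns a derivation if `expr` is an input column plus or minus literals.
pub fn derive_offset(expr: &Expr) -> Option<Derivation> {
    match expr {
        Expr::InputRef(c) => Some(Derivation { input_col: *c, offset: 0 }),
        // `x + k` and `k + x` are both increasing in `x`.
        Expr::Add(l, r) => match (l.as_ref(), r.as_ref()) {
            (inner, Expr::Literal(k)) | (Expr::Literal(k), inner) => {
                let d = derive_offset(inner)?;
                Some(Derivation { input_col: d.input_col, offset: d.offset.checked_add(*k)? })
            }
            _ => None,
        },
        // Only `x - k` qualifies; `k - x` is decreasing in `x`.
        Expr::Sub(l, r) => match r.as_ref() {
            Expr::Literal(k) => {
                let d = derive_offset(l)?;
                Some(Derivation { input_col: d.input_col, offset: d.offset.checked_sub(*k)? })
            }
            _ => None,
        },
        _ => None,
    }
}

/// Per output column, keep a derivation only if its input column actually
/// carries a watermark.
pub fn derive_for_project(exprs: &[Expr], watermark_cols: &[usize]) -> Vec<Option<Derivation>> {
    exprs
        .iter()
        .map(|e| derive_offset(e).filter(|d| watermark_cols.contains(&d.input_col)))
        .collect()
}
```

The resulting per-column list is the kind of information that could be serialised into the plan, so the compute node only applies offsets and never re-analyses expressions.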

@xiangjinwu
Contributor

xiangjinwu commented Dec 20, 2022

We can only guarantee watermark monotonic properties if the expression is a monotonic function. For simplicity, we only allow offsets. Offsets are quite common for time-related columns.

Maybe we also need to_timestamp(ms / 1e3), which is the expression to convert milliseconds to timestamptz.
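
Supporting such a conversion would mean generalising from offsets to a small set of known monotonic functions. The sketch below is one way to picture that, under two loud assumptions: `MonotonicFn` is a hypothetical type, and timestamptz is modelled here as integer microseconds since the epoch, so the float division in `to_timestamp(ms / 1e3)` is replaced by an integer multiplication.

```rust
/// Hypothetical set of monotonic, non-decreasing functions through which a
/// watermark value could be propagated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MonotonicFn {
    /// `x + k` for a literal `k`.
    Offset(i64),
    /// Integer stand-in for `to_timestamp(ms / 1e3)`: milliseconds to
    /// microseconds since the epoch (assumed timestamptz representation).
    MillisToMicros,
}

impl MonotonicFn {
    /// Applies the function to a watermark value; `None` on overflow, in
    /// which case no output watermark would be emitted.
    pub fn apply(self, value: i64) -> Option<i64> {
        match self {
            MonotonicFn::Offset(k) => value.checked_add(k),
            MonotonicFn::MillisToMicros => value.checked_mul(1_000),
        }
    }
}
```

Because each listed function is non-decreasing, applying it to successive watermark values preserves their order, which is the property the output watermark needs.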

4 participants