perf: nexmark q5 #7343
query:
plan:
query:
plan:
It seems … above is the same for the two subqueries?
Yes, that requires an advanced feature: common sub-plan detection. Currently, we only plan to support shared sources, CTEs, views, and subquery domains. Still, we can support it if necessary. @wsx-ucb seems interested in this feature.
Would you mind also pasting the Flink SQL plan here for comparison? 🥺
Query:
Optimized Physical Plan:
Optimized Execution Plan:
Some differences:
The explanation for point (3) above is:
We think the Filter operator can process data more efficiently thanks to vectorized execution.
Another difference is that Flink adds a watermark definition on the column …
Exactly. I just realized this today and added the watermark to the tracking issue.
It may be worth benchmarking once. I wonder whether having another operator would hurt cache behavior; I honestly don't know.
Very similar to #7244 (comment):
SELECT
    B.auction,
    B.num
FROM (
    SELECT
        auction,
        num,
        /* use rank here to express top-N with ties */
        rank() over (partition by starttime order by num desc) as num_rank
    FROM (
        SELECT bid.auction, count(*) AS num, window_start AS starttime
        FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        GROUP BY window_start, bid.auction
    )
) B
WHERE num_rank <= 1;
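The query keeps, for every window, all auctions tied for the highest bid count; using `rank()` rather than `row_number()` is what preserves ties. A small Python model of that semantics, assuming the per-window counts from the innermost subquery are already available as `(starttime, auction, num)` tuples (the function name `hot_auctions` and this input shape are hypothetical, for illustration only):

```python
from collections import defaultdict


def hot_auctions(counts: list[tuple[int, int, int]]) -> dict[int, list[int]]:
    """counts holds (starttime, auction, num) rows, like the inner subquery.

    Returns, per starttime, every auction whose rank() is 1, i.e. all
    auctions tied for the maximum num in that window.
    """
    by_window: dict[int, list[tuple[int, int]]] = defaultdict(list)
    for starttime, auction, num in counts:
        by_window[starttime].append((auction, num))

    result = {}
    for starttime, rows in by_window.items():
        top = max(num for _, num in rows)
        # rank() assigns rank 1 to every row with the top value, so ties all survive.
        result[starttime] = sorted(a for a, num in rows if num == top)
    return result


rows = [(0, 101, 7), (0, 102, 7), (0, 103, 3), (2, 101, 4), (2, 104, 9)]
print(hot_auctions(rows))  # {0: [101, 102], 2: [104]}
```

With `row_number()` instead, only one of auctions 101 and 102 would survive in window 0.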
Good point. I'll add it to the performance dashboard tomorrow.
There is a HOP window that causes 5x data amplification: with a 10-second window sliding every 2 seconds, each bid falls into 10 / 2 = 5 windows. We use a separate hop executor to expand the data in chunks, whereas Flink SQL handles the time window inside the aggregation executor and may not need to expand the data physically. We might need operator fusion between hop and other operators.
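To make the 5x amplification concrete, here is a minimal Python sketch (not RisingWave's hop executor) that lists every hopping window containing a given event time. It assumes integer-second timestamps and window starts aligned to multiples of the 2-second slide; `hop_window_starts` is a hypothetical name.

```python
def hop_window_starts(ts: int, slide: int = 2, size: int = 10) -> list[int]:
    """Return the start of every hop window [start, start + size) that contains ts.

    Assumes window starts are aligned to multiples of `slide` and that
    `size` is a multiple of `slide`.
    """
    # Latest aligned window start that is <= ts.
    start = ts - ts % slide
    starts = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start > ts - size:
        starts.append(start)
        start -= slide
    return sorted(starts)


events = [0, 1, 5, 13, 27]
# A separate hop operator materializes one row per (event, window) pair.
expanded = [(ts, w) for ts in events for w in hop_window_starts(ts)]
print(hop_window_starts(13))  # [4, 6, 8, 10, 12]
print(len(expanded) / len(events))  # 5.0
```

Fusing hop into the aggregation would let the operator update the five window groups directly instead of first materializing these expanded rows.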
Will Flink also accelerate computation by reusing the aggregation results between different windows? |
A kind of 2-phase Agg optimization? 🥵
Before:
SELECT
    count(*) AS num,
    window_start AS starttime_c
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY bid.auction, window_start
After:
WITH partial_agg AS (
    SELECT
        bid.auction,
        count(*) AS partial_num,
        window_start AS tumble_window_start
    FROM TUMBLE(bid, date_time, INTERVAL '2' SECOND)
    GROUP BY bid.auction, window_start
)
SELECT
    sum(partial_num) AS num,
    window_start AS starttime_c
FROM HOP(partial_agg, tumble_window_start, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY auction, window_start
cc @fuyufjh @chenzl25 @TennyZhuang @liurenjie1024 @BugenZhao
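One way to sanity-check the Before/After rewrite is to compute both forms on the same input and compare. The Python sketch below models the two queries in memory (it is not RisingWave code, and the function names are hypothetical): phase 1 counts bids per (auction, 2-second tumble bucket), phase 2 sums those partial counts over each 10-second hop window. The rewrite is exact only because the hop slide equals the tumble size, so each bucket lies entirely inside or entirely outside every hop window.

```python
from collections import Counter

SLIDE, SIZE = 2, 10  # seconds; SIZE must be a multiple of SLIDE


def hop_starts(ts: int) -> range:
    """Starts of all hop windows [s, s + SIZE) containing ts, aligned to SLIDE."""
    last = ts - ts % SLIDE
    return range(last - SIZE + SLIDE, last + 1, SLIDE)


def direct_hop_count(bids: list[tuple[int, int]]) -> Counter:
    """Before: expand every (auction, date_time) bid into its hop windows, then count(*)."""
    return Counter((auction, w) for auction, ts in bids for w in hop_starts(ts))


def two_phase_count(bids: list[tuple[int, int]]) -> Counter:
    """After: count per tumble bucket first, then hop the partial rows and sum them."""
    partial = Counter((auction, ts - ts % SLIDE) for auction, ts in bids)
    result: Counter = Counter()
    for (auction, bucket_start), partial_num in partial.items():
        for w in hop_starts(bucket_start):
            result[(auction, w)] += partial_num
    return result


bids = [(1, 0), (1, 1), (1, 13), (2, 13), (2, 27)]  # (auction, date_time)
assert direct_hop_count(bids) == two_phase_count(bids)
```

If the tumble size did not divide the hop slide, a bucket could straddle a hop window boundary and the partial sums would no longer be exact.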
IIUC, Flink has no such optimization, since it introduces an extra state and exchange, and in fact the computation is not as heavy as state access. In streaming, 2-phase agg works best under data skew, which is hard to predict. If there is no obvious data skew, simply increasing aggregation parallelism is good enough.
The distribution will be the same (sharded by …)
I think this optimization can reduce the number of state accesses, because the tumble-window aggregation runs before the data amplification caused by HopWindow.
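A rough back-of-the-envelope model of that argument, as a Python sketch: the direct plan performs one agg state update per (bid, hop window) pair, while the two-phase plan performs one tumble-agg update per bid plus one hop-agg update per (partial row, hop window) pair. It counts updates only and ignores caching, exchange cost, and state size. It also assumes the first agg emits each (auction, bucket) group once (as with EMIT ON CLOSE); a plain streaming agg may emit an update per input row, which would erase the savings. `state_updates` is a hypothetical name.

```python
AMPLIFICATION = 10 // 2  # hop size / hop slide


def state_updates(bids: list[tuple[int, int]], slide: int = 2) -> tuple[int, int]:
    """Return (direct, two_phase) agg state-update counts for (auction, ts) bids."""
    direct = AMPLIFICATION * len(bids)
    # Rows emitted by the first agg: one per distinct (auction, tumble bucket),
    # assuming each group is emitted exactly once.
    partial_rows = len({(auction, ts - ts % slide) for auction, ts in bids})
    two_phase = len(bids) + AMPLIFICATION * partial_rows
    return direct, two_phase


# Skewed: 1000 bids on one hot auction inside one 2-second bucket.
print(state_updates([(7, 0)] * 1000))  # (5000, 1005)
# No skew: 1000 bids on 1000 different auctions.
print(state_updates([(a, 0) for a in range(1000)]))  # (5000, 6000)
```

The two cases mirror the skew point made above: the rewrite pays off when many bids share an (auction, bucket) group and costs extra when they do not.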
Current plan after the common sub-plan rewriting optimization:
Analysis of the EMIT ON CLOSE plan's performance:
Do you need to maintain state in the first agg?
True for the second agg. IIUC, Flink's agg executor has an optimization that buffers updates before actually accessing RocksDB, to avoid ser/de cost.
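The buffering idea can be sketched generically: collect a batch of updates in an in-memory map, fold all updates to the same key together, and only then do one read-modify-write against the state store per distinct key. In this Python sketch the `dict` store and the `store_accesses` counter stand in for RocksDB access and its ser/de cost; they are assumptions for illustration, not Flink internals (Flink documents a possibly related feature as MiniBatch aggregation). `BufferedCountAgg` is a hypothetical name.

```python
from collections import defaultdict


class BufferedCountAgg:
    """Count per key, coalescing updates in memory before touching the store."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.store: dict[str, int] = {}  # stand-in for a RocksDB-backed state table
        self.buffer: dict[str, int] = defaultdict(int)
        self.pending = 0
        self.store_accesses = 0  # each access would imply a ser/de round trip

    def add(self, key: str) -> None:
        self.buffer[key] += 1
        self.pending += 1
        if self.pending >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        for key, delta in self.buffer.items():
            # One read-modify-write per distinct key instead of one per record.
            self.store[key] = self.store.get(key, 0) + delta
            self.store_accesses += 1
        self.buffer.clear()
        self.pending = 0


agg = BufferedCountAgg(batch_size=100)
for key in ["a"] * 90 + ["b"] * 10:
    agg.add(key)
print(agg.store, agg.store_accesses)  # {'a': 90, 'b': 10} 2
```

Without the buffer, the same 100 records would cost 100 store accesses; the trade-off is added latency until the batch flushes.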
Query:
plan: