perf: nexmark q5 #7343


Open · Tracked by #7289
lmatz opened this issue Jan 12, 2023 · 21 comments

@lmatz (Contributor) commented Jan 12, 2023

Query:

CREATE MATERIALIZED VIEW nexmark_q5
AS
SELECT
    AuctionBids.auction, AuctionBids.num
FROM (
    SELECT
        bid.auction,
        count(*) AS num,
        window_start AS starttime
    FROM
        HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
    GROUP BY
        window_start,
        bid.auction
) AS AuctionBids
JOIN (
    SELECT
        max(CountBids.num) AS maxn,
        CountBids.starttime_c
    FROM (
        SELECT
            count(*) AS num,
            window_start AS starttime_c
        FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
        GROUP BY
            bid.auction,
            window_start
    ) AS CountBids
    GROUP BY
        CountBids.starttime_c
) AS MaxBids
ON
    AuctionBids.starttime = MaxBids.starttime_c AND
    AuctionBids.num >= MaxBids.maxn;

plan:

 StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
 └─StreamProject { exprs: [Field(bid, 0:Int32), count, window_start, window_start] }
   └─StreamFilter { predicate: (count >= max(count)) }
     └─StreamHashJoin { type: Inner, predicate: window_start = window_start }
       ├─StreamExchange { dist: HashShard(window_start) }
       | └─StreamProject { exprs: [Field(bid, 0:Int32), count, window_start] }
       |   └─StreamAppendOnlyHashAgg { group_key: [window_start, Field(bid, 0:Int32)], aggs: [count, count] }
       |     └─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
       |       └─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
       |         └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
       |           └─StreamFilter { predicate: (event_type = 2:Int32) }
       |             └─StreamProject { exprs: [event_type, bid, _row_id] }
       |               └─StreamShare { id = 484 }
       |                 └─StreamProject { exprs: [event_type, bid, _row_id] }
       |                   └─StreamFilter { predicate: (event_type = 2:Int32) }
       |                     └─StreamRowIdGen { row_id_index: 4 }
       |                       └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
       └─StreamProject { exprs: [max(count), window_start] }
         └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
           └─StreamExchange { dist: HashShard(window_start) }
             └─StreamProject { exprs: [Field(bid, 0:Int32), window_start, count] }
               └─StreamAppendOnlyHashAgg { group_key: [Field(bid, 0:Int32), window_start], aggs: [count, count] }
                 └─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
                   └─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
                     └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
                       └─StreamFilter { predicate: (event_type = 2:Int32) }
                         └─StreamProject { exprs: [event_type, bid, _row_id] }
                           └─StreamShare { id = 484 }
                             └─StreamProject { exprs: [event_type, bid, _row_id] }
                               └─StreamFilter { predicate: (event_type = 2:Int32) }
                                 └─StreamRowIdGen { row_id_index: 4 }
                                   └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
github-actions bot added this to the release-0.1.16 milestone Jan 12, 2023
@lmatz (Contributor, Author) commented Jan 12, 2023

query:

create materialized view AuctionBids
as
SELECT
    bid.auction,
    count(*) AS num,
    window_start AS starttime
FROM
    HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY
    window_start,
    bid.auction;

plan:

 StreamMaterialize { columns: [maxn, starttime_c], pk_columns: [starttime_c] }
 └─StreamProject { exprs: [max(count), window_start] }
   └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
     └─StreamExchange { dist: HashShard(window_start) }
       └─StreamProject { exprs: [Field(bid, 0:Int32), window_start, count] }
         └─StreamAppendOnlyHashAgg { group_key: [Field(bid, 0:Int32), window_start], aggs: [count, count] }
           └─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
             └─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
               └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
                 └─StreamFilter { predicate: (event_type = 2:Int32) }
                   └─StreamRowIdGen { row_id_index: 4 }
                     └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }

@lmatz (Contributor, Author) commented Jan 12, 2023

query:

create materialized view MaxBids
as
SELECT
    max(CountBids.num) AS maxn,
    CountBids.starttime_c
FROM (
    SELECT
        count(*) AS num,
        window_start AS starttime_c
    FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
    GROUP BY
        bid.auction,
        window_start
) AS CountBids
GROUP BY
    CountBids.starttime_c;

plan:

 StreamMaterialize { columns: [maxn, starttime_c], pk_columns: [starttime_c] }
 └─StreamProject { exprs: [max(count), window_start] }
   └─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
     └─StreamExchange { dist: HashShard(window_start) }
       └─StreamProject { exprs: [Field(bid, 0:Int32), window_start, count] }
         └─StreamAppendOnlyHashAgg { group_key: [Field(bid, 0:Int32), window_start], aggs: [count, count] }
           └─StreamExchange { dist: HashShard(Field(bid, 0:Int32), window_start) }
             └─StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
               └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }
                 └─StreamFilter { predicate: (event_type = 2:Int32) }
                   └─StreamRowIdGen { row_id_index: 4 }
                     └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
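
For completeness, q5 itself can then be reassembled by joining the two materialized views above, along the lines of:

SELECT AuctionBids.auction, AuctionBids.num
FROM AuctionBids
JOIN MaxBids
ON AuctionBids.starttime = MaxBids.starttime_c AND
   AuctionBids.num >= MaxBids.maxn;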

@lmatz (Contributor, Author) commented Jan 12, 2023

It seems the StreamShare can be lifted up past two more operators, because the

StreamHopWindow { time_col: Field(bid, 5:Int32), slide: 00:00:02, size: 00:00:10, output: [Field(bid, 0:Int32), window_start, _row_id] }
               └─StreamProject { exprs: [Field(bid, 0:Int32), Field(bid, 5:Int32), _row_id] }

above it are identical for the two subqueries.

Edit: three operators, in fact, counting the StreamExchange.

@chenzl25 (Contributor) commented:

Yes, this is an advanced feature: common sub-plan detection. Currently we only plan to support sharing sources, CTEs, views, and subquery domains, but we can support the general case if necessary. @wsx-ucb seems interested in this feature.
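
As an illustration, the shared sub-plan corresponds to factoring the common aggregation into a single CTE; a sketch of q5 written that way (whether the optimizer actually shares the CTE is exactly the feature discussed here):

WITH CountBids AS (
    SELECT
        bid.auction,
        count(*) AS num,
        window_start AS starttime
    FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
    GROUP BY window_start, bid.auction
)
SELECT AuctionBids.auction, AuctionBids.num
FROM CountBids AS AuctionBids
JOIN (
    SELECT max(num) AS maxn, starttime
    FROM CountBids
    GROUP BY starttime
) AS MaxBids
ON AuctionBids.starttime = MaxBids.starttime AND
   AuctionBids.num >= MaxBids.maxn;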

@BugenZhao (Member) commented:

Would you mind also pasting the plan of Flink SQL here for comparison? 🥺

@lmatz (Contributor, Author) commented Jan 13, 2023

Query:

SELECT AuctionBids.auction, AuctionBids.num
 FROM (
   SELECT
     B1.auction,
     count(*) AS num,
     HOP_START(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
     HOP_END(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime
   FROM bid B1
   GROUP BY
     B1.auction,
     HOP(B1.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
 ) AS AuctionBids
 JOIN (
   SELECT
     max(CountBids.num) AS maxn,
     CountBids.starttime,
     CountBids.endtime
   FROM (
     SELECT
       count(*) AS num,
       HOP_START(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS starttime,
       HOP_END(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND) AS endtime
     FROM bid B2
     GROUP BY
       B2.auction,
       HOP(B2.dateTime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
     ) AS CountBids
   GROUP BY CountBids.starttime, CountBids.endtime
 ) AS MaxBids
 ON AuctionBids.starttime = MaxBids.starttime AND
    AuctionBids.endtime = MaxBids.endtime AND
    AuctionBids.num >= MaxBids.maxn;

Optimized Physical Plan:

Calc(select=[auction, num])
+- Join(joinType=[InnerJoin], where=[AND(=(starttime, starttime0), =(endtime, endtime0), >=(num, maxn))], select=[auction, num, starttime, endtime, maxn, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[starttime, endtime]])
   :  +- Calc(select=[$f0 AS auction, num, w$start AS starttime, w$end AS endtime])
   :     +- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
   :        +- Exchange(distribution=[hash[$f0]])
   :           +- Calc(select=[bid.auction AS $f0, dateTime], where=[=(event_type, 2)])
   :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
   :                 +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
   :                    +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[starttime, endtime]])
      +- Calc(select=[maxn, starttime, endtime])
         +- GroupAggregate(groupBy=[starttime, endtime], select=[starttime, endtime, MAX(num) AS maxn])
            +- Exchange(distribution=[hash[starttime, endtime]])
               +- Calc(select=[w$start AS starttime, w$end AS endtime, num])
                  +- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
                     +- Exchange(distribution=[hash[$f0]])
                        +- Calc(select=[bid.auction AS $f0, dateTime], where=[=(event_type, 2)])
                           +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                              +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                                 +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

Optimized Execution Plan:

Calc(select=[auction, num])
+- Join(joinType=[InnerJoin], where=[((starttime = starttime0) AND (endtime = endtime0) AND (num >= maxn))], select=[auction, num, starttime, endtime, maxn, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[starttime, endtime]])
   :  +- Calc(select=[$f0 AS auction, num, w$start AS starttime, w$end AS endtime])
   :     +- GroupWindowAggregate(groupBy=[$f0], window=[SlidingGroupWindow('w$, dateTime, 10000, 2000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, COUNT(*) AS num, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])(reuse_id=[1])
   :        +- Exchange(distribution=[hash[$f0]])
   :           +- Calc(select=[bid.auction AS $f0, dateTime], where=[(event_type = 2)])
   :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
   :                 +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
   :                    +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[starttime, endtime]])
      +- Calc(select=[maxn, starttime, endtime])
         +- GroupAggregate(groupBy=[starttime, endtime], select=[starttime, endtime, MAX(num) AS maxn])
            +- Exchange(distribution=[hash[starttime, endtime]])
               +- Calc(select=[w$start AS starttime, w$end AS endtime, num])
                  +- Reused(reference_id=[1])

Some differences:

  1. Flink merges the final filter into the hash join. Note that at https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/hash_join.rs#L225, RW's hash join is also able to accept a non-equi condition. I wonder what the criteria are for choosing a separate filter on top of the join versus folding the non-equi condition into the hash join?
  2. The share is lifted higher, so the common sub-plan covers more operators.
  3. Flink has one more exchange, on the hash value of bid.auction alone. RW does not have this exchange, only the one on the hash value of both bid.auction and window_start.
  4. One subquery (the lower one) exchanges twice on the hash value of starttime and endtime, which seems unnecessary.

The explanation for point (3) above:

  1. The queries differ: the RW version didn't group by both starttime and endtime.
  2. That said, since both columns derive from the same window, one of the two keys can indeed be removed.
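
As a side note on point (2) of the explanation: with a fixed-size hop window, window_end is functionally dependent on window_start, so carrying both as grouping/join keys is redundant. A minimal sketch against RW's HOP output (which exposes both window_start and window_end):

SELECT window_start, window_end
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND);
-- window_end = window_start + INTERVAL '10' SECOND always holds here,
-- so GROUP BY window_start is equivalent to GROUP BY window_start, window_end.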

@st1page (Contributor) commented Feb 22, 2023

> Flink merges the final filter into the hash join. Note that at https://github.com/risingwavelabs/risingwave/blob/main/src/stream/src/executor/hash_join.rs#L225, RW's hash join is also able to accept a non-equi condition. I wonder what the criteria are for choosing a separate filter on top of the join versus folding the non-equi condition into the hash join?

We think a separate Filter operator can process it more efficiently because of vectorized execution.

@st1page (Contributor) commented Feb 22, 2023

Another difference is that Flink adds a watermark definition on the column date_time and uses GroupWindowAggregate, which is more like our EMIT ON WINDOW CLOSE query. So the first aggregation can buffer the data until the window has closed and emit an append-only stream, and then the second aggregation can be more efficient with an append-only input.
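
For reference, a hedged sketch of how the corresponding watermark could be declared on the RW side, mirroring Flink's 4-second delay (hypothetical DDL; the actual Nexmark source schema and connector options are abbreviated here):

CREATE SOURCE bid (
    auction BIGINT,
    price BIGINT,
    date_time TIMESTAMP,
    -- assumed 4s watermark delay, matching Flink's WatermarkAssigner above
    WATERMARK FOR date_time AS date_time - INTERVAL '4' SECOND
) WITH (
    connector = 'nexmark',
    nexmark.table.type = 'Bid'
);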

@lmatz (Contributor, Author) commented Feb 22, 2023

Exactly. I just realized this today and added the watermark item to the tracking issue.

@lmatz (Contributor, Author) commented Feb 22, 2023

> We think a separate Filter operator can process it more efficiently because of vectorized execution.

It may be worth benchmarking once; I wonder whether having another operator would mess up the cache. I honestly don't know.

@st1page (Contributor) commented Feb 23, 2023

Very similar to #7244 (comment). Rewritten SQL:

  SELECT
      B.auction,
      B.num
  FROM (
      SELECT
          auction,
          num,
          /* use rank() here to express top-N with ties */
          rank() over (partition by starttime order by num desc) AS num_rank
      FROM (
          SELECT bid.auction, count(*) AS num, window_start AS starttime
          FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
          GROUP BY window_start, bid.auction
      )
  ) B
  WHERE num_rank <= 1;

@lmatz (Contributor, Author) commented Feb 23, 2023

Good point, let me add it to the performance dashboard tomorrow.

@st1page (Contributor) commented Mar 20, 2023

The HOP window here incurs a 5x data amplification. We use a standalone hop executor to expand the data into chunks, while Flink SQL handles the time window inside the aggregation executor, which may not need to expand the data physically. We might need operator fusion between hop and the other operators.
However, after an offline discussion with @TennyZhuang, we are not sure how large the overhead actually is, because the HopExecutor does not do much physical copying (we use the Arc of ArrayImpl).

@BugenZhao (Member) commented:

> The HOP window here incurs a 5x data amplification. We use a standalone hop executor to expand the data into chunks, while Flink SQL handles the time window inside the aggregation executor, which may not need to expand the data physically. We might need operator fusion between hop and the other operators. However, after an offline discussion with @TennyZhuang, we are not sure how large the overhead actually is, because the HopExecutor does not do much physical copying (we use the Arc of ArrayImpl).

Will Flink also accelerate computation by reusing the aggregation results between different windows?

@st1page (Contributor) commented Mar 20, 2023

> Will Flink also accelerate computation by reusing the aggregation results between different windows?

A kind of two-phase agg optimization? 🥵
Before:

SELECT
    count(*) AS num,
    window_start AS starttime_c
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY bid.auction, window_start

After:

with partial_agg as (
    SELECT
        auction,
        count(*) AS partial_num,
        window_start AS tumble_window_start
    FROM TUMBLE(bid, date_time, INTERVAL '2' SECOND)
    GROUP BY auction, window_start
)
SELECT
    sum(partial_num) AS num,
    window_start AS starttime_c
FROM HOP(partial_agg, tumble_window_start, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY auction, window_start

cc @fuyufjh @chenzl25 @TennyZhuang @liurenjie1024 @BugenZhao

@liurenjie1024 (Contributor) commented:

IIUC, Flink has no such optimization, since it introduces another state and exchange, and in fact the computation is not as heavy as the state access. In streaming, two-phase agg works best when there is data skew, which is hard to predict. If there is no obvious data skew, simply increasing the aggregation parallelism is good enough.

@st1page (Contributor) commented Mar 20, 2023

> since it introduces another state and exchange

The distribution will be the same (sharded by bid.auction) if we adopt #3255.

> in fact the computation is not as heavy as the state access

I think this optimization can reduce the number of state accesses, because the tumble-window aggregation runs before the data amplification caused by the HopWindow.

@TennyZhuang (Contributor) commented:

Checking the Flink code, I found that they also generate multiple rows.

[screenshot of the relevant Flink source omitted]

The replace method is evaluated lazily, so no actual data copy happens. We achieve the same goal by sharing the data columns using Arc.

@st1page (Contributor) commented Mar 21, 2023

Current plan after the common sub-plan rewriting optimization:

    StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [auction, window_start, window_start#1], pk_conflict: "no check" }
    └─StreamProject { exprs: [bid.auction, count, window_start, window_start] }
      └─StreamFilter { predicate: (count >= max(count)) }
        └─StreamHashJoin { type: Inner, predicate: window_start = window_start, output: all }
          ├─StreamExchange { dist: HashShard(window_start) }
          | └─StreamProject { exprs: [bid.auction, count, window_start] }
          |   └─StreamShare { id = 8 }
          |     └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
          |       └─StreamExchange { dist: HashShard(bid.auction, window_start) }
          |         └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
          |           └─StreamFilter { predicate: IsNotNull(bid.date_time) }
          |             └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
          └─StreamProject { exprs: [max(count), window_start] }
            └─StreamHashAgg { group_key: [window_start], aggs: [max(count), count] }
              └─StreamExchange { dist: HashShard(window_start) }
                └─StreamProject { exprs: [bid.auction, window_start, count] }
                  └─StreamShare { id = 8 }
                    └─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
                      └─StreamExchange { dist: HashShard(bid.auction, window_start) }
                        └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
                          └─StreamFilter { predicate: IsNotNull(bid.date_time) }
                            └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }

@st1page (Contributor) commented Mar 21, 2023

Analysis of the EMIT ON WINDOW CLOSE plan's performance:

  1. With the EMIT ON WINDOW CLOSE property, the agg executor can produce an append-only stream, which the downstream executors can process much more efficiently. This optimization is extremely important for the second agg, because our updatable max() aggregator is much slower than the append-only version.
  2. An EMIT ON WINDOW CLOSE aggregation reduces its output operations to the number of time windows. Currently we batch by epoch (the agg only emits results when a barrier comes), but if a time window's data crosses multiple epochs, the output operations are amplified by the number of epochs crossed.
Current plan:
            SourceExec
                │
                │
                ▼
             HopExec
                │
                │
                ▼
     Append-only AggExec Count()
       group by           
         window_start, auction
       Output Updatable Stream
                │
                │ ──────────────────┐
                ▼                   │
         AggExec Max()              │
           group by                 │
             window_start           │
           Output Updatable Stream  │ 
                │                   │
                │                   │
                └───────────────────┘
     JoinExec   ▼
       join key: window_start


(Ideal EMIT ON WINDOW CLOSE plan)
            SourceExec
                │
                │
                ▼
             HopExec
                │
                │
                ▼
     Append-only AggExec Count()
       group by
         window_start, auction
       EMIT ON WINDOW CLOSE
       Output AppendOnlyStream
                │
                │ ─────────────┐
                ▼              │
     Append-only AggExec Max() │
       group by                │
         window_start          │
       EMIT ON WINDOW CLOSE    │
       Output AppendOnlyStream │
                │              │
                │              │
                ▼──────────────┘
      Append-only JoinExec
        join key: window_start
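
A sketch of the SQL that would express the first aggregation under that ideal plan, assuming an EMIT ON WINDOW CLOSE clause and a watermark on date_time as discussed above (hypothetical syntax until the feature lands):

CREATE MATERIALIZED VIEW AuctionBids AS
SELECT
    bid.auction,
    count(*) AS num,
    window_start AS starttime
FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
GROUP BY window_start, bid.auction
EMIT ON WINDOW CLOSE;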

@liurenjie1024 (Contributor) commented Mar 21, 2023

> The distribution will be the same (sharded by bid.auction) if we adopt #3255.

Do you need to maintain state in the first agg?

> I think this optimization can reduce the number of state accesses, because the tumble-window aggregation runs before the data amplification caused by the HopWindow.

True for the second agg.

IIUC, the Flink agg executor has an optimization that buffers updates before actually accessing RocksDB, to avoid the ser/de cost.
