
perf: nexmark q104 #8166


Closed
Tracked by #7289
lmatz opened this issue Feb 23, 2023 · 4 comments
Labels
type/perf Type: Performance.

@lmatz
Contributor

lmatz commented Feb 23, 2023

Query:

CREATE MATERIALIZED VIEW nexmark_q104
AS
SELECT
    a.id AS auction_id,
    a.item_name AS auction_item_name
FROM auction a
WHERE a.id NOT IN (
    SELECT b.auction FROM bid b
    GROUP BY b.auction
    HAVING COUNT(*) < 20
);
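For reference, here is a minimal batch sketch in Python of what the query computes. It assumes no NULL auction ids or bid keys, so `NOT IN` behaves as a plain anti-join; the `Auction`/`Bid` types and the `nexmark_q104` function are hypothetical names for illustration. An auction with no bids never appears in the subquery result, so it is kept, as is any auction with 20 or more bids.

```python
from collections import Counter
from typing import List, NamedTuple, Tuple


class Auction(NamedTuple):
    id: int
    item_name: str


class Bid(NamedTuple):
    auction: int


def nexmark_q104(auctions: List[Auction], bids: List[Bid]) -> List[Tuple[int, str]]:
    # Subquery: SELECT b.auction FROM bid b GROUP BY b.auction HAVING COUNT(*) < 20
    counts = Counter(b.auction for b in bids)
    excluded = {auction for auction, n in counts.items() if n < 20}
    # Outer query: keep auctions whose id is NOT IN the subquery result.
    # Assumption: no NULL ids/keys, so NOT IN reduces to a plain anti-join.
    return [(a.id, a.item_name) for a in auctions if a.id not in excluded]


auctions = [Auction(1, "lamp"), Auction(2, "vase"), Auction(3, "rug")]
bids = [Bid(1)] * 25 + [Bid(2)] * 3
print(nexmark_q104(auctions, bids))  # [(1, 'lamp'), (3, 'rug')]
```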

RW:

 StreamMaterialize { columns: [auction_id, auction_item_name, _row_id(hidden)], pk_columns: [_row_id, auction_id], pk_conflict: "no check" }
 └─StreamHashJoin { type: LeftAnti, predicate: $expr1 = $expr3 }
   ├─StreamExchange { dist: HashShard($expr1) }
   | └─StreamProject { exprs: [Field(auction, 0:Int32) as $expr1, Field(auction, 1:Int32) as $expr2, _row_id] }
   |   └─StreamFilter { predicate: (event_type = 1:Int32) }
   |     └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   |       └─StreamShare { id = 547 }
   |         └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   |           └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
   |             └─StreamRowIdGen { row_id_index: 4 }
   |               └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
   └─StreamProject { exprs: [$expr3] }
     └─StreamFilter { predicate: (count < 20:Int32) }
       └─StreamProject { exprs: [$expr3, count] }
         └─StreamAppendOnlyHashAgg { group_key: [$expr3], aggs: [count, count] }
           └─StreamExchange { dist: HashShard($expr3) }
             └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, _row_id] }
               └─StreamFilter { predicate: (event_type = 2:Int32) }
                 └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                   └─StreamShare { id = 547 }
                     └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                       └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
                         └─StreamRowIdGen { row_id_index: 4 }
                           └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(24 rows)
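The right branch of the plan (StreamAppendOnlyHashAgg, then StreamFilter on `count < 20`, then StreamProject down to the key) can be modelled roughly as the changelog sketch below. This is a simplified Python model, not RisingWave code: it assumes each count change retracts the old aggregate row if that row passed the filter and inserts the new row if it passes, and `count_filter_changelog` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple


def count_filter_changelog(bid_auctions: Iterable[int], limit: int = 20) -> Iterator[Tuple[str, int]]:
    """Model of COUNT(*) GROUP BY auction, then HAVING COUNT(*) < limit, then
    projecting only the key, over an append-only stream of bid auction ids."""
    counts: Dict[int, int] = defaultdict(int)
    for key in bid_auctions:
        old = counts[key]
        new = old + 1
        counts[key] = new
        if 0 < old < limit:
            yield ("-", key)  # retract the old aggregate row, which passed the filter
        if new < limit:
            yield ("+", key)  # insert the new aggregate row, which passes the filter


print(list(count_filter_changelog([7, 7, 7, 7], limit=3)))
# [('+', 7), ('-', 7), ('+', 7), ('-', 7)]
```

Under this model, with `limit=20` a key emits an insert on its first bid, a retract/insert pair on each of its next 18 bids, and a final retract on its 20th bid, after which further bids no longer change the output.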
 Fragment 0
   StreamMaterialize { columns: [auction_id, auction_item_name, _row_id(hidden)], pk_columns: [_row_id, auction_id], pk_conflict: "no check" }
       materialized table: 4294967294
     StreamHashJoin { type: LeftAnti, predicate: $expr296 = $expr298 }
         left table: 0, right table 2, left degree table: 1, right degree table: 3,
       StreamExchange Hash([0]) from 1
       StreamProject { exprs: [$expr298] }
         StreamFilter { predicate: (count < 20:Int32) }
           StreamProject { exprs: [$expr298, count] }
             StreamAppendOnlyHashAgg { group_key: [$expr298], aggs: [count, count] }
                 result table: 5, state tables: []
               StreamExchange Hash([0]) from 3
 
 Fragment 1
   StreamProject { exprs: [Field(auction, 0:Int32) as $expr296, Field(auction, 1:Int32) as $expr297, _row_id] }
     StreamFilter { predicate: (event_type = 1:Int32) }
       StreamProject { exprs: [event_type, auction, bid, _row_id] }
         StreamExchange Hash([3]) from 2
 
 Fragment 2
   StreamProject { exprs: [event_type, auction, bid, _row_id] }
     StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
       StreamRowIdGen { row_id_index: 4 }
         StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
             source state table: 4
 
 Fragment 3
   StreamProject { exprs: [Field(bid, 0:Int32) as $expr298, _row_id] }
     StreamFilter { predicate: (event_type = 2:Int32) }
       StreamProject { exprs: [event_type, auction, bid, _row_id] }
         StreamExchange Hash([3]) from 2
 
  Table 0 { columns: [$expr296, $expr297, _row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
  Table 1 { columns: [$expr296, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 2 { columns: [$expr298], primary key: [$0 ASC], value indices: [0], distribution key: [0] }
  Table 3 { columns: [$expr298, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
  Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [] }
  Table 5 { columns: [$expr298, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
  Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0] }
(39 rows)
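The StreamHashJoin state above keeps degree tables (the `_degree` columns in Tables 1 and 3) next to the row tables. As a rough illustration of why a left anti-join needs a degree, here is a simplified Python sketch. It assumes an equality-only join condition (so all left rows with one key share the same degree, and a per-key count stands in for the per-row `_degree` column) and an append-only left input; `StreamingAntiJoin` is a hypothetical class, not RisingWave's executor.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Change = Tuple[str, tuple]  # ("+" or "-", left row)


class StreamingAntiJoin:
    """Left anti-join on key: a left row is visible while no right row matches it."""

    def __init__(self) -> None:
        self.left: Dict[int, List[tuple]] = defaultdict(list)  # key -> buffered left rows
        self.right_count: Dict[int, int] = defaultdict(int)    # key -> matching right rows (degree)

    def on_left_insert(self, key: int, row: tuple) -> List[Change]:
        self.left[key].append(row)
        # Degree 0 means no matching right row, so the left row is emitted.
        return [("+", row)] if self.right_count[key] == 0 else []

    def on_right_insert(self, key: int) -> List[Change]:
        self.right_count[key] += 1
        # Degree 0 -> 1: every buffered left row with this key is retracted.
        if self.right_count[key] == 1:
            return [("-", row) for row in self.left[key]]
        return []

    def on_right_delete(self, key: int) -> List[Change]:
        self.right_count[key] -= 1
        # Degree 1 -> 0: the left rows with this key become visible again.
        if self.right_count[key] == 0:
            return [("+", row) for row in self.left[key]]
        return []


j = StreamingAntiJoin()
print(j.on_left_insert(1, (1, "lamp")))  # [('+', (1, 'lamp'))]
print(j.on_right_insert(1))               # [('-', (1, 'lamp'))]
print(j.on_right_delete(1))               # [('+', (1, 'lamp'))]
```

In this model, when the right side retracts a key and immediately re-inserts it, every matching left row is re-emitted and then retracted again.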
@github-actions github-actions bot added this to the release-0.1.18 milestone Feb 23, 2023
@lmatz
Contributor Author

lmatz commented Feb 23, 2023

Flink:

== Optimized Physical Plan ==
Join(joinType=[LeftAntiJoin], where=[OR(IS NULL(id), IS NULL(auction), =(id, auction))], select=[id, itemName], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
:- Exchange(distribution=[single])
:  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[=(event_type, 1)])
:     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
:        +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
:           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[single])
   +- Calc(select=[auction], where=[<($f1, 20)])
      +- GroupAggregate(groupBy=[auction], select=[auction, COUNT(*) AS $f1])
         +- Exchange(distribution=[hash[auction]])
            +- Calc(select=[bid.auction AS auction], where=[=(event_type, 2)])
               +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                  +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                     +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Join(joinType=[LeftAntiJoin], where=[(id IS NULL OR auction IS NULL OR (id = auction))], select=[id, itemName], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
:- Exchange(distribution=[single])
:  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[(event_type = 1)])
:     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
:        +- Calc(select=[event_type, person, auction, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
:           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
+- Exchange(distribution=[single])
   +- Calc(select=[auction], where=[($f1 < 20)])
      +- GroupAggregate(groupBy=[auction], select=[auction, COUNT(*) AS $f1])
         +- Exchange(distribution=[hash[auction]])
            +- Calc(select=[bid.auction AS auction], where=[(event_type = 2)])
               +- Reused(reference_id=[1])
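Flink's join condition `id IS NULL OR auction IS NULL OR id = auction` encodes SQL's `NOT IN` semantics in the presence of NULLs: a left row is dropped if any right row satisfies it. Below is a minimal nested-loop sketch of that condition in Python, for clarity only and not how Flink evaluates the join; `null_aware_anti_join` is a hypothetical name, and `None` stands for NULL.

```python
from typing import Iterable, List, Optional, Tuple

Row = Tuple[Optional[int], str]


def null_aware_anti_join(left: Iterable[Row], right_keys: List[Optional[int]]) -> List[Row]:
    """Keep a left row unless some right key r satisfies
    id IS NULL OR r IS NULL OR id = r (the condition in Flink's plan)."""
    result = []
    for row in left:
        left_id = row[0]
        matched = any(left_id is None or r is None or left_id == r for r in right_keys)
        if not matched:
            result.append(row)
    return result


left = [(1, "lamp"), (2, "vase"), (None, "rug")]
print(null_aware_anti_join(left, [2]))        # [(1, 'lamp')]
print(null_aware_anti_join(left, []))         # [(1, 'lamp'), (2, 'vase'), (None, 'rug')]
print(null_aware_anti_join(left, [2, None]))  # []
```

If the subquery produces a NULL key, every auction is filtered out; if it produces no rows at all, every auction is kept, including one with a NULL id.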

@lmatz lmatz added the type/perf Type: Performance. label Feb 23, 2023
@lmatz
Contributor Author

lmatz commented Feb 24, 2023

   |     └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   |       └─StreamShare { id = 547 }
   |         └─StreamProject { exprs: [event_type, auction, bid, _row_id] }

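This pattern, an identical StreamProject directly above and below StreamShare, also occurs twice in this plan, the same as in q105 (see the comment in #8167).
@chenzl25, is it a redundant project that we can remove? 🤔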
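To make the question concrete: a StreamProject whose output columns equal those of its input is an identity and could in principle be dropped. The sketch below is a hypothetical Python model (the `PlanNode` type, the name-based column matching, and `remove_identity_projects` are illustrative assumptions, not RisingWave's optimizer API) applied to the shape of the plan above: the project below StreamShare narrows the source columns and stays, while the identical project above it only forwards them.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    kind: str                                       # e.g. "StreamProject", "StreamShare"
    exprs: List[str] = field(default_factory=list)  # output columns (projects and sources only)
    inputs: List["PlanNode"] = field(default_factory=list)


def output_columns(node: PlanNode) -> List[str]:
    # Projects and leaf sources define their outputs; other nodes forward their input's columns.
    if node.kind == "StreamProject" or not node.inputs:
        return node.exprs
    return output_columns(node.inputs[0])


def remove_identity_projects(node: PlanNode) -> PlanNode:
    # Rewrite children first (bottom-up), then check this node.
    node.inputs = [remove_identity_projects(child) for child in node.inputs]
    if node.kind == "StreamProject" and len(node.inputs) == 1:
        if node.exprs == output_columns(node.inputs[0]):
            return node.inputs[0]  # identity projection: it only forwards its input
    return node


cols = ["event_type", "auction", "bid", "_row_id"]
source = PlanNode("StreamSource", ["event_type", "person", "auction", "bid", "_row_id"])
lower = PlanNode("StreamProject", list(cols),
                 [PlanNode("StreamFilter", inputs=[PlanNode("StreamRowIdGen", inputs=[source])])])
share = PlanNode("StreamShare", inputs=[lower])
upper = PlanNode("StreamProject", list(cols), [share])

plan = remove_identity_projects(upper)
print(plan.kind)            # StreamShare
print(plan.inputs[0].kind)  # StreamProject
```

Matching by column name is a simplification for readability; a real optimizer would compare column indices and expression types.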

@chenzl25
Contributor


OK, I will take a look.

@lmatz
Contributor Author

lmatz commented Apr 21, 2023

One is at about 0.52K rows/s, while RW is at about 170K rows/s.

Probably something is going wrong...

@lmatz lmatz reopened this Apr 21, 2023
@fuyufjh fuyufjh closed this as not planned Jul 14, 2023