Support zero copy hash repartitioning for Hash Aggregate #15383


Open
1 task
Dandandan opened this issue Mar 24, 2025 · 9 comments
Labels
enhancement New feature or request performance Make DataFusion faster

Comments

@Dandandan
Contributor

Dandandan commented Mar 24, 2025

Is your feature request related to a problem or challenge?


Currently, RepartitionExec: partitioning=Hash is added for aggregates in FinalPartitioned and SinglePartitioned mode.

The benefit is increased parallelism, but it comes at the cost of copying the entire table (in a not particularly efficient way).

We should consider lowering the cost of repartitioning by avoiding copying the input.


Describe the solution you'd like

Instead of repartitioning the input in RepartitionExec, support repartitioning the inputs based on a selection vector.

Instead of using take on the RecordBatch, we can consider doing the following:

  • Add a (boolean) selection vector as an output column for each output partition, where true means the row is selected for that partition.
  • The rest of the RecordBatch remains unchanged (i.e. no copy).
  • CoalesceBatchesExec is no longer needed for the output (reducing another copy)
  • In the hash aggregate code handle the selection vector.
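A minimal sketch of the first step, assuming the group keys of one batch are available as a slice: compute one boolean selection vector per output partition from the row hashes, leaving the key data itself untouched. The function name hash_selection_vectors is hypothetical, std's DefaultHasher stands in for DataFusion's own hashing, and Vec<bool> stands in for an Arrow BooleanArray (which would be bit-packed).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical sketch: for one batch's key column, build one boolean selection
/// vector per output partition instead of copying rows into new batches.
/// DefaultHasher is only a stand-in for DataFusion's hashing.
fn hash_selection_vectors<K: Hash>(keys: &[K], num_partitions: usize) -> Vec<Vec<bool>> {
    assert!(num_partitions > 0, "need at least one output partition");
    // One all-false mask per output partition, each as long as the batch.
    let mut selections = vec![vec![false; keys.len()]; num_partitions];
    for (row, key) in keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let partition = (hasher.finish() % num_partitions as u64) as usize;
        // Mark the row for exactly one partition; the key data is never copied.
        selections[partition][row] = true;
    }
    selections
}
```

Each row ends up selected in exactly one partition, so the selection vectors together cover the batch once, while the batch's columns can be shared by all output partitions.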

Describe alternatives you've considered

The partitioning could be done inside the hash aggregate (at the cost of more complexity inside it).

Additional context

No response


@Dandandan Dandandan added the enhancement New feature or request label Mar 24, 2025
@Dandandan Dandandan changed the title Support zero copy hash repartitioning inside Hash Aggregate Support zero copy hash repartitioning for Hash Aggregate Mar 24, 2025
@Dandandan Dandandan added the performance Make DataFusion faster label Mar 24, 2025
@goldmedal
Contributor

take

@goldmedal
Contributor

@Dandandan
I have a draft goldmedal#3 based on #15423 for HashAggregate. Could you check if it's heading in the right direction?

When the selection vector mode is enabled:

  • CoalesceBatchesExec is not added for FinalPartitioned.
  • The selection vector is used to filter the required rows before merging batches.
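To make the cost of this step concrete, here is a minimal sketch of "filter, then merge", with columns modeled as Vec<i64> (an assumption; real batches hold Arrow arrays, and in arrow-rs this step corresponds to filter_record_batch). Every selected value is copied into a new column, which is exactly the copy the selection vector was meant to avoid. filter_columns is a hypothetical helper.

```rust
/// Hypothetical sketch: materialize only the selected rows of every column
/// before the aggregation merges them. Each kept value is copied.
fn filter_columns(columns: &[Vec<i64>], selection: &[bool]) -> Vec<Vec<i64>> {
    columns
        .iter()
        .map(|col| {
            assert_eq!(col.len(), selection.len(), "column and selection length differ");
            col.iter()
                .zip(selection)
                .filter(|&(_, &keep)| keep) // keep rows whose selection bit is true
                .map(|(&v, _)| v) // copy the value into the new column
                .collect::<Vec<i64>>()
        })
        .collect()
}
```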

The plan looks like this:

> create table t(c int) as values (1), (1), (1), (1), (2), (2), (3), (3)
> explain select count(distinct c) from t;
+---------------+--------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                             |
+---------------+--------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: count(alias1) AS count(DISTINCT t.c)                                                 |
|               |   Aggregate: groupBy=[[]], aggr=[[count(alias1)]]                                                |
|               |     Aggregate: groupBy=[[t.c AS alias1]], aggr=[[]]                                              |
|               |       TableScan: t projection=[c]                                                                |
| physical_plan | ProjectionExec: expr=[count(alias1)@0 as count(DISTINCT t.c)]                                    |
|               |   AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]                                        |
|               |     CoalescePartitionsExec                                                                       |
|               |       AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]                                  |
|               |         AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]                  |
|               |           RepartitionExec: partitioning=HashSelectionVector([alias1@0], 12), input_partitions=12 |
|               |             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                |
|               |               AggregateExec: mode=Partial, gby=[c@0 as alias1], aggr=[]                          |
|               |                 DataSourceExec: partitions=1, partition_sizes=[1]                                |
|               |                                                                                                  |
+---------------+--------------------------------------------------------------------------------------------------+

I'll review more aggregation patterns and add additional tests.
Thanks.

@goldmedal
Contributor

goldmedal commented Mar 31, 2025

Based on goldmedal#3, I ran some benchmarks (clickbench_1, h2o_medium).
feat_zero-copy-hash-agg-false is the branch that disables the configuration.
feat_zero-copy-hash-agg is the branch enabling the configuration.

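In conclusion, HashAggregate is slower in selection vector mode: total time increases by about 2.7% on ClickBench (31570.98ms to 32434.14ms) and by about 10.2% on H2O (24888.45ms to 27436.27ms).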

Comparing feat_zero-copy-hash-agg-false and feat_zero-copy-hash-agg
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ feat_zero-copy-hash-agg-false ┃ feat_zero-copy-hash-agg ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │                        0.24ms │                  0.32ms │  1.33x slower │
│ QQuery 1     │                       26.98ms │                 24.56ms │ +1.10x faster │
│ QQuery 2     │                       55.89ms │                 52.55ms │ +1.06x faster │
│ QQuery 3     │                       48.20ms │                 45.62ms │ +1.06x faster │
│ QQuery 4     │                      313.79ms │                347.08ms │  1.11x slower │
│ QQuery 5     │                      490.80ms │                471.41ms │     no change │
│ QQuery 6     │                       25.06ms │                 25.46ms │     no change │
│ QQuery 7     │                       28.17ms │                 27.29ms │     no change │
│ QQuery 8     │                      353.53ms │                406.58ms │  1.15x slower │
│ QQuery 9     │                      514.71ms │                478.99ms │ +1.07x faster │
│ QQuery 10    │                      132.73ms │                130.81ms │     no change │
│ QQuery 11    │                      142.59ms │                143.29ms │     no change │
│ QQuery 12    │                      475.75ms │                493.83ms │     no change │
│ QQuery 13    │                      569.90ms │                630.60ms │  1.11x slower │
│ QQuery 14    │                      435.30ms │                444.02ms │     no change │
│ QQuery 15    │                      361.60ms │                406.62ms │  1.12x slower │
│ QQuery 16    │                      825.41ms │                856.13ms │     no change │
│ QQuery 17    │                      752.13ms │                766.95ms │     no change │
│ QQuery 18    │                     1813.04ms │               1934.07ms │  1.07x slower │
│ QQuery 19    │                       40.67ms │                 41.49ms │     no change │
│ QQuery 20    │                      621.14ms │                625.89ms │     no change │
│ QQuery 21    │                      769.98ms │                749.81ms │     no change │
│ QQuery 22    │                     1544.70ms │               1560.61ms │     no change │
│ QQuery 23    │                     4471.51ms │               4356.12ms │     no change │
│ QQuery 24    │                      257.77ms │                265.81ms │     no change │
│ QQuery 25    │                      268.53ms │                273.24ms │     no change │
│ QQuery 26    │                      294.19ms │                307.36ms │     no change │
│ QQuery 27    │                      983.41ms │                987.90ms │     no change │
│ QQuery 28    │                     7514.46ms │               7533.94ms │     no change │
│ QQuery 29    │                      346.70ms │                344.54ms │     no change │
│ QQuery 30    │                      387.65ms │                405.92ms │     no change │
│ QQuery 31    │                      390.81ms │                427.40ms │  1.09x slower │
│ QQuery 32    │                     1597.45ms │               1987.50ms │  1.24x slower │
│ QQuery 33    │                     1753.56ms │               1863.63ms │  1.06x slower │
│ QQuery 34    │                     1950.84ms │               1945.21ms │     no change │
│ QQuery 35    │                      510.78ms │                560.47ms │  1.10x slower │
│ QQuery 36    │                      105.22ms │                110.02ms │     no change │
│ QQuery 37    │                       56.69ms │                 53.63ms │ +1.06x faster │
│ QQuery 38    │                       74.69ms │                 77.84ms │     no change │
│ QQuery 39    │                      189.59ms │                193.83ms │     no change │
│ QQuery 40    │                       24.37ms │                 24.50ms │     no change │
│ QQuery 41    │                       23.01ms │                 23.45ms │     no change │
│ QQuery 42    │                       27.48ms │                 27.82ms │     no change │
└──────────────┴───────────────────────────────┴─────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                            ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (feat_zero-copy-hash-agg-false)   │ 31570.98ms │
│ Total Time (feat_zero-copy-hash-agg)         │ 32434.14ms │
│ Average Time (feat_zero-copy-hash-agg-false) │   734.21ms │
│ Average Time (feat_zero-copy-hash-agg)       │   754.28ms │
│ Queries Faster                               │          5 │
│ Queries Slower                               │         10 │
│ Queries with No Change                       │         28 │
└──────────────────────────────────────────────┴────────────┘
--------------------
Benchmark h2o.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ feat_zero-copy-hash-agg-false ┃ feat_zero-copy-hash-agg ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │                     1053.42ms │               1044.36ms │    no change │
│ QQuery 2     │                     2155.01ms │               2317.51ms │ 1.08x slower │
│ QQuery 3     │                     2275.93ms │               2611.06ms │ 1.15x slower │
│ QQuery 4     │                     1236.64ms │               1256.03ms │    no change │
│ QQuery 5     │                     1608.77ms │               1892.13ms │ 1.18x slower │
│ QQuery 6     │                     1369.68ms │               1382.62ms │    no change │
│ QQuery 7     │                     2258.63ms │               2548.21ms │ 1.13x slower │
│ QQuery 8     │                     3876.11ms │               3985.41ms │    no change │
│ QQuery 9     │                     5989.38ms │               6721.88ms │ 1.12x slower │
│ QQuery 10    │                     3064.89ms │               3677.05ms │ 1.20x slower │
└──────────────┴───────────────────────────────┴─────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                            ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (feat_zero-copy-hash-agg-false)   │ 24888.45ms │
│ Total Time (feat_zero-copy-hash-agg)         │ 27436.27ms │
│ Average Time (feat_zero-copy-hash-agg-false) │  2488.85ms │
│ Average Time (feat_zero-copy-hash-agg)       │  2743.63ms │
│ Queries Faster                               │          0 │
│ Queries Slower                               │          6 │
│ Queries with No Change                       │          4 │
└──────────────────────────────────────────────┴────────────┘

I profiled ClickBench QQuery 4:
[Profile image: selection vector enabled]

[Profile image: selection vector disabled]

In the current implementation, the CPU time of filter_record_batch (3.45%) is significantly greater than that of take_arrays (0.35%).
Does arrow have a more efficient way to filter a record batch by a boolean array?

@goldmedal
Contributor

goldmedal commented Mar 31, 2025

I'm considering another approach. Maybe I shouldn't use filter_record_batch 🤔. It iterates over every column and filters each one. Instead, I should filter the rows inside the accumulator's merge_batch 🤔
I'll draft another PR for the approach.
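A hypothetical sketch of applying the selection inside the merge step instead: partial counts are added only where the selection bit is set, so no filtered copy of the batch is built. CountState and merge_selected are illustrative names, not DataFusion's Accumulator::merge_batch signature.

```rust
/// Hypothetical COUNT state that merges partial counts while honoring a
/// selection vector inline, instead of merging a pre-filtered batch.
struct CountState {
    count: i64,
}

impl CountState {
    fn merge_selected(&mut self, partial_counts: &[i64], selection: &[bool]) {
        assert_eq!(partial_counts.len(), selection.len());
        for (&partial, &selected) in partial_counts.iter().zip(selection) {
            // Rows not selected for this partition are skipped, never copied.
            if selected {
                self.count += partial;
            }
        }
    }
}
```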

@zebsme
Contributor

zebsme commented Mar 31, 2025

I'm considering another approach. Maybe I shouldn't use filter_record_batch 🤔. It filters the all column iteratly. I should filter the row when the accumulator merge_batch 🤔 I'll draft another PR for the approach.

I agree; we should try to avoid operating directly on the record batch.

@Dandandan
Contributor Author

I'm considering another approach. Maybe I shouldn't use filter_record_batch 🤔. It filters the all column iteratly. I should filter the row when the accumulator merge_batch 🤔

Yes, doing so copies the entire batch (which is what we are trying to avoid) and will be slower than take (in the end it does the same work).
I think what we probably want is to get the indices via https://docs.rs/arrow/latest/arrow/buffer/struct.BooleanBuffer.html#method.set_indices so that only the values at those indices are aggregated.
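For reference, a standalone sketch of what set_indices yields: the positions of the set bits in a bit-packed mask (bits numbered least-significant first, as in Arrow's boolean layout), jumping straight from one set bit to the next within each 64-bit word. This is an illustrative re-implementation, not arrow-rs's code; sum_selected is a hypothetical helper showing aggregation driven by those indices.

```rust
/// Illustrative re-implementation: indices of set bits in an LSB-first
/// bit-packed mask of `len` bits stored in 64-bit words.
fn set_indices(words: &[u64], len: usize) -> Vec<usize> {
    let mut out = Vec::new();
    for (word_idx, &word) in words.iter().enumerate() {
        let mut bits = word;
        while bits != 0 {
            let idx = word_idx * 64 + bits.trailing_zeros() as usize;
            if idx >= len {
                break; // ignore padding bits past the logical length
            }
            out.push(idx);
            bits &= bits - 1; // clear the lowest set bit
        }
    }
    out
}

/// Hypothetical helper: aggregate only the selected values, with no filtered copy.
fn sum_selected(values: &[i64], indices: &[usize]) -> i64 {
    indices.iter().map(|&i| values[i]).sum()
}
```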

@Rachelint
Contributor

I'm considering another approach. Maybe I shouldn't use filter_record_batch 🤔. It filters the all column iteratly. I should filter the row when the accumulator merge_batch 🤔

I think we also need to filter rows in GroupValues::intern.
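A simplified, hypothetical sketch of interning only the selected rows: each selected index gets a dense group id, and unselected rows are never hashed or inserted into the map. SelectedGroupValues is an illustrative type over i64 keys, not DataFusion's GroupValues trait.

```rust
use std::collections::HashMap;

/// Hypothetical group-value table that interns only the rows listed in `indices`.
struct SelectedGroupValues {
    map: HashMap<i64, usize>,
}

impl SelectedGroupValues {
    fn new() -> Self {
        Self { map: HashMap::new() }
    }

    /// Returns the group id of each selected row, in the order of `indices`.
    fn intern_selected(&mut self, keys: &[i64], indices: &[usize]) -> Vec<usize> {
        indices
            .iter()
            .map(|&i| {
                // A new key gets the next dense id; an existing key reuses its id.
                let next_id = self.map.len();
                *self.map.entry(keys[i]).or_insert(next_id)
            })
            .collect()
    }

    fn num_groups(&self) -> usize {
        self.map.len()
    }
}
```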

@goldmedal
Copy link
Contributor

I have another implementation for this issue: goldmedal#4.
The idea is to fetch rows using the indices from the selection vector instead of iterating over all the rows in the batch.

Because it may involve many changes, I want to check whether the approach makes sense.
Currently, I have only implemented GroupValuesPrimitive::intern for the group-by values. For the aggregations, I have only implemented count and some aggregations that use GroupsAccumulator.
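A hypothetical sketch of the count side: a grouped COUNT that receives the group ids of the selected rows only and grows its per-group counters on demand. It loosely mirrors the group_indices and total_num_groups parameters of GroupsAccumulator::update_batch, but GroupedCount and update_selected are illustrative names.

```rust
/// Hypothetical grouped COUNT that only sees the group ids of selected rows.
struct GroupedCount {
    counts: Vec<i64>,
}

impl GroupedCount {
    fn update_selected(&mut self, group_ids: &[usize], total_num_groups: usize) {
        // New groups may have been interned since the last batch.
        if self.counts.len() < total_num_groups {
            self.counts.resize(total_num_groups, 0);
        }
        for &g in group_ids {
            self.counts[g] += 1;
        }
    }
}
```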

I also made some optimizations to the repartitioning in sv (selection vector) mode: https://github.com/apache/datafusion/pull/15423/files#r2051721176.

However, I found that performance does not improve for ClickBench queries 4 and 7:

Query 4: SELECT COUNT(DISTINCT "UserID") FROM hits;
Query 7: SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC;

--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃ feat_hash-agg-sv-disable ┃ feat_hash-agg-sv ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 4     │                 311.74ms │         320.72ms │ no change │
│ QQuery 7     │                  30.28ms │          29.01ms │ no change │
└──────────────┴──────────────────────────┴──────────────────┴───────────┘

I'm not sure if I'm on the right track 🤔

@Dandandan @Rachelint Do you have any suggestions for it?

@Rachelint
Contributor

However, I found the performance won't be better for Clickbench queries 4 and 7.

I think it's possible that these benchmark queries don't reflect the improvement well.
I may try to construct some test cases this evening.


4 participants