Support data source sampling with TABLESAMPLE #16325
Conversation
Given SAMPLE and TABLESAMPLE parsed SQL, add a logical plan filter based on sample quantity. For example, `select COUNT(*) from data TABLESAMPLE SYSTEM 42` produces a filter `random() < 0.42`.
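As a rough sketch of the quantity handling described here (the helper name and range check are assumptions for illustration, not code from this PR), the percentage-to-probability conversion could look like:

```rust
// Hypothetical helper: turn a parsed TABLESAMPLE percentage into the
// probability used in the injected `random() < p` filter.
// `TABLESAMPLE SYSTEM 42` => 0.42, matching the
// `Float64(42.6) / Float64(100)` expression visible in the optimizer logs.
fn sample_probability(percent: f64) -> Option<f64> {
    // Percentages outside [0, 100] should be rejected by the planner.
    if (0.0..=100.0).contains(&percent) {
        Some(percent / 100.0)
    } else {
        None
    }
}
```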
Thank you! I think this feature will benefit many users.
According to PostgreSQL's reference:
https://wiki.postgresql.org/wiki/TABLESAMPLE_Implementation#SYSTEM_Option
I believe the SYSTEM option is equivalent to keeping or dropping an entire RecordBatch with the specified probability, whereas the rewrite rule implemented here samples row by row, which follows the behavior of the BERNOULLI option.
Since DataFusion has vectorized execution, evaluating a `random() < x` filter should be efficient, so I think we can apply this implementation to both the SYSTEM and BERNOULLI options to keep it simple.
I also left some suggestions for additional test coverage; looking forward to your feedback!
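The BERNOULLI-style, row-by-row semantics that the rewrite produces can be illustrated with a plain-Rust sketch (the function and the injected random source are hypothetical stand-ins, not DataFusion code):

```rust
// Each row survives independently with probability `p`, which is exactly
// what the injected `random() < p` filter does under vectorized evaluation.
// The caller supplies the random source, standing in for `random()`.
fn bernoulli_sample(rows: &[i64], p: f64, mut random: impl FnMut() -> f64) -> Vec<i64> {
    rows.iter().copied().filter(|_| random() < p).collect()
}
```

A SYSTEM-style operator would instead make one keep/drop decision per RecordBatch, which is the distinction drawn above.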
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));

// Process `where` clause
let mut base_plan = self.plan_selection(select.selection, plan, planner_context)?;
I feel we'd better do this rewrite in a separate logical optimizer rule, to keep the planning code clean. It can be done in a follow-up PR before adding more functionality to scan sampling.
(Unless there's a specific reason to do this during the planning phase; I did notice some rewrites happening during planning, but I'm not sure why.)
A "logical optimizer rule" mainly focuses on optimization, so I think it's fair to do this rewrite during the planning phase.
@@ -4714,3 +4714,115 @@ fn test_using_join_wildcard_schema() {
        ]
    );
}

#[test]
Regarding test structure, I suggest:
- Move all of the sql_integration tests to sqllogictest, since .slt files are easier to maintain. To show only logical plans, you can use:
  set datafusion.explain.logical_plan_only = true;
  # sqllogictest tests
  # cleanup
  set datafusion.explain.logical_plan_only = false;
- Create a separate .slt file for all tests related to TABLESAMPLE.

To improve test coverage, I recommend adding the following test cases:
- Select from multiple tables, where only some of the tables are sampled / all of the tables are sampled.
- Test all sample methods in
https://github.com/apache/datafusion-sqlparser-rs/blob/84c3a1b325c39c879b68ab712e3b9b3e3e40ed56/src/ast/query.rs#L1475
and expect an error for the unimplemented ones.
Reworked tests in slt files, covered extra cases.
I have included explanations in the code regarding the handling of system vs row sampling.
The remaining part is handling joins (subqueries work fine, verified with integration tests). I added a TODO to address it in a future PR.
Thank you for the review and suggestions! I'll rework the testing approach and get back with the improved version.
@2010YOUY01 I'd like to double-check whether pushing a volatile filter down to a Parquet executor is expected. In the mentioned PR, I disabled the logical optimizer's pushdown of volatile predicates, but it seems the physical optimizer still pushes this predicate down to the executor. While it helps us with automatic sampling, the results could be wrong. What do you think: should we implement a similar mechanism to mark volatile predicates as unsupported filters? Before:
After:
Data:
set datafusion.execution.parquet.pushdown_filters=true;
create external table data stored as parquet location 'sample.parquet';
SELECT count(*) FROM data WHERE random() < 0.1;
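The correctness concern can be made concrete with a small sketch: if a volatile predicate is evaluated both inside the scan and again in FilterExec, each row has to pass two independent draws, so it survives with probability p^2 instead of p (plain Rust with a caller-supplied deterministic generator; not DataFusion code):

```rust
// Simulates evaluating a volatile `random() < p` predicate `evaluations`
// times per row, as happens when the same filter runs both in the scan
// and in FilterExec. Returns the fraction of rows that survive.
fn keep_fraction(mut random: impl FnMut() -> f64, p: f64, evaluations: u32, rows: u32) -> f64 {
    let mut kept = 0u32;
    for _ in 0..rows {
        // A row survives only if every evaluation of the predicate passes.
        if (0..evaluations).all(|_| random() < p) {
            kept += 1;
        }
    }
    f64::from(kept) / f64::from(rows)
}
```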
Moved to a new discussion issue #16545, since it's not strictly related to this PR.
I wonder whether creating a new physical plan operator to do per-batch sampling would avoid the issues @theirix mentioned. Something similar to https://github.com/milenkovicm/ballista_extensions/blob/master/src/physical/sample_exec.rs ? Edit: that implementation demonstrates Ballista extensibility; it might not be correct or optimal.
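Under the assumption that such an operator would apply SYSTEM semantics, its per-batch decision could be sketched like this (plain Rust with a Vec standing in for Arrow's RecordBatch; the names are hypothetical, not taken from the linked code):

```rust
// A SampleExec-style operator could decide keep/drop once per batch:
// surviving batches pass through untouched, avoiding per-row evaluation
// and any interaction with predicate pushdown into the scan.
type Batch = Vec<i64>; // stand-in for arrow::record_batch::RecordBatch

fn system_sample(batches: Vec<Batch>, p: f64, mut random: impl FnMut() -> f64) -> Vec<Batch> {
    batches.into_iter().filter(|_| random() < p).collect()
}
```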
Which issue does this PR close?
Rationale for this change
Explained in #13563 in detail with known syntax examples.
Thanks to changes to sqlparser, it is now possible to use the TABLESAMPLE and SAMPLE constructs in a logical plan builder.

Added a rewrite function to datafusion-sql which produces an additional LogicalPlan::Filter based on TableSample from sqlparser. There is no need to remove anything from the SQL AST, since it is not used anywhere.

What changes are included in this PR?
Are these changes tested?
Leads to the initial logical plan
The physical plan is somewhat unusual, as volatile functions are pushed down to the data source (datafusion.execution.parquet.pushdown_filters is enabled by default), which was addressed in #13268.

More details:
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] type_coercion:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Filter: random() < Float64(42.6) / Float64(100)
Filter: data.double_col < Float64(42)
TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final analyzed plan:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Filter: random() < Float64(42.6) / Float64(100)
Filter: data.double_col < Float64(42)
TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::analyzer] Analyzer took 0 ms
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 0):
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Filter: random() < Float64(42.6) / Float64(100)
Filter: data.double_col < Float64(42)
TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] simplify_expressions:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Filter: random() < Float64(0.426)
Filter: data.double_col < Float64(42)
TableScan: data
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data, partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 0):
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 1):
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'simplify_expressions' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 1):
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] optimizer pass 1 did not make changes
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Projection: count(Int64(1)) AS count(*)
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
Projection:
Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Optimizer took 7 ms
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Input physical plan:
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
ProjectionExec: expr=[]
FilterExec: double_col@0 < 42 AND random() < 0.426
DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Optimized physical plan:
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
ProjectionExec: expr=[]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: double_col@0 < 42 AND random() < 0.426
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@0 < 42 AND random() < 0.426, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_min@0 < 42, required_guarantees=[]
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Use filter and page index to create RowSelection RowSelection { selectors: [RowSelector { row_count: 7300, skip: false }] } from predicate: BinaryExpr { left: BinaryExpr { left: Column { name: "double_col_null_count", index: 1 }, op: NotEq, right: Column { name: "row_count", index: 2 }, fail_on_overflow: false }, op: And, right: BinaryExpr { left: Column { name: "double_col_min", index: 0 }, op: Lt, right: Literal { value: Float64(42), field: Field { name: "42", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }
+----------+
| count(*) |
+----------+
| 1576 |
+----------+
1 row(s) fetched.
Elapsed 0.042 seconds.
Are there any user-facing changes?
No. The behaviour changes only if a user specifies the new SAMPLE / TABLESAMPLE SQL syntax, which was not supported before.