Support data source sampling with TABLESAMPLE #16325

Open · theirix wants to merge 7 commits into main

Conversation

@theirix (Contributor) commented Jun 7, 2025

Which issue does this PR close?

Closes #13563.

Rationale for this change

Explained in detail in #13563, with examples of the known syntax.
Thanks to changes in sqlparser, it is now possible to handle the TABLESAMPLE and SAMPLE constructs in the logical plan builder.

This PR adds a rewrite function to datafusion-sql that produces an additional LogicalPlan::Filter based on the TableSample node from sqlparser. There is no need to remove anything from the SQL AST, since the node is not consumed anywhere else.
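For illustration, a minimal sketch of the shape of that rewrite, assuming the sample quantity is interpreted as a percentage and that a planned expression for the volatile `random()` UDF is supplied by the caller (names are illustrative, not the exact PR code):

    use datafusion_common::Result;
    use datafusion_expr::{lit, Expr, LogicalPlan, LogicalPlanBuilder};

    /// Wrap `input` in a filter that keeps roughly `sample_percent` percent of rows.
    /// `random_expr` is assumed to be a planned call to the volatile `random()` UDF,
    /// which returns a uniform value in [0, 1).
    fn apply_table_sample(
        input: LogicalPlan,
        sample_percent: f64, // e.g. 42.6 for `TABLESAMPLE SYSTEM (42.6)`
        random_expr: Expr,
    ) -> Result<LogicalPlan> {
        // random() < q / 100 keeps each row independently with probability q / 100.
        let predicate = random_expr.lt(lit(sample_percent) / lit(100.0));
        LogicalPlanBuilder::from(input).filter(predicate)?.build()
    }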

What changes are included in this PR?

Are these changes tested?

  • Unit tests (added a few)
  • Regression tests (added to select.slt)
  • Manual test (see below)
create external table data stored as parquet location 'sample.parquet';
select COUNT(*) from data TABLESAMPLE SYSTEM (42.6) where double_col < 42.0;

This leads to the initial logical plan:

    Projection: count(Int64(1)) AS count(*)
      Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
        Filter: random() < Float64(42.6) / Float64(100)
          Filter: data.double_col < Float64(42)
            TableScan: data

The physical plan is somewhat unusual in that the volatile predicate is pushed down to the data source (with datafusion.execution.parquet.pushdown_filters enabled), which was addressed in #13268.

    ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
      AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
        CoalescePartitionsExec
          AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
            ProjectionExec: expr=[]
              CoalesceBatchesExec: target_batch_size=8192
                FilterExec: double_col@0 < 42 AND random() < 0.426
                  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
                    DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@0 < 42 AND random() < 0.426, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_min@0 < 42, required_guarantees=[]

More details:

DataFusion CLI v48.0.0

[2025-06-07T18:14:32Z DEBUG sqlparser::parser] parsing expr
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] prefix: Function(Function { name: ObjectName([Identifier(Ident { value: "COUNT", quote_style: None, span: Span(Location(1,8)..Location(1,13)) })]), uses_odbc_syntax: false, parameters: None, args: List(FunctionArgumentList { duplicate_treatment: None, args: [Unnamed(Wildcard)], clauses: [] }), filter: None, null_treatment: None, over: None, within_group: [] })
[2025-06-07T18:14:32Z DEBUG sqlparser::dialect] get_next_precedence_full() TokenWithSpan { token: Word(Word { value: "from", quote_style: None, keyword: FROM }), span: Span(Location(1,17)..Location(1,21)) }
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] next precedence: 0
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] parsing expr
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] prefix: Value(ValueWithSpan { value: Number("42.6", false), span: Span(Location(1,39)..Location(1,43)) })
[2025-06-07T18:14:32Z DEBUG sqlparser::dialect] get_next_precedence_full() TokenWithSpan { token: Word(Word { value: "where", quote_style: None, keyword: WHERE }), span: Span(Location(1,44)..Location(1,49)) }
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] next precedence: 0
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] parsing expr
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] prefix: Identifier(Ident { value: "double_col", quote_style: None, span: Span(Location(1,50)..Location(1,60)) })
[2025-06-07T18:14:32Z DEBUG sqlparser::dialect] get_next_precedence_full() TokenWithSpan { token: Lt, span: Span(Location(1,61)..Location(1,62)) }
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] next precedence: 20
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] parsing expr
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] prefix: Value(ValueWithSpan { value: Number("42.0", false), span: Span(Location(1,63)..Location(1,67)) })
[2025-06-07T18:14:32Z DEBUG sqlparser::dialect] get_next_precedence_full() TokenWithSpan { token: SemiColon, span: Span(Location(1,67)..Location(1,68)) }
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] next precedence: 0
[2025-06-07T18:14:32Z DEBUG sqlparser::dialect] get_next_precedence_full() TokenWithSpan { token: SemiColon, span: Span(Location(1,67)..Location(1,68)) }
[2025-06-07T18:14:32Z DEBUG sqlparser::parser] next precedence: 0
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] resolve_grouping_function:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(42.6) / Float64(100)
      Filter: data.double_col < Float64(42)
        TableScan: data

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] type_coercion:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(42.6) / Float64(100)
      Filter: data.double_col < Float64(42)
        TableScan: data

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final analyzed plan:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(42.6) / Float64(100)
      Filter: data.double_col < Float64(42)
        TableScan: data

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::analyzer] Analyzer took 0 ms
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 0):
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(42.6) / Float64(100)
      Filter: data.double_col < Float64(42)
        TableScan: data

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] simplify_expressions:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: random() < Float64(0.426)
      Filter: data.double_col < Float64(42)
        TableScan: data

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
      TableScan: data, partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 0)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 0):
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimizer input (pass 1):
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_nested_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'simplify_expressions' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'replace_distinct_aggregate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_predicate_subquery' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'scalar_subquery_to_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'decorrelate_lateral_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'extract_equijoin_predicate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_duplicated_expr' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_filter' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_cross_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'propagate_empty_relation' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_one_union' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'filter_null_join_keys' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_outer_join' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'push_down_limit' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] push_down_filter:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'single_distinct_aggregation_to_group_by' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'eliminate_group_by_constant' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Plan unchanged by optimizer rule 'common_sub_expression_eliminate' (pass 1)
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] optimize_projections:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Optimized plan (pass 1):
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] optimizer pass 1 did not make changes
[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    Projection:
      Filter: data.double_col < Float64(42) AND random() < Float64(0.426)
        TableScan: data projection=[double_col], partial_filters=[data.double_col < Float64(42)]

[2025-06-07T18:14:32Z DEBUG datafusion_optimizer::optimizer] Optimizer took 7 ms
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Input physical plan:
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
    AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
      ProjectionExec: expr=[]
        FilterExec: double_col@0 < 42 AND random() < 0.426
          DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet

[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion::physical_planner] Optimized physical plan:
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
        ProjectionExec: expr=[]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: double_col@0 < 42 AND random() < 0.426
              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
                DataSourceExec: file_groups={1 group: [[Users/irix/projects/third-party/datafusion-data/sample.parquet]]}, projection=[double_col], file_type=parquet, predicate=double_col@0 < 42 AND random() < 0.426, pruning_predicate=double_col_null_count@1 != row_count@2 AND double_col_min@0 < 42, required_guarantees=[]

[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_physical_optimizer::pruning] Error building pruning expression: Error during planning: Multi-column expressions are not currently supported
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Ignoring always true page pruning predicate: random() < 0.426
[2025-06-07T18:14:32Z DEBUG datafusion_datasource_parquet::page_filter] Use filter and page index to create RowSelection RowSelection { selectors: [RowSelector { row_count: 7300, skip: false }] } from predicate: BinaryExpr { left: BinaryExpr { left: Column { name: "double_col_null_count", index: 1 }, op: NotEq, right: Column { name: "row_count", index: 2 }, fail_on_overflow: false }, op: And, right: BinaryExpr { left: Column { name: "double_col_min", index: 0 }, op: Lt, right: Literal { value: Float64(42), field: Field { name: "42", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }
+----------+
| count(*) |
+----------+
| 1576     |
+----------+
1 row(s) fetched.
Elapsed 0.042 seconds.

Are there any user-facing changes?

No. Behaviour changes only when a query uses the new SAMPLE / TABLESAMPLE SQL syntax, which was not supported before.

theirix added 3 commits June 7, 2025 19:10
Given SAMPLE and TABLESAMPLE parsed SQL,
add a logical plan filter based on sample quantity.

For example, `select COUNT(*) from data TABLESAMPLE SYSTEM 42`
produces a filter `random() < 0.42`.
@github-actions bot added the sql (SQL Planner) and sqllogictest (SQL Logic Tests (.slt)) labels Jun 7, 2025
@theirix marked this pull request as ready for review June 7, 2025 18:51
@2010YOUY01 (Contributor) left a comment:

Thank you! I think this feature will benefit many users.

According to PostgreSQL's reference:
https://wiki.postgresql.org/wiki/TABLESAMPLE_Implementation#SYSTEM_Option
I believe the SYSTEM option is equivalent to keeping or dropping each entire RecordBatch according to the specified probability, while the rewrite rule implemented here samples row by row, which follows the behavior of the BERNOULLI option.
Since DataFusion has vectorized execution, evaluating a random() < x filter should be efficient, so I think we can apply this implementation to both the SYSTEM and BERNOULLI options to keep it simple.

I also left some suggestions for additional test coverage; looking forward to your feedback!
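To make the SYSTEM vs BERNOULLI distinction concrete, here is a sketch of the row-level (BERNOULLI-style) behaviour that a `random() < p` filter produces on each RecordBatch, written against the arrow and rand crates (illustrative only, not code from this PR):

    use arrow::array::BooleanArray;
    use arrow::compute::filter_record_batch;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;
    use rand::Rng;

    /// Keep each row independently with probability `p`, which is what the
    /// `random() < p` filter does (BERNOULLI semantics).
    fn bernoulli_sample(batch: &RecordBatch, p: f64) -> Result<RecordBatch, ArrowError> {
        let mut rng = rand::thread_rng();
        // One uniform draw in [0, 1) per row; the row survives when the draw is below p.
        let mask = BooleanArray::from(
            (0..batch.num_rows())
                .map(|_| rng.gen::<f64>() < p)
                .collect::<Vec<bool>>(),
        );
        filter_record_batch(batch, &mask)
    }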

let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));

// Process `where` clause
let base_plan = self.plan_selection(select.selection, plan, planner_context)?;
let mut base_plan =
@2010YOUY01 (Contributor) commented Jun 9, 2025:

I feel we'd better do this rewrite in a separate logical optimizer rule, to keep the planning code clean. It can be done in a follow-up PR before adding more functionality to scan sampling.
(Unless there's a specific reason to do this during the planning phase; I did notice some rewrites happening during planning, but I'm not sure why.)

A project member replied:

"logical optimizer rule" mainly focuses on Optimization, I think it's fair to rewrite during planning phase.

@@ -4714,3 +4714,115 @@ fn test_using_join_wildcard_schema() {
]
);
}

#[test]
@2010YOUY01 (Contributor) commented Jun 9, 2025:

Regarding test structure, I suggest:

  1. Move all of the sql_integration tests to sqllogictest, since .slt files are easier to maintain.
     To show only logical plans, you can use

     set datafusion.explain.logical_plan_only = true;

     # sqllogictest tests

     # cleanup
     set datafusion.explain.logical_plan_only = false;

  2. Create a separate .slt file for all tests related to TABLESAMPLE.

To improve test coverage, I recommend adding the following test cases:

  1. Select from multiple tables, testing both when only some of the tables have a sample and when all of them do.
  2. Test all sample methods in
     https://github.com/apache/datafusion-sqlparser-rs/blob/84c3a1b325c39c879b68ab712e3b9b3e3e40ed56/src/ast/query.rs#L1475
     and expect an error for the unimplemented ones.

@theirix (Contributor, Author) replied:

Reworked the tests into slt files and covered extra cases.

I have included explanations in the code regarding the handling of system vs row sampling.

The remaining part is handling joins (subqueries work fine, verified with integration tests). I left a TODO to address it in a future PR.

@theirix (Contributor, Author) commented Jun 11, 2025

Thank you for the review and suggestions! I'll rework the testing approach and get back with the improved version.

@theirix (Contributor, Author) commented Jun 18, 2025

> According to PostgreSQL's reference: https://wiki.postgresql.org/wiki/TABLESAMPLE_Implementation#SYSTEM_Option I believe the SYSTEM option is equivalent to keeping or dropping each entire RecordBatch according to the specified probability, while the rewrite rule implemented here samples row by row, which follows the behavior of the BERNOULLI option. Since DataFusion has vectorized execution, evaluating a random() < x filter should be efficient, so I think we can apply this implementation to both the SYSTEM and BERNOULLI options to keep it simple.

@2010YOUY01 I'd like to double-check whether a volatile filter pushdown to the Parquet executor is expected. In the mentioned PR, I disabled the logical plan optimiser's pushdown of volatile predicates, but it seems the physical optimiser still pushes this predicate to the executor. While it helps us with automatic sampling, the results could be wrong. What do you think: should we implement a similar mechanism to mark volatile predicates as unsupported filters?

Before:

[2025-06-18T18:20:07Z TRACE datafusion::physical_planner] Optimized physical plan by LimitedDistinctAggregation:
    OutputRequirementExec
      ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
        AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
          AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
            FilterExec: random() < 0.1
              DataSourceExec: file_groups={1 group: [[sample.parquet]]}, file_type=parquet

After:

[2025-06-18T18:20:07Z TRACE datafusion::physical_planner] Optimized physical plan by FilterPushdown:
    OutputRequirementExec
      ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
        AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
          AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
            DataSourceExec: file_groups={1 group: [[sample.parquet]]}, file_type=parquet, predicate=random() < 0.1

Data:

set datafusion.execution.parquet.pushdown_filters=true;
create external table data stored as parquet location 'sample.parquet';
SELECT count(*) FROM data WHERE random() < 0.1;

@theirix (Contributor, Author) commented Jun 25, 2025

> I'd like to double-check whether a volatile filter pushdown to the Parquet executor is expected. In the mentioned PR, I disabled the logical plan optimiser's pushdown of volatile predicates, but it seems the physical optimiser still pushes this predicate to the executor. While it helps us with automatic sampling, the results could be wrong. What do you think: should we implement a similar mechanism to mark volatile predicates as unsupported filters?

Moved to a new discussion issue #16545, since it's not strictly related to this PR.

@theirix requested a review from @2010YOUY01 June 28, 2025 12:33
@milenkovicm (Contributor) commented Jul 1, 2025

I wonder whether creating a new physical plan operator to do per-batch sampling would avoid the issues @theirix mentioned.

Something similar to https://github.com/milenkovicm/ballista_extensions/blob/master/src/physical/sample_exec.rs ?

Edit: the implementation demonstrates Ballista extensibility; it might not be correct or optimal.
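
For contrast with the row-level filter discussed above, a SampleExec-style operator could implement SYSTEM-like semantics by deciding the fate of each whole batch with a single draw. A minimal sketch under those assumptions (illustrative, not the linked Ballista code):

    use arrow::record_batch::RecordBatch;
    use rand::Rng;

    /// Keep or drop an entire batch with probability `p` (SYSTEM-style sampling),
    /// as a SampleExec-like operator could do for each batch of its input stream.
    fn system_sample(batch: RecordBatch, p: f64, rng: &mut impl Rng) -> Option<RecordBatch> {
        // A single draw decides the whole batch, loosely mirroring PostgreSQL's
        // SYSTEM option, which samples whole pages rather than individual rows.
        (rng.gen::<f64>() < p).then_some(batch)
    }

This keeps the volatile predicate out of the scan entirely, so no pushdown machinery can move it into the data source.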
