ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them #15566

Merged (42 commits, Apr 17, 2025)
Changes shown below are from the first 21 commits.

Commits:
a653b15  ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply… (adriangb, Apr 3, 2025)
a5f998c  wip (adriangb, Apr 3, 2025)
7e2db66  fix tests (adriangb, Apr 3, 2025)
cb1f830  fix (adriangb, Apr 3, 2025)
e92d8b5  fix (adriangb, Apr 3, 2025)
ca391c1  fix doc (adriangb, Apr 3, 2025)
c78a590  fix doc (adriangb, Apr 3, 2025)
34c8285  Improve doc comments of `filter-pushdown-apis` (#22) (alamb, Apr 5, 2025)
e15374f  Apply suggestions from code review (adriangb, Apr 5, 2025)
2ceec35  simplify according to pr feedback (adriangb, Apr 5, 2025)
3fbf379  Add missing file (adriangb, Apr 5, 2025)
e6721d1  Add tests (adriangb, Apr 5, 2025)
b7b588b  pipe config in (adriangb, Apr 5, 2025)
d1f01dd  docstrings (adriangb, Apr 5, 2025)
5929d03  Update datafusion/physical-plan/src/filter_pushdown.rs (adriangb, Apr 5, 2025)
24483bc  fix (adriangb, Apr 5, 2025)
d0295ed  fix (adriangb, Apr 6, 2025)
2d46289  fmt (adriangb, Apr 6, 2025)
4318267  fix doc (adriangb, Apr 6, 2025)
7d29056  add example usage of config (adriangb, Apr 6, 2025)
d382bd3  fix test (adriangb, Apr 6, 2025)
2dfa8b8  convert exec API and optimizer rule (berkaysynnada, Apr 14, 2025)
cda6e8d  re-add docs (adriangb, Apr 14, 2025)
e4d8a8c  dbg (berkaysynnada, Apr 16, 2025)
3ec1b2a  dbg 2 (berkaysynnada, Apr 16, 2025)
a2df5e0  avoid clones (adriangb, Apr 16, 2025)
6938d52  part 3 (berkaysynnada, Apr 16, 2025)
6836dd4  fix lint (adriangb, Apr 16, 2025)
28bb8ea  Merge branch 'filter-pushdown-apis' into filter-pushdown-apis (berkaysynnada, Apr 16, 2025)
7e95283  tests pass (berkaysynnada, Apr 16, 2025)
e2f8c12  Update filter.rs (berkaysynnada, Apr 16, 2025)
bff47be  update projection tests (berkaysynnada, Apr 16, 2025)
ce49ad4  update slt files (adriangb, Apr 16, 2025)
d5792bc  Merge branch 'main' into filter-pushdown-apis (adriangb, Apr 16, 2025)
834f33e  fix (adriangb, Apr 16, 2025)
9e59246  fix references (adriangb, Apr 16, 2025)
57a1230  improve impls and update tests (berkaysynnada, Apr 17, 2025)
367377f  apply stop logic (berkaysynnada, Apr 17, 2025)
616165d  update slt's (berkaysynnada, Apr 17, 2025)
b30953f  update other tests (berkaysynnada, Apr 17, 2025)
ec54cca  minor (berkaysynnada, Apr 17, 2025)
6345315  rename modules to match logical optimizer, tweak docs (adriangb, Apr 17, 2025)
508 changes: 508 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/mod.rs
@@ -21,6 +21,7 @@ mod aggregate_statistics;
mod combine_partial_final_agg;
mod enforce_distribution;
mod enforce_sorting;
mod filter_pushdown;
mod join_selection;
mod limit_pushdown;
mod limited_distinct_aggregation;
23 changes: 19 additions & 4 deletions datafusion/datasource/src/file.rs
@@ -26,8 +26,10 @@ use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_physical_expr::LexOrdering;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalExprRef};
use datafusion_physical_plan::filter_pushdown::FilterPushdownResult;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

@@ -57,7 +59,7 @@ pub trait FileSource: Send + Sync {
/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;
/// Return projected statistics
fn statistics(&self) -> datafusion_common::Result<Statistics>;
fn statistics(&self) -> Result<Statistics>;
/// String representation of file source such as "csv", "json", "parquet"
fn file_type(&self) -> &str;
/// Format FileType specific information
@@ -75,7 +77,7 @@
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> datafusion_common::Result<Option<FileScanConfig>> {
) -> Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
return Ok(None);
}
@@ -93,4 +95,17 @@
}
Ok(None)
}
/// Try to push down filters into this FileSource.
/// See [`ExecutionPlan::try_pushdown_filters`] for more details.
///
/// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
fn try_pushdown_filters(
&self,
_filters: &[PhysicalExprRef],
_config: &ConfigOptions,
) -> Result<FileSourceFilterPushdownResult> {
Ok(FileSourceFilterPushdownResult::NotPushed)
}
}

pub type FileSourceFilterPushdownResult = FilterPushdownResult<Arc<dyn FileSource>>;
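
To make the new hook concrete, here is a minimal sketch of how a custom `FileSource` might override the default above. It is not code from this PR: `can_evaluate`, the `predicate` field, and the `Clone` bound are illustrative assumptions, and the per-filter `support` payload is left as a placeholder because its element type is not rendered in this diff.

    // Hedged sketch; only the method signature and the
    // `NotPushed` / `Pushed { inner, support }` shapes come from the diff above.
    fn try_pushdown_filters(
        &self,
        filters: &[PhysicalExprRef],
        _config: &ConfigOptions,
    ) -> Result<FileSourceFilterPushdownResult> {
        // Keep only the filters this source can evaluate during the scan;
        // `can_evaluate` is a hypothetical helper on the source.
        let accepted: Vec<PhysicalExprRef> = filters
            .iter()
            .filter(|f| self.can_evaluate(f.as_ref()))
            .cloned()
            .collect();
        if accepted.is_empty() {
            return Ok(FileSourceFilterPushdownResult::NotPushed);
        }
        // Rebuild the source with the accepted predicate folded in, using the
        // `conjunction_opt` helper added elsewhere in this PR.
        let mut new_source = self.clone();
        new_source.predicate = conjunction_opt(accepted);
        Ok(FileSourceFilterPushdownResult::Pushed {
            inner: Arc::new(new_source),
            // One entry per input filter; the element type is defined in
            // datafusion/physical-plan/src/filter_pushdown.rs (not shown here).
            support: todo!("mark each input filter as handled or not"),
        })
    }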
33 changes: 29 additions & 4 deletions datafusion/datasource/src/file_scan_config.rs
@@ -31,14 +31,16 @@ use arrow::{
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
use datafusion_common::{
config::ConfigOptions, exec_err, ColumnStatistics, Constraints, Result, Statistics,
};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_physical_expr::{
expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
PhysicalSortExpr,
PhysicalExprRef, PhysicalSortExpr,
};
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
@@ -48,16 +50,16 @@ use datafusion_physical_plan::{
};
use log::{debug, warn};

use crate::file_groups::FileGroup;
use crate::{
display::FileGroupsDisplay,
file::FileSource,
file::{FileSource, FileSourceFilterPushdownResult},
file_compression_type::FileCompressionType,
file_stream::FileStream,
source::{DataSource, DataSourceExec},
statistics::MinMaxStatistics,
PartitionedFile,
};
use crate::{file_groups::FileGroup, source::DataSourceFilterPushdownResult};

/// The base configurations for a [`DataSourceExec`], a physical plan for
/// any given file format.
@@ -584,6 +586,29 @@ impl DataSource for FileScanConfig {
) as _
}))
}

fn try_pushdown_filters(
&self,
filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<DataSourceFilterPushdownResult> {
match self.file_source.try_pushdown_filters(filters, config)? {
FileSourceFilterPushdownResult::NotPushed => {
Ok(DataSourceFilterPushdownResult::NotPushed)
}
FileSourceFilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(
FileScanConfigBuilder::from(self.clone())
.with_source(inner)
.build(),
);
Ok(DataSourceFilterPushdownResult::Pushed {
inner: new_self,
[Review comment from a Contributor] Rather than saying "inner", can we emphasize this is the new self having the filter pushed down into it? Maybe renaming it as "updated"? WDYT?
support,
})
}
}
}
}

impl FileScanConfig {
69 changes: 57 additions & 12 deletions datafusion/datasource/src/source.rs
@@ -26,14 +26,15 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanFilterPushdownResult,
FilterPushdownResult, PlanProperties,
};

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Common behaviors in Data Sources, for both Files and Memory.
@@ -51,7 +52,7 @@ pub trait DataSource: Send + Sync + Debug {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream>;
) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -62,13 +63,13 @@
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}

fn output_partitioning(&self) -> Partitioning;
fn eq_properties(&self) -> EquivalenceProperties;
fn statistics(&self) -> datafusion_common::Result<Statistics>;
fn statistics(&self) -> Result<Statistics>;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
@@ -78,9 +79,20 @@
fn try_swapping_with_projection(
&self,
_projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
/// Try to push down filters into this DataSource.
/// See [`ExecutionPlan::try_pushdown_filters`] for more details.
fn try_pushdown_filters(
&self,
_filters: &[PhysicalExprRef],
_config: &ConfigOptions,
) -> Result<DataSourceFilterPushdownResult> {
Ok(DataSourceFilterPushdownResult::NotPushed)
}
}

pub type DataSourceFilterPushdownResult = FilterPushdownResult<Arc<dyn DataSource>>;
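
Both type aliases instantiate the same generic `FilterPushdownResult` from `datafusion_physical_plan::filter_pushdown`, whose definition is not rendered in this diff. Judging only from the match arms used elsewhere on this page, it presumably has roughly the following shape; this is a reconstruction, and `FilterSupport` is a placeholder name for the per-filter marker type.

    // Reconstructed from usage in this diff, not the authoritative definition.
    pub enum FilterPushdownResult<T> {
        /// The node absorbed none of the filters; the plan is left unchanged.
        NotPushed,
        /// `inner` is the rewritten node with filters pushed into it, and
        /// `support` records, one entry per input filter, how each was handled.
        Pushed { inner: T, support: Vec<FilterSupport> },
    }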

/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
///
/// `DataSourceExec` implements common functionality such as applying projections,
@@ -131,15 +143,15 @@ impl ExecutionPlan for DataSourceExec {
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let data_source = self.data_source.repartitioned(
target_partitions,
config.optimizer.repartition_file_min_size,
@@ -163,15 +175,15 @@
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
) -> Result<SendableRecordBatchStream> {
self.data_source.open(partition, context)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.data_source.metrics().clone_inner())
}

fn statistics(&self) -> datafusion_common::Result<Statistics> {
fn statistics(&self) -> Result<Statistics> {
self.data_source.statistics()
}

@@ -189,9 +201,32 @@
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
self.data_source.try_swapping_with_projection(projection)
}

fn try_pushdown_filters(
&self,
_plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<ExecutionPlanFilterPushdownResult> {
match self
.data_source
.try_pushdown_filters(parent_filters, config)?
{
DataSourceFilterPushdownResult::NotPushed => {
Ok(ExecutionPlanFilterPushdownResult::NotPushed)
}
DataSourceFilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(DataSourceExec::new(inner));
Ok(ExecutionPlanFilterPushdownResult::Pushed {
inner: new_self,
support,
})
}
}
}
}

impl DataSourceExec {
@@ -254,3 +289,13 @@
})
}
}

/// Create a new `DataSourceExec` from a `DataSource`
[Review comment from a Contributor] There is

    pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
        Arc::new(Self::new(Arc::new(data_source)))
    }

in impl DataSourceExec. Doesn't that work for you?

[Reply from the Contributor Author] @alamb sneaked this bit in, wdyt Andrew?

[Reply from a Contributor] I don't feel strongly either way. I must have missed the existing function

    impl<S> From<S> for DataSourceExec
    where
        S: DataSource + 'static,
    {
        fn from(source: S) -> Self {
            Self::new(Arc::new(source))
        }
    }
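
For reference, the practical difference between the two constructors quoted in this thread is the return type: `from_data_source` returns an `Arc<Self>` ready to slot into a plan, while the blanket `From` impl returns a bare `DataSourceExec`. A usage sketch, with `MyDataSource` standing in for any `DataSource` implementation:

    // `MyDataSource` is an illustrative type implementing `DataSource`.
    let exec: Arc<DataSourceExec> = DataSourceExec::from_data_source(MyDataSource::new());
    // Equivalent via the blanket `From` impl, minus the outer Arc:
    let exec: DataSourceExec = MyDataSource::new().into();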
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
@@ -68,7 +68,7 @@ pub use planner::{create_physical_expr, create_physical_exprs};
pub use scalar_function::ScalarFunctionExpr;

pub use datafusion_physical_expr_common::utils::reverse_order_bys;
pub use utils::split_conjunction;
pub use utils::{conjunction, conjunction_opt, split_conjunction};

// For backwards compatibility
pub mod tree_node {
25 changes: 25 additions & 0 deletions datafusion/physical-expr/src/utils/mod.rs
@@ -47,6 +47,31 @@ pub fn split_conjunction(
split_impl(Operator::And, predicate, vec![])
}

/// Create a conjunction of the given predicates.
/// If the input is empty, return a literal true.
/// If the input contains a single predicate, return the predicate.
/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`).
pub fn conjunction(
predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Arc<dyn PhysicalExpr> {
conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true))
}

/// Create a conjunction of the given predicates.
[Review comment from a Contributor] Why not just merging these two? I don't think people prefer something other than "true" if they provide an empty iterator

[Reply from the Contributor Author] There's actually several places where it's useful to know if the result is lit(true) because you handle that differently. E.g. FilterExec drops itself out of the plan. And it's only a small bit more code to have here to avoid having to match lit(true) in other places.

/// If the input is empty, return None.
/// If the input contains a single predicate, return Some(predicate).
/// Otherwise, return Some(..) wrapping a conjunction of the predicates (e.g. `Some(a AND b AND c)`).
pub fn conjunction_opt(
predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
predicates
.into_iter()
.fold(None, |acc, predicate| match acc {
None => Some(predicate),
Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))),
})
}

/// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs.
///
/// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
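
A short usage sketch of the two helpers added above; `lit` is the existing literal constructor in `datafusion_physical_expr::expressions`:

    use datafusion_physical_expr::expressions::lit;
    use datafusion_physical_expr::{conjunction, conjunction_opt};

    // Empty input: `conjunction` falls back to a literal `true`, while
    // `conjunction_opt` lets callers (e.g. FilterExec, per the thread above)
    // observe that no predicate remains.
    assert!(conjunction_opt(vec![]).is_none());
    let always_true = conjunction(vec![]);

    // Non-empty input folds left into a binary AND chain: `true AND false`.
    let combined = conjunction(vec![lit(true), lit(false)]);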
72 changes: 72 additions & 0 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_common::{config::ConfigOptions, DataFusionError, Result};
use datafusion_physical_plan::{
execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan,
};

use crate::PhysicalOptimizerRule;

/// A physical optimizer rule that pushes down filters in the execution plan.
/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of the algorithm.
#[derive(Debug)]
pub struct PushdownFilter {}

impl Default for PushdownFilter {
fn default() -> Self {
Self::new()
}
}

impl PushdownFilter {
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for PushdownFilter {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
match plan.try_pushdown_filters(&plan, &Vec::new(), config)? {
[Review comment from a Contributor] My major suggestion is we should implement the recursion logic in this rule, not in ExecutionPlan trait. Firstly, all other rules are in this pattern, and easy to debug. Secondly, there already are many recursion tools for traversing and updating mechanisms, e.g transform_down, PlanContext... As we implement the other operators like joins etc. we will be easily writing only the filter pushdown related logic in ExecutionPlan implementation, not the traversal or update logic.

ExecutionPlanFilterPushdownResult::NotPushed => Ok(plan),
ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
if !support.is_empty() {
return Err(DataFusionError::Plan(format!(
    "PushdownFilter: plan returned support length does not match filters length: {} != 0",
    support.len()
)));
}
Ok(inner)
}
}
}

fn name(&self) -> &str {
"PushdownFilter"
}

fn schema_check(&self) -> bool {
true // Filter pushdown does not change the schema of the plan
}
}
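
Since the rule only needs a plan and `ConfigOptions`, it can be exercised directly, without a full session. A minimal sketch, assuming default configuration options:

    use std::sync::Arc;

    use datafusion_common::config::ConfigOptions;
    use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    // `plan` is any already-built physical plan; how it was constructed is out
    // of scope here.
    fn apply_rule(
        plan: Arc<dyn ExecutionPlan>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        PushdownFilter::new().optimize(plan, &ConfigOptions::default())
    }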
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
@@ -29,6 +29,7 @@ pub mod coalesce_batches;
pub mod combine_partial_final_agg;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod filter_pushdown;
pub mod join_selection;
pub mod limit_pushdown;
pub mod limited_distinct_aggregation;