ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them #15566
@@ -26,14 +26,15 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; | |
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanFilterPushdownResult,
    FilterPushdownResult, PlanProperties,
};

use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Common behaviors in Data Sources for both from Files and Memory.
@@ -51,7 +52,7 @@ pub trait DataSource: Send + Sync + Debug { | |
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream>;
    ) -> Result<SendableRecordBatchStream>;
    fn as_any(&self) -> &dyn Any;
    /// Format this source for display in explain plans
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -62,13 +63,13 @@ pub trait DataSource: Send + Sync + Debug { | |
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
    ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
    ) -> Result<Option<Arc<dyn DataSource>>> {
        Ok(None)
    }

    fn output_partitioning(&self) -> Partitioning;
    fn eq_properties(&self) -> EquivalenceProperties;
    fn statistics(&self) -> datafusion_common::Result<Statistics>;
    fn statistics(&self) -> Result<Statistics>;
    /// Return a copy of this DataSource with a new fetch limit
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
    fn fetch(&self) -> Option<usize>;
@@ -78,9 +79,20 @@ pub trait DataSource: Send + Sync + Debug { | |
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
    /// Try to push down filters into this DataSource.
    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
    fn try_pushdown_filters(
        &self,
        _filters: &[PhysicalExprRef],
        _config: &ConfigOptions,
    ) -> Result<DataSourceFilterPushdownResult> {
        Ok(DataSourceFilterPushdownResult::NotPushed)
    }
}

pub type DataSourceFilterPushdownResult = FilterPushdownResult<Arc<dyn DataSource>>;

/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
///
/// `DataSourceExec` implements common functionality such as applying projections,
@@ -131,15 +143,15 @@ impl ExecutionPlan for DataSourceExec { | |
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let data_source = self.data_source.repartitioned(
            target_partitions,
            config.optimizer.repartition_file_min_size,
@@ -163,15 +175,15 @@ impl ExecutionPlan for DataSourceExec { | |
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
    ) -> Result<SendableRecordBatchStream> {
        self.data_source.open(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.data_source.metrics().clone_inner())
    }

    fn statistics(&self) -> datafusion_common::Result<Statistics> {
    fn statistics(&self) -> Result<Statistics> {
        self.data_source.statistics()
    }

@@ -189,9 +201,32 @@ impl ExecutionPlan for DataSourceExec { | |
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.data_source.try_swapping_with_projection(projection)
    }

    fn try_pushdown_filters(
        &self,
        _plan: &Arc<dyn ExecutionPlan>,
        parent_filters: &[PhysicalExprRef],
        config: &ConfigOptions,
    ) -> Result<ExecutionPlanFilterPushdownResult> {
        match self
            .data_source
            .try_pushdown_filters(parent_filters, config)?
        {
            DataSourceFilterPushdownResult::NotPushed => {
                Ok(ExecutionPlanFilterPushdownResult::NotPushed)
            }
            DataSourceFilterPushdownResult::Pushed { inner, support } => {
                let new_self = Arc::new(DataSourceExec::new(inner));
                Ok(ExecutionPlanFilterPushdownResult::Pushed {
                    inner: new_self,
                    support,
                })
            }
        }
    }
}

impl DataSourceExec {
@@ -254,3 +289,13 @@ impl DataSourceExec { | |
        })
    }
}

/// Create a new `DataSourceExec` from a `DataSource`
impl<S> From<S> for DataSourceExec
where
    S: DataSource + 'static,
{
    fn from(source: S) -> Self {
        Self::new(Arc::new(source))
    }
}

Review comment: There is `pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> { Arc::new(Self::new(Arc::new(data_source))) }` in `impl DataSourceExec`. Doesn't that work for you?

Reply: @alamb sneaked this bit in, wdyt Andrew?

Reply: I don't feel strongly either way. I must have missed the existing function.
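To make the comparison in the thread above concrete, here is a tiny sketch (not part of the diff). The import path is an assumption, taken to be the `source` module of the `datafusion-datasource` crate being edited here, and the generic function is only illustrative scaffolding:

```rust
use std::sync::Arc;

// Assumed path: the module of the file shown in this diff.
use datafusion_datasource::source::{DataSource, DataSourceExec};

/// Illustrative only: the existing helper and the new `From` impl wrap the
/// source in exactly the same way.
fn build_both_ways<S: DataSource + Clone + 'static>(
    source: S,
) -> (Arc<DataSourceExec>, Arc<DataSourceExec>) {
    // Existing helper mentioned in the first comment.
    let via_helper = DataSourceExec::from_data_source(source.clone());
    // New `From` impl added in this hunk.
    let via_from = Arc::new(DataSourceExec::from(source));
    (via_helper, via_from)
}
```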
@@ -47,6 +47,31 @@ pub fn split_conjunction( | |
    split_impl(Operator::And, predicate, vec![])
}

/// Create a conjunction of the given predicates.
/// If the input is empty, return a literal true.
/// If the input contains a single predicate, return the predicate.
/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`).
pub fn conjunction(
    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Arc<dyn PhysicalExpr> {
    conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true))
}

/// Create a conjunction of the given predicates.
/// If the input is empty, return None.
/// If the input contains a single predicate, return Some(predicate).
/// Otherwise, return Some(..) of a conjunction of the predicates (e.g. `Some(a AND b AND c)`).
pub fn conjunction_opt(
    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    predicates
        .into_iter()
        .fold(None, |acc, predicate| match acc {
            None => Some(predicate),
            Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))),
        })
}

Review comment: Why not just merge these two? I don't think people prefer something other than `true` if they provide an empty iterator.

Reply: There's actually several places where it's useful to know if the result is …
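To make the difference between the two helpers concrete, here is a small usage sketch (not part of the diff). It assumes the new functions are exported from the same `utils` module as `split_conjunction`; the expressions are built with the existing `col`, `lit`, and `BinaryExpr` APIs.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;
// Assumed to live next to `split_conjunction`; adjust the path if needed.
use datafusion_physical_expr::utils::{conjunction, conjunction_opt};

fn main() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);

    // Build `a > 1` and `b < 10` as physical expressions.
    let a_gt_1: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("a", &schema)?, Operator::Gt, lit(1)));
    let b_lt_10: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("b", &schema)?, Operator::Lt, lit(10)));

    // `conjunction` always yields an expression: `a > 1 AND b < 10` here,
    // and a literal `true` for an empty input.
    let combined = conjunction([Arc::clone(&a_gt_1), Arc::clone(&b_lt_10)]);
    println!("{combined}");

    // `conjunction_opt` lets callers distinguish "no filters at all" (None)
    // from an actual predicate (Some), the distinction raised in the thread above.
    assert!(conjunction_opt(std::iter::empty::<Arc<dyn PhysicalExpr>>()).is_none());
    assert!(conjunction_opt([a_gt_1, b_lt_10]).is_some());

    Ok(())
}
```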

/// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs.
///
/// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_common::{config::ConfigOptions, DataFusionError, Result};
use datafusion_physical_plan::{
    execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan,
};

use crate::PhysicalOptimizerRule;

/// A physical optimizer rule that pushes down filters in the execution plan.
/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of the algorithm.
#[derive(Debug)]
pub struct PushdownFilter {}

impl Default for PushdownFilter {
    fn default() -> Self {
        Self::new()
    }
}

impl PushdownFilter {
    pub fn new() -> Self {
        Self {}
    }
}

impl PhysicalOptimizerRule for PushdownFilter {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match plan.try_pushdown_filters(&plan, &Vec::new(), config)? {
Review comment: My major suggestion is that we should implement the recursion logic in this rule, not in the ExecutionPlan trait. Firstly, all the other rules follow this pattern, which is easy to debug. Secondly, there are already many recursion tools for traversing and updating plans, e.g. transform_down, PlanContext... As we implement the other operators, like joins etc., we would then only be writing the filter-pushdown-related logic in each ExecutionPlan implementation, not the traversal or update logic.

(A rough sketch of this suggestion follows the impl block below.)
            ExecutionPlanFilterPushdownResult::NotPushed => Ok(plan),
            ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
                if !support.is_empty() {
                    return Err(DataFusionError::Plan(format!(
                        "PushdownFilter: plan returned support length does not match filters length: {} != 0",
                        support.len()
                    )));
                }
                Ok(inner)
            }
        }
    }

    fn name(&self) -> &str {
        "PushdownFilter"
    }

    fn schema_check(&self) -> bool {
        true // Filter pushdown does not change the schema of the plan
    }
}
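For reference, the shape the recursion comment above asks for might look roughly like the sketch below (not part of the diff). It assumes the usual `TreeNode` impl for `Arc<dyn ExecutionPlan>` and only illustrates the traversal structure; a complete version would also thread the parent filters down the tree, e.g. with `PlanContext`, which the plain closure here does not do.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::execution_plan::ExecutionPlanFilterPushdownResult;
use datafusion_physical_plan::ExecutionPlan;

/// Drive the recursion from the optimizer rule instead of from the trait:
/// visit nodes top-down and let each one report whether it absorbed filters.
fn pushdown_via_transform(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_down(|node| {
        // No parent filters are threaded here; a full implementation would
        // carry them down alongside each node (e.g. in a PlanContext).
        match node.try_pushdown_filters(&node, &[], config)? {
            ExecutionPlanFilterPushdownResult::NotPushed => Ok(Transformed::no(node)),
            ExecutionPlanFilterPushdownResult::Pushed { inner, .. } => {
                Ok(Transformed::yes(inner))
            }
        }
    })
    .map(|transformed| transformed.data)
}
```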
Review comment: Rather than saying `inner`, can we emphasize that this is the new self with the filter pushed down into it? Maybe rename it to `updated`? WDYT?
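Finally, a self-contained way to exercise the new rule outside the full optimizer pipeline could look like the following sketch (not part of the diff). The module path for `PushdownFilter` is assumed, since the new file's location is not shown in this view. With a plain `EmptyExec` as the source nothing can actually absorb the predicate, so the plan should come back essentially unchanged; the interesting inputs are plans over a `DataSourceExec` whose `DataSource` implements `try_pushdown_filters`.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
// Assumed module name; adjust to wherever this new file ends up.
use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::{displayable, ExecutionPlan};

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // FilterExec(a > 1) over an empty source stands in for a real scan.
    let predicate = Arc::new(BinaryExpr::new(col("a", &schema)?, Operator::Gt, lit(1)));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
        predicate,
        Arc::new(EmptyExec::new(Arc::clone(&schema))),
    )?);

    // Apply the rule exactly as the optimizer driver would.
    let optimized = PushdownFilter::new().optimize(plan, &ConfigOptions::default())?;
    println!("{}", displayable(optimized.as_ref()).indent(false));
    Ok(())
}
```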