ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them #15566
Conversation
This is the first thing on my list to review tomorrow.

🤖: Benchmark completed

@alamb I'm curious, could you do a main vs. main run just so we get an idea of what the variability is? I don't know if I should be looking at <10% changes or not.

I did that here: #15572 (comment) I think the queries where the absolute runtime is small (10s–100s of ms) have more variation. I don't see anything concerning in the benchmark run above. I ran out of time to review this PR today, but it is very much high on my list.
Thank you @adriangb -- this is a great PR
I have some suggestions on how we can simplify the APIs and increase the testing. I am willing to help work on them, but I think I am out of time for today.
Let me know what you think
datafusion/datasource/src/source.rs
```rust
fn push_down_filters(
    &self,
    _filters: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<DataSourceFilterPushdownResult>> {
```
Yeah, this type is getting pretty complicated. Maybe we could add the `Option` to `FilterPushdownResult` -- maybe make it an enum with variants for "did push down" and "no pushdown":

```rust
pub enum FilterPushdownResult<T> {
    Pushed {
        inner: T,
        support: Vec<FilterSupport>,
    },
    /// filters could not be pushed down
    NotPushed,
}
```
Yep, makes sense, happy to do that. The important bit is that you can cheaply return the `NotPushed` variant without having to clone yourself, etc.
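To make the cheapness concrete, here is a minimal sketch (toy stand-in types, not the actual DataFusion trait) of a default implementation that declines pushdown by returning the unit variant, with no clone of the node:

```rust
use std::sync::Arc;

// Toy stand-ins for the real types discussed above.
type PhysicalExprRef = Arc<str>;
type Result<T> = std::result::Result<T, String>;

pub enum FilterPushdownResult<T> {
    Pushed {
        inner: T,
        support: Vec<bool>, // stand-in for Vec<FilterSupport>
    },
    NotPushed,
}

trait Plan {
    // Default: decline pushdown. Returning the unit variant allocates nothing
    // and never touches `self`, which is the cheapness being discussed.
    fn push_down_filters(
        self: Arc<Self>,
        _filters: &[PhysicalExprRef],
    ) -> Result<FilterPushdownResult<Arc<dyn Plan>>> {
        Ok(FilterPushdownResult::NotPushed)
    }
}
```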
```rust
/// That is, it only allows some filters through because it changes the schema of the data.
/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
/// RepartitionExec nodes allow all filters to be pushed down as they don't change the schema or cardinality.
fn filter_pushdown_request(
```
Instead of two methods (request + rewrite) I wonder if we could simplify the API by combining the two, following the model of `ExecutionPlan::try_swapping_with_projection`.

Maybe something like:

```rust
impl ExecutionPlan {
    /// Attempt to push down filters into this node's children.
    /// This will be called with the result for each filter that this node gave in
    /// `filters_for_pushdown` **and** any filters that children could not handle.
    fn try_pushdown_filter(
        self: Arc<Self>,
        filters: &[PhysicalExprRef],
    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
        Ok(None)
    }
    ...
}

/// The result of a filter pushdown operation.
pub enum FilterPushdownResult<T> {
    Pushed {
        /// The inner plan that was produced by the pushdown operation.
        inner: T,
        /// The support for each filter that was pushed down.
        support: Vec<FilterSupport>,
    },
    /// No filters were pushed down
    NotPushed,
}
```

Instead of `FilterSupport`, I think we could also follow the model and naming of
https://docs.rs/datafusion/latest/datafusion/datasource/provider/enum.TableProviderFilterPushDown.html
and consolidate many of these other enums:

```rust
pub enum ExecutionPlanFilterPushDown {
    Unsupported,
    Inexact,
    Exact,
}
```
> Instead of two methods (request + rewrite) I wonder if we could simplify the API by combining the two following the model of ExecutionPlan::try_swapping_with_projection

The reason it's two methods is that we need to recurse into children before passing the filters into the current node, but we need to know which filters we are allowed to push into children. In other words, the recursion into the children happens between those two methods.

I think we could possibly combine `filter_pushdown_request` and `filters_for_pushdown` into:

```rust
struct FilterPushdownResponse {
    parent_filters: Vec<FilterPushdownAllowed>,
    own_filters: Vec<PhysicalExprRef>,
}

fn request_filters_for_pushdown(
    &self,
    parent_filters: &[PhysicalExprRef],
) -> Result<FilterPushdownResponse>
```

But I don't know that this is much simpler. The point is that we need to keep track of which are parent filters and which are this node's own filters, because we need to transmit that information up the recursion.
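To pin down the ordering being described, here is a toy skeleton (invented names, no error handling, not DataFusion code) of the two-phase scheme with the recursion sitting in between:

```rust
// Toy model: `Filter` and `Node` are stand-ins, not DataFusion types.
struct Filter(String);

trait Node {
    fn children(&self) -> Vec<Box<dyn Node>>;
    /// Phase 1 ("request"): which of the parent's filters this node lets
    /// through, plus this node's own filters.
    fn filters_for_children(&self, parent_filters: &[Filter]) -> (Vec<Filter>, Vec<Filter>);
}

fn pushdown(node: &dyn Node, parent_filters: &[Filter]) {
    let (allowed_parent, own) = node.filters_for_children(parent_filters);
    // The rule must remember which filters came from the parent and which are
    // the node's own, so it can report support for the parent's filters back
    // up after the recursion returns.
    let combined: Vec<Filter> = allowed_parent.into_iter().chain(own).collect();
    for child in node.children() {
        // The recursion happens here, between the two methods.
        pushdown(child.as_ref(), &combined);
    }
    // Phase 2 ("rewrite") would run here, once the children's results are known.
}
```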
> Instead of FilterSupport, I think we could also follow the model and naming of https://docs.rs/datafusion/latest/datafusion/datasource/provider/enum.TableProviderFilterPushDown.html

I thought about doing this; the reasons I didn't go with that are:

- For `FilterPushdownAllowed` it needs to be able to rewrite the filter that is going to be pushed down so that projections can adjust the schema to their input schema if needed.
- For both `FilterPushdownAllowed` and `FilterSupport` there is no distinction between `Unhandled` and `Inexact`. To be honest I'm not sure how useful that distinction is in `TableProviderFilterPushDown` either; as far as I can tell all `Unhandled` gets you is that you don't get called with filters you possibly don't know -> you don't have to ignore them.

We can try consolidating, but I fear it may end up making things more complicated to grok, not simpler.
I tried looking into `try_swapping_with_projection`; it's a bit complicated, but I think it does do the recursion internally, which won't work for us as discussed previously / that's why we made this an optimizer rule:

```rust
self.inner.try_swapping_with_projection(projection)
```
I'm realizing there's another layer of complexity we need to add: operators like joins will want to partition the filters they get from their parents into different buckets for each child:
```sql
CREATE TABLE t1 (id int, col1 text);
CREATE TABLE t2 (id int, col2 text);

EXPLAIN VERBOSE
SELECT *
FROM t1
LEFT JOIN t2 ON t1.id = t2.id
ORDER BY t1.col1 LIMIT 5;
```
This currently produces the plan:
```
SortExec: TopK(fetch=5), expr=[col1@1 ASC NULLS LAST], preserve_partitioning=[false]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Left, on=[(id@0, id@0)]
      MemoryExec: partitions=1, partition_sizes=[0]
      MemoryExec: partitions=1, partition_sizes=[0]
```
Once we have the dynamic TopK filter pushdown we'd want to be able to push down the dynamic filter into the left side of the join (t1).
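For illustration, a hypothetical sketch (invented names, not a proposed API) of how a join-like node could bucket the filters it receives by which child's columns they reference:

```rust
use std::collections::HashSet;

// Stand-in for a filter expression: just the set of columns it references.
struct Filter {
    referenced_columns: HashSet<String>,
}

/// Returns (filters for the left child, filters for the right child, unpushable).
fn partition_filters(
    filters: Vec<Filter>,
    left_columns: &HashSet<String>,
    right_columns: &HashSet<String>,
) -> (Vec<Filter>, Vec<Filter>, Vec<Filter>) {
    let (mut left, mut right, mut kept) = (Vec::new(), Vec::new(), Vec::new());
    for f in filters {
        if f.referenced_columns.is_subset(left_columns) {
            // e.g. a dynamic TopK filter on t1.col1 goes to the left side
            left.push(f);
        } else if f.referenced_columns.is_subset(right_columns) {
            right.push(f);
        } else {
            // references both sides: the join must apply it itself
            kept.push(f);
        }
    }
    (left, right, kept)
}
```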
Playing around with it a little bit, I'm not sure that merging `filters_for_pushdown` and `filter_pushdown_request` is any better. Some operators will care about implementing one and not the other, and merging them pushes complexity into the implementer which is now handled in the optimizer rule.
I'll let you give it a try and see if you agree.
I might push a fix for https://github.com/apache/datafusion/pull/15566/files#r2029910723 which will complicate things even more.
" | ||
); | ||
} | ||
|
I think we need some more tests here (I can help write them, but I am about out of time this morning):

- Tests with the actual `ParquetSource` (to ensure everything works when hooked up correctly)
- Tests for `CoalesceBatchesExec` and `ProjectionExec` (basically using the great examples in the comments of the rule)
This is a great question.
The reason I specifically didn't implement `ParquetSource` is that it has a bunch of knock-on effects related to the interaction with the existing filter pushdown mechanisms in `ParquetSource` + `ListingTable` that I think should be dealt with in their own PR. My plan was to avoid bloating this PR and limit the blast radius by only implementing `FilterExec` as an example.

So the testing plan for this PR becomes (1) it doesn't break any other tests / the rest of the system despite being on by default, and (2) these minimal tests show that the POC works. Then as we add more implementations we can enrich these tests.

One idea is that we could add mock implementations for joins, projections, repartitions, etc. like I did for a `DataSource` and use those in tests. Another proposal could be merging this but feature-flagging the whole thing until we have a rich enough implementation for it to (1) be useful and (2) have extensive real-world e2e tests.
Added tests for the filter + projection case, aggregations (not yet supported), and `CoalesceBatchesExec` + `RepartitionExec`. I still didn't implement any other cases, including `ParquetSource`, for the reasons above.
I think this makes sense
* Improve doc comments
* Apply suggestions from code review

Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
@alamb I did a pretty extensive refactor, I think I got it into the state you wanted 😄
```rust
fn try_pushdown_filters(
    &self,
    plan: &Arc<dyn ExecutionPlan>,
    parent_filters: &[PhysicalExprRef],
) -> Result<ExecutionPlanFilterPushdownResult> {
```
Now this is the only added method to `ExecutionPlan`. There's a bit of weirdness with having to have `&self` and `plan: &Arc<dyn ExecutionPlan>`, which are the same thing, just one is arc'ed and the other isn't. `Arc<Self>` won't cut it because you can't call methods on it, and for this to work I need a default implementation in `ExecutionPlan` that does the recursion (otherwise we force ourselves to immediately implement this method on all nodes, or we would not recurse into subtrees; see the test with `AggregateExec`).

If you have a way to avoid this, happy to do it, but it also doesn't seem like a huge deal.
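A stripped-down illustration (toy trait, not the real `ExecutionPlan`) of the shape described above: `&self` drives dispatch, while the `Arc` of the same node lets the default body hand back owned plans as it recurses:

```rust
use std::sync::Arc;

trait Plan {
    fn children(&self) -> Vec<Arc<dyn Plan>>;

    /// Default implementation recurses into children, so nodes that don't
    /// override this method still let the rule reach the subtrees below them.
    fn try_pushdown_filters(&self, plan: &Arc<dyn Plan>) -> Arc<dyn Plan> {
        for child in self.children() {
            // Recurse using each child's own Arc; a real implementation would
            // collect the new children and rebuild `plan` with them.
            let _new_child = child.try_pushdown_filters(&child);
        }
        // `&self` alone could not produce this owned handle; that's why the
        // Arc'ed version of the same node is passed in alongside it.
        Arc::clone(plan)
    }
}
```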
" | ||
); | ||
} | ||
|
```rust
#[derive(Debug, Clone, Copy)]
pub enum FilterPushdownSupport {
    /// Filter may not have been pushed down to the child plan, or the child plan
    /// can only partially apply the filter but may have false positives (but not false negatives).
    /// In this case the parent **must** behave as if the filter was not pushed down
    /// and must apply the filter itself.
    Unsupported,
    /// Filter was pushed down to the child plan and the child plan promises that
    /// it will apply the filter correctly with no false positives or false negatives.
    /// The parent can safely drop the filter.
    Exact,
}
```
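A hypothetical usage sketch (generic `F` standing in for a filter expression) of how a parent could consume these results, keeping only the filters it must still apply itself:

```rust
// Assumes the FilterPushdownSupport enum above.
fn remaining_filters<F>(
    filters: Vec<F>,
    support: Vec<FilterPushdownSupport>,
) -> Vec<F> {
    filters
        .into_iter()
        .zip(support)
        .filter_map(|(f, s)| match s {
            // The child may not apply the filter (or only inexactly):
            // the parent must keep it and apply it itself.
            FilterPushdownSupport::Unsupported => Some(f),
            // The child promised exact application: the parent can drop it.
            FilterPushdownSupport::Exact => None,
        })
        .collect()
}
```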
I didn't merge this with the `TableProvider` API because there is no difference between `Inexact` and `Unsupported` in this API, while there is in `TableProvider`. We could merge them and treat those two as synonymous here, but I'm not sure that's worth it.
While there is no difference in behavior now between `Inexact` and `Unsupported`, I still think we should use the same three variants, as I can see it being useful in the future to know the difference.
@berkaysynnada I updated the SLT tests. I'll have another review tomorrow, but there are things I'd like to point out now.

I'll try to improve those points now. I'll give an update then.

Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over `RepartitionExec`'s and `CoalesceBatchesExec`'s.

Do you agree that we should go on with the plan emerging after the FilterPushdown rule only if the filters reach the sources? If a filter cannot reach the source and stops at any intermediate point, then we don't accept that pushed-down version and keep the original plan?

Current status:

Related to that, I have one question: let's say you have a plan like A (root) <- B <- C <- D <- E <- F <- G (scan).

I'm enabling this selective passing feature.

I'm not sure. The flow before was:

The story is the same if there are no filters from parents, except for the last two steps, which are skipped, and the plan is unmodified.

I still don't understand why any sort of "retry" or "revisit" is necessary. I feel like if we just add a second method to `ExecutionPlan` and are maybe less eager about popping out.

I'll be done with my changes in an hour or less. Would you like to discuss over the code after that?

Sure, sounds good. Should we set up a call to move this along faster?

I'm done with my turn for now. The only to-dos are updating the docs and reverting the test changes. I've not yet gone into those as we might still have some changes after the meet.

I have also updated the tests. Now we don't break any existing behavior, but we can safely push down filters into sources.

If I have to select one option, I honestly prefer the simple API and take the cloning cost, as it's not a big deal for now at least. Let's hear some other people's opinions on this? For the multiple recursion issue, if we stick to dropping plans where the filter doesn't reach the source, O(N^2) is a must.

Ok, sounds good to me for now. We'll measure in our production system, and if there's overhead from planning time we can come back and edit the API.

@berkaysynnada if CI passes I think this is ready to merge and we can tweak later as we implement in more places 😄
```rust
/// └──────────────────────┘
///            │
///            ▼
/// ┌──────────────────────┐
```
@adriangb this example needs an update 😇
Sorry I missed it. Why does it need an update, did we change how this behaves?
Oops, I assumed the outer filter is pushed over the projection, but it's already there before, and we are not pushing over projections at all yet. Sorry :D
I think @alamb and @ozankabak can take a look after the merge, since we've been iterating on this quite a bit. We know the fragile parts, and there will likely be many follow-up PRs related to these features, so any advice/concerns can be addressed in the follow-up PRs.

Great collaboration @adriangb, thank you. I hope more will come.

Me too, great stuff!
(Part of dynamic TopK filter pushdown for `ORDER BY LIMIT` queries, #15037.) This introduces APIs for `ExecutionPlan`s to push down, accept, and forward filters, as well as an optimizer rule that coordinates the pushdown. Some basic docs and tests are added; e2e tests need actual operators to implement this.