
ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them #15566


Merged
42 commits merged into apache:main on Apr 17, 2025

Conversation

adriangb
Contributor

@adriangb adriangb commented Apr 3, 2025

This introduces APIs for ExecutionPlans to push down, accept, and forward filters, as well as an optimizer rule that coordinates the pushdown.

Some basic docs and tests are added; e2e tests will need actual operators to implement this.

@alamb
Contributor

alamb commented Apr 3, 2025

This is the first thing on my list to review tomorrow.

@alamb
Contributor

alamb commented Apr 4, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.8.0-1016-gcp #18-Ubuntu SMP Fri Oct 4 22:16:29 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Comparing filter-pushdown-apis (c78a590) to 28451b5 diff
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

@alamb
Contributor

alamb commented Apr 4, 2025

🤖: Benchmark completed

Details

Comparing HEAD and filter-pushdown-apis
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ filter-pushdown-apis ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │  2009.92ms │            2134.27ms │ 1.06x slower │
│ QQuery 1     │   758.87ms │             722.05ms │    no change │
│ QQuery 2     │  1497.10ms │            1469.49ms │    no change │
│ QQuery 3     │   735.82ms │             720.90ms │    no change │
│ QQuery 4     │  1509.46ms │            1523.63ms │    no change │
│ QQuery 5     │ 17393.83ms │           16999.25ms │    no change │
│ QQuery 6     │  6931.69ms │            6936.68ms │    no change │
└──────────────┴────────────┴──────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 30836.68ms │
│ Total Time (filter-pushdown-apis)   │ 30506.27ms │
│ Average Time (HEAD)                 │  4405.24ms │
│ Average Time (filter-pushdown-apis) │  4358.04ms │
│ Queries Faster                      │          0 │
│ Queries Slower                      │          1 │
│ Queries with No Change              │          6 │
└─────────────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ filter-pushdown-apis ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     2.32ms │               2.10ms │ +1.10x faster │
│ QQuery 1     │    34.95ms │              36.84ms │  1.05x slower │
│ QQuery 2     │    94.57ms │              96.83ms │     no change │
│ QQuery 3     │   103.64ms │             100.63ms │     no change │
│ QQuery 4     │   757.59ms │             782.45ms │     no change │
│ QQuery 5     │   896.80ms │             881.90ms │     no change │
│ QQuery 6     │    32.59ms │              32.71ms │     no change │
│ QQuery 7     │    40.35ms │              43.14ms │  1.07x slower │
│ QQuery 8     │   954.03ms │             944.76ms │     no change │
│ QQuery 9     │  1270.70ms │            1231.08ms │     no change │
│ QQuery 10    │   285.36ms │             272.30ms │     no change │
│ QQuery 11    │   321.62ms │             321.02ms │     no change │
│ QQuery 12    │   948.64ms │             962.72ms │     no change │
│ QQuery 13    │  1435.73ms │            1330.99ms │ +1.08x faster │
│ QQuery 14    │   891.22ms │             912.56ms │     no change │
│ QQuery 15    │  1081.74ms │            1073.90ms │     no change │
│ QQuery 16    │  1800.08ms │            1816.52ms │     no change │
│ QQuery 17    │  1676.70ms │            1664.81ms │     no change │
│ QQuery 18    │  3177.05ms │            3198.94ms │     no change │
│ QQuery 19    │    85.20ms │              89.85ms │  1.05x slower │
│ QQuery 20    │  1173.96ms │            1170.95ms │     no change │
│ QQuery 21    │  1381.40ms │            1381.84ms │     no change │
│ QQuery 22    │  2544.67ms │            2546.62ms │     no change │
│ QQuery 23    │  8813.68ms │            8910.03ms │     no change │
│ QQuery 24    │   484.22ms │             492.88ms │     no change │
│ QQuery 25    │   402.11ms │             400.78ms │     no change │
│ QQuery 26    │   555.84ms │             554.17ms │     no change │
│ QQuery 27    │  1721.35ms │            1728.88ms │     no change │
│ QQuery 28    │ 13322.28ms │           12648.55ms │ +1.05x faster │
│ QQuery 29    │   544.27ms │             541.74ms │     no change │
│ QQuery 30    │   864.79ms │             854.92ms │     no change │
│ QQuery 31    │   905.05ms │             916.46ms │     no change │
│ QQuery 32    │  2803.51ms │            2819.80ms │     no change │
│ QQuery 33    │  3440.98ms │            3445.99ms │     no change │
│ QQuery 34    │  3447.60ms │            3461.27ms │     no change │
│ QQuery 35    │  1327.01ms │            1349.66ms │     no change │
│ QQuery 36    │   234.13ms │             218.10ms │ +1.07x faster │
│ QQuery 37    │    91.26ms │              91.87ms │     no change │
│ QQuery 38    │   131.62ms │             130.44ms │     no change │
│ QQuery 39    │   425.06ms │             407.65ms │     no change │
│ QQuery 40    │    51.60ms │              49.98ms │     no change │
│ QQuery 41    │    44.77ms │              47.57ms │  1.06x slower │
│ QQuery 42    │    55.56ms │              55.68ms │     no change │
└──────────────┴────────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 60657.58ms │
│ Total Time (filter-pushdown-apis)   │ 60021.90ms │
│ Average Time (HEAD)                 │  1410.64ms │
│ Average Time (filter-pushdown-apis) │  1395.86ms │
│ Queries Faster                      │          4 │
│ Queries Slower                      │          4 │
│ Queries with No Change              │         35 │
└─────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ filter-pushdown-apis ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 122.87ms │             125.44ms │     no change │
│ QQuery 2     │  24.28ms │              24.60ms │     no change │
│ QQuery 3     │  37.03ms │              36.16ms │     no change │
│ QQuery 4     │  21.41ms │              22.46ms │     no change │
│ QQuery 5     │  56.63ms │              58.42ms │     no change │
│ QQuery 6     │   8.60ms │               8.26ms │     no change │
│ QQuery 7     │ 108.01ms │             107.06ms │     no change │
│ QQuery 8     │  26.95ms │              27.84ms │     no change │
│ QQuery 9     │  63.08ms │              65.79ms │     no change │
│ QQuery 10    │  61.87ms │              61.87ms │     no change │
│ QQuery 11    │  12.99ms │              13.09ms │     no change │
│ QQuery 12    │  37.74ms │              39.04ms │     no change │
│ QQuery 13    │  29.20ms │              29.74ms │     no change │
│ QQuery 14    │   9.90ms │              10.24ms │     no change │
│ QQuery 15    │  25.46ms │              25.31ms │     no change │
│ QQuery 16    │  24.41ms │              23.19ms │ +1.05x faster │
│ QQuery 17    │  99.92ms │              98.66ms │     no change │
│ QQuery 18    │ 246.18ms │             248.22ms │     no change │
│ QQuery 19    │  30.37ms │              28.67ms │ +1.06x faster │
│ QQuery 20    │  42.06ms │              39.48ms │ +1.07x faster │
│ QQuery 21    │ 174.76ms │             174.08ms │     no change │
│ QQuery 22    │  17.60ms │              17.91ms │     no change │
└──────────────┴──────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                   │ 1281.30ms │
│ Total Time (filter-pushdown-apis)   │ 1285.51ms │
│ Average Time (HEAD)                 │   58.24ms │
│ Average Time (filter-pushdown-apis) │   58.43ms │
│ Queries Faster                      │         3 │
│ Queries Slower                      │         0 │
│ Queries with No Change              │        19 │
└─────────────────────────────────────┴───────────┘

@adriangb
Contributor Author

adriangb commented Apr 4, 2025

@alamb I'm curious, could you do a main vs. main run just so we get an idea of what the variability is? I don't know if I should be looking at <10% changes or not.

@alamb
Contributor

alamb commented Apr 4, 2025

@alamb I'm curious, could you do a main vs. main run just so we get an idea of what the variability is? I don't know if I should be looking at <10% changes or not.

I did that here: #15572 (comment)

I think the queries where the absolute runtime is small (10s-100s of ms) have more variation.

I don't see anything concerning in the benchmark run above.

I ran out of time to review this PR today but it is very high on my list.

Contributor

@alamb alamb left a comment

Thank you @adriangb -- this is a great PR

I have some suggestions on how we can simplify the APIs and increase the testing. I am willing to help work on them, but I think I am out of time for today.

Let me know what you think

fn push_down_filters(
    &self,
    _filters: &[PhysicalExprRef],
) -> datafusion_common::Result<Option<DataSourceFilterPushdownResult>> {
Contributor

Yeah, this type is getting pretty complicated. Maybe we could add the Option to FilterPushdownResult -- maybe make it an enum with variants for "did push down" or "no pushdown":

pub enum FilterPushdownResult<T> {
    Pushed {
        inner: T,
        support: Vec<FilterSupport>,
    },
    /// filters could not be pushed down
    NotPushed,
}

Contributor Author

Yep makes sense, happy to do that. The important bit is that you can cheaply return the NotPushed variant without having to clone yourself, etc.
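
For illustration, a self-contained sketch of that shape; the FilterSupport variant names and the helper function below are assumptions made up for the example, not the merged API, but they show how a node can decline pushdown without cloning anything:

// Illustrative stand-ins; the variant names here are assumptions.
pub enum FilterSupport {
    HandledExact,
    Unhandled,
}

pub enum FilterPushdownResult<T> {
    /// Pushdown succeeded: a rewritten plan plus per-filter support.
    Pushed { inner: T, support: Vec<FilterSupport> },
    /// Pushdown declined: the caller keeps the original plan untouched.
    NotPushed,
}

/// A node that does not participate in pushdown can answer without cloning
/// anything: `NotPushed` is a zero-cost "no change" signal.
fn decline_pushdown<T>() -> FilterPushdownResult<T> {
    FilterPushdownResult::NotPushed
}

fn main() {
    // Using `()` as a placeholder plan type for the example.
    match decline_pushdown::<()>() {
        FilterPushdownResult::Pushed { .. } => println!("plan was rewritten"),
        FilterPushdownResult::NotPushed => println!("plan left as-is"),
    }
}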

/// That is, it only allows some filters through because it changes the schema of the data.
/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
/// RepartitionExec nodes allow all filters to be pushed down as they don't change the schema or cardinality.
fn filter_pushdown_request(
Contributor

Instead of two methods (request + rewrite) I wonder if we could simplify the API by combining the two, following the model of ExecutionPlan::try_swapping_with_projection

Maybe something like

trait ExecutionPlan {
    /// Attempt to push down filters into this node's children.
    /// This will be called with the result for each filter that this node gave in `filters_for_pushdown`
    /// **and** any filters that children could not handle.
    fn try_pushdown_filter(
        self: Arc<Self>,
        filters: &[PhysicalExprRef],
    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
        Ok(None)
    }
    ...
}

/// The result of a filter pushdown operation.
pub enum FilterPushdownResult<T> {
    Pushed {
        /// The inner plan that was produced by the pushdown operation.
        inner: T,
        /// The support for each filter that was pushed down.
        support: Vec<FilterSupport>,
    },
    /// No filters were pushed down
    NotPushed,
}

Instead of FilterSupport, I think we could also follow the model and naming of
https://docs.rs/datafusion/latest/datafusion/datasource/provider/enum.TableProviderFilterPushDown.html

And consolidate many of these other enums

pub enum ExecutionPlanFilterPushDown {
    Unsupported,
    Inexact,
    Exact,
}

Contributor Author

Instead of two methods (request + rewrite) I wonder if we could simplify the API by combining the two following the model of ExecutionPlan::try_swapping_with_projection

The reason it's two methods is that we need to recurse into children before passing the filters into the current node, but we need to know which filters we are allowed to push into children.
In other words, the recursion into the children happens between those two methods.

I think we could possibly combine filter_pushdown_request and filters_for_pushdown into:

struct FilterPushdownResponse {
    parent_filters: Vec<FilterPushdownAllowed>,
    own_filters: Vec<PhysicalExprRef>,
}

fn request_filters_for_pushdown(
    &self,
    parent_filters: &[PhysicalExprRef],
) -> Result<FilterPushdownResponse>

But I don't know that this is much simpler.
The point is that we need to keep track of which are parent filters and which are this node's own filters because we need to transmit that information up the recursion.

Instead of FilterSupport, I think we could also follow the model and naming of
https://docs.rs/datafusion/latest/datafusion/datasource/provider/enum.TableProviderFilterPushDown.html

I thought about doing this; the reason I didn't go with it is that:

  1. For FilterPushdownAllowed it needs to be able to rewrite the filter that is going to be pushed down so that projections can adjust the schema to their input schema if needed.
  2. For both FilterPushdownAllowed and FilterSupport there is no distinction between Unsupported and Inexact. To be honest I'm not sure how useful that distinction is in TableProviderFilterPushDown either; as far as I can tell, all Unsupported gets you is that you don't get called with filters you can't handle, so you don't have to ignore them.

We can try consolidating, but I fear it may end up making things more complicated to grok, not simpler.

Contributor Author

I tried looking into try_swapping_with_projection; it's a bit complicated, but I think it does the recursion internally, which won't work for us as discussed previously (that's why we made this an optimizer rule):

self.inner.try_swapping_with_projection(projection)

Contributor Author

I'm realizing there's another layer of complexity we need to add: operators like joins will want to partition the filters they get from their parents into different buckets for each child:

CREATE TABLE t1 (id int, col1 text);
CREATE TABLE t2 (id int, col2 text);

EXPLAIN VERBOSE
SELECT *
FROM t1
LEFT JOIN t2 ON t1.id = t2.id
ORDER BY t1.col1 LIMIT 5;

This currently produces the plan:

| physical_plan | SortExec: TopK(fetch=5), expr=[col1@1 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   CoalesceBatchesExec: target_batch_size=8192                                        |
|               |     HashJoinExec: mode=Partitioned, join_type=Left, on=[(id@0, id@0)]                |
|               |       MemoryExec: partitions=1, partition_sizes=[0]                                  |
|               |       MemoryExec: partitions=1, partition_sizes=[0]                                  |
|               |                                                                                      |

Once we have the dynamic TopK filter pushdown we'd want to be able to push down the dynamic filter into the left side of the join (t1).
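
To make the per-child bucketing concrete, here is a self-contained sketch with stand-in types; this is not the PR's API. A real implementation would operate on PhysicalExprRef, re-base column indices for the right child, and also consult the join type, since e.g. a LEFT JOIN cannot blindly accept filters pushed to its null-producing side:

use std::collections::HashSet;

// Simplified stand-in for a physical filter expression: we only track which
// column indices (in the join's output schema) the filter references.
struct Filter {
    name: &'static str,
    columns: HashSet<usize>,
}

// Partition parent filters into per-child buckets for a join whose output
// schema is `[left columns..., right columns...]`. Filters referencing columns
// from both sides stay with the join itself.
fn partition_filters(
    filters: Vec<Filter>,
    left_width: usize,
) -> (Vec<Filter>, Vec<Filter>, Vec<Filter>) {
    let (mut left, mut right, mut keep) = (Vec::new(), Vec::new(), Vec::new());
    for f in filters {
        let all_left = f.columns.iter().all(|&i| i < left_width);
        let all_right = f.columns.iter().all(|&i| i >= left_width);
        if all_left {
            left.push(f); // e.g. a dynamic TopK filter on t1.col1 goes to the left child
        } else if all_right {
            right.push(f); // indices would need re-basing to the right child's schema
        } else {
            keep.push(f); // mixed references cannot be pushed below the join
        }
    }
    (left, right, keep)
}

fn main() {
    // Join output schema for the example: [t1.id(0), t1.col1(1), t2.id(2), t2.col2(3)]
    let filters = vec![
        Filter { name: "t1.col1 < 'x'", columns: HashSet::from([1]) },
        Filter { name: "t2.col2 = 'y'", columns: HashSet::from([3]) },
    ];
    let (l, r, k) = partition_filters(filters, 2); // t1 contributes 2 columns
    assert_eq!((l.len(), r.len(), k.len()), (1, 1, 0));
    println!("left bucket: {:?}", l.iter().map(|f| f.name).collect::<Vec<_>>());
}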

Contributor Author

Playing around with it a little bit, I'm not sure that merging filters_for_pushdown and filter_pushdown_request is any better. Some operators will care about implementing one and not the other, and merging them pushes complexity into the implementer that is currently handled in the optimizer rule.

I'll let you give it a try and see if you agree.

I might push a fix for https://github.com/apache/datafusion/pull/15566/files#r2029910723 which will complicate things even more.

"
);
}

Contributor

I think we need some more tests here (I can help write them but I am about out of time this morning)

  1. Tests with actual ParquetSource (to ensure everything works when hooked up correctly)
  2. Tests for CoalesceBatches and ProjectionExec (basically using the great examples in the comments of the rule)

Contributor Author

This is a great question.

The reason I specifically didn't implement ParquetSource is because that has a bunch of knock-on effects related to the interaction with the existing filter pushdown mechanisms in ParquetSource + ListingTable that I think should be dealt with in their own PR.

My plan was to avoid bloating this PR and limit the blast radius by only implementing FilterExec as an example.
So the testing plan for this PR becomes (1) it doesn't break any other tests / the rest of the system despite being on by default and (2) these minimal tests show that the POC works.
Then as we add more implementations we can enrich these tests.

One idea is that we could add mock implementations for joins, projections, repartitions, etc. like I did for a DataSource and use those in tests.

Another proposal could be merging this but feature flagging the whole thing until we have a rich enough implementation for it to (1) be useful and (2) have extensive real world e2e tests.

Contributor Author

Added tests for the filter + projection case, aggregations (not yet supported) and CoalesceBatchesExec + RepartitionExec.

I still didn't implement any other cases, including ParquetSource, for the reasons above.

Contributor

I think this makes sense

alamb and others added 6 commits April 5, 2025 09:24
Contributor Author

@adriangb adriangb left a comment

@alamb I did a pretty extensive refactor, I think I got it into the state you wanted 😄

Comment on lines 747 to 751
fn try_pushdown_filters(
    &self,
    plan: &Arc<dyn ExecutionPlan>,
    parent_filters: &[PhysicalExprRef],
) -> Result<ExecutionPlanFilterPushdownResult> {
Contributor Author

@adriangb adriangb Apr 5, 2025

Now this is the only added method to ExecutionPlan. There's a bit of weirdness with having both &self and plan: &Arc<dyn ExecutionPlan>, which are the same thing, just one is Arc'd and the other isn't. Arc<Self> won't cut it because you can't call methods on it, and for this to work I need a default implementation in ExecutionPlan that does the recursion (otherwise we'd force ourselves to immediately implement this method on all nodes, or we would not recurse into subtrees; see the test with AggregateExec).

If you have a way to avoid this happy to do it but it also doesn't seem like a huge deal.
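
For illustration, a rough sketch with made-up trait and method names (not the merged API) of the shape being described: a default method that recurses into children wants an already-Arc'd handle to the same node so it can rebuild or return it without re-wrapping self.

use std::sync::Arc;

trait Plan {
    fn children(&self) -> Vec<Arc<dyn Plan>>;
    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Plan>>) -> Arc<dyn Plan>;

    /// Default implementation: recurse into children and rebuild this node.
    /// `plan` is the same node as `self`, already behind an `Arc`, so the
    /// default body can hand back an owned `Arc` without cloning the node's data.
    fn try_pushdown(&self, plan: &Arc<dyn Plan>) -> Arc<dyn Plan> {
        let new_children: Vec<Arc<dyn Plan>> = plan
            .children()
            .into_iter()
            .map(|child| child.try_pushdown(&child))
            .collect();
        Arc::clone(plan).with_new_children(new_children)
    }
}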

Comment on lines 20 to 31
#[derive(Debug, Clone, Copy)]
pub enum FilterPushdownSupport {
    /// Filter may not have been pushed down to the child plan, or the child plan
    /// can only partially apply the filter but may have false positives (but not false negatives).
    /// In this case the parent **must** behave as if the filter was not pushed down
    /// and must apply the filter itself.
    Unsupported,
    /// Filter was pushed down to the child plan and the child plan promises that
    /// it will apply the filter correctly with no false positives or false negatives.
    /// The parent can safely drop the filter.
    Exact,
}
Contributor Author

I didn't merge this with the TableProvider API because there is no difference between Inexact and Unsupported in this API, while there is in TableProvider. We could merge them and treat those two as synonymous here, but I'm not sure that's worth it.
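
For illustration, a minimal, self-contained sketch (re-declaring the enum above; the surrounding function is made up for the example) of how a parent can use the per-filter answers to decide which filters it must still apply itself:

#[derive(Debug, Clone, Copy)]
pub enum FilterPushdownSupport {
    /// Parent must behave as if the filter was not pushed down.
    Unsupported,
    /// Child guarantees exact filtering; parent can drop the filter.
    Exact,
}

/// Given the child's per-filter answer (one entry per filter offered, in
/// order), return the indices of filters the parent must still apply.
fn filters_to_keep(support: &[FilterPushdownSupport]) -> Vec<usize> {
    support
        .iter()
        .enumerate()
        .filter_map(|(i, s)| match s {
            FilterPushdownSupport::Unsupported => Some(i),
            FilterPushdownSupport::Exact => None,
        })
        .collect()
}

fn main() {
    use FilterPushdownSupport::*;
    // Filter 0 was absorbed exactly by the child; filter 1 was not.
    assert_eq!(filters_to_keep(&[Exact, Unsupported]), vec![1]);
}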

Contributor

While there is no difference in behavior now between Inexact and Unsupported, I still think we should use the same three variants, as I can see it being useful in the future to know the difference.

@adriangb
Contributor Author

@berkaysynnada I updated the SLT tests. I'll have another review tomorrow, but some things I'd like to point out now:

  1. We should still think about the retry parameter. Ideally we can get rid of it somehow.
  2. The SLT plans show a lot of updates where the position of the FilterExec was swapped with other operators. Unless we can unequivocally prove that the new position is better we should probably minimize the risk for this PR by minimizing the changes, meaning the FilterExec doesn't move in all of these plans.

@berkaysynnada
Contributor

@berkaysynnada I updated the SLT tests. I'll have another review tomorrow, but some things I'd like to point out now:

  1. We should still think about the retry parameter. Ideally we can get rid of it somehow.

I'll try to improve those points now. I'll give an update then.

  1. The SLT plans show a lot of updates where the position of the FilterExec was swapped with other operators. Unless we can unequivocally prove that the new position is better we should probably minimize the risk for this PR by minimizing the changes, meaning the FilterExec doesn't move in all of these plans.

Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExec's and CoalesceBatches.

@berkaysynnada
Contributor

Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExec's and CoalesceBatches.

Do you agree that we should go with the plan that emerges after the FilterPushdown rule only if the filters reach the sources? If a filter cannot reach the source and stops at some intermediate point, then we don't accept that pushed-down version and keep the original plan?

@berkaysynnada
Contributor

Current Status:

  1. Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid both of those defects at the same time (via another trait, or other APIs). I'm open to discussing this.

  2. I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be automatically resolved once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have workarounds like this.

  3. I'm currently on this:

Do you agree that we should go with the plan that emerges after the FilterPushdown rule only if the filters reach the sources? If a filter cannot reach the source and stops at some intermediate point, then we don't accept that pushed-down version and keep the original plan?

Related to that, I have one question: let's say you have a plan like A (root) <- B <- C <- D <- E <- F <- G (scan).
Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filters cannot be pushed down over F. Knowing all this, can we say that filters from all the other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is it a safe assumption to make?

@berkaysynnada
Contributor

Related to that, I have one question: let's say you have a plan like A (root) <- B <- C <- D <- E <- F <- G (scan). Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filters cannot be pushed down over F. Knowing all this, can we say that filters from all the other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is it a safe assumption to make?

I'm enabling this selective passing feature

@adriangb
Contributor Author

Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExec's and CoalesceBatches.

I'm not sure. The flow before was:

  1. Recurse down with filters from parents x = 5 (example).
  2. Hit a FilterExec. It says it's okay to push down x = 5.
  3. Pass through CoalesceBatchesExec.
  4. Hit a scan that says it can't absorb the filters and returns Unsupported.
  5. Recurse back up to the FilterExec.
  6. Ask the FilterExec if it can handle x = 5 itself.
  7. FilterExec says yes, it replaces itself with a new FilterExec.

The story is the same if there are no filters from parents, except the last two steps are skipped and the plan is unmodified.
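
For illustration, that flow can be mimicked with a tiny stand-in model (plain strings and a toy plan enum, not DataFusion's Arc<dyn ExecutionPlan> / PhysicalExprRef types):

// Stand-in plan model; illustrative only.
#[derive(Debug)]
enum Node {
    /// A scan that cannot absorb filters (step 4 above).
    Scan,
    /// A pass-through operator such as CoalesceBatchesExec (step 3).
    Coalesce(Box<Node>),
    /// A FilterExec holding a list of predicates (steps 2 and 5-7).
    Filter { pred: Vec<String>, input: Box<Node> },
}

/// Push `parent_filters` toward the scan; whatever comes back unabsorbed is
/// re-applied by the nearest Filter node.
fn pushdown(node: Node, parent_filters: Vec<String>) -> (Node, Vec<String>) {
    match node {
        // The scan declines everything.
        Node::Scan => (Node::Scan, parent_filters),
        // Pass-through operators forward filters unchanged.
        Node::Coalesce(input) => {
            let (new_input, unabsorbed) = pushdown(*input, parent_filters);
            (Node::Coalesce(Box::new(new_input)), unabsorbed)
        }
        // Offer own + parent filters downward, then absorb whatever the child
        // could not handle by rebuilding this Filter node.
        Node::Filter { pred, input } => {
            let mut offered = pred;
            offered.extend(parent_filters);
            let (new_input, unabsorbed) = pushdown(*input, offered);
            let rebuilt = Node::Filter { pred: unabsorbed, input: Box::new(new_input) };
            (rebuilt, Vec::new()) // nothing left for this node's parent
        }
    }
}

fn main() {
    let plan = Node::Filter {
        pred: vec!["y > 0".to_string()],
        input: Box::new(Node::Coalesce(Box::new(Node::Scan))),
    };
    let (new_plan, leftover) = pushdown(plan, vec!["x = 5".to_string()]);
    assert!(leftover.is_empty());
    println!("{new_plan:?}"); // the Filter node now holds both "y > 0" and "x = 5"
}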

@adriangb
Contributor Author

  • I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be automatically resolved once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have workarounds like this.

I still don't understand why any sort of "retry" or "revisit" is necessary. I feel like if we just add a second method to ExecutionPlan and are maybe less eager about popping out FilterExecs this won't be needed.

@berkaysynnada
Contributor

  • I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be automatically resolved once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have workarounds like this.

I still don't understand why any sort of "retry" or "revisit" is necessary. I feel like if we just add a second method to ExecutionPlan and are maybe less eager about popping out FilterExecs this won't be needed.

I'll be done with my changes in an hour or less. Would you like to discuss over the code after that?

@adriangb
Contributor Author

Let's say you have a plan like A (root) <- B <- C <- D <- E <- F <- G (scan).
Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filters cannot be pushed down over F. Knowing all this, can we say that filters from all the other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is it a safe assumption to make?

If F does not allow filter pushdown then anything upstream of it will not be able to push down filters into the scan (G).
However there may still be filter pushdown, e.g. from B to E: TopK <- FilterExec <- F <- Scan. In this case TopK may push down into FilterExec, and FilterExec will try to push down into Scan but be blocked by F, so FilterExec stays where it is with its current filters but also absorbs any filters from TopK.

@adriangb
Contributor Author

I'll be done with my changes in an hour or less. Would you like to discuss over the code after that?

Sure sounds good. Should we set up a call to move this along faster?

@berkaysynnada
Contributor

I'm done with my turn for now. The only todos are updating docs and reverting the test changes. I haven't gone into those yet as we might still have some changes after the meeting.

@berkaysynnada
Contributor

I have also updated the tests. Now we don't break any existing behavior, but we can safely push down filters into sources.

@adriangb
Contributor Author

adriangb commented Apr 17, 2025

  • Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid both of those defects at the same time (via another trait, or other APIs). I'm open to discussing this.

While I am worried about the ExecutionPlans we have in DataFusion, I am even more concerned about the ones we don't see: custom user plans, which might be quite expensive to clone. And even if it's not a big impact for this rule, if we add more and more rules, each with multiple recursions, it's going to be a non-trivial overhead. Especially since DataFusion doesn't support prepared statements or anything like that, it has to re-plan every time a query is run.

@berkaysynnada
Contributor

  • Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid both of those defects at the same time (via another trait, or other APIs). I'm open to discussing this.

While I am worried about the ExecutionPlans we have in DataFusion, I am even more concerned about the ones we don't see: custom user plans, which might be quite expensive to clone. And even if it's not a big impact for this rule, if we add more and more rules, each with multiple recursions, it's going to be a non-trivial overhead. Especially since DataFusion doesn't support prepared statements or anything like that, it has to re-plan every time a query is run.

If I have to select one option, I honestly prefer the simple API and would take the cloning cost, as it's not a big deal for now at least. Let's listen to some other people's opinions on this?

For the multiple recursion issue, if we stick to dropping plans where the filter doesn't reach the source, O(N^2) is a must.

@adriangb
Contributor Author

Ok sounds good to me for now. We'll measure in our production system and if there's overhead from planning time we can come back and edit the API.

@adriangb
Contributor Author

@berkaysynnada if CI passes I think this is ready to merge and we can tweak later as we implement in more places 😄

/// └──────────────────────┘
/// │
/// ▼
/// ┌──────────────────────┐
Contributor

@adriangb this example needs an update 😇

Contributor Author

Sorry I missed it. Why does it need an update, did we change how this behaves?

Contributor

Oops, I assumed the outer filter was pushed over the projection, but it was already there before, and we are not pushing over projections at all yet. Sorry :D

Contributor

@berkaysynnada berkaysynnada left a comment

I think @alamb and @ozankabak can take a look after the merge, since we've been iterating on this quite a bit. We know the fragile parts, and there will likely be many follow-up PRs related to these features. So, any advice/concern can be addressed in the follow-up PRs.

@berkaysynnada berkaysynnada merged commit 112cde8 into apache:main Apr 17, 2025
27 checks passed
@berkaysynnada
Contributor

Great collaboration @adriangb, thank you. I hope more will come.

@adriangb
Contributor Author

Great collaboration @adriangb, thank you. I hope more will come.

Me too, great stuff!

Labels
core (Core DataFusion crate), datasource (Changes to the datasource crate), optimizer (Optimizer rules), sqllogictest (SQL Logic Tests (.slt))

5 participants