Skip to content

Commit c76c1f0

Browse files
authored
fix: [branch-48] Revert "Improve performance of constant aggregate window expression" (#16307)
* Revert "Improve performance of constant aggregate window expression (#16234)" This reverts commit 0c30374. * update changelog * update changelog
1 parent 85f6621 commit c76c1f0

File tree

4 files changed

+3
-50
lines changed

4 files changed

+3
-50
lines changed

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use arrow::array::ArrayRef;
3434
use arrow::datatypes::FieldRef;
3535
use arrow::record_batch::RecordBatch;
3636
use datafusion_common::{DataFusionError, Result, ScalarValue};
37-
use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits};
37+
use datafusion_expr::{Accumulator, WindowFrame};
3838
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3939

4040
/// A window expr that takes the form of an aggregate function.
@@ -46,7 +46,6 @@ pub struct PlainAggregateWindowExpr {
4646
partition_by: Vec<Arc<dyn PhysicalExpr>>,
4747
order_by: LexOrdering,
4848
window_frame: Arc<WindowFrame>,
49-
is_constant_in_partition: bool,
5049
}
5150

5251
impl PlainAggregateWindowExpr {
@@ -57,14 +56,11 @@ impl PlainAggregateWindowExpr {
5756
order_by: &LexOrdering,
5857
window_frame: Arc<WindowFrame>,
5958
) -> Self {
60-
let is_constant_in_partition =
61-
Self::is_window_constant_in_partition(order_by, &window_frame);
6259
Self {
6360
aggregate,
6461
partition_by: partition_by.to_vec(),
6562
order_by: order_by.clone(),
6663
window_frame,
67-
is_constant_in_partition,
6864
}
6965
}
7066

@@ -89,30 +85,6 @@ impl PlainAggregateWindowExpr {
8985
);
9086
}
9187
}
92-
93-
// Returns true if every row in the partition has the same window frame. This allows
94-
// for preventing bound + function calculation for every row due to the values being the
95-
// same.
96-
//
97-
// This occurs when both bounds fall under either condition below:
98-
// 1. Bound is unbounded (`Preceding` or `Following`)
99-
// 2. Bound is `CurrentRow` while using `Range` units with no order by clause
100-
// This results in an invalid range specification. Following PostgreSQL’s convention,
101-
// we interpret this as the entire partition being used for the current window frame.
102-
fn is_window_constant_in_partition(
103-
order_by: &LexOrdering,
104-
window_frame: &WindowFrame,
105-
) -> bool {
106-
let is_constant_bound = |bound: &WindowFrameBound| match bound {
107-
WindowFrameBound::CurrentRow => {
108-
window_frame.units == WindowFrameUnits::Range && order_by.is_empty()
109-
}
110-
_ => bound.is_unbounded(),
111-
};
112-
113-
is_constant_bound(&window_frame.start_bound)
114-
&& is_constant_bound(&window_frame.end_bound)
115-
}
11688
}
11789

11890
/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
@@ -241,8 +213,4 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
241213
accumulator.evaluate()
242214
}
243215
}
244-
245-
fn is_constant_in_partition(&self) -> bool {
246-
self.is_constant_in_partition
247-
}
248216
}

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,4 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
210210
accumulator.evaluate()
211211
}
212212
}
213-
214-
fn is_constant_in_partition(&self) -> bool {
215-
false
216-
}
217213
}

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,6 @@ pub trait AggregateWindowExpr: WindowExpr {
186186
accumulator: &mut Box<dyn Accumulator>,
187187
) -> Result<ScalarValue>;
188188

189-
/// Indicates whether this window function always produces the same result
190-
/// for all rows in the partition.
191-
fn is_constant_in_partition(&self) -> bool;
192-
193189
/// Evaluates the window function against the batch.
194190
fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
195191
let mut accumulator = self.get_accumulator()?;
@@ -276,13 +272,8 @@ pub trait AggregateWindowExpr: WindowExpr {
276272
not_end: bool,
277273
) -> Result<ArrayRef> {
278274
let values = self.evaluate_args(record_batch)?;
279-
280-
if self.is_constant_in_partition() {
281-
accumulator.update_batch(&values)?;
282-
let value = accumulator.evaluate()?;
283-
return value.to_array_of_size(record_batch.num_rows());
284-
}
285275
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
276+
286277
let most_recent_row_order_bys = most_recent_row
287278
.map(|batch| self.order_by_columns(batch))
288279
.transpose()?

dev/changelog/48.0.0.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ under the License.
1919

2020
# Apache DataFusion 48.0.0 Changelog
2121

22-
This release consists of 267 commits from 89 contributors. See credits at the end of this changelog for more information.
22+
This release consists of 266 commits from 88 contributors. See credits at the end of this changelog for more information.
2323

2424
**Breaking changes:**
2525

@@ -297,7 +297,6 @@ This release consists of 267 commits from 89 contributors. See credits at the en
297297
- Simplify FileSource / SchemaAdapterFactory API [#16214](https://github.com/apache/datafusion/pull/16214) (alamb)
298298
- Add dicts to aggregation fuzz testing [#16232](https://github.com/apache/datafusion/pull/16232) (blaginin)
299299
- chore(deps): bump sysinfo from 0.35.1 to 0.35.2 [#16247](https://github.com/apache/datafusion/pull/16247) (dependabot[bot])
300-
- Improve performance of constant aggregate window expression [#16234](https://github.com/apache/datafusion/pull/16234) (suibianwanwank)
301300
- Support compound identifier when parsing tuples [#16225](https://github.com/apache/datafusion/pull/16225) (hozan23)
302301
- Schema adapter helper [#16108](https://github.com/apache/datafusion/pull/16108) (kosiew)
303302
- Update tpch, clickbench, sort_tpch to mark failed queries [#16182](https://github.com/apache/datafusion/pull/16182) (ding-young)
@@ -397,7 +396,6 @@ Thank you to everyone who contributed to this release. Here is a breakdown of co
397396
1 irenjj
398397
1 jsai28
399398
1 m09526
400-
1 suibianwanwan
401399
1 the0ninjas
402400
1 wiedld
403401
```

0 commit comments

Comments
 (0)