Skip to content

Commit b0a15be

Browse files
suibianwanwankjonathanc-n2010YOUY01
authored andcommitted
Improve performance of constant aggregate window expression (apache#16234)
* Improve performance of constant aggregate window expression * Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Jonathan Chen <[email protected]> * fmt * Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Yongting You <[email protected]> * Rename * fmt --------- Co-authored-by: Jonathan Chen <[email protected]> Co-authored-by: Yongting You <[email protected]>
1 parent d0ccde7 commit b0a15be

File tree

3 files changed

+47
-2
lines changed

3 files changed

+47
-2
lines changed

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use arrow::array::ArrayRef;
3434
use arrow::datatypes::FieldRef;
3535
use arrow::record_batch::RecordBatch;
3636
use datafusion_common::{DataFusionError, Result, ScalarValue};
37-
use datafusion_expr::{Accumulator, WindowFrame};
37+
use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits};
3838
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3939

4040
/// A window expr that takes the form of an aggregate function.
@@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr {
4646
partition_by: Vec<Arc<dyn PhysicalExpr>>,
4747
order_by: LexOrdering,
4848
window_frame: Arc<WindowFrame>,
49+
is_constant_in_partition: bool,
4950
}
5051

5152
impl PlainAggregateWindowExpr {
@@ -56,11 +57,14 @@ impl PlainAggregateWindowExpr {
5657
order_by: &LexOrdering,
5758
window_frame: Arc<WindowFrame>,
5859
) -> Self {
60+
let is_constant_in_partition =
61+
Self::is_window_constant_in_partition(order_by, &window_frame);
5962
Self {
6063
aggregate,
6164
partition_by: partition_by.to_vec(),
6265
order_by: order_by.clone(),
6366
window_frame,
67+
is_constant_in_partition,
6468
}
6569
}
6670

@@ -85,6 +89,30 @@ impl PlainAggregateWindowExpr {
8589
);
8690
}
8791
}
92+
93+
// Returns true if every row in the partition has the same window frame. This allows
94+
// for preventing bound + function calculation for every row due to the values being the
95+
// same.
96+
//
97+
// This occurs when both bounds fall under either condition below:
98+
// 1. Bound is unbounded (`Preceding` or `Following`)
99+
// 2. Bound is `CurrentRow` while using `Range` units with no order by clause
100+
// This results in an invalid range specification. Following PostgreSQL’s convention,
101+
// we interpret this as the entire partition being used for the current window frame.
102+
fn is_window_constant_in_partition(
103+
order_by: &LexOrdering,
104+
window_frame: &WindowFrame,
105+
) -> bool {
106+
let is_constant_bound = |bound: &WindowFrameBound| match bound {
107+
WindowFrameBound::CurrentRow => {
108+
window_frame.units == WindowFrameUnits::Range && order_by.is_empty()
109+
}
110+
_ => bound.is_unbounded(),
111+
};
112+
113+
is_constant_bound(&window_frame.start_bound)
114+
&& is_constant_bound(&window_frame.end_bound)
115+
}
88116
}
89117

90118
/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
@@ -213,4 +241,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
213241
accumulator.evaluate()
214242
}
215243
}
244+
245+
fn is_constant_in_partition(&self) -> bool {
246+
self.is_constant_in_partition
247+
}
216248
}

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
210210
accumulator.evaluate()
211211
}
212212
}
213+
214+
fn is_constant_in_partition(&self) -> bool {
215+
false
216+
}
213217
}

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr {
186186
accumulator: &mut Box<dyn Accumulator>,
187187
) -> Result<ScalarValue>;
188188

189+
/// Indicates whether this window function always produces the same result
190+
/// for all rows in the partition.
191+
fn is_constant_in_partition(&self) -> bool;
192+
189193
/// Evaluates the window function against the batch.
190194
fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
191195
let mut accumulator = self.get_accumulator()?;
@@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr {
272276
not_end: bool,
273277
) -> Result<ArrayRef> {
274278
let values = self.evaluate_args(record_batch)?;
275-
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
276279

280+
if self.is_constant_in_partition() {
281+
accumulator.update_batch(&values)?;
282+
let value = accumulator.evaluate()?;
283+
return value.to_array_of_size(record_batch.num_rows());
284+
}
285+
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
277286
let most_recent_row_order_bys = most_recent_row
278287
.map(|batch| self.order_by_columns(batch))
279288
.transpose()?

0 commit comments

Comments
 (0)