Skip to content

Improve performance of constant aggregate window expression #16234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::array::ArrayRef;
use arrow::datatypes::FieldRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{Accumulator, WindowFrame};
use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A window expr that takes the form of an aggregate function.
Expand All @@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr {
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: LexOrdering,
window_frame: Arc<WindowFrame>,
is_constant_in_partition: bool,
}

impl PlainAggregateWindowExpr {
Expand All @@ -56,11 +57,14 @@ impl PlainAggregateWindowExpr {
order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
) -> Self {
let is_constant_in_partition =
Self::is_window_constant_in_partition(order_by, &window_frame);
Self {
aggregate,
partition_by: partition_by.to_vec(),
order_by: order_by.clone(),
window_frame,
is_constant_in_partition,
}
}

Expand All @@ -85,6 +89,30 @@ impl PlainAggregateWindowExpr {
);
}
}

// Returns true if every row in the partition has the same window frame. This allows
// for preventing bound + function calculation for every row due to the values being the
// same.
//
// This occurs when both bounds fall under either condition below:
// 1. Bound is unbounded (`Preceding` or `Following`)
// 2. Bound is `CurrentRow` while using `Range` units with no order by clause
// This results in an invalid range specification. Following PostgreSQL’s convention,
// we interpret this as the entire partition being used for the current window frame.
fn is_window_constant_in_partition(
order_by: &LexOrdering,
window_frame: &WindowFrame,
) -> bool {
let is_constant_bound = |bound: &WindowFrameBound| match bound {
WindowFrameBound::CurrentRow => {
window_frame.units == WindowFrameUnits::Range && order_by.is_empty()
}
_ => bound.is_unbounded(),
};

is_constant_bound(&window_frame.start_bound)
&& is_constant_bound(&window_frame.end_bound)
}
}

/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
Expand Down Expand Up @@ -213,4 +241,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
accumulator.evaluate()
}
}

fn is_constant_in_partition(&self) -> bool {
self.is_constant_in_partition
}
}
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
accumulator.evaluate()
}
}

fn is_constant_in_partition(&self) -> bool {
false
}
}
11 changes: 10 additions & 1 deletion datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr {
accumulator: &mut Box<dyn Accumulator>,
) -> Result<ScalarValue>;

/// Indicates whether this window function always produces the same result
/// for all rows in the partition.
fn is_constant_in_partition(&self) -> bool;

/// Evaluates the window function against the batch.
fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let mut accumulator = self.get_accumulator()?;
Expand Down Expand Up @@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr {
not_end: bool,
) -> Result<ArrayRef> {
let values = self.evaluate_args(record_batch)?;
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);

if self.is_constant_in_partition() {
accumulator.update_batch(&values)?;
let value = accumulator.evaluate()?;
return value.to_array_of_size(record_batch.num_rows());
}
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
let most_recent_row_order_bys = most_recent_row
.map(|batch| self.order_by_columns(batch))
.transpose()?
Expand Down