Improve performance of constant aggregate window expression #16234

Merged
merged 6 commits into from
Jun 4, 2025
Changes from 3 commits
28 changes: 27 additions & 1 deletion datafusion/physical-expr/src/window/aggregate.rs
@@ -34,7 +34,7 @@ use arrow::array::ArrayRef;
use arrow::datatypes::FieldRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{Accumulator, WindowFrame};
use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A window expr that takes the form of an aggregate function.
@@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr {
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: LexOrdering,
window_frame: Arc<WindowFrame>,
is_window_constant: bool,
}

impl PlainAggregateWindowExpr {
@@ -56,11 +57,13 @@ impl PlainAggregateWindowExpr {
order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
) -> Self {
let is_window_constant = Self::is_window_constant(order_by, &window_frame);
Self {
aggregate,
partition_by: partition_by.to_vec(),
order_by: order_by.clone(),
window_frame,
is_window_constant,
}
}

@@ -85,6 +88,25 @@ impl PlainAggregateWindowExpr {
);
}
}

// Returns true if every row in the partition has the same window frame. This allows
// for preventing bound + function calculation for every row due to the values being the
// same.
//
// This occurs when both bounds fall under either condition below:
// 1. Bound is unbounded (`Preceding` or `Following`)
// 2. Bound is `CurrentRow` while using `Range` units with no order by clause
fn is_window_constant(order_by: &LexOrdering, window_frame: &WindowFrame) -> bool {
let is_constant_bound = |bound: &WindowFrameBound| match bound {
WindowFrameBound::CurrentRow => {
window_frame.units == WindowFrameUnits::Range && order_by.is_empty()
}
_ => bound.is_unbounded(),
};

is_constant_bound(&window_frame.start_bound)
&& is_constant_bound(&window_frame.end_bound)
}
}

/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
@@ -213,4 +235,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
accumulator.evaluate()
}
}

fn is_constant(&self) -> bool {
self.is_window_constant
}
}
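
The frame check above can be tried in isolation. The following is a minimal sketch, not the DataFusion code: `Units`, `Bound`, and `Frame` are hypothetical, simplified stand-ins for `WindowFrameUnits`, `WindowFrameBound`, and `WindowFrame`, and the ORDER BY clause is reduced to a `has_order_by` flag. Only the decision logic is meant to mirror the diff.

```rust
// Simplified stand-ins for illustration only; these are NOT the DataFusion types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Units {
    Rows,
    Range,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Bound {
    /// `None` models UNBOUNDED PRECEDING, `Some(n)` models `n PRECEDING`.
    Preceding(Option<u64>),
    CurrentRow,
    /// `None` models UNBOUNDED FOLLOWING, `Some(n)` models `n FOLLOWING`.
    Following(Option<u64>),
}

impl Bound {
    fn is_unbounded(&self) -> bool {
        matches!(self, Bound::Preceding(None) | Bound::Following(None))
    }
}

struct Frame {
    units: Units,
    start: Bound,
    end: Bound,
}

/// Returns true when every row in a partition sees the same frame:
/// each bound is either unbounded, or CURRENT ROW under RANGE units
/// with no ORDER BY (in which case all rows are peers).
fn is_window_constant(has_order_by: bool, frame: &Frame) -> bool {
    let is_constant_bound = |bound: &Bound| match bound {
        Bound::CurrentRow => frame.units == Units::Range && !has_order_by,
        _ => bound.is_unbounded(),
    };
    is_constant_bound(&frame.start) && is_constant_bound(&frame.end)
}

fn main() {
    // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: constant.
    let whole = Frame {
        units: Units::Rows,
        start: Bound::Preceding(None),
        end: Bound::Following(None),
    };
    assert!(is_window_constant(true, &whole));

    // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: constant only without ORDER BY.
    let running = Frame {
        units: Units::Range,
        start: Bound::Preceding(None),
        end: Bound::CurrentRow,
    };
    assert!(is_window_constant(false, &running));
    assert!(!is_window_constant(true, &running));
}
```

With ROWS units, a `CurrentRow` bound moves with each row, so the sketch reports such a frame as non-constant even when there is no ORDER BY.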
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
accumulator.evaluate()
}
}

fn is_constant(&self) -> bool {
false
}
}
11 changes: 10 additions & 1 deletion datafusion/physical-expr/src/window/window_expr.rs
@@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr {
accumulator: &mut Box<dyn Accumulator>,
) -> Result<ScalarValue>;

/// Indicates whether this window function always produces the same result
/// for all rows in the partition.
fn is_constant(&self) -> bool;
Contributor

Suggested change
fn is_constant(&self) -> bool;
fn is_constant_in_partition(&self) -> bool;

nit: It would be great to be more specific.

Contributor Author

Good point, I'll address it a bit later.

Contributor Author

Done.


/// Evaluates the window function against the batch.
fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let mut accumulator = self.get_accumulator()?;
@@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr {
not_end: bool,
) -> Result<ArrayRef> {
let values = self.evaluate_args(record_batch)?;
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);

if self.is_constant() {
accumulator.update_batch(&values)?;
let value = accumulator.evaluate()?;
return value.to_array_of_size(record_batch.num_rows());
}
let order_bys = get_orderby_values(self.order_by_columns(record_batch)?);
let most_recent_row_order_bys = most_recent_row
.map(|batch| self.order_by_columns(batch))
.transpose()?
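
The early return in the hunk above evaluates the aggregate once and repeats the result for every row. A minimal sketch of that idea follows, assuming the slice passed in holds the full partition. `SumAccumulator`, `evaluate_constant`, and `evaluate_per_row` are hypothetical names for illustration and are not part of DataFusion; a plain `Vec<i64>` stands in for the Arrow array produced by `to_array_of_size`.

```rust
/// Hypothetical stand-in for an accumulator: a running integer sum.
#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }

    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// Fast path: the frame is identical for every row, so aggregate once
/// and broadcast the single value to one output slot per row.
fn evaluate_constant(values: &[i64]) -> Vec<i64> {
    let mut acc = SumAccumulator::default();
    acc.update_batch(values);
    vec![acc.evaluate(); values.len()]
}

/// Baseline for comparison: recompute the same full-partition aggregate
/// for each row, which does O(n) work per row.
fn evaluate_per_row(values: &[i64]) -> Vec<i64> {
    (0..values.len())
        .map(|_| {
            let mut acc = SumAccumulator::default();
            acc.update_batch(values);
            acc.evaluate()
        })
        .collect()
}

fn main() {
    let partition = [1, 2, 3, 4];
    let fast = evaluate_constant(&partition);
    assert_eq!(fast, vec![10, 10, 10, 10]);
    // Both strategies agree; the fast path just avoids the repeated work.
    assert_eq!(fast, evaluate_per_row(&partition));
    println!("{fast:?}");
}
```

In this sketch the fast path also skips any per-row bound calculation, which is why the diff moves the `order_bys` computation below the early return.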