diff --git a/datafusion/expr/src/partition_evaluator.rs b/datafusion/expr/src/partition_evaluator.rs
index 5a4e20e5ac9ac..19e6e9b99d786 100644
--- a/datafusion/expr/src/partition_evaluator.rs
+++ b/datafusion/expr/src/partition_evaluator.rs
@@ -200,6 +200,38 @@ pub trait PartitionEvaluator: Debug + Send + std::any::Any {
         not_impl_err!("evaluate is not implemented by default")
     }
 
+    /// Returns `true` if this evaluator can produce its entire output column in
+    /// a single pass from the per-row window-frame ranges via
+    /// [`PartitionEvaluator::evaluate_all_with_frame_ranges`].
+    ///
+    /// This is for window functions that *do* depend on the window frame (so
+    /// the frameless [`PartitionEvaluator::evaluate_all`] cannot be used), but
+    /// whose result is a simple positional gather from the input — e.g.
+    /// `first_value`/`last_value`/`nth_value`. Implementing the pair of methods
+    /// lets the framework build the output with one vectorized kernel instead
+    /// of boxing a [`ScalarValue`] per row. Defaults to `false`, in which case
+    /// the framework falls back to calling [`PartitionEvaluator::evaluate`] once
+    /// per output row.
+    fn supports_evaluate_all_with_frame_ranges(&self) -> bool {
+        false
+    }
+
+    /// Vectorized counterpart to [`PartitionEvaluator::evaluate`]: produces the
+    /// whole output column at once given the already-computed per-row frame
+    /// `ranges` (`ranges[i]` is the window frame of output row `i`).
+    ///
+    /// Only called when
+    /// [`PartitionEvaluator::supports_evaluate_all_with_frame_ranges`] returns
+    /// `true`. An evaluator that opts in must override this method as well:
+    /// the default implementation returns a "not implemented" error.
+    fn evaluate_all_with_frame_ranges(
+        &mut self,
+        _values: &[ArrayRef],
+        _ranges: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        not_impl_err!("evaluate_all_with_frame_ranges is not implemented by default")
+    }
+
     /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
     /// functions that only need the rank of a row within its window
     /// frame.
diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs
index df723772166a6..1f211c9639591 100644
--- a/datafusion/functions-window/src/nth_value.rs
+++ b/datafusion/functions-window/src/nth_value.rs
@@ -19,7 +19,9 @@
 
 use crate::utils::{get_scalar_value_from_args, get_signed_integer};
 
+use arrow::array::UInt64Builder;
 use arrow::buffer::NullBuffer;
+use arrow::compute::take;
 use arrow::datatypes::FieldRef;
 use datafusion_common::arrow::array::ArrayRef;
 use datafusion_common::arrow::datatypes::{DataType, Field};
@@ -480,6 +482,47 @@ impl PartitionEvaluator for NthValueEvaluator {
     fn uses_window_frame(&self) -> bool {
         true
     }
+
+    fn supports_evaluate_all_with_frame_ranges(&self) -> bool {
+        // The IGNORE NULLS path walks the null bitmap per row (see
+        // `valid_index_with_nulls`) and stays on the scalar `evaluate` path.
+        // The default (respect NULLs) case is a pure positional gather that we
+        // vectorize with a single `take`.
+        !self.ignore_nulls
+    }
+
+    fn evaluate_all_with_frame_ranges(
+        &mut self,
+        values: &[ArrayRef],
+        ranges: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        // A finalized (constant) result fills the whole column in one shot.
+        if let Some(result) = &self.state.finalized_result {
+            return result.to_array_of_size(ranges.len());
+        }
+        // FIRST_VALUE, LAST_VALUE and NTH_VALUE take a single column.
+        let arr = &values[0];
+        // Gather indices are `u64`: `usize -> u64` is lossless, whereas an
+        // `as u32` cast would silently wrap (and gather the wrong row) for
+        // row indices above `u32::MAX`.
+        let mut indices = UInt64Builder::with_capacity(ranges.len());
+        for range in ranges {
+            // Empty frame -> NULL, matching the `n_range == 0` guard in `evaluate`.
+            let index = if range.end == range.start {
+                None
+            } else {
+                self.valid_index(arr, range)
+            };
+            match index {
+                Some(index) => indices.append_value(index as u64),
+                None => indices.append_null(),
+            }
+        }
+        let indices = indices.finish();
+        // `take` yields NULL for the NULL index slots, matching the `None`
+        // branch of `evaluate`.
+        Ok(take(arr.as_ref(), &indices, None)?)
+    }
 }
 
 impl NthValueEvaluator {
@@ -670,6 +713,63 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn evaluate_all_with_frame_ranges_matches_per_row() -> Result<()> {
+        // Array with nulls so the gather-index null handling is exercised.
+        let arr: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1),
+            None,
+            Some(3),
+            Some(4),
+            None,
+            Some(6),
+        ]));
+        let values = vec![arr];
+        // Diverse frames: empty, single, growing, sliding, full.
+        let ranges: Vec<Range<usize>> =
+            vec![0..0, 0..1, 0..3, 2..2, 1..5, 0..6, 3..6, 4..6, 5..6];
+
+        let field: FieldRef = Field::new("f", DataType::Int32, true).into();
+        let col = Arc::new(Column::new("c", 0)) as Arc<dyn PhysicalExpr>;
+        let n2 =
+            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
+        let nm1 =
+            Arc::new(Literal::new(ScalarValue::Int32(Some(-1)))) as Arc<dyn PhysicalExpr>;
+
+        let cases: Vec<(NthValue, Vec<Arc<dyn PhysicalExpr>>)> = vec![
+            (NthValue::first(), vec![Arc::clone(&col)]),
+            (NthValue::last(), vec![Arc::clone(&col)]),
+            (NthValue::nth(), vec![Arc::clone(&col), Arc::clone(&n2)]),
+            (NthValue::nth(), vec![Arc::clone(&col), Arc::clone(&nm1)]),
+        ];
+
+        for (expr, exprs) in cases {
+            let make_args = || {
+                PartitionEvaluatorArgs::new(
+                    &exprs,
+                    std::slice::from_ref(&field),
+                    false,
+                    false,
+                )
+            };
+            // Per-row reference result.
+            let mut per_row_eval = expr.partition_evaluator(make_args())?;
+            let per_row = ranges
+                .iter()
+                .map(|range| per_row_eval.evaluate(&values, range))
+                .collect::<Result<Vec<_>>>()?;
+            let per_row = ScalarValue::iter_to_array(per_row)?;
+
+            // Vectorized result must match element-for-element (including NULLs).
+            let mut vectorized_eval = expr.partition_evaluator(make_args())?;
+            let vectorized =
+                vectorized_eval.evaluate_all_with_frame_ranges(&values, &ranges)?;
+
+            assert_eq!(as_int32_array(&per_row)?, as_int32_array(&vectorized)?);
+        }
+        Ok(())
+    }
+
     #[test]
     fn nth_value_i64_min_returns_error() {
         let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
diff --git a/datafusion/physical-expr/src/window/standard.rs b/datafusion/physical-expr/src/window/standard.rs
index 46f3cabbadd48..96bb45895a80e 100644
--- a/datafusion/physical-expr/src/window/standard.rs
+++ b/datafusion/physical-expr/src/window/standard.rs
@@ -117,7 +117,6 @@ impl WindowExpr for StandardWindowExpr {
         let num_rows = batch.num_rows();
         if evaluator.uses_window_frame() {
             let sort_options = self.order_by.iter().map(|o| o.options).collect();
-            let mut row_wise_results = vec![];
 
             let mut values = self.evaluate_args(batch)?;
             let order_bys = get_orderby_values(self.order_by_columns(batch)?);
@@ -128,19 +127,39 @@ impl WindowExpr for StandardWindowExpr {
             let mut window_frame_ctx =
                 WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
             let mut last_range = Range { start: 0, end: 0 };
-            // We iterate on each row to calculate window frame range and and window function result
-            for idx in 0..num_rows {
-                let range = window_frame_ctx.calculate_range(
-                    order_bys_ref,
-                    &last_range,
-                    num_rows,
-                    idx,
-                )?;
-                let value = evaluator.evaluate(&values, &range)?;
-                row_wise_results.push(value);
-                last_range = range;
+
+            if evaluator.supports_evaluate_all_with_frame_ranges() {
+                // Compute every frame range up front, then let the evaluator
+                // build the whole output column in one vectorized pass instead
+                // of boxing a `ScalarValue` per row.
+                let mut ranges = Vec::with_capacity(num_rows);
+                for idx in 0..num_rows {
+                    let range = window_frame_ctx.calculate_range(
+                        order_bys_ref,
+                        &last_range,
+                        num_rows,
+                        idx,
+                    )?;
+                    last_range = range.clone();
+                    ranges.push(range);
+                }
+                evaluator.evaluate_all_with_frame_ranges(&values, &ranges)
+            } else {
+                let mut row_wise_results = vec![];
+                // Per row: compute the window frame range, then the function value.
+                for idx in 0..num_rows {
+                    let range = window_frame_ctx.calculate_range(
+                        order_bys_ref,
+                        &last_range,
+                        num_rows,
+                        idx,
+                    )?;
+                    let value = evaluator.evaluate(&values, &range)?;
+                    row_wise_results.push(value);
+                    last_range = range;
+                }
+                ScalarValue::iter_to_array(row_wise_results)
             }
-            ScalarValue::iter_to_array(row_wise_results)
         } else if evaluator.include_rank() {
             let columns = self.order_by_columns(batch)?;
             let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;