Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions datafusion/expr/src/partition_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,37 @@ pub trait PartitionEvaluator: Debug + Send + std::any::Any {
not_impl_err!("evaluate is not implemented by default")
}

/// Returns `true` if this evaluator can produce its entire output column in
/// a single pass from the per-row window-frame ranges via
/// [`PartitionEvaluator::evaluate_all_with_frame_ranges`].
///
/// This is for window functions that *do* depend on the window frame (so
/// the frameless [`PartitionEvaluator::evaluate_all`] cannot be used), but
/// whose result is a simple positional gather from the input — e.g.
/// `first_value`/`last_value`/`nth_value`. Implementing the pair of methods
/// lets the framework build the output with one vectorized kernel instead
/// of boxing a [`ScalarValue`] per row. Defaults to `false`, in which case
/// the framework falls back to calling [`PartitionEvaluator::evaluate`] once
/// per output row.
fn supports_evaluate_all_with_frame_ranges(&self) -> bool {
false
}

/// Vectorized counterpart to [`PartitionEvaluator::evaluate`]: produces the
/// whole output column at once given the already-computed per-row frame
/// `ranges` (`ranges[i]` is the window frame of output row `i`).
///
/// Only called when
/// [`PartitionEvaluator::supports_evaluate_all_with_frame_ranges`] returns
/// `true`; the default is therefore unreachable for in-tree functions.
fn evaluate_all_with_frame_ranges(
&mut self,
_values: &[ArrayRef],
_ranges: &[Range<usize>],
) -> Result<ArrayRef> {
not_impl_err!("evaluate_all_with_frame_ranges is not implemented by default")
}

/// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
Expand Down
97 changes: 97 additions & 0 deletions datafusion/functions-window/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

use crate::utils::{get_scalar_value_from_args, get_signed_integer};

use arrow::array::UInt32Builder;
use arrow::buffer::NullBuffer;
use arrow::compute::take;
use arrow::datatypes::FieldRef;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -480,6 +482,44 @@ impl PartitionEvaluator for NthValueEvaluator {
fn uses_window_frame(&self) -> bool {
true
}

fn supports_evaluate_all_with_frame_ranges(&self) -> bool {
// The IGNORE NULLS path walks the null bitmap per row (see
// `valid_index_with_nulls`) and stays on the scalar `evaluate` path.
// The default (respect NULLs) case is a pure positional gather that we
// vectorize with a single `take`.
!self.ignore_nulls
}

fn evaluate_all_with_frame_ranges(
&mut self,
values: &[ArrayRef],
ranges: &[Range<usize>],
) -> Result<ArrayRef> {
// A finalized (constant) result fills the whole column in one shot.
if let Some(result) = &self.state.finalized_result {
return result.to_array_of_size(ranges.len());
}
// FIRST_VALUE, LAST_VALUE and NTH_VALUE take a single column.
let arr = &values[0];
let mut indices = UInt32Builder::with_capacity(ranges.len());
for range in ranges {
// Empty frame -> NULL, matching the `n_range == 0` guard in `evaluate`.
let index = if range.end == range.start {
None
} else {
self.valid_index(arr, range)
};
match index {
Some(index) => indices.append_value(index as u32),
None => indices.append_null(),
}
}
let indices = indices.finish();
// `take` yields NULL for the NULL index slots, matching the `None`
// branch of `evaluate`.
Ok(take(arr.as_ref(), &indices, None)?)
}
}

impl NthValueEvaluator {
Expand Down Expand Up @@ -670,6 +710,63 @@ mod tests {
Ok(())
}

#[test]
fn evaluate_all_with_frame_ranges_matches_per_row() -> Result<()> {
// Array with nulls so the gather-index null handling is exercised.
let arr: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
None,
Some(3),
Some(4),
None,
Some(6),
]));
let values = vec![arr];
// Diverse frames: empty, single, growing, sliding, full.
let ranges: Vec<Range<usize>> =
vec![0..0, 0..1, 0..3, 2..2, 1..5, 0..6, 3..6, 4..6, 5..6];

let field: FieldRef = Field::new("f", DataType::Int32, true).into();
let col = Arc::new(Column::new("c", 0)) as Arc<dyn PhysicalExpr>;
let n2 =
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
let nm1 =
Arc::new(Literal::new(ScalarValue::Int32(Some(-1)))) as Arc<dyn PhysicalExpr>;

let cases: Vec<(NthValue, Vec<Arc<dyn PhysicalExpr>>)> = vec![
(NthValue::first(), vec![Arc::clone(&col)]),
(NthValue::last(), vec![Arc::clone(&col)]),
(NthValue::nth(), vec![Arc::clone(&col), Arc::clone(&n2)]),
(NthValue::nth(), vec![Arc::clone(&col), Arc::clone(&nm1)]),
];

for (expr, exprs) in cases {
let make_args = || {
PartitionEvaluatorArgs::new(
&exprs,
std::slice::from_ref(&field),
false,
false,
)
};
// Per-row reference result.
let mut per_row_eval = expr.partition_evaluator(make_args())?;
let per_row = ranges
.iter()
.map(|range| per_row_eval.evaluate(&values, range))
.collect::<Result<Vec<_>>>()?;
let per_row = ScalarValue::iter_to_array(per_row)?;

// Vectorized result must match element-for-element (including NULLs).
let mut vectorized_eval = expr.partition_evaluator(make_args())?;
let vectorized =
vectorized_eval.evaluate_all_with_frame_ranges(&values, &ranges)?;

assert_eq!(as_int32_array(&per_row)?, as_int32_array(&vectorized)?);
}
Ok(())
}

#[test]
fn nth_value_i64_min_returns_error() {
let expr = Arc::new(Column::new("c3", 0)) as Arc<dyn PhysicalExpr>;
Expand Down
45 changes: 32 additions & 13 deletions datafusion/physical-expr/src/window/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl WindowExpr for StandardWindowExpr {
let num_rows = batch.num_rows();
if evaluator.uses_window_frame() {
let sort_options = self.order_by.iter().map(|o| o.options).collect();
let mut row_wise_results = vec![];

let mut values = self.evaluate_args(batch)?;
let order_bys = get_orderby_values(self.order_by_columns(batch)?);
Expand All @@ -128,19 +127,39 @@ impl WindowExpr for StandardWindowExpr {
let mut window_frame_ctx =
WindowFrameContext::new(Arc::clone(&self.window_frame), sort_options);
let mut last_range = Range { start: 0, end: 0 };
// We iterate on each row to calculate window frame range and and window function result
for idx in 0..num_rows {
let range = window_frame_ctx.calculate_range(
order_bys_ref,
&last_range,
num_rows,
idx,
)?;
let value = evaluator.evaluate(&values, &range)?;
row_wise_results.push(value);
last_range = range;

if evaluator.supports_evaluate_all_with_frame_ranges() {
// Compute every frame range up front, then let the evaluator
// build the whole output column in one vectorized pass instead
// of boxing a `ScalarValue` per row.
let mut ranges = Vec::with_capacity(num_rows);
for idx in 0..num_rows {
let range = window_frame_ctx.calculate_range(
order_bys_ref,
&last_range,
num_rows,
idx,
)?;
last_range = range.clone();
ranges.push(range);
}
evaluator.evaluate_all_with_frame_ranges(&values, &ranges)
} else {
let mut row_wise_results = vec![];
// We iterate on each row to calculate window frame range and and window function result
for idx in 0..num_rows {
let range = window_frame_ctx.calculate_range(
order_bys_ref,
&last_range,
num_rows,
idx,
)?;
let value = evaluator.evaluate(&values, &range)?;
row_wise_results.push(value);
last_range = range;
}
ScalarValue::iter_to_array(row_wise_results)
}
ScalarValue::iter_to_array(row_wise_results)
} else if evaluator.include_rank() {
let columns = self.order_by_columns(batch)?;
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
Expand Down
Loading