diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 78a3bb0481f..666a23c02c4 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -97,6 +97,14 @@ serde_json = { workspace = true } serde_test = { workspace = true } vortex-array = { path = ".", features = ["_test-harness", "table-display"] } +[[bench]] +name = "aggregate_max" +harness = false + +[[bench]] +name = "aggregate_sum" +harness = false + [[bench]] name = "cast_primitive" harness = false diff --git a/vortex-array/benches/aggregate_max.rs b/vortex-array/benches/aggregate_max.rs new file mode 100644 index 00000000000..8962ade2974 --- /dev/null +++ b/vortex-array/benches/aggregate_max.rs @@ -0,0 +1,93 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::LazyLock; + +use divan::Bencher; +use rand::prelude::*; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::session::ArraySession; +use vortex_session::VortexSession; + +fn main() { + divan::main(); +} + +const N: usize = 100_000; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +#[divan::bench] +fn max_i32(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(1); + let data: Vec = (0..N).map(|_| rng.random::()).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_max::(&mut SESSION.create_execution_ctx()) + }); +} + +#[divan::bench] +fn max_i64(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(2); + let data: Vec = (0..N).map(|_| rng.random::()).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_max::(&mut SESSION.create_execution_ctx()) + }); +} + +#[divan::bench] +fn max_f64(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(3); + let data: Vec = (0..N).map(|_| rng.random::()).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_max::(&mut SESSION.create_execution_ctx()) + }); +} + +// Clustered nulls: long valid runs broken up by null blocks (run-based path's best case). +#[divan::bench] +fn max_i32_nulls_clustered(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(4); + let data: Vec> = (0..N) + .map(|i| { + if (i / 64) % 10 == 0 { + None + } else { + Some(rng.random::()) + } + }) + .collect(); + bencher + .with_inputs(|| PrimitiveArray::from_option_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_max::(&mut SESSION.create_execution_ctx()) + }); +} + +// Scattered nulls: ~50% random nulls producing many short runs (run-based path's worst case). +#[divan::bench] +fn max_i32_nulls_scattered(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(5); + let data: Vec> = (0..N) + .map(|_| rng.random_bool(0.5).then(|| rng.random::())) + .collect(); + bencher + .with_inputs(|| PrimitiveArray::from_option_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_max::(&mut SESSION.create_execution_ctx()) + }); +} diff --git a/vortex-array/benches/aggregate_sum.rs b/vortex-array/benches/aggregate_sum.rs new file mode 100644 index 00000000000..db4bf5b284a --- /dev/null +++ b/vortex-array/benches/aggregate_sum.rs @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::LazyLock; + +use divan::Bencher; +use rand::prelude::*; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::expr::stats::Stat; +use vortex_array::session::ArraySession; +use vortex_session::VortexSession; + +fn main() { + divan::main(); +} + +const N: usize = 100_000; + +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + +#[divan::bench] +fn sum_i32(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(1); + let data: Vec = (0..N).map(|_| rng.random_range(-1000..1000)).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_as::(Stat::Sum, &mut SESSION.create_execution_ctx()) + }); +} + +#[divan::bench] +fn sum_u32(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(2); + let data: Vec = (0..N).map(|_| rng.random_range(0..2000)).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_as::(Stat::Sum, &mut SESSION.create_execution_ctx()) + }); +} + +#[divan::bench] +fn sum_i64(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(3); + let data: Vec = (0..N).map(|_| rng.random_range(-1000..1000)).collect(); + bencher + .with_inputs(|| PrimitiveArray::from_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_as::(Stat::Sum, &mut SESSION.create_execution_ctx()) + }); +} + +// Clustered nulls: long runs of valid values broken up by occasional null blocks. This is the +// case the run-based valid path is expected to accelerate. +#[divan::bench] +fn sum_i32_nulls_clustered(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(4); + let data: Vec> = (0..N) + .map(|i| { + if (i / 64) % 10 == 0 { + None + } else { + Some(rng.random_range(-1000..1000)) + } + }) + .collect(); + bencher + .with_inputs(|| PrimitiveArray::from_option_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_as::(Stat::Sum, &mut SESSION.create_execution_ctx()) + }); +} + +// Scattered nulls: ~50% nulls placed at random, producing many short runs. This is the worst case +// for a run-based valid path, used to guard against regressions versus a per-element loop. +#[divan::bench] +fn sum_i32_nulls_scattered(bencher: Bencher) { + let mut rng = StdRng::seed_from_u64(5); + let data: Vec> = (0..N) + .map(|_| rng.random_bool(0.5).then(|| rng.random_range(-1000..1000))) + .collect(); + bencher + .with_inputs(|| PrimitiveArray::from_option_iter(data.iter().copied()).into_array()) + .bench_refs(|a| { + a.statistics() + .compute_as::(Stat::Sum, &mut SESSION.create_execution_ctx()) + }); +} diff --git a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs index 03b10e32301..b9663baa84d 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/mod.rs @@ -330,6 +330,32 @@ mod tests { Ok(()) } + #[test] + fn test_prim_min_max_multiple_null_runs() -> VortexResult<()> { + // Several disjoint valid runs separated by nulls exercise the per-run fold; the extrema + // (min 1, max 9) fall in different runs. + let p = PrimitiveArray::from_option_iter([ + Some(5i32), + Some(3), + None, + None, + Some(9), + None, + Some(1), + Some(7), + ]) + .into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + assert_eq!( + min_max(&p, &mut ctx)?, + Some(MinMaxResult { + min: 1.into(), + max: 9.into() + }) + ); + Ok(()) + } + #[test] fn test_bool_min_max() -> VortexResult<()> { let mut ctx = LEGACY_SESSION.create_execution_ctx(); diff --git a/vortex-array/src/aggregate_fn/fns/min_max/primitive.rs b/vortex-array/src/aggregate_fn/fns/min_max/primitive.rs index 3470ba3728d..06b9749678d 100644 --- a/vortex-array/src/aggregate_fn/fns/min_max/primitive.rs +++ b/vortex-array/src/aggregate_fn/fns/min_max/primitive.rs @@ -41,19 +41,75 @@ where .validity()? .execute_mask(array.as_ref().len(), ctx)? { - Mask::AllTrue(_) => compute_min_max(array.as_slice::().iter()), + Mask::AllTrue(_) => { + let slice = array.as_slice::(); + // Integers have no NaNs, so a plain min/max reduction is correct and, unlike the + // `itertools::minmax_by` + NaN-filter path, autovectorizes to packed min/max. + if T::PTYPE.is_int() { + integer_min_max_raw(slice).map(min_max_result) + } else { + compute_min_max(slice.iter()) + } + } Mask::AllFalse(_) => None, - Mask::Values(v) => compute_min_max( - array - .as_slice::() - .iter() - .zip(v.bit_buffer().iter()) - .filter_map(|(v, m)| m.then_some(v)), - ), + Mask::Values(v) => { + let slice = array.as_slice::(); + // Each `[start, end)` run is fully valid, so integers can reuse the vectorized + // packed min/max per run and fold the run results; floats chain the runs through + // the NaN-filtering reduction. + if T::PTYPE.is_int() { + v.slices() + .iter() + .filter_map(|&(start, end)| integer_min_max_raw(&slice[start..end])) + .reduce(|(amin, amax), (rmin, rmax)| { + ( + if rmin.is_lt(amin) { rmin } else { amin }, + if rmax.is_gt(amax) { rmax } else { amax }, + ) + }) + .map(min_max_result) + } else { + compute_min_max( + v.slices() + .iter() + .flat_map(|&(start, end)| slice[start..end].iter()), + ) + } + } }, ) } +/// Min/max of an all-valid integer slice as native values. Autovectorizes to packed min/max. +fn integer_min_max_raw(slice: &[T]) -> Option<(T, T)> +where + T: NativePType, +{ + let (&first, rest) = slice.split_first()?; + let mut min = first; + let mut max = first; + for &v in rest { + if v.is_lt(min) { + min = v; + } + if v.is_gt(max) { + max = v; + } + } + Some((min, max)) +} + +fn min_max_result((min, max): (T, T)) -> MinMaxResult +where + T: NativePType, + PValue: From, +{ + MinMaxResult { + min: Scalar::primitive(min, NonNullable), + max: Scalar::primitive(max, NonNullable), + } +} + fn compute_min_max<'a, T>(iter: impl Iterator) -> Option where T: NativePType, diff --git a/vortex-array/src/aggregate_fn/fns/sum/primitive.rs b/vortex-array/src/aggregate_fn/fns/sum/primitive.rs index 19c414b8204..44418fb5628 100644 --- a/vortex-array/src/aggregate_fn/fns/sum/primitive.rs +++ b/vortex-array/src/aggregate_fn/fns/sum/primitive.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use itertools::Itertools; +use num_traits::AsPrimitive; use num_traits::ToPrimitive; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -13,45 +13,37 @@ use super::checked_add_i64; use super::checked_add_u64; use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; +use crate::dtype::NativePType; +use crate::dtype::PType; use crate::match_each_native_ptype; +/// Number of elements summed without an overflow check. Chosen so that a chunk of values narrower +/// than 64 bits cannot overflow the 64-bit accumulator: `2^16 * (2^32 - 1) < 2^64`. +const SUM_CHUNK: usize = 1 << 16; + pub(super) fn accumulate_primitive( inner: &mut SumState, p: &PrimitiveArray, ctx: &mut ExecutionCtx, ) -> VortexResult { let mask = p.as_ref().validity()?.execute_mask(p.as_ref().len(), ctx)?; - match mask.bit_buffer() { + match mask.slices() { AllOr::None => Ok(false), AllOr::All => accumulate_primitive_all(inner, p), - AllOr::Some(validity) => accumulate_primitive_valid(inner, p, validity), + AllOr::Some(slices) => accumulate_primitive_valid(inner, p, slices), } } fn accumulate_primitive_all(inner: &mut SumState, p: &PrimitiveArray) -> VortexResult { match inner { SumState::Unsigned(acc) => match_each_native_ptype!(p.ptype(), - unsigned: |T| { - for &v in p.as_slice::() { - if checked_add_u64(acc, v.to_u64().vortex_expect("unsigned to u64")) { - return Ok(true); - } - } - Ok(false) - }, + unsigned: |T| { Ok(sum_unsigned_all(acc, p.as_slice::())) }, signed: |_T| { vortex_panic!("unsigned sum state with signed input") }, floating: |_T| { vortex_panic!("unsigned sum state with float input") } ), SumState::Signed(acc) => match_each_native_ptype!(p.ptype(), unsigned: |_T| { vortex_panic!("signed sum state with unsigned input") }, - signed: |T| { - for &v in p.as_slice::() { - if checked_add_i64(acc, v.to_i64().vortex_expect("signed to i64")) { - return Ok(true); - } - } - Ok(false) - }, + signed: |T| { Ok(sum_signed_all(acc, p.as_slice::())) }, floating: |_T| { vortex_panic!("signed sum state with float input") } ), SumState::Float(acc) => match_each_native_ptype!(p.ptype(), @@ -70,16 +62,67 @@ fn accumulate_primitive_all(inner: &mut SumState, p: &PrimitiveArray) -> VortexR } } +/// Sum all values into a `u64` accumulator. For types narrower than 64 bits, values are summed in +/// chunks of [`SUM_CHUNK`] with a single checked add per chunk, which lets the inner loop vectorize +/// to packed widening adds. `u64` input keeps a per-element checked add since a chunk of `u64`s +/// could itself overflow. Returns `true` on overflow. +fn sum_unsigned_all(acc: &mut u64, slice: &[T]) -> bool +where + T: NativePType + AsPrimitive, +{ + if T::PTYPE == PType::U64 { + for &v in slice { + if checked_add_u64(acc, v.as_()) { + return true; + } + } + return false; + } + for chunk in slice.chunks(SUM_CHUNK) { + let chunk_sum: u64 = chunk.iter().map(|&v| v.as_()).sum(); + if checked_add_u64(acc, chunk_sum) { + return true; + } + } + false +} + +/// Signed counterpart of [`sum_unsigned_all`]. +fn sum_signed_all(acc: &mut i64, slice: &[T]) -> bool +where + T: NativePType + AsPrimitive, +{ + if T::PTYPE == PType::I64 { + for &v in slice { + if checked_add_i64(acc, v.as_()) { + return true; + } + } + return false; + } + for chunk in slice.chunks(SUM_CHUNK) { + let chunk_sum: i64 = chunk.iter().map(|&v| v.as_()).sum(); + if checked_add_i64(acc, chunk_sum) { + return true; + } + } + false +} + +/// Sum the valid elements, described as contiguous `[start, end)` runs of set validity bits. Each +/// run is a slice of fully-valid values, so it reuses the same vectorized reduction as the +/// all-valid path instead of a per-element validity branch. fn accumulate_primitive_valid( inner: &mut SumState, p: &PrimitiveArray, - validity: &vortex_buffer::BitBuffer, + slices: &[(usize, usize)], ) -> VortexResult { match inner { SumState::Unsigned(acc) => match_each_native_ptype!(p.ptype(), unsigned: |T| { - for (&v, valid) in p.as_slice::().iter().zip_eq(validity.iter()) { - if valid && checked_add_u64(acc, v.to_u64().vortex_expect("unsigned to u64")) { + let values = p.as_slice::(); + for &(start, end) in slices { + if sum_unsigned_all(acc, &values[start..end]) { return Ok(true); } } @@ -91,8 +134,9 @@ fn accumulate_primitive_valid( SumState::Signed(acc) => match_each_native_ptype!(p.ptype(), unsigned: |_T| { vortex_panic!("signed sum state with unsigned input") }, signed: |T| { - for (&v, valid) in p.as_slice::().iter().zip_eq(validity.iter()) { - if valid && checked_add_i64(acc, v.to_i64().vortex_expect("signed to i64")) { + let values = p.as_slice::(); + for &(start, end) in slices { + if sum_signed_all(acc, &values[start..end]) { return Ok(true); } } @@ -104,9 +148,12 @@ fn accumulate_primitive_valid( unsigned: |_T| { vortex_panic!("float sum state with unsigned input") }, signed: |_T| { vortex_panic!("float sum state with signed input") }, floating: |T| { - for (&v, valid) in p.as_slice::().iter().zip_eq(validity.iter()) { - if valid && !v.is_nan() { - *acc += ToPrimitive::to_f64(&v).vortex_expect("float to f64"); + let values = p.as_slice::(); + for &(start, end) in slices { + for &v in &values[start..end] { + if !v.is_nan() { + *acc += ToPrimitive::to_f64(&v).vortex_expect("float to f64"); + } } } Ok(false) @@ -170,6 +217,26 @@ mod tests { Ok(()) } + #[test] + fn sum_multiple_null_runs() -> VortexResult<()> { + // Several disjoint valid runs separated by nulls exercise the per-run fold. + let arr = PrimitiveArray::from_option_iter([ + Some(1i32), + Some(2), + None, + None, + Some(3), + None, + Some(4), + Some(5), + Some(6), + ]) + .into_array(); + let result = sum(&arr, &mut LEGACY_SESSION.create_execution_ctx())?; + assert_eq!(result.as_primitive().typed_value::(), Some(21)); + Ok(()) + } + #[test] fn sum_all_null() -> VortexResult<()> { let arr = PrimitiveArray::from_option_iter([None::, None, None]).into_array();