Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ required-features = ["test_utils"]
harness = false
name = "aggregate_vectorized"
required-features = ["test_utils"]

# Benchmark comparing GROUP BY interning paths for dictionary-encoded columns
# (GroupValuesColumn vs GroupValuesRows); see benches/dict_group_values.rs
[[bench]]
harness = false
name = "dict_group_values"
required-features = ["test_utils"]
146 changes: 146 additions & 0 deletions datafusion/physical-plan/benches/dict_group_values.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for GROUP BY on dictionary-encoded columns.
//!
//! Compares three paths:
//! - `column_utf8`: GroupValuesColumn with plain Utf8 (fast-path baseline)
//! - `column_dict`: GroupValuesColumn with Dictionary(Int32, Utf8) (new path)
//! - `rows_dict`: GroupValuesRows with Dictionary(Int32, Utf8) (old fallback)

use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use criterion::{
    BenchmarkId, Criterion, criterion_group, criterion_main,
};
use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn;
use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows;
use datafusion_physical_plan::aggregates::group_values::GroupValues;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

/// Distinct group-key cardinalities to benchmark (low / medium / high).
const CARDINALITIES: [usize; 3] = [50, 1_000, 10_000];
/// Rows per generated batch.
const BATCH_SIZES: [usize; 2] = [8_192, 65_536];
/// Number of pre-generated batches interned per benchmark iteration.
const NUM_BATCHES: usize = 10;

/// Generate `num_rows` random string values chosen from `cardinality` distinct strings,
/// returned as both plain Utf8 and Dictionary(Int32, Utf8) arrays.
/// Generate `num_rows` random string values drawn from a pool of
/// `cardinality` distinct strings.
///
/// The same data is returned twice — once as a plain `Utf8` array and once
/// cast to `Dictionary(Int32, Utf8)` — so callers can benchmark both
/// representations on identical inputs. `seed` makes the output deterministic.
fn generate_string_batches(
    num_rows: usize,
    cardinality: usize,
    seed: u64,
) -> (ArrayRef, ArrayRef) {
    let mut rng = StdRng::seed_from_u64(seed);

    // Pool of distinct candidate group values
    let distinct: Vec<String> = (0..cardinality)
        .map(|i| format!("group_value_{i:06}"))
        .collect();

    // Sample rows uniformly from the pool
    let mut sampled: Vec<&str> = Vec::with_capacity(num_rows);
    for _ in 0..num_rows {
        sampled.push(distinct[rng.random_range(0..cardinality)].as_str());
    }

    let utf8: ArrayRef = Arc::new(StringArray::from(sampled));
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let dict = cast(&utf8, &dict_type).expect("cast to dictionary");

    (utf8, dict)
}

fn bench_dict_group_values(c: &mut Criterion) {
let mut group = c.benchmark_group("dict_group_values");

for &cardinality in &CARDINALITIES {
for &batch_size in &BATCH_SIZES {
// Pre-generate batches (both utf8 and dict variants)
let batches: Vec<(ArrayRef, ArrayRef)> = (0..NUM_BATCHES as u64)
.map(|seed| generate_string_batches(batch_size, cardinality, seed))
.collect();

let param = format!("card_{cardinality}/batch_{batch_size}");

// ---- column_utf8: GroupValuesColumn with plain Utf8 (baseline fast path) ----
{
let schema = Arc::new(Schema::new(vec![
Field::new("key", DataType::Utf8, false),
]));
let id = BenchmarkId::new("column_utf8", &param);
group.bench_function(id, |b| {
b.iter(|| {
let mut gv = GroupValuesColumn::<false>::try_new(Arc::clone(&schema)).unwrap();
let mut groups = Vec::new();
for (utf8, _dict) in &batches {
gv.intern(&[Arc::clone(utf8)], &mut groups).unwrap();
}
});
});
}

// ---- column_dict: GroupValuesColumn with Dictionary(Int32, Utf8) (new path) ----
{
let schema = Arc::new(Schema::new(vec![
Field::new(
"key",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
]));
let id = BenchmarkId::new("column_dict", &param);
group.bench_function(id, |b| {
b.iter(|| {
let mut gv = GroupValuesColumn::<false>::try_new(Arc::clone(&schema)).unwrap();
let mut groups = Vec::new();
for (_utf8, dict) in &batches {
gv.intern(&[Arc::clone(dict)], &mut groups).unwrap();
}
});
});
}

// ---- rows_dict: GroupValuesRows with Dictionary(Int32, Utf8) (old fallback) ----
{
let schema = Arc::new(Schema::new(vec![
Field::new(
"key",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
]));
let id = BenchmarkId::new("rows_dict", &param);
group.bench_function(id, |b| {
b.iter(|| {
let mut gv = GroupValuesRows::try_new(Arc::clone(&schema)).unwrap();
let mut groups = Vec::new();
for (_utf8, dict) in &batches {
gv.intern(&[Arc::clone(dict)], &mut groups).unwrap();
}
});
});
}
}
}

group.finish();
}

// Register the benchmark group and generate the criterion `main` entry point
criterion_group!(benches, bench_dict_group_values);
criterion_main!(benches);
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion_expr::EmitTo;

pub mod multi_group_by;

mod row;
pub mod row;
mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DictionaryGroupValueBuilder`] for dictionary-encoded GROUP BY columns.

use std::marker::PhantomData;

use arrow::array::{Array, ArrayRef, DictionaryArray, new_null_array};
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType};
use datafusion_common::Result;

use super::GroupColumn;

/// A [`GroupColumn`] wrapper that transparently handles dictionary-encoded
/// input arrays by resolving dictionary keys on-demand.
///
/// Instead of materializing the full decoded array via `cast()` (which copies
/// O(batch_size) strings per batch), this builder looks up values through
/// dictionary keys, only copying data for rows that are actually appended as
/// new groups. Comparisons index directly into the dictionary's values array.
///
/// The inner builder stores decoded values. On emit, the existing code in
/// [`GroupValuesColumn::emit`] re-encodes back to dictionary via `cast()`.
///
/// [`GroupValuesColumn::emit`]: super::GroupValuesColumn
pub struct DictionaryGroupValueBuilder<K: ArrowDictionaryKeyType> {
    /// Inner builder that operates on the dictionary's value type
    inner: Box<dyn GroupColumn>,
    /// A single-element null array of the value type, used to represent null
    /// dictionary keys to the inner builder
    null_array: ArrayRef,
    /// Ties this builder to the concrete dictionary key type `K` without
    /// storing any key data
    _phantom: PhantomData<K>,
}

impl<K: ArrowDictionaryKeyType> DictionaryGroupValueBuilder<K> {
    /// Create a builder that decodes dictionary keys on the fly and stores
    /// the decoded values (of `value_type`) in `inner`.
    pub fn new(inner: Box<dyn GroupColumn>, value_type: &DataType) -> Self {
        Self {
            inner,
            // Single null slot of the value type, reused whenever an input
            // dictionary key is null
            null_array: new_null_array(value_type, 1),
            _phantom: PhantomData,
        }
    }
}

impl<K: ArrowDictionaryKeyType + Send + Sync> GroupColumn for DictionaryGroupValueBuilder<K> {
    /// Compare stored group value `lhs_row` against input `array[rhs_row]`,
    /// resolving the dictionary key so the inner builder compares decoded
    /// values. A null key is compared via the cached single-null array.
    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
        let dict = array
            .as_any()
            .downcast_ref::<DictionaryArray<K>>()
            .unwrap();
        if dict.is_null(rhs_row) {
            return self.inner.equal_to(lhs_row, &self.null_array, 0);
        }
        let key = dict.keys().value(rhs_row).as_usize();
        self.inner.equal_to(lhs_row, dict.values(), key)
    }

    /// Append the decoded value at `array[row]` to the inner builder,
    /// copying only this one value (not the whole batch).
    fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
        let dict = array
            .as_any()
            .downcast_ref::<DictionaryArray<K>>()
            .unwrap();
        if dict.is_null(row) {
            return self.inner.append_val(&self.null_array, 0);
        }
        let key = dict.keys().value(row).as_usize();
        self.inner.append_val(dict.values(), key)
    }

    /// Vectorized variant of [`Self::equal_to`]. Only entries whose
    /// `equal_to_results[i]` is already `true` are (re-)checked.
    fn vectorized_equal_to(
        &self,
        lhs_rows: &[usize],
        array: &ArrayRef,
        rhs_rows: &[usize],
        equal_to_results: &mut [bool],
    ) {
        let dict = array
            .as_any()
            .downcast_ref::<DictionaryArray<K>>()
            .unwrap();
        let keys = dict.keys();
        let values = dict.values();

        if dict.null_count() == 0 {
            // Fast path: no null keys, remap indices and delegate to inner.
            // The mapping must cover every position (even already-false ones)
            // so `mapped_rhs` stays aligned with `lhs_rows`.
            let mapped_rhs: Vec<usize> = rhs_rows
                .iter()
                .map(|&row| keys.value(row).as_usize())
                .collect();
            self.inner
                .vectorized_equal_to(lhs_rows, values, &mapped_rhs, equal_to_results);
        } else {
            // Null keys present: fall back to scalar comparison
            for (i, (&lhs_row, &rhs_row)) in
                lhs_rows.iter().zip(rhs_rows.iter()).enumerate()
            {
                if !equal_to_results[i] {
                    continue;
                }
                if dict.is_null(rhs_row) {
                    equal_to_results[i] =
                        self.inner.equal_to(lhs_row, &self.null_array, 0);
                } else {
                    let key = keys.value(rhs_row).as_usize();
                    equal_to_results[i] =
                        self.inner.equal_to(lhs_row, values, key);
                }
            }
        }
    }

    /// Vectorized variant of [`Self::append_val`] for all indices in `rows`.
    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> {
        let dict = array
            .as_any()
            .downcast_ref::<DictionaryArray<K>>()
            .unwrap();
        let keys = dict.keys();
        let values = dict.values();

        if dict.null_count() == 0 {
            // Fast path: no null keys, remap indices and delegate to inner
            let mapped_rows: Vec<usize> = rows
                .iter()
                .map(|&row| keys.value(row).as_usize())
                .collect();
            self.inner.vectorized_append(values, &mapped_rows)
        } else {
            // Null keys present: process in order, chunking consecutive
            // non-null rows for vectorized processing. One buffer is reused
            // across chunks to avoid a fresh allocation per non-null run.
            let mut chunk: Vec<usize> = Vec::new();
            let mut i = 0;
            while i < rows.len() {
                if dict.is_null(rows[i]) {
                    self.inner.append_val(&self.null_array, 0)?;
                    i += 1;
                } else {
                    // Collect consecutive non-null rows into the shared buffer
                    chunk.clear();
                    while i < rows.len() && !dict.is_null(rows[i]) {
                        chunk.push(keys.value(rows[i]).as_usize());
                        i += 1;
                    }
                    self.inner.vectorized_append(values, &chunk)?;
                }
            }
            Ok(())
        }
    }

    /// Number of group values stored so far (delegates to inner).
    fn len(&self) -> usize {
        self.inner.len()
    }

    /// Memory usage: inner builder plus the cached single-null array.
    fn size(&self) -> usize {
        self.inner.size() + self.null_array.get_array_memory_size()
    }

    /// Build the decoded output array; the caller re-encodes to dictionary.
    fn build(self: Box<Self>) -> ArrayRef {
        self.inner.build()
    }

    /// Take the first `n` group values, leaving the rest (delegates to inner).
    fn take_n(&mut self, n: usize) -> ArrayRef {
        self.inner.take_n(n)
    }
}
Loading
Loading