Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 161 additions & 38 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::utils::make_scalar_function;

use hashbrown::HashSet;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

// Create static instances of ScalarUDFs for each function
Expand Down Expand Up @@ -416,24 +417,46 @@ fn general_array_has_for_all_and_any<'a>(
needle: ArrayWrapper<'a>,
comparison_type: ComparisonType,
) -> Result<ArrayRef> {
let mut boolean_builder = BooleanArray::builder(haystack.len());
let num_rows = haystack.len();
let converter = RowConverter::new(vec![SortField::new(haystack.value_type())])?;

for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) {
let arr_values = converter.convert_columns(&[arr])?;
let sub_arr_values = converter.convert_columns(&[sub_arr])?;
boolean_builder.append_value(general_array_has_all_and_any_kernel(
&arr_values,
&sub_arr_values,
comparison_type,
));
} else {
boolean_builder.append_null();
let h_offsets: Vec<usize> = haystack.offsets().collect();
let n_offsets: Vec<usize> = needle.offsets().collect();

// For efficiency with sliced arrays, only process the visible elements, not
// the entire underlying buffer. This requires rebasing offsets to start at
// zero.
let h_base = h_offsets[0];
let n_base = n_offsets[0];
let h_vals = haystack
.values()
.slice(h_base, h_offsets[num_rows] - h_base);
let n_vals = needle.values().slice(n_base, n_offsets[num_rows] - n_base);

// Convert all haystack and needle values to rows up front
let all_h_rows = converter.convert_columns(&[h_vals])?;
let all_n_rows = converter.convert_columns(&[n_vals])?;

let h_nulls = haystack.nulls();
let n_nulls = needle.nulls();
let mut builder = BooleanArray::builder(num_rows);

for i in 0..num_rows {
if h_nulls.is_some_and(|n| n.is_null(i)) || n_nulls.is_some_and(|n| n.is_null(i))
{
builder.append_null();
continue;
}
builder.append_value(general_array_has_all_and_any_kernel(
&all_h_rows,
(h_offsets[i] - h_base)..(h_offsets[i + 1] - h_base),
&all_n_rows,
(n_offsets[i] - n_base)..(n_offsets[i + 1] - n_base),
comparison_type,
));
}

Ok(Arc::new(boolean_builder.finish()))
Ok(Arc::new(builder.finish()))
}

// String comparison for array_has_all and array_has_any
Expand All @@ -442,25 +465,46 @@ fn array_has_all_and_any_string_internal<'a>(
needle: ArrayWrapper<'a>,
comparison_type: ComparisonType,
) -> Result<ArrayRef> {
let mut boolean_builder = BooleanArray::builder(haystack.len());
for (arr, sub_arr) in haystack.iter().zip(needle.iter()) {
match (arr, sub_arr) {
(Some(arr), Some(sub_arr)) => {
let haystack_array = string_array_to_vec(&arr);
let needle_array = string_array_to_vec(&sub_arr);
boolean_builder.append_value(array_has_string_kernel(
&haystack_array,
&needle_array,
comparison_type,
));
}
(_, _) => {
boolean_builder.append_null();
}
let num_rows = haystack.len();

let h_offsets: Vec<usize> = haystack.offsets().collect();
let n_offsets: Vec<usize> = needle.offsets().collect();

// For efficiency with sliced arrays, only process the visible elements, not
// the entire underlying buffer. This requires rebasing offsets to start at
// zero.
let h_base = h_offsets[0];
let n_base = n_offsets[0];
let h_vals = haystack
.values()
.slice(h_base, h_offsets[num_rows] - h_base);
let n_vals = needle.values().slice(n_base, n_offsets[num_rows] - n_base);

let all_h_strings = string_array_to_vec(h_vals.as_ref());
let all_n_strings = string_array_to_vec(n_vals.as_ref());

let h_nulls = haystack.nulls();
let n_nulls = needle.nulls();
let mut builder = BooleanArray::builder(num_rows);

for i in 0..num_rows {
if h_nulls.is_some_and(|n| n.is_null(i)) || n_nulls.is_some_and(|n| n.is_null(i))
{
builder.append_null();
continue;
}
let h_start = h_offsets[i] - h_base;
let h_end = h_offsets[i + 1] - h_base;
let n_start = n_offsets[i] - n_base;
let n_end = n_offsets[i + 1] - n_base;
builder.append_value(array_has_string_kernel(
&all_h_strings[h_start..h_end],
&all_n_strings[n_start..n_end],
comparison_type,
));
}

Ok(Arc::new(boolean_builder.finish()))
Ok(Arc::new(builder.finish()))
}

fn array_has_all_and_any_dispatch<'a>(
Expand Down Expand Up @@ -893,19 +937,22 @@ fn array_has_string_kernel(

fn general_array_has_all_and_any_kernel(
haystack_rows: &Rows,
h_range: Range<usize>,
needle_rows: &Rows,
mut n_range: Range<usize>,
comparison_type: ComparisonType,
) -> bool {
let h_start = h_range.start;
let h_end = h_range.end;

match comparison_type {
ComparisonType::All => needle_rows.iter().all(|needle_row| {
haystack_rows
.iter()
.any(|haystack_row| haystack_row == needle_row)
ComparisonType::All => n_range.all(|ni| {
let needle_row = needle_rows.row(ni);
(h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
}),
ComparisonType::Any => needle_rows.iter().any(|needle_row| {
haystack_rows
.iter()
.any(|haystack_row| haystack_row == needle_row)
ComparisonType::Any => n_range.any(|ni| {
let needle_row = needle_rows.row(ni);
(h_start..h_end).any(|hi| haystack_rows.row(hi) == needle_row)
}),
}
}
Expand All @@ -931,7 +978,7 @@ mod tests {

use crate::expr_fn::make_array;

use super::ArrayHas;
use super::{ArrayHas, ArrayHasAll, ArrayHasAny};

#[test]
fn test_simplify_array_has_to_in_list() {
Expand Down Expand Up @@ -1095,4 +1142,80 @@ mod tests {

Ok(())
}

/// Checks that `array_has_all` and `array_has_any` look only at the visible
/// rows of a sliced haystack whose child offsets do not start at zero.
#[test]
fn test_sliced_list_offsets() -> Result<(), DataFusionError> {
    let item_field: Arc<Field> = Arc::new(Field::new("item", DataType::Int32, false));

    // Builds a list column of non-null Int32 items from flat child values
    // and list offsets.
    let build_list = |values: Vec<i32>, offsets: Vec<i32>| {
        ListArray::new(
            Arc::clone(&item_field),
            OffsetBuffer::new(offsets.into()),
            Arc::new(Int32Array::from(values)),
            None,
        )
    };

    // Underlying rows:
    //   row 0: [1, 2]    (hidden by the slice)
    //   row 1: [11, 12]  (visible row 0)
    //   row 2: [21, 22]  (visible row 1)
    //   row 3: [31, 32]  (hidden by the slice)
    // Slicing at offset 1 with length 2 keeps only rows 1 and 2.
    let haystack: ArrayRef = Arc::new(
        build_list(vec![1, 2, 11, 12, 21, 22, 31, 32], vec![0, 2, 4, 6, 8]).slice(1, 2),
    );

    let list_type = haystack.data_type().clone();
    let haystack_field = Arc::new(Field::new("haystack", list_type.clone(), false));
    let needle_field = Arc::new(Field::new("needle", list_type, false));
    let return_field = Arc::new(Field::new("return", DataType::Boolean, true));
    let config_options = Arc::new(ConfigOptions::default());

    // Assembles the UDF arguments for the sliced haystack and a given needle.
    let args_for = |needle: ArrayRef| ScalarFunctionArgs {
        args: vec![
            ColumnarValue::Array(Arc::clone(&haystack)),
            ColumnarValue::Array(needle),
        ],
        arg_fields: vec![Arc::clone(&haystack_field), Arc::clone(&needle_field)],
        number_rows: 2,
        return_field: Arc::clone(&return_field),
        config_options: Arc::clone(&config_options),
    };

    // Both functions are expected to report `true` for each visible row.
    let expected = vec![Some(true), Some(true)];

    // array_has_all: needles [11] and [21] are fully contained in the
    // corresponding visible haystack rows.
    let needle_all: ArrayRef = Arc::new(build_list(vec![11, 21], vec![0, 1, 2]));
    let all_output = ArrayHasAll::new()
        .invoke_with_args(args_for(needle_all))?
        .into_array(2)?;
    assert_eq!(all_output.as_boolean().iter().collect::<Vec<_>>(), expected);

    // array_has_any: needles [99, 11] and [99, 21] each share one element
    // with the corresponding visible haystack row.
    let needle_any: ArrayRef =
        Arc::new(build_list(vec![99, 11, 99, 21], vec![0, 2, 4]));
    let any_output = ArrayHasAny::new()
        .invoke_with_args(args_for(needle_any))?
        .into_array(2)?;
    assert_eq!(any_output.as_boolean().iter().collect::<Vec<_>>(), expected);

    Ok(())
}
}