diff --git a/Cargo.lock b/Cargo.lock index 4a4f4b14629..e940aae8b80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10512,7 +10512,6 @@ dependencies = [ name = "vortex-fastlanes" version = "0.1.0" dependencies = [ - "arrayref", "codspeed-divan-compat", "fastlanes", "itertools 0.14.0", diff --git a/Cargo.toml b/Cargo.toml index 90436b4f1a9..fc302ad1b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,6 @@ anyhow = "1.0.97" arbitrary = "1.3.2" arc-swap = "1.8" arcref = "0.2.0" -arrayref = "0.3.7" arrow-arith = "58" arrow-array = "58" arrow-buffer = "58" diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index ac30b2032b4..a14e19389bc 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -17,7 +17,6 @@ version = { workspace = true } workspace = true [dependencies] -arrayref = { workspace = true } fastlanes = { workspace = true } itertools = { workspace = true } lending-iterator = { workspace = true } diff --git a/encodings/fastlanes/src/rle/array/rle_compress.rs b/encodings/fastlanes/src/rle/array/rle_compress.rs index a432097b04e..b453ca27ec8 100644 --- a/encodings/fastlanes/src/rle/array/rle_compress.rs +++ b/encodings/fastlanes/src/rle/array/rle_compress.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use arrayref::array_mut_ref; use fastlanes::RLE; use vortex_array::IntoArray; use vortex_array::ToCanonical; @@ -12,6 +11,7 @@ use vortex_array::match_each_native_ptype; use vortex_array::validity::Validity; use vortex_buffer::BitBufferMut; use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use crate::FL_CHUNK_SIZE; @@ -48,12 +48,14 @@ where let mut values_idx_offsets = BufferMut::::with_capacity(len.div_ceil(FL_CHUNK_SIZE)); let values_uninit = values_buf.spare_capacity_mut(); - let indices_uninit = indices_buf.spare_capacity_mut(); + let (indices_uninit, _) = indices_buf + .spare_capacity_mut() + .as_chunks_mut::(); let mut value_count_acc = 0; // Chunk value count prefix sum. let (chunks, remainder) = values.as_chunks::(); - let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| { + let mut process_chunk = |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [u16; FL_CHUNK_SIZE]| { // SAFETY: NativeValue is repr(transparent) let input: &[NativeValue; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) }; @@ -61,25 +63,26 @@ where let rle_vals: &mut [NativeValue] = unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) }; - // SAFETY: `MaybeUninit` and `u16` have the same layout. - let rle_idxs: &mut [u16] = - unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) }; - // Capture chunk start indices. This is necessary as indices // returned from `T::encode` are relative to the chunk. values_idx_offsets.push(value_count_acc as u64); let value_count = NativeValue::::encode( input, - array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE], - array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE], + unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) }, + rle_idxs, ); value_count_acc += value_count; }; - for (chunk_idx, chunk_slice) in chunks.iter().enumerate() { - process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice); + for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) { + // SAFETY: `MaybeUninit` and `u16` have the same layout. + process_chunk(chunk_slice, unsafe { + std::mem::transmute::<&mut [std::mem::MaybeUninit; 1024], &mut [u16; 1024]>( + rle_idxs, + ) + }); } if !remainder.is_empty() { @@ -87,7 +90,14 @@ where // accounting for an additional value change. let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE]; padded_chunk[..remainder.len()].copy_from_slice(remainder); - process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk); + let last_idx_chunk = indices_uninit + .last_mut() + .vortex_expect("Must have the trailing chunk"); + process_chunk(&padded_chunk, unsafe { + std::mem::transmute::<&mut [std::mem::MaybeUninit; 1024], &mut [u16; 1024]>( + last_idx_chunk, + ) + }); } unsafe { @@ -137,10 +147,14 @@ mod tests { use rstest::rstest; use vortex_array::IntoArray; use vortex_array::ToCanonical; + use vortex_array::arrays::ConstantArray; + use vortex_array::arrays::MaskedArray; + use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::dtype::half::f16; use vortex_buffer::Buffer; use vortex_buffer::buffer; + use vortex_error::VortexResult; use super::*; @@ -258,6 +272,194 @@ mod tests { assert_arrays_eq!(decoded, expected); } + /// Replaces the indices of an RLE array with MaskedArray(ConstantArray(1u16), validity). + /// + /// Simulates a compressor that represents indices as a masked constant. + /// Valid when every chunk has at least two RLE dictionary entries (the + /// fill-forward default at index 0 and the actual value at index 1), which + /// holds whenever the first position of each chunk is null. + fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult { + let indices_prim = rle.indices().to_primitive(); + let masked_indices = MaskedArray::try_new( + ConstantArray::new(1u16, indices_prim.len()).into_array(), + indices_prim.validity(), + )? + .into_array(); + // SAFETY: we only replace the indices child; dtype and length are unchanged + // and index 1 is valid in every chunk because each has ≥ 2 dictionary entries. + unsafe { + RLEArray::try_from_data(RLEData::new_unchecked( + rle.values().clone(), + masked_indices, + rle.values_idx_offsets().clone(), + rle.dtype().clone(), + rle.offset(), + rle.len(), + )) + } + } + + #[test] + fn test_encode_all_null_chunk() -> VortexResult<()> { + let values: Vec> = vec![None; FL_CHUNK_SIZE]; + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> { + // First chunk is entirely null, second chunk has a value preceded by nulls. + let mut values: Vec> = vec![None; 2 * FL_CHUNK_SIZE]; + values[FL_CHUNK_SIZE + 100] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_one_value_near_end() -> VortexResult<()> { + // Single distinct value near the end of the chunk. + let mut values: Vec> = vec![None; FL_CHUNK_SIZE]; + values[1000] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + #[test] + fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> { + // 1085 elements (2 chunks: 1024 + 61 padded to 1024). + // Chunk 0 has -1i16 at scattered positions (273..=366), rest null. + // Chunk 1 (the remainder) is entirely null. + const NEG1_POSITIONS: &[usize] = &[ + 273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296, + 298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335, + 336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358, + 359, 362, 363, 364, 366, + ]; + let mut values: Vec> = vec![None; 1085]; + for &pos in NEG1_POSITIONS { + values[pos] = Some(-1); + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let decoded = with_masked_constant_indices(&rle)?; + assert_arrays_eq!(decoded, original); + Ok(()) + } + + /// Replaces indices at invalid (null) positions with random garbage values. + /// + /// This simulates a compressor that doesn't preserve index values at null + /// positions, which can happen when indices are further compressed and the + /// compressor clobbers invalid entries with arbitrary data. + fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult { + let indices_prim = rle.indices().to_primitive(); + let mut indices_data: Vec = indices_prim.as_slice::().to_vec(); + + // Use a simple deterministic "random" sequence. + let mut rng_state: u32 = 0xDEAD_BEEF; + let validity = indices_prim.validity(); + for (i, idx) in indices_data.iter_mut().enumerate() { + if !validity.is_valid(i).unwrap_or(true) { + // xorshift32 + rng_state ^= rng_state << 13; + rng_state ^= rng_state >> 17; + rng_state ^= rng_state << 5; + *idx = rng_state as u16; + } + } + + let clobbered_indices = + PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()).into_array(); + + unsafe { + RLEArray::try_from_data(RLEData::new_unchecked( + rle.values().clone(), + clobbered_indices, + rle.values_idx_offsets().clone(), + rle.dtype().clone(), + rle.offset(), + rle.len(), + )) + } + } + + #[test] + fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> { + let values: Vec> = vec![None; FL_CHUNK_SIZE]; + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_sparse_values() -> VortexResult<()> { + let mut values: Vec> = vec![None; FL_CHUNK_SIZE]; + values[0] = Some(10); + values[500] = Some(20); + values[1000] = Some(30); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> { + // Two chunks: first has scattered values, second is all null. + let mut values: Vec> = vec![None; 2 * FL_CHUNK_SIZE]; + values[0] = Some(10); + values[500] = Some(20); + values[FL_CHUNK_SIZE + 100] = Some(42); + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> { + // 1085 elements: chunk 0 has values at scattered positions, chunk 1 is + // a partial (61 elements padded to 1024) that is entirely null. + let mut values: Vec> = vec![None; 1085]; + for i in (100..200).step_by(7) { + values[i] = Some(i as u32); + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + + #[test] + fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> { + // Most positions are valid, only a few are null with garbage indices. + let mut values: Vec> = + (0..FL_CHUNK_SIZE).map(|i| Some((i / 100) as u64)).collect(); + // Sprinkle in some nulls. + for i in (0..FL_CHUNK_SIZE).step_by(37) { + values[i] = None; + } + let original = PrimitiveArray::from_option_iter(values); + let rle = RLEData::encode(&original)?; + let clobbered = with_random_invalid_indices(&rle)?; + assert_arrays_eq!(clobbered, original); + Ok(()) + } + // Regression test: RLE compression properly supports decoding pos/neg zeros // See #[rstest] diff --git a/encodings/fastlanes/src/rle/array/rle_decompress.rs b/encodings/fastlanes/src/rle/array/rle_decompress.rs index d448ca1de0b..16121bf4953 100644 --- a/encodings/fastlanes/src/rle/array/rle_decompress.rs +++ b/encodings/fastlanes/src/rle/array/rle_decompress.rs @@ -1,10 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use arrayref::array_mut_ref; -use arrayref::array_ref; use fastlanes::RLE; use num_traits::AsPrimitive; +use num_traits::NumCast; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::PrimitiveArray; @@ -54,15 +53,16 @@ where let values = values.as_slice::(); let indices = array.indices().clone().execute::(ctx)?; - let indices = indices.as_slice::(); assert!(indices.len().is_multiple_of(FL_CHUNK_SIZE)); + let has_invalid = !indices.all_valid()?; + let (indices_sl, _) = indices.as_slice::().as_chunks::(); let chunk_start_idx = array.offset() / FL_CHUNK_SIZE; let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE); let num_chunks = chunk_end_idx - chunk_start_idx; let mut buffer = BufferMut::::with_capacity(num_chunks * FL_CHUNK_SIZE); - let buffer_uninit = buffer.spare_capacity_mut(); + let (out_buf, _) = buffer.spare_capacity_mut().as_chunks_mut::(); let values_idx_offsets = array .values_idx_offsets() @@ -70,26 +70,44 @@ where .execute::(ctx)?; let values_idx_offsets = values_idx_offsets.as_slice::(); - for chunk_idx in 0..num_chunks { + for (chunk_idx, (chunk_indices, chunk_out)) in + indices_sl.iter().zip(out_buf.iter_mut()).enumerate() + { // Offsets in `values_idx_offsets` are absolute and need to be shifted // by the offset of the first chunk, respective the current slice, in // order to make them relative. let value_idx_offset = (values_idx_offsets[chunk_idx].as_() - values_idx_offsets[0].as_()) as usize; - let chunk_values = &values[value_idx_offset..]; - let chunk_indices = &indices[chunk_idx * FL_CHUNK_SIZE..]; - - // SAFETY: `MaybeUninit` and `T` have the same layout. - let buffer_values: &mut [V] = unsafe { - std::mem::transmute(&mut buffer_uninit[chunk_idx * FL_CHUNK_SIZE..][..FL_CHUNK_SIZE]) + let next_value_idx_offset = if chunk_idx + 1 < num_chunks { + (values_idx_offsets[chunk_idx + 1].as_() - values_idx_offsets[0].as_()) as usize + } else { + values.len() }; + let num_chunk_values = next_value_idx_offset - value_idx_offset; - V::decode( - chunk_values, - array_ref![chunk_indices, 0, FL_CHUNK_SIZE], - array_mut_ref![buffer_values, 0, FL_CHUNK_SIZE], - ); + // SAFETY: `MaybeUninit` and `T` have the same layout. + let buffer_values: &mut [V; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(chunk_out) }; + let chunk_values = &values[value_idx_offset..]; + if num_chunk_values == 1 { + // Single-value chunk: fill directly to avoid out-of-bounds index + // access. The indices may contain values other than 0 when they + // have been further compressed (e.g., as a masked constant). + buffer_values.fill(chunk_values[0]); + } else if has_invalid { + // When the indices array has invalid (null) positions, those + // positions may contain arbitrary garbage values after further + // compression. Clamp all indices into [0, num_chunk_values) to + // prevent out-of-bounds access in the fastlanes decoder. + let mut sanitized = *chunk_indices; + for idx in sanitized.iter_mut() { + let v: usize = (*idx).into(); + *idx = NumCast::from(v % num_chunk_values).unwrap_or_default(); + } + V::decode(chunk_values, &sanitized, buffer_values); + } else { + V::decode(chunk_values, chunk_indices, buffer_values); + } } unsafe {