Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ anyhow = "1.0.97"
arbitrary = "1.3.2"
arc-swap = "1.8"
arcref = "0.2.0"
arrayref = "0.3.7"
arrow-arith = "58"
arrow-array = "58"
arrow-buffer = "58"
Expand Down
1 change: 0 additions & 1 deletion encodings/fastlanes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ version = { workspace = true }
workspace = true

[dependencies]
arrayref = { workspace = true }
fastlanes = { workspace = true }
itertools = { workspace = true }
lending-iterator = { workspace = true }
Expand Down
226 changes: 214 additions & 12 deletions encodings/fastlanes/src/rle/array/rle_compress.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrayref::array_mut_ref;
use fastlanes::RLE;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
Expand All @@ -12,6 +11,7 @@ use vortex_array::match_each_native_ptype;
use vortex_array::validity::Validity;
use vortex_buffer::BitBufferMut;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;

use crate::FL_CHUNK_SIZE;
Expand Down Expand Up @@ -48,46 +48,56 @@ where
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));

let values_uninit = values_buf.spare_capacity_mut();
let indices_uninit = indices_buf.spare_capacity_mut();
let (indices_uninit, _) = indices_buf
.spare_capacity_mut()
.as_chunks_mut::<FL_CHUNK_SIZE>();
let mut value_count_acc = 0; // Chunk value count prefix sum.

let (chunks, remainder) = values.as_chunks::<FL_CHUNK_SIZE>();

let mut process_chunk = |chunk_start_idx: usize, input: &[T; FL_CHUNK_SIZE]| {
let mut process_chunk = |input: &[T; FL_CHUNK_SIZE], rle_idxs: &mut [u16; FL_CHUNK_SIZE]| {
// SAFETY: NativeValue is repr(transparent)
let input: &[NativeValue<T>; FL_CHUNK_SIZE] = unsafe { std::mem::transmute(input) };

// SAFETY: `MaybeUninit<NativeValue<T>>` and `NativeValue<T>` have the same layout.
let rle_vals: &mut [NativeValue<T>] =
unsafe { std::mem::transmute(&mut values_uninit[value_count_acc..][..FL_CHUNK_SIZE]) };

// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
let rle_idxs: &mut [u16] =
unsafe { std::mem::transmute(&mut indices_uninit[chunk_start_idx..][..FL_CHUNK_SIZE]) };

// Capture chunk start indices. This is necessary as indices
// returned from `T::encode` are relative to the chunk.
values_idx_offsets.push(value_count_acc as u64);

let value_count = NativeValue::<T>::encode(
input,
array_mut_ref![rle_vals, 0, FL_CHUNK_SIZE],
array_mut_ref![rle_idxs, 0, FL_CHUNK_SIZE],
unsafe { &mut *(rle_vals.as_mut_ptr() as *mut [_; FL_CHUNK_SIZE]) },
rle_idxs,
);

value_count_acc += value_count;
};

for (chunk_idx, chunk_slice) in chunks.iter().enumerate() {
process_chunk(chunk_idx * FL_CHUNK_SIZE, chunk_slice);
for (chunk_slice, rle_idxs) in chunks.iter().zip(indices_uninit.iter_mut()) {
// SAFETY: `MaybeUninit<u16>` and `u16` have the same layout.
process_chunk(chunk_slice, unsafe {
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
rle_idxs,
)
});
}

if !remainder.is_empty() {
// Repeat the last value for padding to prevent
// accounting for an additional value change.
let mut padded_chunk = [values[len - 1]; FL_CHUNK_SIZE];
padded_chunk[..remainder.len()].copy_from_slice(remainder);
process_chunk((len / FL_CHUNK_SIZE) * FL_CHUNK_SIZE, &padded_chunk);
let last_idx_chunk = indices_uninit
.last_mut()
.vortex_expect("Must have the trailing chunk");
process_chunk(&padded_chunk, unsafe {
std::mem::transmute::<&mut [std::mem::MaybeUninit<u16>; 1024], &mut [u16; 1024]>(
last_idx_chunk,
)
});
}

unsafe {
Expand Down Expand Up @@ -137,10 +147,14 @@ mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::MaskedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::half::f16;
use vortex_buffer::Buffer;
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use super::*;

Expand Down Expand Up @@ -258,6 +272,194 @@ mod tests {
assert_arrays_eq!(decoded, expected);
}

/// Swaps an RLE array's indices child for `MaskedArray(ConstantArray(1u16), validity)`.
///
/// Models a compressor that stores indices as a masked constant. This is
/// only sound when every chunk carries at least two RLE dictionary entries
/// (the fill-forward default at index 0 plus the real value at index 1),
/// which is guaranteed whenever each chunk begins with a null position.
fn with_masked_constant_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
    let primitive_indices = rle.indices().to_primitive();
    let ones = ConstantArray::new(1u16, primitive_indices.len()).into_array();
    let masked = MaskedArray::try_new(ones, primitive_indices.validity())?.into_array();

    // SAFETY: only the indices child is swapped; dtype and length are
    // untouched, and index 1 is valid in every chunk because each chunk
    // carries at least two dictionary entries.
    unsafe {
        RLEArray::try_from_data(RLEData::new_unchecked(
            rle.values().clone(),
            masked,
            rle.values_idx_offsets().clone(),
            rle.dtype().clone(),
            rle.offset(),
            rle.len(),
        ))
    }
}

#[test]
fn test_encode_all_null_chunk() -> VortexResult<()> {
    // One full chunk consisting solely of nulls.
    let original = PrimitiveArray::from_option_iter((0..FL_CHUNK_SIZE).map(|_| None::<u32>));
    let rle = RLEData::encode(&original)?;
    let decoded = with_masked_constant_indices(&rle)?;
    assert_arrays_eq!(decoded, original);
    Ok(())
}

#[test]
fn test_encode_all_null_chunk_then_value_chunk() -> VortexResult<()> {
    // Chunk 0 is entirely null; chunk 1 holds one value preceded by nulls.
    let original = PrimitiveArray::from_option_iter(
        (0..2 * FL_CHUNK_SIZE).map(|i| (i == FL_CHUNK_SIZE + 100).then_some(42u32)),
    );
    let rle = RLEData::encode(&original)?;
    let decoded = with_masked_constant_indices(&rle)?;
    assert_arrays_eq!(decoded, original);
    Ok(())
}

#[test]
fn test_encode_one_value_near_end() -> VortexResult<()> {
    // A single distinct value close to the end of the chunk; all else null.
    let original = PrimitiveArray::from_option_iter(
        (0..FL_CHUNK_SIZE).map(|i| (i == 1000).then_some(42u32)),
    );
    let rle = RLEData::encode(&original)?;
    let decoded = with_masked_constant_indices(&rle)?;
    assert_arrays_eq!(decoded, original);
    Ok(())
}

#[test]
fn test_encode_value_chunk_then_all_null_remainder() -> VortexResult<()> {
    // 1085 elements (2 chunks: 1024 + 61 padded to 1024).
    // Chunk 0 carries -1i16 at scattered positions (273..=366); everything
    // else, including the whole partial chunk 1, is null.
    const NEG1_POSITIONS: &[usize] = &[
        273, 276, 277, 278, 279, 281, 282, 284, 285, 286, 287, 288, 289, 291, 292, 293, 296,
        298, 299, 302, 304, 308, 310, 311, 313, 314, 315, 317, 318, 322, 324, 325, 334, 335,
        336, 337, 338, 339, 340, 341, 342, 343, 344, 346, 347, 348, 350, 352, 353, 355, 358,
        359, 362, 363, 364, 366,
    ];
    let original = PrimitiveArray::from_option_iter(
        (0..1085usize).map(|i| NEG1_POSITIONS.contains(&i).then_some(-1i16)),
    );
    let rle = RLEData::encode(&original)?;
    let decoded = with_masked_constant_indices(&rle)?;
    assert_arrays_eq!(decoded, original);
    Ok(())
}

/// Overwrites the indices at invalid (null) positions with garbage values.
///
/// Simulates a compressor that does not preserve index values at null
/// positions — e.g. one that recompresses the indices and clobbers invalid
/// entries with arbitrary data along the way.
fn with_random_invalid_indices(rle: &RLEArray) -> VortexResult<RLEArray> {
    let indices_prim = rle.indices().to_primitive();
    let validity = indices_prim.validity();
    let mut indices_data = indices_prim.as_slice::<u16>().to_vec();

    // Deterministic xorshift32 stream so failures are reproducible.
    let mut state: u32 = 0xDEAD_BEEF;
    for pos in 0..indices_data.len() {
        // Leave valid positions untouched; only null slots get garbage.
        if validity.is_valid(pos).unwrap_or(true) {
            continue;
        }
        // xorshift32 step.
        state ^= state << 13;
        state ^= state >> 17;
        state ^= state << 5;
        indices_data[pos] = state as u16;
    }

    let clobbered_indices =
        PrimitiveArray::new(Buffer::from(indices_data), indices_prim.validity()).into_array();

    unsafe {
        RLEArray::try_from_data(RLEData::new_unchecked(
            rle.values().clone(),
            clobbered_indices,
            rle.values_idx_offsets().clone(),
            rle.dtype().clone(),
            rle.offset(),
            rle.len(),
        ))
    }
}

#[test]
fn test_random_invalid_indices_all_null_chunk() -> VortexResult<()> {
    // One full chunk of nulls; every index slot is invalid and gets garbage.
    let original = PrimitiveArray::from_option_iter((0..FL_CHUNK_SIZE).map(|_| None::<u32>));
    let rle = RLEData::encode(&original)?;
    let clobbered = with_random_invalid_indices(&rle)?;
    assert_arrays_eq!(clobbered, original);
    Ok(())
}

#[test]
fn test_random_invalid_indices_sparse_values() -> VortexResult<()> {
    // Three isolated values in a sea of nulls within one chunk.
    let original = PrimitiveArray::from_option_iter((0..FL_CHUNK_SIZE).map(|i| match i {
        0 => Some(10u32),
        500 => Some(20),
        1000 => Some(30),
        _ => None,
    }));
    let rle = RLEData::encode(&original)?;
    let clobbered = with_random_invalid_indices(&rle)?;
    assert_arrays_eq!(clobbered, original);
    Ok(())
}

#[test]
fn test_random_invalid_indices_multi_chunk() -> VortexResult<()> {
    // Two chunks: the first has scattered values, the second is all null.
    let original = PrimitiveArray::from_option_iter((0..2 * FL_CHUNK_SIZE).map(|i| match i {
        0 => Some(10i16),
        500 => Some(20),
        i if i == FL_CHUNK_SIZE + 100 => Some(42),
        _ => None,
    }));
    let rle = RLEData::encode(&original)?;
    let clobbered = with_random_invalid_indices(&rle)?;
    assert_arrays_eq!(clobbered, original);
    Ok(())
}

#[test]
fn test_random_invalid_indices_partial_last_chunk() -> VortexResult<()> {
    // 1085 elements: chunk 0 has values at every 7th position in 100..200;
    // chunk 1 is a partial chunk (61 elements padded to 1024), fully null.
    let original = PrimitiveArray::from_option_iter((0..1085usize).map(|i| {
        ((100..200).contains(&i) && (i - 100) % 7 == 0).then(|| i as u32)
    }));
    let rle = RLEData::encode(&original)?;
    let clobbered = with_random_invalid_indices(&rle)?;
    assert_arrays_eq!(clobbered, original);
    Ok(())
}

#[test]
fn test_random_invalid_indices_mostly_valid() -> VortexResult<()> {
    // Mostly valid positions; every 37th slot is null and gets a garbage index.
    let original = PrimitiveArray::from_option_iter(
        (0..FL_CHUNK_SIZE).map(|i| (i % 37 != 0).then(|| (i / 100) as u64)),
    );
    let rle = RLEData::encode(&original)?;
    let clobbered = with_random_invalid_indices(&rle)?;
    assert_arrays_eq!(clobbered, original);
    Ok(())
}

// Regression test: RLE compression properly supports decoding pos/neg zeros
// See <https://github.com/vortex-data/vortex/issues/6491>
#[rstest]
Expand Down
Loading
Loading