diff --git a/encodings/bytebool/public-api.lock b/encodings/bytebool/public-api.lock index 63d9c1b9518..61826569a6e 100644 --- a/encodings/bytebool/public-api.lock +++ b/encodings/bytebool/public-api.lock @@ -62,6 +62,10 @@ impl vortex_array::arrays::dict::take::TakeExecute for vortex_bytebool::ByteBool pub fn vortex_bytebool::ByteBool::take(array: vortex_array::array::view::ArrayView<'_, Self>, indices: &vortex_array::array::erased::ArrayRef, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::kernel::FilterReduce for vortex_bytebool::ByteBool + +pub fn vortex_bytebool::ByteBool::filter(array: vortex_array::array::view::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_bytebool::ByteBool pub fn vortex_bytebool::ByteBool::slice(array: vortex_array::array::view::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> diff --git a/encodings/bytebool/src/rules.rs b/encodings/bytebool/src/rules.rs index f67d3567326..b4dc61fa1c7 100644 --- a/encodings/bytebool/src/rules.rs +++ b/encodings/bytebool/src/rules.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use vortex_array::arrays::filter::FilterReduceAdaptor; use vortex_array::arrays::slice::SliceReduceAdaptor; use vortex_array::optimizer::rules::ParentRuleSet; use vortex_array::scalar_fn::fns::cast::CastReduceAdaptor; @@ -12,4 +13,5 @@ pub(crate) static RULES: ParentRuleSet = ParentRuleSet::new(&[ ParentRuleSet::lift(&CastReduceAdaptor(ByteBool)), ParentRuleSet::lift(&MaskReduceAdaptor(ByteBool)), ParentRuleSet::lift(&SliceReduceAdaptor(ByteBool)), + ParentRuleSet::lift(&FilterReduceAdaptor(ByteBool)), ]); diff --git a/encodings/bytebool/src/slice.rs b/encodings/bytebool/src/slice.rs index 34ffa844b3f..85fd1cfcf0d 100644 --- a/encodings/bytebool/src/slice.rs +++ 
b/encodings/bytebool/src/slice.rs @@ -6,8 +6,11 @@ use std::ops::Range; use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::IntoArray; +use vortex_array::arrays::filter::FilterReduce; use vortex_array::arrays::slice::SliceReduce; use vortex_error::VortexResult; +use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ByteBool; @@ -22,3 +25,23 @@ impl SliceReduce for ByteBool { )) } } + +impl FilterReduce for ByteBool { + fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { + let ranges = match mask.slices() { + AllOr::Some(slices) => slices, + // Precondition: FilterReduce only runs for non-trivial masks. + AllOr::All | AllOr::None => { + unreachable!("precondition violated: expected a Mask::Values slice list") + } + }; + let ranges: Vec> = ranges.iter().map(|&(s, e)| s..e).collect(); + Ok(Some( + ByteBool::new( + array.buffer().filter_typed::(&ranges)?, + array.validity()?.filter(mask)?, + ) + .into_array(), + )) + } +} diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 08dffbab678..f1748852715 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -1720,6 +1720,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::Decimal pub fn vortex_array::arrays::Decimal::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::Decimal>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Decimal + +pub fn vortex_array::arrays::Decimal::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::Decimal pub fn vortex_array::arrays::Decimal::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -2556,14 +2560,30 @@ impl 
vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Consta pub fn vortex_array::arrays::Constant::filter(array: vortex_array::ArrayView<'_, vortex_array::arrays::Constant>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Decimal + +pub fn vortex_array::arrays::Decimal::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Extension pub fn vortex_array::arrays::Extension::filter(array: vortex_array::ArrayView<'_, vortex_array::arrays::Extension>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::FixedSizeList + +pub fn vortex_array::arrays::FixedSizeList::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Masked pub fn vortex_array::arrays::Masked::filter(array: vortex_array::ArrayView<'_, vortex_array::arrays::Masked>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Primitive + +pub fn vortex_array::arrays::Primitive::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::VarBinView + +pub fn vortex_array::arrays::VarBinView::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::dict::Dict pub fn vortex_array::arrays::dict::Dict::filter(array: vortex_array::ArrayView<'_, vortex_array::arrays::dict::Dict>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> @@ -2646,6 +2666,10 @@ impl 
vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::FixedSize pub fn vortex_array::arrays::FixedSizeList::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::FixedSizeList>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::FixedSizeList + +pub fn vortex_array::arrays::FixedSizeList::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::FixedSizeList pub fn vortex_array::arrays::FixedSizeList::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -3604,6 +3628,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::Primitive pub fn vortex_array::arrays::Primitive::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::Primitive>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Primitive + +pub fn vortex_array::arrays::Primitive::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::Primitive pub fn vortex_array::arrays::Primitive::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -4890,6 +4918,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::VarBinVie pub fn vortex_array::arrays::VarBinView::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::VarBinView>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::VarBinView + +pub fn 
vortex_array::arrays::VarBinView::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::VarBinView pub fn vortex_array::arrays::VarBinView::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -5428,6 +5460,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::Decimal pub fn vortex_array::arrays::Decimal::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::Decimal>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Decimal + +pub fn vortex_array::arrays::Decimal::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::Decimal pub fn vortex_array::arrays::Decimal::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -5762,6 +5798,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::FixedSize pub fn vortex_array::arrays::FixedSizeList::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::FixedSizeList>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::FixedSizeList + +pub fn vortex_array::arrays::FixedSizeList::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::FixedSizeList pub fn vortex_array::arrays::FixedSizeList::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -6242,6 +6282,10 @@ impl 
vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::Primitive pub fn vortex_array::arrays::Primitive::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::Primitive>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::Primitive + +pub fn vortex_array::arrays::Primitive::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::Primitive pub fn vortex_array::arrays::Primitive::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -6692,6 +6736,10 @@ impl vortex_array::arrays::dict::TakeExecute for vortex_array::arrays::VarBinVie pub fn vortex_array::arrays::VarBinView::take(array: vortex_array::ArrayView<'_, vortex_array::arrays::VarBinView>, indices: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::arrays::filter::FilterReduce for vortex_array::arrays::VarBinView + +pub fn vortex_array::arrays::VarBinView::filter(array: vortex_array::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult> + impl vortex_array::arrays::slice::SliceReduce for vortex_array::arrays::VarBinView pub fn vortex_array::arrays::VarBinView::slice(array: vortex_array::ArrayView<'_, Self>, range: core::ops::range::Range) -> vortex_error::VortexResult> @@ -7074,6 +7122,10 @@ pub fn vortex_array::buffer::BufferHandle::as_host_opt(&self) -> core::option::O pub fn vortex_array::buffer::BufferHandle::ensure_aligned(self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult +pub fn vortex_array::buffer::BufferHandle::filter(&self, ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult + +pub fn vortex_array::buffer::BufferHandle::filter_typed(&self, 
ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult + pub fn vortex_array::buffer::BufferHandle::into_host(self) -> futures_core::future::BoxFuture<'static, vortex_buffer::ByteBuffer> pub fn vortex_array::buffer::BufferHandle::into_host_sync(self) -> vortex_buffer::ByteBuffer @@ -7148,6 +7200,8 @@ pub fn vortex_array::buffer::DeviceBuffer::copy_to_host(&self, alignment: vortex pub fn vortex_array::buffer::DeviceBuffer::copy_to_host_sync(&self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult +pub fn vortex_array::buffer::DeviceBuffer::filter(&self, ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult> + pub fn vortex_array::buffer::DeviceBuffer::is_empty(&self) -> bool pub fn vortex_array::buffer::DeviceBuffer::len(&self) -> usize @@ -7156,10 +7210,14 @@ pub fn vortex_array::buffer::DeviceBuffer::slice(&self, range: core::ops::range: pub trait vortex_array::buffer::DeviceBufferExt: vortex_array::buffer::DeviceBuffer +pub fn vortex_array::buffer::DeviceBufferExt::filter_typed(&self, ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult> + pub fn vortex_array::buffer::DeviceBufferExt::slice_typed(&self, range: core::ops::range::Range) -> alloc::sync::Arc impl vortex_array::buffer::DeviceBufferExt for B +pub fn B::filter_typed(&self, ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult> + pub fn B::slice_typed(&self, range: core::ops::range::Range) -> alloc::sync::Arc pub mod vortex_array::builders @@ -21884,6 +21942,10 @@ pub fn vortex_array::ArrayRef::validity(&self) -> vortex_error::VortexResult vortex_error::VortexResult +pub fn vortex_array::ArrayRef::with_buffers(&self, buffers: alloc::vec::Vec) -> vortex_error::VortexResult + +pub fn vortex_array::ArrayRef::with_children(&self, children: alloc::vec::Vec) -> vortex_error::VortexResult + pub fn vortex_array::ArrayRef::with_slot(self, slot_idx: usize, replacement: vortex_array::ArrayRef) -> vortex_error::VortexResult pub fn 
vortex_array::ArrayRef::with_slots(self, slots: alloc::vec::Vec>) -> vortex_error::VortexResult diff --git a/vortex-array/src/aggregate_fn/fns/is_sorted/primitive.rs b/vortex-array/src/aggregate_fn/fns/is_sorted/primitive.rs index 21c80e7bd45..920052f575c 100644 --- a/vortex-array/src/aggregate_fn/fns/is_sorted/primitive.rs +++ b/vortex-array/src/aggregate_fn/fns/is_sorted/primitive.rs @@ -16,11 +16,11 @@ pub(super) fn check_primitive_sorted(array: &PrimitiveArray, strict: bool) -> Vo } fn compute_is_sorted(array: &PrimitiveArray, strict: bool) -> VortexResult { + let values = array.to_buffer::(); match array.validity_mask()? { Mask::AllFalse(_) => Ok(!strict), Mask::AllTrue(_) => { - let slice = array.as_slice::(); - let iter = slice.iter().copied().map(NativeValue); + let iter = values.iter().copied().map(NativeValue); Ok(if strict { iter.is_strict_sorted() @@ -32,7 +32,7 @@ fn compute_is_sorted(array: &PrimitiveArray, strict: bool) -> Vo let iter = mask_values .bit_buffer() .iter() - .zip_eq(array.as_slice::()) + .zip_eq(values.iter()) .map(|(is_valid, value)| is_valid.then_some(NativeValue(*value))); Ok(if strict { diff --git a/vortex-array/src/array/erased.rs b/vortex-array/src/array/erased.rs index c2fea73dd73..1bda1068e3f 100644 --- a/vortex-array/src/array/erased.rs +++ b/vortex-array/src/array/erased.rs @@ -501,6 +501,34 @@ impl ArrayRef { self.0.execute_parent(self, parent, child_idx, ctx) } + /// Returns a new array with its children replaced in child-order. 
+ pub fn with_children(&self, children: Vec) -> VortexResult { + vortex_ensure!( + children.len() == self.nchildren(), + "expected {} children, got {}", + self.nchildren(), + children.len() + ); + let mut replacements = children.into_iter(); + let mut slots: Vec> = self.slots().to_vec(); + for slot in &mut slots { + if slot.is_some() { + *slot = Some( + replacements + .next() + .vortex_expect("validated child count must provide a replacement"), + ); + } + } + let inner = Arc::clone(&self.0); + inner.with_slots(self.clone(), slots) + } + + /// Returns a new array with its buffers replaced. + pub fn with_buffers(&self, buffers: Vec) -> VortexResult { + self.0.with_buffers(self, buffers) + } + // ArrayVisitor delegation methods /// Returns the children of the array. diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 75331ddc065..9298feaeabd 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -145,6 +145,12 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug { /// Returns a new array with the given slots. fn with_slots(&self, this: ArrayRef, slots: Vec>) -> VortexResult; + /// Returns a new array with its buffers replaced. + /// + /// This is used during lazy buffer materialization to swap device buffers + /// for host buffers while preserving metadata, slots, and statistics. + fn with_buffers(&self, this: &ArrayRef, buffers: Vec) -> VortexResult; + /// Attempt to reduce the array to a simpler representation. 
fn reduce(&self, this: &ArrayRef) -> VortexResult>; @@ -277,7 +283,6 @@ impl DynArray for ArrayInner { let view = unsafe { ArrayView::new_unchecked(this, &self.data) }; (0..V::nchildren(view)).map(|i| V::child(view, i)).collect() } - fn nchildren(&self, this: &ArrayRef) -> usize { let view = unsafe { ArrayView::new_unchecked(this, &self.data) }; V::nchildren(view) @@ -389,6 +394,26 @@ impl DynArray for ArrayInner { .into_array()) } + fn with_buffers(&self, this: &ArrayRef, buffers: Vec) -> VortexResult { + let view = unsafe { ArrayView::new_unchecked(this, &self.data) }; + let metadata_bytes = V::serialize(view, &LEGACY_SESSION)?.ok_or_else(|| { + vortex_err!("Array does not support serialization for buffer replacement") + })?; + let children: Vec = this.children(); + let stats = this.statistics().to_owned(); + let parts = self.vtable.deserialize( + this.dtype(), + this.len(), + &metadata_bytes, + &buffers, + &children.as_slice(), + &LEGACY_SESSION, + )?; + Ok(Array::::try_from_parts(parts)? + .with_stats_set(stats) + .into_array()) + } + fn reduce(&self, this: &ArrayRef) -> VortexResult> { let view = unsafe { ArrayView::new_unchecked(this, &self.data) }; let Some(reduced) = V::reduce(view)? 
else { diff --git a/vortex-array/src/arrays/decimal/compute/rules.rs b/vortex-array/src/arrays/decimal/compute/rules.rs index fae0c3dd866..4c90d6b67be 100644 --- a/vortex-array/src/arrays/decimal/compute/rules.rs +++ b/vortex-array/src/arrays/decimal/compute/rules.rs @@ -4,6 +4,8 @@ use std::ops::Range; use vortex_error::VortexResult; +use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ArrayRef; use crate::IntoArray; @@ -11,6 +13,8 @@ use crate::array::ArrayView; use crate::arrays::Decimal; use crate::arrays::DecimalArray; use crate::arrays::Masked; +use crate::arrays::filter::FilterReduce; +use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduce; use crate::arrays::slice::SliceReduceAdaptor; use crate::match_each_decimal_value_type; @@ -22,6 +26,7 @@ pub(crate) static RULES: ParentRuleSet = ParentRuleSet::new(&[ ParentRuleSet::lift(&DecimalMaskedValidityRule), ParentRuleSet::lift(&MaskReduceAdaptor(Decimal)), ParentRuleSet::lift(&SliceReduceAdaptor(Decimal)), + ParentRuleSet::lift(&FilterReduceAdaptor(Decimal)), ]); /// Rule to push down validity masking from MaskedArray parent into DecimalArray child. @@ -71,3 +76,30 @@ impl SliceReduce for Decimal { Ok(Some(result)) } } + +impl FilterReduce for Decimal { + fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { + let ranges = match mask.slices() { + AllOr::Some(slices) => slices, + // Precondition: FilterReduce only runs for non-trivial masks. + AllOr::All | AllOr::None => { + unreachable!("precondition violated: expected a Mask::Values slice list") + } + }; + let ranges: Vec> = ranges.iter().map(|&(s, e)| s..e).collect(); + let result = match_each_decimal_value_type!(array.values_type(), |D| { + // SAFETY: Filtering preserves all DecimalArray invariants — values within + // precision bounds remain valid, and we correctly filter the validity. 
+ unsafe { + DecimalArray::new_unchecked_handle( + array.buffer_handle().filter_typed::(&ranges)?, + array.values_type(), + array.decimal_dtype(), + array.validity()?.filter(mask)?, + ) + } + .into_array() + }); + Ok(Some(result)) + } +} diff --git a/vortex-array/src/arrays/fixed_size_list/compute/rules.rs b/vortex-array/src/arrays/fixed_size_list/compute/rules.rs index da1d91423e2..149b4bfe6f3 100644 --- a/vortex-array/src/arrays/fixed_size_list/compute/rules.rs +++ b/vortex-array/src/arrays/fixed_size_list/compute/rules.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use crate::arrays::FixedSizeList; +use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduceAdaptor; use crate::optimizer::rules::ParentRuleSet; use crate::scalar_fn::fns::cast::CastReduceAdaptor; @@ -11,4 +12,5 @@ pub(crate) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new ParentRuleSet::lift(&CastReduceAdaptor(FixedSizeList)), ParentRuleSet::lift(&MaskReduceAdaptor(FixedSizeList)), ParentRuleSet::lift(&SliceReduceAdaptor(FixedSizeList)), + ParentRuleSet::lift(&FilterReduceAdaptor(FixedSizeList)), ]); diff --git a/vortex-array/src/arrays/fixed_size_list/compute/slice.rs b/vortex-array/src/arrays/fixed_size_list/compute/slice.rs index 5146fd2b600..7013ba151a0 100644 --- a/vortex-array/src/arrays/fixed_size_list/compute/slice.rs +++ b/vortex-array/src/arrays/fixed_size_list/compute/slice.rs @@ -4,12 +4,15 @@ use std::ops::Range; use vortex_error::VortexResult; +use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ArrayRef; use crate::IntoArray; use crate::array::ArrayView; use crate::arrays::FixedSizeList; use crate::arrays::FixedSizeListArray; +use crate::arrays::filter::FilterReduce; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; use crate::arrays::slice::SliceReduce; @@ -34,3 +37,44 @@ impl SliceReduce for FixedSizeList { )) } } + +impl FilterReduce for FixedSizeList { + fn filter(array: ArrayView<'_, Self>, mask: 
&Mask) -> VortexResult> { + let list_size = array.list_size() as usize; + let new_len = mask.true_count(); + + let filtered_elements = if list_size == 0 { + // Degenerate case: elements array is empty regardless of filter. + array.elements().clone() + } else { + let elements_len = array.elements().len(); + let slices = match mask.slices() { + AllOr::Some(slices) => slices, + // Precondition: FilterReduce only runs for non-trivial masks. + AllOr::All | AllOr::None => { + unreachable!("precondition violated: expected a Mask::Values slice list") + } + }; + let expanded_slices: Vec<(usize, usize)> = slices + .iter() + .map(|&(s, e)| (s * list_size, e * list_size)) + .collect(); + let elements_mask = Mask::from_slices(elements_len, expanded_slices); + array.elements().filter(elements_mask)? + }; + + // SAFETY: Filtering preserves FixedSizeListArray invariants — each selected list's + // elements are contiguously preserved, maintaining elements.len() == new_len * list_size. + Ok(Some( + unsafe { + FixedSizeListArray::new_unchecked( + filtered_elements, + array.list_size(), + array.validity()?.filter(mask)?, + new_len, + ) + } + .into_array(), + )) + } +} diff --git a/vortex-array/src/arrays/primitive/compute/rules.rs b/vortex-array/src/arrays/primitive/compute/rules.rs index 99b0a7464d5..74e35b5cf03 100644 --- a/vortex-array/src/arrays/primitive/compute/rules.rs +++ b/vortex-array/src/arrays/primitive/compute/rules.rs @@ -9,6 +9,7 @@ use crate::array::ArrayView; use crate::arrays::Masked; use crate::arrays::Primitive; use crate::arrays::PrimitiveArray; +use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduceAdaptor; use crate::optimizer::rules::ArrayParentReduceRule; use crate::optimizer::rules::ParentRuleSet; @@ -18,6 +19,7 @@ pub(crate) const RULES: ParentRuleSet = ParentRuleSet::new(&[ ParentRuleSet::lift(&PrimitiveMaskedValidityRule), ParentRuleSet::lift(&MaskReduceAdaptor(Primitive)), ParentRuleSet::lift(&SliceReduceAdaptor(Primitive)), 
+ ParentRuleSet::lift(&FilterReduceAdaptor(Primitive)), ]); /// Rule to push down validity masking from MaskedArray parent into PrimitiveArray child. diff --git a/vortex-array/src/arrays/primitive/compute/slice.rs b/vortex-array/src/arrays/primitive/compute/slice.rs index 1830ef3f8c6..a357e89527c 100644 --- a/vortex-array/src/arrays/primitive/compute/slice.rs +++ b/vortex-array/src/arrays/primitive/compute/slice.rs @@ -4,12 +4,15 @@ use std::ops::Range; use vortex_error::VortexResult; +use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ArrayRef; use crate::IntoArray; use crate::array::ArrayView; use crate::arrays::Primitive; use crate::arrays::PrimitiveArray; +use crate::arrays::filter::FilterReduce; use crate::arrays::slice::SliceReduce; use crate::dtype::NativePType; use crate::match_each_native_ptype; @@ -27,3 +30,25 @@ impl SliceReduce for Primitive { Ok(Some(result)) } } + +impl FilterReduce for Primitive { + fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { + let ranges = match mask.slices() { + AllOr::Some(slices) => slices, + // Precondition: FilterReduce only runs for non-trivial masks. 
+ AllOr::All | AllOr::None => { + unreachable!("precondition violated: expected a Mask::Values slice list") + } + }; + let ranges: Vec> = ranges.iter().map(|&(s, e)| s..e).collect(); + let result = match_each_native_ptype!(array.ptype(), |T| { + PrimitiveArray::from_buffer_handle( + array.buffer_handle().filter_typed::(&ranges)?, + T::PTYPE, + array.validity()?.filter(mask)?, + ) + .into_array() + }); + Ok(Some(result)) + } +} diff --git a/vortex-array/src/arrays/primitive/vtable/operations.rs b/vortex-array/src/arrays/primitive/vtable/operations.rs index ddeaa386485..230da813e6b 100644 --- a/vortex-array/src/arrays/primitive/vtable/operations.rs +++ b/vortex-array/src/arrays/primitive/vtable/operations.rs @@ -17,7 +17,7 @@ impl OperationsVTable for Primitive { _ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(match_each_native_ptype!(array.ptype(), |T| { - Scalar::primitive(array.as_slice::()[index], array.dtype().nullability()) + Scalar::primitive(array.to_buffer::()[index], array.dtype().nullability()) })) } } diff --git a/vortex-array/src/arrays/varbinview/compute/rules.rs b/vortex-array/src/arrays/varbinview/compute/rules.rs index 5ec24dca7de..3a7b98cd5c5 100644 --- a/vortex-array/src/arrays/varbinview/compute/rules.rs +++ b/vortex-array/src/arrays/varbinview/compute/rules.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors use crate::arrays::VarBinView; +use crate::arrays::filter::FilterReduceAdaptor; use crate::arrays::slice::SliceReduceAdaptor; use crate::optimizer::rules::ParentRuleSet; use crate::scalar_fn::fns::cast::CastReduceAdaptor; @@ -10,4 +11,5 @@ pub(crate) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new(&[ ParentRuleSet::lift(&CastReduceAdaptor(VarBinView)), ParentRuleSet::lift(&MaskReduceAdaptor(VarBinView)), ParentRuleSet::lift(&SliceReduceAdaptor(VarBinView)), + ParentRuleSet::lift(&FilterReduceAdaptor(VarBinView)), ]); diff --git 
a/vortex-array/src/arrays/varbinview/compute/slice.rs b/vortex-array/src/arrays/varbinview/compute/slice.rs index 19f5a0d41b0..7fe9b09dd90 100644 --- a/vortex-array/src/arrays/varbinview/compute/slice.rs +++ b/vortex-array/src/arrays/varbinview/compute/slice.rs @@ -5,12 +5,15 @@ use std::ops::Range; use std::sync::Arc; use vortex_error::VortexResult; +use vortex_mask::AllOr; +use vortex_mask::Mask; use crate::ArrayRef; use crate::IntoArray; use crate::array::ArrayView; use crate::arrays::VarBinView; use crate::arrays::VarBinViewArray; +use crate::arrays::filter::FilterReduce; use crate::arrays::slice::SliceReduce; use crate::arrays::varbinview::BinaryView; @@ -29,3 +32,25 @@ impl SliceReduce for VarBinView { )) } } + +impl FilterReduce for VarBinView { + fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult> { + let ranges = match mask.slices() { + AllOr::Some(slices) => slices, + // Precondition: FilterReduce only runs for non-trivial masks. + AllOr::All | AllOr::None => { + unreachable!("precondition violated: expected a Mask::Values slice list") + } + }; + let ranges: Vec> = ranges.iter().map(|&(s, e)| s..e).collect(); + Ok(Some( + VarBinViewArray::new_handle( + array.views_handle().filter_typed::(&ranges)?, + Arc::clone(array.data_buffers()), + array.dtype().clone(), + array.validity()?.filter(mask)?, + ) + .into_array(), + )) + } +} diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs index 07baf843ead..3b20a7699ca 100644 --- a/vortex-array/src/buffer.rs +++ b/vortex-array/src/buffer.rs @@ -12,6 +12,7 @@ use futures::future::BoxFuture; use vortex_buffer::ALIGNMENT_TO_HOST_COPY; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_utils::dyn_traits::DynEq; @@ -87,6 +88,20 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash { /// Note that slice indices are in byte units. 
fn slice(&self, range: Range) -> Arc; + /// Select and concatenate multiple byte ranges from this buffer into a new buffer. + /// + /// Unlike [`slice`](DeviceBuffer::slice), this method allocates new memory and copies the + /// selected ranges into a contiguous buffer. + /// + /// Follow-up: introduce a lazy/composite device-buffer representation for deferred multi-range + /// gathers. That would let accelerators accumulate slice/filter plans and realize them later + /// with a device-native gather/compaction kernel instead of eagerly copying here. + /// + /// # Errors + /// + /// Returns an error if the device cannot allocate memory or copy the data. + fn filter(&self, ranges: &[Range]) -> VortexResult>; + /// Return a buffer with the given alignment. Where possible, this will be zero-copy. /// /// # Errors @@ -98,6 +113,16 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash { pub trait DeviceBufferExt: DeviceBuffer { /// Slice a range of elements `T` out of the device buffer. fn slice_typed(&self, range: Range) -> Arc; + + /// Select and concatenate multiple element ranges of type `T` from this buffer. + /// + /// # Errors + /// + /// Returns an error if the device cannot allocate memory or copy the data. + fn filter_typed( + &self, + ranges: &[Range], + ) -> VortexResult>; } impl DeviceBufferExt for B { @@ -106,6 +131,17 @@ impl DeviceBufferExt for B { let end_bytes = range.end * size_of::(); self.slice(start_bytes..end_bytes) } + + fn filter_typed( + &self, + ranges: &[Range], + ) -> VortexResult> { + let byte_ranges: Vec> = ranges + .iter() + .map(|r| (r.start * size_of::())..(r.end * size_of::())) + .collect(); + self.filter(&byte_ranges) + } } impl Hash for dyn DeviceBuffer { @@ -202,6 +238,55 @@ impl BufferHandle { } } + /// Select and concatenate multiple byte ranges from this buffer into a new buffer. + /// + /// Unlike [`slice`](BufferHandle::slice), this method allocates a new buffer and copies + /// the selected ranges. 
+ /// + /// # Example + /// + /// ``` + /// # use vortex_array::buffer::BufferHandle; + /// # use vortex_buffer::buffer; + /// let handle = BufferHandle::new_host(buffer![1u8, 2, 3, 4, 5, 6]); + /// let filtered = handle.filter(&[0..2, 4..6]).unwrap(); + /// assert_eq!(filtered.unwrap_host(), buffer![1u8, 2, 5, 6]); + /// ``` + pub fn filter(&self, ranges: &[Range]) -> VortexResult { + match &self.0 { + Inner::Host(host) => { + let total_len: usize = ranges.iter().map(|r| r.len()).sum(); + let mut result = ByteBufferMut::with_capacity_aligned(total_len, host.alignment()); + for range in ranges { + result.extend_from_slice(&host.as_slice()[range.start..range.end]); + } + Ok(BufferHandle::new_host(result.freeze())) + } + Inner::Device(device) => Ok(BufferHandle::new_device(device.filter(ranges)?)), + } + } + + /// Select and concatenate multiple element ranges of type `T` from this buffer. + /// + /// # Example + /// + /// ``` + /// # use vortex_array::buffer::BufferHandle; + /// # use vortex_buffer::{buffer, Buffer}; + /// let values = buffer![1u32, 2u32, 3u32, 4u32, 5u32, 6u32]; + /// let handle = BufferHandle::new_host(values.into_byte_buffer()); + /// let filtered = handle.filter_typed::(&[0..2, 4..6]).unwrap(); + /// let result = Buffer::::from_byte_buffer(filtered.to_host_sync()); + /// assert_eq!(result, buffer![1, 2, 5, 6]); + /// ``` + pub fn filter_typed(&self, ranges: &[Range]) -> VortexResult { + let byte_ranges: Vec> = ranges + .iter() + .map(|r| (r.start * size_of::())..(r.end * size_of::())) + .collect(); + self.filter(&byte_ranges) + } + /// Reinterpret the pointee as a buffer of `T` and slice the provided element range. 
/// /// # Example diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 3257318cde4..80c7c8e8107 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -287,6 +287,67 @@ impl DeviceBuffer for CudaDeviceBuffer { })) } + fn filter(&self, ranges: &[Range]) -> VortexResult> { + let total_len: usize = ranges.iter().map(|r| r.len()).sum(); + + let stream = self.allocation.stream(); + stream + .context() + .bind_to_thread() + .map_err(|e| vortex_err!("Failed to bind CUDA context: {}", e))?; + + if total_len == 0 { + let dst_slice: CudaSlice = unsafe { + stream + .alloc::(0) + .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e))? + }; + return Ok(Arc::new(CudaDeviceBuffer::new(dst_slice))); + } + + // Allocate new device memory for the filtered result. + // Follow-up: represent multi-range device selections lazily so CUDA can defer this gather + // and realize it later with a dedicated gather/compaction kernel (for example via CUB or + // a custom kernel) instead of eagerly issuing one memcpy per selected range here. + let dst_slice: CudaSlice = unsafe { + stream + .alloc::(total_len) + .map_err(|e| vortex_err!("Failed to allocate device memory for filter: {}", e))? + }; + + let (dst_base_ptr, _) = dst_slice.device_ptr(stream); + + // Copy each selected range from source to the new contiguous buffer. 
+ let mut dst_offset: u64 = 0; + for range in ranges { + assert!( + range.end <= self.len, + "Filter range end {} exceeds buffer size {}", + range.end, + self.len + ); + let src_ptr = self.device_ptr + (self.offset + range.start) as u64; + let len = range.len(); + if len > 0 { + unsafe { + sys::cuMemcpyDtoDAsync_v2( + dst_base_ptr + dst_offset, + src_ptr, + len, + stream.cu_stream(), + ) + .result() + .map_err(|e| { + vortex_err!("Failed to copy device memory during filter: {}", e) + })?; + } + } + dst_offset += len as u64; + } + + Ok(Arc::new(CudaDeviceBuffer::new(dst_slice))) + } + /// Slices the CUDA device buffer to a subrange. /// /// This is a byte range, not elements range, due to the DeviceBuffer interface. diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index 59ec83b7fe9..511223677f6 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -40,6 +40,10 @@ impl vortex_layout::segments::source::SegmentSource for vortex_file::segments::F pub fn vortex_file::segments::FileSegmentSource::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::source::SegmentFuture +pub fn vortex_file::segments::FileSegmentSource::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::source::SegmentFuture + +pub fn vortex_file::segments::FileSegmentSource::segment_len(&self, id: vortex_layout::segments::SegmentId) -> core::option::Option + pub struct vortex_file::segments::InitialReadSegmentCache pub vortex_file::segments::InitialReadSegmentCache::fallback: alloc::sync::Arc @@ -58,12 +62,18 @@ pub vortex_file::segments::RequestMetrics::coalesced_requests: vortex_metrics::c pub vortex_file::segments::RequestMetrics::individual_requests: vortex_metrics::counter::Counter +pub vortex_file::segments::RequestMetrics::logical_requested_bytes: vortex_metrics::counter::Counter + pub vortex_file::segments::RequestMetrics::num_requests_coalesced: 
vortex_metrics::histogram::Histogram impl vortex_file::segments::RequestMetrics pub fn vortex_file::segments::RequestMetrics::new(metrics_registry: &dyn vortex_metrics::MetricsRegistry, labels: alloc::vec::Vec) -> Self +impl core::clone::Clone for vortex_file::segments::RequestMetrics + +pub fn vortex_file::segments::RequestMetrics::clone(&self) -> vortex_file::segments::RequestMetrics + pub mod vortex_file::v2 pub struct vortex_file::v2::FileStatsLayoutReader diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 3a5d3cf90e5..f9119a9f811 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -13,6 +13,7 @@ use vortex_buffer::ByteBuffer; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::InstrumentedSegmentCache; @@ -212,6 +213,11 @@ impl VortexOpenOptions { .metrics_registry .clone() .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default())); + let reader = InstrumentedReadAt::new_with_labels( + reader, + metrics_registry.as_ref(), + self.labels.clone(), + ); let footer = if let Some(footer) = self.footer { footer diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..e51e2ceacc1 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicUsize; @@ -12,9 +13,11 @@ use futures::FutureExt; use futures::StreamExt; use futures::channel::mpsc; use futures::future; +use futures::future::try_join_all; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; use 
vortex_error::vortex_err; use vortex_error::vortex_panic; @@ -41,6 +44,17 @@ pub enum ReadEvent { Dropped(RequestId), } +fn apply_ranges(buffer: BufferHandle, ranges: &[Range]) -> VortexResult { + match ranges { + [] => buffer.filter(&[]), + [range] if range.start.is_multiple_of(*buffer.alignment()) => { + Ok(buffer.slice(range.clone())) + } + [range] => buffer.filter(std::slice::from_ref(range)), + _ => buffer.filter(ranges), + } +} + /// A [`SegmentSource`] for file-like IO. /// ## Coalescing and Pre-fetching /// @@ -68,6 +82,7 @@ pub struct FileSegmentSource { events: mpsc::UnboundedSender, /// The next read request ID. next_id: Arc, + metrics: RequestMetrics, } impl FileSegmentSource { @@ -103,7 +118,7 @@ impl FileSegmentSource { StreamExt::boxed(recv), coalesce_config, max_alignment, - metrics, + metrics.clone(), ) .boxed(); @@ -129,52 +144,143 @@ impl FileSegmentSource { segments, events: send, next_id: Arc::new(AtomicUsize::new(0)), + metrics, } } -} -impl SegmentSource for FileSegmentSource { - fn request(&self, id: SegmentId) -> SegmentFuture { - // We eagerly register the read request here assuming the behaviour of [`FileRead`], where - // coalescing becomes effective prior to the future being polled. 
- let spec = *match self.segments.get(*id as usize) { - Some(spec) => spec, - None => { - return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed(); - } - }; - - let SegmentSpec { - offset, - length, - alignment, - } = spec; + fn segment_spec(&self, id: SegmentId) -> VortexResult { + self.segments + .get(*id as usize) + .copied() + .ok_or_else(|| vortex_err!("Missing segment: {}", id)) + } + fn submit_read(&self, offset: u64, length: usize, alignment: Alignment) -> SegmentFuture { let (send, recv) = oneshot::channel(); let id = self.next_id.fetch_add(1, Ordering::Relaxed); let event = ReadEvent::Request(ReadRequest { id, offset, - length: length as usize, + length, alignment, callback: send, }); - // If we fail to submit the event, we create a future that has failed. if let Err(e) = self.events.unbounded_send(event) { return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed(); } - let fut = ReadFuture { + ReadFuture { id, recv: recv.into_future(), polled: false, finished: false, events: self.events.clone(), + } + .boxed() + } +} + +impl SegmentSource for FileSegmentSource { + fn segment_len(&self, id: SegmentId) -> Option { + self.segments + .get(*id as usize) + .map(|spec| spec.length as usize) + } + + fn request(&self, id: SegmentId) -> SegmentFuture { + // We eagerly register the read request here assuming the behaviour of [`FileRead`], where + // coalescing becomes effective prior to the future being polled. 
+ let spec = match self.segment_spec(id) { + Ok(spec) => spec, + Err(err) => return future::ready(Err(err)).boxed(), + }; + + let requested_bytes = self.metrics.logical_requested_bytes.clone(); + let future = self.submit_read(spec.offset, spec.length as usize, spec.alignment); + async move { + let buffer = future.await?; + requested_bytes.add(u64::from(spec.length)); + Ok(buffer) + } + .boxed() + } + + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + let spec = match self.segment_spec(id) { + Ok(spec) => spec, + Err(err) => return future::ready(Err(err)).boxed(), }; - // One allocation: we only box the returned SegmentFuture, not the inner ReadFuture. - fut.boxed() + let segment_len = spec.length as usize; + for range in &ranges { + if range.start > range.end || range.end > segment_len { + return future::ready(Err(vortex_err!( + "Segment {} range {}..{} out of bounds for segment length {}", + id, + range.start, + range.end, + segment_len + ))) + .boxed(); + } + } + + let total_len: usize = ranges.iter().map(Range::len).sum(); + let requested_bytes = self.metrics.logical_requested_bytes.clone(); + + match ranges.as_slice() { + [] => { + requested_bytes.add(0); + future::ready(Ok(BufferHandle::new_host(ByteBuffer::empty()))).boxed() + } + [range] => { + let future = self.submit_read( + spec.offset + range.start as u64, + range.len(), + Alignment::none(), + ); + async move { + let buffer = future.await?; + requested_bytes.add(total_len as u64); + Ok(buffer) + } + .boxed() + } + _ => { + let read_futures = ranges + .into_iter() + .map(|range| { + self.submit_read( + spec.offset + range.start as u64, + range.len(), + Alignment::none(), + ) + }) + .collect::>(); + async move { + let chunks = try_join_all(read_futures.into_iter().map(|future| async move { + let handle = future.await?; + handle.try_into_host()?.await + })) + .await?; + // Follow-up: teach the lower I/O API to support scatter/gather into a + // caller-owned destination buffer so 
this gather copy can be removed. + // Local files should be able to use vectored `preadv`/`preadv2`-style reads + // into the final output slices, while other backends can continue to issue a + // smaller number of merged contiguous reads. A later follow-up should thread + // through `DIRECT_IO` alignment/padding constraints for local files as well. + let mut gathered = + ByteBufferMut::with_capacity_aligned(total_len, Alignment::none()); + for chunk in chunks { + gathered.extend_from_slice(chunk.as_ref()); + } + requested_bytes.add(total_len as u64); + Ok(BufferHandle::new_host(gathered.freeze())) + } + .boxed() + } + } } } @@ -231,10 +337,12 @@ impl Drop for ReadFuture { } } +#[derive(Clone)] pub struct RequestMetrics { pub individual_requests: Counter, pub coalesced_requests: Counter, pub num_requests_coalesced: Histogram, + pub logical_requested_bytes: Counter, } impl RequestMetrics { @@ -247,8 +355,11 @@ impl RequestMetrics { .add_labels(labels.clone()) .counter("io.requests.coalesced"), num_requests_coalesced: MetricBuilder::new(metrics_registry) - .add_labels(labels) + .add_labels(labels.clone()) .histogram("io.requests.coalesced.num_coalesced"), + logical_requested_bytes: MetricBuilder::new(metrics_registry) + .add_labels(labels) + .counter("vortex.file.segments.requested_bytes"), } } } @@ -270,6 +381,12 @@ impl BufferSegmentSource { } impl SegmentSource for BufferSegmentSource { + fn segment_len(&self, id: SegmentId) -> Option { + self.segments + .get(*id as usize) + .map(|spec| spec.length as usize) + } + fn request(&self, id: SegmentId) -> SegmentFuture { let spec = match self.segments.get(*id as usize) { Some(spec) => spec, @@ -294,4 +411,119 @@ impl SegmentSource for BufferSegmentSource { let slice = self.buffer.slice(start..end).aligned(spec.alignment); future::ready(Ok(BufferHandle::new_host(slice))).boxed() } + + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + let spec = match self.segments.get(*id as usize) { + Some(spec) 
=> spec, + None => { + return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed(); + } + }; + + let start = spec.offset as usize; + let end = start + spec.length as usize; + if end > self.buffer.len() { + return future::ready(Err(vortex_err!( + "Segment {} range {}..{} out of bounds for buffer of length {}", + *id, + start, + end, + self.buffer.len() + ))) + .boxed(); + } + + let segment = BufferHandle::new_host(self.buffer.slice(start..end).aligned(spec.alignment)); + future::ready(apply_ranges(segment, &ranges)).boxed() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures::future::BoxFuture; + use vortex_buffer::ByteBuffer; + use vortex_io::InstrumentedReadAt; + use vortex_io::VortexReadAt; + use vortex_metrics::DefaultMetricsRegistry; + use vortex_metrics::MetricValue; + + use super::*; + + #[derive(Clone)] + struct YieldingReadAt(ByteBuffer); + + impl VortexReadAt for YieldingReadAt { + fn coalesce_config(&self) -> Option { + self.0.coalesce_config() + } + + fn concurrency(&self) -> usize { + self.0.concurrency() + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.0.size() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let inner = self.0.clone(); + async move { + tokio::task::yield_now().await; + inner.read_at(offset, length, alignment).await + } + .boxed() + } + } + + #[tokio::test] + async fn request_ranges_packs_bytes_and_exposes_metrics() { + let metrics_registry = DefaultMetricsRegistry::default(); + let reader = InstrumentedReadAt::new( + YieldingReadAt(ByteBuffer::from((0u8..64).collect::>())), + &metrics_registry, + ); + let metrics = RequestMetrics::new(&metrics_registry, vec![]); + let source = FileSegmentSource::open( + Arc::from([SegmentSpec { + offset: 10, + length: 20, + alignment: Alignment::none(), + }]), + reader, + Handle::find().expect("tokio runtime should provide a Vortex handle"), + metrics, + ); + + let 
result = source + .request_ranges(SegmentId::from(0), vec![1..4, 8..10]) + .await + .unwrap() + .unwrap_host(); + assert_eq!(result.as_slice(), &[11, 12, 13, 18, 19]); + + let snapshot = metrics_registry.snapshot(); + let mut logical_bytes = 0_u64; + let mut physical_bytes = 0_u64; + for metric in snapshot.iter() { + match metric.value() { + MetricValue::Counter(counter) => match metric.name().as_ref() { + "vortex.file.segments.requested_bytes" => logical_bytes = counter.value(), + "vortex.io.read.total_size" => physical_bytes = counter.value(), + _ => {} + }, + MetricValue::Histogram(_) => {} + _ => {} + } + } + + assert_eq!(logical_bytes, 5); + assert!(physical_bytes >= logical_bytes); + } } diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index a2dbc7860c2..08308c3bfb5 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -6,6 +6,70 @@ pub mod vortex_layout::aliases::paste pub use vortex_layout::aliases::paste::paste +pub mod vortex_layout::buffer + +pub struct vortex_layout::buffer::LazyBufferHandle + +impl vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::alignment(&self) -> vortex_buffer::alignment::Alignment + +pub fn vortex_layout::buffer::LazyBufferHandle::byte_ranges(&self) -> core::option::Option<&[core::ops::range::Range]> + +pub fn vortex_layout::buffer::LazyBufferHandle::filter(&self, ranges: &[core::ops::range::Range]) -> Self + +pub fn vortex_layout::buffer::LazyBufferHandle::is_empty(&self) -> bool + +pub fn vortex_layout::buffer::LazyBufferHandle::len(&self) -> usize + +pub async fn vortex_layout::buffer::LazyBufferHandle::materialize(&self) -> vortex_error::VortexResult + +pub fn vortex_layout::buffer::LazyBufferHandle::new(source: alloc::sync::Arc, segment_id: vortex_layout::segments::SegmentId, segment_len: usize, alignment: vortex_buffer::alignment::Alignment) -> Self + +pub fn vortex_layout::buffer::LazyBufferHandle::segment_id(&self) -> 
vortex_layout::segments::SegmentId + +pub fn vortex_layout::buffer::LazyBufferHandle::slice(&self, range: core::ops::range::Range) -> Self + +impl core::clone::Clone for vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::clone(&self) -> vortex_layout::buffer::LazyBufferHandle + +impl core::cmp::Eq for vortex_layout::buffer::LazyBufferHandle + +impl core::cmp::PartialEq for vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::eq(&self, other: &Self) -> bool + +impl core::fmt::Debug for vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl core::hash::Hash for vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::hash(&self, state: &mut H) + +impl vortex_array::buffer::DeviceBuffer for vortex_layout::buffer::LazyBufferHandle + +pub fn vortex_layout::buffer::LazyBufferHandle::aligned(self: alloc::sync::Arc, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult> + +pub fn vortex_layout::buffer::LazyBufferHandle::alignment(&self) -> vortex_buffer::alignment::Alignment + +pub fn vortex_layout::buffer::LazyBufferHandle::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_layout::buffer::LazyBufferHandle::copy_to_host(&self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult>> + +pub fn vortex_layout::buffer::LazyBufferHandle::copy_to_host_sync(&self, alignment: vortex_buffer::alignment::Alignment) -> vortex_error::VortexResult + +pub fn vortex_layout::buffer::LazyBufferHandle::filter(&self, ranges: &[core::ops::range::Range]) -> vortex_error::VortexResult> + +pub fn vortex_layout::buffer::LazyBufferHandle::len(&self) -> usize + +pub fn vortex_layout::buffer::LazyBufferHandle::slice(&self, range: core::ops::range::Range) -> alloc::sync::Arc + +pub fn 
vortex_layout::buffer::create_lazy_array_parts(array_tree: vortex_buffer::ByteBuffer, source: alloc::sync::Arc, segment_id: vortex_layout::segments::SegmentId) -> vortex_error::VortexResult + +pub async fn vortex_layout::buffer::materialize_recursive(array: &vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult + pub mod vortex_layout::display pub struct vortex_layout::display::DisplayLayoutTree @@ -1186,6 +1250,10 @@ impl vortex_layout::segments::SegmentSource for vortex_layout::segments::Segment pub fn vortex_layout::segments::SegmentCacheSourceAdapter::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::SegmentFuture +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::SegmentFuture + +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::segment_len(&self, id: vortex_layout::segments::SegmentId) -> core::option::Option + pub struct vortex_layout::segments::SegmentId(_) impl core::clone::Clone for vortex_layout::segments::SegmentId @@ -1252,6 +1320,10 @@ impl vortex_layout::segments::Segment pub fn vortex_layout::segments::SharedSegmentSource::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::SegmentFuture +pub fn vortex_layout::segments::SharedSegmentSource::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::SegmentFuture + +pub fn vortex_layout::segments::SharedSegmentSource::segment_len(&self, id: vortex_layout::segments::SegmentId) -> core::option::Option + pub trait vortex_layout::segments::SegmentCache: core::marker::Send + core::marker::Sync pub fn vortex_layout::segments::SegmentCache::get<'life0, 'async_trait>(&'life0 self, id: vortex_layout::segments::SegmentId) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait @@ -1284,14 
+1356,26 @@ pub trait vortex_layout::segments::SegmentSource: 'static + core::marker::Send + pub fn vortex_layout::segments::SegmentSource::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::SegmentFuture +pub fn vortex_layout::segments::SegmentSource::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::SegmentFuture + +pub fn vortex_layout::segments::SegmentSource::segment_len(&self, _id: vortex_layout::segments::SegmentId) -> core::option::Option + impl vortex_layout::segments::SegmentSource for vortex_layout::segments::SegmentCacheSourceAdapter pub fn vortex_layout::segments::SegmentCacheSourceAdapter::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::SegmentFuture +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::SegmentFuture + +pub fn vortex_layout::segments::SegmentCacheSourceAdapter::segment_len(&self, id: vortex_layout::segments::SegmentId) -> core::option::Option + impl vortex_layout::segments::SegmentSource for vortex_layout::segments::SharedSegmentSource pub fn vortex_layout::segments::SharedSegmentSource::request(&self, id: vortex_layout::segments::SegmentId) -> vortex_layout::segments::SegmentFuture +pub fn vortex_layout::segments::SharedSegmentSource::request_ranges(&self, id: vortex_layout::segments::SegmentId, ranges: alloc::vec::Vec>) -> vortex_layout::segments::SegmentFuture + +pub fn vortex_layout::segments::SharedSegmentSource::segment_len(&self, id: vortex_layout::segments::SegmentId) -> core::option::Option + pub type vortex_layout::segments::SegmentFuture = futures_core::future::BoxFuture<'static, vortex_error::VortexResult> pub type vortex_layout::segments::SegmentSinkRef = alloc::sync::Arc diff --git a/vortex-layout/src/buffer.rs b/vortex-layout/src/buffer.rs new file mode 100644 index 
00000000000..9b80abe89bd --- /dev/null +++ b/vortex-layout/src/buffer.rs @@ -0,0 +1,676 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::hash::Hash; +use std::hash::Hasher; +use std::ops::Range; +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use futures::future::try_join_all; +use vortex_array::ArrayRef; +use vortex_array::buffer::BufferHandle; +use vortex_array::buffer::DeviceBuffer; +use vortex_array::serde::SerializedArray; +use vortex_buffer::Alignment; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::segments::SegmentId; +use crate::segments::SegmentSource; + +/// A lazy buffer handle that defers segment I/O until materialization. +/// +/// Wraps a [`SegmentSource`] and [`SegmentId`] together with an optional byte +/// selection. Operations like [`slice`](Self::slice) and [`filter`](Self::filter) +/// accumulate without triggering I/O, allowing the system to determine the exact +/// byte ranges needed before reading. +#[derive(Clone)] +pub struct LazyBufferHandle { + source: Arc, + segment_id: SegmentId, + selection: Selection, + len: usize, + alignment: Alignment, +} + +/// Byte selection within a segment buffer. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +enum Selection { + /// The entire segment is selected. + All, + /// A single contiguous byte range within the segment. + Range(Range), + /// Multiple non-overlapping, sorted byte ranges within the segment. + Ranges(Arc<[Range]>), +} + +#[allow(clippy::same_name_method)] +impl LazyBufferHandle { + /// Create a new lazy handle selecting the entire segment. + /// + /// `segment_len` is the full logical length of the segment in bytes. 
+ pub fn new( + source: Arc, + segment_id: SegmentId, + segment_len: usize, + alignment: Alignment, + ) -> Self { + Self { + source, + segment_id, + selection: Selection::All, + len: segment_len, + alignment, + } + } + + /// Returns the length of the selected byte range(s). + pub fn len(&self) -> usize { + self.len + } + + /// Returns whether the buffer is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the alignment of the buffer. + pub fn alignment(&self) -> Alignment { + self.alignment + } + + /// Returns the segment ID. + pub fn segment_id(&self) -> SegmentId { + self.segment_id + } + + /// Returns the byte ranges that will be read from the segment, or `None` if the + /// entire segment is selected. + pub fn byte_ranges(&self) -> Option<&[Range]> { + match &self.selection { + Selection::All => None, + Selection::Range(r) => Some(std::slice::from_ref(r)), + Selection::Ranges(ranges) => Some(ranges), + } + } + + /// Narrow to a contiguous byte range within the current selection. + /// + /// The range is interpreted relative to the current selection's logical byte + /// offsets (i.e., offsets into the bytes that would be produced by materializing + /// the current selection). + /// + /// # Panics + /// + /// Panics if the range exceeds the bounds of the current selection (when + /// those bounds are known). 
+ pub fn slice(&self, range: Range) -> Self { + validate_slice_range(&range, self.len); + let new_len = range.len(); + let selection = match &self.selection { + Selection::All => Selection::Range(range), + Selection::Range(base) => { + let start = base.start + range.start; + let end = base.start + range.end; + assert!( + end <= base.end, + "slice range {}..{} exceeds current selection 0..{}", + range.start, + range.end, + base.len(), + ); + Selection::Range(start..end) + } + Selection::Ranges(existing) => slice_into_ranges(existing, range), + }; + Self { + source: Arc::clone(&self.source), + segment_id: self.segment_id, + selection, + len: new_len, + alignment: self.alignment, + } + } + + /// Select multiple byte ranges within the current view. + /// + /// Ranges are interpreted relative to the current selection's logical byte + /// offsets and must be sorted and non-overlapping. + /// + /// # Panics + /// + /// Panics if any range exceeds the bounds of the current selection (when + /// those bounds are known). + pub fn filter(&self, ranges: &[Range]) -> Self { + validate_filter_ranges(ranges, self.len); + let selection = match &self.selection { + Selection::All => Selection::Ranges(Arc::from(ranges)), + Selection::Range(base) => { + let absolute: Arc<[Range]> = ranges + .iter() + .map(|r| { + let abs = (base.start + r.start)..(base.start + r.end); + assert!( + abs.end <= base.end, + "filter range {}..{} exceeds current selection 0..{}", + r.start, + r.end, + base.len(), + ); + abs + }) + .collect(); + Selection::Ranges(absolute) + } + Selection::Ranges(existing) => { + // Each input range is relative to the concatenated output of + // the existing ranges. Map them back to absolute segment offsets. 
+ let mut result = Vec::new(); + for r in ranges { + match slice_into_ranges(existing, r.clone()) { + Selection::All => unreachable!(), + Selection::Range(abs) => result.push(abs), + Selection::Ranges(abs) => result.extend_from_slice(&abs), + } + } + Selection::Ranges(result.into()) + } + }; + Self { + source: Arc::clone(&self.source), + segment_id: self.segment_id, + selection, + len: ranges.iter().map(Range::len).sum(), + alignment: self.alignment, + } + } + + /// Materialize the lazy buffer by performing I/O and applying the selection. + /// + /// # Errors + /// + /// Returns an error if the segment cannot be loaded or the selection cannot be + /// applied. + pub async fn materialize(&self) -> VortexResult { + match &self.selection { + Selection::All => self.source.request(self.segment_id).await, + Selection::Range(range) => { + self.source + .request_ranges(self.segment_id, vec![range.clone()]) + .await + } + Selection::Ranges(ranges) => { + self.source + .request_ranges(self.segment_id, ranges.iter().cloned().collect()) + .await + } + } + } +} + +impl Debug for LazyBufferHandle { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LazyBufferHandle") + .field("segment_id", &self.segment_id) + .field("selection", &self.selection) + .field("len", &self.len) + .field("alignment", &self.alignment) + .finish() + } +} + +impl PartialEq for LazyBufferHandle { + fn eq(&self, other: &Self) -> bool { + self.segment_id == other.segment_id + && self.selection == other.selection + && self.len == other.len + && self.alignment == other.alignment + } +} + +impl Eq for LazyBufferHandle {} + +impl Hash for LazyBufferHandle { + fn hash(&self, state: &mut H) { + self.segment_id.hash(state); + self.selection.hash(state); + self.len.hash(state); + self.alignment.hash(state); + } +} + +impl DeviceBuffer for LazyBufferHandle { + fn as_any(&self) -> &dyn Any { + self + } + + fn len(&self) -> usize { + self.len() + } + + fn alignment(&self) -> Alignment { + 
self.alignment + } + + fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult { + futures::executor::block_on(async { + let handle = self.materialize().await?; + Ok(handle.try_into_host_sync()?.aligned(alignment)) + }) + } + + fn copy_to_host( + &self, + alignment: Alignment, + ) -> VortexResult>> { + let this = self.clone(); + Ok(async move { + let handle = this.materialize().await?; + Ok(handle.try_into_host_sync()?.aligned(alignment)) + } + .boxed()) + } + + fn slice(&self, range: Range) -> Arc { + Arc::new(LazyBufferHandle::slice(self, range)) + } + + fn filter(&self, ranges: &[Range]) -> VortexResult> { + Ok(Arc::new(LazyBufferHandle::filter(self, ranges))) + } + + fn aligned(self: Arc, alignment: Alignment) -> VortexResult> { + if self.alignment.is_aligned_to(alignment) { + Ok(self) + } else { + Ok(Arc::new(LazyBufferHandle { + source: Arc::clone(&self.source), + segment_id: self.segment_id, + selection: self.selection.clone(), + len: self.len, + alignment, + })) + } + } +} + +/// Build a [`SerializedArray`] with lazy device buffers that defer segment I/O. +/// +/// Each buffer descriptor in the flatbuffer is turned into a [`LazyBufferHandle`] +/// that records the segment source, segment ID, byte range, and alignment but +/// does **not** perform any I/O. The returned [`SerializedArray`] can be decoded into +/// an array tree and manipulated (sliced, filtered, optimized) before the lazy +/// buffers are materialized with [`materialize_recursive`]. 
+pub fn create_lazy_array_parts( + array_tree: ByteBuffer, + source: Arc, + segment_id: SegmentId, +) -> VortexResult { + use flatbuffers::root; + use vortex_flatbuffers::FlatBuffer; + use vortex_flatbuffers::array as fba; + + let segment_len = source + .segment_len(segment_id) + .ok_or_else(|| vortex_err!("Segment {} length is not available", segment_id))?; + let fb_aligned = FlatBuffer::align_from(array_tree.clone()); + let fb_array = root::(fb_aligned.as_ref())?; + + let mut offset: usize = 0; + let buffers: Vec = fb_array + .buffers() + .unwrap_or_default() + .iter() + .map(|fb_buf| { + offset += fb_buf.padding() as usize; + let buffer_len = fb_buf.length() as usize; + let alignment = Alignment::from_exponent(fb_buf.alignment_exponent()); + + let lazy = + LazyBufferHandle::new(Arc::clone(&source), segment_id, segment_len, alignment) + .slice(offset..offset + buffer_len); + + offset += buffer_len; + BufferHandle::new_device(Arc::new(lazy)) + }) + .collect(); + + SerializedArray::from_flatbuffer_with_buffers(array_tree, buffers) +} + +/// Recursively walk the array tree and materialize any [`LazyBufferHandle`] +/// device buffers by performing I/O, returning a new tree with host-resident +/// buffers. +pub async fn materialize_recursive(array: &ArrayRef) -> VortexResult { + // 1. Recursively materialize children. + let children = array.children(); + let new_children = try_join_all( + children + .iter() + .map(|child| Box::pin(materialize_recursive(child))), + ) + .await?; + let any_child_changed = children + .iter() + .zip(new_children.iter()) + .any(|(child, new_child)| !ArrayRef::ptr_eq(child, new_child)); + let current = if any_child_changed { + array.with_children(new_children)? + } else { + array.clone() + }; + + // 2. Check for lazy device buffers. 
+ let handles = current.buffer_handles(); + let any_lazy = handles.iter().any(|h| { + h.as_device_opt() + .and_then(|d| d.as_any().downcast_ref::()) + .is_some() + }); + if !any_lazy { + return Ok(current); + } + + // 3. Materialize lazy buffers, ensuring proper alignment. + let materialized = try_join_all(handles.iter().map(|handle| async move { + if let Some(lazy) = handle + .as_device_opt() + .and_then(|d| d.as_any().downcast_ref::()) + .cloned() + { + let buf = lazy.materialize().await?; + buf.ensure_aligned(lazy.alignment()) + } else { + Ok(handle.clone()) + } + })) + .await?; + current.with_buffers(materialized) +} + +/// Map a logical byte range into the given set of existing absolute ranges. +/// +/// The `range` is interpreted as an offset into the concatenated output of +/// `existing`. The result contains the corresponding absolute segment byte +/// ranges. +/// +/// # Example +/// +/// Given `existing = [10..20, 30..50]` (30 logical bytes), +/// `slice_into_ranges(existing, 5..25)` returns `Ranges([15..20, 30..45])`. +fn slice_into_ranges(existing: &[Range], range: Range) -> Selection { + let mut result = Vec::new(); + let mut offset: usize = 0; + + for er in existing { + let er_len = er.len(); + let next_offset = offset + er_len; + + // Skip ranges entirely before the slice start. + if next_offset <= range.start { + offset = next_offset; + continue; + } + + // Stop once past the slice end. + if offset >= range.end { + break; + } + + // Intersect [range.start, range.end) with the logical span [offset, next_offset) + // and map back to absolute segment bytes. 
+ let rel_start = range.start.saturating_sub(offset); + let rel_end = (range.end - offset).min(er_len); + result.push((er.start + rel_start)..(er.start + rel_end)); + + offset = next_offset; + } + + match result.len() { + 0 => Selection::Ranges(Arc::from([])), + 1 => Selection::Range(result.remove(0)), + _ => Selection::Ranges(result.into()), + } +} + +fn validate_slice_range(range: &Range, len: usize) { + assert!( + range.start <= range.end && range.end <= len, + "slice range {}..{} exceeds current selection 0..{}", + range.start, + range.end, + len, + ); +} + +fn validate_filter_ranges(ranges: &[Range], len: usize) { + let mut prev_end = 0; + for range in ranges { + assert!( + range.start <= range.end && range.end <= len, + "filter range {}..{} exceeds current selection 0..{}", + range.start, + range.end, + len, + ); + assert!( + range.start >= prev_end, + "filter ranges must be sorted and non-overlapping: {}..{} follows byte {}", + range.start, + range.end, + prev_end, + ); + prev_end = range.end; + } +} + +#[cfg(test)] +mod tests { + use std::iter; + use std::ops::Range; + use std::sync::Arc; + + use futures::FutureExt; + use parking_lot::Mutex; + use vortex_array::buffer::BufferHandle; + use vortex_buffer::Alignment; + use vortex_buffer::ByteBuffer; + use vortex_error::VortexResult; + use vortex_io::runtime::single::block_on; + + use super::*; + use crate::segments::SegmentFuture; + use crate::segments::SegmentId; + use crate::segments::SegmentSource; + use crate::segments::apply_ranges; + + type RangeRequest = Vec>; + type RangeRequestLog = Vec; + type SharedRangeRequestLog = Arc>; + + /// A trivial in-memory segment source for tests. 
+ struct SingleSegment { + buffer: BufferHandle, + ranged_requests: SharedRangeRequestLog, + } + + impl SegmentSource for SingleSegment { + fn segment_len(&self, _id: SegmentId) -> Option { + Some(self.buffer.len()) + } + + fn request(&self, _id: SegmentId) -> SegmentFuture { + let handle = self.buffer.clone(); + async move { Ok(handle) }.boxed() + } + + fn request_ranges(&self, _id: SegmentId, ranges: Vec>) -> SegmentFuture { + self.ranged_requests.lock().push(ranges.clone()); + let handle = self.buffer.clone(); + async move { apply_ranges(handle, &ranges) }.boxed() + } + } + + fn lazy(data: &[u8]) -> LazyBufferHandle { + lazy_with_requests(data).0 + } + + fn lazy_with_requests(data: &[u8]) -> (LazyBufferHandle, SharedRangeRequestLog) { + let buf = BufferHandle::new_host(ByteBuffer::copy_from(data)); + let ranged_requests = Arc::new(Mutex::new(Vec::new())); + ( + LazyBufferHandle::new( + Arc::new(SingleSegment { + buffer: buf, + ranged_requests: Arc::clone(&ranged_requests), + }), + SegmentId::from(0u32), + data.len(), + Alignment::none(), + ), + ranged_requests, + ) + } + + #[test] + fn materialize_all() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[1, 2, 3, 4, 5, 6]).materialize().await?; + assert_eq!(handle.unwrap_host().as_slice(), &[1, 2, 3, 4, 5, 6]); + Ok(()) + }) + } + + #[test] + fn slice_single() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[1, 2, 3, 4, 5, 6]).slice(1..5).materialize().await?; + assert_eq!(handle.unwrap_host().as_slice(), &[2, 3, 4, 5]); + Ok(()) + }) + } + + #[test] + fn slice_of_slice() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[1, 2, 3, 4, 5, 6]) + .slice(1..5) + .slice(1..3) + .materialize() + .await?; + assert_eq!(handle.unwrap_host().as_slice(), &[3, 4]); + Ok(()) + }) + } + + #[test] + fn filter_from_all() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[1, 2, 3, 4, 5, 6]) + .filter(&[0..2, 4..6]) + .materialize() + .await?; + 
assert_eq!(handle.unwrap_host().as_slice(), &[1, 2, 5, 6]); + Ok(()) + }) + } + + #[test] + fn filter_of_slice() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[1, 2, 3, 4, 5, 6]) + .slice(1..5) + .filter(&[0..1, 2..4]) + .materialize() + .await?; + // slice(1..5) → [2, 3, 4, 5] + // filter([0..1, 2..4]) → [2, 4, 5] + assert_eq!(handle.unwrap_host().as_slice(), &[2, 4, 5]); + Ok(()) + }) + } + + #[test] + fn slice_of_filter() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[10, 20, 30, 40, 50, 60]) + .filter(&[0..2, 4..6]) + .slice(1..3) + .materialize() + .await?; + // filter([0..2, 4..6]) selects [10, 20, 50, 60] (4 logical bytes) + // slice(1..3) → logical bytes 1..3 → [20, 50] + assert_eq!(handle.unwrap_host().as_slice(), &[20, 50]); + Ok(()) + }) + } + + #[test] + fn filter_of_filter() -> VortexResult<()> { + block_on(|_| async { + let handle = lazy(&[10, 20, 30, 40, 50, 60]) + .filter(&[0..2, 4..6]) + .filter(&[0..1, 3..4]) + .materialize() + .await?; + // First filter selects [10, 20, 50, 60] (logical bytes 0..4) + // Second filter selects logical [0..1, 3..4] → [10, 60] + assert_eq!(handle.unwrap_host().as_slice(), &[10, 60]); + Ok(()) + }) + } + + #[test] + fn byte_ranges_none_for_all() { + let lazy = lazy(&[1, 2, 3]); + assert!(lazy.byte_ranges().is_none()); + } + + #[test] + fn len_for_all_is_known_without_materialization() { + let lazy = lazy(&[1, 2, 3, 4, 5]); + assert_eq!(lazy.len(), 5); + assert!(!lazy.is_empty()); + } + + #[test] + fn byte_ranges_after_slice() { + let lazy = lazy(&[1, 2, 3, 4, 5]).slice(1..4); + let expected = [Range { start: 1, end: 4 }]; + assert_eq!(lazy.byte_ranges(), Some(expected.as_slice())); + } + + #[test] + fn byte_ranges_after_filter() { + let lazy = lazy(&[1, 2, 3, 4, 5]).filter(&[0..2, 3..5]); + let expected = [Range { start: 0, end: 2 }, Range { start: 3, end: 5 }]; + assert_eq!(lazy.byte_ranges(), Some(expected.as_slice())); + } + + #[test] + fn 
materialize_uses_request_ranges_for_sliced_buffer() -> VortexResult<()> { + block_on(|_| async { + let (lazy, ranged_requests) = lazy_with_requests(&[1, 2, 3, 4, 5, 6]); + let handle = lazy.slice(1..5).materialize().await?; + let expected_ranges: RangeRequest = iter::once(1..5).collect(); + assert_eq!(handle.unwrap_host().as_slice(), &[2, 3, 4, 5]); + assert_eq!( + ranged_requests.lock().as_slice(), + std::slice::from_ref(&expected_ranges) + ); + Ok(()) + }) + } + + #[test] + #[should_panic(expected = "slice range 0..10 exceeds current selection 0..5")] + fn slice_from_all_checks_bounds() { + drop(lazy(&[1, 2, 3, 4, 5]).slice(0..10)); + } +} diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 29627e2b4fd..de5a2d794bb 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -21,6 +21,8 @@ use vortex_mask::Mask; use vortex_session::VortexSession; use crate::LayoutReader; +use crate::buffer::create_lazy_array_parts; +use crate::buffer::materialize_recursive; use crate::layouts::SharedArrayFuture; use crate::layouts::flat::FlatLayout; use crate::segments::SegmentSource; @@ -59,30 +61,37 @@ impl FlatReader { let row_count = usize::try_from(self.layout.row_count()).vortex_expect("row count must fit in usize"); - // We create the segment_fut here to ensure we give the segment reader visibility into - // how to prioritize this segment, even if the `array` future has already been initialized. - // This is gross... see the function's TODO for a maybe better solution? 
- let segment_fut = self.segment_source.request(self.layout.segment_id()); - let ctx = self.layout.array_ctx().clone(); let session = self.session.clone(); let dtype = self.layout.dtype().clone(); let array_tree = self.layout.array_tree().cloned(); - async move { - let segment = segment_fut.await?; - let parts = if let Some(array_tree) = array_tree { - // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. - SerializedArray::from_flatbuffer_and_segment(array_tree, segment)? - } else { - // Parse the flatbuffer from the segment itself. - SerializedArray::try_from(segment)? - }; - parts - .decode(&dtype, row_count, &ctx, &session) - .map_err(Arc::new) + + if let Some(array_tree) = array_tree { + // Build lazy — no segment I/O yet. Buffers are LazyBufferHandles wrapped as + // device buffers that will be materialized after slice/filter/optimize. + let source = Arc::clone(&self.segment_source); + let segment_id = self.layout.segment_id(); + async move { + let parts = create_lazy_array_parts(array_tree, source, segment_id)?; + parts + .decode(&dtype, row_count, &ctx, &session) + .map_err(Arc::new) + } + .boxed() + .shared() + } else { + // Legacy path: segment contains both flatbuffer and data buffers. + let segment_fut = self.segment_source.request(self.layout.segment_id()); + async move { + let segment = segment_fut.await?; + let parts = SerializedArray::try_from(segment)?; + parts + .decode(&dtype, row_count, &ctx, &session) + .map_err(Arc::new) + } + .boxed() + .shared() } - .boxed() - .shared() } } @@ -151,6 +160,7 @@ impl LayoutReader for FlatReader { // after this. let array = array.apply(&expr)?; let array = array.filter(mask.clone())?; + let array = materialize_recursive(&array).await?; let mut ctx = session.create_execution_ctx(); let array_mask = array.execute::(&mut ctx)?; @@ -158,6 +168,7 @@ impl LayoutReader for FlatReader { } else { // Run over the full array, with a simpler bitand at the end. 
let array = array.apply(&expr)?; + let array = materialize_recursive(&array).await?; let mut ctx = session.create_execution_ctx(); let array_mask = array.execute::(&mut ctx)?; @@ -212,6 +223,9 @@ impl LayoutReader for FlatReader { // Evaluate the projection expression. array = array.apply(&expr)?; + // Materialize any remaining lazy device buffers before returning. + array = materialize_recursive(&array).await?; + Ok(array) } .boxed()) diff --git a/vortex-layout/src/layouts/flat/writer.rs b/vortex-layout/src/layouts/flat/writer.rs index 0d804ee6fb8..d6e06672eb9 100644 --- a/vortex-layout/src/layouts/flat/writer.rs +++ b/vortex-layout/src/layouts/flat/writer.rs @@ -207,6 +207,7 @@ mod tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; use vortex_array::arrays::struct_::StructArrayExt; + use vortex_array::assert_arrays_eq; use vortex_array::builders::ArrayBuilder; use vortex_array::builders::VarBinViewBuilder; use vortex_array::dtype::DType; @@ -418,7 +419,7 @@ mod tests { } #[test] - fn flat_invalid_array_fails() -> VortexResult<()> { + fn flat_filter_array_reduces_to_primitive() -> VortexResult<()> { block_on(|handle| async { let session = SESSION.clone().with_handle(handle); let prim: PrimitiveArray = (0..10).collect(); @@ -426,32 +427,32 @@ mod tests { let ctx = ArrayContext::empty(); - // Write the array into a byte buffer. - let (layout, _segments) = { - let segments = Arc::new(TestSegments::default()); - let (ptr, eof) = SequenceId::root().split(); - // Disallow all encodings so filter arrays fail normalization immediately. 
- let allowed = HashSet::default(); - let layout = FlatLayoutStrategy::default() - .with_allow_encodings(allowed) - .write_stream( - ctx, - Arc::::clone(&segments), - filter.into_array().to_array_stream().sequenced(ptr), - eof, - &session, - ) - .await; + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + let allowed = HashSet::default(); + let layout = FlatLayoutStrategy::default() + .with_allow_encodings(allowed) + .write_stream( + ctx, + Arc::::clone(&segments), + filter.into_array().to_array_stream().sequenced(ptr), + eof, + &session, + ) + .await?; - (layout, segments) - }; + let result = layout + .new_reader("".into(), segments, &SESSION)? + .projection_evaluation( + &(0..layout.row_count()), + &root(), + MaskFuture::new_true(layout.row_count().try_into()?), + )? + .await?; - let err = layout.expect_err("expected error"); - assert!( - err.to_string() - .contains("normalize forbids encoding (vortex.filter)"), - "unexpected error: {err}" - ); + let expected = + PrimitiveArray::new(buffer![2i32, 3], Validity::NonNullable).into_array(); + assert_arrays_eq!(result, expected); Ok(()) }) diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index be1bad67236..b6cd2877a6d 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +pub mod buffer; pub mod layouts; pub use children::*; diff --git a/vortex-layout/src/segments/cache.rs b/vortex-layout/src/segments/cache.rs index 37675c19d2a..b540ebc8db1 100644 --- a/vortex-layout/src/segments/cache.rs +++ b/vortex-layout/src/segments/cache.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; @@ -21,6 +22,7 @@ use vortex_metrics::MetricsRegistry; use crate::segments::SegmentFuture; use 
crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::apply_ranges; /// A cache for storing and retrieving individual segment data. #[async_trait] @@ -136,16 +138,20 @@ impl SegmentCacheSourceAdapter { } impl SegmentSource for SegmentCacheSourceAdapter { + fn segment_len(&self, id: SegmentId) -> Option { + self.source.segment_len(id) + } + fn request(&self, id: SegmentId) -> SegmentFuture { let cache = Arc::clone(&self.cache); - let delegate = self.source.request(id); + let source = Arc::clone(&self.source); async move { if let Ok(Some(segment)) = cache.get(id).await { tracing::debug!("Resolved segment {} from cache", id); return Ok(BufferHandle::new_host(segment)); } - let result = delegate.await?; + let result = source.request(id).await?; // Cache only CPU buffers; device buffers are not cached. if let Some(buffer) = result.as_host_opt() && let Err(e) = cache.put(id, buffer.clone()).await @@ -156,4 +162,89 @@ impl SegmentSource for SegmentCacheSourceAdapter { } .boxed() } + + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + let cache = Arc::clone(&self.cache); + let source = Arc::clone(&self.source); + + async move { + if let Ok(Some(segment)) = cache.get(id).await { + tracing::debug!("Resolved segment {} from cache for ranged read", id); + return apply_ranges(BufferHandle::new_host(segment), &ranges); + } + source.request_ranges(id, ranges).await + } + .boxed() + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use futures::FutureExt; + + use super::*; + + struct FixedCache(ByteBuffer); + + #[async_trait] + impl SegmentCache for FixedCache { + async fn get(&self, _id: SegmentId) -> VortexResult> { + Ok(Some(self.0.clone())) + } + + async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> { + Ok(()) + } + } + + #[derive(Default)] + struct CountingSource { + requests: AtomicUsize, + ranged_requests: AtomicUsize, + } + + impl 
SegmentSource for CountingSource { + fn segment_len(&self, _id: SegmentId) -> Option { + Some(4) + } + + fn request(&self, _id: SegmentId) -> SegmentFuture { + self.requests.fetch_add(1, Ordering::Relaxed); + async { Ok(BufferHandle::new_host(ByteBuffer::from(vec![9, 9, 9, 9]))) }.boxed() + } + + fn request_ranges(&self, _id: SegmentId, ranges: Vec>) -> SegmentFuture { + self.ranged_requests.fetch_add(1, Ordering::Relaxed); + async move { + let full = BufferHandle::new_host(ByteBuffer::from(vec![9, 9, 9, 9])); + apply_ranges(full, &ranges) + } + .boxed() + } + } + + #[tokio::test] + async fn cache_hit_skips_underlying_requests() { + let source = Arc::new(CountingSource::default()); + let adapter = SegmentCacheSourceAdapter::new( + Arc::new(FixedCache(ByteBuffer::from(vec![1, 2, 3, 4]))), + Arc::clone(&source) as Arc, + ); + + let full = adapter.request(SegmentId::from(0)).await.unwrap(); + assert_eq!(full.unwrap_host().as_slice(), &[1, 2, 3, 4]); + + let requested_ranges = std::iter::once(1..3).collect(); + let ranges = adapter + .request_ranges(SegmentId::from(0), requested_ranges) + .await + .unwrap(); + assert_eq!(ranges.unwrap_host().as_slice(), &[2, 3]); + + assert_eq!(source.requests.load(Ordering::Relaxed), 0); + assert_eq!(source.ranged_requests.load(Ordering::Relaxed), 0); + } } diff --git a/vortex-layout/src/segments/shared.rs b/vortex-layout/src/segments/shared.rs index c794daf608e..35f688c53a2 100644 --- a/vortex-layout/src/segments/shared.rs +++ b/vortex-layout/src/segments/shared.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; use std::sync::Arc; use futures::FutureExt; @@ -22,11 +23,17 @@ use crate::segments::SegmentSource; /// request. 
pub struct SharedSegmentSource { inner: S, - in_flight: DashMap>, + in_flight: DashMap>, } type SharedSegmentFuture = BoxFuture<'static, SharedVortexResult>; +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +enum RequestKey { + Full(SegmentId), + Ranges(SegmentId, Vec>), +} + impl SharedSegmentSource { /// Create a new `SharedSegmentSource` wrapping the provided inner source. pub fn new(inner: S) -> Self { @@ -38,9 +45,14 @@ impl SharedSegmentSource { } impl SegmentSource for SharedSegmentSource { + fn segment_len(&self, id: SegmentId) -> Option { + self.inner.segment_len(id) + } + fn request(&self, id: SegmentId) -> SegmentFuture { loop { - match self.in_flight.entry(id) { + let key = RequestKey::Full(id); + match self.in_flight.entry(key) { Entry::Occupied(e) => { if let Some(shared_future) = e.get().upgrade() { return shared_future.map_err(VortexError::from).boxed(); @@ -61,6 +73,35 @@ impl SegmentSource for SharedSegmentSource { } } } + + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + loop { + let key = RequestKey::Ranges(id, ranges.clone()); + match self.in_flight.entry(key) { + Entry::Occupied(e) => { + if let Some(shared_future) = e.get().upgrade() { + return shared_future.map_err(VortexError::from).boxed(); + } else { + e.remove(); + } + } + Entry::Vacant(e) => { + let future = self + .inner + .request_ranges(id, ranges) + .map_err(Arc::new) + .boxed() + .shared(); + e.insert( + future + .downgrade() + .vortex_expect("just created, cannot be polled to completion"), + ); + return future.map_err(VortexError::from).boxed(); + } + } + } + } } #[cfg(test)] @@ -80,13 +121,28 @@ mod tests { struct CountingSegmentSource { segments: TestSegments, request_count: Arc, + range_request_count: Arc, } impl SegmentSource for CountingSegmentSource { + fn segment_len(&self, id: SegmentId) -> Option { + self.segments.segment_len(id) + } + fn request(&self, id: SegmentId) -> SegmentFuture { self.request_count.fetch_add(1, Ordering::SeqCst); 
self.segments.request(id) } + + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + self.range_request_count.fetch_add(1, Ordering::SeqCst); + let segments = self.segments.clone(); + async move { + let buffer = segments.request(id).await?; + crate::segments::apply_ranges(buffer, &ranges) + } + .boxed() + } } #[tokio::test] @@ -118,6 +174,26 @@ mod tests { assert_eq!(source.request_count.load(Ordering::Relaxed), 1); } + #[tokio::test] + async fn test_shared_source_deduplicates_concurrent_ranged_requests() { + let source = CountingSegmentSource::default(); + + let data = ByteBuffer::from(vec![1, 2, 3, 4, 5, 6]); + let seq_id = SequenceId::root().downgrade(); + source.segments.write(seq_id, vec![data]).await.unwrap(); + + let shared_source = SharedSegmentSource::new(source.clone()); + let id = SegmentId::from(0); + let ranges = vec![1..3, 4..6]; + let future1 = shared_source.request_ranges(id, ranges.clone()); + let future2 = shared_source.request_ranges(id, ranges); + + let (result1, result2) = futures::join!(future1, future2); + assert_eq!(result1.unwrap().unwrap_host().as_slice(), &[2, 3, 5, 6]); + assert_eq!(result2.unwrap().unwrap_host().as_slice(), &[2, 3, 5, 6]); + assert_eq!(source.range_request_count.load(Ordering::Relaxed), 1); + } + #[tokio::test] async fn test_shared_source_handles_dropped_futures() { let source = CountingSegmentSource::default(); diff --git a/vortex-layout/src/segments/source.rs b/vortex-layout/src/segments/source.rs index a48a79b2889..dd1cbb7c6c5 100644 --- a/vortex-layout/src/segments/source.rs +++ b/vortex-layout/src/segments/source.rs @@ -1,6 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; + +use futures::FutureExt; use futures::future::BoxFuture; use vortex_array::buffer::BufferHandle; use vortex_error::VortexResult; @@ -9,8 +12,56 @@ use crate::segments::SegmentId; /// Static future resolving to a segment byte buffer. 
pub type SegmentFuture = BoxFuture<'static, VortexResult>; +/// Apply a set of segment-relative byte ranges to an already-resolved segment buffer. +pub(crate) fn apply_ranges( + buffer: BufferHandle, + ranges: &[Range], +) -> VortexResult { + match ranges { + [] => buffer.filter(&[]), + [range] if range.start.is_multiple_of(*buffer.alignment()) => { + Ok(buffer.slice(range.clone())) + } + [range] => buffer.filter(std::slice::from_ref(range)), + _ => buffer.filter(ranges), + } +} + /// A trait for providing segment data to a [`crate::LayoutReader`]. pub trait SegmentSource: 'static + Send + Sync { + /// Return the full length of a segment in bytes if known without issuing I/O. + fn segment_len(&self, _id: SegmentId) -> Option { + None + } + /// Request a segment, returning a future that will eventually resolve to the segment data. fn request(&self, id: SegmentId) -> SegmentFuture; + + /// Request a set of segment-relative byte ranges and return them packed into one contiguous buffer. + /// + /// Implementations may satisfy this by issuing multiple underlying reads, but the returned + /// [`BufferHandle`] must contain the concatenated bytes in the same order as `ranges`. + /// + /// Follow-up: push scatter/gather support into the lower I/O API so sources can fill a + /// caller-owned destination buffer directly and avoid the gather copy that this interface may + /// currently require. + /// + /// The intended split is: + /// - this API continues to describe the logical bytes to materialize; + /// - sources may normalize/merge ranges before issuing physical reads; + /// - lower I/O backends may optionally expose vectored reads into output slices. + /// + /// That lets local files eventually use `preadv`/`preadv2`-style reads for sparse ranges, + /// while remote/object-store backends can still choose a smaller number of coalesced + /// contiguous reads when that is cheaper. 
A later follow-up should also thread through + /// alignment requirements for `DIRECT_IO`, since that may force padded physical reads even + /// when the logical request stays sparse. + fn request_ranges(&self, id: SegmentId, ranges: Vec>) -> SegmentFuture { + let future = self.request(id); + async move { + let buffer = future.await?; + apply_ranges(buffer, &ranges) + } + .boxed() + } } diff --git a/vortex-layout/src/segments/test.rs b/vortex-layout/src/segments/test.rs index d880d15cc1a..44ce2fb0da0 100644 --- a/vortex-layout/src/segments/test.rs +++ b/vortex-layout/src/segments/test.rs @@ -26,6 +26,10 @@ pub struct TestSegments { } impl SegmentSource for TestSegments { + fn segment_len(&self, id: SegmentId) -> Option { + self.segments.lock().get(*id as usize).map(ByteBuffer::len) + } + fn request(&self, id: SegmentId) -> SegmentFuture { let buffer = self.segments.lock().get(*id as usize).cloned(); async move {