Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ needless_range_loop = "allow"
or_fun_call = "deny"
panic = "deny"
# panic_in_result_fn = "deny" -- we cannot disable this for tests to use assertions
clone_on_ref_ptr = "warn"
redundant_clone = "deny"
same_name_method = "deny"
tests_outside_test_module = "deny"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/compress-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn chunked_to_vec_record_batch(
.map(|array| {
// TODO(connor)[ListView]: The rust Parquet implementation does not support writing
// `ListView` to Parquet files yet.
let converted_array = recursive_list_from_list_view(array.clone())?;
let converted_array = recursive_list_from_list_view(Arc::clone(&array))?;
Ok(RecordBatch::try_from(converted_array.as_ref())?)
})
.collect::<anyhow::Result<Vec<_>>>()?;
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/compress-bench/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Compressor for ParquetCompressor {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(&builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -69,7 +69,7 @@ impl Compressor for ParquetCompressor {
// First compress to get the bytes we'll decompress
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(&builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down
6 changes: 3 additions & 3 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn make_object_store(
.build()?,
);
session
.register_object_store(&Url::parse(&format!("s3://{bucket_name}/"))?, s3.clone());
.register_object_store(&Url::parse(&format!("s3://{bucket_name}/"))?, Arc::clone(&s3));
Ok(s3)
}
"gs" => {
Expand All @@ -93,12 +93,12 @@ pub fn make_object_store(
.build()?,
);
session
.register_object_store(&Url::parse(&format!("gs://{bucket_name}/"))?, gcs.clone());
.register_object_store(&Url::parse(&format!("gs://{bucket_name}/"))?, Arc::clone(&gcs));
Ok(gcs)
}
_ => {
let fs = Arc::new(LocalFileSystem::default());
session.register_object_store(&Url::parse("file:/")?, fs.clone());
session.register_object_store(&Url::parse("file:/")?, Arc::clone(&fs));
Ok(fs)
}
}
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async fn main() -> anyhow::Result<()> {
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
plans_mut.push((query_idx, *format, Arc::clone(&plan)));
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
let pattern = benchmark.pattern(table.name, format);
let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern)?;

let mut listing_options = ListingOptions::new(file_format.clone())
let mut listing_options = ListingOptions::new(Arc::clone(&file_format))
.with_session_config_options(session.state().config());
if benchmark.dataset_name() == "polarsignals" && format == Format::Parquet {
// Work around a DataFusion bug (fixed in 53.0.0) where the
Expand Down Expand Up @@ -305,7 +305,7 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
.object_store(table_url.object_store())?;

let fs: vortex::io::filesystem::FileSystemRef =
Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle()));
Arc::new(ObjectStoreFileSystem::new(Arc::clone(&store), SESSION.handle()));
let base_prefix = benchmark_base.path().trim_start_matches('/').to_string();
let fs = fs.with_prefix(base_prefix);

Expand Down Expand Up @@ -416,7 +416,7 @@ pub async fn execute_query(
.create_physical_plan()
.with_labelset(get_labelset_from_global())
.await?;
let result = collect(plan.clone(), task_ctx)
let result = collect(Arc::clone(&plan), task_ctx)
.with_labelset(get_labelset_from_global())
.await?;

Expand Down
5 changes: 3 additions & 2 deletions benchmarks/lance-bench/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use vortex_bench::compress::Compressor;

use crate::convert::convert_utf8view_batch;
use crate::convert::convert_utf8view_schema;
use std::sync::Arc;

/// Read a Lance dataset and decompress it back into RecordBatches.
pub async fn lance_decompress_read(path: &str) -> anyhow::Result<usize> {
Expand Down Expand Up @@ -92,7 +93,7 @@ impl Compressor for LanceCompressor {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(&builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down Expand Up @@ -131,7 +132,7 @@ impl Compressor for LanceCompressor {
// First compress to get the Lance dataset
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(&builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down
8 changes: 4 additions & 4 deletions benchmarks/lance-bench/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Iterator for ParquetFilesIterator {

impl RecordBatchReader for ParquetFilesIterator {
fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}
}

Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn convert_parquet_to_lance<'p>(
// Get schema from the first Parquet file
let first_file = File::open(&parquet_files[0])?;
let first_builder = ParquetRecordBatchReaderBuilder::try_new(first_file)?;
let schema = first_builder.schema().clone();
let schema = Arc::clone(&first_builder.schema());

// Create a streaming iterator that reads from all Parquet files
let batch_iter = ParquetFilesIterator::new(parquet_files, schema)?;
Expand Down Expand Up @@ -237,7 +237,7 @@ pub fn convert_utf8view_batch(batch: RecordBatch) -> anyhow::Result<RecordBatch>
// Cast Utf8View to Utf8.
cast(column, &DataType::Utf8)?
} else {
column.clone()
Arc::clone(&column)
};
new_columns.push(new_column);
}
Expand Down Expand Up @@ -277,6 +277,6 @@ impl Iterator for ConvertingParquetFilesIterator {

impl RecordBatchReader for ConvertingParquetFilesIterator {
fn schema(&self) -> SchemaRef {
self.converted_schema.clone()
Arc::clone(&self.converted_schema)
}
}
16 changes: 8 additions & 8 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,14 +459,14 @@ impl ALPArray {
fn make_slots(encoded: &ArrayRef, patches: &Option<Patches>) -> Vec<Option<ArrayRef>> {
let (patch_indices, patch_values, patch_chunk_offsets) = match patches {
Some(p) => (
Some(p.indices().clone()),
Some(p.values().clone()),
Some(Arc::clone(&p.indices())),
Some(Arc::clone(&p.values())),
p.chunk_offsets().clone(),
),
None => (None, None, None),
};
vec![
Some(encoded.clone()),
Some(Arc::clone(&encoded)),
patch_indices,
patch_values,
patch_chunk_offsets,
Expand Down Expand Up @@ -501,8 +501,8 @@ impl ALPArray {
Patches::new_unchecked(
self.encoded().len(),
patch_offset,
indices.clone(),
values.clone(),
Arc::clone(&indices),
Arc::clone(&values),
self.slots[PATCH_CHUNK_OFFSETS_SLOT].clone(),
self.patch_offset_within_chunk,
)
Expand Down Expand Up @@ -836,15 +836,15 @@ mod tests {
let patches_without_chunk_offsets = Patches::new(
original_patches.array_len(),
original_patches.offset(),
original_patches.indices().clone(),
original_patches.values().clone(),
Arc::clone(&original_patches.indices()),
Arc::clone(&original_patches.values()),
None, // NO chunk_offsets - this triggers the bug!
)
.unwrap();

// Build a new ALPArray with the same encoded data but patches without chunk_offsets.
let alp_without_chunk_offsets = ALPArray::new(
normally_encoded.encoded().clone(),
Arc::clone(&normally_encoded.encoded()),
normally_encoded.exponents(),
Some(patches_without_chunk_offsets),
);
Expand Down
3 changes: 2 additions & 1 deletion encodings/alp/src/alp/compute/between.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::ALP;
use crate::ALPArray;
use crate::ALPFloat;
use crate::match_each_alp_float_ptype;
use std::sync::Arc;

impl BetweenReduce for ALP {
fn between(
Expand Down Expand Up @@ -84,7 +85,7 @@ where
upper_strict,
};

array.encoded().clone().between(
Arc::clone(&array.encoded()).between(
ConstantArray::new(Scalar::primitive(lower_enc, nullability), array.len()).into_array(),
ConstantArray::new(Scalar::primitive(upper_enc, nullability), array.len()).into_array(),
options,
Expand Down
3 changes: 2 additions & 1 deletion encodings/alp/src/alp/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex_error::VortexResult;

use crate::alp::ALP;
use crate::alp::ALPArray;
use std::sync::Arc;

impl CastReduce for ALP {
fn cast(array: &ALPArray, dtype: &DType) -> VortexResult<Option<ArrayRef>> {
Expand All @@ -34,7 +35,7 @@ impl CastReduce for ALP {
Patches::new(
p.array_len(),
p.offset(),
p.indices().clone(),
Arc::clone(&p.indices()),
p.values().cast(dtype.clone())?,
p.chunk_offsets().clone(),
)
Expand Down
5 changes: 3 additions & 2 deletions encodings/alp/src/alp/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ use vortex_error::VortexResult;

use crate::ALP;
use crate::ALPArray;
use std::sync::Arc;

impl MaskReduce for ALP {
fn mask(array: &ALPArray, mask: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
// Masking sparse patches requires reading indices, fall back to kernel.
if array.patches().is_some() {
return Ok(None);
}
let masked_encoded = array.encoded().clone().mask(mask.clone())?;
let masked_encoded = Arc::clone(&array.encoded()).mask(Arc::clone(&mask))?;
Ok(Some(
ALPArray::new(masked_encoded, array.exponents(), None).into_array(),
))
Expand All @@ -33,7 +34,7 @@ impl MaskKernel for ALP {
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let vortex_mask = Validity::Array(mask.not()?).execute_mask(array.len(), ctx)?;
let masked_encoded = array.encoded().clone().mask(mask.clone())?;
let masked_encoded = Arc::clone(&array.encoded()).mask(Arc::clone(&mask))?;
let masked_patches = array
.patches()
.map(|p| p.mask(&vortex_mask, ctx))
Expand Down
13 changes: 7 additions & 6 deletions encodings/alp/src/alp/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::ALPArray;
use crate::ALPFloat;
use crate::Exponents;
use crate::match_each_alp_float_ptype;
use std::sync::Arc;

/// Decompresses an ALP-encoded array using `to_primitive` (legacy path).
///
Expand All @@ -32,9 +33,9 @@ pub fn decompress_into_array(
&& let Some(chunk_offsets) = patches.chunk_offsets()
{
let prim_encoded = encoded.execute::<PrimitiveArray>(ctx)?;
let patches_chunk_offsets = chunk_offsets.clone().execute::<PrimitiveArray>(ctx)?;
let patches_indices = patches.indices().clone().execute::<PrimitiveArray>(ctx)?;
let patches_values = patches.values().clone().execute::<PrimitiveArray>(ctx)?;
let patches_chunk_offsets = Arc::clone(&chunk_offsets).execute::<PrimitiveArray>(ctx)?;
let patches_indices = Arc::clone(&patches.indices()).execute::<PrimitiveArray>(ctx)?;
let patches_values = Arc::clone(&patches.values()).execute::<PrimitiveArray>(ctx)?;
Ok(decompress_chunked_core(
prim_encoded,
exponents,
Expand Down Expand Up @@ -65,9 +66,9 @@ pub fn execute_decompress(array: ALPArray, ctx: &mut ExecutionCtx) -> VortexResu
{
// TODO(joe): have into parts.
let encoded = encoded.execute::<PrimitiveArray>(ctx)?;
let patches_chunk_offsets = chunk_offsets.clone().execute::<PrimitiveArray>(ctx)?;
let patches_indices = patches.indices().clone().execute::<PrimitiveArray>(ctx)?;
let patches_values = patches.values().clone().execute::<PrimitiveArray>(ctx)?;
let patches_chunk_offsets = Arc::clone(&chunk_offsets).execute::<PrimitiveArray>(ctx)?;
let patches_indices = Arc::clone(&patches.indices()).execute::<PrimitiveArray>(ctx)?;
let patches_values = Arc::clone(&patches.values()).execute::<PrimitiveArray>(ctx)?;
Ok(decompress_chunked_core(
encoded,
exponents,
Expand Down
16 changes: 8 additions & 8 deletions encodings/alp/src/alp_rd/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ impl VTable for ALPRD {
Some(Patches::new(
old.array_len(),
old.offset(),
indices.clone(),
values.clone(),
Arc::clone(&indices),
Arc::clone(&values),
slots[LP_PATCH_CHUNK_OFFSETS_SLOT].clone(),
)?)
}
Expand Down Expand Up @@ -492,15 +492,15 @@ impl ALPRDArray {
) -> Vec<Option<ArrayRef>> {
let (pi, pv, pco) = match patches {
Some(p) => (
Some(p.indices().clone()),
Some(p.values().clone()),
Some(Arc::clone(&p.indices())),
Some(Arc::clone(&p.values())),
p.chunk_offsets().clone(),
),
None => (None, None, None),
};
vec![
Some(left_parts.clone()),
Some(right_parts.clone()),
Some(Arc::clone(&left_parts)),
Some(Arc::clone(&right_parts)),
pi,
pv,
pco,
Expand Down Expand Up @@ -569,8 +569,8 @@ impl ALPRDArray {
// Update both the patches and the corresponding slots to keep them in sync.
let (pi, pv, pco) = match &patches {
Some(p) => (
Some(p.indices().clone()),
Some(p.values().clone()),
Some(Arc::clone(&p.indices())),
Some(Arc::clone(&p.values())),
p.chunk_offsets().clone(),
),
None => (None, None, None),
Expand Down
3 changes: 2 additions & 1 deletion encodings/alp/src/alp_rd/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use vortex_error::VortexResult;

use crate::alp_rd::ALPRD;
use crate::alp_rd::ALPRDArray;
use std::sync::Arc;

impl CastReduce for ALPRD {
fn cast(array: &ALPRDArray, dtype: &DType) -> VortexResult<Option<ArrayRef>> {
Expand All @@ -32,7 +33,7 @@ impl CastReduce for ALPRD {
dtype.clone(),
new_left_parts,
array.left_parts_dictionary().clone(),
array.right_parts().clone(),
Arc::clone(&array.right_parts()),
array.right_bit_width(),
array.left_parts_patches(),
)?
Expand Down
5 changes: 3 additions & 2 deletions encodings/alp/src/alp_rd/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ use vortex_error::VortexResult;

use crate::ALPRD;
use crate::ALPRDArray;
use std::sync::Arc;

impl MaskReduce for ALPRD {
fn mask(array: &ALPRDArray, mask: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
let masked_left_parts = MaskExpr.try_new_array(
array.left_parts().len(),
EmptyOptions,
[array.left_parts().clone(), mask.clone()],
[Arc::clone(&array.left_parts()), Arc::clone(&mask)],
)?;
Ok(Some(
ALPRDArray::try_new(
array.dtype().as_nullable(),
masked_left_parts,
array.left_parts_dictionary().clone(),
array.right_parts().clone(),
Arc::clone(&array.right_parts()),
array.right_bit_width(),
array.left_parts_patches(),
)?
Expand Down
5 changes: 3 additions & 2 deletions encodings/alp/src/alp_rd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use vortex_array::IntoArray;
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_fastlanes::bitpack_compress::bitpack_encode_unchecked;
use std::sync::Arc;

mod array;
mod compute;
Expand Down Expand Up @@ -314,8 +315,8 @@ pub fn alp_rd_decode<T: ALPRDFloat>(
}

// Overwrite exception positions with their true left bit-patterns.
let indices = patches.indices().clone().execute::<PrimitiveArray>(ctx)?;
let patch_values = patches.values().clone().execute::<PrimitiveArray>(ctx)?;
let indices = Arc::clone(&patches.indices()).execute::<PrimitiveArray>(ctx)?;
let patch_values = Arc::clone(&patches.values()).execute::<PrimitiveArray>(ctx)?;
alp_rd_apply_patches(&mut left_parts, &indices, &patch_values, patches.offset());

// Reconstruct floats by shifting each decoded left value into the MSBs
Expand Down
Loading
Loading