Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ChunkError {
UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

#[error(transparent)]
WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),
WrongDatatypeError(#[from] re_arrow_util::WrongDatatypeError),

#[error(transparent)]
MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_sorbet/src/chunk_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use arrow::array::{
ArrayRef as ArrowArrayRef, AsArray as _, FixedSizeBinaryArray, RecordBatch as ArrowRecordBatch,
};
use arrow::datatypes::Fields as ArrowFields;
use re_arrow_util::WrongDatatypeError;
use re_log_types::EntityPath;
use re_types_core::ChunkId;

use crate::{ChunkSchema, RowIdColumnDescriptor, SorbetBatch, SorbetError, WrongDatatypeError};
use crate::{ChunkSchema, RowIdColumnDescriptor, SorbetBatch, SorbetError};

#[derive(thiserror::Error, Debug)]
pub enum MismatchedChunkSchemaError {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_sorbet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum SorbetError {
UnsupportedTimeType(#[from] crate::UnsupportedTimeType),

#[error(transparent)]
WrongDatatypeError(#[from] crate::WrongDatatypeError),
WrongDatatypeError(#[from] re_arrow_util::WrongDatatypeError),

#[error(transparent)]
ArrowError(#[from] ArrowError),
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_sorbet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub use self::metadata::{
ArrowBatchMetadata, ArrowFieldMetadata, MetadataExt, MissingFieldMetadata, MissingMetadataKey,
};
pub use self::migrations::{migrate_record_batch, migrate_schema_ref};
pub use self::row_id_column_descriptor::{RowIdColumnDescriptor, WrongDatatypeError};
pub use self::row_id_column_descriptor::RowIdColumnDescriptor;
pub use self::schema_builder::SchemaBuilder;
pub use self::selectors::{
ColumnSelector, ColumnSelectorParseError, ComponentColumnSelector, TimeColumnSelector,
Expand Down
21 changes: 5 additions & 16 deletions crates/store/re_sorbet/src/row_id_column_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField};
use re_arrow_util::WrongDatatypeError;
use re_types_core::{Loggable as _, RowId};

use crate::MetadataExt as _;

#[derive(thiserror::Error, Debug)]
#[error("{0}")]
pub struct WrongDatatypeError(String);

/// Describes the schema of the primary [`RowId`] column.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RowIdColumnDescriptor {
Expand Down Expand Up @@ -78,17 +75,9 @@ impl TryFrom<&ArrowField> for RowIdColumnDescriptor {
type Error = WrongDatatypeError;

fn try_from(field: &ArrowField) -> Result<Self, Self::Error> {
let actual_datatype = field.data_type();
let expected_datatype = RowId::arrow_datatype();
if actual_datatype == &expected_datatype {
Ok(Self {
is_sorted: field.metadata().get_bool("rerun:is_sorted"),
})
} else {
Err(WrongDatatypeError(format!(
"Expected a RowId column with datatype {expected_datatype}, but column {:?} has datatype {actual_datatype}",
field.name()
)))
}
WrongDatatypeError::ensure_datatype(field, &RowId::arrow_datatype())?;
Ok(Self {
is_sorted: field.metadata().get_bool("rerun:is_sorted"),
})
}
}
5 changes: 3 additions & 2 deletions crates/store/re_types_core/src/chunk_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ impl ChunkId {
Ok(bytemuck::cast_slice(array.value_data()))
} else {
Err(WrongDatatypeError {
expected: Self::arrow_datatype(),
actual: array.data_type().clone(),
column_name: None,
expected: Self::arrow_datatype().into(),
actual: array.data_type().clone().into(),
})
}
}
Expand Down
16 changes: 16 additions & 0 deletions crates/utils/re_arrow_util/src/arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub trait ArrowArrayDowncastRef<'a>: 'a {
/// Similar to `downcast_array_ref`, but returns an error in case the downcast
/// returns `None`.
fn try_downcast_array_ref<T: Array + 'static>(self) -> Result<&'a T, ArrowError>;

/// Similar to `downcast_array_ref`, but returns an error in case the downcast
/// returns `None`.
fn try_downcast_array<T: Array + Clone + 'static>(self) -> Result<T, ArrowError>;
}

impl<'a> ArrowArrayDowncastRef<'a> for &'a dyn Array {
Expand All @@ -36,6 +40,12 @@ impl<'a> ArrowArrayDowncastRef<'a> for &'a dyn Array {
))
})
}

/// Similar to `downcast_array_ref`, but returns an error in case the downcast
/// returns `None`.
fn try_downcast_array<T: Array + Clone + 'static>(self) -> Result<T, ArrowError> {
Ok(self.try_downcast_array_ref::<T>()?.clone())
}
}

impl<'a> ArrowArrayDowncastRef<'a> for &'a ArrayRef {
Expand All @@ -52,6 +62,12 @@ impl<'a> ArrowArrayDowncastRef<'a> for &'a ArrayRef {
))
})
}

/// Similar to `downcast_array_ref`, but returns an error in case the downcast
/// returns `None`.
fn try_downcast_array<T: Array + Clone + 'static>(self) -> Result<T, ArrowError> {
Ok(self.try_downcast_array_ref::<T>()?.clone())
}
}

// ---------------------------------------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions crates/utils/re_arrow_util/src/batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use itertools::Itertools as _;

use crate::MissingColumnError;

// ---

/// Concatenates the given [`RecordBatch`]es, regardless of their respective schema.
Expand Down Expand Up @@ -76,6 +78,9 @@ pub fn concat_polymorphic_batches(batches: &[RecordBatch]) -> arrow::error::Resu
}

pub trait RecordBatchExt {
/// Helper for [`RecordBatchExt`].
fn inner(&self) -> &RecordBatch;

/// Returns a new [`RecordBatch`] where all *top-level* fields are nullable.
///
/// ⚠️ This is *not* recursive! E.g. for a `StructArray` containing 2 fields, only the field
Expand Down Expand Up @@ -112,9 +117,20 @@ pub trait RecordBatchExt {

/// Rename columns based on the provided (original, new) pairs.
fn rename_columns(self, renames: &[(&str, &str)]) -> arrow::error::Result<RecordBatch>;

/// Get a column by name, with a nice error message otherwise
fn try_get_column(&self, name: &str) -> Result<&ArrayRef, MissingColumnError> {
self.inner()
.column_by_name(name)
.ok_or_else(|| MissingColumnError::new(name))
}
}

impl RecordBatchExt for RecordBatch {
fn inner(&self) -> &RecordBatch {
self
}

fn make_nullable(&self) -> RecordBatch {
let schema = Schema::new_with_metadata(
self.schema()
Expand Down
55 changes: 51 additions & 4 deletions crates/utils/re_arrow_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,63 @@ mod tests {

// ----------------------------------------------------------------

/// Error used when a column is missing from a record batcj
#[derive(Debug, Clone, thiserror::Error)]
pub struct MissingColumnError {
pub name: String,
}

impl MissingColumnError {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}

impl std::fmt::Display for MissingColumnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { name } = self;
write!(f, "Missing column: {name:?}")
}
}

// ----------------------------------------------------------------

/// Error used for arrow datatype mismatch.
#[derive(Debug, Clone, thiserror::Error)]
pub struct WrongDatatypeError {
pub expected: DataType,
pub actual: DataType,
pub column_name: Option<String>,
pub expected: Box<DataType>,
pub actual: Box<DataType>,
}

impl WrongDatatypeError {
pub fn ensure_datatype(field: &Field, expected: &DataType) -> Result<(), Self> {
if field.data_type() == expected {
Ok(())
} else {
Err(Self {
column_name: Some(field.name().to_owned()),
expected: expected.clone().into(),
actual: field.data_type().clone().into(),
})
}
}
}

impl std::fmt::Display for WrongDatatypeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { expected, actual } = self;
write!(f, "Wrong datatype: expected {expected}, got {actual}")
let Self {
column_name,
expected,
actual,
} = self;
if let Some(column_name) = column_name {
write!(
f,
"Expected column {column_name:?} to be {expected}, got {actual}"
)
} else {
write!(f, "Expected {expected}, got {actual}")
}
}
}
Loading