From b08c3a00b77ea35c225aaf8aebd63673624356cd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 12 Feb 2026 09:22:54 -0700
Subject: [PATCH 1/2] fix: [df52] schema pruning crash on complex nested types

When `data_schema` is provided but `projection_vector` is None (the
NativeBatchReader / native_iceberg_compat path), the base schema was
incorrectly set to the pruned `required_schema`. This caused DataFusion
to think the table had only the pruned columns, leading to column index
misalignment in PhysicalExprAdapter. For example, reading "friends" at
logical index 0 would map to physical index 0 ("id") instead of the
correct index 4.

Fix: when `data_schema` is provided without a `projection_vector`,
compute the projection by mapping required field names to their indices
in the full data schema.

Also harden `wrap_all_type_mismatches` to use name-based lookup for
physical fields instead of positional index.

Co-Authored-By: Claude Opus 4.6
---
 native/core/src/parquet/parquet_exec.rs   | 42 ++++++++++++++---------
 native/core/src/parquet/schema_adapter.rs | 15 +++++++-
 2 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 79c7e06c63..fa5c20bd30 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -80,18 +80,30 @@ pub(crate) fn init_datasource_exec(
         encryption_enabled,
     );
-    // dbg!(&required_schema, &data_schema);
-
-    // Determine the schema to use for ParquetSource
-    //
-    // Use data_schema only if both data_schema and data_filters are set
-    let base_schema = match (&data_schema, &projection_vector) {
-        (Some(schema), Some(_)) => Arc::clone(schema),
-        _ => Arc::clone(&required_schema),
+    // Determine the schema and projection to use for ParquetSource.
+    // When data_schema is provided, use it as the base schema so DataFusion knows the full
+    // file schema. Compute a projection vector to select only the required columns.
+    let (base_schema, projection) = match (&data_schema, &projection_vector) {
+        (Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
+        (Some(schema), None) => {
+            // Compute projection: map required_schema field names to data_schema indices
+            let projection: Vec<usize> = required_schema
+                .fields()
+                .iter()
+                .filter_map(|req_field| {
+                    schema.fields().iter().position(|data_field| {
+                        if case_sensitive {
+                            data_field.name() == req_field.name()
+                        } else {
+                            data_field.name().to_lowercase() == req_field.name().to_lowercase()
+                        }
+                    })
+                })
+                .collect();
+            (Arc::clone(schema), Some(projection))
+        }
+        _ => (Arc::clone(&required_schema), None),
     };
-    //let base_schema = required_schema;
-    // dbg!(&base_schema);
-    // dbg!(&data_schema);
-    // dbg!(&data_filters);
     let partition_fields: Vec<_> = partition_schema
         .iter()
         .flat_map(|s| s.fields().iter())
@@ -100,13 +112,9 @@ pub(crate) fn init_datasource_exec(
     let table_schema =
         TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields);
 
-    // dbg!(&table_schema);
-
     let mut parquet_source =
         ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);
 
-    // dbg!(&parquet_source);
-
     // Create a conjunctive form of the vector because ParquetExecBuilder takes
     // a single expression
     if let Some(data_filters) = data_filters {
@@ -146,9 +154,9 @@ pub(crate) fn init_datasource_exec(
         .with_file_groups(file_groups)
         .with_expr_adapter(Some(expr_adapter_factory));
 
-    if let Some(projection_vector) = projection_vector {
+    if let Some(projection) = projection {
         file_scan_config_builder =
-            file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
+            file_scan_config_builder.with_projection_indices(Some(projection))?;
     }
 
     let file_scan_config = file_scan_config_builder.build();
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index f19ec39fca..5e8b395b5b 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter {
         expr.transform(|e| {
             if let Some(column) = e.as_any().downcast_ref::<Column>() {
                 let col_idx = column.index();
+                let col_name = column.name();
                 let logical_field = self.logical_file_schema.fields().get(col_idx);
-                let physical_field = self.physical_file_schema.fields().get(col_idx);
+                // Look up physical field by name instead of index for correctness
+                // when logical and physical schemas have different column orderings
+                let physical_field = if self.parquet_options.case_sensitive {
+                    self.physical_file_schema
+                        .fields()
+                        .iter()
+                        .find(|f| f.name() == col_name)
+                } else {
+                    self.physical_file_schema
+                        .fields()
+                        .iter()
+                        .find(|f| f.name().to_lowercase() == col_name.to_lowercase())
+                };
 
                 if let (Some(logical_field), Some(physical_field)) =
                     (logical_field, physical_field)
                 {

From 98bae1c2e956090b50ed38c4ac1eb9503f20f084 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 12 Feb 2026 11:46:17 -0700
Subject: [PATCH 2/2] fix: handle field ID mapping in projection computation

When computing a name-based projection from required_schema to
data_schema, fall back to using required_schema directly when not all
fields can be matched by name. This handles Parquet field ID mapping
where column names differ between the read schema and file schema.

Co-Authored-By: Claude Opus 4.6
---
 native/core/src/parquet/parquet_exec.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index fa5c20bd30..f4cc7bf9fe 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -86,7 +86,9 @@ pub(crate) fn init_datasource_exec(
     let (base_schema, projection) = match (&data_schema, &projection_vector) {
         (Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
         (Some(schema), None) => {
-            // Compute projection: map required_schema field names to data_schema indices
+            // Compute projection: map required_schema field names to data_schema indices.
+            // This is needed for schema pruning when the data_schema has more columns than
+            // the required_schema.
             let projection: Vec<usize> = required_schema
                 .fields()
                 .iter()
@@ -100,7 +102,15 @@ pub(crate) fn init_datasource_exec(
                     })
                 })
                 .collect();
-            (Arc::clone(schema), Some(projection))
+            // Only use data_schema + projection when all required fields were found by name.
+            // When some fields can't be matched (e.g., Parquet field ID mapping where names
+            // differ between required and data schemas), fall back to using required_schema
+            // directly with no projection.
+            if projection.len() == required_schema.fields().len() {
+                (Arc::clone(schema), Some(projection))
+            } else {
+                (Arc::clone(&required_schema), None)
+            }
         }
         _ => (Arc::clone(&required_schema), None),
     };
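
For illustration, the sketch below restates the projection rule these two commits
implement, outside the patch itself. It uses arrow's Schema types directly; the
helper name compute_projection and the middle column names (name, age, address)
are made up for the example, and the column types are simplified. Only "id" and
"friends" (logical index 0, physical index 4) come from the commit message.

use arrow::datatypes::{DataType, Field, Schema};

// Map each required field to its index in the full data schema by name.
// Returns None when any field cannot be matched (e.g. Parquet field ID
// mapping where names differ), so the caller can fall back to reading
// with the required schema and no projection.
fn compute_projection(required: &Schema, data: &Schema, case_sensitive: bool) -> Option<Vec<usize>> {
    let projection: Vec<usize> = required
        .fields()
        .iter()
        .filter_map(|req| {
            data.fields().iter().position(|f| {
                if case_sensitive {
                    f.name() == req.name()
                } else {
                    f.name().to_lowercase() == req.name().to_lowercase()
                }
            })
        })
        .collect();
    (projection.len() == required.fields().len()).then_some(projection)
}

fn main() {
    // Full file schema: id, name, age, address, friends (indices 0..=4).
    let data = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::Int32, true),
        Field::new("address", DataType::Utf8, true),
        Field::new("friends", DataType::Utf8, true),
    ]);
    // Pruned read schema asking only for "friends".
    let required = Schema::new(vec![Field::new("friends", DataType::Utf8, true)]);
    // Without the fix, logical index 0 was read as physical index 0 ("id");
    // the name-based projection maps it to physical index 4 instead.
    assert_eq!(compute_projection(&required, &data, false), Some(vec![4]));
}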