diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 5a7598ea1f299..4215eb20f4259 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -22,7 +22,7 @@ use std::collections::HashSet; use std::sync::Arc; use crate::error::DataFusionError; -use crate::stats::Precision; +use crate::stats::{Precision, StatisticsKey}; use crate::{Column, Statistics}; use crate::{ColumnStatistics, ScalarValue}; @@ -356,33 +356,67 @@ impl PrunableStatistics { } } } -} -impl PruningStatistics for PrunableStatistics { - fn min_values(&self, column: &Column) -> Option { - self.get_exact_column_statistics(column, |stat| &stat.min_value) - } + /// Look up expression_statistics by parsing a dotted column name into a + /// [`StatisticsKey::FieldPath`]. For example, column name `"col.a.b"` maps + /// to `FieldPath(["col", "a", "b"])`. + fn get_expression_statistics( + &self, + column: &Column, + get_stat: impl Fn(&ColumnStatistics) -> &Precision, + ) -> Option { + let name = column.name(); + // Only try dotted-name lookup if the name contains a dot + if !name.contains('.') { + return None; + } + let parts: Vec> = name.split('.').map(Arc::from).collect(); + let key = StatisticsKey::FieldPath(parts); - fn max_values(&self, column: &Column) -> Option { - self.get_exact_column_statistics(column, |stat| &stat.max_value) + let mut has_value = false; + match ScalarValue::iter_to_array(self.statistics.iter().map(|s| { + s.expression_statistics + .get(&key) + .and_then(|stat| { + if let Precision::Exact(val) = get_stat(stat) { + has_value = true; + Some(val.clone()) + } else { + None + } + }) + .unwrap_or(ScalarValue::Null) + })) { + Ok(array) => has_value.then_some(array), + Err(_) => { + log::warn!( + "Failed to convert expression statistics to array for column {}", + column.name() + ); + None + } + } } - fn num_containers(&self) -> usize { - self.statistics.len() - } + /// Look up null counts from expression_statistics for a dotted column name. + fn get_expression_null_counts(&self, column: &Column) -> Option { + let name = column.name(); + if !name.contains('.') { + return None; + } + let parts: Vec> = name.split('.').map(Arc::from).collect(); + let key = StatisticsKey::FieldPath(parts); - fn null_counts(&self, column: &Column) -> Option { - let index = self.schema.index_of(column.name()).ok()?; if self.statistics.iter().any(|s| { - s.column_statistics - .get(index) + s.expression_statistics + .get(&key) .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false)) }) { Some(Arc::new( self.statistics .iter() .map(|s| { - s.column_statistics.get(index).and_then(|stat| { + s.expression_statistics.get(&key).and_then(|stat| { if let Precision::Exact(null_count) = &stat.null_count { u64::try_from(*null_count).ok() } else { @@ -396,10 +430,55 @@ impl PruningStatistics for PrunableStatistics { None } } +} + +impl PruningStatistics for PrunableStatistics { + fn min_values(&self, column: &Column) -> Option { + self.get_exact_column_statistics(column, |stat| &stat.min_value) + .or_else(|| self.get_expression_statistics(column, |stat| &stat.min_value)) + } + + fn max_values(&self, column: &Column) -> Option { + self.get_exact_column_statistics(column, |stat| &stat.max_value) + .or_else(|| self.get_expression_statistics(column, |stat| &stat.max_value)) + } + + fn num_containers(&self) -> usize { + self.statistics.len() + } + + fn null_counts(&self, column: &Column) -> Option { + let result = self.schema.index_of(column.name()).ok().and_then(|index| { + if self.statistics.iter().any(|s| { + s.column_statistics + .get(index) + .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false)) + }) { + Some(Arc::new( + self.statistics + .iter() + .map(|s| { + s.column_statistics.get(index).and_then(|stat| { + if let Precision::Exact(null_count) = &stat.null_count { + u64::try_from(*null_count).ok() + } else { + None + } + }) + }) + .collect::(), + ) as ArrayRef) + } else { + None + } + }); + result.or_else(|| self.get_expression_null_counts(column)) + } fn row_counts(&self, column: &Column) -> Option { - // If the column does not exist in the schema, return None - if self.schema.index_of(column.name()).is_err() { + // If the column does not exist in the schema and not in expression stats, + // return None + if self.schema.index_of(column.name()).is_err() && !column.name().contains('.') { return None; } if self diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index cecf1d03418d7..f5fe839a659f1 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -17,13 +17,26 @@ //! This module provides data structures to represent statistics +use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug, Display}; +use std::sync::Arc; use crate::{Result, ScalarValue}; use crate::error::_plan_err; use arrow::datatypes::{DataType, Schema}; +/// Key for looking up statistics. Columns are expressions; so are nested +/// field accesses like `get_field(col, 'a')`. This enum can be extended to +/// support other expression types in the future. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum StatisticsKey { + /// Schema column by index + Column(usize), + /// Nested field path, e.g. `["col", "a", "b"]` for `col.a.b` + FieldPath(Vec>), +} + /// Represents a value with a degree of certainty. `Precision` is used to /// propagate information the precision of statistical values. #[derive(Clone, PartialEq, Eq, Default, Copy)] @@ -296,6 +309,10 @@ pub struct Statistics { /// It must contains a [`ColumnStatistics`] for each field in the schema of /// the table to which the [`Statistics`] refer. pub column_statistics: Vec, + /// Statistics keyed by expression. Extends `column_statistics` with stats + /// for expressions that don't correspond to top-level schema columns + /// (e.g., struct field paths from Parquet leaf columns). + pub expression_statistics: HashMap, } impl Default for Statistics { @@ -306,6 +323,7 @@ impl Default for Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: vec![], + expression_statistics: HashMap::new(), } } } @@ -318,6 +336,7 @@ impl Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: Statistics::unknown_column(schema), + expression_statistics: HashMap::new(), } } @@ -383,6 +402,11 @@ impl Statistics { .into_iter() .map(|s| s.to_inexact()) .collect(); + self.expression_statistics = self + .expression_statistics + .into_iter() + .map(|(k, v)| (k, v.to_inexact())) + .collect(); self } @@ -431,6 +455,14 @@ impl Statistics { } } + // Keep expression_statistics for Column keys that are in the projection, + // and all FieldPath entries (they are not directly projected) + let projection_set: HashSet = projection.iter().copied().collect(); + self.expression_statistics.retain(|key, _| match key { + StatisticsKey::Column(idx) => projection_set.contains(idx), + StatisticsKey::FieldPath(_) => true, + }); + self } @@ -523,6 +555,20 @@ impl Statistics { cs }) .collect(); + self.expression_statistics = self + .expression_statistics + .into_iter() + .map(|(k, cs)| { + let mut cs = cs.to_inexact(); + cs.byte_size = match cs.byte_size { + Precision::Exact(n) | Precision::Inexact(n) => { + Precision::Inexact((n as f64 * ratio) as usize) + } + Precision::Absent => Precision::Absent, + }; + (k, cs) + }) + .collect(); // Compute total_byte_size as sum of column byte_size values if all are present, // otherwise fall back to scaling the original total_byte_size @@ -614,6 +660,7 @@ impl Statistics { mut num_rows, mut total_byte_size, mut column_statistics, + mut expression_statistics, } = self; // Accumulate statistics for subsequent items @@ -641,10 +688,24 @@ impl Statistics { col_stats.byte_size = col_stats.byte_size.add(&item_col_stats.byte_size); } + // Merge expression_statistics + for (key, other_stats) in &other.expression_statistics { + let entry = expression_statistics + .entry(key.clone()) + .or_insert_with(ColumnStatistics::new_unknown); + entry.null_count = entry.null_count.add(&other_stats.null_count); + entry.max_value = entry.max_value.max(&other_stats.max_value); + entry.min_value = entry.min_value.min(&other_stats.min_value); + entry.sum_value = entry.sum_value.add(&other_stats.sum_value); + entry.distinct_count = Precision::Absent; + entry.byte_size = entry.byte_size.add(&other_stats.byte_size); + } + Ok(Statistics { num_rows, total_byte_size, column_statistics, + expression_statistics, }) } } @@ -1103,6 +1164,7 @@ mod tests { num_rows: Precision::Exact(42), total_byte_size: Precision::Exact(500), column_statistics: counts.into_iter().map(col_stats_i64).collect(), + expression_statistics: Default::default(), } } @@ -1147,6 +1209,7 @@ mod tests { byte_size: Precision::Exact(40), }, ], + expression_statistics: Default::default(), }; let stats2 = Statistics { @@ -1170,6 +1233,7 @@ mod tests { byte_size: Precision::Exact(60), }, ], + expression_statistics: Default::default(), }; let items = vec![stats1, stats2]; @@ -1233,6 +1297,7 @@ mod tests { distinct_count: Precision::Absent, byte_size: Precision::Exact(40), }], + expression_statistics: Default::default(), }; let stats2 = Statistics { @@ -1246,6 +1311,7 @@ mod tests { distinct_count: Precision::Absent, byte_size: Precision::Inexact(60), }], + expression_statistics: Default::default(), }; let items = vec![stats1, stats2]; @@ -1385,6 +1451,7 @@ mod tests { byte_size: Precision::Exact(8000), }, ], + expression_statistics: Default::default(), }; // Apply fetch of 100 rows (10% of original) @@ -1459,6 +1526,7 @@ mod tests { distinct_count: Precision::Inexact(50), byte_size: Precision::Inexact(4000), }], + expression_statistics: Default::default(), }; let result = original_stats.clone().with_fetch(Some(500), 0, 1).unwrap(); @@ -1484,6 +1552,7 @@ mod tests { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(800), column_statistics: vec![col_stats_i64(10)], + expression_statistics: Default::default(), }; let result = original_stats.clone().with_fetch(Some(50), 100, 1).unwrap(); @@ -1500,6 +1569,7 @@ mod tests { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(800), column_statistics: vec![col_stats_i64(10)], + expression_statistics: Default::default(), }; let result = original_stats.clone().with_fetch(None, 0, 1).unwrap(); @@ -1516,6 +1586,7 @@ mod tests { num_rows: Precision::Exact(1000), total_byte_size: Precision::Exact(8000), column_statistics: vec![col_stats_i64(10)], + expression_statistics: Default::default(), }; // Skip 200, fetch 300, so we get rows 200-500 @@ -1536,6 +1607,7 @@ mod tests { num_rows: Precision::Exact(1000), // per partition total_byte_size: Precision::Exact(8000), column_statistics: vec![col_stats_i64(10)], + expression_statistics: Default::default(), }; // Fetch 100 per partition, 4 partitions = 400 total @@ -1560,6 +1632,7 @@ mod tests { distinct_count: Precision::Absent, byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; let result = original_stats.clone().with_fetch(Some(100), 0, 1).unwrap(); @@ -1578,6 +1651,7 @@ mod tests { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(800), column_statistics: vec![col_stats_i64(10)], + expression_statistics: Default::default(), }; // Skip 50, fetch 100, but only 50 rows remain @@ -1604,6 +1678,7 @@ mod tests { num_rows: Precision::Exact(1000), total_byte_size: Precision::Exact(8000), column_statistics: vec![original_col_stats.clone()], + expression_statistics: Default::default(), }; let result = original_stats.with_fetch(Some(250), 0, 1).unwrap(); @@ -1651,11 +1726,13 @@ mod tests { num_rows: Precision::Exact(50), total_byte_size: Precision::Exact(1000), column_statistics: vec![col_stats1], + expression_statistics: Default::default(), }; let stats2 = Statistics { num_rows: Precision::Exact(100), total_byte_size: Precision::Exact(2000), column_statistics: vec![col_stats2], + expression_statistics: Default::default(), }; let merged = stats1.try_merge(&stats2).unwrap(); @@ -1711,6 +1788,7 @@ mod tests { byte_size: Precision::Exact(8000), }, ], + expression_statistics: Default::default(), }; // Apply fetch of 100 rows (10% of original) @@ -1754,6 +1832,7 @@ mod tests { byte_size: Precision::Absent, // One column has no byte_size }, ], + expression_statistics: Default::default(), }; // Apply fetch of 100 rows (10% of original) diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index ec0b9e253d2ab..c18d738f637ba 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -204,6 +204,7 @@ impl ExecutionPlan for CustomExecutionPlan { ..Default::default() }) .collect(), + expression_statistics: Default::default(), }) } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index e81cd9f6b81b1..bb8b29dff15a4 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -120,6 +120,7 @@ impl TableProvider for StatisticsValidation { column_statistics: proj_col_stats, // TODO stats: knowing the type of the new columns we can guess the output size total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, projected_schema, ))) @@ -221,6 +222,7 @@ fn fully_defined() -> (Statistics, Schema) { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }, Schema::new(vec![ Field::new("c1", DataType::Int32, false), @@ -276,7 +278,8 @@ async fn sql_limit() -> Result<()> { .iter() .map(|c| c.clone().to_inexact()) .collect(), - total_byte_size: Precision::Absent + total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, physical_plan.partition_statistics(None)? ); diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 567af64c6a366..10889b07331ad 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -58,6 +58,7 @@ fn empty_statistics() -> Statistics { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: vec![ColumnStatistics::new_unknown()], + expression_statistics: Default::default(), } } @@ -77,6 +78,7 @@ fn small_statistics() -> Statistics { num_rows: Precision::Inexact(threshold_num_rows / 128), total_byte_size: Precision::Inexact(threshold_byte_size / 128), column_statistics: vec![ColumnStatistics::new_unknown()], + expression_statistics: Default::default(), } } @@ -87,6 +89,7 @@ fn big_statistics() -> Statistics { num_rows: Precision::Inexact(threshold_num_rows * 2), total_byte_size: Precision::Inexact(threshold_byte_size * 2), column_statistics: vec![ColumnStatistics::new_unknown()], + expression_statistics: Default::default(), } } @@ -97,6 +100,7 @@ fn bigger_statistics() -> Statistics { num_rows: Precision::Inexact(threshold_num_rows * 4), total_byte_size: Precision::Inexact(threshold_byte_size * 4), column_statistics: vec![ColumnStatistics::new_unknown()], + expression_statistics: Default::default(), } } @@ -180,6 +184,7 @@ fn create_nested_with_min_max() -> ( num_rows: Precision::Inexact(100_000), column_statistics: create_column_stats(Some(0), Some(50_000), Some(50_000)), total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, Schema::new(vec![Field::new("big_col", DataType::Int32, false)]), )); @@ -189,6 +194,7 @@ fn create_nested_with_min_max() -> ( num_rows: Precision::Inexact(10_000), column_statistics: create_column_stats(Some(1000), Some(5000), Some(1000)), total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, Schema::new(vec![Field::new("medium_col", DataType::Int32, false)]), )); @@ -198,6 +204,7 @@ fn create_nested_with_min_max() -> ( num_rows: Precision::Inexact(1000), column_statistics: create_column_stats(Some(0), Some(100_000), Some(1000)), total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, Schema::new(vec![Field::new("small_col", DataType::Int32, false)]), )); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index fa021ed3dcce3..18e5c954ede8f 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -165,6 +165,7 @@ mod test { num_rows: Precision::Exact(num_rows), total_byte_size: Precision::Exact(total_byte_size), column_statistics: column_stats, + expression_statistics: Default::default(), } } @@ -401,6 +402,7 @@ mod test { byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32) }, ], + expression_statistics: Default::default(), }; assert_eq!(full_statistics, expected_full_statistic); @@ -430,6 +432,7 @@ mod test { byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32) }, ], + expression_statistics: Default::default(), }; assert_eq!(statistics[0], expected_partition_statistic); assert_eq!(statistics[1], expected_partition_statistic); @@ -517,6 +520,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; assert_eq!(stats[0], expected_stats); assert_eq!(stats[1], expected_stats); @@ -590,6 +594,7 @@ mod test { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; let expected_statistic_partition_2 = Statistics { num_rows: Precision::Exact(8), @@ -624,6 +629,7 @@ mod test { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; assert_eq!(statistics[0], expected_statistic_partition_1); assert_eq!(statistics[1], expected_statistic_partition_2); @@ -847,6 +853,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; assert_eq!(&p0_statistics, &expected_p0_statistics); @@ -866,6 +873,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?; @@ -933,6 +941,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?); @@ -997,6 +1006,7 @@ mod test { num_rows: Precision::Exact(1), total_byte_size: Precision::Absent, column_statistics: vec![ColumnStatistics::new_unknown()], + expression_statistics: Default::default(), }; assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?); @@ -1066,6 +1076,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; // All partitions should have the same statistics @@ -1173,6 +1184,7 @@ mod test { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; assert_eq!(stats[0], expected_stats); assert_eq!(stats[1], expected_stats); @@ -1252,6 +1264,7 @@ mod test { }, ColumnStatistics::new_unknown(), // window column ], + expression_statistics: Default::default(), }; let expected_statistic_partition_2 = Statistics { @@ -1280,6 +1293,7 @@ mod test { }, ColumnStatistics::new_unknown(), // window column ], + expression_statistics: Default::default(), }; assert_eq!(statistics[0], expected_statistic_partition_1); diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 5a4c0bcdd514d..dd17a088c06da 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -27,7 +27,7 @@ use arrow::compute::kernels::cmp::eq; use arrow::compute::sum; use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; -use datafusion_common::stats::Precision; +use datafusion_common::stats::{Precision, StatisticsKey}; use datafusion_common::{ ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, }; @@ -375,10 +375,277 @@ impl<'a> DFParquetMetadata<'a> { ); } + // Populate expression_statistics for struct leaf columns + if has_statistics { + for field in logical_file_schema.fields() { + if let DataType::Struct(children) = field.data_type() { + populate_struct_field_stats( + &mut statistics.expression_statistics, + &[Arc::from(field.name().as_str())], + children, + file_metadata.schema_descr(), + row_groups_metadata, + num_rows, + ); + } + } + } + Ok(statistics) } } +/// Recursively populate expression_statistics for struct fields. +/// +/// Walks struct children, finds matching Parquet leaf columns by path, +/// and extracts min/max/null statistics from row group metadata using +/// `StatisticsConverter`. +fn populate_struct_field_stats( + expression_statistics: &mut HashMap, + parent_path: &[Arc], + fields: &arrow::datatypes::Fields, + parquet_schema: &SchemaDescriptor, + row_groups_metadata: &[RowGroupMetaData], + num_rows: usize, +) { + for field in fields.iter() { + let mut path: Vec> = parent_path.to_vec(); + path.push(Arc::from(field.name().as_str())); + + match field.data_type() { + DataType::Struct(children) => { + // Recurse into nested structs + populate_struct_field_stats( + expression_statistics, + &path, + children, + parquet_schema, + row_groups_metadata, + num_rows, + ); + } + DataType::Map(_, _) => { + // Skip Map types — get_field on maps doesn't have + // meaningful per-key statistics in Parquet + } + _ => { + // Leaf field — try to find matching Parquet column + let path_parts: Vec<&str> = path.iter().map(|s| s.as_ref()).collect(); + if let Some(parquet_col_idx) = + find_parquet_leaf_column(parquet_schema, &path_parts) + { + if let Some(col_stats) = extract_leaf_column_stats( + parquet_col_idx, + field.as_ref(), + parquet_schema, + row_groups_metadata, + num_rows, + ) { + let key = StatisticsKey::FieldPath(path); + expression_statistics.insert(key, col_stats); + } + } + } + } + } +} + +/// Find the Parquet leaf column index that matches the given field path. +fn find_parquet_leaf_column(schema: &SchemaDescriptor, path: &[&str]) -> Option { + (0..schema.num_columns()).find(|&i| schema.column(i).path().parts() == path) +} + +/// Convert a Parquet `Statistics` value to a `ScalarValue` for the given Arrow data type. +/// If `is_min` is true, extracts the min value; otherwise extracts the max value. +fn scalar_from_parquet_stats( + stats: &parquet::file::statistics::Statistics, + data_type: &DataType, + is_min: bool, +) -> Option { + use parquet::file::statistics::Statistics as PqStats; + + match stats { + PqStats::Boolean(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + val.map(|v| ScalarValue::Boolean(Some(*v))) + } + PqStats::Int32(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + let v = *val?; + match data_type { + DataType::Int32 => Some(ScalarValue::Int32(Some(v))), + DataType::Date32 => Some(ScalarValue::Date32(Some(v))), + DataType::Int16 => Some(ScalarValue::Int16(Some(v as i16))), + DataType::Int8 => Some(ScalarValue::Int8(Some(v as i8))), + DataType::UInt8 => Some(ScalarValue::UInt8(Some(v as u8))), + DataType::UInt16 => Some(ScalarValue::UInt16(Some(v as u16))), + DataType::UInt32 => Some(ScalarValue::UInt32(Some(v as u32))), + DataType::Decimal128(p, s) => { + Some(ScalarValue::Decimal128(Some(v as i128), *p, *s)) + } + _ => Some(ScalarValue::Int32(Some(v))), + } + } + PqStats::Int64(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + let v = *val?; + match data_type { + DataType::Int64 => Some(ScalarValue::Int64(Some(v))), + DataType::UInt64 => Some(ScalarValue::UInt64(Some(v as u64))), + DataType::Timestamp(TimeUnit::Millisecond, tz) => { + Some(ScalarValue::TimestampMillisecond(Some(v), tz.clone())) + } + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + Some(ScalarValue::TimestampMicrosecond(Some(v), tz.clone())) + } + DataType::Timestamp(TimeUnit::Nanosecond, tz) => { + Some(ScalarValue::TimestampNanosecond(Some(v), tz.clone())) + } + DataType::Date64 => Some(ScalarValue::Date64(Some(v))), + DataType::Decimal128(p, s) => { + Some(ScalarValue::Decimal128(Some(v as i128), *p, *s)) + } + _ => Some(ScalarValue::Int64(Some(v))), + } + } + PqStats::Float(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + val.map(|v| ScalarValue::Float32(Some(*v))) + } + PqStats::Double(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + val.map(|v| ScalarValue::Float64(Some(*v))) + } + PqStats::ByteArray(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + let bytes = val?; + match data_type { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + let s = std::str::from_utf8(bytes.data()).ok()?; + Some(ScalarValue::Utf8(Some(s.to_string()))) + } + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + Some(ScalarValue::Binary(Some(bytes.data().to_vec()))) + } + _ => None, + } + } + PqStats::FixedLenByteArray(s) => { + let val = if is_min { s.min_opt() } else { s.max_opt() }; + let bytes = val?; + match data_type { + DataType::Decimal128(p, scale) => { + let mut padded = [0u8; 16]; + let data = bytes.data(); + let len = data.len().min(16); + // Sign extend + if !data.is_empty() && data[0] & 0x80 != 0 { + padded.fill(0xFF); + } + padded[16 - len..].copy_from_slice(&data[..len]); + Some(ScalarValue::Decimal128( + Some(i128::from_be_bytes(padded)), + *p, + *scale, + )) + } + DataType::FixedSizeBinary(size) => Some(ScalarValue::FixedSizeBinary( + *size, + Some(bytes.data().to_vec()), + )), + _ => None, + } + } + _ => None, + } +} + +/// Extract min/max/null statistics for a single Parquet leaf column +/// across all row groups using [`Accumulator`]s to aggregate per-row-group values. +fn extract_leaf_column_stats( + parquet_col_idx: usize, + arrow_field: &arrow::datatypes::Field, + _parquet_schema: &SchemaDescriptor, + row_groups_metadata: &[RowGroupMetaData], + num_rows: usize, +) -> Option { + let data_type = arrow_field.data_type(); + let agg_data_type = min_max_aggregate_data_type(data_type); + let mut max_acc = MaxAccumulator::try_new(agg_data_type).ok()?; + let mut min_acc = MinAccumulator::try_new(agg_data_type).ok()?; + let mut total_null_count: u64 = 0; + let mut has_stats = false; + let mut has_null_counts = false; + + for rg in row_groups_metadata { + let col_chunk = rg.columns().get(parquet_col_idx)?; + let stats = col_chunk.statistics()?; + has_stats = true; + + // Null counts + if let Some(nc) = stats.null_count_opt() { + total_null_count += nc; + has_null_counts = true; + } + + // Min/max: extract typed values from parquet Statistics + if let (Some(min_sv), Some(max_sv)) = ( + scalar_from_parquet_stats(stats, data_type, true), + scalar_from_parquet_stats(stats, data_type, false), + ) { + if let Ok(arr) = min_sv.to_array() { + let _ = min_acc.update_batch(&[arr]); + } + if let Ok(arr) = max_sv.to_array() { + let _ = max_acc.update_batch(&[arr]); + } + } + } + + if !has_stats { + return None; + } + + let min_value = min_acc + .evaluate() + .ok() + .map(Precision::Exact) + .unwrap_or(Precision::Absent); + let max_value = max_acc + .evaluate() + .ok() + .map(Precision::Exact) + .unwrap_or(Precision::Absent); + + let null_count = if has_null_counts { + Precision::Exact(total_null_count as usize) + } else { + Precision::Absent + }; + + // Only return stats if we got at least something + if matches!(max_value, Precision::Absent) + && matches!(min_value, Precision::Absent) + && matches!(null_count, Precision::Absent) + { + return None; + } + + Some(ColumnStatistics { + null_count, + max_value, + min_value, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: compute_arrow_column_size( + data_type, + row_groups_metadata, + Some(parquet_col_idx), + num_rows, + ), + }) +} + /// Min/max aggregation can take Dictionary encode input but always produces unpacked /// (aka non Dictionary) output. We need to adjust the output data type to reflect this. /// The reason min/max aggregate produces unpacked output because there is only one diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..f1150faddebec 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1221,6 +1221,7 @@ mod test { }, ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }; (statistics, schema) } @@ -1248,6 +1249,7 @@ mod test { distinct_count: Precision::Absent, byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; let constants = constant_columns_from_stats(Some(&statistics), &schema); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c3e5cabce7bc2..c00e10875590d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1919,6 +1919,7 @@ mod tests { .unwrap_or_default() }) .collect::>(), + expression_statistics: Default::default(), }); PartitionedFile::new_from_meta(object_meta) .with_partition_values(vec![ScalarValue::from(file.date)]) @@ -2380,6 +2381,7 @@ mod tests { ..ColumnStatistics::new_unknown() }, ], + expression_statistics: Default::default(), }; // Create a file group with statistics diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..d527b1a08dc74 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -965,6 +965,7 @@ mod tests { sum_value: Precision::Absent, byte_size: Precision::Absent, },], + expression_statistics: Default::default(), } ); diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index f80c9cb0b0daa..9434cef5e88ce 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -528,6 +528,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec { @@ -1107,6 +1108,7 @@ impl AggregateExec { num_rows, column_statistics, total_byte_size, + expression_statistics: Default::default(), }) } } @@ -3684,6 +3686,7 @@ mod tests { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }, (*schema).clone(), )) as Arc; @@ -3714,6 +3717,7 @@ mod tests { ColumnStatistics::new_unknown(), ColumnStatistics::new_unknown(), ], + expression_statistics: Default::default(), }, (*schema).clone(), )) as Arc; diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 590f6f09e8b9e..5d07779b41c8b 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -175,6 +175,7 @@ pub fn compute_record_batch_statistics( num_rows: Precision::Exact(nb_rows), total_byte_size: Precision::Exact(total_byte_size), column_statistics, + expression_statistics: Default::default(), } } @@ -273,6 +274,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; assert_eq!(actual, expected); @@ -306,6 +308,7 @@ mod tests { null_count: Precision::Exact(3), byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; assert_eq!(actual, expected); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 2af0731fb7a63..fb285690d1440 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -343,6 +343,7 @@ impl FilterExec { num_rows, total_byte_size, column_statistics, + expression_statistics: Default::default(), }) } @@ -1058,6 +1059,7 @@ mod tests { max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), ..Default::default() }], + expression_statistics: Default::default(), }, schema.clone(), )); @@ -1102,6 +1104,7 @@ mod tests { ..Default::default() }], total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, schema.clone(), )); @@ -1159,6 +1162,7 @@ mod tests { }, ], total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }, schema.clone(), )); @@ -1263,6 +1267,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); @@ -1376,6 +1381,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); @@ -1431,6 +1437,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); @@ -1501,6 +1508,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); @@ -1575,6 +1583,7 @@ mod tests { distinct_count: Precision::Absent, byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; assert_eq!(filter_statistics, expected_filter_statistics); @@ -1634,6 +1643,7 @@ mod tests { column_statistics: vec![ColumnStatistics { ..Default::default() }], + expression_statistics: Default::default(), }, schema, )); @@ -1809,6 +1819,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); @@ -1877,6 +1888,7 @@ mod tests { ..Default::default() }, ], + expression_statistics: Default::default(), }, schema, )); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index d5b540885efae..20740b197ca1a 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -473,6 +473,7 @@ fn stats_cartesian_product( num_rows, total_byte_size, column_statistics: cross_join_stats, + expression_statistics: Default::default(), } } @@ -713,6 +714,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; let right = Statistics { @@ -726,6 +728,7 @@ mod tests { null_count: Precision::Exact(2), byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; let result = stats_cartesian_product(left, right); @@ -763,6 +766,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; assert_eq!(result, expected); @@ -793,6 +797,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; let right = Statistics { @@ -806,6 +811,7 @@ mod tests { null_count: Precision::Exact(2), byte_size: Precision::Absent, }], + expression_statistics: Default::default(), }; let result = stats_cartesian_product(left, right); @@ -841,6 +847,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; assert_eq!(result, expected); diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 83fd418d73d72..b36ef672a8bc7 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -456,6 +456,7 @@ pub(crate) fn estimate_join_statistics( num_rows, total_byte_size: Precision::Absent, column_statistics, + expression_statistics: Default::default(), }) } @@ -492,11 +493,13 @@ fn estimate_join_cardinality( num_rows: left_stats.num_rows, total_byte_size: Precision::Absent, column_statistics: left_col_stats, + expression_statistics: Default::default(), }, Statistics { num_rows: right_stats.num_rows, total_byte_size: Precision::Absent, column_statistics: right_col_stats, + expression_statistics: Default::default(), }, )?; @@ -2102,6 +2105,7 @@ mod tests { .unwrap_or(Absent), column_statistics: column_stats, total_byte_size: Absent, + expression_statistics: Default::default(), } } @@ -2304,11 +2308,13 @@ mod tests { num_rows: Inexact(left_num_rows), total_byte_size: Absent, column_statistics: left_col_stats.clone(), + expression_statistics: Default::default(), }, Statistics { num_rows: Inexact(right_num_rows), total_byte_size: Absent, column_statistics: right_col_stats.clone(), + expression_statistics: Default::default(), }, ), expected_cardinality.clone() @@ -2359,11 +2365,13 @@ mod tests { num_rows: Inexact(400), total_byte_size: Absent, column_statistics: left_col_stats, + expression_statistics: Default::default(), }, Statistics { num_rows: Inexact(400), total_byte_size: Absent, column_statistics: right_col_stats, + expression_statistics: Default::default(), }, ), Some(Inexact((400 * 400) / 200)) @@ -2393,11 +2401,13 @@ mod tests { num_rows: Inexact(100), total_byte_size: Absent, column_statistics: left_col_stats, + expression_statistics: Default::default(), }, Statistics { num_rows: Inexact(100), total_byte_size: Absent, column_statistics: right_col_stats, + expression_statistics: Default::default(), }, ), Some(Inexact(100)) @@ -2651,11 +2661,13 @@ mod tests { num_rows: Inexact(outer_num_rows), total_byte_size: Absent, column_statistics: outer_col_stats, + expression_statistics: Default::default(), }, Statistics { num_rows: Inexact(inner_num_rows), total_byte_size: Absent, column_statistics: inner_col_stats, + expression_statistics: Default::default(), }, &join_on, ) @@ -2685,11 +2697,13 @@ mod tests { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + expression_statistics: Default::default(), }, Statistics { num_rows: Exact(10), total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + expression_statistics: Default::default(), }, &join_on, ); @@ -2704,11 +2718,13 @@ mod tests { num_rows: Inexact(500), total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + expression_statistics: Default::default(), }, Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + expression_statistics: Default::default(), }, &join_on, ).expect("Expected non-empty PartialJoinStatistics for SemiJoin with absent inner num_rows"); @@ -2724,11 +2740,13 @@ mod tests { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats.clone(), + expression_statistics: Default::default(), }, Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: dummy_column_stats, + expression_statistics: Default::default(), }, &join_on, ); diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 8174160dc9332..78cd29ad92d96 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -918,6 +918,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; let right = Statistics { @@ -949,6 +950,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; let result = stats_union(left, right); @@ -981,6 +983,7 @@ mod tests { byte_size: Precision::Absent, }, ], + expression_statistics: Default::default(), }; assert_eq!(result, expected); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 20d54303a94b4..cf841c3ba1244 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -246,6 +246,7 @@ impl BoundedWindowAggExec { num_rows: statistics.num_rows, column_statistics, total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 0c73cf23523d5..f4d944af5d162 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -287,6 +287,7 @@ impl ExecutionPlan for WindowAggExec { num_rows: input_stat.num_rows, column_statistics, total_byte_size: Precision::Absent, + expression_statistics: Default::default(), }) } } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..f3b68ca23beeb 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -919,6 +919,7 @@ impl TryFrom<&protobuf::Statistics> for Statistics { }, // No column statistic (None) is encoded with empty array column_statistics: s.column_stats.iter().map(|s| s.into()).collect(), + expression_statistics: std::collections::HashMap::new(), }) } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index bc310150d8982..39544167983f2 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -923,6 +923,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { column_statistics: Statistics::unknown_column(&Arc::new(Schema::new( vec![Field::new("col", DataType::Utf8, false)], ))), + expression_statistics: Default::default(), }) .build(); @@ -947,6 +948,7 @@ fn roundtrip_arrow_scan() -> Result<()> { num_rows: Precision::Inexact(100), total_byte_size: Precision::Inexact(1024), column_statistics: Statistics::unknown_column(&file_schema), + expression_statistics: Default::default(), }) .build(); @@ -1006,6 +1008,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { column_statistics: Statistics::unknown_column(&Arc::new(Schema::new( vec![Field::new("col", DataType::Utf8, false)], ))), + expression_statistics: Default::default(), }) .build(); diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index d0cb0674424ba..8754b2dee274f 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -42,6 +42,7 @@ use datafusion_common::{ tree_node::{Transformed, TreeNode}, }; use datafusion_expr_common::operator::Operator; +use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr}; @@ -1019,7 +1020,13 @@ impl<'a> PruningExpressionBuilder<'a> { let field = match schema.column_with_name(column.name()) { Some((_, f)) => f, _ => { - return plan_err!("Field not found in schema"); + // Try resolving a dotted name (e.g. "col.a.b") by walking + // the schema hierarchy + if let Some(f) = resolve_dotted_field(schema, column.name()) { + f + } else { + return plan_err!("Field not found in schema"); + } } }; @@ -1186,6 +1193,13 @@ fn rewrite_expr_to_prunable( } else { plan_err!("Not with complex expression {column_expr:?} is not supported") } + } else if let Some((dotted_name, _data_type)) = + get_field_to_dotted_name(column_expr.as_ref()) + { + // `get_field(col, 'a') op lit()` → rewrite to synthetic dotted column + let synthetic_col = + Arc::new(phys_expr::Column::new(&dotted_name, 0)) as PhysicalExprRef; + Ok((synthetic_col, op, Arc::clone(scalar_expr))) } else { plan_err!("column expression {column_expr:?} is not supported") } @@ -1205,6 +1219,89 @@ fn is_compare_op(op: Operator) -> bool { ) } +/// Resolve a dotted field name like `"col.a.b"` by walking the schema hierarchy. +/// +/// Returns the leaf `Field` if found, or `None` if the path doesn't resolve. +fn resolve_dotted_field<'a>(schema: &'a Schema, dotted_name: &str) -> Option<&'a Field> { + let parts: Vec<&str> = dotted_name.split('.').collect(); + if parts.len() < 2 { + return None; + } + + // Find the top-level field + let (_, top_field) = schema.column_with_name(parts[0])?; + let mut current_data_type = top_field.data_type(); + let mut last_field: Option<&Field> = None; + + for part in &parts[1..] { + match current_data_type { + DataType::Struct(fields) => { + let field = fields.iter().find(|f| f.name() == *part)?; + current_data_type = field.data_type(); + last_field = Some(field.as_ref()); + } + _ => return None, + } + } + + last_field +} + +/// Extract a nested field path from a chain of `get_field` calls. +/// +/// For example, `get_field(get_field(col, 'a'), 'b')` returns `["col", "a", "b"]` +/// and the data type of the innermost field. +/// +/// Returns `None` if the expression is not a `get_field` chain rooted at a `Column`. +fn extract_get_field_path(expr: &dyn PhysicalExpr) -> Option<(Vec>, DataType)> +where +{ + use std::sync::Arc as StdArc; + + if let Some(func) = expr.as_any().downcast_ref::() { + if func.name() == "get_field" { + let args = func.args(); + if args.len() < 2 { + return None; + } + // Second arg should be a string literal (the field name) + let field_name = args[1] + .as_any() + .downcast_ref::()? + .value() + .try_as_str() + .flatten()?; + let field_name = StdArc::from(field_name); + + let base = &args[0]; + // Base can be a Column or another get_field + if let Some(col) = base.as_any().downcast_ref::() { + let data_type = func.return_type().clone(); + return Some((vec![StdArc::from(col.name()), field_name], data_type)); + } else if let Some((mut path, _)) = extract_get_field_path(base.as_ref()) { + let data_type = func.return_type().clone(); + path.push(field_name); + return Some((path, data_type)); + } + } + } + None +} + +/// Convert a `get_field` expression (or chain) into a synthetic dotted column +/// name and its data type for statistics lookup. +/// +/// For example, `get_field(col, 'a')` becomes `("col.a", DataType::...)`. +fn get_field_to_dotted_name(expr: &dyn PhysicalExpr) -> Option<(String, DataType)> { + let (path, data_type) = extract_get_field_path(expr)?; + let dotted = path + .iter() + .map(|s| s.as_ref()) + .collect::>() + .join("."); + Some((dotted, data_type)) +} + // The pruning logic is based on the comparing the min/max bounds. // Must make sure the two type has order. // For example, casts from string to numbers is not correct. @@ -1316,43 +1413,53 @@ fn build_is_null_column_expr( required_columns: &mut RequiredColumns, with_not: bool, ) -> Option> { - if let Some(col) = expr.as_any().downcast_ref::() { + // Try to get the column reference - either a direct Column or a get_field chain + let (col, field) = if let Some(col) = + expr.as_any().downcast_ref::() + { let field = schema.field_with_name(col.name()).ok()?; + (col.clone(), field.clone()) + } else if let Some((dotted_name, data_type)) = get_field_to_dotted_name(expr.as_ref()) + { + let synthetic_col = phys_expr::Column::new(&dotted_name, 0); + let field = Field::new(&dotted_name, data_type, true); + (synthetic_col, field) + } else { + return None; + }; - let null_count_field = &Field::new(field.name(), DataType::UInt64, true); - if with_not { - if let Ok(row_count_expr) = - required_columns.row_count_column_expr(col, expr, null_count_field) - { - required_columns - .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { - // IsNotNull(column) => null_count != row_count - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::NotEq, - row_count_expr, - )) as _ - }) - .ok() - } else { - None - } - } else { + let null_count_field = &Field::new(field.name(), DataType::UInt64, true); + let col_expr: Arc = Arc::new(col.clone()); + if with_not { + if let Ok(row_count_expr) = + required_columns.row_count_column_expr(&col, &col_expr, null_count_field) + { required_columns - .null_count_column_expr(col, expr, null_count_field) + .null_count_column_expr(&col, &col_expr, null_count_field) .map(|null_count_column_expr| { - // IsNull(column) => null_count > 0 + // IsNotNull(column) => null_count != row_count Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, - Operator::Gt, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), + Operator::NotEq, + row_count_expr, )) as _ }) .ok() + } else { + None } } else { - None + required_columns + .null_count_column_expr(&col, &col_expr, null_count_field) + .map(|null_count_column_expr| { + // IsNull(column) => null_count > 0 + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Gt, + Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), + )) as _ + }) + .ok() } } @@ -1596,10 +1703,29 @@ enum ColumnReferenceCount { } impl ColumnReferenceCount { - /// Count the number of distinct column references in an expression + /// Count the number of distinct column references in an expression. + /// + /// Also recognizes `get_field` chains as single-column references: + /// `get_field(col, 'a')` is treated as one synthetic column `"col.a"`. fn from_expression(expr: &Arc) -> Self { + // First check if the entire expression is a get_field chain + if let Some((dotted_name, _)) = get_field_to_dotted_name(expr.as_ref()) { + // Treat the whole get_field chain as a single synthetic column + // Use index 0 as a placeholder; the name carries the real info + return ColumnReferenceCount::One(phys_expr::Column::new(&dotted_name, 0)); + } + let mut seen = HashSet::::new(); expr.apply(|expr| { + // If this sub-expression is a get_field chain, treat it as a single column + if let Some((dotted_name, _)) = get_field_to_dotted_name(expr.as_ref()) { + seen.insert(phys_expr::Column::new(&dotted_name, 0)); + if seen.len() > 1 { + return Ok(TreeNodeRecursion::Stop); + } + // Don't recurse into get_field children + return Ok(TreeNodeRecursion::Jump); + } if let Some(column) = expr.as_any().downcast_ref::() { seen.insert(column.clone()); if seen.len() > 1 {