diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 36d85b981c06c..433d5414ebe8b 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -37,7 +37,10 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; -use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath}; +use datafusion_execution::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheKey, SchemaFingerprint, + TableScopedPath, +}; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{ @@ -197,6 +200,9 @@ pub struct ListingTable { column_defaults: HashMap, /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters expr_adapter_factory: Option>, + /// Precomputed fingerprint of `file_schema` for the file-statistics cache + /// key. Constant for the table, so computed once here instead of per file. + file_schema_fingerprint: Arc, } impl ListingTable { @@ -227,6 +233,9 @@ impl ListingTable { .with_metadata(file_schema.metadata().clone()), ); + let file_schema_fingerprint = + Arc::new(SchemaFingerprint::from_schema(&file_schema)); + let table = Self { table_paths: config.table_paths, file_schema, @@ -238,6 +247,7 @@ impl ListingTable { constraints: Constraints::default(), column_defaults: HashMap::new(), expr_adapter_factory: config.expr_adapter_factory, + file_schema_fingerprint, }; Ok(table) @@ -268,21 +278,6 @@ impl ListingTable { self } - fn statistics_cache( - &self, - has_table_reference: bool, - ) -> Option<&Arc> { - let shared_cache = self.collected_statistics.as_ref()?; - if has_table_reference || self.schema_source == SchemaSource::Inferred { - Some(shared_cache) - } else { - // Anonymous specified-schema reads can use the same file path with - // different logical schemas. File statistics are schema-dependent, - // so avoid reusing stats computed for a different read schema. - None - } - } - /// Specify the SQL definition for this table, if any pub fn with_definition(mut self, definition: Option) -> Self { self.definition = definition; @@ -990,17 +985,21 @@ impl ListingTable { store: &Arc, part_file: &PartitionedFile, ) -> datafusion_common::Result<(Arc, Option)> { - use datafusion_execution::cache::cache_manager::CachedFileMetadata; - - let path = TableScopedPath { + let key = FileStatisticsCacheKey { table: part_file.table_reference.clone(), path: part_file.object_meta.location.clone(), + // Statistics are computed against `file_schema`, so key on its + // fingerprint: reads of the same path under a different schema get + // their own entry rather than reusing incompatible column statistics. + // The fingerprint is precomputed once per table (see `try_new`). + schema: Arc::clone(&self.file_schema_fingerprint), }; let meta = &part_file.object_meta; - // Check cache first - if we have valid cached statistics and ordering - if let Some(cache) = self.statistics_cache(path.table.is_some()) - && let Some(cached) = cache.get(&path) + // Check cache first - the schema is part of the key, so a hit is already + // schema-compatible; `is_valid_for` only confirms the file is unchanged. + if let Some(cache) = &self.collected_statistics + && let Some(cached) = cache.get(&key) && cached.is_valid_for(meta) { // Return cached statistics and ordering @@ -1017,9 +1016,9 @@ impl ListingTable { let statistics = Arc::new(file_meta.statistics); // Store in cache - if let Some(cache) = self.statistics_cache(path.table.is_some()) { + if let Some(cache) = &self.collected_statistics { cache.put( - &path, + &key, CachedFileMetadata::new( meta.clone(), Arc::clone(&statistics), diff --git a/datafusion/common/src/heap_size.rs b/datafusion/common/src/heap_size.rs index 869946d82414f..405736dbf9c9b 100644 --- a/datafusion/common/src/heap_size.rs +++ b/datafusion/common/src/heap_size.rs @@ -348,6 +348,17 @@ where } } +impl DFHeapSize for (A, B, C) +where + A: DFHeapSize, + B: DFHeapSize, + C: DFHeapSize, +{ + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.0.heap_size(ctx) + self.1.heap_size(ctx) + self.2.heap_size(ctx) + } +} + impl DFHeapSize for String { fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize { self.capacity() diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index b082271d67fd0..c04dad30ddde4 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -255,7 +255,29 @@ async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() { let stats = plan.statistics_with_args(&StatisticsArgs::new()).unwrap(); assert_eq!(stats.column_statistics.len(), 2); assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1)); - assert_eq!(cache.len(), 1); + + // #23072: the cache now keys on file_schema, so the wider read no longer + // bypasses the cache (as in #22950) — it lands in its own entry and + // coexists with the inferred one. Was `1` under the bypass. + assert_eq!(cache.len(), 2); + + // Repeat the wider read: same path + same file_schema -> reuse (no new + // entry) and a cache hit. Under #22950's bypass this read could never reuse. + ctx.read_parquet( + &parquet_path, + ParquetReadOptions::default().schema(&wider_schema), + ) + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + assert_eq!(cache.len(), 2); + let hits: usize = cache.list_entries().values().map(|e| e.hits).sum(); + assert_eq!( + hits, 1, + "expected a cache hit on the repeat read, got {hits}" + ); } #[tokio::test] diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index cecce81c63c62..dbc3e3c260c3d 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,7 +16,9 @@ // under the License. use crate::cache::default_cache::DefaultCache; -pub use crate::cache::{Cache, CacheValue, TableScopedPath}; +pub use crate::cache::{ + Cache, CacheValue, FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath, +}; use datafusion_common::HashMap; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::{Result, Statistics}; @@ -54,7 +56,7 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M /// 3. If invalid or missing, compute new value and call `put(path, new_value)` /// /// See [`crate::runtime_env::RuntimeEnv`] for more details -pub type FileStatisticsCache = dyn Cache; +pub type FileStatisticsCache = dyn Cache; /// A cache for storing the [`ObjectMeta`]s that result from listing a path. /// @@ -305,7 +307,7 @@ impl CacheManager { Some(Arc::clone(fsc)) } None if config.file_statistics_cache_limit > 0 => Some(Arc::new( - DefaultCache::::new( + DefaultCache::::new( config.file_statistics_cache_limit, ) .with_name("DefaultFileStatisticsCache"), diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index a1d89619eb256..99c8156fffd7d 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -299,7 +299,6 @@ impl Cache for DefaultCache { mod tests { use std::sync::Arc; - use crate::cache::TableScopedPath; use crate::cache::cache_manager::{ CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes, }; @@ -311,6 +310,7 @@ mod tests { use crate::cache::default_cache::TimeProvider; use crate::cache::{Cache, CacheEntryInfo}; use crate::cache::{CacheKey, CacheValue}; + use crate::cache::{FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -827,13 +827,14 @@ mod tests { false, )]); - let path = TableScopedPath { + let key_1 = FileStatisticsCacheKey { path: meta.location.clone(), table: None, + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; // Cache miss - assert!(cache.get(&path).is_none()); + assert!(cache.get(&key_1).is_none()); // Put a value let cached_value = CachedFileMetadata::new( @@ -841,10 +842,10 @@ mod tests { Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&path, cached_value); + cache.put(&key_1, cached_value); // Cache hit - let result = cache.get(&path); + let result = cache.get(&key_1); assert!(result.is_some()); let cached = result.unwrap(); @@ -853,12 +854,13 @@ mod tests { // File size changed - validation should fail let meta2 = create_test_meta("test", 2048); - let path_2 = TableScopedPath { + let key_2 = FileStatisticsCacheKey { path: meta2.location.clone(), table: None, + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; - let cached = cache.get(&path_2).unwrap(); + let cached = cache.get(&key_2).unwrap(); assert!(!cached.is_valid_for(&meta2)); // Update with new value @@ -867,18 +869,19 @@ mod tests { Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&path_2, cached_value2); + cache.put(&key_2, cached_value2); // Test list_entries let entries = cache.list_entries(); assert_eq!(entries.len(), 1); - let path_3 = TableScopedPath { + let key_3 = FileStatisticsCacheKey { path: Path::from("test"), table: None, + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; - let entry = entries.get(&path_3).unwrap(); + let entry = entries.get(&key_3).unwrap(); assert_eq!(entry.value.meta.size, 2048); // Should be updated value } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 49c2969587a06..ef45b1a7aaa06 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -20,12 +20,15 @@ pub mod lru_queue; pub mod default_cache; +use datafusion_common::arrow::datatypes::{DataType, Schema}; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::instant::Instant; use datafusion_common::{HashMap, TableReference}; use object_store::path::Path; +use std::collections::hash_map::DefaultHasher; use std::fmt::{Debug, Display, Formatter}; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; use std::time::Duration; /// Base trait for cache implementations with common operations. @@ -165,3 +168,171 @@ impl Display for TableScopedPath { } } } + +/// A fingerprint of the `file_schema` used to compute a file's statistics. +/// +/// Captures exactly the attributes that determine the layout and meaning of +/// `Statistics::column_statistics`: each column's name, data type and +/// nullability, in order. It deliberately excludes field/schema metadata, which +/// cannot affect statistics — including it would needlessly fragment the cache. +#[derive(Clone, Debug)] +pub struct SchemaFingerprint { + columns: Vec<(String, DataType, bool)>, + /// Precomputed hash of `columns`, so hashing a key on every cache lookup is + /// O(1) rather than O(schema width). Computed once in `from_schema` with a + /// fixed-seed hasher so it is stable across keys; `PartialEq` still compares + /// `columns` exactly, so a hash collision can never make two different + /// schemas share a cache entry. + hash: u64, +} + +impl SchemaFingerprint { + /// Builds a fingerprint from the `file_schema` used to compute statistics + /// (the schema of the columns physically read, not the full table schema — + /// partition columns and their statistics are handled separately). + pub fn from_schema(file_schema: &Schema) -> Self { + let columns: Vec<(String, DataType, bool)> = file_schema + .fields() + .iter() + .map(|f| (f.name().clone(), f.data_type().clone(), f.is_nullable())) + .collect(); + let mut hasher = DefaultHasher::new(); + columns.hash(&mut hasher); + Self { + columns, + hash: hasher.finish(), + } + } +} + +impl PartialEq for SchemaFingerprint { + fn eq(&self, other: &Self) -> bool { + // Cheap hash gate first, then an exact comparison so collisions are safe. + self.hash == other.hash && self.columns == other.columns + } +} + +impl Eq for SchemaFingerprint {} + +impl Hash for SchemaFingerprint { + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} + +impl DFHeapSize for SchemaFingerprint { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.columns.heap_size(ctx) + } +} + +/// Cache key for the file-statistics cache. +/// +/// Like [`TableScopedPath`] it is scoped by table and path, but it additionally +/// carries a [`SchemaFingerprint`]. File statistics are computed against a +/// specific `file_schema`, so the same path read under different schemas must +/// not share an entry; the fingerprint keeps those entries distinct while a +/// repeated read of the same schema still reuses its entry. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct FileStatisticsCacheKey { + pub table: Option, + pub path: Path, + /// `Arc` so building a key per file is a cheap refcount bump rather than a + /// deep clone of the fingerprint. `Arc`'s `Eq`/`Hash` compare the inner value, + /// so keying remains by schema contents (not pointer identity). + pub schema: Arc, +} + +impl DFHeapSize for FileStatisticsCacheKey { + /// The schema fingerprint is shared by all file-statistics keys for a + /// ListingTable. Do not deep-count it per key, as that would charge the same + /// allocation once per file and overstate cache memory. + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) + } +} + +impl CacheKey for FileStatisticsCacheKey { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + self.table.as_ref() + } +} + +#[cfg(test)] +mod schema_fingerprint_tests { + use super::*; + use datafusion_common::arrow::datatypes::Field; + + fn fp(fields: Vec) -> SchemaFingerprint { + SchemaFingerprint::from_schema(&Schema::new(fields)) + } + + /// `from_schema` must capture nullability and field order — the two + /// attributes most easily dropped by a wrong implementation. + #[test] + fn fingerprint_captures_nullability_and_order() { + assert_ne!( + fp(vec![Field::new("id", DataType::Int64, false)]), + fp(vec![Field::new("id", DataType::Int64, true)]), + "nullability must affect the fingerprint", + ); + + let ab = fp(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, true), + ]); + let ba = fp(vec![ + Field::new("b", DataType::Utf8, true), + Field::new("a", DataType::Int64, false), + ]); + assert_ne!(ab, ba, "field order must affect the fingerprint"); + } + + /// Metadata must NOT affect the fingerprint: it cannot change column + /// statistics, so including it would needlessly fragment the cache. + #[test] + fn fingerprint_ignores_metadata() { + let plain = fp(vec![Field::new("id", DataType::Int64, false)]); + + let field_md = SchemaFingerprint::from_schema(&Schema::new(vec![ + Field::new("id", DataType::Int64, false) + .with_metadata([("note".to_string(), "x".to_string())].into()), + ])); + assert_eq!(plain, field_md, "field metadata must be ignored"); + + let schema_md = SchemaFingerprint::from_schema( + &Schema::new(vec![Field::new("id", DataType::Int64, false)]) + .with_metadata([("k".to_string(), "v".to_string())].into()), + ); + assert_eq!(plain, schema_md, "schema metadata must be ignored"); + } + + #[test] + fn file_statistics_cache_key_size_excludes_schema_fingerprint() { + let path = Path::from("file.parquet"); + let table = Some(TableReference::bare("t")); + let narrow_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let wide_schema = Schema::new( + (0..100) + .map(|i| Field::new(format!("col_{i}"), DataType::Utf8, true)) + .collect::>(), + ); + + let narrow_key = FileStatisticsCacheKey { + table: table.clone(), + path: path.clone(), + schema: Arc::new(SchemaFingerprint::from_schema(&narrow_schema)), + }; + let wide_key = FileStatisticsCacheKey { + table, + path, + schema: Arc::new(SchemaFingerprint::from_schema(&wide_schema)), + }; + + assert_eq!(narrow_key.size(), wide_key.size()); + } +} diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md index d0778a3619c4e..c1cc5c0e66478 100644 --- a/docs/source/library-user-guide/upgrading/55.0.0.md +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -431,3 +431,33 @@ type aliases: Implement the newly introduced types for your custom cache implementation. See [PR #22613](https://github.com/apache/datafusion/pull/22613) for details. + +### `FileStatisticsCache` is now keyed by `FileStatisticsCacheKey` + +The file-statistics cache is now keyed by a schema-aware `FileStatisticsCacheKey` +instead of `TableScopedPath`: + +```diff +- pub type FileStatisticsCache = dyn Cache; ++ pub type FileStatisticsCache = dyn Cache; +``` + +The new key carries a `SchemaFingerprint` of the `file_schema` alongside the +table and path, so the same file path read under different schemas no longer +shares a cache entry — statistics computed for one schema are never reused for a +different one. + +**Who is affected:** + +- Users with a custom `FileStatisticsCache` implementation. +- Users constructing keys to call `get` / `put` on the statistics cache directly. + +**Migration guide:** + +- For a custom implementation, change the implemented trait from + `Cache` to + `Cache`. +- To build a key, construct a `FileStatisticsCacheKey { table, path, schema }`, + where `schema` is `Arc::new(SchemaFingerprint::from_schema(file_schema))`. + +See [PR #23201](https://github.com/apache/datafusion/pull/23201) for details.