From f9c759606526b7767b2bb0ac80a69c27033e8952 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Fri, 26 Jun 2026 15:40:06 +0800 Subject: [PATCH 1/8] refactor: make file-statistics cache keys schema-aware File statistics are computed against a specific `file_schema`, but the file-statistics cache was keyed only by table and path. Reading the same path under a different schema could reuse statistics whose `column_statistics` no longer line up, panicking during statistics projection. #22950 worked around this by bypassing the cache entirely for anonymous explicit-schema reads, at the cost of losing cache reuse for them. Introduce a `SchemaFingerprint` (per-column name, type and nullability, derived from `file_schema`) and a `FileStatisticsCacheKey { table, path, schema }`, and key the file-statistics cache on it. Different schemas now get distinct entries (no stale cross-schema reuse) while a repeated read of the same schema reuses its entry, so the #22950 bypass is removed and anonymous explicit-schema reads cache safely again. - The fingerprint excludes field/schema metadata (cannot affect statistics) and partition columns (their statistics are computed separately). - Table-drop invalidation is unchanged: drop_table_entries matches on CacheKey::table_ref(), which still returns the table, so all schema variants for a table are removed together. - The list-files cache continues to key on TableScopedPath. Closes #23072. 
Signed-off-by: Jiawei Zhao --- datafusion/catalog-listing/src/table.rs | 36 ++---- .../core/tests/parquet/file_statistics.rs | 24 +++- .../execution/src/cache/cache_manager.rs | 8 +- datafusion/execution/src/cache/mod.rs | 122 ++++++++++++++++++ 4 files changed, 164 insertions(+), 26 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 36d85b981c06c..4625f28bad71b 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -268,21 +268,6 @@ impl ListingTable { self } - fn statistics_cache( - &self, - has_table_reference: bool, - ) -> Option<&Arc> { - let shared_cache = self.collected_statistics.as_ref()?; - if has_table_reference || self.schema_source == SchemaSource::Inferred { - Some(shared_cache) - } else { - // Anonymous specified-schema reads can use the same file path with - // different logical schemas. File statistics are schema-dependent, - // so avoid reusing stats computed for a different read schema. - None - } - } - /// Specify the SQL definition for this table, if any pub fn with_definition(mut self, definition: Option) -> Self { self.definition = definition; @@ -990,17 +975,24 @@ impl ListingTable { store: &Arc, part_file: &PartitionedFile, ) -> datafusion_common::Result<(Arc, Option)> { - use datafusion_execution::cache::cache_manager::CachedFileMetadata; + use datafusion_execution::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCacheKey, SchemaFingerprint, + }; - let path = TableScopedPath { + let key = FileStatisticsCacheKey { table: part_file.table_reference.clone(), path: part_file.object_meta.location.clone(), + // Statistics are computed against `file_schema`, so key on it: reads + // of the same path under a different schema get their own entry + // rather than reusing incompatible column statistics. 
+ schema: SchemaFingerprint::from_schema(&self.file_schema), }; let meta = &part_file.object_meta; - // Check cache first - if we have valid cached statistics and ordering - if let Some(cache) = self.statistics_cache(path.table.is_some()) - && let Some(cached) = cache.get(&path) + // Check cache first - the schema is part of the key, so a hit is already + // schema-compatible; `is_valid_for` only confirms the file is unchanged. + if let Some(cache) = &self.collected_statistics + && let Some(cached) = cache.get(&key) && cached.is_valid_for(meta) { // Return cached statistics and ordering @@ -1017,9 +1009,9 @@ impl ListingTable { let statistics = Arc::new(file_meta.statistics); // Store in cache - if let Some(cache) = self.statistics_cache(path.table.is_some()) { + if let Some(cache) = &self.collected_statistics { cache.put( - &path, + &key, CachedFileMetadata::new( meta.clone(), Arc::clone(&statistics), diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index b082271d67fd0..c04dad30ddde4 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -255,7 +255,29 @@ async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() { let stats = plan.statistics_with_args(&StatisticsArgs::new()).unwrap(); assert_eq!(stats.column_statistics.len(), 2); assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1)); - assert_eq!(cache.len(), 1); + + // #23072: the cache now keys on file_schema, so the wider read no longer + // bypasses the cache (as in #22950) — it lands in its own entry and + // coexists with the inferred one. Was `1` under the bypass. + assert_eq!(cache.len(), 2); + + // Repeat the wider read: same path + same file_schema -> reuse (no new + // entry) and a cache hit. Under #22950's bypass this read could never reuse. 
+ ctx.read_parquet( + &parquet_path, + ParquetReadOptions::default().schema(&wider_schema), + ) + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + assert_eq!(cache.len(), 2); + let hits: usize = cache.list_entries().values().map(|e| e.hits).sum(); + assert_eq!( + hits, 1, + "expected a cache hit on the repeat read, got {hits}" + ); } #[tokio::test] diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index cecce81c63c62..dbc3e3c260c3d 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,7 +16,9 @@ // under the License. use crate::cache::default_cache::DefaultCache; -pub use crate::cache::{Cache, CacheValue, TableScopedPath}; +pub use crate::cache::{ + Cache, CacheValue, FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath, +}; use datafusion_common::HashMap; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::{Result, Statistics}; @@ -54,7 +56,7 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M /// 3. If invalid or missing, compute new value and call `put(path, new_value)` /// /// See [`crate::runtime_env::RuntimeEnv`] for more details -pub type FileStatisticsCache = dyn Cache; +pub type FileStatisticsCache = dyn Cache; /// A cache for storing the [`ObjectMeta`]s that result from listing a path. 
/// @@ -305,7 +307,7 @@ impl CacheManager { Some(Arc::clone(fsc)) } None if config.file_statistics_cache_limit > 0 => Some(Arc::new( - DefaultCache::::new( + DefaultCache::::new( config.file_statistics_cache_limit, ) .with_name("DefaultFileStatisticsCache"), diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 49c2969587a06..c278f24a01eb0 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -20,6 +20,7 @@ pub mod lru_queue; pub mod default_cache; +use datafusion_common::arrow::datatypes::{DataType, Schema}; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::instant::Instant; use datafusion_common::{HashMap, TableReference}; @@ -165,3 +166,124 @@ impl Display for TableScopedPath { } } } + +/// A fingerprint of the `file_schema` used to compute a file's statistics. +/// +/// Captures exactly the attributes that determine the layout and meaning of +/// `Statistics::column_statistics`: each column's name, data type and +/// nullability, in order. It deliberately excludes field/schema metadata, which +/// cannot affect statistics — including it would needlessly fragment the cache. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct SchemaFingerprint(Vec<(String, DataType, bool)>); + +impl SchemaFingerprint { + /// Builds a fingerprint from the `file_schema` used to compute statistics + /// (the schema of the columns physically read, not the full table schema — + /// partition columns and their statistics are handled separately). + pub fn from_schema(file_schema: &Schema) -> Self { + Self( + file_schema + .fields() + .iter() + .map(|f| (f.name().clone(), f.data_type().clone(), f.is_nullable())) + .collect(), + ) + } +} + +impl DFHeapSize for SchemaFingerprint { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + // `(String, DataType, bool)` has no `DFHeapSize` impl (only 2-tuples do), + // so account for each column by hand. 
`bool` carries no heap. + self.0.capacity() * size_of::<(String, DataType, bool)>() + + self + .0 + .iter() + .map(|(name, data_type, _)| { + name.heap_size(ctx) + data_type.heap_size(ctx) + }) + .sum::() + } +} + +/// Cache key for the file-statistics cache. +/// +/// Like [`TableScopedPath`] it is scoped by table and path, but it additionally +/// carries a [`SchemaFingerprint`]. File statistics are computed against a +/// specific `file_schema`, so the same path read under different schemas must +/// not share an entry; the fingerprint keeps those entries distinct while a +/// repeated read of the same schema still reuses its entry. +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct FileStatisticsCacheKey { + pub table: Option, + pub path: Path, + pub schema: SchemaFingerprint, +} + +impl DFHeapSize for FileStatisticsCacheKey { + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.path.as_ref().heap_size(ctx) + + self.table.heap_size(ctx) + + self.schema.heap_size(ctx) + } +} + +impl CacheKey for FileStatisticsCacheKey { + fn size(&self) -> usize { + DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) + } + + fn table_ref(&self) -> Option<&TableReference> { + self.table.as_ref() + } +} + +#[cfg(test)] +mod schema_fingerprint_tests { + use super::*; + use datafusion_common::arrow::datatypes::Field; + + fn fp(fields: Vec) -> SchemaFingerprint { + SchemaFingerprint::from_schema(&Schema::new(fields)) + } + + /// `from_schema` must capture nullability and field order — the two + /// attributes most easily dropped by a wrong implementation. 
+ #[test] + fn fingerprint_captures_nullability_and_order() { + assert_ne!( + fp(vec![Field::new("id", DataType::Int64, false)]), + fp(vec![Field::new("id", DataType::Int64, true)]), + "nullability must affect the fingerprint", + ); + + let ab = fp(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, true), + ]); + let ba = fp(vec![ + Field::new("b", DataType::Utf8, true), + Field::new("a", DataType::Int64, false), + ]); + assert_ne!(ab, ba, "field order must affect the fingerprint"); + } + + /// Metadata must NOT affect the fingerprint: it cannot change column + /// statistics, so including it would needlessly fragment the cache. + #[test] + fn fingerprint_ignores_metadata() { + let plain = fp(vec![Field::new("id", DataType::Int64, false)]); + + let field_md = SchemaFingerprint::from_schema(&Schema::new(vec![ + Field::new("id", DataType::Int64, false) + .with_metadata([("note".to_string(), "x".to_string())].into()), + ])); + assert_eq!(plain, field_md, "field metadata must be ignored"); + + let schema_md = SchemaFingerprint::from_schema( + &Schema::new(vec![Field::new("id", DataType::Int64, false)]) + .with_metadata([("k".to_string(), "v".to_string())].into()), + ); + assert_eq!(plain, schema_md, "schema metadata must be ignored"); + } +} From eb3a831017afc29cde46ce8a4b4b4b3810d67053 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Fri, 26 Jun 2026 19:21:34 +0800 Subject: [PATCH 2/8] refactor: simplify SchemaFingerprint heap accounting via 3-tuple DFHeapSize Add a `DFHeapSize` impl for 3-tuples (mirroring the existing 2-tuple one) so `Vec<(String, DataType, bool)>` accounts for its heap automatically, letting `SchemaFingerprint::heap_size` delegate to it instead of computing the size by hand. Also update the `test_statistics_cache` unit test to key on `FileStatisticsCacheKey` so it matches the real file-statistics cache. 
Signed-off-by: Jiawei Zhao --- datafusion/common/src/heap_size.rs | 11 +++++++++ .../execution/src/cache/default_cache.rs | 23 +++++++++++-------- datafusion/execution/src/cache/mod.rs | 11 +-------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/datafusion/common/src/heap_size.rs b/datafusion/common/src/heap_size.rs index 869946d82414f..405736dbf9c9b 100644 --- a/datafusion/common/src/heap_size.rs +++ b/datafusion/common/src/heap_size.rs @@ -348,6 +348,17 @@ where } } +impl DFHeapSize for (A, B, C) +where + A: DFHeapSize, + B: DFHeapSize, + C: DFHeapSize, +{ + fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { + self.0.heap_size(ctx) + self.1.heap_size(ctx) + self.2.heap_size(ctx) + } +} + impl DFHeapSize for String { fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize { self.capacity() diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index a1d89619eb256..09c8ab10d536d 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -299,7 +299,6 @@ impl Cache for DefaultCache { mod tests { use std::sync::Arc; - use crate::cache::TableScopedPath; use crate::cache::cache_manager::{ CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes, }; @@ -311,6 +310,7 @@ mod tests { use crate::cache::default_cache::TimeProvider; use crate::cache::{Cache, CacheEntryInfo}; use crate::cache::{CacheKey, CacheValue}; + use crate::cache::{FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -827,13 +827,14 @@ mod tests { false, )]); - let path = TableScopedPath { + let key_1 = FileStatisticsCacheKey { path: meta.location.clone(), table: None, + schema: SchemaFingerprint::from_schema(&schema), }; // Cache miss - assert!(cache.get(&path).is_none()); + 
assert!(cache.get(&key_1).is_none()); // Put a value let cached_value = CachedFileMetadata::new( @@ -841,10 +842,10 @@ mod tests { Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&path, cached_value); + cache.put(&key_1, cached_value); // Cache hit - let result = cache.get(&path); + let result = cache.get(&key_1); assert!(result.is_some()); let cached = result.unwrap(); @@ -853,12 +854,13 @@ mod tests { // File size changed - validation should fail let meta2 = create_test_meta("test", 2048); - let path_2 = TableScopedPath { + let key_2 = FileStatisticsCacheKey { path: meta2.location.clone(), table: None, + schema: SchemaFingerprint::from_schema(&schema), }; - let cached = cache.get(&path_2).unwrap(); + let cached = cache.get(&key_2).unwrap(); assert!(!cached.is_valid_for(&meta2)); // Update with new value @@ -867,18 +869,19 @@ mod tests { Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&path_2, cached_value2); + cache.put(&key_2, cached_value2); // Test list_entries let entries = cache.list_entries(); assert_eq!(entries.len(), 1); - let path_3 = TableScopedPath { + let key_3 = FileStatisticsCacheKey { path: Path::from("test"), table: None, + schema: SchemaFingerprint::from_schema(&schema), }; - let entry = entries.get(&path_3).unwrap(); + let entry = entries.get(&key_3).unwrap(); assert_eq!(entry.value.meta.size, 2048); // Should be updated value } diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index c278f24a01eb0..d7ba64accf6fb 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -193,16 +193,7 @@ impl SchemaFingerprint { impl DFHeapSize for SchemaFingerprint { fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - // `(String, DataType, bool)` has no `DFHeapSize` impl (only 2-tuples do), - // so account for each column by hand. `bool` carries no heap. 
- self.0.capacity() * size_of::<(String, DataType, bool)>() - + self - .0 - .iter() - .map(|(name, data_type, _)| { - name.heap_size(ctx) + data_type.heap_size(ctx) - }) - .sum::() + self.0.heap_size(ctx) } } From d652d1e1427673062826bc428a039d7db699a4d1 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Fri, 26 Jun 2026 23:22:14 +0800 Subject: [PATCH 3/8] perf: compute the file-schema fingerprint once per table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `do_collect_statistics_and_ordering` rebuilt the `SchemaFingerprint` for every file, deep-cloning all column names and types — O(files x schema width) of redundant work, since `file_schema` is constant for a table. Compute the fingerprint once in `ListingTable::try_new` and store it as `Arc`; `FileStatisticsCacheKey.schema` now holds the `Arc`, so building a key per file is an O(1) refcount bump instead of a deep clone. `Arc`'s `Eq`/`Hash` compare the inner value, so cache keying remains by schema contents. 
Signed-off-by: Jiawei Zhao --- datafusion/catalog-listing/src/table.rs | 25 ++++++++++++------- .../execution/src/cache/default_cache.rs | 6 ++--- datafusion/execution/src/cache/mod.rs | 8 ++++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 4625f28bad71b..433d5414ebe8b 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -37,7 +37,10 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; -use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath}; +use datafusion_execution::cache::cache_manager::{ + CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheKey, SchemaFingerprint, + TableScopedPath, +}; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{ @@ -197,6 +200,9 @@ pub struct ListingTable { column_defaults: HashMap, /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters expr_adapter_factory: Option>, + /// Precomputed fingerprint of `file_schema` for the file-statistics cache + /// key. Constant for the table, so computed once here instead of per file. 
+ file_schema_fingerprint: Arc, } impl ListingTable { @@ -227,6 +233,9 @@ impl ListingTable { .with_metadata(file_schema.metadata().clone()), ); + let file_schema_fingerprint = + Arc::new(SchemaFingerprint::from_schema(&file_schema)); + let table = Self { table_paths: config.table_paths, file_schema, @@ -238,6 +247,7 @@ impl ListingTable { constraints: Constraints::default(), column_defaults: HashMap::new(), expr_adapter_factory: config.expr_adapter_factory, + file_schema_fingerprint, }; Ok(table) @@ -975,17 +985,14 @@ impl ListingTable { store: &Arc, part_file: &PartitionedFile, ) -> datafusion_common::Result<(Arc, Option)> { - use datafusion_execution::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCacheKey, SchemaFingerprint, - }; - let key = FileStatisticsCacheKey { table: part_file.table_reference.clone(), path: part_file.object_meta.location.clone(), - // Statistics are computed against `file_schema`, so key on it: reads - // of the same path under a different schema get their own entry - // rather than reusing incompatible column statistics. - schema: SchemaFingerprint::from_schema(&self.file_schema), + // Statistics are computed against `file_schema`, so key on its + // fingerprint: reads of the same path under a different schema get + // their own entry rather than reusing incompatible column statistics. + // The fingerprint is precomputed once per table (see `try_new`). 
+ schema: Arc::clone(&self.file_schema_fingerprint), }; let meta = &part_file.object_meta; diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 09c8ab10d536d..99c8156fffd7d 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -830,7 +830,7 @@ mod tests { let key_1 = FileStatisticsCacheKey { path: meta.location.clone(), table: None, - schema: SchemaFingerprint::from_schema(&schema), + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; // Cache miss @@ -857,7 +857,7 @@ mod tests { let key_2 = FileStatisticsCacheKey { path: meta2.location.clone(), table: None, - schema: SchemaFingerprint::from_schema(&schema), + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; let cached = cache.get(&key_2).unwrap(); @@ -878,7 +878,7 @@ mod tests { let key_3 = FileStatisticsCacheKey { path: Path::from("test"), table: None, - schema: SchemaFingerprint::from_schema(&schema), + schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; let entry = entries.get(&key_3).unwrap(); diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index d7ba64accf6fb..4f22ffb62aa02 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -27,6 +27,7 @@ use datafusion_common::{HashMap, TableReference}; use object_store::path::Path; use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; +use std::sync::Arc; use std::time::Duration; /// Base trait for cache implementations with common operations. @@ -208,14 +209,17 @@ impl DFHeapSize for SchemaFingerprint { pub struct FileStatisticsCacheKey { pub table: Option, pub path: Path, - pub schema: SchemaFingerprint, + // `Arc` so building a key per file is a cheap refcount bump rather than a + // deep clone of the fingerprint. 
`Arc`'s `Eq`/`Hash` compare the inner value, + // so keying remains by schema contents (not pointer identity). + pub schema: Arc, } impl DFHeapSize for FileStatisticsCacheKey { fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) - + self.schema.heap_size(ctx) + + self.schema.as_ref().heap_size(ctx) } } From a767586f51afc19e2b83e021c0a86ff7b21a5b10 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Sat, 27 Jun 2026 21:17:45 +0800 Subject: [PATCH 4/8] perf: precompute SchemaFingerprint hash for O(1) key hashing Hashing a FileStatisticsCacheKey on every cache lookup previously digested the entire file schema (O(schema width)). Store a fixed-seed hash of the fingerprint columns, computed once in from_schema, and feed only that u64 into the map hasher. PartialEq still compares the columns exactly, so a hash collision can never make two different schemas share a cache entry. Signed-off-by: Jiawei Zhao --- datafusion/execution/src/cache/mod.rs | 50 +++++++++++++++++++++------ 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 4f22ffb62aa02..55a67f9f1d5f4 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -25,8 +25,9 @@ use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::instant::Instant; use datafusion_common::{HashMap, TableReference}; use object_store::path::Path; +use std::collections::hash_map::DefaultHasher; use std::fmt::{Debug, Display, Formatter}; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use std::time::Duration; @@ -174,27 +175,54 @@ impl Display for TableScopedPath { /// `Statistics::column_statistics`: each column's name, data type and /// nullability, in order. 
It deliberately excludes field/schema metadata, which /// cannot affect statistics — including it would needlessly fragment the cache. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub struct SchemaFingerprint(Vec<(String, DataType, bool)>); +#[derive(Clone, Debug)] +pub struct SchemaFingerprint { + columns: Vec<(String, DataType, bool)>, + /// Precomputed hash of `columns`, so hashing a key on every cache lookup is + /// O(1) rather than O(schema width). Computed once in `from_schema` with a + /// fixed-seed hasher so it is stable across keys; `PartialEq` still compares + /// `columns` exactly, so a hash collision can never make two different + /// schemas share a cache entry. + hash: u64, +} impl SchemaFingerprint { /// Builds a fingerprint from the `file_schema` used to compute statistics /// (the schema of the columns physically read, not the full table schema — /// partition columns and their statistics are handled separately). pub fn from_schema(file_schema: &Schema) -> Self { - Self( - file_schema - .fields() - .iter() - .map(|f| (f.name().clone(), f.data_type().clone(), f.is_nullable())) - .collect(), - ) + let columns: Vec<(String, DataType, bool)> = file_schema + .fields() + .iter() + .map(|f| (f.name().clone(), f.data_type().clone(), f.is_nullable())) + .collect(); + let mut hasher = DefaultHasher::new(); + columns.hash(&mut hasher); + Self { + columns, + hash: hasher.finish(), + } + } +} + +impl PartialEq for SchemaFingerprint { + fn eq(&self, other: &Self) -> bool { + // Cheap hash gate first, then an exact comparison so collisions are safe. 
+ self.hash == other.hash && self.columns == other.columns + } +} + +impl Eq for SchemaFingerprint {} + +impl Hash for SchemaFingerprint { + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); } } impl DFHeapSize for SchemaFingerprint { fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.0.heap_size(ctx) + self.columns.heap_size(ctx) } } From d4b860c668020799fc957c4da1311598f9abfd8a Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Sat, 27 Jun 2026 23:20:16 +0800 Subject: [PATCH 5/8] doc: document schema-aware FileStatisticsCacheKey in 55.0.0 upgrade guide The file-statistics cache key changed from TableScopedPath to a schema-aware FileStatisticsCacheKey. Add an upgrade-guide entry covering the type-alias change and how to migrate custom cache implementations and direct get/put callers. Signed-off-by: Jiawei Zhao --- .../library-user-guide/upgrading/55.0.0.md | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md index d0778a3619c4e..c1cc5c0e66478 100644 --- a/docs/source/library-user-guide/upgrading/55.0.0.md +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -431,3 +431,33 @@ type aliases: Implement the newly introduced types for your custom cache implementation. See [PR #22613](https://github.com/apache/datafusion/pull/22613) for details. + +### `FileStatisticsCache` is now keyed by `FileStatisticsCacheKey` + +The file-statistics cache is now keyed by a schema-aware `FileStatisticsCacheKey` +instead of `TableScopedPath`: + +```diff +- pub type FileStatisticsCache = dyn Cache<TableScopedPath, CachedFileMetadata>; ++ pub type FileStatisticsCache = dyn Cache<FileStatisticsCacheKey, CachedFileMetadata>; +``` + +The new key carries a `SchemaFingerprint` of the `file_schema` alongside the +table and path, so the same file path read under different schemas no longer +shares a cache entry — statistics computed for one schema are never reused for a +different one. 
+ +**Who is affected:** + +- Users with a custom `FileStatisticsCache` implementation. +- Users constructing keys to call `get` / `put` on the statistics cache directly. + +**Migration guide:** + +- For a custom implementation, change the implemented trait from + `Cache<TableScopedPath, CachedFileMetadata>` to + `Cache<FileStatisticsCacheKey, CachedFileMetadata>`. +- To build a key, construct a `FileStatisticsCacheKey { table, path, schema }`, + where `schema` is `Arc::new(SchemaFingerprint::from_schema(file_schema))`. + +See [PR #23201](https://github.com/apache/datafusion/pull/23201) for details. From aeb432c398824627c8fb33232f73b31517e2aece Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Mon, 29 Jun 2026 21:12:12 +0800 Subject: [PATCH 6/8] fix: exclude schema from cache key size FileStatisticsCacheKey stores a shared Arc to SchemaFingerprint. Do not charge the fingerprint heap allocation to each file key, because that overstates memory use for wide schemas and causes early eviction. Add a regression test to keep schema width out of per-key sizing. Signed-off-by: Jiawei Zhao --- datafusion/execution/src/cache/mod.rs | 38 ++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 55a67f9f1d5f4..ef45b1a7aaa06 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -237,17 +237,18 @@ impl DFHeapSize for SchemaFingerprint { pub struct FileStatisticsCacheKey { pub table: Option, pub path: Path, - // `Arc` so building a key per file is a cheap refcount bump rather than a - // deep clone of the fingerprint. `Arc`'s `Eq`/`Hash` compare the inner value, - // so keying remains by schema contents (not pointer identity). + /// `Arc` so building a key per file is a cheap refcount bump rather than a + /// deep clone of the fingerprint. `Arc`'s `Eq`/`Hash` compare the inner value, + /// so keying remains by schema contents (not pointer identity). 
pub schema: Arc, } impl DFHeapSize for FileStatisticsCacheKey { + /// The schema fingerprint is shared by all file-statistics keys for a + /// ListingTable. Do not deep-count it per key, as that would charge the same + /// allocation once per file and overstate cache memory. fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.path.as_ref().heap_size(ctx) - + self.table.heap_size(ctx) - + self.schema.as_ref().heap_size(ctx) + self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) } } @@ -309,4 +310,29 @@ mod schema_fingerprint_tests { ); assert_eq!(plain, schema_md, "schema metadata must be ignored"); } + + #[test] + fn file_statistics_cache_key_size_excludes_schema_fingerprint() { + let path = Path::from("file.parquet"); + let table = Some(TableReference::bare("t")); + let narrow_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let wide_schema = Schema::new( + (0..100) + .map(|i| Field::new(format!("col_{i}"), DataType::Utf8, true)) + .collect::>(), + ); + + let narrow_key = FileStatisticsCacheKey { + table: table.clone(), + path: path.clone(), + schema: Arc::new(SchemaFingerprint::from_schema(&narrow_schema)), + }; + let wide_key = FileStatisticsCacheKey { + table, + path, + schema: Arc::new(SchemaFingerprint::from_schema(&wide_schema)), + }; + + assert_eq!(narrow_key.size(), wide_key.size()); + } } From 985b8f3747deecd29067fa01ee92406cc73ca296 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Tue, 30 Jun 2026 21:21:04 +0800 Subject: [PATCH 7/8] refactor: validate file stats schema in cache value Keep the file-statistics cache keyed by table and path while storing the file-schema fingerprint in the cached metadata value. This preserves schema correctness without changing the public cache key type. 
Signed-off-by: Jiawei Zhao --- datafusion/catalog-listing/src/table.rs | 27 +++---- .../core/tests/parquet/file_statistics.rs | 10 +-- .../execution/src/cache/cache_manager.rs | 28 ++++--- .../execution/src/cache/default_cache.rs | 77 ++++++++++++------- datafusion/execution/src/cache/mod.rs | 62 --------------- .../library-user-guide/upgrading/55.0.0.md | 29 +++---- 6 files changed, 95 insertions(+), 138 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 433d5414ebe8b..06bef17ed5ae6 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -38,8 +38,7 @@ use datafusion_datasource::{ ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; use datafusion_execution::cache::cache_manager::{ - CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheKey, SchemaFingerprint, - TableScopedPath, + CachedFileMetadata, FileStatisticsCache, SchemaFingerprint, TableScopedPath, }; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; @@ -200,8 +199,9 @@ pub struct ListingTable { column_defaults: HashMap, /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters expr_adapter_factory: Option>, - /// Precomputed fingerprint of `file_schema` for the file-statistics cache - /// key. Constant for the table, so computed once here instead of per file. + /// Precomputed fingerprint of `file_schema` for file-statistics cache + /// validation. Constant for the table, so computed once here instead of per + /// file. 
file_schema_fingerprint: Arc, } @@ -985,22 +985,18 @@ impl ListingTable { store: &Arc, part_file: &PartitionedFile, ) -> datafusion_common::Result<(Arc, Option)> { - let key = FileStatisticsCacheKey { + let path = TableScopedPath { table: part_file.table_reference.clone(), path: part_file.object_meta.location.clone(), - // Statistics are computed against `file_schema`, so key on its - // fingerprint: reads of the same path under a different schema get - // their own entry rather than reusing incompatible column statistics. - // The fingerprint is precomputed once per table (see `try_new`). - schema: Arc::clone(&self.file_schema_fingerprint), }; let meta = &part_file.object_meta; - // Check cache first - the schema is part of the key, so a hit is already - // schema-compatible; `is_valid_for` only confirms the file is unchanged. + // Check cache first. The key stays `{table, path}` for cheap lookups; + // the cached value carries the schema fingerprint to prevent reusing + // stats computed under a different file schema. 
if let Some(cache) = &self.collected_statistics - && let Some(cached) = cache.get(&key) - && cached.is_valid_for(meta) + && let Some(cached) = cache.get(&path) + && cached.is_valid_for(meta, self.file_schema_fingerprint.as_ref()) { // Return cached statistics and ordering return Ok((Arc::clone(&cached.statistics), cached.ordering.clone())); @@ -1018,9 +1014,10 @@ impl ListingTable { // Store in cache if let Some(cache) = &self.collected_statistics { cache.put( - &key, + &path, CachedFileMetadata::new( meta.clone(), + Arc::clone(&self.file_schema_fingerprint), Arc::clone(&statistics), file_meta.ordering.clone(), ), diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index c04dad30ddde4..e0eed40283520 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -256,10 +256,10 @@ async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() { assert_eq!(stats.column_statistics.len(), 2); assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1)); - // #23072: the cache now keys on file_schema, so the wider read no longer - // bypasses the cache (as in #22950) — it lands in its own entry and - // coexists with the inferred one. Was `1` under the bypass. - assert_eq!(cache.len(), 2); + // #23072: the cache now validates file_schema, so the wider read no + // longer bypasses the cache (as in #22950), but it overwrites the existing + // `{table, path}` entry instead of adding a schema-specific key. + assert_eq!(cache.len(), 1); // Repeat the wider read: same path + same file_schema -> reuse (no new // entry) and a cache hit. Under #22950's bypass this read could never reuse. 
@@ -272,7 +272,7 @@ async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() { .create_physical_plan() .await .unwrap(); - assert_eq!(cache.len(), 2); + assert_eq!(cache.len(), 1); let hits: usize = cache.list_entries().values().map(|e| e.hits).sum(); assert_eq!( hits, 1, diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index dbc3e3c260c3d..19b28dd5652d3 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -16,9 +16,7 @@ // under the License. use crate::cache::default_cache::DefaultCache; -pub use crate::cache::{ - Cache, CacheValue, FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath, -}; +pub use crate::cache::{Cache, CacheValue, SchemaFingerprint, TableScopedPath}; use datafusion_common::HashMap; use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx}; use datafusion_common::{Result, Statistics}; @@ -52,11 +50,12 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M /// /// The typical usage pattern is: /// 1. Call `get(path)` to check for cached value -/// 2. If `Some(cached)`, validate with `cached.is_valid_for(&current_meta)` +/// 2. If `Some(cached)`, validate with +/// `cached.is_valid_for(&current_meta, &current_schema_fingerprint)` /// 3. If invalid or missing, compute new value and call `put(path, new_value)` /// /// See [`crate::runtime_env::RuntimeEnv`] for more details -pub type FileStatisticsCache = dyn Cache<FileStatisticsCacheKey, CachedFileMetadata>; +pub type FileStatisticsCache = dyn Cache<TableScopedPath, CachedFileMetadata>; /// A cache for storing the [`ObjectMeta`]s that result from listing a path. /// @@ -93,11 +92,13 @@ pub type FileMetadataCache = dyn Cache; /// Cached metadata for a file, including statistics and ordering. /// /// This struct embeds the [`ObjectMeta`] used for cache validation, -/// along with the cached statistics and ordering information. +/// the `file_schema` fingerprint, cached statistics, and ordering information. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct CachedFileMetadata { /// File metadata used for cache validation (size, last_modified). pub meta: ObjectMeta, + /// Fingerprint of the `file_schema` used to compute `statistics`. + pub schema_fingerprint: Arc, /// Cached statistics for the file, if available. pub statistics: Arc, /// Cached ordering for the file. @@ -108,11 +109,13 @@ impl CachedFileMetadata { /// Create a new cached file metadata entry. pub fn new( meta: ObjectMeta, + schema_fingerprint: Arc, statistics: Arc, ordering: Option, ) -> Self { Self { meta, + schema_fingerprint, statistics, ordering, } @@ -120,10 +123,15 @@ impl CachedFileMetadata { /// Check if this cached entry is still valid for the given metadata. /// - /// Returns true if the file size and last modified time match. - pub fn is_valid_for(&self, current_meta: &ObjectMeta) -> bool { + /// Returns true if the file size, last modified time, and schema match. + pub fn is_valid_for( + &self, + current_meta: &ObjectMeta, + current_schema_fingerprint: &SchemaFingerprint, + ) -> bool { self.meta.size == current_meta.size && self.meta.last_modified == current_meta.last_modified + && self.schema_fingerprint.as_ref() == current_schema_fingerprint } } @@ -141,6 +149,8 @@ impl DFHeapSize for CachedFileMetadata { + self.meta.e_tag.heap_size(ctx) + self.meta.location.as_ref().heap_size(ctx) + self.statistics.heap_size(ctx) + // Do not deep-count `schema_fingerprint`: each ListingTable shares one + // fingerprint across all cached files. 
//TODO add ordering once LexOrdering/PhysicalExpr implements DFHeapSize } } @@ -307,7 +317,7 @@ impl CacheManager { Some(Arc::clone(fsc)) } None if config.file_statistics_cache_limit > 0 => Some(Arc::new( - DefaultCache::::new( + DefaultCache::::new( config.file_statistics_cache_limit, ) .with_name("DefaultFileStatisticsCache"), diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index 99c8156fffd7d..ae9de9a3920db 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ b/datafusion/execution/src/cache/default_cache.rs @@ -310,7 +310,7 @@ mod tests { use crate::cache::default_cache::TimeProvider; use crate::cache::{Cache, CacheEntryInfo}; use crate::cache::{CacheKey, CacheValue}; - use crate::cache::{FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath}; + use crate::cache::{SchemaFingerprint, TableScopedPath}; use arrow::array::{Int32Array, ListArray, RecordBatch}; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -827,61 +827,65 @@ mod tests { false, )]); - let key_1 = FileStatisticsCacheKey { + let path = TableScopedPath { path: meta.location.clone(), table: None, - schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); // Cache miss - assert!(cache.get(&key_1).is_none()); + assert!(cache.get(&path).is_none()); // Put a value let cached_value = CachedFileMetadata::new( meta.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&key_1, cached_value); + cache.put(&path, cached_value); // Cache hit - let result = cache.get(&key_1); + let result = cache.get(&path); assert!(result.is_some()); let cached = result.unwrap(); - assert!(cached.is_valid_for(&meta)); + assert!(cached.is_valid_for(&meta, schema_fingerprint.as_ref())); + + let different_schema = Schema::new(vec![Field::new( + 
"different_column", + DataType::Timestamp(TimeUnit::Second, None), + false, + )]); + let different_schema_fingerprint = + SchemaFingerprint::from_schema(&different_schema); + assert!(!cached.is_valid_for(&meta, &different_schema_fingerprint)); // File size changed - validation should fail let meta2 = create_test_meta("test", 2048); - let key_2 = FileStatisticsCacheKey { - path: meta2.location.clone(), - table: None, - schema: Arc::new(SchemaFingerprint::from_schema(&schema)), - }; - - let cached = cache.get(&key_2).unwrap(); - assert!(!cached.is_valid_for(&meta2)); + let cached = cache.get(&path).unwrap(); + assert!(!cached.is_valid_for(&meta2, schema_fingerprint.as_ref())); // Update with new value let cached_value2 = CachedFileMetadata::new( meta2.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, ); - cache.put(&key_2, cached_value2); + cache.put(&path, cached_value2); // Test list_entries let entries = cache.list_entries(); assert_eq!(entries.len(), 1); - let key_3 = FileStatisticsCacheKey { + let path_3 = TableScopedPath { path: Path::from("test"), table: None, - schema: Arc::new(SchemaFingerprint::from_schema(&schema)), }; - let entry = entries.get(&key_3).unwrap(); + let entry = entries.get(&path_3).unwrap(); assert_eq!(entry.value.meta.size, 2048); // Should be updated value } @@ -941,10 +945,12 @@ mod tests { let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); // Cache statistics with no ordering let cached_value = CachedFileMetadata::new( meta.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, // No ordering yet ); @@ -961,7 +967,9 @@ mod tests { // Update to add ordering let mut cached = cache.get(&path).unwrap(); - if cached.is_valid_for(&meta) && cached.ordering.is_none() { + if 
cached.is_valid_for(&meta, schema_fingerprint.as_ref()) + && cached.ordering.is_none() + { cached.ordering = Some(ordering()); } cache.put(&path, cached); @@ -983,12 +991,14 @@ mod tests { table: None, }; let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); let meta_v1 = create_test_meta("test.parquet", 100); // Cache initial value let cached_value = CachedFileMetadata::new( meta_v1.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, ); @@ -999,11 +1009,12 @@ mod tests { let cached = cache.get(&path).unwrap(); // Should not be valid for new meta - assert!(!cached.is_valid_for(&meta_v2)); + assert!(!cached.is_valid_for(&meta_v2, schema_fingerprint.as_ref())); // Compute new value and update let new_cached = CachedFileMetadata::new( meta_v2.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, ); @@ -1022,6 +1033,7 @@ mod tests { table: None, }; let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); // Cache with original metadata and ordering let meta_v1 = ObjectMeta { @@ -1036,6 +1048,7 @@ mod tests { let ordering_v1 = ordering(); let cached_v1 = CachedFileMetadata::new( meta_v1.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), Some(ordering_v1), ); @@ -1043,7 +1056,7 @@ mod tests { // Verify cached ordering is valid let cached = cache.get(&path).unwrap(); - assert!(cached.is_valid_for(&meta_v1)); + assert!(cached.is_valid_for(&meta_v1, schema_fingerprint.as_ref())); assert!(cached.ordering.is_some()); // File modified (size changed) @@ -1059,12 +1072,13 @@ mod tests { // Cache entry exists but should be invalid for new metadata let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v2)); + assert!(!cached.is_valid_for(&meta_v2, 
schema_fingerprint.as_ref())); // Cache new version with different ordering let ordering_v2 = ordering(); // New ordering instance let cached_v2 = CachedFileMetadata::new( meta_v2.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), Some(ordering_v2), ); @@ -1072,10 +1086,10 @@ mod tests { // Old metadata should be invalid let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v1)); + assert!(!cached.is_valid_for(&meta_v1, schema_fingerprint.as_ref())); // New metadata should be valid - assert!(cached.is_valid_for(&meta_v2)); + assert!(cached.is_valid_for(&meta_v2, schema_fingerprint.as_ref())); assert!(cached.ordering.is_some()); } @@ -1083,11 +1097,13 @@ mod tests { fn test_list_entries() { let cache = DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT); let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); let meta1 = create_test_meta("test1.parquet", 100); let cached_value_1 = CachedFileMetadata::new( meta1.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), None, ); @@ -1101,6 +1117,7 @@ mod tests { let meta2 = create_test_meta("test2.parquet", 200); let cached_value_2 = CachedFileMetadata::new( meta2.clone(), + Arc::clone(&schema_fingerprint), Arc::new(Statistics::new_unknown(&schema)), Some(ordering()), ); @@ -1276,8 +1293,14 @@ mod tests { }; let mut ctx = DFHeapSizeCtx::default(); let object_meta = create_test_meta(file_name, stats.heap_size(&mut ctx) as u64); - let value = - CachedFileMetadata::new(object_meta.clone(), Arc::new(stats.clone()), None); + let schema = Schema::new(vec![Field::new("list", DataType::Int32, true)]); + let schema_fingerprint = Arc::new(SchemaFingerprint::from_schema(&schema)); + let value = CachedFileMetadata::new( + object_meta.clone(), + schema_fingerprint, + Arc::new(stats.clone()), + None, + ); (object_meta, value) } diff --git 
a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index ef45b1a7aaa06..f47a3f3ca49f3 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -28,7 +28,6 @@ use object_store::path::Path; use std::collections::hash_map::DefaultHasher; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; -use std::sync::Arc; use std::time::Duration; /// Base trait for cache implementations with common operations. @@ -226,42 +225,6 @@ impl DFHeapSize for SchemaFingerprint { } } -/// Cache key for the file-statistics cache. -/// -/// Like [`TableScopedPath`] it is scoped by table and path, but it additionally -/// carries a [`SchemaFingerprint`]. File statistics are computed against a -/// specific `file_schema`, so the same path read under different schemas must -/// not share an entry; the fingerprint keeps those entries distinct while a -/// repeated read of the same schema still reuses its entry. -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub struct FileStatisticsCacheKey { - pub table: Option, - pub path: Path, - /// `Arc` so building a key per file is a cheap refcount bump rather than a - /// deep clone of the fingerprint. `Arc`'s `Eq`/`Hash` compare the inner value, - /// so keying remains by schema contents (not pointer identity). - pub schema: Arc, -} - -impl DFHeapSize for FileStatisticsCacheKey { - /// The schema fingerprint is shared by all file-statistics keys for a - /// ListingTable. Do not deep-count it per key, as that would charge the same - /// allocation once per file and overstate cache memory. 
- fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize { - self.path.as_ref().heap_size(ctx) + self.table.heap_size(ctx) - } -} - -impl CacheKey for FileStatisticsCacheKey { - fn size(&self) -> usize { - DFHeapSize::heap_size(self, &mut DFHeapSizeCtx::default()) - } - - fn table_ref(&self) -> Option<&TableReference> { - self.table.as_ref() - } -} - #[cfg(test)] mod schema_fingerprint_tests { use super::*; @@ -310,29 +273,4 @@ mod schema_fingerprint_tests { ); assert_eq!(plain, schema_md, "schema metadata must be ignored"); } - - #[test] - fn file_statistics_cache_key_size_excludes_schema_fingerprint() { - let path = Path::from("file.parquet"); - let table = Some(TableReference::bare("t")); - let narrow_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - let wide_schema = Schema::new( - (0..100) - .map(|i| Field::new(format!("col_{i}"), DataType::Utf8, true)) - .collect::>(), - ); - - let narrow_key = FileStatisticsCacheKey { - table: table.clone(), - path: path.clone(), - schema: Arc::new(SchemaFingerprint::from_schema(&narrow_schema)), - }; - let wide_key = FileStatisticsCacheKey { - table, - path, - schema: Arc::new(SchemaFingerprint::from_schema(&wide_schema)), - }; - - assert_eq!(narrow_key.size(), wide_key.size()); - } } diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md index c1cc5c0e66478..2ae0bdd08e5bd 100644 --- a/docs/source/library-user-guide/upgrading/55.0.0.md +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -432,32 +432,21 @@ Implement the newly introduced types for your custom cache implementation. See [PR #22613](https://github.com/apache/datafusion/pull/22613) for details. 
-### `FileStatisticsCache` is now keyed by `FileStatisticsCacheKey` +### `CachedFileMetadata` now validates file schema -The file-statistics cache is now keyed by a schema-aware `FileStatisticsCacheKey` -instead of `TableScopedPath`: - -```diff -- pub type FileStatisticsCache = dyn Cache; -+ pub type FileStatisticsCache = dyn Cache; -``` - -The new key carries a `SchemaFingerprint` of the `file_schema` alongside the -table and path, so the same file path read under different schemas no longer -shares a cache entry — statistics computed for one schema are never reused for a -different one. +The file-statistics cache remains keyed by `TableScopedPath`, but +`CachedFileMetadata` now stores a `SchemaFingerprint` of the `file_schema` used +to compute the cached statistics. Cache hits are valid only when both the file +metadata and schema fingerprint match. **Who is affected:** -- Users with a custom `FileStatisticsCache` implementation. -- Users constructing keys to call `get` / `put` on the statistics cache directly. +- Users constructing `CachedFileMetadata` values directly. **Migration guide:** -- For a custom implementation, change the implemented trait from - `Cache` to - `Cache`. -- To build a key, construct a `FileStatisticsCacheKey { table, path, schema }`, - where `schema` is `Arc::new(SchemaFingerprint::from_schema(file_schema))`. +- Pass `Arc::new(SchemaFingerprint::from_schema(file_schema))` to + `CachedFileMetadata::new`. +- Pass the current schema fingerprint to `CachedFileMetadata::is_valid_for`. See [PR #23201](https://github.com/apache/datafusion/pull/23201) for details. From ec13f59d65b439c74b16b224defd30a4de975ae3 Mon Sep 17 00:00:00 2001 From: Jiawei Zhao Date: Fri, 3 Jul 2026 12:29:29 +0800 Subject: [PATCH 8/8] perf(cache): fast-path schema fingerprint validation Use Arc pointer identity to avoid deep schema comparison on common file statistics cache hits. 
Keep exact equality as a fallback for equivalent fingerprints built from different Arc allocations. Signed-off-by: Jiawei Zhao --- datafusion/catalog-listing/src/table.rs | 2 +- .../execution/src/cache/cache_manager.rs | 6 ++-- .../execution/src/cache/default_cache.rs | 28 +++++++++++-------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 06bef17ed5ae6..9d8b77cfcc4e7 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -996,7 +996,7 @@ impl ListingTable { // stats computed under a different file schema. if let Some(cache) = &self.collected_statistics && let Some(cached) = cache.get(&path) - && cached.is_valid_for(meta, self.file_schema_fingerprint.as_ref()) + && cached.is_valid_for(meta, &self.file_schema_fingerprint) { // Return cached statistics and ordering return Ok((Arc::clone(&cached.statistics), cached.ordering.clone())); diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 19b28dd5652d3..83dcf70975e2b 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -127,11 +127,13 @@ impl CachedFileMetadata { pub fn is_valid_for( &self, current_meta: &ObjectMeta, - current_schema_fingerprint: &SchemaFingerprint, + current_schema_fingerprint: &Arc, ) -> bool { self.meta.size == current_meta.size && self.meta.last_modified == current_meta.last_modified - && self.schema_fingerprint.as_ref() == current_schema_fingerprint + && (Arc::ptr_eq(&self.schema_fingerprint, current_schema_fingerprint) + || self.schema_fingerprint.as_ref() + == current_schema_fingerprint.as_ref()) } } diff --git a/datafusion/execution/src/cache/default_cache.rs b/datafusion/execution/src/cache/default_cache.rs index ae9de9a3920db..bfe326f3a47e1 100644 --- a/datafusion/execution/src/cache/default_cache.rs +++ 
b/datafusion/execution/src/cache/default_cache.rs @@ -850,7 +850,15 @@ mod tests { assert!(result.is_some()); let cached = result.unwrap(); - assert!(cached.is_valid_for(&meta, schema_fingerprint.as_ref())); + assert!(cached.is_valid_for(&meta, &schema_fingerprint)); + + let equivalent_schema_fingerprint = + Arc::new(SchemaFingerprint::from_schema(&schema)); + assert!(!Arc::ptr_eq( + &schema_fingerprint, + &equivalent_schema_fingerprint + )); + assert!(cached.is_valid_for(&meta, &equivalent_schema_fingerprint)); let different_schema = Schema::new(vec![Field::new( "different_column", @@ -858,14 +866,14 @@ mod tests { false, )]); let different_schema_fingerprint = - SchemaFingerprint::from_schema(&different_schema); + Arc::new(SchemaFingerprint::from_schema(&different_schema)); assert!(!cached.is_valid_for(&meta, &different_schema_fingerprint)); // File size changed - validation should fail let meta2 = create_test_meta("test", 2048); let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta2, schema_fingerprint.as_ref())); + assert!(!cached.is_valid_for(&meta2, &schema_fingerprint)); // Update with new value let cached_value2 = CachedFileMetadata::new( @@ -967,9 +975,7 @@ mod tests { // Update to add ordering let mut cached = cache.get(&path).unwrap(); - if cached.is_valid_for(&meta, schema_fingerprint.as_ref()) - && cached.ordering.is_none() - { + if cached.is_valid_for(&meta, &schema_fingerprint) && cached.ordering.is_none() { cached.ordering = Some(ordering()); } cache.put(&path, cached); @@ -1009,7 +1015,7 @@ mod tests { let cached = cache.get(&path).unwrap(); // Should not be valid for new meta - assert!(!cached.is_valid_for(&meta_v2, schema_fingerprint.as_ref())); + assert!(!cached.is_valid_for(&meta_v2, &schema_fingerprint)); // Compute new value and update let new_cached = CachedFileMetadata::new( @@ -1056,7 +1062,7 @@ mod tests { // Verify cached ordering is valid let cached = cache.get(&path).unwrap(); - 
assert!(cached.is_valid_for(&meta_v1, schema_fingerprint.as_ref())); + assert!(cached.is_valid_for(&meta_v1, &schema_fingerprint)); assert!(cached.ordering.is_some()); // File modified (size changed) @@ -1072,7 +1078,7 @@ mod tests { // Cache entry exists but should be invalid for new metadata let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v2, schema_fingerprint.as_ref())); + assert!(!cached.is_valid_for(&meta_v2, &schema_fingerprint)); // Cache new version with different ordering let ordering_v2 = ordering(); // New ordering instance @@ -1086,10 +1092,10 @@ mod tests { // Old metadata should be invalid let cached = cache.get(&path).unwrap(); - assert!(!cached.is_valid_for(&meta_v1, schema_fingerprint.as_ref())); + assert!(!cached.is_valid_for(&meta_v1, &schema_fingerprint)); // New metadata should be valid - assert!(cached.is_valid_for(&meta_v2, schema_fingerprint.as_ref())); + assert!(cached.is_valid_for(&meta_v2, &schema_fingerprint)); assert!(cached.ordering.is_some()); }