Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics,
};
use datafusion_execution::cache::cache_manager::{FileStatisticsCache, TableScopedPath};
use datafusion_execution::cache::cache_manager::{
CachedFileMetadata, FileStatisticsCache, FileStatisticsCacheKey, SchemaFingerprint,
TableScopedPath,
};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
Expand Down Expand Up @@ -197,6 +200,9 @@ pub struct ListingTable {
column_defaults: HashMap<String, Expr>,
/// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// Precomputed fingerprint of `file_schema` for the file-statistics cache
/// key. Constant for the table, so computed once here instead of per file.
file_schema_fingerprint: Arc<SchemaFingerprint>,
}

impl ListingTable {
Expand Down Expand Up @@ -227,6 +233,9 @@ impl ListingTable {
.with_metadata(file_schema.metadata().clone()),
);

let file_schema_fingerprint =
Arc::new(SchemaFingerprint::from_schema(&file_schema));

let table = Self {
table_paths: config.table_paths,
file_schema,
Expand All @@ -238,6 +247,7 @@ impl ListingTable {
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
file_schema_fingerprint,
};

Ok(table)
Expand Down Expand Up @@ -268,21 +278,6 @@ impl ListingTable {
self
}

/// Returns the shared file-statistics cache if it may be used for this read.
///
/// Yields `None` when no cache is configured (`collected_statistics` is
/// `None`), or when the read has no table reference and the table's schema
/// was not inferred. Otherwise returns a reference to the shared cache.
///
/// `has_table_reference` tells whether the file being looked up is scoped to
/// a named table.
fn statistics_cache(
&self,
has_table_reference: bool,
) -> Option<&Arc<FileStatisticsCache>> {
// Bail out early when no statistics cache is configured at all.
let shared_cache = self.collected_statistics.as_ref()?;
if has_table_reference || self.schema_source == SchemaSource::Inferred {
Some(shared_cache)
} else {
// Anonymous specified-schema reads can use the same file path with
// different logical schemas. File statistics are schema-dependent,
// so avoid reusing stats computed for a different read schema.
None
}
}

/// Specify the SQL definition for this table, if any
pub fn with_definition(mut self, definition: Option<String>) -> Self {
self.definition = definition;
Expand Down Expand Up @@ -990,17 +985,21 @@ impl ListingTable {
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;

let path = TableScopedPath {
let key = FileStatisticsCacheKey {
table: part_file.table_reference.clone(),
path: part_file.object_meta.location.clone(),
// Statistics are computed against `file_schema`, so key on its
// fingerprint: reads of the same path under a different schema get
// their own entry rather than reusing incompatible column statistics.
// The fingerprint is precomputed once per table (see `try_new`).
schema: Arc::clone(&self.file_schema_fingerprint),

@mkleen mkleen Jun 27, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precalculating the hash for the fingerprint could be a solution to fix the regression. Right now we calculate the hash for the fingerprint for each entry, which is expensive.

@Phoenix500526 Phoenix500526 Jun 27, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment. I've precalculated the hash for the fingerprint. It seems that only whitelisted users can trigger benchmarks. Could you help trigger one?

};
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cache) = self.statistics_cache(path.table.is_some())
&& let Some(cached) = cache.get(&path)
// Check cache first - the schema is part of the key, so a hit is already
// schema-compatible; `is_valid_for` only confirms the file is unchanged.
if let Some(cache) = &self.collected_statistics
&& let Some(cached) = cache.get(&key)
&& cached.is_valid_for(meta)
{
// Return cached statistics and ordering
Expand All @@ -1017,9 +1016,9 @@ impl ListingTable {
let statistics = Arc::new(file_meta.statistics);

// Store in cache
if let Some(cache) = self.statistics_cache(path.table.is_some()) {
if let Some(cache) = &self.collected_statistics {
cache.put(
&path,
&key,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
Expand Down
11 changes: 11 additions & 0 deletions datafusion/common/src/heap_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ where
}
}

/// Heap-size accounting for 3-tuples whose elements all implement
/// [`DFHeapSize`].
impl<A, B, C> DFHeapSize for (A, B, C)
where
A: DFHeapSize,
B: DFHeapSize,
C: DFHeapSize,
{
/// Returns the sum of the heap sizes of the three elements, measured in
/// field order (`.0`, `.1`, `.2`) with the same `ctx` passed to each.
fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
self.0.heap_size(ctx) + self.1.heap_size(ctx) + self.2.heap_size(ctx)
}
}

impl DFHeapSize for String {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.capacity()
Expand Down
24 changes: 23 additions & 1 deletion datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,29 @@ async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() {
let stats = plan.statistics_with_args(&StatisticsArgs::new()).unwrap();
assert_eq!(stats.column_statistics.len(), 2);
assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1));
assert_eq!(cache.len(), 1);

// #23072: the cache now keys on file_schema, so the wider read no longer
// bypasses the cache (as in #22950) — it lands in its own entry and
// coexists with the inferred one. Was `1` under the bypass.
assert_eq!(cache.len(), 2);

// Repeat the wider read: same path + same file_schema -> reuse (no new
// entry) and a cache hit. Under #22950's bypass this read could never reuse.
ctx.read_parquet(
&parquet_path,
ParquetReadOptions::default().schema(&wider_schema),
)
.await
.unwrap()
.create_physical_plan()
.await
.unwrap();
assert_eq!(cache.len(), 2);
let hits: usize = cache.list_entries().values().map(|e| e.hits).sum();
assert_eq!(
hits, 1,
"expected a cache hit on the repeat read, got {hits}"
);
}

#[tokio::test]
Expand Down
8 changes: 5 additions & 3 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// under the License.

use crate::cache::default_cache::DefaultCache;
pub use crate::cache::{Cache, CacheValue, TableScopedPath};
pub use crate::cache::{
Cache, CacheValue, FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath,
};
use datafusion_common::HashMap;
use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
use datafusion_common::{Result, Statistics};
Expand Down Expand Up @@ -54,7 +56,7 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
/// 3. If invalid or missing, compute new value and call `put(path, new_value)`
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub type FileStatisticsCache = dyn Cache<TableScopedPath, CachedFileMetadata>;
pub type FileStatisticsCache = dyn Cache<FileStatisticsCacheKey, CachedFileMetadata>;

/// A cache for storing the [`ObjectMeta`]s that result from listing a path.
///
Expand Down Expand Up @@ -305,7 +307,7 @@ impl CacheManager {
Some(Arc::clone(fsc))
}
None if config.file_statistics_cache_limit > 0 => Some(Arc::new(
DefaultCache::<TableScopedPath, CachedFileMetadata>::new(
DefaultCache::<FileStatisticsCacheKey, CachedFileMetadata>::new(
config.file_statistics_cache_limit,
)
.with_name("DefaultFileStatisticsCache"),
Expand Down
23 changes: 13 additions & 10 deletions datafusion/execution/src/cache/default_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ impl<K: CacheKey, V: CacheValue> Cache<K, V> for DefaultCache<K, V> {
mod tests {
use std::sync::Arc;

use crate::cache::TableScopedPath;
use crate::cache::cache_manager::{
CachedFileList, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, meta_heap_bytes,
};
Expand All @@ -311,6 +310,7 @@ mod tests {
use crate::cache::default_cache::TimeProvider;
use crate::cache::{Cache, CacheEntryInfo};
use crate::cache::{CacheKey, CacheValue};
use crate::cache::{FileStatisticsCacheKey, SchemaFingerprint, TableScopedPath};
use arrow::array::{Int32Array, ListArray, RecordBatch};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
Expand Down Expand Up @@ -827,24 +827,25 @@ mod tests {
false,
)]);

let path = TableScopedPath {
let key_1 = FileStatisticsCacheKey {
path: meta.location.clone(),
table: None,
schema: Arc::new(SchemaFingerprint::from_schema(&schema)),
};

// Cache miss
assert!(cache.get(&path).is_none());
assert!(cache.get(&key_1).is_none());

// Put a value
let cached_value = CachedFileMetadata::new(
meta.clone(),
Arc::new(Statistics::new_unknown(&schema)),
None,
);
cache.put(&path, cached_value);
cache.put(&key_1, cached_value);

// Cache hit
let result = cache.get(&path);
let result = cache.get(&key_1);
assert!(result.is_some());

let cached = result.unwrap();
Expand All @@ -853,12 +854,13 @@ mod tests {
// File size changed - validation should fail
let meta2 = create_test_meta("test", 2048);

let path_2 = TableScopedPath {
let key_2 = FileStatisticsCacheKey {
path: meta2.location.clone(),
table: None,
schema: Arc::new(SchemaFingerprint::from_schema(&schema)),
};

let cached = cache.get(&path_2).unwrap();
let cached = cache.get(&key_2).unwrap();
assert!(!cached.is_valid_for(&meta2));

// Update with new value
Expand All @@ -867,18 +869,19 @@ mod tests {
Arc::new(Statistics::new_unknown(&schema)),
None,
);
cache.put(&path_2, cached_value2);
cache.put(&key_2, cached_value2);

// Test list_entries
let entries = cache.list_entries();
assert_eq!(entries.len(), 1);

let path_3 = TableScopedPath {
let key_3 = FileStatisticsCacheKey {
path: Path::from("test"),
table: None,
schema: Arc::new(SchemaFingerprint::from_schema(&schema)),
};

let entry = entries.get(&path_3).unwrap();
let entry = entries.get(&key_3).unwrap();
assert_eq!(entry.value.meta.size, 2048); // Should be updated value
}

Expand Down
Loading
Loading