deps: DataFusion 52.0.0 migration (SchemaAdapter changes, etc.) #3536

Open
comphead wants to merge 31 commits into apache:main from comphead:df52

Conversation

@comphead
Contributor

comphead commented Feb 16, 2026

Which issue does this PR close?

Closes #3046.
Closes #3515.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

comphead and others added 28 commits February 16, 2026 08:31
…pache#3471)

DataFusion 52's arrow-arith kernels only support Date32 +/- Interval
types, not raw integers. When Spark sends Date32 + Int8/Int16/Int32
arithmetic, the planner now routes these operations to the Spark
date_add/date_sub UDFs which handle integer types directly.
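A rough sketch of the routing decision, using a hypothetical helper name (this is not the actual Comet planner code):

```rust
use arrow::datatypes::DataType;

/// Illustrative only: Date32 +/- integer must go to the Spark date_add/date_sub
/// UDFs, since Arrow's arithmetic kernels accept interval operands for dates but
/// not raw integers.
fn needs_spark_date_udf(date: &DataType, amount: &DataType) -> bool {
    matches!(date, DataType::Date32)
        && matches!(amount, DataType::Int8 | DataType::Int16 | DataType::Int32)
}
```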

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…che#3475)

DataFusion 52's default PhysicalExprAdapter can fail when casting
complex nested types (List<Struct>, Map) between physical and logical
schemas. This adds a fallback path in SparkPhysicalExprAdapter that
wraps type-mismatched columns with CometCastColumnExpr using
spark_parquet_convert for the actual conversion.

Changes to CometCastColumnExpr:
- Add optional SparkParquetOptions for complex nested type conversions
- Use `==` instead of `equals_datatype` to detect field name differences
  in nested types (Struct, List, Map); see the sketch below
- Add relabel_array for types that differ only in field names (e.g.,
  List element "item" vs "element", Map "key_value" vs "entries")
- Fallback to spark_parquet_convert for structural nested type changes

Changes to SparkPhysicalExprAdapter:
- Try default adapter first, fall back to wrap_all_type_mismatches
  when it fails on complex nested types
- Route Struct/List/Map casts to CometCastColumnExpr instead of
  Spark Cast, which doesn't handle nested type rewriting
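A minimal sketch of the field-name-only check mentioned in the list above, assuming the standard arrow-rs `DataType` APIs (helper name is hypothetical):

```rust
use arrow::datatypes::DataType;

/// Illustrative only: `==` is sensitive to nested field names and metadata,
/// while `equals_datatype` ignores them. A mismatch under `==` that still
/// passes `equals_datatype` can be fixed by relabeling; anything else falls
/// back to spark_parquet_convert.
fn differs_only_in_field_names(physical: &DataType, logical: &DataType) -> bool {
    physical != logical && physical.equals_datatype(logical)
}
```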

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…pache#3493)

* fix: make relabel_array recursive for nested type mismatches

The shallow ArrayData type swap in relabel_array caused panics when
Arrow's ArrayData::build() validated child types recursively. This
rebuilds arrays from typed constructors (ListArray, LargeListArray,
MapArray, StructArray) so nested field name and metadata differences
are handled correctly.
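As a rough illustration of the "rebuild from typed constructors" approach (names are hypothetical, and recursion into nested children is omitted):

```rust
use std::sync::Arc;
use arrow::array::{Array, ListArray};
use arrow::datatypes::FieldRef;

/// Illustrative only: relabel a list whose element field differs only by name
/// (e.g. "item" vs "element") by reconstructing it via ListArray::new, which
/// validates child types, instead of swapping ArrayData types in place.
fn relabel_list(list: &ListArray, target_element: FieldRef) -> ListArray {
    // A complete version would first recurse into list.values() to handle
    // nested mismatches; here the child array is reused as-is.
    ListArray::new(
        target_element,
        list.offsets().clone(),
        Arc::clone(list.values()),
        list.nulls().cloned(),
    )
}
```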

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style: run cargo fmt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…_convert (apache#3494)

INT96 Parquet timestamps are coerced to Timestamp(us, None) by DataFusion
but the logical schema expects Timestamp(us, Some("UTC")). The schema
adapter was routing this mismatch through Spark's Cast expression, which
incorrectly treats None-timezone values as TimestampNTZ (local time) and
applies a timezone conversion. This caused results to be shifted by the
session timezone offset (e.g., -5h45m for Asia/Kathmandu).

Route Timestamp->Timestamp mismatches through CometCastColumnExpr which
delegates to spark_parquet_convert, handling this as a metadata-only
timezone relabel without modifying the underlying values.
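To illustrate why the CometCastColumnExpr path is safe here, Arrow's own cast between timestamp types only rewrites the timezone metadata (sketch with a hypothetical helper, not the Comet code):

```rust
use arrow::array::{ArrayRef, TimestampMicrosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::ArrowError;

/// Illustrative only: relabel Timestamp(us, None) as Timestamp(us, Some("UTC")).
/// The stored microsecond values are untouched, so no session-timezone shift
/// (e.g. -5h45m for Asia/Kathmandu) gets applied.
fn relabel_to_utc(col: &TimestampMicrosecondArray) -> Result<ArrayRef, ArrowError> {
    cast(col, &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())))
}
```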

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…apter (apache#3495)

The DefaultPhysicalExprAdapter uses exact case-sensitive name matching
(Arrow's field_with_name/index_of) to resolve columns. When a parquet
file has lowercase "a" but the table schema has uppercase "A", the lookup
fails and columns are filled with nulls.

Fix by remapping physical schema field names to match logical names
(case-insensitively) before passing to the default adapter, then
restoring original physical names in the rewritten expressions so that
downstream reassign_expr_columns can find columns in the actual parquet
stream schema.
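A simplified sketch of the name-alignment pre-pass (helper name is hypothetical; restoring the original physical names afterwards is not shown):

```rust
use std::sync::Arc;
use arrow::datatypes::{Field, Schema, SchemaRef};

/// Illustrative only: rename physical fields to the logical spelling when the
/// names match case-insensitively, so the default adapter's exact-match lookup
/// no longer fills the column with nulls.
fn align_physical_names(physical: &Schema, logical: &Schema) -> SchemaRef {
    let fields: Vec<Field> = physical
        .fields()
        .iter()
        .map(|pf| {
            logical
                .fields()
                .iter()
                .find(|lf| lf.name().eq_ignore_ascii_case(pf.name()))
                .map(|lf| pf.as_ref().clone().with_name(lf.name().clone()))
                .unwrap_or_else(|| pf.as_ref().clone())
        })
        .collect();
    Arc::new(Schema::new(fields))
}
```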

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…pache#3473)

DataFusion 52 changed how FilterExec's batch coalescer works - streams
now return Poll::Pending when accumulating input instead of blocking on
a channel. Update test_unpack_dictionary_primitive and
test_unpack_dictionary_string to poll the stream directly and send EOF
on Pending, rather than using a separate mpsc channel/spawned task to
feed batches.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
)

When Spark's `LEGACY_PARQUET_NANOS_AS_LONG=true` converts TIMESTAMP(NANOS)
to LongType, the PhysicalExprAdapter detects a type mismatch between the
file's Timestamp(Nanosecond) and the logical Int64. The DefaultAdapter
creates a CastColumnExpr, which SparkPhysicalExprAdapter then replaces
with Spark's Cast expression. Spark's Cast postprocess for Timestamp→Int64
unconditionally divides by MICROS_PER_SECOND (10^6), assuming microsecond
precision. But the values are nanoseconds, so the raw value
1668537129123534758 becomes 1668537129123 — losing sub-millisecond
precision.

Fix: route Timestamp→Int64 casts through CometCastColumnExpr (which uses
spark_parquet_convert → Arrow cast) instead of Spark Cast. Arrow's cast
correctly reinterprets the raw i64 value without any division.
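A small self-contained check of the Arrow behavior described above (the literal value is the one from the example):

```rust
use arrow::array::{Array, Int64Array, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    let nanos = TimestampNanosecondArray::from(vec![1_668_537_129_123_534_758_i64]);
    // Arrow's cast reinterprets the raw i64; there is no MICROS_PER_SECOND division.
    let ints = cast(&nanos, &DataType::Int64).unwrap();
    let ints = ints.as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(ints.value(0), 1_668_537_129_123_534_758);
}
```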

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
* fix: [df52] schema pruning crash on complex nested types

When `data_schema` is provided but `projection_vector` is None (the
NativeBatchReader / native_iceberg_compat path), the base schema was
incorrectly set to the pruned `required_schema`. This caused DataFusion
to think the table had only the pruned columns, leading to column index
misalignment in PhysicalExprAdapter. For example, reading "friends" at
logical index 0 would map to physical index 0 ("id") instead of the
correct index 4.

Fix: when `data_schema` is provided without a `projection_vector`,
compute the projection by mapping required field names to their indices
in the full data schema. Also harden `wrap_all_type_mismatches` to use
name-based lookup for physical fields instead of positional index.
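A condensed sketch of the projection computation (helper name is hypothetical; the `None` result corresponds to the field ID fallback described below):

```rust
use arrow::datatypes::Schema;

/// Illustrative only: map each required field to its index in the full data
/// schema by name. Returns None when any field cannot be matched (e.g. Parquet
/// field ID mapping renames columns), signalling the caller to fall back to
/// required_schema directly.
fn projection_by_name(required: &Schema, data: &Schema) -> Option<Vec<usize>> {
    required
        .fields()
        .iter()
        .map(|f| data.index_of(f.name()).ok())
        .collect()
}
```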

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: handle field ID mapping in projection computation

When computing a name-based projection from required_schema to
data_schema, fall back to using required_schema directly when not
all fields can be matched by name. This handles Parquet field ID
mapping where column names differ between the read schema and file
schema.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
)

Add IgnoreCometSuite to ParquetVariantShreddingSuite in the 4.0.1 diff.
VariantType shredding is a Spark 4.0 feature that Comet does not yet
support (apache#2209). VariantShreddingSuite was already skipped but
ParquetVariantShreddingSuite was missed, causing test failures in CI.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@andygrove
Member

@sqlbenchmark run tpch


@andygrove
Member

andygrove commented Feb 16, 2026

The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.

| Metric | Baseline | DF52 | Delta |
|---|---|---|---|
| peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
| peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
| peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
| peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
| peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
| peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |

DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).

@comphead
Contributor Author

> The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.
>
> | Metric | Baseline | DF52 | Delta |
> |---|---|---|---|
> | peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
> | peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
> | peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
> | peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
> | peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
> | peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
>
> DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).

The total memory used seems to be lower overall: roughly +4 GB for JVM heap versus -9 GB for off-heap execution, so about -5 GB net. The JVM heap growth looks odd, though, since DF52 involves very few JVM-side changes. Is the baseline the latest main?

@sqlbenchmark

Comet TPC-H Benchmark Results

Baseline: main (e22f35c)
PR: 749a407 - DataFusion 52 migration
Scale Factor: SF100
Iterations: 1

Query Times

| Query | Baseline Avg (s) | Baseline Best (s) | PR Avg (s) | PR Best (s) | Change (Avg) | Change (Best) |
|---|---|---|---|---|---|---|
| Q1 | 10.64 | 10.64 | 10.75 | 10.75 | ⚪ +1.1% | ⚪ +1.1% |
| Q2 | 5.90 | 5.90 | 5.76 | 5.76 | ⚪ -2.4% | ⚪ -2.4% |
| Q3 | 9.79 | 9.79 | 9.81 | 9.81 | ⚪ +0.2% | ⚪ +0.2% |
| Q4 | 11.62 | 11.62 | 13.99 | 13.99 | 🔴 +20.4% | 🔴 +20.4% |
| Q5 | 19.16 | 19.16 | 18.81 | 18.81 | ⚪ -1.8% | ⚪ -1.8% |
| Q6 | 2.52 | 2.52 | 2.52 | 2.52 | ⚪ -0.1% | ⚪ -0.1% |
| Q7 | 12.44 | 12.44 | 11.79 | 11.79 | 🟢 -5.2% | 🟢 -5.2% |
| Q8 | 25.43 | 25.43 | 24.39 | 24.39 | ⚪ -4.1% | ⚪ -4.1% |
| Q9 | 39.40 | 39.40 | 37.94 | 37.94 | ⚪ -3.7% | ⚪ -3.7% |
| Q10 | 10.37 | 10.37 | 10.62 | 10.62 | ⚪ +2.4% | ⚪ +2.4% |
| Q11 | 4.50 | 4.50 | 4.80 | 4.80 | 🔴 +6.5% | 🔴 +6.5% |
| Q12 | 7.07 | 7.07 | 6.41 | 6.41 | 🟢 -9.3% | 🟢 -9.3% |
| Q13 | 7.36 | 7.36 | 7.40 | 7.40 | ⚪ +0.6% | ⚪ +0.6% |
| Q14 | 3.59 | 3.59 | 3.27 | 3.27 | 🟢 -8.9% | 🟢 -8.9% |
| Q15 | 7.21 | 7.21 | 7.32 | 7.32 | ⚪ +1.4% | ⚪ +1.4% |
| Q16 | 4.73 | 4.73 | 4.93 | 4.93 | ⚪ +4.0% | ⚪ +4.0% |
| Q17 | 32.63 | 32.63 | 32.55 | 32.55 | ⚪ -0.2% | ⚪ -0.2% |
| Q18 | 33.85 | 33.85 | 34.62 | 34.62 | ⚪ +2.3% | ⚪ +2.3% |
| Q19 | 6.85 | 6.85 | 6.01 | 6.01 | 🟢 -12.2% | 🟢 -12.2% |
| Q20 | 6.75 | 6.75 | 6.12 | 6.12 | 🟢 -9.3% | 🟢 -9.3% |
| Q21 | 46.71 | 46.71 | 51.97 | 51.97 | 🔴 +11.3% | 🔴 +11.3% |
| Q22 | 4.92 | 4.92 | 5.41 | 5.41 | 🔴 +9.9% | 🔴 +9.9% |
| Total | 313.45 | 313.45 | 317.18 | 317.18 | ⚪ +1.2% | ⚪ +1.2% |
Spark Configuration
| Setting | Value |
|---|---|
| Spark Master | local[*] |
| Driver Memory | 32G |
| Driver Cores | 8 |
| Executor Memory | 32G |
| Executor Cores | 8 |
| Off-Heap Enabled | true |
| Off-Heap Size | 24g |
| Shuffle Manager | CometShuffleManager |
| Comet Replace SMJ | true |

Automated benchmark run by dfbench

@andygrove
Member

@sqlbenchmark run tpch --iterations 3

@andygrove
Member

andygrove commented Feb 16, 2026

Not sure how true this is yet, but Claude claims that there may be a performance regression related to Iceberg scans:

In iceberg_scan.rs, the old code cached (SchemaRef, Arc<dyn SchemaMapper>) and reused it across batches with the same schema. The new adapt_batch_with_expressions() creates a new SparkPhysicalExprAdapterFactory + adapter + expression trees for every batch that needs adaptation. This adds per-batch allocation churn, though the actual data buffers are shared via Arc.
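For reference, a generic sketch of the caching pattern the old code used; types and names here are illustrative, not Comet's actual API:

```rust
use std::sync::Arc;
use arrow::datatypes::SchemaRef;

/// Illustrative only: rebuild the per-batch adapter once per distinct input
/// schema instead of on every batch.
struct AdapterCache<A> {
    cached: Option<(SchemaRef, Arc<A>)>,
}

impl<A> AdapterCache<A> {
    fn get_or_build(&mut self, schema: &SchemaRef, build: impl FnOnce() -> A) -> Arc<A> {
        if let Some((cached_schema, adapter)) = &self.cached {
            // Reuse the previously built adapter while the batch schema is unchanged.
            if cached_schema == schema {
                return Arc::clone(adapter);
            }
        }
        let adapter = Arc::new(build());
        self.cached = Some((Arc::clone(schema), Arc::clone(&adapter)));
        adapter
    }
}
```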

@andygrove
Member

> The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.
>
> | Metric | Baseline | DF52 | Delta |
> |---|---|---|---|
> | peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
> | peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
> | peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
> | peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
> | peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
> | peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
>
> DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).

> The total memory used seems to be lower overall: roughly +4 GB for JVM heap versus -9 GB for off-heap execution, so about -5 GB net. The JVM heap growth looks odd, though, since DF52 involves very few JVM-side changes. Is the baseline the latest main?

The baseline was a build from Friday.

Claude is suggesting this as the root cause:

"The memory shift is primarily driven by DataFusion 52's internal changes to how operators manage buffers (batch coalescing, expression evaluation), combined with these buffers likely not being tracked through Comet's CometUnifiedMemoryPool → JNI → Spark accounting path. The result is that native memory still exists but is "invisible" to Spark's off-heap accounting, while Spark's JVM-side operations expand to fill the perceived available space."

@sqlbenchmark

Comet TPC-H Benchmark Results

Baseline: main (e22f35c)
PR: 749a407 - DataFusion 52 migration
Scale Factor: SF100
Iterations: 3

Query Times

| Query | Baseline Avg (s) | Baseline Best (s) | PR Avg (s) | PR Best (s) | Change (Avg) | Change (Best) |
|---|---|---|---|---|---|---|
| Q1 | 10.03 | 9.46 | 10.02 | 9.43 | ⚪ -0.2% | ⚪ -0.3% |
| Q2 | 5.43 | 5.11 | 5.34 | 4.98 | ⚪ -1.7% | ⚪ -2.7% |
| Q3 | 9.96 | 9.79 | 10.01 | 9.82 | ⚪ +0.5% | ⚪ +0.3% |
| Q4 | 10.95 | 10.72 | 13.36 | 12.60 | 🔴 +21.9% | 🔴 +17.6% |
| Q5 | 19.33 | 19.03 | 19.36 | 18.80 | ⚪ +0.2% | ⚪ -1.2% |
| Q6 | 2.46 | 2.32 | 2.51 | 2.32 | ⚪ +1.8% | ⚪ -0.1% |
| Q7 | 12.46 | 12.06 | 12.24 | 11.74 | ⚪ -1.7% | ⚪ -2.7% |
| Q8 | 25.65 | 25.10 | 25.47 | 24.41 | ⚪ -0.7% | ⚪ -2.7% |
| Q9 | 41.14 | 39.22 | 40.66 | 38.29 | ⚪ -1.2% | ⚪ -2.3% |
| Q10 | 10.63 | 10.18 | 11.11 | 10.84 | ⚪ +4.5% | 🔴 +6.5% |
| Q11 | 4.41 | 4.38 | 4.75 | 4.52 | 🔴 +7.7% | ⚪ +3.2% |
| Q12 | 7.37 | 7.25 | 6.60 | 6.44 | 🟢 -10.5% | 🟢 -11.1% |
| Q13 | 7.43 | 7.29 | 7.51 | 7.23 | ⚪ +1.2% | ⚪ -0.8% |
| Q14 | 3.56 | 3.51 | 3.39 | 3.27 | ⚪ -4.9% | 🟢 -7.0% |
| Q15 | 7.23 | 7.07 | 7.35 | 7.13 | ⚪ +1.7% | ⚪ +0.9% |
| Q16 | 4.39 | 4.25 | 4.36 | 4.27 | ⚪ -0.7% | ⚪ +0.4% |
| Q17 | 33.93 | 32.87 | 34.23 | 32.74 | ⚪ +0.9% | ⚪ -0.4% |
| Q18 | 34.33 | 33.82 | 35.75 | 35.14 | ⚪ +4.2% | ⚪ +3.9% |
| Q19 | 7.00 | 6.73 | 6.48 | 6.25 | 🟢 -7.4% | 🟢 -7.1% |
| Q20 | 6.68 | 6.52 | 6.47 | 6.33 | ⚪ -3.1% | ⚪ -3.0% |
| Q21 | 46.87 | 46.40 | 52.85 | 51.53 | 🔴 +12.8% | 🔴 +11.1% |
| Q22 | 5.02 | 5.00 | 5.52 | 5.42 | 🔴 +10.0% | 🔴 +8.4% |
| Total | 316.29 | 308.08 | 325.36 | 313.50 | ⚪ +2.9% | ⚪ +1.8% |
Spark Configuration
| Setting | Value |
|---|---|
| Spark Master | local[*] |
| Driver Memory | 32G |
| Driver Cores | 8 |
| Executor Memory | 32G |
| Executor Cores | 8 |
| Off-Heap Enabled | true |
| Off-Heap Size | 24g |
| Shuffle Manager | CometShuffleManager |
| Comet Replace SMJ | true |

Automated benchmark run by dfbench

@andygrove
Member

I ran the queries individually and compared memory usage between main and this PR.

Key findings from Claude's analysis of the results:

  1. The memory shift is NOT consistent — it's highly query-dependent. Some queries see off-heap decrease (Q4, Q10, Q11), others see large increases (Q7, Q12, Q13). There is no single directional trend.
  2. Off-heap and JVM heap sometimes move inversely. Q11 is the clearest example: off-heap dropped 56.4% while JVM heap increased 127%. Q10 shows the same pattern (off-heap -72.7%, heap +36.5%). DF52 appears to shift work between native and JVM memory for certain query shapes.
  3. Join-heavy queries are most affected. The queries with the largest memory changes (Q4, Q7, Q10, Q11, Q12, Q13, Q21) all involve complex joins, correlated subqueries, or GROUP BY with HAVING. Simpler scan-and-aggregate queries (Q1, Q6) are stable. This points to changes in DataFusion 52's hash join/aggregate memory management.

@andygrove
Member

I ran a memory profiling comparison to main with replaceSortMergeJoin=false as well.

| Query | BL Time (s) | DF52 Time (s) | Time Delta | BL OffHeap Exec | DF52 OffHeap Exec | OffHeap Delta | BL JVM Heap | DF52 JVM Heap | Heap Delta |
|---|---|---|---|---|---|---|---|---|---|
| Q1 | 82.6 | 82.5 | -0.2% | 514 MB | 514 MB | 0.0% | 5.53 GB | 4.15 GB | -25.0% |
| Q2 | 15.9 | 15.6 | -1.8% | n/a | n/a | n/a | n/a | 2.84 GB | n/a |
| Q3 | 30.0 | 30.1 | +0.3% | 3.75 GB | 1.69 GB | -54.9% | 4.37 GB | 3.55 GB | -18.8% |
| Q4 | 24.9 | 25.1 | +0.9% | 16.00 GB | 16.00 GB | 0.0% | 2.52 GB | 2.15 GB | -14.8% |
| Q5 | 58.2 | 57.3 | -1.4% | 3.51 GB | 3.00 GB | -14.5% | 3.72 GB | 5.55 GB | +49.3% |
| Q6 | 4.8 | 5.0 | +5.3% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q7 | 28.5 | 27.7 | -2.5% | 2.62 GB | n/a | n/a | 6.55 GB | 6.76 GB | +3.3% |
| Q8 | 42.8 | 41.9 | -2.1% | 657 MB | n/a | n/a | 7.01 GB | 6.12 GB | -12.6% |
| Q9 | 86.1 | 86.6 | +0.6% | 3.00 GB | 3.00 GB | 0.0% | 5.39 GB | 6.56 GB | +21.7% |
| Q10 | 24.1 | 25.0 | +3.7% | 3.04 GB | 1.43 GB | -53.0% | 2.29 GB | 1.25 GB | -45.5% |
| Q11 | 17.2 | 17.4 | +1.2% | 1.00 GB | 2.00 GB | +100.0% | 2.82 GB | 3.79 GB | +34.5% |
| Q12 | 16.7 | 16.3 | -2.7% | 644 MB | n/a | n/a | 4.37 GB | n/a | n/a |
| Q13 | 25.2 | 24.8 | -1.7% | 5.16 GB | 5.13 GB | -0.5% | 3.05 GB | 4.06 GB | +33.1% |
| Q14 | 8.6 | 8.7 | +0.9% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q15 | 19.6 | 19.5 | -0.4% | 460 MB | 520 MB | +12.9% | 1.81 GB | 2.13 GB | +17.2% |
| Q16 | 11.4 | 10.6 | -6.7% | n/a | 2.00 GB | n/a | n/a | 3.69 GB | n/a |
| Q17 | 77.7 | 78.0 | +0.4% | 2.03 GB | 2.02 GB | -0.8% | 4.44 GB | 5.24 GB | +17.9% |
| Q18 | 82.7 | 82.1 | -0.8% | 3.19 GB | 4.75 GB | +49.0% | 5.96 GB | 5.91 GB | -0.9% |
| Q19 | 9.6 | 9.6 | -0.5% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q20 | 14.4 | 14.5 | +0.8% | n/a | n/a | n/a | 3.44 GB | n/a | n/a |
| Q21 | 79.1 | 78.6 | -0.7% | 5.13 GB | 5.25 GB | +2.4% | 4.58 GB | 4.93 GB | +7.7% |
| Q22 | 12.6 | 12.6 | -0.4% | n/a | n/a | n/a | 1.95 GB | n/a | n/a |
| Total | 772.6 | 769.4 | -0.4% | | | | | | |

Comment on lines 731 to 739
Member

can this be removed now?

Comment on lines 955 to 958
Member

debug logging can be removed?

Member

debug logging can be removed?

Member

We should return an Err here

Member

We should return an Err here

.with_table_partition_cols(partition_fields)
/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
/// Returns a new `SendableRecordBatchStream` that yields the same batches.
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
Member

this function is unused now?

Contributor Author

Found it useful for printing a stream's batches while passing the original stream through to the output. It is not used now, but it would be handy for debugging purposes.

@mbutrovich changed the title from "chore: [df52] migration" to "deps: DataFusion 52.0.0 migration (SchemaAdapter changes, etc.)" on Feb 18, 2026
Comment on lines 157 to 160
Member

Suggested change

```diff
-    let millis_values: TimestampMillisecondArray = micros_array
-        .iter()
-        .map(|opt| opt.map(|v| v / 1000))
-        .collect();
+    let millis_values: TimestampMillisecondArray =
+        arrow::compute::kernels::arity::unary(micros_array, |v| v / 1000);
```

Comment on lines 85 to 88
Member

This could use unary kernel instead
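For illustration, the unary kernel applied to a microseconds-to-milliseconds conversion (values are made up; this is not the code under review):

```rust
use arrow::array::{Array, TimestampMicrosecondArray, TimestampMillisecondArray};
use arrow::compute::kernels::arity::unary;

fn main() {
    let micros = TimestampMicrosecondArray::from(vec![Some(1_000_000), None]);
    // Single pass over the values buffer; the null mask is carried over as-is.
    let millis: TimestampMillisecondArray = unary(&micros, |v| v / 1000);
    assert_eq!(millis.value(0), 1_000);
    assert!(millis.is_null(1));
}
```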

inner: S,
schema: SchemaRef,
/// Cached schema adapter with its source schema. Created when schema changes.
cached_adapter: Option<(SchemaRef, Arc<dyn SchemaMapper>)>,
Member

Is caching no longer possible with DF52?

Contributor Author

checking this

Contributor Author

added back

@@ -2795,7 +2795,7 @@ index d675503a8ba..f220892396e 100644
+ }
Contributor

I think this file just has a whitespace change. Can you get the version of the file from main to reduce the diff?
