
Conversation

@andygrove (Member) commented Jan 27, 2026

This PR ensures that when Iceberg scans are executed natively, each Spark task's native plan contains only the file scan tasks for that task's partition, rather than for all partitions.

Problem

Previously, when CometIcebergNativeScanExec was a child of another operator (e.g., CometFilterExec), the parent operator's serialized native plan (nativeOp) contained the entire IcebergScan message with data for all partitions. This meant:

  • Each task received metadata for all partitions when it only needed one
  • The serialized plan size grew linearly with partition count
  • Every task deserialized unnecessary partition data

Solution

This PR implements per-partition plan building that injects only the relevant partition's data at execution time:

New Components

CometIcebergSplitRDD - A custom RDD that:

  • Holds common data (deduplication pools, catalog properties) in the closure
  • Stores each partition's file scan tasks in its Partition objects
  • Combines common + partition data at compute time to build partition-specific native plans (see the sketch below)
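
A minimal sketch of this RDD, assuming hypothetical names for the partition class and the native-execution hook (IcebergSplitPartition, buildNativePlan); the real implementation also wires up metrics and the native plan serialization:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical Partition that carries only its own serialized file scan tasks.
case class IcebergSplitPartition(index: Int, partitionBytes: Array[Byte]) extends Partition

class CometIcebergSplitRDD(
    sc: SparkContext,
    commonBytes: Array[Byte],              // shared pools / catalog properties, same for every task
    perPartitionBytes: Array[Array[Byte]], // one entry per Spark partition
    buildNativePlan: (Array[Byte], Array[Byte]) => Iterator[ColumnarBatch]) // assumed native-exec hook
  extends RDD[ColumnarBatch](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](perPartitionBytes.length)(i =>
      IcebergSplitPartition(i, perPartitionBytes(i)))

  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
    val p = split.asInstanceOf[IcebergSplitPartition]
    // Combine the shared data with this partition's file scan tasks at compute time,
    // so the task never deserializes other partitions' metadata.
    buildNativePlan(commonBytes, p.partitionBytes)
  }
}
```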

IcebergPartitionInjector - A helper that traverses an Operator tree and injects partition data into IcebergScan nodes that are missing it

findIcebergSplitData() - Locates CometIcebergNativeScanExec descendants in the plan tree and retrieves their per-partition data
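
These two helpers could look roughly like the following sketch. The generated protobuf location (org.apache.comet.serde.OperatorOuterClass), the icebergScan/partition accessors on Operator, the finder object name, and the perPartitionData field access are assumptions for illustration:

```scala
import org.apache.comet.serde.OperatorOuterClass.{IcebergFilePartition, Operator}
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical injector: rebuilds the serialized Operator tree, filling in the
// partition data on any IcebergScan node that does not have it yet.
object IcebergPartitionInjector {
  def injectPartitionData(op: Operator, partition: IcebergFilePartition): Operator = {
    val builder = op.toBuilder
    // Recurse into children first so nested scans are rewritten as well.
    (0 until op.getChildrenCount).foreach { i =>
      builder.setChildren(i, injectPartitionData(op.getChildren(i), partition))
    }
    // Accessor names (icebergScan, partition) are assumptions based on this PR's proto.
    if (op.hasIcebergScan && !op.getIcebergScan.hasPartition) {
      builder.setIcebergScan(op.getIcebergScan.toBuilder.setPartition(partition))
    }
    builder.build()
  }
}

// Hypothetical finder: locates a native Iceberg scan below this operator and returns
// its per-partition serialized file scan tasks (in the PR this lives on the exec side).
object IcebergSplitDataFinder {
  def findIcebergSplitData(plan: SparkPlan): Option[Array[Array[Byte]]] =
    plan.collectFirst { case scan: CometIcebergNativeScanExec => scan.perPartitionData }
}
```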

Modified Execution Flow

In CometNativeExec.doExecuteColumnar():

  1. Check if the plan tree contains an IcebergScan with per-partition data available
  2. If so, for each partition:
    • Parse the base operator tree from the serialized plan
    • Inject that partition's file scan task data into the IcebergScan node
    • Re-serialize and pass to native execution
  3. Each task's native plan now contains only its own partition's data (see the sketch below)
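
A rough sketch of that rebuild step, reusing the injector sketched above; PerPartitionPlanBuilder and buildPerPartitionPlans are hypothetical names, and in the PR this logic sits inside doExecuteColumnar():

```scala
import org.apache.comet.serde.OperatorOuterClass.{IcebergFilePartition, Operator}

object PerPartitionPlanBuilder {
  // Parses the base plan once, then injects each partition's file scan tasks and
  // re-serializes, yielding one native plan per Spark partition.
  def buildPerPartitionPlans(
      serializedBaseOp: Array[Byte],
      perPartitionBytes: Array[Array[Byte]]): Array[Array[Byte]] = {
    val baseOp = Operator.parseFrom(serializedBaseOp)
    perPartitionBytes.map { bytes =>
      val partition = IcebergFilePartition.parseFrom(bytes)
      // Each resulting plan contains only this partition's file scan task data.
      IcebergPartitionInjector.injectPartitionData(baseOp, partition).toByteArray
    }
  }
}
```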

Protobuf Changes

Added IcebergScanCommon message to hold shared data (pools, metadata) separately from per-partition file scan tasks. The IcebergScan message now has:

  • common field for the shared data (deduplication pools, metadata, catalog properties)
  • partition field for a single partition's file scan tasks (illustrated below)
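
For illustration, populating the two fields from Scala with the generated protobuf builders might look like the sketch below; the outer class location and exact setter names are assumptions derived from the field names above:

```scala
import org.apache.comet.serde.OperatorOuterClass.{IcebergFilePartition, IcebergScan, IcebergScanCommon}

object IcebergScanProtoExample {
  // Shared data: built once during convert() and reused for every partition's plan.
  val common: IcebergScanCommon = IcebergScanCommon.newBuilder()
    // ... populate deduplication pools, metadata location, catalog properties here ...
    .build()

  // Per-partition message: pairs the shared data with a single partition's file scan tasks.
  def scanForPartition(partition: IcebergFilePartition): IcebergScan =
    IcebergScan.newBuilder()
      .setCommon(common)
      .setPartition(partition)
      .build()
}
```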

Rust Changes

Simplified the Iceberg scan handling in planner.rs to expect common + partition data, removing the code path that read from a list of all partitions.

Test Plan

  • All 62 existing Iceberg tests pass
  • Filter pushdown tests verify parent operators work correctly
  • MOR (Merge-On-Read) tests with positional and equality deletes
  • Schema evolution, complex types, partition pruning tests
  • REST catalog integration test

andygrove and others added 2 commits January 26, 2026 19:31
…etwork transfer

This PR implements split serialization for Iceberg native scans to reduce
network transfer and deserialization overhead when scanning tables with
many partitions.

**Problem:**
Currently, ALL partition metadata is serialized into a single IcebergScan
message sent to every executor task. Each task only uses one partition's
data but deserializes all partitions. For 10,000 partitions, this means
each task receives ~10,000x more file_partitions data than needed.

**Solution:**
Split serialization into:
- Common data (IcebergScanCommon): pools, metadata, catalog properties -
  serialized once, captured in RDD closure
- Per-partition data (IcebergFilePartition[]): file scan tasks - one per
  partition, stored in Partition objects

Each task now receives only: commonData + its own partitionBytes

**Changes:**
- Proto: Added IcebergScanCommon message and split_mode fields to IcebergScan
- Rust: Handle split_mode in planner, added parse_file_scan_tasks_from_common()
- Scala: New CometIcebergSplitRDD with custom Partition holding per-partition bytes
- Scala: Thread-local to pass split data from convert() to createExec()

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Update the three rust-specific Iceberg test jobs to only run when
the PR title contains [iceberg-rust] rather than [iceberg]. This
allows running Java-based Iceberg tests separately from Rust-based
ones.

Also applies cargo fmt to planner.rs.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@andygrove andygrove changed the title [EXPERIMENTAL] Split Iceberg FileScanTask serialization for reduced network transfer feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] Jan 27, 2026
@andygrove andygrove changed the title feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg-rust] Jan 27, 2026
@andygrove andygrove changed the title feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg-rust] feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg] Jan 27, 2026
The IcebergScan variant is larger after adding split mode fields.
This is expected for generated protobuf code.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@andygrove andygrove changed the title feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg] feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg-rust] Jan 27, 2026
andygrove and others added 2 commits January 26, 2026 19:42
@codecov-commenter commented Jan 27, 2026

Codecov Report

❌ Patch coverage is 89.17197% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.19%. Comparing base (f09f8af) to head (0a02bb7).
⚠️ Report is 902 commits behind head on main.

Files with missing lines | Patch % | Lines
...n/scala/org/apache/spark/sql/comet/operators.scala | 79.24% | 7 Missing and 4 partials ⚠️
.../comet/serde/operator/CometIcebergNativeScan.scala | 88.88% | 4 Missing ⚠️
...e/spark/sql/comet/CometIcebergNativeScanExec.scala | 96.15% | 0 Missing and 1 partial ⚠️
.../apache/spark/sql/comet/CometIcebergSplitRDD.scala | 97.61% | 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3295      +/-   ##
============================================
+ Coverage     56.12%   60.19%   +4.07%     
- Complexity      976     1470     +494     
============================================
  Files           119      176      +57     
  Lines         11743    16301    +4558     
  Branches       2251     2698     +447     
============================================
+ Hits           6591     9813    +3222     
- Misses         4012     5136    +1124     
- Partials       1140     1352     +212     

andygrove and others added 5 commits January 27, 2026 06:26
Remove the explicit split_mode boolean flag. Instead, detect split mode
by checking if both common and partition fields are present. This
simplifies the proto and code while maintaining compatibility with
native shuffle which uses the serialized plan directly.

- Remove split_mode field from IcebergScan proto
- Keep both legacy fields (1-12) and split mode fields (20-21)
- Detect split mode in Rust by checking common/partition presence
- Simplify Scala code to always use CometIcebergSplitRDD

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When IcebergScanExec is a child of another operator (e.g., FilterExec),
the parent calls execute(partition=N) but in split mode there's only 1
task group containing the correct data for partition N.

This fix checks if there's only 1 task group and uses effective
partition 0 regardless of the requested partition index. This allows
parent operators to correctly execute their partition N while the
IcebergScanExec returns data from its single task group.

Also removed debug eprintln statement and improved error messages for
legacy mode.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Split mode is used when IcebergScan executes directly via CometIcebergSplitRDD.
Legacy mode is used when IcebergScan is a child of another native operator
(like FilterExec), where the parent's nativeOp contains the child IcebergScan
with all partitions in file_partitions.

- Restore legacy code path in planner.rs that uses file_partitions
- Add parse_file_scan_tasks() for legacy mode
- Require split data in CometIcebergNativeScanExec.doExecuteColumnar()
- Require split data in CometIcebergNativeScan.createExec()

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When IcebergScan is a child of another operator (e.g., FilterExec), the
parent's nativeOp tree contains the IcebergScan. Previously, each task
would receive all partitions' data even though it only needed its own.

This change implements per-partition plan injection:
- Add IcebergPartitionInjector to traverse operator trees and inject
  partition data into IcebergScan nodes with split_mode=true
- Add findIcebergSplitData() to locate CometIcebergNativeScanExec
  descendants with split mode data
- Modify doExecuteColumnar() to detect Iceberg split data and create
  per-partition plans by injecting partition data before serialization
- Remove legacy code path from Rust planner (all Iceberg scans now
  require split_mode=true with common + partition data)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@andygrove andygrove changed the title feat: Split Iceberg FileScanTask serialization for reduced network transfer [EXPERIMENTAL] [iceberg-rust] feat: Split Iceberg FileScanTask Serialization [iceberg-rust] Jan 27, 2026
@andygrove andygrove changed the title feat: Split Iceberg FileScanTask Serialization [iceberg-rust] feat: Per-Partition Plan Building for Native Iceberg Scans [iceberg-rust] Jan 27, 2026
@andygrove andygrove marked this pull request as ready for review January 27, 2026 16:21
andygrove and others added 2 commits January 27, 2026 09:44
IcebergScan now only has two fields:
- common: IcebergScanCommon (shared pools, metadata, catalog props)
- partition: IcebergFilePartition (single partition's file tasks)

The legacy fields (file_partitions, separate pools, etc.) and split_mode
flag are removed since all Iceberg scans now use per-partition plan
injection.

Also build pools directly into commonBuilder during convert() instead
of building into icebergScanBuilder and copying.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@andygrove (Member, Author) commented

@hsiang-c fyi

andygrove and others added 8 commits January 27, 2026 10:50
When outputPartitioning is KeyGroupedPartitioning, the reported partition
count can differ from the actual physical partition count in the RDD.
This caused ArrayIndexOutOfBoundsException when partitionIndex exceeded
perPartitionData.length.

Fix by using perPartitionData.length as the source of truth for partition
count, ensuring consistency between the serialized per-partition data
and the number of partitions reported by the operator.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
The parent CometNativeExec.convertBlock() uses makeCopy() which loses
@transient fields. Override it in CometIcebergNativeScanExec to preserve
commonData and perPartitionData fields which are needed for split-mode
partition injection.

This fixes ArrayIndexOutOfBoundsException when the plan transformation
in CometExecRule.scala created a copy without the split serialization data.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When using CometIcebergNativeScanExec in join operations with
ZippedPartitionsRDD, the partition index may exceed perPartitionData
bounds since tables can have different partition counts. This adds
bounds checking to inject an empty IcebergFilePartition for
out-of-bounds indices, preventing ArrayIndexOutOfBoundsException.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
The previous implementation of split serialization only collected partition
data from one Iceberg scan, causing joins over multiple Iceberg tables to
use incorrect partition data for the second table.

Changes:
- Rename findIcebergSplitData to findAllIcebergSplitData
- Return Map[String, Array[Array[Byte]]] keyed by metadataLocation instead
  of Option[Array[Array[Byte]]] to collect data from ALL Iceberg scans
- Update IcebergPartitionInjector.injectPartitionData to take the map and
  match each IcebergScan by its metadata_location from common data
- Add stage boundary checks to prevent partition index misalignment

This fixes TestStoragePartitionedJoins.testJoinsWithMultipleTransformTypes
which was returning 13 rows instead of 26 expected.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
andygrove and others added 2 commits January 27, 2026 19:20
Two fixes for split serialization with Iceberg tables:

1. Look inside BroadcastQueryStageExec and CometBroadcastExchangeExec to
   find Iceberg scan partition data. For broadcast joins, the partition
   indices still align because broadcast data is replicated to all partitions.

2. Remove the inputs.isEmpty check when deciding to inject partition data.
   For broadcast joins, there are inputs (the broadcast data), but the
   native plan still contains IcebergScan nodes that need partition data.

This fixes CometFuzzIcebergSuite "join" test which was failing with
"IcebergScan missing partition data" for broadcast joins.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@mbutrovich (Contributor) commented

Closing in favor of #3349.

@mbutrovich mbutrovich closed this Jan 30, 2026