feat: Per-Partition Plan Building for Native Iceberg Scans [iceberg-rust] #3295
Closed
Conversation
…etwork transfer

This PR implements split serialization for Iceberg native scans to reduce network transfer and deserialization overhead when scanning tables with many partitions.

**Problem:** Currently, ALL partition metadata is serialized into a single IcebergScan message sent to every executor task. Each task only uses one partition's data but deserializes all partitions. For 10,000 partitions, this means each task receives ~10,000x more file_partitions data than needed.

**Solution:** Split serialization into:
- Common data (IcebergScanCommon): pools, metadata, catalog properties - serialized once, captured in the RDD closure
- Per-partition data (IcebergFilePartition[]): file scan tasks - one per partition, stored in Partition objects

Each task now receives only: commonData + its own partitionBytes

**Changes:**
- Proto: Added IcebergScanCommon message and split_mode fields to IcebergScan
- Rust: Handle split_mode in planner, added parse_file_scan_tasks_from_common()
- Scala: New CometIcebergSplitRDD with custom Partition holding per-partition bytes
- Scala: Thread-local to pass split data from convert() to createExec()

Co-Authored-By: Claude Opus 4.5 <[email protected]>
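As a rough illustration of this split: the sketch below shows an RDD whose custom Partition objects each carry only their own serialized IcebergFilePartition bytes, while the serialized IcebergScanCommon bytes are captured once and shared by every task. Everything beyond the general shape described in the commit message (class and field names, element type) is a placeholder, not the actual implementation.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: a custom Partition that holds just its own partition's bytes.
case class IcebergSplitPartition(index: Int, partitionBytes: Array[Byte]) extends Partition

class IcebergSplitRDDSketch(
    sc: SparkContext,
    commonBytes: Array[Byte],               // serialized IcebergScanCommon (shared by all tasks)
    perPartitionBytes: Array[Array[Byte]])  // one serialized IcebergFilePartition per partition
  extends RDD[(Array[Byte], Array[Byte])](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](perPartitionBytes.length)(i =>
      IcebergSplitPartition(i, perPartitionBytes(i)))

  override def compute(split: Partition, context: TaskContext): Iterator[(Array[Byte], Array[Byte])] = {
    val p = split.asInstanceOf[IcebergSplitPartition]
    // A task only ever sees the shared common bytes plus its own partition's bytes,
    // never the metadata for the other partitions.
    Iterator((commonBytes, p.partitionBytes))
  }
}
```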
Update the three rust-specific Iceberg test jobs to only run when the PR title contains [iceberg-rust] rather than [iceberg]. This allows running Java-based Iceberg tests separately from Rust-based ones. Also applies cargo fmt to planner.rs. Co-Authored-By: Claude Opus 4.5 <[email protected]>
The IcebergScan variant is larger after adding split mode fields. This is expected for generated protobuf code. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Co-Authored-By: Claude Opus 4.5 <[email protected]>
Codecov Report

❌ Patch coverage

Additional details and impacted files

@@ Coverage Diff @@
## main #3295 +/- ##
============================================
+ Coverage 56.12% 60.19% +4.07%
- Complexity 976 1470 +494
============================================
Files 119 176 +57
Lines 11743 16301 +4558
Branches 2251 2698 +447
============================================
+ Hits 6591 9813 +3222
- Misses 4012 5136 +1124
- Partials 1140 1352 +212

☔ View full report in Codecov by Sentry.
Remove the explicit split_mode boolean flag. Instead, detect split mode by checking if both common and partition fields are present. This simplifies the proto and code while maintaining compatibility with native shuffle, which uses the serialized plan directly.

- Remove split_mode field from IcebergScan proto
- Keep both legacy fields (1-12) and split mode fields (20-21)
- Detect split mode in Rust by checking common/partition presence
- Simplify Scala code to always use CometIcebergSplitRDD

Co-Authored-By: Claude Opus 4.5 <[email protected]>
This reverts commit d78d9dd.
When IcebergScanExec is a child of another operator (e.g., FilterExec), the parent calls execute(partition=N) but in split mode there's only 1 task group containing the correct data for partition N. This fix checks if there's only 1 task group and uses effective partition 0 regardless of the requested partition index. This allows parent operators to correctly execute their partition N while the IcebergScanExec returns data from its single task group. Also removed debug eprintln statement and improved error messages for legacy mode. Co-Authored-By: Claude Opus 4.5 <[email protected]>
Split mode is used when IcebergScan executes directly via CometIcebergSplitRDD. Legacy mode is used when IcebergScan is a child of another native operator (like FilterExec), where the parent's nativeOp contains the child IcebergScan with all partitions in file_partitions.

- Restore legacy code path in planner.rs that uses file_partitions
- Add parse_file_scan_tasks() for legacy mode
- Require split data in CometIcebergNativeScanExec.doExecuteColumnar()
- Require split data in CometIcebergNativeScan.createExec()

Co-Authored-By: Claude Opus 4.5 <[email protected]>
When IcebergScan is a child of another operator (e.g., FilterExec), the parent's nativeOp tree contains the IcebergScan. Previously, each task would receive all partitions' data even though it only needed its own.

This change implements per-partition plan injection:

- Add IcebergPartitionInjector to traverse operator trees and inject partition data into IcebergScan nodes with split_mode=true
- Add findIcebergSplitData() to locate CometIcebergNativeScanExec descendants with split mode data
- Modify doExecuteColumnar() to detect Iceberg split data and create per-partition plans by injecting partition data before serialization
- Remove legacy code path from Rust planner (all Iceberg scans now require split_mode=true with common + partition data)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
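A minimal sketch of the injection idea, using placeholder case classes in place of Comet's generated protobuf Operator and IcebergScan types (the real code rebuilds the protobuf messages via their builders):

```scala
// Placeholder operator tree standing in for the serialized Operator protobuf.
sealed trait Op
// Models an IcebergScan node; partitionBytes models its per-partition payload.
final case class IcebergScanOp(partitionBytes: Option[Array[Byte]]) extends Op
// Models any other operator (e.g. a Filter wrapping the scan).
final case class WrapperOp(name: String, children: Seq[Op]) extends Op

object IcebergPartitionInjectorSketch {
  // Walk the operator tree and fill in this task's per-partition payload for
  // every IcebergScan node that is still missing it.
  def injectPartitionData(op: Op, partitionBytes: Array[Byte]): Op = op match {
    case scan @ IcebergScanOp(None) => scan.copy(partitionBytes = Some(partitionBytes))
    case scan: IcebergScanOp        => scan
    case WrapperOp(name, children)  => WrapperOp(name, children.map(injectPartitionData(_, partitionBytes)))
  }
}
```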
Co-Authored-By: Claude Opus 4.5 <[email protected]>
IcebergScan now only has two fields: - common: IcebergScanCommon (shared pools, metadata, catalog props) - partition: IcebergFilePartition (single partition's file tasks) The legacy fields (file_partitions, separate pools, etc.) and split_mode flag are removed since all Iceberg scans now use per-partition plan injection. Also build pools directly into commonBuilder during convert() instead of building into icebergScanBuilder and copying. Co-Authored-By: Claude Opus 4.5 <[email protected]>
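For illustration, the two-field layout can be modeled roughly as below; the actual messages are defined in Comet's .proto files and generated from them, and the field contents here are paraphrased from the commit messages rather than taken from the real schema.

```scala
// Simplified Scala model of the protobuf layout described above (not the real generated classes).
final case class IcebergScanCommon(
    deduplicationPools: Seq[Array[Byte]],    // shared pools referenced by file scan tasks
    metadataLocation: String,                // table metadata location
    catalogProperties: Map[String, String])  // catalog configuration

final case class IcebergFilePartition(
    fileScanTasks: Seq[Array[Byte]])         // one partition's serialized file scan tasks

final case class IcebergScan(
    common: IcebergScanCommon,       // built once in convert(), shared by all tasks
    partition: IcebergFilePartition) // injected per task at execution time
```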
Co-Authored-By: Claude Opus 4.5 <[email protected]>
Member (Author)

@hsiang-c fyi
When outputPartitioning is KeyGroupedPartitioning, the reported partition count can differ from the actual physical partition count in the RDD. This caused ArrayIndexOutOfBoundsException when partitionIndex exceeded perPartitionData.length. Fix by using perPartitionData.length as the source of truth for partition count, ensuring consistency between the serialized per-partition data and the number of partitions reported by the operator. Co-Authored-By: Claude Opus 4.5 <[email protected]>
The parent CometNativeExec.convertBlock() uses makeCopy() which loses @transient fields. Override it in CometIcebergNativeScanExec to preserve commonData and perPartitionData fields which are needed for split-mode partition injection. This fixes ArrayIndexOutOfBoundsException when the plan transformation in CometExecRule.scala created a copy without the split serialization data. Co-Authored-By: Claude Opus 4.5 <[email protected]>
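A self-contained sketch of the fix, using a stand-in leaf node rather than the real CometIcebergNativeScanExec; its constructor arguments and the commonData / perPartitionData declarations are assumptions based on the commit message.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}

// Sketch only: illustrates preserving @transient state across makeCopy().
case class IcebergScanExecSketch(output: Seq[Attribute]) extends LeafExecNode {

  @transient var commonData: Array[Byte] = _
  @transient var perPartitionData: Array[Array[Byte]] = _

  override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
    // makeCopy() re-runs the constructor, so state held only in @transient vars
    // would be dropped; carry it over to the copy so split-mode partition
    // injection can still find it after plan transformations.
    val copied = super.makeCopy(newArgs).asInstanceOf[IcebergScanExecSketch]
    copied.commonData = this.commonData
    copied.perPartitionData = this.perPartitionData
    copied
  }

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("sketch only")
}
```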
When using CometIcebergNativeScanExec in join operations with ZippedPartitionsRDD, the partition index may exceed perPartitionData bounds since tables can have different partition counts. This adds bounds checking to inject an empty IcebergFilePartition for out-of-bounds indices, preventing ArrayIndexOutOfBoundsException. Co-Authored-By: Claude Opus 4.5 <[email protected]>
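The bounds check amounts to something like the following sketch, where emptyPartitionBytes is assumed to be a pre-serialized empty IcebergFilePartition and perPartitionData is the scan's per-partition split data.

```scala
// Sketch: joins via ZippedPartitionsRDD can request a partition index beyond
// this table's split data, so fall back to an empty payload instead of throwing.
def partitionBytesFor(
    partitionIndex: Int,
    perPartitionData: Array[Array[Byte]],
    emptyPartitionBytes: Array[Byte]): Array[Byte] =
  if (partitionIndex < perPartitionData.length) perPartitionData(partitionIndex)
  else emptyPartitionBytes
```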
…enarios" This reverts commit 76b6047.
The previous implementation of split serialization only collected partition data from one Iceberg scan, causing joins over multiple Iceberg tables to use incorrect partition data for the second table. Changes: - Rename findIcebergSplitData to findAllIcebergSplitData - Return Map[String, Array[Array[Byte]]] keyed by metadataLocation instead of Option[Array[Array[Byte]]] to collect data from ALL Iceberg scans - Update IcebergPartitionInjector.injectPartitionData to take the map and match each IcebergScan by its metadata_location from common data - Add stage boundary checks to prevent partition index misalignment This fixes TestStoragePartitionedJoins.testJoinsWithMultipleTransformTypes which was returning 13 rows instead of 26 expected. Co-Authored-By: Claude Opus 4.5 <[email protected]>
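A rough sketch of the collection step, using a stand-in trait for CometIcebergNativeScanExec; the metadataLocation and perPartitionData accessors are assumptions based on the commit message, not the actual class definition.

```scala
import org.apache.spark.sql.execution.SparkPlan

// Stand-in for CometIcebergNativeScanExec for the purposes of this sketch.
trait IcebergScanLike {
  def metadataLocation: String
  def perPartitionData: Array[Array[Byte]]
}

object FindIcebergSplitDataSketch {
  // Collect per-partition split data from ALL Iceberg scans under `plan`,
  // keyed by each table's metadata location so the injector can match each
  // IcebergScan node in the native plan to the right table's data.
  def findAllIcebergSplitData(plan: SparkPlan): Map[String, Array[Array[Byte]]] =
    plan.collect {
      case scan: IcebergScanLike if scan.perPartitionData != null =>
        scan.metadataLocation -> scan.perPartitionData
    }.toMap
}
```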
Co-Authored-By: Claude Opus 4.5 <[email protected]>
Two fixes for split serialization with Iceberg tables: 1. Look inside BroadcastQueryStageExec and CometBroadcastExchangeExec to find Iceberg scan partition data. For broadcast joins, the partition indices still align because broadcast data is replicated to all partitions. 2. Remove the inputs.isEmpty check when deciding to inject partition data. For broadcast joins, there are inputs (the broadcast data), but the native plan still contains IcebergScan nodes that need partition data. This fixes CometFuzzIcebergSuite "join" test which was failing with "IcebergScan missing partition data" for broadcast joins. Co-Authored-By: Claude Opus 4.5 <[email protected]>
mbutrovich added a commit to mbutrovich/datafusion-comet that referenced this pull request on Jan 30, 2026
Contributor

Closing in favor of #3349.
This PR ensures that when Iceberg scans are executed natively, each Spark task's native plan only contains file scan tasks for that specific partition, rather than all partitions.
Problem
Previously, when CometIcebergNativeScanExec was a child of another operator (e.g., CometFilterExec), the parent operator's serialized native plan (nativeOp) contained the entire IcebergScan message with data for all partitions. This meant that every task received and deserialized metadata for all partitions even though it only used its own.

Solution
This PR implements per-partition plan building that injects only the relevant partition's data at execution time:
New Components
- CometIcebergSplitRDD - a custom RDD that holds each partition's serialized data in its own Partition objects
- IcebergPartitionInjector - a helper that traverses an Operator tree and injects partition data into IcebergScan nodes that are missing it
- findIcebergSplitData() - locates CometIcebergNativeScanExec descendants in the plan tree and retrieves their per-partition data

Modified Execution Flow

In CometNativeExec.doExecuteColumnar(), the operator detects whether its native plan contains an IcebergScan with per-partition data available and, if so, injects that partition's data into the IcebergScan node before serializing the plan.

Protobuf Changes
Added an IcebergScanCommon message to hold shared data (pools, metadata) separately from per-partition file scan tasks. The IcebergScan message now has:

- a common field for shared deduplication pools
- a partition field for a single partition's file tasks

Rust Changes
Simplified the Iceberg scan handling in planner.rs to expect common + partition data, removing the code path that read from a list of all partitions.

Test Plan