[AURON #2253] Support insert-only Iceberg changelog native scan#2254
[AURON #2253] Support insert-only Iceberg changelog native scan#2254weimingdiit wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds native scan support for Iceberg SparkChangelogScan when the changelog is insert-only. Generalizes the existing native Iceberg scan planning to handle ChangelogScanTask (specifically AddedRowsScanTask), materializes changelog metadata columns (_change_type, _change_ordinal, _commit_snapshot_id) as per-task partition values, and refactors the executor to consume a new IcebergNativeScanTask abstraction instead of FileScanTask directly.
Changes:
- Introduce
IcebergNativeScanTaskand route both file-scan and changelog-scan planning through a shared task abstraction with per-task partition values. - Plan
SparkChangelogScannatively only for insert-onlyAddedRowsScanTask, Parquet/ORC, no deletes, and supported metadata columns; otherwise fall back to Spark. - Update
NativeIcebergTableScanExecto read file size, partition values, and split info from the new task type and propagate per-task partition values intoPartitionedFile.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | Splits planning into file-scan vs changelog-scan paths, adds IcebergNativeScanTask and per-task metadata-value materialization including changelog columns. |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala | Switches from FileScanTask to IcebergNativeScanTask; sources partition values from each task via PartitionedFile.partitionValues. |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | Adds tests for insert-only changelog native scan and fallback for delete-bearing changelog scans, plus helpers. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
yew1eb
left a comment
There was a problem hiding this comment.
Overall the PR is clean and well-structured. Two issues to address:
| } | ||
|
|
||
| private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { | ||
| val readSchema = scan.readSchema |
There was a problem hiding this comment.
changelogTask.get will throw NoSuchElementException if called with None and a partition schema containing a changelog column. The current callers keep these two in sync, but the method signature does not enforce it. Consider taking ChangelogScanTask directly instead of Option, or at least replace the .get calls with a match / getOrElse that throws a meaningful error.
| IcebergScanPlan( | ||
| Seq.empty, | ||
| FileFormat.PARQUET, | ||
| readSchema, |
There was a problem hiding this comment.
tasks.collect { case task: AnyRef => task } matches everything, so icebergTasks.size != tasks.size is always false. The size guard has become a no-op. Either keep the type as Seq[ScanTask] (the Iceberg interface), or drop the dead guard entirely.
Which issue does this PR close?
Closes #2253
Rationale for this change
Iceberg changelog scans currently fall back to Spark even when the changelog is insert-only. Insert-only changelog scans read added data files and can be executed by the existing native Parquet/ORC scan path.
What changes are included in this PR?
SparkChangelogScan.AddedRowsScanTaskinto native Iceberg scan tasks.Are there any user-facing changes?
No API changes. Some insert-only Iceberg changelog queries can now use native scan instead of falling back to Spark.
How was this patch tested?
Added Iceberg integration test coverage for insert-only changelog native scan correctness and plan conversion.