[AURON #2253] Support insert-only Iceberg changelog native scan#2254

Open
weimingdiit wants to merge 1 commit into apache:master from weimingdiit:feat/support-iceberg-changelog

@weimingdiit
Contributor

Which issue does this PR close?

Closes #2253

Rationale for this change

Iceberg changelog scans currently fall back to Spark even when the changelog is insert-only. An insert-only changelog scan reads only added data files, so it can be executed by the existing native Parquet/ORC scan path.

What changes are included in this PR?

  • Add native scan planning support for Iceberg SparkChangelogScan.
  • Convert insert-only AddedRowsScanTask into native Iceberg scan tasks.
  • Materialize changelog metadata columns as per-task partition values.
  • Keep non-insert changelog operations, delete files, mixed file formats, and unsupported metadata columns on Spark fallback.
  • Reuse the existing native Iceberg scan execution path.
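
To illustrate the "metadata columns as per-task partition values" item above: every row produced by an added-rows task shares the same changelog metadata, so each requested column can be filled with one constant per task. The following is a minimal sketch under stated assumptions, not the code in this PR. `AddedRowsTaskInfo` and `constantValues` are hypothetical names, and the `"INSERT"` value for `_change_type` is assumed from Iceberg's changelog operation naming.

```scala
// Hypothetical stand-in for the fields an insert-only changelog task exposes.
// This is NOT Iceberg's AddedRowsScanTask.
final case class AddedRowsTaskInfo(changeOrdinal: Int, commitSnapshotId: Long)

object ChangelogMetadataSketch {
  // Returns one constant per requested metadata column, in request order,
  // or None when any requested column is unsupported (caller falls back).
  def constantValues(
      task: AddedRowsTaskInfo,
      requestedColumns: Seq[String]): Option[Seq[Any]] = {
    val values: Seq[Option[Any]] = requestedColumns.map {
      case "_change_type" => Some("INSERT") // assumed literal for insert-only tasks
      case "_change_ordinal" => Some(task.changeOrdinal)
      case "_commit_snapshot_id" => Some(task.commitSnapshotId)
      case _ => None // unsupported metadata column
    }
    if (values.forall(_.isDefined)) Some(values.flatten) else None
  }
}
```

Returning `None` for an unknown column keeps the fallback decision in one place instead of failing later during execution.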

Are there any user-facing changes?

No API changes. Some insert-only Iceberg changelog queries can now use native scan instead of falling back to Spark.

How was this patch tested?

Added Iceberg integration test coverage for insert-only changelog native scan correctness and plan conversion.

@weimingdiit weimingdiit marked this pull request as ready for review May 14, 2026 12:58
@slfan1989 slfan1989 requested a review from Copilot May 15, 2026 15:17
Contributor

Copilot AI left a comment

Pull request overview

Adds native scan support for Iceberg SparkChangelogScan when the changelog is insert-only. Generalizes the existing native Iceberg scan planning to handle ChangelogScanTask (specifically AddedRowsScanTask), materializes changelog metadata columns (_change_type, _change_ordinal, _commit_snapshot_id) as per-task partition values, and refactors the executor to consume a new IcebergNativeScanTask abstraction instead of FileScanTask directly.

Changes:

  • Introduce IcebergNativeScanTask and route both file-scan and changelog-scan planning through a shared task abstraction with per-task partition values.
  • Plan SparkChangelogScan natively only for insert-only AddedRowsScanTask, Parquet/ORC, no deletes, and supported metadata columns; otherwise fall back to Spark.
  • Update NativeIcebergTableScanExec to read file size, partition values, and split info from the new task type and propagate per-task partition values into PartitionedFile.
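
The native-versus-fallback conditions listed above can be sketched as a single predicate. This is a simplified illustration, not the planning code in `IcebergScanSupport.scala`. `TaskSummary` and `nativeFormat` are hypothetical names, and operations and formats are modelled as plain strings rather than Iceberg types.

```scala
// Hypothetical, simplified description of one planned changelog task.
final case class TaskSummary(
    operation: String, // e.g. "INSERT", "DELETE"
    fileFormat: String, // e.g. "PARQUET", "ORC", "AVRO"
    hasDeleteFiles: Boolean)

object NativeEligibilitySketch {
  private val supportedFormats = Set("PARQUET", "ORC")

  // Returns the single shared file format when every task can run natively,
  // or None to signal that the scan should stay on Spark.
  // An empty task list also yields None here, purely to keep the sketch small.
  def nativeFormat(tasks: Seq[TaskSummary]): Option[String] = {
    val allInsertOnly =
      tasks.forall(t => t.operation == "INSERT" && !t.hasDeleteFiles)
    tasks.map(_.fileFormat).distinct match {
      case Seq(format) if allInsertOnly && supportedFormats.contains(format) =>
        Some(format)
      case _ => None // mixed formats, unsupported format, or non-insert work
    }
  }
}
```

Unsupported metadata columns are a separate fallback condition and are left out of this sketch.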

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | Splits planning into file-scan vs changelog-scan paths, adds IcebergNativeScanTask and per-task metadata-value materialization including changelog columns. |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala | Switches from FileScanTask to IcebergNativeScanTask; sources partition values from each task via PartitionedFile.partitionValues. |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | Adds tests for insert-only changelog native scan and fallback for delete-bearing changelog scans, plus helpers. |

Contributor

@yew1eb yew1eb left a comment

Overall the PR is clean and well-structured. Two issues to address:

}

private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = {
  val readSchema = scan.readSchema

changelogTask.get throws NoSuchElementException when the method is called with None and a partition schema that contains a changelog column. The current callers keep these two in sync, but the method signature does not enforce it. Consider taking a ChangelogScanTask directly instead of an Option, or at least replacing the .get calls with a match / getOrElse that throws a meaningful error.
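
A minimal sketch of the two alternatives suggested in this comment, using a hypothetical `ChangelogTaskInfo` case class in place of Iceberg's `ChangelogScanTask`. The method names and the error message are illustrative assumptions, not code from the PR.

```scala
// Hypothetical stand-in for a changelog task; NOT Iceberg's ChangelogScanTask.
final case class ChangelogTaskInfo(changeOrdinal: Int, commitSnapshotId: Long)

object ChangelogValueSketch {
  // Alternative 1: require the task in the signature, so a caller cannot
  // reach this code without one.
  def changeOrdinal(task: ChangelogTaskInfo): Int = task.changeOrdinal

  // Alternative 2: keep the Option, but fail with a message that names the
  // violated invariant instead of a bare NoSuchElementException.
  def changeOrdinalOrFail(task: Option[ChangelogTaskInfo], column: String): Int =
    task.map(_.changeOrdinal).getOrElse(
      throw new IllegalStateException(
        s"Changelog metadata column '$column' requested without a changelog scan task"))
}
```

The first form moves the check to compile time; the second keeps the existing call shape and only improves the failure message.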

IcebergScanPlan(
  Seq.empty,
  FileFormat.PARQUET,
  readSchema,

tasks.collect { case task: AnyRef => task } matches every element, so icebergTasks.size != tasks.size is always false and the size guard has become a no-op. Either keep the type as Seq[ScanTask] (the Iceberg interface), or drop the dead guard entirely.
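
The effect described in this comment can be reproduced in isolation. The sketch below uses a hypothetical `Task` hierarchy rather than Iceberg's `ScanTask`. It only shows why a type pattern on `AnyRef` filters nothing, while a pattern on a narrower type makes the size comparison meaningful.

```scala
object CollectGuardSketch {
  // Hypothetical task hierarchy used only for this illustration.
  sealed trait Task
  final case class SupportedTask(path: String) extends Task
  final case class OtherTask(path: String) extends Task

  val tasks: Seq[Task] = Seq(SupportedTask("a.parquet"), OtherTask("b.avro"))

  // Every element is a non-null reference, so this pattern keeps all of them.
  val catchAll: Seq[AnyRef] = tasks.collect { case task: AnyRef => task }

  // Only the supported subtype matches, so unsupported tasks are dropped.
  val narrowed: Seq[SupportedTask] = tasks.collect { case task: SupportedTask => task }

  val catchAllGuardFires: Boolean = catchAll.size != tasks.size // false
  val narrowedGuardFires: Boolean = narrowed.size != tasks.size // true
}
```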
