Add commit retry and concurrency validation for writes #3320

Open
lawofcycles wants to merge 14 commits into apache:main from lawofcycles:feat/commit-retry-and-validation

Contributor

@lawofcycles lawofcycles commented May 3, 2026

Closes #3319
Closes #819
Closes #269

Rationale for this change

PyIceberg currently fails immediately with CommitFailedException when a concurrent transaction commits first, regardless of whether the writes actually conflict. Java Iceberg handles this transparently through its retry loop in SnapshotProducer.commit().

This PR adds automatic commit retry with exponential backoff and data conflict validation to PyIceberg, matching Java Iceberg's behavior. On CommitFailedException, the retry loop refreshes table metadata, re-runs validation, and regenerates manifests. If validation detects a real data conflict, the operation aborts with ValidationException instead of retrying.

The retry loop is placed in Transaction.commit_transaction() rather than in individual snapshot producers. This is necessary because Transaction.delete() uses two producers (_DeleteFiles + _OverwriteFiles) that must be committed atomically. Retrying at the producer level would break this atomicity.
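
The control flow described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the two exception classes are local stand-ins, `commit_with_retry` and its callbacks are hypothetical names, and doubling the wait on each attempt is an assumption about how the exponential backoff is computed.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CommitFailedException(Exception):
    """Stand-in: another transaction committed first."""


class ValidationException(Exception):
    """Stand-in: a real data conflict was detected; never retried."""


def commit_with_retry(
    attempt_commit: Callable[[], T],
    refresh_and_revalidate: Callable[[], None],
    num_retries: int = 4,
    min_wait_ms: int = 100,
    max_wait_ms: int = 60_000,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Try one commit for the whole transaction, retrying on CommitFailedException."""
    for attempt in range(num_retries + 1):
        try:
            return attempt_commit()
        except CommitFailedException:
            if attempt == num_retries:
                raise  # retries exhausted: surface the last failure
            # Assumption: the wait doubles per attempt, capped at max_wait_ms.
            sleep(min(min_wait_ms * 2**attempt, max_wait_ms) / 1000)
            # Refresh metadata, re-run validation, regenerate manifests.
            # A ValidationException raised here propagates and ends the loop.
            refresh_and_revalidate()
    raise AssertionError("unreachable")
```

Because the loop wraps a single commit callable for the whole transaction, every producer staged in that transaction is retried together, which is the atomicity argument made above.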

Validation behavior follows Java's BaseOverwriteFiles.validate(), using the existing validation functions from validate.py that were contributed through #1935, #1938, #2050, and #3049.

Are these changes tested?

Yes. Unit tests and integration tests covering retry success, ValidationException abort, retry exhaustion, isolation levels, partition-level conflict detection, manifest cleanup, and producer state reset.

Are there any user-facing changes?

Yes. Previously, all concurrent write conflicts resulted in CommitFailedException.

Now:

  • Compatible concurrent writes (e.g. concurrent appends) are retried automatically and succeed transparently
  • Incompatible concurrent writes (e.g. concurrent deletes on the same data) raise ValidationException instead of CommitFailedException

The following new table properties are supported.

  • commit.retry.num-retries (default: 4)
  • commit.retry.min-wait-ms (default: 100)
  • commit.retry.max-wait-ms (default: 60000)
  • write.delete.isolation-level (default: serializable)
  • write.update.isolation-level (default: serializable)
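
To get a feel for what the retry properties imply, the sketch below turns a property map into a list of waits. It is illustrative only: `backoff_schedule_ms` is a hypothetical helper, jitter is ignored, and the doubling factor is an assumption rather than something stated in this PR.

```python
# Defaults as listed in this PR description.
DEFAULTS = {
    "commit.retry.num-retries": 4,
    "commit.retry.min-wait-ms": 100,
    "commit.retry.max-wait-ms": 60_000,
}


def backoff_schedule_ms(properties: dict) -> list:
    """Return the wait in milliseconds before each retry, without jitter."""

    def as_int(key: str) -> int:
        # Table properties are stored as strings, so convert on read.
        return int(properties.get(key, DEFAULTS[key]))

    num_retries = as_int("commit.retry.num-retries")
    min_wait = as_int("commit.retry.min-wait-ms")
    max_wait = as_int("commit.retry.max-wait-ms")
    # Assumption: the wait doubles per attempt and is capped at max-wait-ms.
    return [min(min_wait * 2**i, max_wait) for i in range(num_retries)]


print(backoff_schedule_ms({}))
# [100, 200, 400, 800]
print(backoff_schedule_ms({"commit.retry.num-retries": "10", "commit.retry.min-wait-ms": "500"}))
# [500, 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000, 60000]
```

Under these assumptions the defaults spend at most 1.5 seconds waiting in total, so heavily contended tables may need a higher retry count or a longer minimum wait.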

Add automatic retry with exponential backoff when catalog commits fail
due to concurrent transactions (CommitFailedException), and integrate
the existing validation functions from validate.py into the write path
to detect incompatible concurrent modifications (ValidationException).

The retry loop is placed in Transaction.commit_transaction(). On each
retry attempt, table metadata is refreshed, registered snapshot
producers are re-executed to regenerate manifests, and data conflict
validation is run. Uncommitted manifests from failed attempts are
cleaned up after a successful commit.

Validation is performed for _OverwriteFiles and _DeleteFiles based on
the table's isolation level (serializable/snapshot). _FastAppendFiles
and _MergeAppendFiles do not require validation since appends never
conflict.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Skip _validate_no_new_delete_files and _validate_deleted_data_files
when conflict_detection_filter is None, matching Java's
BaseOverwriteFiles.validate() behavior for rowFilter == AlwaysFalse().

Route isolation level property based on the calling operation.
Transaction.delete() uses write.delete.isolation-level (default).
Transaction.overwrite(), dynamic_partition_overwrite(), and upsert()
use write.update.isolation-level via _isolation_level_property on
the snapshot producer.

Remove unused WRITE_MERGE_ISOLATION_LEVEL constant.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Use Operation enum instead of string literals for producer
construction. Use .value for IsolationLevel string comparison
to avoid unreachable statement warning.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Fix _build_delete_files_partition_predicate overwriting _case_sensitive
to True by passing the current value to delete_by_predicate. This
caused case-insensitive deletes to fail when _OverwriteFiles was used
with a user-specified predicate.

Move import random/time to file top level. Add total timeout
(commit.retry.total-timeout-ms) to the retry loop. Add comments for
intentional validation duplication and cached_property clearing.
Stabilize test_commit_retry_on_commit_failed by removing flaky
patch.object assertion.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
In CI, pyiceberg.table module is loaded twice, creating two distinct
Transaction class objects. patch.object on the test-imported Transaction
does not affect the runtime Transaction used by Table.append(). Fix by
resolving Transaction from pyiceberg.table module at runtime.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles
Contributor Author

Benchmark results

This PR brings three capabilities to PyIceberg's write path.

  1. Transparent retry for concurrent writes. Users no longer need to implement retry logic around table.append() or table.delete().
  2. Data conflict validation. Incompatible concurrent modifications (e.g. concurrent deletes on the same data) are detected and rejected with ValidationException, preventing silent data corruption.
  3. Efficient retry via data file reuse. On retry, only manifests are regenerated. Data files already written to S3 are reused, avoiding redundant Parquet writes.

To validate (3), I benchmarked concurrent appends using the NYC Yellow Taxi dataset (2024-01, 2.9M rows, 19 columns) with Glue Data Catalog + S3.

Before vs After

Without this PR, concurrent appends fail immediately with CommitFailedException. Only one writer succeeds per batch, regardless of parallelism.

| Workers | Before (no retry) | After (this PR) |
| ------- | ----------------- | --------------- |
| 2       | 50.0%             | 100.0%          |
| 4       | 25.0%             | 100.0%          |
| 8       | 12.5%             | 100.0%          |

(N workers x 10 batches x 1K rows, commit.retry.num-retries=10, commit.retry.min-wait-ms=500)

Internal retry vs user-side retry

Compared the internal retry (this PR) against a user-side retry that catches CommitFailedException and re-does load_table + append from scratch. Both use the same backoff parameters (retries=15, min-wait=500ms).

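| Workers | Internal retry | User-side retry | Speedup |
| ------- | -------------- | --------------- | ------- |
| 2       | 33s            | 46s             | 1.4x    |
| 4       | 68s            | 87s             | 1.3x    |
| 8       | 167s           | 299s            | 1.8x    |
| 16      | 399s           | 588s            | 1.5x    |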

(3 batches per worker, ~370K-1.5M rows per batch depending on worker count)

Internal retry is faster because it reuses data files already written to S3 and only regenerates manifests on retry. User-side retry rewrites Parquet files on every attempt.

Interestingly, internal retry actually performs more retries than user-side retry (88 vs 50 total retries at 8 workers), because the shorter retry window increases commit attempt density. Despite more retries, the total time is lower because each retry is much cheaper.

Tuning commit.retry.min-wait-ms

Tested different min-wait-ms values with 8 workers to find the optimal backoff for Glue.

| min-wait-ms | Total time | Total retries |
| ----------- | ---------- | ------------- |
| 100         | 158s       | 78            |
| 500         | 126s       | 115           |
| 1000        | 238s       | 67            |
| 2000        | 235s       | 63            |
| 3000        | 206s       | 41            |

The default (100ms, matching Java Iceberg) works reasonably well, but 500ms is optimal for Glue. Too short causes contention storms, too long wastes time waiting. The optimal value depends on the catalog's commit latency.

@qzyu999 qzyu999 left a comment

Hi @lawofcycles, thanks so much for this amazing PR. I took a look and so far found two places with minor gaps that can be easily patched.

The first is regarding AssertTableUUID, where I notice a pattern of repetitively adding/removing it inside the retry loop for commit_transaction(). I believe this can be resolved simply by moving the addition part outside the for-loop.

The second is also regarding commit_transaction(), where in the case of an abort (e.g., ValidationException), there will be some orphaned manifest files. This can be easily fixed by adding a try/except around the for-loop itself, making sure upon failure that both _uncommitted_manifests and _written_manifests are cleared.

Thanks again for the great work, I look forward to #3320 merging so that I may integrate the changes into #3131, PTAL!

Comment thread pyiceberg/table/__init__.py Outdated
Comment on lines +1001 to +1003
    for attempt in range(num_retries + 1):
        try:
            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)

suggestion: Here AssertTableUUID is appended to self._requirements on every iteration of the retry loop, but further down, in _rebuild_snapshot_updates, it is removed again with:

    self._requirements = tuple(r for r in self._requirements if not isinstance(r, (AssertRefSnapshotId, AssertTableUUID)))

This can be simplified by moving self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),) outside the for-loop and updating the line in _rebuild_snapshot_updates to simply:

    self._requirements = tuple(r for r in self._requirements if not isinstance(r, AssertRefSnapshotId))

The reason is that AssertTableUUID stays constant the whole time, so we're just adding and removing it on each retry.

Contributor Author

Thanks for the suggestion. I moved it outside the loop and removed the AssertTableUUID filter from _rebuild_snapshot_updates.

Comment on lines +372 to +379
    def _cleanup_uncommitted(self) -> None:
        """Delete manifest files from failed retry attempts."""
        for path in self._uncommitted_manifests:
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()

suggestion: We could also add a second similar function as follows:

    def _clean_all_uncommitted(self) -> None:
        """Clean up all manifests written during this producer's lifecycle on abort."""
        for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()
        self._written_manifests.clear()

Then, in Transaction.commit_transaction(), we can wrap the for-loop in a try/except as follows:

        try:
            for attempt in range(num_retries + 1):
                try:
                    self._table._do_commit(...)
                    self._cleanup_uncommitted_manifests()
                    break
                except CommitFailedException:
                    ... # retry logic
        except Exception:
            # Catch ValidationException or retry exhaustion
            for producer in self._snapshot_producers:
                producer._clean_all_uncommitted()
            raise

this would then allow the PyIceberg implementation to mirror the cleanAll() method in Java. In the current implementation, the for-loop for retrying will only clear out the _uncommitted_manifests from the previous failed retries, but we can extend this with _clean_all_uncommitted which will clear out that and _written_manifests from the current attempt in the case of a permanent abort. This would fix the gap for orphaned manifests from ValidationException (or other permanent failures) that are not cleaned up. I also think it's worth mentioning that this fix could be cleanly added to this PR without waiting for a full Delete orphaned files implementation in PyIceberg. WDYT about adding this into the current PR?

Contributor Author

Good catch. I added _clean_all_uncommitted() that cleans up both _uncommitted_manifests and _written_manifests, and wrapped the retry loop with try/except so it gets called on any permanent failure (ValidationException, retry exhaustion, etc.).

@antoniogplobato

Any update on this? We are currently facing a production issue that this PR would solve.

AssertTableUUID is constant across retries, so add it once before the
loop instead of adding/removing on each iteration.

Add _clean_all_uncommitted() that deletes both _uncommitted_manifests
and _written_manifests on permanent failure, fixing orphaned manifests
from the last attempt.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles lawofcycles requested a review from qzyu999 May 27, 2026 20:28
@lawofcycles
Contributor Author

@qzyu999 Thanks for the review and sorry for the late response. Both addressed in the latest commit. Looking forward to seeing this integrated with #3131!

@qzyu999 qzyu999 left a comment

Hi @lawofcycles, thank you so much for accepting the suggested changes. I reviewed those and they all look correct.

@rambleraptor
Contributor

rambleraptor commented May 29, 2026

Thanks for writing this! This is such a useful feature and there's a ton of nuance.

Here's a short Gist explaining this issue that I'm seeing. Code is often easier to parse than writing.

We try to commit and there's a conflict. Then, while we're waiting to retry, a second conflicting commit comes in. We're now two commits behind, and we have to make sure there are no conflicts with either of those commits.

We should have this example as a test. As it stands, we're ignoring one of the commits.

Comment thread pyiceberg/table/update/snapshot.py Outdated
    conflict_detection_filter = self._predicate if self._predicate != AlwaysFalse() else None

    if isolation_level == IsolationLevel.SERIALIZABLE:
        _validate_added_data_files(table, parent_snapshot, conflict_detection_filter, parent_snapshot)

Contributor

Why are both of these parent_snapshot?

Contributor Author

@lawofcycles lawofcycles May 30, 2026

It shouldn't be. This was a bug. _parent_snapshot_id gets updated on each retry, so passing it for both collapsed the validation window to zero. Fixed by introducing _starting_snapshot_id that stays fixed across retries.

The concurrency validation was using parent_snapshot (current head) for
both the starting point and ending point of the validation window. When
multiple concurrent commits occur during retry sleep, the validation
would only inspect the latest head and miss conflicting commits below it.

Introduce _starting_snapshot_id that is fixed at operation init time and
does not change on retry. Also fix _validation_history to use exclusive
semantics for from_snapshot, matching Java Iceberg's ancestorsBetween.
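
The window problem can be illustrated with a small sketch. The `Snapshot` dataclass and `ancestors_between` function below are simplified, hypothetical stand-ins written for this example, not PyIceberg's or Java Iceberg's actual implementations. The walk follows parent pointers from the latest snapshot and stops before the starting snapshot (exclusive).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]


def ancestors_between(
    snapshots: Dict[int, Snapshot], latest_id: int, oldest_id: Optional[int]
) -> List[Snapshot]:
    """Walk from latest_id towards the root, stopping before oldest_id (exclusive)."""
    window: List[Snapshot] = []
    current: Optional[int] = latest_id
    while current is not None and current != oldest_id:
        snapshot = snapshots[current]
        window.append(snapshot)
        current = snapshot.parent_id
    return window


# History 1 <- 2 <- 3: our write started from snapshot 1,
# then snapshots 2 and 3 were committed concurrently.
history = {1: Snapshot(1, None), 2: Snapshot(2, 1), 3: Snapshot(3, 2)}

# Using the refreshed head as both ends leaves nothing to validate.
collapsed = ancestors_between(history, latest_id=3, oldest_id=3)
# Pinning the start at the original base covers both concurrent commits.
pinned = ancestors_between(history, latest_id=3, oldest_id=1)

print([s.snapshot_id for s in collapsed])  # []
print([s.snapshot_id for s in pinned])     # [3, 2]
```

Keeping the starting ID fixed while the head moves is what lets each retry re-check every commit that landed since the operation began.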

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles
Contributor Author

@rambleraptor Thanks for the repro, this made the issue very clear. Confirmed and fixed. The validation now pins the original base snapshot at init time so the window covers all concurrent commits, not just the latest head.


    table = self._transaction._table
    parent_snapshot = table.metadata.snapshot_by_id(self._parent_snapshot_id)
    if parent_snapshot is None:

Could we make these short-circuits a bit more targeted?

self._parent_snapshot_id is None seems like a valid empty-table/new-branch case, but if a non-null parent or starting snapshot id cannot be resolved, I’m not sure we should silently skip validation. Would it make sense to raise in those cases, or otherwise distinguish expected no-snapshot cases from unexpected missing snapshots?

Contributor Author

@lawofcycles lawofcycles Jun 1, 2026

Good catch. Now raises ValidationException when the ID is non-null but unresolvable. The _parent_snapshot_id is None early return stays for the empty-table case.
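
The distinction between "no snapshot yet" and "snapshot ID that should exist but does not" might look roughly like the sketch below. `resolve_snapshot` is a hypothetical helper, the exception class is a local stand-in, and a plain dict stands in for table metadata. Only the error message wording is taken from the diff in this thread.

```python
from typing import Dict, Optional


class ValidationException(Exception):
    """Stand-in for the real exception class."""


def resolve_snapshot(
    snapshots: Dict[int, str], snapshot_id: Optional[int], role: str
) -> Optional[str]:
    """Look up a snapshot, treating None and 'missing' as different cases."""
    if snapshot_id is None:
        # Expected case: empty table or new branch, nothing to validate against.
        return None
    snapshot = snapshots.get(snapshot_id)
    if snapshot is None:
        # Unexpected case: the ID is set but not in metadata, so fail loudly.
        raise ValidationException(
            f"Cannot find {role} snapshot {snapshot_id} in table metadata"
        )
    return snapshot


metadata = {10: "snapshot-10"}
print(resolve_snapshot(metadata, None, "parent"))  # None
print(resolve_snapshot(metadata, 10, "parent"))    # snapshot-10
```

Calling it with an ID that is absent from `metadata` raises ValidationException instead of silently skipping validation.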

Could we add coverage for operations that stage more than one snapshot? For example, Transaction.delete() can create both _DeleteFiles and _OverwriteFiles when one file is fully deleted and another is partially rewritten.

For example:

    def test_mixed_delete_overwrite_starts_from_catalog_snapshot(catalog: Catalog) -> None:
        """Mixed full-file and partial deletes should validate from the original table snapshot."""
        catalog.create_namespace("default")
        schema = _test_schema()
        table = catalog.create_table("default.mixed_delete_start_snapshot", schema=schema)
    
        import pyarrow as pa
        from pyiceberg.table.update.snapshot import _DeleteFiles, _OverwriteFiles
    
        table.append(pa.table({"x": [1, 2]}))
        table.append(pa.table({"x": [2, 3]}))
    
        base_snapshot_id = table.metadata.current_snapshot_id
    
        tx = Transaction(table, autocommit=False)
        tx.delete("x <= 2")
    
        assert len(tx._snapshot_producers) == 2
    
        delete_producer, overwrite_producer = tx._snapshot_producers
        assert isinstance(delete_producer, _DeleteFiles)
        assert isinstance(overwrite_producer, _OverwriteFiles)
    
        assert delete_producer._starting_snapshot_id == base_snapshot_id
        assert overwrite_producer._starting_snapshot_id == base_snapshot_id

Contributor Author

Added. Writing this test exposed a bug: _OverwriteFiles was picking up the post-_DeleteFiles snapshot as its starting point. Fixed by propagating _starting_snapshot_id from the delete producer.

…opagation

Address review feedback on commit retry validation:

- _validate_concurrency() now raises ValidationException when
parent_snapshot_id or starting_snapshot_id is non-null but cannot
be resolved, instead of silently skipping validation.
- Fix _starting_snapshot_id not being propagated from _DeleteFiles to
_OverwriteFiles in Transaction.delete(), which caused the conflict
detection window to collapse when both producers are involved.
- Move ValidationException import to module level per project convention.
- Add commit retry and isolation level properties to configuration docs.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>

Contributor

@rambleraptor rambleraptor left a comment

I'll have more thoughts later, but I wanted to get some basic ones down. The logic here is complicated.

| Key | Options | Default | Description |
| -------------------------------- | ---------------- | --------- | ------------------------------------------------------------------ |
| `commit.retry.num-retries` | Integer | 4 | Maximum number of retry attempts after a commit conflict |
| `commit.retry.min-wait-ms` | Integer (ms) | 100 | Minimum wait time before the first retry |

Contributor

Where did you get these defaults from? (Fine to say you made them up!)

    self._table._do_commit(  # pylint: disable=W0212
        updates=self._updates,
        requirements=self._requirements,
    from pyiceberg.utils.properties import property_as_int

Contributor

Is there any reason we can't import this at the top?

        return

    table = self._transaction._table
    parent_snapshot = table.metadata.snapshot_by_id(self._parent_snapshot_id)

Contributor

Can we make a helper function? Something like get_parent_snapshot?

In general, there's a lot of tricky logic here and I'm looking for places to help us make it easier to understand.

    if parent_snapshot is None:
        raise ValidationException(f"Cannot find parent snapshot {self._parent_snapshot_id} in table metadata")

    starting_snapshot_id = self._starting_snapshot_id if self._starting_snapshot_id is not None else self._parent_snapshot_id

Contributor

Same thing - a helper function like fetch_starting_snapshot

    Note: This method is intentionally duplicated in _OverwriteFiles rather than
    extracted to the base class. While the logic is currently identical, Java Iceberg's
    BaseOverwriteFiles and BaseRowDelta have divergent validation. Keeping them separate
    makes it easier to add RowDelta-specific validation in the future.

Contributor

Alright, I'm reading through this comment now. Couple thoughts:

  1. If Java's implementation has diverged, why aren't we diverging?
  2. If these methods are the same, we need to just have them be the same. If they diverge in the future, then we can separate them out.
