[core][flink] Support parallelism snapshot expire #7027

Draft
wzhero1 wants to merge 1 commit into apache:master from wzhero1:feat/paimon-expire-snapshot-parallel-opt

Conversation

@wzhero1 (Contributor) commented Jan 13, 2026

Purpose

This PR implements parallel snapshot expiration to improve the performance of large-scale cleanup operations.

Motivation:

  • Serial file deletion becomes a performance bottleneck for tables with large amounts of data
  • Current implementation cannot leverage Flink's distributed computing capabilities

Changes:

  1. Core module refactoring - Split the original serial expiration logic into Planner/Executor architecture:

    • ExpireSnapshotsPlanner: Computes the expiration plan, including the snapshot range and the four task types
    • ExpireSnapshotsExecutor: Executes deletion tasks based on task type
    • ExpireSnapshotsPlan: Data structure containing task lists and protection set
    • SnapshotExpireTask: Represents a single expiration task with TaskType enum
    • ProtectionSet: Immutable set of protected manifests and tagged snapshots
    • DeletionReport: Execution report for each task
  2. Flink Action parallel mode - ExpireSnapshotsAction supports a --parallelism parameter:

    • Worker Phase: Parallel deletion of data files and changelog files using RangePartitionedExpireFunction
    • Sink Phase: Serial deletion of manifests and snapshot metadata using SnapshotExpireSink
    • Tasks are partitioned by snapshot ID range to maximize cache locality
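The range partitioning described above can be sketched as follows. This is a minimal, self-contained illustration of grouping a snapshot ID range into contiguous per-worker ranges, not the PR's actual implementation; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split snapshot ids [startId, endId] into `parallelism`
// contiguous ranges so all tasks for the same snapshot land on one subtask,
// which is what "partitioned by snapshot ID range" implies for cache locality.
public class RangePartitionSketch {

    public static List<long[]> partitionByRange(long startId, long endId, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        long total = endId - startId + 1;
        long base = total / parallelism;      // minimum ids per worker
        long remainder = total % parallelism; // first `remainder` workers get one extra
        long cursor = startId;
        for (int i = 0; i < parallelism && cursor <= endId; i++) {
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                continue;
            }
            ranges.add(new long[] {cursor, cursor + size - 1});
            cursor += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 snapshots across 3 workers -> ranges 1..4, 5..7, 8..10
        for (long[] r : partitionByRange(1, 10, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```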

Execution modes:

  • Serial mode (default): parallelism=null or ≤1 → Uses ExpireSnapshotsImpl
  • Parallel mode: parallelism>1 + --force_start_flink_job → Uses Flink distributed execution
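The mode-selection rule above mirrors the isParallelMode check in the diff; a self-contained sketch (field names inlined as parameters for illustration):

```java
// Sketch of the execution-mode decision: parallel mode requires both the
// --force_start_flink_job flag and a parallelism greater than 1; anything
// else falls back to the serial ExpireSnapshotsImpl path.
public class ModeSelectionSketch {

    static boolean isParallelMode(boolean forceStartFlinkJob, Integer parallelism) {
        return forceStartFlinkJob && parallelism != null && parallelism > 1;
    }

    public static void main(String[] args) {
        System.out.println(isParallelMode(true, 4));   // true: Flink distributed execution
        System.out.println(isParallelMode(true, 1));   // false: serial mode
        System.out.println(isParallelMode(false, 4));  // false: serial without the force flag
        System.out.println(isParallelMode(true, null)); // false: no parallelism given
    }
}
```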

Tests

Unit Tests:

  • ExpireSnapshotsPlanTest - Tests task partitioning logic (partitionTasksBySnapshotRange)
  • DeletionReportTest - Tests deletion report serialization
  • ExpireSnapshotsTest - Core expiration logic tests (refactored to use new Planner/Executor)

Integration Tests:

  • ExpireSnapshotsActionITCase - Flink parallel mode integration tests
  • ExpireSnapshotsProcedureITCase - Procedure integration tests

API and Format

New CLI parameters for expire-snapshots action:

Parameter      Type     Default  Description
--parallelism  Integer  null     Parallelism for parallel mode (must be > 1)

No storage format changes.

Backward compatible: Default behavior (serial mode) remains unchanged.

Documentation

Yes, this introduces a new feature: parallel snapshot expiration.

Documentation should be added to describe:

  • New --parallelism parameter for expire-snapshots action
  • Requirement: parallel mode needs both --parallelism > 1 and --force_start_flink_job
  • Performance recommendations for large-scale cleanup

@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from 69b0a18 to f61b9b4 on January 13, 2026 11:13
@wzhero1 wzhero1 changed the title from [flink] Support parallelism snapshot expire to [core][flink] Support parallelism snapshot expire on Jan 13, 2026
@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from f61b9b4 to be7a32b on January 13, 2026 12:19
@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from be7a32b to 59788fc on January 26, 2026 05:49
@yunfengzhou-hub (Contributor) left a comment:
Thanks for the PR. Left some comments as below.

}

@Override
public void run() throws Exception {
Since this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.


/** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
private boolean isParallelMode() {
return forceStartFlinkJob && parallelism != null && parallelism > 1;
Forcing users to set this action's parallelism adds to their burden of understanding Paimon's internals. It might be better to derive the job's parallelism automatically by default.

I noticed that parallelism is mainly used to distribute SnapshotExpireTasks evenly between the RangePartitionedExpireFunction subtasks, so this PR created a List<List<SnapshotExpireTask>> with parallelism batches. Compared with this design, a better way for Flink might be to have the subtasks directly fetch from the original list, continuing to consume the next task after completing the former one. On the one hand, this would make it unnecessary to decide the parallelism of the job from the beginning. On the other hand, this could also achieve better dynamic rebalancing during runtime in case different SnapshotExpireTasks have different workloads.

Besides, the Flink configuration might be a better place to set the job's parallelism, compared with Action arguments.
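The shared-list consumption the reviewer suggests can be sketched with a concurrent queue: each worker pulls the next task as soon as it finishes the previous one, so uneven task workloads rebalance at runtime. This is a plain-threads illustration of the idea, not Flink code; all names are illustrative.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: workers drain one shared task queue instead of receiving
// pre-partitioned batches, so no up-front parallelism decision is needed
// and slow tasks do not stall a fixed batch assignment.
public class SharedQueueSketch {

    public static int drainWithWorkers(Iterable<String> tasks, int workers) throws Exception {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        tasks.forEach(queue::add);
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                String task;
                // fetch the next task only after completing the former one
                while ((task = queue.poll()) != null) {
                    done.incrementAndGet(); // stand-in for deleting the task's files
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }
}
```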

// Prepare table and config using shared method
Pair<FileStoreTable, ExpireConfig> prepared =
resolveExpireTableAndConfig(
catalog.getTable(Identifier.fromString(database + "." + table)),
Line 168 has

Identifier identifier = new Identifier(database, table);

We can move that line up and reuse the created identifier variable here.


// Prepare table and config using shared method
Pair<FileStoreTable, ExpireConfig> prepared =
resolveExpireTableAndConfig(
The resolveExpireTableAndConfig method adds a little code complexity. It might be better to inline its implementation here.

* @param parallelism target parallelism for distribution
* @return list of task groups, one per worker
*/
public List<List<SnapshotExpireTask>> partitionTasksBySnapshotRange(int parallelism) {
It seems that this PR first divides the expire process of a snapshot into different SnapshotExpireTasks, and then sends the tasks of the same snapshot to the same Flink subtask. If this is the case, why make the division first? Would the following implementation be simpler?

// calculates the start and end snapshot id before this part of code

env.fromSequence(startId, endId)
    .flatMap(new SnapshotExpireFunction()) // deletes the data, manifest, and snapshot file sequentially
    .sinkTo(new SnapshotExpireSink());

return new ExpireSinkWriter(initExecutor());
}

@Override
Do we need the following javadoc here?

/**
 * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
 */

snapshotFileTasks.size());

// 1. Clean empty directories
executor.cleanEmptyDirectories(globalDeletionBuckets);
The implementation here should be aware of the configuration snapshot.clean-empty-directories.
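A minimal sketch of the guard the reviewer is asking for, assuming the option is read from table options with a default of false (the helper class and the options-as-map shape are illustrative, not Paimon's actual API):

```java
import java.util.Map;

// Sketch: only clean empty directories when the table option
// snapshot.clean-empty-directories is explicitly enabled.
public class CleanEmptyDirsGuard {

    public static boolean shouldCleanEmptyDirectories(Map<String, String> tableOptions) {
        // assumed default: false, i.e. skip cleaning unless opted in
        return Boolean.parseBoolean(
                tableOptions.getOrDefault("snapshot.clean-empty-directories", "false"));
    }
}
```

The executor call above would then be wrapped in this check before invoking cleanEmptyDirectories.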

}

public void setDataFilesDeleted(boolean dataFilesDeleted) {
this.dataFilesDeleted = dataFilesDeleted;
It seems these fields are only written, and never read except in the toString method.
