[core][flink] Support parallelism snapshot expire #7027

wzhero1 wants to merge 1 commit into apache:master
Conversation
force-pushed from 69b0a18 to f61b9b4, then to be7a32b, then to 59788fc
yunfengzhou-hub left a comment:

Thanks for the PR. Left some comments below.
```java
}

@Override
public void run() throws Exception {
```

If this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.
```java
/** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
private boolean isParallelMode() {
    return forceStartFlinkJob && parallelism != null && parallelism > 1;
```

Forcing users to set the parallelism of this action might increase their burden to understand Paimon's logic. It might be better to derive the job's parallelism automatically by default.

I noticed that parallelism is mainly used to distribute SnapshotExpireTasks evenly among the RangePartitionedExpireFunction subtasks, so this PR creates a List&lt;List&lt;SnapshotExpireTask&gt;&gt; with `parallelism` batches. Compared with this design, a better approach for Flink might be to have the subtasks fetch directly from the original list, each consuming the next task after completing the previous one. On the one hand, this would make it unnecessary to decide the job's parallelism up front; on the other hand, it could achieve better dynamic rebalancing at runtime when different SnapshotExpireTasks have different workloads.

Besides, the Flink configuration might be a better place to set the job's parallelism than Action arguments.
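The rebalancing idea the reviewer describes can be illustrated with a standalone sketch (this is not the PR's code and not Flink-specific; class and method names are made up for the example): instead of pre-splitting tasks into `parallelism` batches, each worker pulls the next task from a shared queue, so a slow task on one worker does not hold back tasks other workers could take.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch of dynamic task fetching: workers drain a shared queue
// instead of receiving a fixed pre-partitioned batch of tasks.
public class DynamicTaskQueue {

    // Returns how many tasks each worker ended up processing.
    public static int[] runWorkers(List<Integer> tasks, int workers) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(tasks);
        int[] tasksPerWorker = new int[workers];
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                // Each worker fetches the next task dynamically until the queue is empty.
                while (queue.poll() != null) {
                    tasksPerWorker[id]++;
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return tasksPerWorker;
    }
}
```

With this shape, the number of workers is independent of how the task list was built, which is the property that would let the job's parallelism be decided by Flink configuration rather than an Action argument.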
```java
// Prepare table and config using shared method
Pair<FileStoreTable, ExpireConfig> prepared =
        resolveExpireTableAndConfig(
                catalog.getTable(Identifier.fromString(database + "." + table)),
```

In line 168 there is

```java
Identifier identifier = new Identifier(database, table);
```

We can move this line above so the created identifier variable can be reused here.
```java
// Prepare table and config using shared method
Pair<FileStoreTable, ExpireConfig> prepared =
        resolveExpireTableAndConfig(
```

The introduction of the resolveExpireTableAndConfig method seems to increase code complexity a little. It might be better to put its implementation directly here.
```java
 * @param parallelism target parallelism for distribution
 * @return list of task groups, one per worker
 */
public List<List<SnapshotExpireTask>> partitionTasksBySnapshotRange(int parallelism) {
```

It seems that this PR first divides the expire process of a snapshot into different SnapshotExpireTasks, and then sends the tasks of the same snapshot to the same Flink subtask. If this is the case, why should the division be made first? Would the following implementation be simpler?

```java
// calculate the start and end snapshot ids before this part of the code
env.fromSequence(startId, endId)
        .flatMap(new SnapshotExpireFunction()) // deletes the data, manifest, and snapshot files sequentially
        .sinkTo(new SnapshotExpireSink());
```

```java
    return new ExpireSinkWriter(initExecutor());
}

@Override
```

Do we need the following javadoc here?

```java
/**
 * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
 */
```

```java
        snapshotFileTasks.size());

// 1. Clean empty directories
executor.cleanEmptyDirectories(globalDeletionBuckets);
```

The implementation here should be aware of the configuration snapshot.clean-empty-directories.
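The guard the reviewer asks for could look like the following sketch. The option key comes from the comment above; the config-lookup class, the default value, and the helper name are stand-ins for illustration, not Paimon's real options API.

```java
import java.util.Map;

// Sketch: only clean empty directories when snapshot.clean-empty-directories
// is enabled. A plain Map stands in for Paimon's real options class.
class CleanEmptyDirsGuard {

    static boolean shouldClean(Map<String, String> options) {
        // Defaulting to "false" here is an assumption for the example,
        // not the documented default of the option.
        return Boolean.parseBoolean(
                options.getOrDefault("snapshot.clean-empty-directories", "false"));
    }
}
```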
```java
}

public void setDataFilesDeleted(boolean dataFilesDeleted) {
    this.dataFilesDeleted = dataFilesDeleted;
```

It seems that these fields are only updated, and unused except in the toString method.
Purpose

This PR implements parallel snapshot expiration to improve the performance of large-scale cleanup operations.

Motivation:

Changes:

- Core module refactoring - split the original serial expiration logic into a Planner/Executor architecture, with a TaskType enum
- Flink Action parallel mode - ExpireSnapshotsAction supports a --parallelism parameter

Execution modes:

- parallelism=null or ≤1 → uses ExpireSnapshotsImpl
- parallelism>1 + --force_start_flink_job → uses Flink distributed execution

Tests
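The Planner/Executor split mentioned in the Changes section can be sketched as follows. Class names, the String task type, and the plan range are illustrative only, not the PR's actual API: the point is that planning (deciding what to expire) is separated from execution (deleting files), so a plan can be handed to parallel workers.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative Planner/Executor split: the planner produces one task per
// snapshot id in the expire range; the executor performs each task.
class SnapshotExpirePlanner {
    // Plan tasks for snapshot ids in [startId, endId).
    List<String> plan(long startId, long endId) {
        List<String> tasks = new ArrayList<>();
        for (long id = startId; id < endId; id++) {
            tasks.add("expire-snapshot-" + id);
        }
        return tasks;
    }
}

class SnapshotExpireExecutor {
    int executed = 0;

    // Execute one planned task (real code would delete data, manifest,
    // and snapshot files).
    void execute(String task) {
        executed++;
    }
}
```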
Unit Tests:

- ExpireSnapshotsPlanTest - tests task partitioning logic (partitionTasksBySnapshotRange)
- DeletionReportTest - tests deletion report serialization
- ExpireSnapshotsTest - core expiration logic tests (refactored to use the new Planner/Executor)

Integration Tests:

- ExpireSnapshotsActionITCase - Flink parallel mode integration tests
- ExpireSnapshotsProcedureITCase - procedure integration tests

API and Format
New CLI parameter for the expire-snapshots action: --parallelism

No storage format changes.

Backward compatible: default behavior (serial mode) remains unchanged.
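A hypothetical invocation with the new flag might look like the following command sketch. The jar path and the connection arguments (--warehouse, --database, --table) are placeholders, not taken from this PR:

```shell
# Run the expire-snapshots action with parallelism 4; serial mode is kept
# when --parallelism is omitted or <= 1.
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    expire-snapshots \
    --warehouse hdfs:///path/to/warehouse \
    --database my_db \
    --table my_table \
    --parallelism 4 \
    --force_start_flink_job
```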
Documentation

Yes, this introduces a new feature: parallel snapshot expiration.

Documentation should be added to describe:

- the --parallelism parameter for the expire-snapshots action
- using --parallelism > 1 together with --force_start_flink_job