[FLINK-38831][route] The route configuration supports matching the first one#4212
[FLINK-38831][route] The route configuration supports matching the first one#4212yuxiqian merged 5 commits intoapache:masterfrom
Conversation
|
@yuxiqian Could you please take a look at this PR? |
yuxiqian
left a comment
There was a problem hiding this comment.
Thanks for @linguoxuan's nice work, just left some minor comments.
Adding more IT / E2e tests in FlinkPipelineComposerITCase and RouteE2eITCase would be nice, too.
51947eb to
d8a9460
Compare
|
Will this feature be available in version 3.6.0? |
757d5e9 to
903e65f
Compare
|
Hi @linguoxuan, sorry for the late reply. Could you please rebase this PR with master so we can push this forward? |
903e65f to
9c9f720
Compare
There was a problem hiding this comment.
Pull request overview
Adds a configurable route matching strategy to Flink CDC pipelines so users can choose between applying all matching route rules (default) or stopping at the first match.
Changes:
- Introduces
RouteMode+PipelineOptions.PIPELINE_ROUTE_MODEand threads it through composer/runtime schema operators. - Updates
TableIdRouterto supportFIRST_MATCHvsALL_MATCHbehavior when routing. - Adds unit/integration/e2e test coverage and updates examples/docs.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java | Updates test harness wiring to pass RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java | Updates distributed test harness wiring to pass RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java | Adjusts unit tests to new SchemaOperator constructor signature including RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java | Adjusts schema evolution tests to new constructor signature including RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java | Adjusts distributed schema evolution tests to include RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java | New unit tests validating FIRST_MATCH vs ALL_MATCH routing results. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java | Updates shared test router initialization to include RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java | Updates router construction in derivation tests to include RouteMode. |
| flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java | Updates router construction in common route tests to include RouteMode. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java | Propagates RouteMode into operator + coordinator provider. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java | Stores RouteMode and constructs TableIdRouter with it. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java | Adds RouteMode to coordinator provider plumbing. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java | Passes RouteMode into base SchemaRegistry. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java | Stores RouteMode and constructs TableIdRouter with it for batch schema operator. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java | Propagates RouteMode into distributed operator + coordinator provider. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java | Stores RouteMode and constructs TableIdRouter with it. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java | Adds RouteMode to coordinator provider plumbing. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java | Passes RouteMode into base SchemaRegistry. |
| flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java | Stores RouteMode and constructs TableIdRouter with it. |
| flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java | Adds e2e coverage for both route modes in a multi-route scenario. |
| flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java | Adds IT coverage ensuring composer honors route mode for routing fan-out vs first match. |
| flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java | Extends translator APIs to accept RouteMode and threads it to operators. |
| flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java | Passes pipelineDef.getRouteMode() into schema operator translation. |
| flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java | Adds getRouteMode() accessor backed by PIPELINE_ROUTE_MODE. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java | Adds RouteMode support to routing evaluation logic. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java | New enum defining ALL_MATCH and FIRST_MATCH. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java | Adds route-mode config option with default ALL_MATCH. |
| flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml | Adds a sample pipeline definition including route-mode. |
| flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java | Adds parser test covering route-mode parsing into PipelineDef. |
| README.md | Documents route-mode in the YAML example. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 30 out of 30 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
46d2483 to
ce4e073
Compare
|
Would @leonardBang @lvyanquan like to take a look? |
There was a problem hiding this comment.
Thanks @yuxiqian and @linguoxuan for the contribution, generally +1 from my side
Sorry, I missed your message. |
|
Never mind, thanks for your contribution! |
Purpose
This PR adds support for configurable route matching modes in Flink CDC pipelines. Previously, all route rules were evaluated and applied (all-match behavior). With this change, users can now choose between two matching strategies:
Core Components Changes
Tests
Usage