Skip to content

[FLINK-38831][route] The route configuration supports matching the first one#4212

Merged
yuxiqian merged 5 commits intoapache:masterfrom
linguoxuan:FLINK-38831-route
Mar 17, 2026
Merged

[FLINK-38831][route] The route configuration supports matching the first one#4212
yuxiqian merged 5 commits intoapache:masterfrom
linguoxuan:FLINK-38831-route

Conversation

@linguoxuan
Copy link
Copy Markdown
Contributor

@linguoxuan linguoxuan commented Jan 5, 2026

Purpose

This PR adds support for configurable route matching modes in Flink CDC pipelines. Previously, all route rules were evaluated and applied (all-match behavior). With this change, users can now choose between two matching strategies:

  • all-match (default): Apply all matching route rules to a table
  • first-match: Apply only the first matching route rule and stop evaluation

Core Components Changes

  1. RouteRule.java: Added MatchMode enum with two modes:
  • ALL_MATCH: Matches all applicable routing rules (default behavior)
  • FIRST_MATCH: Matches only the first applicable routing rule
  1. PipelineDef.java:
  • Added routeMode field to store the global route matching mode
  • Updated constructors to support the new field
  • Added getRouteMode() method
  1. YamlPipelineDefinitionParser.java:
  • Added parsing logic for route-mode configuration from the pipeline section
  • Defaults to "all-match" if not specified
  • Validates configuration values using RouteRule.MatchMode.fromConfigValue()
  1. SchemaOperatorTranslator.java:
  • Updated to pass routeMode from PipelineDef to SchemaRegistry
  1. SchemaRegistry.java:
  • Added routeMatchMode field
  • Updated constructor to accept match mode parameter
  • Passes match mode to TableIdRouter
  1. TableIdRouter.java:
  • Implemented match mode logic in the route() method
  • For FIRST_MATCH: Returns immediately after finding the first matching rule
  • For ALL_MATCH: Continues to evaluate all rules (existing behavior)

Tests

  1. TableIdRouterMatchModeTest.java
  2. YamlPipelineDefinitionParserTest.java

Usage

pipeline:
  name: mysql_to_doris_pipeline
  parallelism: 2
  route-mode: FIRST-MATCH

route:
  - source-table: mydb.order_.*
    sink-table: ods_db.ods_orders
    description: "Merge all order sharded tables"
  - source-table: mydb.product_.*
    sink-table: ods_db.ods_products
    description: "Merge all product sharded tables"
  - source-table: mydb.*
    sink-table: ods_db.ods_<>
    replace-symbol: <>
    description: "One-to-one mapping for other tables"

@linguoxuan
Copy link
Copy Markdown
Contributor Author

@yuxiqian Could you please take a look at this PR?

@yuxiqian yuxiqian self-requested a review January 5, 2026 13:06
Copy link
Copy Markdown
Member

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @linguoxuan's nice work, just left some minor comments.

Adding more IT / E2e tests in FlinkPipelineComposerITCase and RouteE2eITCase would be nice, too.

Comment thread flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java Outdated
@linguoxuan linguoxuan force-pushed the FLINK-38831-route branch 2 times, most recently from 51947eb to d8a9460 Compare January 6, 2026 06:56
@github-actions github-actions Bot added the docs Improvements or additions to documentation label Jan 7, 2026
@sd4324530
Copy link
Copy Markdown
Contributor

Will this feature be available in version 3.6.0?

@lvyanquan lvyanquan added this to the V3.6.0 milestone Mar 3, 2026
@yuxiqian
Copy link
Copy Markdown
Member

Hi @linguoxuan, sorry for the late reply. Could you please rebase this PR with master so we can push this forward?

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a configurable route matching strategy to Flink CDC pipelines so users can choose between applying all matching route rules (default) or stopping at the first match.

Changes:

  • Introduces RouteMode + PipelineOptions.PIPELINE_ROUTE_MODE and threads it through composer/runtime schema operators.
  • Updates TableIdRouter to support FIRST_MATCH vs ALL_MATCH behavior when routing.
  • Adds unit/integration/e2e test coverage and updates examples/docs.

Reviewed changes

Copilot reviewed 30 out of 30 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java Updates test harness wiring to pass RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java Updates distributed test harness wiring to pass RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java Adjusts unit tests to new SchemaOperator constructor signature including RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java Adjusts schema evolution tests to new constructor signature including RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java Adjusts distributed schema evolution tests to include RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java New unit tests validating FIRST_MATCH vs ALL_MATCH routing results.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java Updates shared test router initialization to include RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java Updates router construction in derivation tests to include RouteMode.
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java Updates router construction in common route tests to include RouteMode.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java Propagates RouteMode into operator + coordinator provider.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java Stores RouteMode and constructs TableIdRouter with it.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java Adds RouteMode to coordinator provider plumbing.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java Passes RouteMode into base SchemaRegistry.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java Stores RouteMode and constructs TableIdRouter with it for batch schema operator.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java Propagates RouteMode into distributed operator + coordinator provider.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java Stores RouteMode and constructs TableIdRouter with it.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java Adds RouteMode to coordinator provider plumbing.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java Passes RouteMode into base SchemaRegistry.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java Stores RouteMode and constructs TableIdRouter with it.
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java Adds e2e coverage for both route modes in a multi-route scenario.
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java Adds IT coverage ensuring composer honors route mode for routing fan-out vs first match.
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java Extends translator APIs to accept RouteMode and threads it to operators.
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java Passes pipelineDef.getRouteMode() into schema operator translation.
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java Adds getRouteMode() accessor backed by PIPELINE_ROUTE_MODE.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java Adds RouteMode support to routing evaluation logic.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java New enum defining ALL_MATCH and FIRST_MATCH.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java Adds route-mode config option with default ALL_MATCH.
flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml Adds a sample pipeline definition including route-mode.
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java Adds parser test covering route-mode parsing into PipelineDef.
README.md Documents route-mode in the YAML example.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 30 out of 30 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@yuxiqian yuxiqian force-pushed the FLINK-38831-route branch from 46d2483 to ce4e073 Compare March 17, 2026 07:59
@yuxiqian
Copy link
Copy Markdown
Member

Would @leonardBang @lvyanquan like to take a look?

Copy link
Copy Markdown
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @yuxiqian and @linguoxuan for the contribution, generally +1 from my side

@yuxiqian yuxiqian merged commit c9ffab7 into apache:master Mar 17, 2026
22 checks passed
Mrart pushed a commit to Mrart/flink-cdc that referenced this pull request Mar 26, 2026
ThorneANN pushed a commit to ThorneANN/flink-cdc that referenced this pull request Mar 31, 2026
@linguoxuan
Copy link
Copy Markdown
Contributor Author

Hi @linguoxuan, sorry for the late reply. Could you please rebase this PR with master so we can push this forward?

Sorry, I missed your message.

@yuxiqian
Copy link
Copy Markdown
Member

yuxiqian commented Apr 7, 2026

Never mind, thanks for your contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants