sink: add debezium-avro protocol #5475

Draft
wk989898 wants to merge 7 commits into pingcap:master from wk989898:debezium-avro

Conversation

@wk989898

Collaborator

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

wk989898 added 3 commits June 22, 2026 04:06
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot

ti-chi-bot Bot commented Jun 22, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added the release-note (denotes a PR that will be considered when it comes time to generate release notes) and do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress) labels Jun 22, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 22, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign lidezhu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot Bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) Jun 22, 2026
@coderabbitai

coderabbitai Bot commented Jun 22, 2026

Contributor

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 427f8b29-b390-4e4e-a3c4-184a6c1eb97a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Comment @coderabbitai help to get the list of available commands.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces support for Confluent Avro encoding within the Debezium protocol, allowing MQ downstreams to utilize the Confluent Avro Schema Registry. Key changes include the implementation of a Debezium Avro encoder, schema conversion logic, configuration validation, and comprehensive integration tests. Feedback on the changes highlights two critical bugs: a namespace sanitization and union branch name mismatch that could reject valid schemas, and an array optional items schema mismatch causing runtime encoding failures. Additionally, a performance concern was raised regarding double serialization/deserialization round-trips during row event encoding.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +420 to +440
func avroFullName(connectName string, fallbackName string) string {
	if connectName != "" {
		return connectName
	}
	if fallbackName != "" {
		return avroFieldName(fallbackName)
	}
	return "ConnectDefault"
}

func splitAvroFullName(fullName string) (name string, namespace string) {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return avroFieldName(fullName), ""
	}
	return avroFieldName(fullName[idx+1:]), fullName[:idx]
}

func avroFieldName(field string) string {
	return common.SanitizeName(field)
}

critical

Critical Bug: Namespace Sanitization and Union Branch Name Mismatch

Currently, splitAvroFullName only sanitizes the name part of the full name, leaving the namespace part unsanitized. If a database name, table name, or cluster ID contains invalid Avro characters (such as hyphens -), the generated Avro schema will be invalid and rejected by the Schema Registry.

Furthermore, avroUnionBranchName uses avroFullName directly, which returns the unsanitized name. This causes a mismatch between the union branch name (unsanitized) and the record name in the Avro schema (partially sanitized), leading to runtime encoding failures (e.g., unknown union member errors).

We can resolve both issues cleanly by introducing a sanitizeAvroFullName helper and ensuring avroFullName always returns a fully sanitized name.

func avroFullName(connectName string, fallbackName string) string {
	if connectName != "" {
		return sanitizeAvroFullName(connectName)
	}
	if fallbackName != "" {
		return avroFieldName(fallbackName)
	}
	return "ConnectDefault"
}

func sanitizeAvroFullName(fullName string) string {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return avroFieldName(fullName)
	}
	name := avroFieldName(fullName[idx+1:])
	parts := strings.Split(fullName[:idx], ".")
	for i, part := range parts {
		parts[i] = avroFieldName(part)
	}
	return strings.Join(parts, ".") + "." + name
}

func splitAvroFullName(fullName string) (name string, namespace string) {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return fullName, ""
	}
	return fullName[idx+1:], fullName[:idx]
}

func avroFieldName(field string) string {
	return common.SanitizeName(field)
}
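
The suggestion above can be tried in isolation with a minimal sketch. It assumes a hypothetical `sanitizeName` stand-in for `common.SanitizeName` that applies the Avro name rule (first character in `[A-Za-z_]`, later characters in `[A-Za-z0-9_]`); the real helper may behave differently. `sanitizeFullName` is likewise an illustrative name, not code from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeName is a hypothetical stand-in for common.SanitizeName. It replaces
// every character that is not valid in an Avro name with '_' and prefixes a
// leading digit with '_'. The real helper may behave differently.
func sanitizeName(s string) string {
	var b strings.Builder
	for i, r := range s {
		isLetter := (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_'
		isDigit := r >= '0' && r <= '9'
		switch {
		case isLetter:
			b.WriteRune(r)
		case isDigit:
			if i == 0 {
				b.WriteByte('_') // Avro names must not start with a digit
			}
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	return b.String()
}

// sanitizeFullName sanitizes every dot-separated segment of an Avro full name,
// so the namespace and the record name are cleaned consistently.
func sanitizeFullName(fullName string) string {
	parts := strings.Split(fullName, ".")
	for i, part := range parts {
		parts[i] = sanitizeName(part)
	}
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(sanitizeFullName("my-cluster.test-db.orders.Envelope"))
	// prints: my_cluster.test_db.orders.Envelope
	fmt.Println(sanitizeFullName("9lives"))
	// prints: _9lives
}
```

Because the record name in the schema and the union branch name would both come from the same sanitized string, they can no longer drift apart.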

Comment on lines +249 to +256
items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
if err != nil {
	return nil, err
}
arraySchema := map[string]any{
	"type":  "array",
	"items": items,
}

critical

Critical Bug: Array Optional Items Schema Mismatch

In the toAvroSchema case for "array", if schema.Items.Optional is true, the items schema is not wrapped in a union ["null", items]. However, in toNative, the item values are wrapped in goavro.Union if they are optional. This mismatch between the Avro schema and the native value structure will cause runtime encoding failures.

We should wrap items in a union schema in toAvroSchema when schema.Items.Optional is true.

Suggested change
- items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
- if err != nil {
- 	return nil, err
- }
- arraySchema := map[string]any{
- 	"type":  "array",
- 	"items": items,
- }
+ items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
+ if err != nil {
+ 	return nil, err
+ }
+ if schema.Items.Optional {
+ 	items = []any{"null", items}
+ }
+ arraySchema := map[string]any{
+ 	"type":  "array",
+ 	"items": items,
+ }
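
To see what the wrapped schema looks like, here is a standard-library-only sketch. `connectSchema`, `arrayAvroSchema`, and `arraySchemaJSON` are hypothetical, trimmed-down stand-ins for the types and logic in the PR, and the item type is reduced to a plain type name for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connectSchema is a hypothetical, trimmed-down stand-in for the Connect
// schema type handled by toAvroSchema in the PR.
type connectSchema struct {
	Type     string
	Optional bool
	Items    *connectSchema
}

// arrayAvroSchema builds the Avro schema for an array and wraps the item
// schema in a ["null", T] union when the items are optional.
// It assumes s.Items is non-nil.
func arrayAvroSchema(s connectSchema) map[string]any {
	var items any = s.Items.Type
	if s.Items.Optional {
		items = []any{"null", items}
	}
	return map[string]any{"type": "array", "items": items}
}

// arraySchemaJSON renders the schema as JSON, the form a schema registry expects.
func arraySchemaJSON(s connectSchema) (string, error) {
	out, err := json.Marshal(arrayAvroSchema(s))
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s := connectSchema{
		Type:  "array",
		Items: &connectSchema{Type: "string", Optional: true},
	}
	out, err := arraySchemaJSON(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints: {"items":["null","string"],"type":"array"}
}
```

With the union present in the schema, item values that the encoder wraps as union members have a matching branch to resolve against.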

Comment on lines +54 to +67
func (d *BatchEncoder) appendAvroRowChangedEvent(
	ctx context.Context,
	topic string,
	e *commonEvent.RowEvent,
) error {
	keyBuf := bytes.Buffer{}
	if err := d.codec.EncodeKey(e, &keyBuf); err != nil {
		return errors.Trace(err)
	}

	valueBuf := bytes.Buffer{}
	if err := d.codec.EncodeValue(e, &valueBuf); err != nil {
		return errors.Trace(err)
	}

medium

Performance Overhead: Double Serialization/Deserialization Round-Trip

Currently, appendAvroRowChangedEvent serializes the row event to JSON using d.codec.EncodeKey and d.codec.EncodeValue, then immediately deserializes it back into Go structs in d.encodeAvroMessage to convert it to Avro. This double serialization/deserialization round-trip (RowEvent -> JSON -> Go Structs -> Avro) introduces significant CPU and memory overhead.

While this approach allows maximum reuse of the existing dbzCodec, please consider refactoring this in the future to directly encode RowEvent to Avro without the intermediate JSON step to improve performance.
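
The shape of the round trip can be illustrated with a small sketch. `rowEvent`, `viaJSON`, and `direct` are hypothetical names with a heavily simplified event type; the sketch only contrasts the two paths and makes no claim about how the PR's encoder is structured internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rowEvent is a hypothetical, heavily simplified stand-in for RowEvent.
type rowEvent struct {
	Table string `json:"table"`
	Op    string `json:"op"`
}

// viaJSON mirrors the round trip described above: marshal the event to JSON,
// then unmarshal it again to obtain the native value for the Avro encoder.
func viaJSON(e rowEvent) (map[string]any, error) {
	buf, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	native := map[string]any{}
	if err := json.Unmarshal(buf, &native); err != nil {
		return nil, err
	}
	return native, nil
}

// direct builds the same native value without the intermediate JSON bytes.
func direct(e rowEvent) map[string]any {
	return map[string]any{"table": e.Table, "op": e.Op}
}

func main() {
	e := rowEvent{Table: "orders", Op: "c"}
	a, err := viaJSON(e)
	if err != nil {
		panic(err)
	}
	b := direct(e)
	fmt.Println(a["table"] == b["table"], a["op"] == b["op"])
	// prints: true true
}
```

Both paths yield the same native value here, but `viaJSON` allocates and parses a byte buffer per event, which is the overhead the comment asks to remove eventually.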

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@wk989898

Collaborator Author

/test kafka

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@wk989898

Collaborator Author

/test kafka

wk989898 added 2 commits June 23, 2026 06:50
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@wk989898

Collaborator Author

/test kafka

@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

@wk989898: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-cdc-kafka-integration-heavy | 79d7780 | link | true | /test pull-cdc-kafka-integration-heavy |

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

  • do-not-merge/needs-linked-issue
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

1 participant