sink: add debezium-avro protocol #5475
Code Review
This pull request introduces support for Confluent Avro encoding within the Debezium protocol, allowing MQ downstreams to utilize the Confluent Avro Schema Registry. Key changes include the implementation of a Debezium Avro encoder, schema conversion logic, configuration validation, and comprehensive integration tests. Feedback on the changes highlights two critical bugs: a namespace sanitization and union branch name mismatch that could reject valid schemas, and an array optional items schema mismatch causing runtime encoding failures. Additionally, a performance concern was raised regarding double serialization/deserialization round-trips during row event encoding.
func avroFullName(connectName string, fallbackName string) string {
	if connectName != "" {
		return connectName
	}
	if fallbackName != "" {
		return avroFieldName(fallbackName)
	}
	return "ConnectDefault"
}

func splitAvroFullName(fullName string) (name string, namespace string) {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return avroFieldName(fullName), ""
	}
	return avroFieldName(fullName[idx+1:]), fullName[:idx]
}

func avroFieldName(field string) string {
	return common.SanitizeName(field)
}
Critical Bug: Namespace Sanitization and Union Branch Name Mismatch
Currently, splitAvroFullName only sanitizes the name part of the full name, leaving the namespace part unsanitized. If a database name, table name, or cluster ID contains invalid Avro characters (such as hyphens -), the generated Avro schema will be invalid and rejected by the Schema Registry.
Furthermore, avroUnionBranchName uses avroFullName directly, which returns the unsanitized name. This causes a mismatch between the union branch name (unsanitized) and the record name in the Avro schema (partially sanitized), leading to runtime encoding failures (e.g., unknown union member errors).
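To make the mismatch concrete, here is a minimal, self-contained sketch. It assumes a hypothetical `sanitizeName` helper that replaces every character outside `[A-Za-z0-9_]` with `_`; the real `common.SanitizeName` may behave differently. `splitCurrent` and `recordFullName` are illustrative names that mirror the quoted `splitAvroFullName`, not functions from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeName is a hypothetical stand-in for common.SanitizeName (assumption):
// it replaces every character outside [A-Za-z0-9_] with '_' and prefixes '_'
// when the first character is a digit.
func sanitizeName(s string) string {
	var b strings.Builder
	for i, r := range s {
		isLetter := (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_'
		isDigit := r >= '0' && r <= '9'
		switch {
		case isLetter:
			b.WriteRune(r)
		case isDigit:
			if i == 0 {
				b.WriteByte('_')
			}
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	return b.String()
}

// splitCurrent mirrors the quoted splitAvroFullName: only the name part is
// sanitized, the namespace is returned untouched.
func splitCurrent(fullName string) (name, namespace string) {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return sanitizeName(fullName), ""
	}
	return sanitizeName(fullName[idx+1:]), fullName[:idx]
}

// recordFullName rebuilds the full name that the record schema would declare.
func recordFullName(fullName string) string {
	name, namespace := splitCurrent(fullName)
	if namespace == "" {
		return name
	}
	return namespace + "." + name
}

func main() {
	connectName := "my-db.my-table"
	// The union branch uses the raw connect name.
	fmt.Println("union branch:", connectName)
	// The record keeps the hyphen in its namespace and sanitizes only the name.
	fmt.Println("record name: ", recordFullName(connectName))
}
```

With the input `my-db.my-table`, the record name becomes `my-db.my_table`: the namespace still contains a hyphen, and the result no longer equals the raw string used as the union branch name.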
We can resolve both issues cleanly by introducing a sanitizeAvroFullName helper and ensuring avroFullName always returns a fully sanitized name.
func avroFullName(connectName string, fallbackName string) string {
	if connectName != "" {
		return sanitizeAvroFullName(connectName)
	}
	if fallbackName != "" {
		return avroFieldName(fallbackName)
	}
	return "ConnectDefault"
}

func sanitizeAvroFullName(fullName string) string {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return avroFieldName(fullName)
	}
	name := avroFieldName(fullName[idx+1:])
	parts := strings.Split(fullName[:idx], ".")
	for i, part := range parts {
		parts[i] = avroFieldName(part)
	}
	return strings.Join(parts, ".") + "." + name
}

func splitAvroFullName(fullName string) (name string, namespace string) {
	idx := strings.LastIndex(fullName, ".")
	if idx < 0 {
		return fullName, ""
	}
	return fullName[idx+1:], fullName[:idx]
}

func avroFieldName(field string) string {
	return common.SanitizeName(field)
}

items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
if err != nil {
	return nil, err
}
arraySchema := map[string]any{
	"type":  "array",
	"items": items,
}
Critical Bug: Array Optional Items Schema Mismatch
In the toAvroSchema case for "array", if schema.Items.Optional is true, the items schema is not wrapped in a union ["null", items]. However, in toNative, the item values are wrapped in goavro.Union if they are optional. This mismatch between the Avro schema and the native value structure will cause runtime encoding failures.
We should wrap items in a union schema in toAvroSchema when schema.Items.Optional is true.
items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
if err != nil {
	return nil, err
}
if schema.Items.Optional {
	items = []any{"null", items}
}
arraySchema := map[string]any{
	"type":  "array",
	"items": items,
}
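The effect of the wrapping on the generated schema can be checked with the standard library alone. The sketch below is illustrative: `arraySchema` and its `itemsOptional` parameter are hypothetical names standing in for the `"array"` case of `toAvroSchema` and for `schema.Items.Optional`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// arraySchema builds an Avro array schema as a generic map.
// itemsOptional is a hypothetical flag standing in for schema.Items.Optional.
func arraySchema(items any, itemsOptional bool) map[string]any {
	if itemsOptional {
		// Nullable elements are declared as a union with "null" so the schema
		// matches values that are wrapped as union members during encoding.
		items = []any{"null", items}
	}
	return map[string]any{
		"type":  "array",
		"items": items,
	}
}

// mustJSON renders a schema value as compact JSON for inspection.
func mustJSON(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(mustJSON(arraySchema("string", false)))
	// {"items":"string","type":"array"}
	fmt.Println(mustJSON(arraySchema("string", true)))
	// {"items":["null","string"],"type":"array"}
}
```

Only the second schema accepts a null element or a union-wrapped element, which is the shape the review says `toNative` produces for optional items.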
func (d *BatchEncoder) appendAvroRowChangedEvent(
	ctx context.Context,
	topic string,
	e *commonEvent.RowEvent,
) error {
	keyBuf := bytes.Buffer{}
	if err := d.codec.EncodeKey(e, &keyBuf); err != nil {
		return errors.Trace(err)
	}

	valueBuf := bytes.Buffer{}
	if err := d.codec.EncodeValue(e, &valueBuf); err != nil {
		return errors.Trace(err)
	}
Performance Overhead: Double Serialization/Deserialization Round-Trip
Currently, appendAvroRowChangedEvent serializes the row event to JSON using d.codec.EncodeKey and d.codec.EncodeValue, then immediately deserializes it back into Go structs in d.encodeAvroMessage to convert it to Avro. This double serialization/deserialization round-trip (RowEvent -> JSON -> Go Structs -> Avro) introduces significant CPU and memory overhead.
While this approach allows maximum reuse of the existing dbzCodec, please consider refactoring this in the future to directly encode RowEvent to Avro without the intermediate JSON step to improve performance.
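The shape of the two paths can be sketched as follows. This is a simplified illustration, not the PR's code: `rowEvent`, `toNative`, `viaJSON`, and `direct` are hypothetical names, and the real `RowEvent` and codec are considerably more involved.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rowEvent is a hypothetical, much simplified stand-in for commonEvent.RowEvent.
type rowEvent struct {
	ID   int64  `json:"id"`
	Name string `json:"name"`
}

// toNative converts an event into the generic value an Avro codec would consume.
func toNative(e rowEvent) map[string]any {
	return map[string]any{"id": e.ID, "name": e.Name}
}

// viaJSON mimics the path described in the review:
// event -> JSON bytes -> Go struct -> native value.
func viaJSON(e rowEvent) (map[string]any, error) {
	buf, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	var decoded rowEvent
	if err := json.Unmarshal(buf, &decoded); err != nil {
		return nil, err
	}
	return toNative(decoded), nil
}

// direct skips the intermediate JSON bytes: event -> native value.
func direct(e rowEvent) map[string]any {
	return toNative(e)
}

func main() {
	e := rowEvent{ID: 1, Name: "a"}
	roundTrip, err := viaJSON(e)
	if err != nil {
		panic(err)
	}
	// Both paths yield the same native value; only the work done differs.
	fmt.Println(roundTrip, direct(e))
}
```

Both functions return the same value, so the marshal and unmarshal calls in `viaJSON` are pure overhead in this sketch. How much that costs in the actual encoder would need to be measured with a benchmark.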
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note