cloudstorage,kafka: update sarama and share storage helpers#5483
3AceShowHand wants to merge 16 commits into
Code Review
This pull request updates the sarama dependency version and modifies initSaramaLogger in pkg/logger/log.go to restrict Sarama logging to levels below InfoLevel. The reviewer suggested simplifying the level check to level < zapcore.InfoLevel for better readability and consistency with other parts of the codebase.
📝 Walkthrough
Changes
- Cloud storage SchemaFile migration and path replay refactor
- Sarama version bump and logger initialization guard
Sequence Diagram(s)
sequenceDiagram
participant Consumer as storage-consumer
participant SchemaFile as cloudstorage.SchemaFile
participant DMLPathKey as cloudstorage.DMLPathKey
participant Sink as downstreamadapter/sink
Consumer->>SchemaFile: parseSchemaFilePath → unmarshal JSON
SchemaFile-->>Consumer: *SchemaFile (checksum validated)
Consumer->>DMLPathKey: NewSchemaFileDMLPathKey(schemaKey)
DMLPathKey-->>Consumer: sentinel DmlPathKey
Consumer->>DMLPathKey: CompareDMLPathKey(keys...) → sort replay order
Consumer->>DMLPathKey: IsSchemaFileDMLPathKey()
DMLPathKey-->>Consumer: true → DDL branch
Consumer->>SchemaFile: BuildDDLEvent()
SchemaFile-->>Consumer: *commonEvent.DDLEvent
Consumer->>SchemaFile: BuildTableInfo()
SchemaFile-->>Consumer: *common.TableInfo → appendDMLEvents
Sink->>SchemaFile: Build(DDLEvent, outputColumnID)
SchemaFile-->>Sink: populated SchemaFile
Sink->>SchemaFile: Marshal() + Path(useTableIDAsPath, tableID)
SchemaFile-->>Sink: []byte + path string → writeFile to storage
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
/test all |
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/cloudstorage/schema_file.go (1)
230-249: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
`Build` should reset receiver state before populating fields.
`Build` mutates `SchemaFile` in place but never clears existing `Columns`/`TotalColumns`. A second call can keep stale column data (especially on the early-return branch when `TableInfo` is nil), producing invalid schema artifacts.
Proposed fix
 func (t *SchemaFile) Build(event *commonEvent.DDLEvent, outputColumnID bool) {
-    t.Version = defaultSchemaFileVersion
-    t.TableVersion = event.FinishedTs
-    t.Query = event.Query
-    t.Type = event.Type
+    *t = SchemaFile{
+        Version:      defaultSchemaFileVersion,
+        TableVersion: event.FinishedTs,
+        Query:        event.Query,
+        Type:         event.Type,
+    }
     info := event.TableInfo
     if info == nil {
         t.Schema = event.GetTargetSchemaName()
         t.Table = event.GetTargetTableName()
         return
     }
     t.Schema = info.GetTargetSchemaName()
     t.Table = info.GetTargetTableName()
     t.TotalColumns = len(info.GetColumns())
+    t.Columns = make([]TableCol, 0, t.TotalColumns)
     for _, col := range info.GetColumns() {
         var tableCol TableCol
         tableCol.FromTiColumnInfo(col, outputColumnID)
         t.Columns = append(t.Columns, tableCol)
     }
 }
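To make the stale-state concern concrete, here is a minimal sketch of the reset-then-populate pattern. The `SchemaFile`, `TableCol`, and `ddl` types below are trimmed-down stand-ins invented for illustration; they are not the repository's real definitions, and `ddl` only loosely models `commonEvent.DDLEvent`.

```go
package main

import "fmt"

// TableCol and SchemaFile are simplified stand-ins for the real types;
// only the fields needed to show the stale-state problem are kept.
type TableCol struct {
	Name string
}

type SchemaFile struct {
	Version      int
	TableVersion uint64
	Query        string
	Columns      []TableCol
	TotalColumns int
}

// ddl is a hypothetical, reduced replacement for commonEvent.DDLEvent.
// A nil Columns slice models the "TableInfo == nil" branch.
type ddl struct {
	FinishedTs uint64
	Query      string
	Columns    []string
}

// Build overwrites the whole receiver first, so nothing from an earlier
// call survives, including on the early-return path.
func (t *SchemaFile) Build(event ddl) {
	*t = SchemaFile{
		Version:      1,
		TableVersion: event.FinishedTs,
		Query:        event.Query,
	}
	if event.Columns == nil {
		return
	}
	t.TotalColumns = len(event.Columns)
	t.Columns = make([]TableCol, 0, t.TotalColumns)
	for _, name := range event.Columns {
		t.Columns = append(t.Columns, TableCol{Name: name})
	}
}

func main() {
	var f SchemaFile
	f.Build(ddl{FinishedTs: 100, Query: "CREATE TABLE t (a INT, b INT)", Columns: []string{"a", "b"}})
	// The second call takes the early-return branch; the columns from the
	// first call must not leak into the result.
	f.Build(ddl{FinishedTs: 101, Query: "DROP TABLE t"})
	fmt.Println(len(f.Columns), f.TotalColumns)
}
```

Assigning a fresh struct literal to `*t` also clears any field added to the type later, which field-by-field resets would miss.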
🧹 Nitpick comments (2)
pkg/logger/log_test.go (1)
37-49: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Consider testing additional log levels (optional).
The test validates the behavior for `DebugLevel` (zap logger) and `InfoLevel` (discard logger). For completeness, you could add assertions for `WarnLevel` and `ErrorLevel` to ensure they also use the discard logger. However, the current test is focused and sufficient given that the guard condition applies uniformly to all levels ≥ Info.
🧪 Optional: Additional test coverage
     require.NoError(t, initSaramaLogger(zapcore.InfoLevel))
     require.NotSame(t, debugLogger, sarama.Logger)
     require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
+
+    // Verify WarnLevel and ErrorLevel also use discard logger
+    require.NoError(t, initSaramaLogger(zapcore.WarnLevel))
+    require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
 }
pkg/logger/log.go (1)
261-265: 📐 Maintainability & Code Quality | 🔵 Trivial
Consider adding a clarifying comment for the guard condition.
The condition `zapcore.InfoLevel.Enabled(level)` correctly silences Sarama when the log level is Info or higher (less verbose), but the intent may not be immediately clear to future readers. A brief comment explaining that Sarama is intentionally silenced at less verbose levels would improve maintainability.
📝 Suggested clarifying comment
 func initSaramaLogger(level zapcore.Level) error {
+    // Silence Sarama at Info or higher levels; it is noisy and not needed for normal operation.
     if zapcore.InfoLevel.Enabled(level) {
         sarama.Logger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags)
         return nil
     }
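The direction of the guard is easy to misread, so a small standard-library sketch may help. The `Level` type below is a local stand-in that copies zapcore's ordering (Debug = -1, Info = 0, Warn = 1, Error = 2) and its `Enabled` semantics (`lvl >= l`); `silenced` and `newSaramaLogger` are hypothetical helper names, not functions from the PR.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// Level is a stand-in for zapcore.Level with the same ordering:
// Debug (-1) < Info (0) < Warn (1) < Error (2).
type Level int8

const (
	DebugLevel Level = iota - 1
	InfoLevel
	WarnLevel
	ErrorLevel
)

// Enabled mirrors zapcore.Level.Enabled: it reports whether lvl is at or
// above the receiver l.
func (l Level) Enabled(lvl Level) bool { return lvl >= l }

// silenced reports whether the Sarama logger would be discarded for the
// configured level, following the guard discussed in the review.
func silenced(level Level) bool { return InfoLevel.Enabled(level) }

// newSaramaLogger is a hypothetical helper: it writes to stderr only when
// the configured level is more verbose than Info.
func newSaramaLogger(level Level) *log.Logger {
	var out io.Writer = os.Stderr
	if silenced(level) {
		out = io.Discard
	}
	return log.New(out, "[Sarama] ", log.LstdFlags)
}

func main() {
	for _, lvl := range []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel} {
		logger := newSaramaLogger(lvl)
		fmt.Printf("level=%d discarded=%v\n", lvl, logger.Writer() == io.Discard)
	}
}
```

Under these assumptions only `DebugLevel` keeps Sarama output; Info, Warn, and Error all route it to `io.Discard`.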
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 593-594: The mustGetSchemaFile function at line 593-594 panics
when a schema file is missing, which causes hard failures when DML files are
encountered before their corresponding schema files due to unordered file walk
traversal in getNewFiles. To fix this, either enforce a two-pass approach in
getNewFiles that processes all schema files before DML files, or add schema
existence validation before invoking mustGetSchemaFile in the loop to check if
the schema is present in schemaFileMap and skip or log the DML key gracefully
instead of panicking. Additionally, replace the hard panic logic at lines 523,
529, and 536 with recoverable error handling or skip logic to handle transient
storage inconsistencies.
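One way to picture the two-pass option is sketched below. It is only an illustration under stated assumptions: the `file` and `schemaKey` types and the `planReplay` function are hypothetical, and the real consumer works on parsed path keys rather than this flat struct.

```go
package main

import "fmt"

// file is a hypothetical, flattened view of one object found by the walk.
type file struct {
	Table    string
	Version  uint64
	IsSchema bool
	Name     string
}

// schemaKey identifies the schema a DML file depends on.
type schemaKey struct {
	Table   string
	Version uint64
}

// planReplay registers every schema file first, then admits only the DML
// files whose schema is known. DML files without a schema are returned as
// deferred instead of triggering a panic.
func planReplay(files []file) (ready, deferred []file) {
	known := make(map[schemaKey]bool)
	// Pass 1: schema files, regardless of where they appeared in the walk.
	for _, f := range files {
		if f.IsSchema {
			known[schemaKey{f.Table, f.Version}] = true
			ready = append(ready, f)
		}
	}
	// Pass 2: DML files, checked against the schemas seen in pass 1.
	for _, f := range files {
		if f.IsSchema {
			continue
		}
		if known[schemaKey{f.Table, f.Version}] {
			ready = append(ready, f)
		} else {
			deferred = append(deferred, f)
		}
	}
	return ready, deferred
}

func main() {
	walk := []file{
		{Table: "db.t", Version: 100, Name: "dml-1"},
		{Table: "db.t", Version: 100, IsSchema: true, Name: "schema-t"},
		{Table: "db.u", Version: 7, Name: "dml-2"},
	}
	ready, deferred := planReplay(walk)
	fmt.Println(len(ready), len(deferred))
}
```

Deferred files could be retried on the next scan, which would tolerate a schema object that becomes visible slightly later than its data files.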
In `@pkg/sink/cloudstorage/schema_file.go`:
- Around line 300-305: The checksumPayload struct initialization is missing the
Version field, which means schema file version changes won't be reflected in the
checksum-derived filenames. In the checksumPayload struct literal construction
within marshalForChecksum, add the Version field assignment alongside the other
fields like Table, Schema, Columns, and TotalColumns. This ensures that version
information is included in the checksum payload calculation.
- Around line 275-280: The isTableLevel() method currently uses log.Panic to
handle schema validation failures, which crashes the process instead of allowing
callers to handle the error. Refactor isTableLevel() to return an error in
addition to the boolean result (or return only an error if appropriate), and
instead of calling log.Panic when len(t.Columns) != t.TotalColumns, return a
predefined repository error as per the coding guidelines documented in
docs/agents/error-handling.md. This allows callers to handle validation failures
gracefully without causing process-level availability impact.
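For the `isTableLevel()` item, a rough sketch of the error-returning shape follows. `errColumnCountMismatch` is a placeholder, since the repository's predefined errors are not shown in this thread, and treating `TotalColumns != 0` as "table level" is an assumption about what the method reports.

```go
package main

import (
	"errors"
	"fmt"
)

// errColumnCountMismatch is a placeholder for one of the repository's
// predefined errors, which this sketch does not have access to.
var errColumnCountMismatch = errors.New("schema file: column count mismatch")

// schemaFile is a reduced stand-in for the real SchemaFile type.
type schemaFile struct {
	Table        string
	Columns      []string
	TotalColumns int
}

// isTableLevel returns an error instead of panicking when the column list
// and the recorded total disagree. The "TotalColumns != 0" rule is assumed.
func (t *schemaFile) isTableLevel() (bool, error) {
	if len(t.Columns) != t.TotalColumns {
		return false, fmt.Errorf("%w: table %q has %d columns, total says %d",
			errColumnCountMismatch, t.Table, len(t.Columns), t.TotalColumns)
	}
	return t.TotalColumns != 0, nil
}

func main() {
	good := &schemaFile{Table: "t", Columns: []string{"a"}, TotalColumns: 1}
	bad := &schemaFile{Table: "t", Columns: []string{"a"}, TotalColumns: 2}

	ok, err := good.isTableLevel()
	fmt.Println(ok, err)

	_, err = bad.isTableLevel()
	fmt.Println(errors.Is(err, errColumnCountMismatch))
}
```

Wrapping with `%w` keeps the sentinel matchable through `errors.Is`, so a caller can choose to skip the file or abort.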
---
Outside diff comments:
In `@pkg/sink/cloudstorage/schema_file.go`:
- Around line 230-249: The Build method on SchemaFile mutates the receiver in
place without clearing existing state before population. At the start of the
Build method (before any field assignments), reset the Columns slice to empty
and reset TotalColumns to zero. This ensures that subsequent calls to Build do
not retain stale column data from previous invocations, especially on the
early-return path when TableInfo is nil.
---
Nitpick comments:
In `@pkg/logger/log_test.go`:
- Around line 37-49: The test TestInitSaramaLoggerResetsWhenInfoEnabled
currently only validates behavior for DebugLevel and InfoLevel. To improve test
coverage, add additional assertions that call initSaramaLogger with WarnLevel
and ErrorLevel, and verify that these levels also result in sarama.Logger being
set to a discard logger (using require.IsType with stdlog.New), similar to the
existing InfoLevel validation.
In `@pkg/logger/log.go`:
- Around line 261-265: Add a clarifying comment above the if statement that
checks `zapcore.InfoLevel.Enabled(level)` to explain that Sarama logging is
intentionally silenced when the log level is Info or higher (less verbose
logging levels). The comment should make it clear to future maintainers that
this condition prevents overly verbose output from Sarama by discarding its logs
when verbosity is lower, improving code readability and maintainability.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3d711bcf-bb42-4ca5-a5a9-64a068bff015
📒 Files selected for processing (13)
- cmd/storage-consumer/consumer.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- pkg/logger/log.go
- pkg/logger/log_test.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_key.go
- pkg/sink/cloudstorage/path_key_test.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/schema_file.go
- pkg/sink/cloudstorage/schema_file_parse.go
- pkg/sink/cloudstorage/schema_file_parse_test.go
- pkg/sink/cloudstorage/schema_file_test.go
|
/test all |
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 445-452: The code at ParseFileIndexFromFileName extracts a
dispatcherID from the filename into fileIndex, but then immediately overwrites
it with the dispatcherID from the index path without validating they match. Add
validation after ParseFileIndexFromFileName returns to check if the extracted
dispatcherID from the filename matches the dispatcherID parameter being assigned
to FileIndexKey. If there is a mismatch, return an error indicating the
dispatcher IDs are inconsistent, as this can cause incorrect file-index state
and replay behavior.
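The requested check could look roughly like the following. The `fileIndex` and `fileIndexKey` types reuse field names that appear elsewhere in this review (`DispatcherID`, `EnableTableAcrossNodes`, `Idx`), but their exact definitions and the `bindDispatcher` helper are assumptions made for this sketch.

```go
package main

import "fmt"

// fileIndexKey and fileIndex are simplified stand-ins for the real types.
type fileIndexKey struct {
	DispatcherID           string
	EnableTableAcrossNodes bool
}

type fileIndex struct {
	fileIndexKey
	Idx uint64
}

// bindDispatcher is a hypothetical helper: it accepts the index parsed from
// the file name only when its dispatcher ID agrees with the one taken from
// the index path, instead of silently overwriting it.
func bindDispatcher(parsed fileIndex, fromPath string) (fileIndex, error) {
	if parsed.DispatcherID != fromPath {
		return fileIndex{}, fmt.Errorf(
			"inconsistent dispatcher IDs: file name has %q, index path has %q",
			parsed.DispatcherID, fromPath)
	}
	return parsed, nil
}

func main() {
	parsed := fileIndex{
		fileIndexKey: fileIndexKey{DispatcherID: "d1", EnableTableAcrossNodes: true},
		Idx:          12,
	}
	if _, err := bindDispatcher(parsed, "d2"); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

When table-across-nodes is disabled, both IDs would be empty in this model, so the comparison still passes.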
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fbd9743c-107c-4c78-a91f-205d8272aa8a
📒 Files selected for processing (19)
- cmd/storage-consumer/consumer.go
- downstreamadapter/sink/cloudstorage/buffer_manager.go
- downstreamadapter/sink/cloudstorage/buffer_manager_test.go
- downstreamadapter/sink/cloudstorage/dml_writers.go
- downstreamadapter/sink/cloudstorage/encoder_group_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- downstreamadapter/sink/cloudstorage/task.go
- downstreamadapter/sink/cloudstorage/writer.go
- downstreamadapter/sink/cloudstorage/writer_test.go
- pkg/cloudstorage/config.go
- pkg/cloudstorage/config_test.go
- pkg/cloudstorage/main_test.go
- pkg/cloudstorage/path.go
- pkg/cloudstorage/path_key.go
- pkg/cloudstorage/path_key_test.go
- pkg/cloudstorage/path_test.go
- pkg/cloudstorage/schema_file.go
- pkg/cloudstorage/schema_file_test.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/sink/cloudstorage/buffer_manager.go
🚧 Files skipped from review as they are similar to previous changes (1)
- downstreamadapter/sink/cloudstorage/sink_test.go
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
pkg/cloudstorage/path_key.go (1)
59-80: 🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Return parse errors instead of panicking on storage paths.
These parsers handle paths discovered from external storage. `log.Panic` makes one malformed object terminate replay instead of letting callers skip or report the bad file. Please return predefined repository errors from these parsing APIs and propagate them at the call sites.
As per coding guidelines, "**/*.go: Use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation."
Also applies to: 199-282
Source: Coding guidelines
pkg/cloudstorage/generator.go (2)
479-500: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Return an error for invalid index-file payloads.
This parses filenames read from external index files. A malformed index object currently panics instead of returning a recoverable error to `getFileIdxFromIndexFile` or the storage consumer.
As per coding guidelines, "**/*.go: Use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation."
Source: Coding guidelines
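A sketch of an error-returning parser is shown below for discussion. The accepted name shapes (`CDC<index><ext>` and `CDC_<dispatcher>_<index><ext>`) are inferred from the test file names quoted in this review, and `parseFileIndex` and `errBadIndexName` are hypothetical names rather than the repository's API or predefined errors.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// errBadIndexName is a placeholder for a predefined repository error.
var errBadIndexName = errors.New("invalid CDC file name")

// parseFileIndex extracts the optional dispatcher ID and the numeric index.
// It returns an error for malformed names instead of panicking.
func parseFileIndex(name, ext string) (dispatcher string, idx uint64, err error) {
	if !strings.HasPrefix(name, "CDC") || !strings.HasSuffix(name, ext) {
		return "", 0, fmt.Errorf("%w: %q", errBadIndexName, name)
	}
	body := strings.TrimSuffix(strings.TrimPrefix(name, "CDC"), ext)
	if strings.HasPrefix(body, "_") {
		// Assumed shape: "_<dispatcher>_<index>".
		sep := strings.LastIndex(body, "_")
		if sep <= 1 {
			return "", 0, fmt.Errorf("%w: %q", errBadIndexName, name)
		}
		dispatcher, body = body[1:sep], body[sep+1:]
	}
	idx, err = strconv.ParseUint(body, 10, 64)
	if err != nil {
		return "", 0, fmt.Errorf("%w: %q", errBadIndexName, name)
	}
	return dispatcher, idx, nil
}

func main() {
	for _, name := range []string{"CDC000011.json", "CDC_dispatcher_000012.json", "CDCxyz.json"} {
		d, i, err := parseFileIndex(name, ".json")
		fmt.Printf("%s -> dispatcher=%q idx=%d err=%v\n", name, d, i, err)
	}
}
```

Callers such as the index-file reader could then decide whether a bad name should abort the scan or just be logged and skipped.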
414-430: 🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
Do not silently fall back to table version `0`.
`f.versionMap[tbl]` returns `0` when `CheckOrWriteSchema` has not recorded an effective schema version. That can generate data/index paths under `<...>/0/...`, which will not match a real schema file. Check the map `ok` result and fail before generating the path.
Also applies to: 442-450
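The comma-ok lookup the comment asks for can be sketched as follows. `pathGenerator`, its string-keyed `versionMap`, and the `schema/table/version` layout are simplifications assumed for illustration; the real `FilePathGenerator` keys and path format may differ.

```go
package main

import "fmt"

// pathGenerator is a reduced stand-in for FilePathGenerator. The map key is
// a plain "schema.table" string here, which is an assumption of this sketch.
type pathGenerator struct {
	versionMap map[string]uint64
}

// dataDir builds a versioned directory path, and refuses to do so when no
// schema version has been recorded for the table.
func (g *pathGenerator) dataDir(schema, table string) (string, error) {
	version, ok := g.versionMap[schema+"."+table]
	if !ok {
		// A plain index expression would yield 0 here and produce ".../0".
		return "", fmt.Errorf("no schema version recorded for %s.%s", schema, table)
	}
	return fmt.Sprintf("%s/%s/%d", schema, table, version), nil
}

func main() {
	g := &pathGenerator{versionMap: map[string]uint64{"db.t": 100}}

	p, err := g.dataDir("db", "t")
	fmt.Println(p, err)

	_, err = g.dataDir("db", "missing")
	fmt.Println(err)
}
```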
🧹 Nitpick comments (2)
pkg/cloudstorage/path_test.go (1)
277-297: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Assert the parsed `FileIndex` values.
This test only calls the parser, so wrong `Idx` or dispatcher extraction would still pass. Add expected values, including a dispatcher case.
Suggested test tightening
 testCases := []struct {
-    fileName string
+    fileName string
+    expected FileIndex
 }{
     {
         fileName: "CDC000011.json",
+        expected: FileIndex{Idx: 11},
     },
     {
         fileName: "CDC1000000.json",
+        expected: FileIndex{Idx: 1000000},
+    },
+    {
+        fileName: "CDC_dispatcher_000012.json",
+        expected: FileIndex{
+            FileIndexKey: FileIndexKey{
+                DispatcherID:           "dispatcher",
+                EnableTableAcrossNodes: true,
+            },
+            Idx: 12,
+        },
     },
 }
 for _, tc := range testCases {
-    ParseFileIndexFromFileName(tc.fileName, f.extension)
+    require.Equal(t, tc.expected, ParseFileIndexFromFileName(tc.fileName, f.extension))
 }
As per coding guidelines, "**/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
Source: Coding guidelines
cmd/storage-consumer/consumer_test.go (1)
53-66: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Add a version-mismatch checksum test.
This covers checksum tampering, but not renaming `schema_100_<checksum>.json` to another version with the same checksum. Add that case so path/payload `TableVersion` validation cannot regress.
As per coding guidelines, "**/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 489-497: The checksum check in consumer.go only validates the
schema payload and can miss a renamed path version, so add an explicit
validation that the version encoded in the path matches schemaFile.TableVersion
before caching or inserting the sentinel key. Update the logic around
schemaFile.Sum32, c.schemaFileMap, and the synthetic DML key handling to reject
any mismatch early, and only proceed with map updates when the path version and
payload version agree.
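A possible shape for that validation is sketched here. It assumes the schema file name looks like `schema_<version>_<checksum>.json` with both parts in decimal, which matches the `schema_100_<checksum>.json` example in this review but is otherwise unverified; `checkSchemaFileName` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// checkSchemaFileName compares the version and checksum encoded in the file
// name with the values taken from the decoded payload. Both parts of the
// name are assumed to be decimal numbers.
func checkSchemaFileName(name string, payloadVersion uint64, payloadSum uint32) error {
	if !strings.HasPrefix(name, "schema_") || !strings.HasSuffix(name, ".json") {
		return fmt.Errorf("not a schema file name: %q", name)
	}
	body := strings.TrimSuffix(strings.TrimPrefix(name, "schema_"), ".json")
	parts := strings.Split(body, "_")
	if len(parts) != 2 {
		return fmt.Errorf("unexpected schema file name: %q", name)
	}
	version, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return fmt.Errorf("bad version in %q: %v", name, err)
	}
	sum, err := strconv.ParseUint(parts[1], 10, 32)
	if err != nil {
		return fmt.Errorf("bad checksum in %q: %v", name, err)
	}
	if version != payloadVersion {
		return fmt.Errorf("version mismatch: path has %d, payload has %d", version, payloadVersion)
	}
	if uint32(sum) != payloadSum {
		return fmt.Errorf("checksum mismatch: path has %d, payload has %d", sum, payloadSum)
	}
	return nil
}

func main() {
	// A file renamed from version 100 to 101 keeps a valid checksum but must
	// still be rejected because the payload says 100.
	fmt.Println(checkSchemaFileName("schema_101_0000000042.json", 100, 42))
}
```

Running both comparisons before touching the schema map would keep a renamed file from being cached under the wrong version.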
---
Outside diff comments:
In `@pkg/cloudstorage/generator.go`:
- Around line 479-500: ParseFileIndexFromFileName currently panics on malformed
external index filenames, which makes getFileIdxFromIndexFile and storage
consumers unable to recover. Update ParseFileIndexFromFileName in generator.go
to return a FileIndex plus an error instead of calling log.Panic, and propagate
that error through the caller path using the repository’s predefined error
types/patterns from error-handling.md. Adjust any related parsing logic and call
sites so invalid CDC filename payloads are reported as recoverable errors with
the existing ParseFileIndexFromFileName and getFileIdxFromIndexFile symbols as
the main touchpoints.
- Around line 414-430: The path generation in
FilePathGenerator.generateDataDirPath currently reads f.versionMap[tbl] without
checking whether a version was actually recorded, which can silently produce
paths under version 0. Update generateDataDirPath and the related data/index
path helper referenced in the comment to inspect the map ok result before using
the version, and fail fast or return an error when the schema version is missing
instead of generating a path. Keep the fix localized around FilePathGenerator
and DMLPathKey path construction so all schema-versioned path generation
consistently avoids fallback to 0.
In `@pkg/cloudstorage/path_key.go`:
- Around line 59-80: SchemaPathKey.Parse currently panics on malformed storage
paths, which prevents callers from handling bad external objects safely. Update
Parse and the related path parsers in the same area (including the other
path-key helpers in this file) to return predefined repository errors instead of
calling log.Panic or using fatal-style behavior. Propagate those errors to the
call sites that consume Parse so replay can skip or report invalid files, and
keep the error handling consistent with the repository error guidelines.
---
Nitpick comments:
In `@cmd/storage-consumer/consumer_test.go`:
- Around line 53-66: Add a focused deterministic test in consumer_test.go around
parseSchemaFilePath that covers renaming a valid schema file to a different
TableVersion while keeping the same checksum, and assert the mismatch is
rejected. Reuse the existing consumer, schemaFile.Marshal, and
parseSchemaFilePath flow, but create a path whose version segment does not match
the payload’s TableVersion and verify the validation fails/panics as
appropriate. Keep the new case isolated from the existing checksum-tampering
test so both checksum and version-mismatch regressions are covered.
In `@pkg/cloudstorage/path_test.go`:
- Around line 277-297: The test in TestParseFileIndexFromFileName only invokes
ParseFileIndexFromFileName without verifying the result, so it can miss
regressions in Idx or dispatcher parsing. Update the test cases to include
expected FileIndex values for each filename, then assert the parsed output
matches those expectations, making sure to cover both a normal index case and a
dispatcher case using the existing ParseFileIndexFromFileName helper and
FileIndex type.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 85994108-e606-460b-b6bc-609a1f101487
📒 Files selected for processing (13)
- cmd/kafka-consumer/writer_test.go
- cmd/storage-consumer/consumer.go
- cmd/storage-consumer/consumer_test.go
- cmd/storage-consumer/main.go
- coordinator/changefeed/etcd_backend_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- pkg/cloudstorage/config_test.go
- pkg/cloudstorage/generator.go
- pkg/cloudstorage/path_key.go
- pkg/cloudstorage/path_key_test.go
- pkg/cloudstorage/path_test.go
- pkg/cloudstorage/schema_file.go
- pkg/cloudstorage/schema_file_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- downstreamadapter/sink/cloudstorage/sink.go
- pkg/cloudstorage/path_key_test.go
- pkg/cloudstorage/schema_file_test.go
|
/test all |
What problem does this PR solve?
Issue Number: close #5494, close #5437
This PR fixes a Kafka producer ordering issue by updating the PingCAP Sarama fork, and reduces duplicated cloud storage path/schema handling between the cloud storage sink and the storage consumer.
What is changed and how it works?
- … `v1.41.2-pingcap-20260622.1`.
- … `pkg/cloudstorage` so both the sink and storage consumer use the same implementation.
- … `TableInfo` for CSV DML decoding and `DDLEvent` for DDL replay.
Check List
Tests
- `go test ./pkg/cloudstorage`
- `go test ./cmd/storage-consumer`
- `go test --tags=intest ./downstreamadapter/sink/cloudstorage`
- `make local-static-check`
Questions
Will it cause performance regression or break compatibility?
No performance regression is expected. The cloud storage file format and generated path format are intended to stay compatible; this PR mainly centralizes the Go implementation used by sink and consumer.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note