Skip to content

cloudstorage,kafka: update sarama and share storage helpers#5483

Open
3AceShowHand wants to merge 16 commits into
pingcap:masterfrom
3AceShowHand:sarama-fix
Open

cloudstorage,kafka: update sarama and share storage helpers#5483
3AceShowHand wants to merge 16 commits into
pingcap:masterfrom
3AceShowHand:sarama-fix

Conversation

@3AceShowHand

@3AceShowHand 3AceShowHand commented Jun 23, 2026

Copy link
Copy Markdown
Collaborator

What problem does this PR solve?

Issue Number: close #5494, close #5437

This PR fixes a Kafka producer ordering issue by updating the PingCAP Sarama fork, and reduces duplicated cloud storage path/schema handling between the cloud storage sink and the storage consumer.

What is changed and how it works?

  • Update the PingCAP Sarama replacement to v1.41.2-pingcap-20260622.1.
  • Move shared cloud storage config, path, file-index, and schema-file logic into pkg/cloudstorage so both the sink and storage consumer use the same implementation.
  • Align storage consumer path parsing with sink path generation, including schema files, DML/index paths, table-across-nodes file names, date separators, and partition/table-id path layouts.
  • Let the storage consumer decode schema files into the structures it needs for replay: TableInfo for CSV DML decoding and DDLEvent for DDL replay.
  • Keep schema checksum validation in the storage consumer while keeping sink-side index-file parsing recoverable through errors.
  • Add a standard-library logger path for Sarama log output and discard it where TiCDC does not need Sarama logs.

Check List

Tests

  • Unit test
    • go test ./pkg/cloudstorage
    • go test ./cmd/storage-consumer
    • go test --tags=intest ./downstreamadapter/sink/cloudstorage
  • Static check
    • make local-static-check

Questions

Will it cause performance regression or break compatibility?

No performance regression is expected. The cloud storage file format and generated path format are intended to stay compatible; this PR mainly centralizes the Go implementation used by sink and consumer.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

None.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jun 23, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jun 23, 2026

Copy link
Copy Markdown

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign 3aceshowhand for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot Bot added the size/S Denotes a PR that changes 10-29 lines, ignoring generated files. label Jun 23, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the sarama dependency version and modifies initSaramaLogger in pkg/logger/log.go to restrict Sarama logging to levels below InfoLevel. The reviewer suggested simplifying the level check to level < zapcore.InfoLevel for better readability and consistency with other parts of the codebase.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread pkg/logger/log.go Outdated
@coderabbitai

coderabbitai Bot commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Replaces TableDefinition with SchemaFile as the persisted cloud storage schema payload, introducing checksum-verified schema file paths and updated marshal/parse/path APIs. DML and index path-key types are extended with table-ID path support and schema-file sentinel ordering. The downstream sink, consumer, and generator are rewired to use SchemaFile throughout. A sarama dependency bump and logger discard guard are also included.

Changes

Cloud storage SchemaFile migration and path replay refactor

Layer / File(s) Summary
SchemaFile contracts, checksum, and path generation
pkg/cloudstorage/schema_file.go
Replaces TableDefinition with SchemaFile; Build, BuildDDLEvent, BuildTableInfo, Marshal, Sum32, and Path implement the new payload lifecycle; TableCol.ToTiColumnInfo drops its error return and panics on invalid input.
SchemaFile serialization, path, and checksum tests
pkg/cloudstorage/schema_file_test.go
Replaces TestTableDefinition with TestSchemaFile; adds TestSchemaFileGenFilePath, TestParseSchemaFile, and TestSchemaFileSum32; test helper builds SchemaFile via DDLEvent.Build.
pkg/cloudstorage: SchemaPathKey, FileIndex, DMLPathKey, and replay ordering
pkg/cloudstorage/path_key.go
New file introduces SchemaPathKey, FileIndexKey/FileIndex, DMLPathKey with UseTableIDAsPath/TableID/PartitionNum/Date, schema-marker sentinel helpers, CompareDMLPathKey, GenerateDMLFilePath, parseDMLDataDir, and ParseIndexFilePath.
pkg/cloudstorage path key tests
pkg/cloudstorage/path_key_test.go
Adds TestGenerateDMLFilePath for multiple path layouts and TestSchemaFileDMLPathKeyOrder for schema-marker and ordering semantics.
pkg/cloudstorage generator: CheckOrWriteSchema and index-path refactor
pkg/cloudstorage/generator.go
CheckOrWriteSchema switches to SchemaFile+DDLEvent; GenerateIndexFilePath returns string only; FetchIndexFromFileName replaced by ParseFileIndexFromFileName; schema-meta walk uses mustParseSchemaFileName.
pkg/cloudstorage generator and path tests
pkg/cloudstorage/path_test.go
Removes TestFetchIndexFromFileName; adds TestGenerateAndParseIndexFilePath and TestParseFileIndexFromFileName; drops error handling on GenerateIndexFilePath callsites; uses t.Context() throughout.
pkg/cloudstorage config test context cleanup
pkg/cloudstorage/config_test.go
Replaces context.TODO()/context.Background() with t.Context() in all cfg.Apply calls; removes context import.
pkg/sink/cloudstorage path-key ordering and parsing refactor
pkg/sink/cloudstorage/path_key.go
Extends sink-layer DmlPathKey with UseTableIDAsPath/TableID fields and schemaFilePartitionNum sentinel; rewrites DML/index parsing without regex; adds schema-marker and CompareDMLPathKey APIs.
Sink SchemaFile DDL writing and cloudstorage import rewiring
downstreamadapter/sink/cloudstorage/sink.go, downstreamadapter/sink/cloudstorage/buffer_manager.go, downstreamadapter/sink/cloudstorage/dml_writers.go, downstreamadapter/sink/cloudstorage/task.go, downstreamadapter/sink/cloudstorage/writer.go
writeDDLEvent and writeFile switch from TableDefinition to SchemaFile; tableSchemaStore field removed; SetTableSchemaStore becomes a no-op; all components rewire imports to pkg/cloudstorage; GenerateIndexFilePath callsite drops error handling.
Sink SchemaFile test assertions
downstreamadapter/sink/cloudstorage/sink_test.go, downstreamadapter/sink/cloudstorage/buffer_manager_test.go, downstreamadapter/sink/cloudstorage/encoder_group_test.go, downstreamadapter/sink/cloudstorage/writer_test.go
Updates import aliases; renames readSchemaDefinitionForTest to readSchemaFileForTest; asserts Schema/Table/column on SchemaFile struct; switches Config type references.
Storage consumer schema-file ingestion and replay ordering
cmd/storage-consumer/consumer.go, cmd/storage-consumer/consumer_test.go
Replaces tableDefMap with schemaFileMap; overhauls parseSchemaFilePath with checksum validation; inserts NewSchemaFileDMLPathKey sentinel; uses CompareDMLPathKey, IsSchemaFileDMLPathKey, and SchemaFile.BuildDDLEvent/BuildTableInfo throughout; adds checksum-enforcement test.
main.go fix, kafka-consumer and etcd backend test cleanups
cmd/storage-consumer/main.go, cmd/kafka-consumer/writer_test.go, coordinator/changefeed/etcd_backend_test.go
Moves flag.Parse and logger init into main() scope; reorders resolvedEvents initialization in kafka-consumer test; updates FuncMarcher to use any instead of interface{}.

Sarama version bump and logger initialization guard

Layer / File(s) Summary
Logger discard guard and sarama version bump
go.mod, pkg/logger/log.go
go.mod replace directive moves to v1.41.2-pingcap-20260622.1; initSaramaLogger imports stdlog and short-circuits to a discard logger when zapcore.InfoLevel is enabled.

Sequence Diagram(s)

sequenceDiagram
    participant Consumer as storage-consumer
    participant SchemaFile as cloudstorage.SchemaFile
    participant DMLPathKey as cloudstorage.DMLPathKey
    participant Sink as downstreamadapter/sink

    Consumer->>SchemaFile: parseSchemaFilePath → unmarshal JSON
    SchemaFile-->>Consumer: *SchemaFile (checksum validated)
    Consumer->>DMLPathKey: NewSchemaFileDMLPathKey(schemaKey)
    DMLPathKey-->>Consumer: sentinel DmlPathKey
    Consumer->>DMLPathKey: CompareDMLPathKey(keys...) → sort replay order
    Consumer->>DMLPathKey: IsSchemaFileDMLPathKey()
    DMLPathKey-->>Consumer: true → DDL branch
    Consumer->>SchemaFile: BuildDDLEvent()
    SchemaFile-->>Consumer: *commonEvent.DDLEvent
    Consumer->>SchemaFile: BuildTableInfo()
    SchemaFile-->>Consumer: *common.TableInfo → appendDMLEvents

    Sink->>SchemaFile: Build(DDLEvent, outputColumnID)
    SchemaFile-->>Sink: populated SchemaFile
    Sink->>SchemaFile: Marshal() + Path(useTableIDAsPath, tableID)
    SchemaFile-->>Sink: []byte + path string → writeFile to storage
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • pingcap/ticdc#5359: Modifies the same go.mod replace directive for github.com/IBM/sarama to point to a newer github.com/pingcap/sarama pseudo-version, directly overlapping with the dependency bump in this PR.

Suggested labels

release-note-none

Suggested reviewers

  • asddongmen
  • wk989898
  • tenfyzhong

Poem

🐇 Hoppity-hop through the schema files we go,
TableDefinition retired, SchemaFile takes the show!
Checksums embedded in every JSON name,
Replay ordering sorted — no two keys the same.
The rabbit says: discard that sarama noise,
And let the cloud storage sing with cleaner poise! 🎵

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The PR description follows the template and includes the issue links, change summary, tests, answers, and release note.
Title check ✅ Passed The title matches the main changes: Sarama was updated and cloudstorage helpers were refactored/shared.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@ti-chi-bot ti-chi-bot Bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/S Denotes a PR that changes 10-29 lines, ignoring generated files. labels Jun 23, 2026
@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/sink/cloudstorage/schema_file.go (1)

230-249: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Build should reset receiver state before populating fields.

Build mutates SchemaFile in place but never clears existing Columns/TotalColumns. A second call can keep stale column data (especially on the early-return branch when TableInfo is nil), producing invalid schema artifacts.

Proposed fix
 func (t *SchemaFile) Build(event *commonEvent.DDLEvent, outputColumnID bool) {
-	t.Version = defaultSchemaFileVersion
-	t.TableVersion = event.FinishedTs
-	t.Query = event.Query
-	t.Type = event.Type
+	*t = SchemaFile{
+		Version:      defaultSchemaFileVersion,
+		TableVersion: event.FinishedTs,
+		Query:        event.Query,
+		Type:         event.Type,
+	}
 
 	info := event.TableInfo
 	if info == nil {
 		t.Schema = event.GetTargetSchemaName()
 		t.Table = event.GetTargetTableName()
 		return
 	}
 	t.Schema = info.GetTargetSchemaName()
 	t.Table = info.GetTargetTableName()
 	t.TotalColumns = len(info.GetColumns())
+	t.Columns = make([]TableCol, 0, t.TotalColumns)
 	for _, col := range info.GetColumns() {
 		var tableCol TableCol
 		tableCol.FromTiColumnInfo(col, outputColumnID)
 		t.Columns = append(t.Columns, tableCol)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/sink/cloudstorage/schema_file.go` around lines 230 - 249, The Build
method on SchemaFile mutates the receiver in place without clearing existing
state before population. At the start of the Build method (before any field
assignments), reset the Columns slice to empty and reset TotalColumns to zero.
This ensures that subsequent calls to Build do not retain stale column data from
previous invocations, especially on the early-return path when TableInfo is nil.
🧹 Nitpick comments (2)
pkg/logger/log_test.go (1)

37-49: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value

Consider testing additional log levels (optional).

The test validates the behavior for DebugLevel (zap logger) and InfoLevel (discard logger). For completeness, you could add assertions for WarnLevel and ErrorLevel to ensure they also use the discard logger. However, the current test is focused and sufficient given that the guard condition applies uniformly to all levels ≥ Info.

🧪 Optional: Additional test coverage
 	require.NoError(t, initSaramaLogger(zapcore.InfoLevel))
 	require.NotSame(t, debugLogger, sarama.Logger)
 	require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
+
+	// Verify WarnLevel and ErrorLevel also use discard logger
+	require.NoError(t, initSaramaLogger(zapcore.WarnLevel))
+	require.IsType(t, stdlog.New(nil, "", 0), sarama.Logger)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/logger/log_test.go` around lines 37 - 49, The test
TestInitSaramaLoggerResetsWhenInfoEnabled currently only validates behavior for
DebugLevel and InfoLevel. To improve test coverage, add additional assertions
that call initSaramaLogger with WarnLevel and ErrorLevel, and verify that these
levels also result in sarama.Logger being set to a discard logger (using
require.IsType with stdlog.New), similar to the existing InfoLevel validation.
pkg/logger/log.go (1)

261-265: 📐 Maintainability & Code Quality | 🔵 Trivial

Consider adding a clarifying comment for the guard condition.

The condition zapcore.InfoLevel.Enabled(level) correctly silences Sarama when the log level is Info or higher (less verbose), but the intent may not be immediately clear to future readers. A brief comment explaining that Sarama is intentionally silenced at less verbose levels would improve maintainability.

📝 Suggested clarifying comment
 func initSaramaLogger(level zapcore.Level) error {
+	// Silence Sarama at Info or higher levels; it is noisy and not needed for normal operation.
 	if zapcore.InfoLevel.Enabled(level) {
 		sarama.Logger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags)
 		return nil
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/logger/log.go` around lines 261 - 265, Add a clarifying comment above the
if statement that checks `zapcore.InfoLevel.Enabled(level)` to explain that
Sarama logging is intentionally silenced when the log level is Info or higher
(less verbose logging levels). The comment should make it clear to future
maintainers that this condition prevents overly verbose output from Sarama by
discarding its logs when verbosity is lower, improving code readability and
maintainability.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 593-594: The mustGetSchemaFile function at line 593-594 panics
when a schema file is missing, which causes hard failures when DML files are
encountered before their corresponding schema files due to unordered file walk
traversal in getNewFiles. To fix this, either enforce a two-pass approach in
getNewFiles that processes all schema files before DML files, or add schema
existence validation before invoking mustGetSchemaFile in the loop to check if
the schema is present in schemaFileMap and skip or log the DML key gracefully
instead of panicking. Additionally, replace the hard panic logic at lines 523,
529, and 536 with recoverable error handling or skip logic to handle transient
storage inconsistencies.

In `@pkg/sink/cloudstorage/schema_file.go`:
- Around line 300-305: The checksumPayload struct initialization is missing the
Version field, which means schema file version changes won't be reflected in the
checksum-derived filenames. In the checksumPayload struct literal construction
within marshalForChecksum, add the Version field assignment alongside the other
fields like Table, Schema, Columns, and TotalColumns. This ensures that version
information is included in the checksum payload calculation.
- Around line 275-280: The isTableLevel() method currently uses log.Panic to
handle schema validation failures, which crashes the process instead of allowing
callers to handle the error. Refactor isTableLevel() to return an error in
addition to the boolean result (or return only an error if appropriate), and
instead of calling log.Panic when len(t.Columns) != t.TotalColumns, return a
predefined repository error as per the coding guidelines documented in
docs/agents/error-handling.md. This allows callers to handle validation failures
gracefully without causing process-level availability impact.

---

Outside diff comments:
In `@pkg/sink/cloudstorage/schema_file.go`:
- Around line 230-249: The Build method on SchemaFile mutates the receiver in
place without clearing existing state before population. At the start of the
Build method (before any field assignments), reset the Columns slice to empty
and reset TotalColumns to zero. This ensures that subsequent calls to Build do
not retain stale column data from previous invocations, especially on the
early-return path when TableInfo is nil.

---

Nitpick comments:
In `@pkg/logger/log_test.go`:
- Around line 37-49: The test TestInitSaramaLoggerResetsWhenInfoEnabled
currently only validates behavior for DebugLevel and InfoLevel. To improve test
coverage, add additional assertions that call initSaramaLogger with WarnLevel
and ErrorLevel, and verify that these levels also result in sarama.Logger being
set to a discard logger (using require.IsType with stdlog.New), similar to the
existing InfoLevel validation.

In `@pkg/logger/log.go`:
- Around line 261-265: Add a clarifying comment above the if statement that
checks `zapcore.InfoLevel.Enabled(level)` to explain that Sarama logging is
intentionally silenced when the log level is Info or higher (less verbose
logging levels). The comment should make it clear to future maintainers that
this condition prevents overly verbose output from Sarama by discarding its logs
when verbosity is lower, improving code readability and maintainability.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3d711bcf-bb42-4ca5-a5a9-64a068bff015

📥 Commits

Reviewing files that changed from the base of the PR and between 7741785 and 0703056.

📒 Files selected for processing (13)
  • cmd/storage-consumer/consumer.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/cloudstorage/sink_test.go
  • pkg/logger/log.go
  • pkg/logger/log_test.go
  • pkg/sink/cloudstorage/path.go
  • pkg/sink/cloudstorage/path_key.go
  • pkg/sink/cloudstorage/path_key_test.go
  • pkg/sink/cloudstorage/path_test.go
  • pkg/sink/cloudstorage/schema_file.go
  • pkg/sink/cloudstorage/schema_file_parse.go
  • pkg/sink/cloudstorage/schema_file_parse_test.go
  • pkg/sink/cloudstorage/schema_file_test.go

Comment thread cmd/storage-consumer/consumer.go
Comment thread pkg/cloudstorage/schema_file.go Outdated
Comment thread pkg/cloudstorage/schema_file.go
@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 445-452: The code at ParseFileIndexFromFileName extracts a
dispatcherID from the filename into fileIndex, but then immediately overwrites
it with the dispatcherID from the index path without validating they match. Add
validation after ParseFileIndexFromFileName returns to check if the extracted
dispatcherID from the filename matches the dispatcherID parameter being assigned
to FileIndexKey. If there is a mismatch, return an error indicating the
dispatcher IDs are inconsistent, as this can cause incorrect file-index state
and replay behavior.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fbd9743c-107c-4c78-a91f-205d8272aa8a

📥 Commits

Reviewing files that changed from the base of the PR and between 0703056 and eb69660.

📒 Files selected for processing (19)
  • cmd/storage-consumer/consumer.go
  • downstreamadapter/sink/cloudstorage/buffer_manager.go
  • downstreamadapter/sink/cloudstorage/buffer_manager_test.go
  • downstreamadapter/sink/cloudstorage/dml_writers.go
  • downstreamadapter/sink/cloudstorage/encoder_group_test.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • downstreamadapter/sink/cloudstorage/sink_test.go
  • downstreamadapter/sink/cloudstorage/task.go
  • downstreamadapter/sink/cloudstorage/writer.go
  • downstreamadapter/sink/cloudstorage/writer_test.go
  • pkg/cloudstorage/config.go
  • pkg/cloudstorage/config_test.go
  • pkg/cloudstorage/main_test.go
  • pkg/cloudstorage/path.go
  • pkg/cloudstorage/path_key.go
  • pkg/cloudstorage/path_key_test.go
  • pkg/cloudstorage/path_test.go
  • pkg/cloudstorage/schema_file.go
  • pkg/cloudstorage/schema_file_test.go
✅ Files skipped from review due to trivial changes (1)
  • downstreamadapter/sink/cloudstorage/buffer_manager.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • downstreamadapter/sink/cloudstorage/sink_test.go

Comment thread cmd/storage-consumer/consumer.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
pkg/cloudstorage/path_key.go (1)

59-80: 🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

Return parse errors instead of panicking on storage paths.

These parsers handle paths discovered from external storage. log.Panic makes one malformed object terminate replay instead of letting callers skip or report the bad file. Please return predefined repository errors from these parsing APIs and propagate them at the call sites.

As per coding guidelines, "**/*.go: Use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation."

Also applies to: 199-282

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/cloudstorage/path_key.go` around lines 59 - 80, SchemaPathKey.Parse
currently panics on malformed storage paths, which prevents callers from
handling bad external objects safely. Update Parse and the related path parsers
in the same area (including the other path-key helpers in this file) to return
predefined repository errors instead of calling log.Panic or using fatal-style
behavior. Propagate those errors to the call sites that consume Parse so replay
can skip or report invalid files, and keep the error handling consistent with
the repository error guidelines.

Source: Coding guidelines

pkg/cloudstorage/generator.go (2)

479-500: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Return an error for invalid index-file payloads.

This parses filenames read from external index files. A malformed index object currently panics instead of returning a recoverable error to getFileIdxFromIndexFile or the storage consumer.

As per coding guidelines, "**/*.go: Use predefined repository errors; see docs/agents/error-handling.md before changing error creation, wrapping, or propagation."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/cloudstorage/generator.go` around lines 479 - 500,
ParseFileIndexFromFileName currently panics on malformed external index
filenames, which makes getFileIdxFromIndexFile and storage consumers unable to
recover. Update ParseFileIndexFromFileName in generator.go to return a FileIndex
plus an error instead of calling log.Panic, and propagate that error through the
caller path using the repository’s predefined error types/patterns from
error-handling.md. Adjust any related parsing logic and call sites so invalid
CDC filename payloads are reported as recoverable errors with the existing
ParseFileIndexFromFileName and getFileIdxFromIndexFile symbols as the main
touchpoints.

Source: Coding guidelines


414-430: 🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift

Do not silently fall back to table version 0.

f.versionMap[tbl] returns 0 when CheckOrWriteSchema has not recorded an effective schema version. That can generate data/index paths under <...>/0/..., which will not match a real schema file. Check the map ok result and fail before generating the path.

Also applies to: 442-450

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/cloudstorage/generator.go` around lines 414 - 430, The path generation in
FilePathGenerator.generateDataDirPath currently reads f.versionMap[tbl] without
checking whether a version was actually recorded, which can silently produce
paths under version 0. Update generateDataDirPath and the related data/index
path helper referenced in the comment to inspect the map ok result before using
the version, and fail fast or return an error when the schema version is missing
instead of generating a path. Keep the fix localized around FilePathGenerator
and DMLPathKey path construction so all schema-versioned path generation
consistently avoids fallback to 0.
🧹 Nitpick comments (2)
pkg/cloudstorage/path_test.go (1)

277-297: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Assert the parsed FileIndex values.

This test only calls the parser, so wrong Idx or dispatcher extraction would still pass. Add expected values, including a dispatcher case.

Suggested test tightening
 	testCases := []struct {
-		fileName string
+		fileName string
+		expected FileIndex
 	}{
 		{
 			fileName: "CDC000011.json",
+			expected: FileIndex{Idx: 11},
 		},
 		{
 			fileName: "CDC1000000.json",
+			expected: FileIndex{Idx: 1000000},
+		},
+		{
+			fileName: "CDC_dispatcher_000012.json",
+			expected: FileIndex{
+				FileIndexKey: FileIndexKey{
+					DispatcherID:           "dispatcher",
+					EnableTableAcrossNodes: true,
+				},
+				Idx: 12,
+			},
 		},
 	}
 
 	for _, tc := range testCases {
-		ParseFileIndexFromFileName(tc.fileName, f.extension)
+		require.Equal(t, tc.expected, ParseFileIndexFromFileName(tc.fileName, f.extension))
 	}

As per coding guidelines, "**/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/cloudstorage/path_test.go` around lines 277 - 297, The test in
TestParseFileIndexFromFileName only invokes ParseFileIndexFromFileName without
verifying the result, so it can miss regressions in Idx or dispatcher parsing.
Update the test cases to include expected FileIndex values for each filename,
then assert the parsed output matches those expectations, making sure to cover
both a normal index case and a dispatcher case using the existing
ParseFileIndexFromFileName helper and FileIndex type.

Source: Coding guidelines

cmd/storage-consumer/consumer_test.go (1)

53-66: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Add a version-mismatch checksum test.

This covers checksum tampering, but not renaming schema_100_<checksum>.json to another version with the same checksum. Add that case so path/payload TableVersion validation cannot regress.

As per coding guidelines, "**/*_test.go: Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cmd/storage-consumer/consumer_test.go` around lines 53 - 66, Add a focused
deterministic test in consumer_test.go around parseSchemaFilePath that covers
renaming a valid schema file to a different TableVersion while keeping the same
checksum, and assert the mismatch is rejected. Reuse the existing consumer,
schemaFile.Marshal, and parseSchemaFilePath flow, but create a path whose
version segment does not match the payload’s TableVersion and verify the
validation fails/panics as appropriate. Keep the new case isolated from the
existing checksum-tampering test so both checksum and version-mismatch
regressions are covered.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cmd/storage-consumer/consumer.go`:
- Around line 489-497: The checksum check in consumer.go only validates the
schema payload and can miss a renamed path version, so add an explicit
validation that the version encoded in the path matches schemaFile.TableVersion
before caching or inserting the sentinel key. Update the logic around
schemaFile.Sum32, c.schemaFileMap, and the synthetic DML key handling to reject
any mismatch early, and only proceed with map updates when the path version and
payload version agree.

---

Outside diff comments:
In `@pkg/cloudstorage/generator.go`:
- Around line 479-500: ParseFileIndexFromFileName currently panics on malformed
external index filenames, which makes getFileIdxFromIndexFile and storage
consumers unable to recover. Update ParseFileIndexFromFileName in generator.go
to return a FileIndex plus an error instead of calling log.Panic, and propagate
that error through the caller path using the repository’s predefined error
types/patterns from error-handling.md. Adjust any related parsing logic and call
sites so invalid CDC filename payloads are reported as recoverable errors with
the existing ParseFileIndexFromFileName and getFileIdxFromIndexFile symbols as
the main touchpoints.
- Around line 414-430: The path generation in
FilePathGenerator.generateDataDirPath currently reads f.versionMap[tbl] without
checking whether a version was actually recorded, which can silently produce
paths under version 0. Update generateDataDirPath and the related data/index
path helper referenced in the comment to inspect the map ok result before using
the version, and fail fast or return an error when the schema version is missing
instead of generating a path. Keep the fix localized around FilePathGenerator
and DMLPathKey path construction so all schema-versioned path generation
consistently avoids fallback to 0.

In `@pkg/cloudstorage/path_key.go`:
- Around line 59-80: SchemaPathKey.Parse currently panics on malformed storage
paths, which prevents callers from handling bad external objects safely. Update
Parse and the related path parsers in the same area (including the other
path-key helpers in this file) to return predefined repository errors instead of
calling log.Panic or using fatal-style behavior. Propagate those errors to the
call sites that consume Parse so replay can skip or report invalid files, and
keep the error handling consistent with the repository error guidelines.

---

Nitpick comments:
In `@cmd/storage-consumer/consumer_test.go`:
- Around line 53-66: Add a focused deterministic test in consumer_test.go around
parseSchemaFilePath that covers renaming a valid schema file to a different
TableVersion while keeping the same checksum, and assert the mismatch is
rejected. Reuse the existing consumer, schemaFile.Marshal, and
parseSchemaFilePath flow, but create a path whose version segment does not match
the payload’s TableVersion and verify the validation fails/panics as
appropriate. Keep the new case isolated from the existing checksum-tampering
test so both checksum and version-mismatch regressions are covered.

In `@pkg/cloudstorage/path_test.go`:
- Around line 277-297: The test in TestParseFileIndexFromFileName only invokes
ParseFileIndexFromFileName without verifying the result, so it can miss
regressions in Idx or dispatcher parsing. Update the test cases to include
expected FileIndex values for each filename, then assert the parsed output
matches those expectations, making sure to cover both a normal index case and a
dispatcher case using the existing ParseFileIndexFromFileName helper and
FileIndex type.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 85994108-e606-460b-b6bc-609a1f101487

📥 Commits

Reviewing files that changed from the base of the PR and between f3b7ad7 and 5a7d57c.

📒 Files selected for processing (13)
  • cmd/kafka-consumer/writer_test.go
  • cmd/storage-consumer/consumer.go
  • cmd/storage-consumer/consumer_test.go
  • cmd/storage-consumer/main.go
  • coordinator/changefeed/etcd_backend_test.go
  • downstreamadapter/sink/cloudstorage/sink.go
  • pkg/cloudstorage/config_test.go
  • pkg/cloudstorage/generator.go
  • pkg/cloudstorage/path_key.go
  • pkg/cloudstorage/path_key_test.go
  • pkg/cloudstorage/path_test.go
  • pkg/cloudstorage/schema_file.go
  • pkg/cloudstorage/schema_file_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • downstreamadapter/sink/cloudstorage/sink.go
  • pkg/cloudstorage/path_key_test.go
  • pkg/cloudstorage/schema_file_test.go

Comment thread cmd/storage-consumer/consumer.go Outdated
@3AceShowHand 3AceShowHand changed the title kafka: bump sarama to fix out or oder when controller failure cloudstorage,kafka: update Sarama and share storage helpers Jun 24, 2026
@ti-chi-bot ti-chi-bot Bot added release-note-none Denotes a PR that doesn't merit a release note. and removed do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jun 24, 2026
@3AceShowHand 3AceShowHand requested a review from asddongmen June 24, 2026 09:47
@3AceShowHand 3AceShowHand requested review from lidezhu and wk989898 June 24, 2026 09:47
@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand 3AceShowHand changed the title cloudstorage,kafka: update Sarama and share storage helpers cloudstorage,kafka: update sarama and share storage helpers Jun 24, 2026
@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@3AceShowHand

Copy link
Copy Markdown
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot Bot commented Jun 24, 2026

Copy link
Copy Markdown

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-error-log-review 552c8d5 link true /test pull-error-log-review
pull-cdc-storage-integration-light 552c8d5 link true /test pull-cdc-storage-integration-light

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

storage sink unify the code [v8.5.7] After a Kafka controller failure, data inconsistency occurs between upstream and downstream

1 participant