logpuller: refactor resolve lock rate limit logic & add more resolve lock metrics #4954
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Centralizes per-region resolve-lock throttling into a new rate limiter on subscribedSpan.
Changes: lock resolution, throttling, and metrics.
sequenceDiagram
participant Client as SubscriptionClient
participant Span as subscribedSpan
participant Queue as resolveLockTaskCh
participant Resolver as LockResolver
participant Metrics as Prometheus
Client->>Span: tryResolveLock(regionID, targetTs)
Span->>Span: check Initialized / ResolvedTs vs targetTs
Span->>Span: resolveLockLimiter.tryMark(regionID)
alt allowed
Span->>Queue: enqueue resolveLockTask
else denied
Span->>Span: (skip enqueue)
end
Note over Queue,Client: handler goroutine reads tasks
Queue->>Client: handleResolveLockTasks reads task
Client->>Resolver: Resolve(task.state, task.targetTs)
Resolver->>Metrics: increment processed_lock_count{status="found"/"resolved"}
Resolver-->>Client: (err / nil)
Client->>Metrics: increment resolve_lock_count{status="success"/"failure"}
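The diagram references resolveLockLimiter.tryMark(regionID) as the admission gate. A minimal sketch of such a per-region limiter, assuming a plain map-plus-mutex design (the struct layout and field names here are illustrative, not the PR's actual code):

```go
package logpuller

import (
	"sync"
	"time"
)

// resolveLockRateLimiter throttles resolve-lock attempts per region.
// Sketch only: the real limiter in this PR may differ in fields and behavior.
type resolveLockRateLimiter struct {
	mu          sync.Mutex
	minInterval time.Duration
	lastMark    map[uint64]time.Time // regionID -> last admitted time
}

func newResolveLockRateLimiter(minInterval time.Duration) *resolveLockRateLimiter {
	return &resolveLockRateLimiter{
		minInterval: minInterval,
		lastMark:    make(map[uint64]time.Time),
	}
}

// tryMark reports whether a resolve-lock task for regionID may be enqueued.
// It admits at most one task per region within minInterval.
func (r *resolveLockRateLimiter) tryMark(regionID uint64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := time.Now()
	if last, ok := r.lastMark[regionID]; ok && now.Sub(last) < r.minInterval {
		return false
	}
	r.lastMark[regionID] = now
	return true
}
```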
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (warnings)
Code Review
This pull request enhances the observability of the lock resolution process by introducing new Prometheus metrics and updating Grafana dashboards to track task statuses, durations, and scanned ranges. Key improvements include more granular tracking of skipped, throttled, and dropped tasks in the subscription client and transaction resolver. Feedback suggests correcting a unit mismatch in the LockResolveDuration metric, optimizing memory allocation in countLocksByTTL, and removing redundant metric increments for dropped tasks.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/subscription_client.go (1)
1002-1064: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Drop the second, client-wide resolve-lock throttle.
tryResolveLock already throttles per subscribedSpan/region and records the enqueue time. The extra resolveLastRun[regionID] here is global to the whole client, so two subscriptions that share a TiKV region can suppress each other's valid resolve attempts for 10 seconds. That can skip real lock-resolution work and undercount the new execution metric.
Proposed change:
```diff
 func (s *subscriptionClient) handleResolveLockTasks(ctx context.Context) error {
-	resolveLastRun := make(map[uint64]time.Time)
-
 	doResolve := func(task resolveLockTask) {
 		keyspaceID := task.keyspaceID
 		regionID := task.regionID
 		state := task.state
@@
-		lastRun, ok := resolveLastRun[regionID]
-		if ok {
-			if time.Since(lastRun) < resolveLockMinInterval {
-				return
-			}
-		}
-
 		if s.lockResolver == nil {
 			metrics.SubscriptionClientResolveLockCounter.WithLabelValues(resolveLockMetricFailed).Inc()
 			log.Warn("subscription client lock resolver is nil",
@@
 		if err != nil {
 			log.Warn("subscription client resolve lock fail",
 				zap.Uint32("keyspaceID", keyspaceID),
 				zap.Uint64("regionID", regionID),
 				zap.Uint64("targetTs", targetTs),
-				zap.Time("lastRun", lastRun),
 				zap.Any("state", state),
 				zap.Error(err))
 		}
-		resolveLastRun[regionID] = time.Now()
 	}
-
-	gcTicker := time.NewTicker(resolveLockMinInterval * 3 / 2)
-	defer gcTicker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-gcTicker.C:
-			resolveLastRun = gcResolveLastRunMap(resolveLastRun, time.Now())
 		case task := <-s.resolveLockTaskCh:
 			doResolve(task)
 		}
 	}
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 1002 - 1064, The client currently applies a second, client-wide throttle via resolveLastRun inside handleResolveLockTasks which can incorrectly suppress valid per-region resolves; remove that global throttle by deleting the resolveLastRun map, the "lastRun" check (time.Since(lastRun) < resolveLockMinInterval), the resolveLastRun[regionID] = time.Now() assignment, the zap.Time("lastRun", lastRun) field in the error log, and the gcTicker + gcResolveLastRunMap cleanup logic; keep the rest of doResolve (including the s.lockResolver.Resolve call, metrics increments, and existing state checks) so per-subscription/region throttling from tryResolveLock remains the only throttle.
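If the diff above lands, the worker reduces to a plain dispatch loop, roughly as follows (a sketch assuming doResolve stays a closure holding the remaining logic; field names from the PR):

```go
func (s *subscriptionClient) handleResolveLockTasks(ctx context.Context) error {
	// doResolve keeps the nil-resolver check, the Resolve call, and the
	// metrics increments; only the second per-region cooldown is gone.
	doResolve := func(task resolveLockTask) { /* ... */ }

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case task := <-s.resolveLockTaskCh:
			// Throttling already happened in tryResolveLock before enqueue.
			doResolve(task)
		}
	}
}
```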
🧹 Nitpick comments (1)
logservice/txnutil/lock_resolver.go (1)
172-182: ⚡ Quick win
Snapshot the expired-lock lower bound before calling ResolveLocks.
countExpiredLocks runs after ResolveLocks returns, so a lock that expires while the RPC is in flight can be counted as "resolved" even though this invocation never handled it. That makes the "lower bound" in the comment inaccurate.
Proposed change:
```diff
-	msBeforeTxnExpired, err1 := kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
+	expiredLockCount := countExpiredLocks(kvStorage, locks)
+	msBeforeTxnExpired, err1 := kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
 	if err1 != nil {
 		return errors.Trace(err1)
 	}
 	resolvedLockCount := len(locks)
 	if msBeforeTxnExpired > 0 {
 		// ResolveLocks only reports remaining TTL, so use expired locks as the
 		// best available lower bound when a batch is only partially resolved.
-		resolvedLockCount = countExpiredLocks(kvStorage, locks)
+		resolvedLockCount = expiredLockCount
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/txnutil/lock_resolver.go` around lines 172 - 182, The code computes countExpiredLocks(kvStorage, locks) after calling kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks), which lets locks expire during the RPC and misattributes those to this resolver; to fix, call countExpiredLocks(kvStorage, locks) once before invoking ResolveLocks and store it (e.g. expiredLowerBound), then call ResolveLocks, and use that precomputed expiredLowerBound when msBeforeTxnExpired > 0 to compute resolvedLockCount before passing it to recordLockResolveLockCount(lockResolveMetricResolved, resolvedLockCount); keep the existing symbols ResolveLocks, countExpiredLocks, msBeforeTxnExpired, resolvedLockCount, recordLockResolveLockCount and kvStorage/locks to locate the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 471ff694-0242-46f4-a7b5-682dedb1a3c6
📒 Files selected for processing (7)
- logservice/logpuller/subscription_client.go
- logservice/logpuller/subscription_client_test.go
- logservice/txnutil/lock_resolver.go
- metrics/grafana/ticdc_new_arch.json
- metrics/nextgengrafana/ticdc_new_arch_next_gen.json
- pkg/metrics/log_puller.go
- pkg/metrics/puller.go
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
logservice/logpuller/subscription_client.go (1)
1016-1038: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Remove the second resolve-lock cooldown here.
tryResolveLock already throttles per region before enqueueing. Re-checking resolveLastRun in the worker resets the cooldown at execution time, so a task that sat in the channel or failed late can suppress the next retry for another full resolveLockMinInterval. In practice this stretches recovery to queue delay + resolve time + 10s instead of the configured interval.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 1016 - 1038, The worker currently re-applies a per-region cooldown using resolveLastRun and resolveLockMinInterval and updates resolveLastRun after calling s.lockResolver.Resolve; remove that second cooldown so only tryResolveLock enforces throttling: delete the initial time.Since(lastRun) < resolveLockMinInterval check (and its early return), stop setting resolveLastRun[regionID] = time.Now() after Resolve, and clean up any logging that references lastRun; keep the Resolve call, metric increments, and error logging intact so tryResolveLock remains the single source of cooldown enforcement.
🧹 Nitpick comments (2)
logservice/logpuller/subscription_client_test.go (1)
149-161: 💤 Low value
Consider isolating the metric assertion to avoid cross-test interference.
The >= comparison at line 160 could pass incorrectly if other tests in the suite also increment SubscriptionClientResolveLockSuccessCounter concurrently. Since Prometheus counters are global singletons, parallel test execution could cause flakiness. Consider resetting/isolating the metric or using a more precise assertion that captures the exact delta.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client_test.go` around lines 149 - 161, The test currently uses a >= check against metrics.SubscriptionClientResolveLockSuccessCounter allowing other tests to affect the result; instead capture the initial value via testutil.ToFloat64(metrics.SubscriptionClientResolveLockSuccessCounter) and assert the exact delta after the operation (i.e., final == initial+1) or isolate the metric by resetting or using a private Prometheus registry for SubscriptionClientResolveLockSuccessCounter before sending to client.resolveLockTaskCh; update the require.Eventually assertion to compare equality (resolver.calls.Load() == 1 && testutil.ToFloat64(metrics.SubscriptionClientResolveLockSuccessCounter) == successBefore+1) or call a metric-reset helper targeted at SubscriptionClientResolveLockSuccessCounter to avoid cross-test interference.
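A delta-based assertion along the lines the comment suggests might look like this (a sketch, not the PR's test; the helper name is hypothetical and the metrics import path is assumed):

```go
import (
	"testing"
	"time"

	"github.com/pingcap/ticdc/pkg/metrics" // assumed import path
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

// assertResolveLockSuccessDelta is a hypothetical helper: it captures the
// counter before enqueueing a task, then asserts the exact +1 delta so that
// concurrent increments from other tests cannot make a >= check pass spuriously.
func assertResolveLockSuccessDelta(t *testing.T, client *subscriptionClient, task resolveLockTask) {
	successBefore := testutil.ToFloat64(metrics.SubscriptionClientResolveLockSuccessCounter)

	client.resolveLockTaskCh <- task

	require.Eventually(t, func() bool {
		return testutil.ToFloat64(metrics.SubscriptionClientResolveLockSuccessCounter) == successBefore+1
	}, 5*time.Second, 10*time.Millisecond)
}
```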
metrics/grafana/ticdc_new_arch.json (1)
8292-8316: ⚡ Quick win
Consider separate Y-axis or panel split — targets A/B and C have different granularities.
Targets A and B count resolve-lock task executions (one increment per subscribedSpan attempt), while target C counts individual TiKV locks found/resolved (one increment per lock). In high-lock-rate scenarios, C can be orders of magnitude larger than A and B, which will compress the task-success/failure lines to near-invisible on a shared linear Y-axis. Two remediation options:
- Assign target C to a right Y-axis (yaxis: 2) so the two scales are independent.
- Split the panel into two: one for task-level success/failure rates, and one for lock counts.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@metrics/grafana/ticdc_new_arch.json` around lines 8292 - 8316, Targets A/B (refId "A","B" – resolve_lock success/failure expressions) are task-level counts and target C (refId "C" – ticdc_kvclient_lock_resolve_lock_count) is per-lock and can be orders of magnitude larger; fix by either assigning target C to the right Y-axis (set yaxis: 2 for the target object with refId "C") so A/B stay on the left axis, or split into two panels—one panel containing targets A and B and a second panel containing target C—so the scales don’t compress the task-level lines.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@metrics/nextgengrafana/ticdc_new_arch_next_gen.json`:
- Around line 8295-8316: The PromQL metric names used by refId A, B, and C are
missing the OpenMetrics `_total` suffix and must be updated: change the
expression in refId A from ticdc_subscription_client_resolve_lock_success_count
to ticdc_subscription_client_resolve_lock_success_count_total, change refId B
from ticdc_subscription_client_resolve_lock_failure_count to
ticdc_subscription_client_resolve_lock_failure_count_total, and change refId C
from ticdc_kvclient_lock_resolve_lock_count to
ticdc_kvclient_lock_resolve_lock_count_total so the queries match the metrics
exposed by the Go/OpenMetrics client.
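For context on why the suffix matters here, a sketch of one plausible registration (the fully-qualified counter name comes from this comment; the namespace/subsystem split is an assumption, and the exposition behavior described is standard prometheus/client_golang + OpenMetrics semantics):

```go
import "github.com/prometheus/client_golang/prometheus"

// Scraped in the classic Prometheus text format, this counter appears as
// ticdc_subscription_client_resolve_lock_success_count; when the scraper
// negotiates the OpenMetrics format instead, the client appends the
// mandatory counter suffix and exposes it as
// ticdc_subscription_client_resolve_lock_success_count_total, which is the
// series name the dashboards then have to query.
var resolveLockSuccessCounter = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "ticdc",
	Subsystem: "subscription_client",
	Name:      "resolve_lock_success_count",
})
```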
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 626703dd-3000-4fad-89a4-59e319b268db
📒 Files selected for processing (7)
- logservice/logpuller/subscription_client.go
- logservice/logpuller/subscription_client_test.go
- logservice/txnutil/lock_resolver.go
- metrics/grafana/ticdc_new_arch.json
- metrics/nextgengrafana/ticdc_new_arch_next_gen.json
- pkg/metrics/log_puller.go
- tests/integration_tests/resolve_lock/main.go
…nge-metrics # Conflicts: # logservice/logpuller/subscription_client_test.go
Actionable comments posted: 1
🧹 Nitpick comments (1)
logservice/txnutil/lock_resolver.go (1)
98-109: ⚡ Quick win
Wrap LocateRegionByID errors inside reloadRegion.
reloadRegion now returns the raw LocateRegionByID error and both callers wrap it later. That loses the "wrap at source" convention and makes future callers easy to get wrong. Wrap the third-party error in the closure and let callers return it as-is.
♻️ Suggested change:
```diff
 reloadRegion := func() error {
 	var err error
 	loc, err = kvStorage.GetRegionCache().LocateRegionByID(bo, regionID)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	key = loc.StartKey
 	endKey = loc.EndKey
 	return nil
 }
 if err = reloadRegion(); err != nil {
-	return errors.Trace(err)
+	return err
 }
 ...
 if regionErr != nil {
 	err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String()))
 	if err != nil {
 		return errors.Trace(err)
 	}
 	if err := reloadRegion(); err != nil {
-		return errors.Trace(err)
+		return err
 	}
 	continue
 }
```
As per coding guidelines, "When an error comes from a third party/library call, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again".
Also applies to: 132-134
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@logservice/txnutil/lock_resolver.go` around lines 98 - 109, The closure reloadRegion currently returns the raw error from kvStorage.GetRegionCache().LocateRegionByID and callers wrap it later; change reloadRegion so it immediately wraps any error from LocateRegionByID (e.g., return errors.Trace(err) or errors.WrapError(err)) before returning, so callers can propagate it unchanged; apply the same fix for the other closure/usage around the later LocateRegionByID call (the block referenced at 132-134) so all third-party errors are wrapped at the source (refer to reloadRegion, LocateRegionByID, kvStorage.GetRegionCache()).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@logservice/logpuller/subscription_client.go`:
- Around line 138-139: The resolve-lock limiter is currently scoped per
subscribedSpan which allows multiple enqueues for the same TiKV region and
records time in tryMark too early; move the resolveLockRateLimiter out of
subscribedSpan and into subscriptionClient as a shared limiter keyed by
(keyspaceID, regionID) so throttling is per-region, not per-span; also separate
“pending/in-flight” dedupe from the “last execution time” (used by
resolveLockMinInterval) and update that timestamp when the actual resolve starts
or finishes (e.g., in handleResolveLockTasks) rather than in tryMark to prevent
admitting another task for the same region while one is still pending.
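One way to realize the suggested split between admission dedupe and execution timing is sketched below (names and layout are illustrative, not the PR's code):

```go
import (
	"sync"
	"time"
)

// regionKey identifies a TiKV region across keyspaces.
type regionKey struct {
	keyspaceID uint32
	regionID   uint64
}

// sharedResolveLimiter separates admission dedupe (pending) from execution
// timing (lastRun), as the comment above suggests.
type sharedResolveLimiter struct {
	mu          sync.Mutex
	minInterval time.Duration
	pending     map[regionKey]struct{}  // regions with a task enqueued or in flight
	lastRun     map[regionKey]time.Time // when a resolve last actually executed
}

// tryMark admits a task only if the region has no pending task and the last
// execution is older than minInterval. It deliberately records no timestamp.
func (l *sharedResolveLimiter) tryMark(k regionKey) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, inFlight := l.pending[k]; inFlight {
		return false
	}
	if last, ok := l.lastRun[k]; ok && time.Since(last) < l.minInterval {
		return false
	}
	l.pending[k] = struct{}{}
	return true
}

// markDone is called when the resolve finishes (e.g. from
// handleResolveLockTasks), recording the execution time and clearing pending.
func (l *sharedResolveLimiter) markDone(k regionKey) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.pending, k)
	l.lastRun[k] = time.Now()
}
```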
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b62055f8-d659-4fd9-a7b9-7d991cb5c1cd
📒 Files selected for processing (10)
- logservice/logpuller/resolve_lock_rate_limiter.go
- logservice/logpuller/resolve_lock_rate_limiter_test.go
- logservice/logpuller/subscription_client.go
- logservice/logpuller/subscription_client_test.go
- logservice/txnutil/lock_resolver.go
- metrics/grafana/ticdc_new_arch.json
- metrics/nextgengrafana/ticdc_new_arch_next_gen.json
- pkg/metrics/init.go
- pkg/metrics/log_puller.go
- pkg/metrics/puller.go
💤 Files with no reviewable changes (2)
- pkg/metrics/puller.go
- pkg/metrics/init.go
🚧 Files skipped from review as they are similar to previous changes (1)
- logservice/logpuller/subscription_client_test.go
/gemini summary
Summary of Changes
This pull request refactors the resolve-lock logic in the logpuller component to improve efficiency and observability. By centralizing the rate-limiting mechanism and introducing granular metrics, the system now better handles concurrent resolve-lock requests and provides clearer insights into the lock resolution process. These changes ensure that redundant tasks are avoided and that the performance of lock resolution can be effectively monitored via updated dashboards.
Activity
/test all
/retest
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: wk989898
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Details
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
[LGTM Timeline notifier]
Timeline:
What problem does this PR solve?
Issue Number: close #5016
What is changed and how it works?
This pull request refactors the resolve-lock logic in the logpuller component to improve efficiency and observability. By centralizing the rate-limiting mechanism and introducing granular metrics, the system now better handles concurrent resolve-lock requests and provides clearer insights into the lock resolution process. These changes ensure that redundant tasks are avoided and that the performance of lock resolution can be effectively monitored via updated dashboards.
Highlights
- Introduced a new resolveLockRateLimiter in subscriptionClient to deduplicate and throttle resolve-lock tasks per region, ensuring consistent behavior across multiple subscriptions.
- Refactored handleResolveLockTasks to directly invoke the lock resolver and improved task scheduling logic to prevent redundant execution.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests