diff --git a/CHANGELOG.md b/CHANGELOG.md index bbf36ec4ba..f23830bae9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 * [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626 +* [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index f93637e2fc..9b2044b4a0 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -199,6 +199,11 @@ type Ring struct { // When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp). lastTopologyChange time.Time + // For subrings returned by ShuffleShardWithLookback, the time at which the subring's + // membership would next change because an instance falls out of the lookback window. + // Zero means the subring has no time-based expiry (membership only changes with topology). + shuffleShardExpiry time.Time + // List of zones for which there's at least 1 instance in the ring. This list is guaranteed // to be sorted alphabetically. ringZones []string @@ -224,6 +229,10 @@ type subringCacheKey struct { shardSize int zoneStableSharding bool + + // lookbackPeriod distinguishes subrings built with ShuffleShardWithLookback (>0) from + // plain shuffle-shard subrings (0). + lookbackPeriod time.Duration } // New creates a new Ring. Being a service, Ring needs to be started to do anything. @@ -755,11 +764,11 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, false) + return r.shuffleShardWithCache(identifier, size, 0, time.Now(), false) } func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { - return r.shuffleShardWithCache(identifier, size, true) + return r.shuffleShardWithCache(identifier, size, 0, time.Now(), true) } // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances @@ -768,34 +777,35 @@ func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRi // The returned subring may be unbalanced with regard to zones and should never be used for write // operations (read only). // -// This function doesn't support caching. +// The returned subring is cached and reused until its membership would change: either when an +// instance ages out of the lookback window (a per-entry expiry) or on a topology change. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller than the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - - return r.shuffleShard(identifier, size, lookbackPeriod, now, false) + return r.shuffleShardWithCache(identifier, size, lookbackPeriod, now, false) } -func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing { +func (r *Ring) shuffleShardWithCache(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) ReadRing { // Nothing to do if the shard size is not smaller than the actual ring. if size <= 0 || r.InstancesCount() <= size { return r } - if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil { + if cached := r.getCachedShuffledSubring(identifier, size, lookbackPeriod, now, zoneStableSharding); cached != nil { return cached } - result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding) + result := r.shuffleShard(identifier, size, lookbackPeriod, now, zoneStableSharding) - r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result) + r.setCachedShuffledSubring(identifier, size, lookbackPeriod, zoneStableSharding, result) return result } func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring { lookbackUntil := now.Add(-lookbackPeriod).Unix() + lookbackInSeconds := int64(lookbackPeriod / time.Second) + + // Earliest time (unix seconds) the subring's membership changes because a lookback-included + // instance ages out of the window. Zero means it has no time-based expiry. + var minLookbackExpiry int64 r.mtx.RLock() defer r.mtx.RUnlock() @@ -877,7 +887,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // then we should include it in the subring but continuing selecting instances. // If an instance is in READONLY we should always extend. The write path will filter it out when GetRing. // The read path should extend to get new ingester used on write - if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY { + withinLookback := lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil + if withinLookback { + // Track when this instance will leave the lookback window; that's the earliest + // point at which the cached subring's membership would change. + if expiry := instance.RegisteredTimestamp + lookbackInSeconds; minLookbackExpiry == 0 || expiry < minLookbackExpiry { + minLookbackExpiry = expiry + } + } + if withinLookback || instance.State == READONLY { continue } @@ -898,6 +916,11 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur shardDesc := &Desc{Ingesters: shard} shardTokensByZone := shardDesc.getTokensByZone() + var shuffleShardExpiry time.Time + if minLookbackExpiry > 0 { + shuffleShardExpiry = time.Unix(minLookbackExpiry, 0) + } + return &Ring{ cfg: r.cfg, strategy: r.strategy, @@ -914,6 +937,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // For caching to work, remember these values. lastTopologyChange: r.lastTopologyChange, + shuffleShardExpiry: shuffleShardExpiry, } } @@ -951,7 +975,7 @@ func (r *Ring) HasInstance(instanceID string) bool { return ok } -func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring { +func (r *Ring) getCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring { if r.cfg.SubringCacheDisabled { return nil } @@ -960,11 +984,18 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS defer r.mtx.RUnlock() // if shuffledSubringCache map is nil, reading it returns default value (nil pointer). - cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] + cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}] if cached == nil { return nil } + // For lookback subrings the cached membership is only valid until the earliest instance + // leaves the lookback window. Once "now" reaches that point, treat it as a cache miss so + // the caller recomputes. + if !cached.shuffleShardExpiry.IsZero() && !now.Before(cached.shuffleShardExpiry) { + return nil + } + cached.mtx.Lock() defer cached.mtx.Unlock() @@ -979,7 +1010,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS return cached } -func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) { +func (r *Ring) setCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, zoneStableSharding bool, subring *Ring) { if subring == nil || r.cfg.SubringCacheDisabled { return } @@ -991,7 +1022,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS // (which can happen between releasing the read lock and getting read-write lock). // Note that shuffledSubringCache can be only nil when set by test. if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) { - r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring + r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}] = subring } } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 550ac10578..a08c2622a3 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -3165,6 +3165,120 @@ func TestShuffleShardWithCaching(t *testing.T) { require.False(t, subring == newSubring) } +func TestShuffleShardWithLookbackCaching(t *testing.T) { + inmem, closer := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{ + MaxCasRetries: 20, + CasRetryDelay: 100 * time.Millisecond, + }, log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + cfg := Config{ + KVStore: kv.Config{Mock: inmem}, + HeartbeatTimeout: 1 * time.Minute, + ReplicationFactor: 3, + ZoneAwarenessEnabled: true, + } + + ring, err := New(cfg, "test", "test", log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), ring) + }) + + const numLifecyclers = 6 + const zones = 3 + + lcs := []*Lifecycler(nil) + for i := range numLifecyclers { + lc := startLifecycler(t, cfg, 500*time.Millisecond, i, zones) + + lcs = append(lcs, lc) + } + + // Wait until all instances in the ring are ACTIVE. + test.Poll(t, 5*time.Second, numLifecyclers, func() any { + active := 0 + rs, _ := ring.GetReplicationSetForOperation(Read) + for _, ing := range rs.Instances { + if ing.State == ACTIVE { + active++ + } + } + return active + }) + + const ( + shardSize = zones + user = "user" + lookbackPeriod = time.Hour + ) + + // All instances were registered just now, so with an hour lookback they are all within the + // lookback window and the resulting subring carries a non-zero expiry. + now := time.Now() + + // This lookback subring should be cached and reused while now is before its expiry. + subring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now) + + // Repeated calls with now still before the expiry reuse the cached subring. + const iters = 100 + sleep := (2 * time.Second) / iters + for range iters { + newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, time.Now()) + require.True(t, subring == newSubring, "cached lookback subring reused before expiry") + time.Sleep(sleep) + } + + // On a cache hit the subring still has up-to-date instance timestamps. + { + rs, err := subring.GetReplicationSetForOperation(Read) + require.NoError(t, err) + + nowTs := time.Now() + for _, ing := range rs.Instances { + // Lifecyclers use 500ms refresh, but timestamps use 1s resolution, so give it some buffer. + assert.InDelta(t, nowTs.UnixNano(), time.Unix(ing.Timestamp, 0).UnixNano(), float64(2*time.Second.Nanoseconds())) + } + } + + // Advancing now past the lookback window forces a recompute: instances registered ~now are no + // longer within the window, so the subring shrinks and a new one is returned. + subringAfterExpiry := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+time.Minute)) + require.False(t, subring == subringAfterExpiry, "expired lookback subring is recomputed") + require.NotEqual(t, subring.InstancesCount(), subringAfterExpiry.InstancesCount()) + + // The recomputed subring has no instances within lookback, so no time-based expiry, and is reused. + subring = subringAfterExpiry + newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+2*time.Minute)) + require.True(t, subring == newSubring, "recomputed lookback subring reused") + + // A plain shuffle shard for the same user and size is cached under a separate key. + require.False(t, ring.ShuffleShard(user, shardSize) == newSubring, "plain and lookback subrings are cached separately") + + // Change of instances (topology) invalidates the cache. + before := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()) + require.True(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "cached before topology change") + + // Stop one instance per zone. Subring needs to be recomputed. + for i := range zones { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), lcs[i])) + } + test.Poll(t, 5*time.Second, numLifecyclers-zones, func() any { + return ring.InstancesCount() + }) + require.False(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after topology change") + + // Change of shard size needs a different subring. + sizeOne := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()) + require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 2, lookbackPeriod, time.Now()), "different shard sizes cached separately") + + // Same size reuses the cache, and cleanup invalidates it. + require.True(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now())) + ring.CleanupShuffleShardCache(user) + require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after cache cleanup") +} + // User shuffle shard token. func userToken(user, zone string, skip int) uint32 { r := rand.New(rand.NewSource(shard.ShuffleShardSeed(user, zone)))