Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626
* [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
67 changes: 49 additions & 18 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ type Ring struct {
// When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp).
lastTopologyChange time.Time

// For subrings returned by ShuffleShardWithLookback, the time at which the subring's
// membership would next change because an instance falls out of the lookback window.
// Zero means the subring has no time-based expiry (membership only changes with topology).
shuffleShardExpiry time.Time

// List of zones for which there's at least 1 instance in the ring. This list is guaranteed
// to be sorted alphabetically.
ringZones []string
Expand All @@ -224,6 +229,10 @@ type subringCacheKey struct {
shardSize int

zoneStableSharding bool

// lookbackPeriod distinguishes subrings built with ShuffleShardWithLookback (>0) from
// plain shuffle-shard subrings (0).
lookbackPeriod time.Duration
}

// New creates a new Ring. Being a service, Ring needs to be started to do anything.
Expand Down Expand Up @@ -755,11 +764,11 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
// set of instances, with a reduced number of overlapping instances between two identifiers.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, false)
return r.shuffleShardWithCache(identifier, size, 0, time.Now(), false)
}

func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, true)
return r.shuffleShardWithCache(identifier, size, 0, time.Now(), true)
}

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
Expand All @@ -768,34 +777,35 @@ func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRi
// The returned subring may be unbalanced with regard to zones and should never be used for write
// operations (read only).
//
// This function doesn't support caching.
// The returned subring is cached and reused until its membership would change: either when an
// instance ages out of the lookback window (a per-entry expiry) or on a topology change.
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
return r.shuffleShardWithCache(identifier, size, lookbackPeriod, now, false)
}

func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
func (r *Ring) shuffleShardWithCache(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
if cached := r.getCachedShuffledSubring(identifier, size, lookbackPeriod, now, zoneStableSharding); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)
result := r.shuffleShard(identifier, size, lookbackPeriod, now, zoneStableSharding)

r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
r.setCachedShuffledSubring(identifier, size, lookbackPeriod, zoneStableSharding, result)
return result
}

func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
lookbackUntil := now.Add(-lookbackPeriod).Unix()
lookbackInSeconds := int64(lookbackPeriod / time.Second)

// Earliest time (unix seconds) the subring's membership changes because a lookback-included
// instance ages out of the window. Zero means it has no time-based expiry.
var minLookbackExpiry int64

r.mtx.RLock()
defer r.mtx.RUnlock()
Expand Down Expand Up @@ -877,7 +887,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
// then we should include it in the subring but continuing selecting instances.
// If an instance is in READONLY we should always extend. The write path will filter it out when GetRing.
// The read path should extend to get new ingester used on write
if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY {
withinLookback := lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil
if withinLookback {
// Track when this instance will leave the lookback window; that's the earliest
// point at which the cached subring's membership would change.
if expiry := instance.RegisteredTimestamp + lookbackInSeconds; minLookbackExpiry == 0 || expiry < minLookbackExpiry {
minLookbackExpiry = expiry
}
}
if withinLookback || instance.State == READONLY {
continue
}

Expand All @@ -898,6 +916,11 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
shardDesc := &Desc{Ingesters: shard}
shardTokensByZone := shardDesc.getTokensByZone()

var shuffleShardExpiry time.Time
if minLookbackExpiry > 0 {
shuffleShardExpiry = time.Unix(minLookbackExpiry, 0)
}

return &Ring{
cfg: r.cfg,
strategy: r.strategy,
Expand All @@ -914,6 +937,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur

// For caching to work, remember these values.
lastTopologyChange: r.lastTopologyChange,
shuffleShardExpiry: shuffleShardExpiry,
}
}

Expand Down Expand Up @@ -951,7 +975,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
return ok
}

func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
func (r *Ring) getCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
if r.cfg.SubringCacheDisabled {
return nil
}
Expand All @@ -960,11 +984,18 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
defer r.mtx.RUnlock()

// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}]
if cached == nil {
return nil
}

// For lookback subrings the cached membership is only valid until the earliest instance
// leaves the lookback window. Once "now" reaches that point, treat it as a cache miss so
// the caller recomputes.
if !cached.shuffleShardExpiry.IsZero() && !now.Before(cached.shuffleShardExpiry) {
return nil
}

cached.mtx.Lock()
defer cached.mtx.Unlock()

Expand All @@ -979,7 +1010,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
return cached
}

func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
func (r *Ring) setCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, zoneStableSharding bool, subring *Ring) {
if subring == nil || r.cfg.SubringCacheDisabled {
return
}
Expand All @@ -991,7 +1022,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS
// (which can happen between releasing the read lock and getting read-write lock).
// Note that shuffledSubringCache can be only nil when set by test.
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}] = subring
}
}

Expand Down
114 changes: 114 additions & 0 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3165,6 +3165,120 @@ func TestShuffleShardWithCaching(t *testing.T) {
require.False(t, subring == newSubring)
}

func TestShuffleShardWithLookbackCaching(t *testing.T) {
inmem, closer := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{
MaxCasRetries: 20,
CasRetryDelay: 100 * time.Millisecond,
}, log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

cfg := Config{
KVStore: kv.Config{Mock: inmem},
HeartbeatTimeout: 1 * time.Minute,
ReplicationFactor: 3,
ZoneAwarenessEnabled: true,
}

ring, err := New(cfg, "test", "test", log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), ring)
})

const numLifecyclers = 6
const zones = 3

lcs := []*Lifecycler(nil)
for i := range numLifecyclers {
lc := startLifecycler(t, cfg, 500*time.Millisecond, i, zones)

lcs = append(lcs, lc)
}

// Wait until all instances in the ring are ACTIVE.
test.Poll(t, 5*time.Second, numLifecyclers, func() any {
active := 0
rs, _ := ring.GetReplicationSetForOperation(Read)
for _, ing := range rs.Instances {
if ing.State == ACTIVE {
active++
}
}
return active
})

const (
shardSize = zones
user = "user"
lookbackPeriod = time.Hour
)

// All instances were registered just now, so with an hour lookback they are all within the
// lookback window and the resulting subring carries a non-zero expiry.
now := time.Now()

// This lookback subring should be cached and reused while now is before its expiry.
subring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now)

// Repeated calls with now still before the expiry reuse the cached subring.
const iters = 100
sleep := (2 * time.Second) / iters
for range iters {
newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, time.Now())
require.True(t, subring == newSubring, "cached lookback subring reused before expiry")
time.Sleep(sleep)
}

// On a cache hit the subring still has up-to-date instance timestamps.
{
rs, err := subring.GetReplicationSetForOperation(Read)
require.NoError(t, err)

nowTs := time.Now()
for _, ing := range rs.Instances {
// Lifecyclers use 500ms refresh, but timestamps use 1s resolution, so give it some buffer.
assert.InDelta(t, nowTs.UnixNano(), time.Unix(ing.Timestamp, 0).UnixNano(), float64(2*time.Second.Nanoseconds()))
}
}

// Advancing now past the lookback window forces a recompute: instances registered ~now are no
// longer within the window, so the subring shrinks and a new one is returned.
subringAfterExpiry := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+time.Minute))
require.False(t, subring == subringAfterExpiry, "expired lookback subring is recomputed")
require.NotEqual(t, subring.InstancesCount(), subringAfterExpiry.InstancesCount())

// The recomputed subring has no instances within lookback, so no time-based expiry, and is reused.
subring = subringAfterExpiry
newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+2*time.Minute))
require.True(t, subring == newSubring, "recomputed lookback subring reused")

// A plain shuffle shard for the same user and size is cached under a separate key.
require.False(t, ring.ShuffleShard(user, shardSize) == newSubring, "plain and lookback subrings are cached separately")

// Change of instances (topology) invalidates the cache.
before := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now())
require.True(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "cached before topology change")

// Stop one instance per zone. Subring needs to be recomputed.
for i := range zones {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), lcs[i]))
}
test.Poll(t, 5*time.Second, numLifecyclers-zones, func() any {
return ring.InstancesCount()
})
require.False(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after topology change")

// Change of shard size needs a different subring.
sizeOne := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now())
require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 2, lookbackPeriod, time.Now()), "different shard sizes cached separately")

// Same size reuses the cache, and cleanup invalidates it.
require.True(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()))
ring.CleanupShuffleShardCache(user)
require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after cache cleanup")
}

// User shuffle shard token.
func userToken(user, zone string, skip int) uint32 {
r := rand.New(rand.NewSource(shard.ShuffleShardSeed(user, zone)))
Expand Down
Loading