Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StoreGateway: fix store gateway shuffle sharding consistency issue when shard size changes by 1 #5489

Merged
merged 10 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_postings_compression_time_seconds` to `cortex_bucket_store_cached_postings_compression_time_seconds_total`. #5431
* [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_series_fetch_duration_seconds` to `cortex_bucket_store_series_fetch_duration_seconds` and `cortex_bucket_store_cached_postings_fetch_duration_seconds` to `cortex_bucket_store_postings_fetch_duration_seconds`. Add new metric `cortex_bucket_store_chunks_fetch_duration_seconds`. #5448
* [CHANGE] Store Gateway: Remove `idle_timeout`, `max_conn_age`, `pool_size`, `min_idle_conns` fields for Redis index cache and caching bucket. #5448
* [CHANGE] Store Gateway: Add flag `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` to enable store gateway to use zone stable shuffle sharding. #5489
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
* [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338
* [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]

# If true, use the zone-stable shuffle sharding algorithm. Otherwise, use the
# default shuffle sharding algorithm.
# CLI flag: -store-gateway.sharding-ring.zone-stable-shuffle-sharding
[zone_stable_shuffle_sharding: <boolean> | default = false]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4789,6 +4789,11 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]

# If true, use the zone-stable shuffle sharding algorithm. Otherwise, use the
# default shuffle sharding algorithm.
# CLI flag: -store-gateway.sharding-ring.zone-stable-shuffle-sharding
[zone_stable_shuffle_sharding: <boolean> | default = false]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]
Expand Down
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,11 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ring.ReadRing {
return args.Get(0).(ring.ReadRing)
}

// ShuffleShardWithZoneStability implements ring.ReadRing on the mock by
// delegating to testify's recorded expectations for this call.
func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ring.ReadRing {
	callArgs := r.Called(identifier, size)
	return callArgs.Get(0).(ring.ReadRing)
}

func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) {
args := r.Called(instanceID)
return args.Get(0).(ring.InstanceState), args.Error(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
return nil, errors.Wrap(err, "failed to create store-gateway ring client")
}

stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled)
stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled, gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type blocksStoreReplicationSet struct {
balancingStrategy loadBalancingStrategy
limits BlocksStoreLimits

zoneAwarenessEnabled bool
zoneAwarenessEnabled bool
zoneStableShuffleSharding bool

// Subservices manager.
subservices *services.Manager
Expand All @@ -53,14 +54,17 @@ func newBlocksStoreReplicationSet(
logger log.Logger,
reg prometheus.Registerer,
zoneAwarenessEnabled bool,
zoneStableShuffleSharding bool,
) (*blocksStoreReplicationSet, error) {
s := &blocksStoreReplicationSet{
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
zoneAwarenessEnabled: zoneAwarenessEnabled,
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,

zoneAwarenessEnabled: zoneAwarenessEnabled,
zoneStableShuffleSharding: zoneStableShuffleSharding,
}

var err error
Expand Down Expand Up @@ -106,7 +110,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
// otherwise we just use the full ring.
var userRing ring.ReadRing
if s.shardingStrategy == util.ShardingStrategyShuffle {
userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits, s.zoneStableShuffleSharding)
} else {
userRing = s.storesRing
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/blocks_store_replicated_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
}

reg := prometheus.NewPedanticRegistry()
s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled)
s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled, true)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
Expand Down Expand Up @@ -647,7 +647,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin

limits := &blocksStoreLimitsMock{}
reg := prometheus.NewPedanticRegistry()
s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false)
s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false, false)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
Expand Down Expand Up @@ -716,7 +716,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ZoneAwareness(t *testing.T) {

limits := &blocksStoreLimitsMock{}
reg := prometheus.NewPedanticRegistry()
s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true)
s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true, false)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
Expand Down
79 changes: 56 additions & 23 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type ReadRing interface {
// and size (number of instances).
ShuffleShard(identifier string, size int) ReadRing

// ShuffleShardWithZoneStability does the same as ShuffleShard but uses a different shuffle sharding algorithm.
// It doesn't round the shard size up to be divisible by the number of zones, and it ensures that when the
// shard size changes by 1, at most 1 instance in the subring changes.
// It is only used in the Store Gateway for now.
ShuffleShardWithZoneStability(identifier string, size int) ReadRing
Copy link
Contributor

@harry671003 harry671003 Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to put this behind a feature flag? Would it break some customers of Cortex who're using zone-awareness in store-gateways with shuffle-sharding?
By default, we use the old algorithm. Cortex has to be explicitly configured to use the new one.

We can remove this feature flag after two releases. ;)


// GetInstanceState returns the current state of an instance or an error if the
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)
Expand Down Expand Up @@ -200,6 +206,8 @@ type Ring struct {
type subringCacheKey struct {
identifier string
shardSize int

zoneStableSharding bool
}

// New creates a new Ring. Being a service, Ring needs to be started to do anything.
Expand Down Expand Up @@ -659,24 +667,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
// - Stability: given the same ring, two invocations returns the same result.
//
// - Consistency: adding/removing 1 instance from the ring generates a resulting
// subring with no more then 1 difference.
// subring with no more than 1 difference.
//
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
// set of instances, with a reduced number of overlapping instances between two identifiers.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// Nothing to do if the shard size is not smaller then the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, 0, time.Now())
return r.shuffleShardWithCache(identifier, size, false)
}

r.setCachedShuffledSubring(identifier, size, result)
return result
func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, true)
}

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
Expand All @@ -687,25 +687,52 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
//
// This function doesn't support caching.
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
	// Nothing to do if the shard size is not smaller than the actual ring.
	if size <= 0 || r.InstancesCount() <= size {
		return r
	}

	// Lookback results are never cached; the final `false` selects the default
	// (non zone-stable) shuffle sharding algorithm.
	return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
}

// shuffleShardWithCache returns the shuffle shard subring for the given
// identifier and size, consulting the subring cache before computing a new
// one. zoneStableSharding selects the zone-stable variant of the algorithm
// and is part of the cache key, so both variants can be cached side by side.
func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
	// Nothing to do if the shard size is not smaller than the actual ring.
	if size <= 0 || r.InstancesCount() <= size {
		return r
	}

	if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
		return cached
	}

	result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)

	r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
	return result
}

func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring {
func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
lookbackUntil := now.Add(-lookbackPeriod).Unix()

r.mtx.RLock()
defer r.mtx.RUnlock()

var numInstancesPerZone int
var actualZones []string
var (
numInstancesPerZone int
actualZones []string
zonesWithExtraInstance int
)

if r.cfg.ZoneAwarenessEnabled {
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
if zoneStableSharding {
numInstancesPerZone = size / len(r.ringZones)
zonesWithExtraInstance = size % len(r.ringZones)
if zonesWithExtraInstance > 0 {
numInstancesPerZone++
}
} else {
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
}
actualZones = r.ringZones
} else {
numInstancesPerZone = size
Expand Down Expand Up @@ -778,6 +805,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
break
}
}
if zoneStableSharding && zonesWithExtraInstance > 0 {
zonesWithExtraInstance--
if zonesWithExtraInstance == 0 {
numInstancesPerZone--
}
}
}

// Build a read-only ring for the shard.
Expand Down Expand Up @@ -828,7 +861,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
return ok
}

func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
if r.cfg.SubringCacheDisabled {
return nil
}
Expand All @@ -837,7 +870,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
defer r.mtx.RUnlock()

// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}]
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
if cached == nil {
return nil
}
Expand All @@ -856,7 +889,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
return cached
}

func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ring) {
func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
if subring == nil || r.cfg.SubringCacheDisabled {
return
}
Expand All @@ -868,7 +901,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
// (which can happen between releasing the read lock and getting read-write lock).
// Note that shuffledSubringCache can be only nil when set by test.
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
}
}

Expand Down
Loading