StoreGateway: fix store gateway shuffle sharding consistency issue when shard size changes by 1 #5489

Merged · 10 commits · Aug 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md

```diff
@@ -15,6 +15,7 @@
 * [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_postings_compression_time_seconds` to `cortex_bucket_store_cached_postings_compression_time_seconds_total`. #5431
 * [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_series_fetch_duration_seconds` to `cortex_bucket_store_series_fetch_duration_seconds` and `cortex_bucket_store_cached_postings_fetch_duration_seconds` to `cortex_bucket_store_postings_fetch_duration_seconds`. Add new metric `cortex_bucket_store_chunks_fetch_duration_seconds`. #5448
 * [CHANGE] Store Gateway: Remove `idle_timeout`, `max_conn_age`, `pool_size`, `min_idle_conns` fields for Redis index cache and caching bucket. #5448
+* [CHANGE] Store Gateway: Add flag `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` to enable store gateway to use zone stable shuffle sharding. #5489
 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
 * [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338
 * [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356
```
7 changes: 7 additions & 0 deletions docs/blocks-storage/store-gateway.md

```diff
@@ -54,13 +54,20 @@ The store-gateway supports two sharding strategies:
 
 - `default`
 - `shuffle-sharding`
+- `zone-stable-shuffle-sharding`
 
 The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants.
 
 The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This way, the number of store-gateway instances loading blocks of a single tenant is limited and the blast radius of any issue that could be introduced by the tenant's workload is limited to its shard instances.
 
 The shuffle sharding strategy can be enabled via `-store-gateway.sharding-strategy=shuffle-sharding` and requires the `-store-gateway.tenant-shard-size` flag (or their respective YAML config options) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis setting the `store_gateway_tenant_shard_size` in the limits overrides.
 
+The **`zone-stable-shuffle-sharding`** strategy achieves the same as the **`shuffle-sharding`** strategy, but using a different sharding algorithm. The new sharding algorithm ensures that when zone awareness is enabled, when shard size increases or decreases by one, the replicas for any block should only change at most by one instance. This is important for querying store gateway because a block can be retried at most 3 times.
+
+Zone stable shuffle sharding can be enabled via `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag.
+
+It will become the default shuffle sharding strategy for store gateway in `v1.17.0` release and the previous shuffle sharding algorithm will be removed in `v1.18.0` release.
+
 _Please check out the [shuffle sharding documentation](../guides/shuffle-sharding.md) for more information about how it works._
 
 ### Auto-forget
```
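To make the consistency property above concrete, here is a minimal standalone sketch (not part of this PR) of the per-zone shard-size math. `oldPerZone` mirrors the round-up behaviour of `shardUtil.ShuffleShardExpectedInstancesPerZone` used by the existing algorithm, while `newPerZone` mirrors the `size / zones` plus remainder split this PR adds to `pkg/ring/ring.go`; the helper names are illustrative.

```go
package main

import "fmt"

// oldPerZone mirrors the existing algorithm: every zone gets
// ceil(size/zones) instances, so the effective shard size is
// rounded up to a multiple of the number of zones.
func oldPerZone(size, zones int) []int {
	perZone := (size + zones - 1) / zones
	out := make([]int, zones)
	for i := range out {
		out[i] = perZone
	}
	return out
}

// newPerZone mirrors the zone-stable algorithm: size/zones instances
// per zone, with the remainder spread as one extra instance over the
// first size%zones zones.
func newPerZone(size, zones int) []int {
	out := make([]int, zones)
	extra := size % zones
	for i := range out {
		out[i] = size / zones
		if i < extra {
			out[i]++
		}
	}
	return out
}

func main() {
	// Growing the shard size from 3 to 4 across 3 zones:
	fmt.Println(oldPerZone(3, 3), "->", oldPerZone(4, 3)) // [1 1 1] -> [2 2 2]: three instances change at once
	fmt.Println(newPerZone(3, 3), "->", newPerZone(4, 3)) // [1 1 1] -> [2 1 1]: exactly one instance changes
}
```

With the old rounding, a one-step change in shard size can add or remove an instance in every zone at once; because a block query is retried on at most 3 store-gateways, all of a block's replicas can change at the same time. The zone-stable split changes at most one instance per step, which is the consistency property described above.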
7 changes: 7 additions & 0 deletions docs/blocks-storage/store-gateway.template

```diff
@@ -54,13 +54,20 @@ The store-gateway supports two sharding strategies:
 
 - `default`
 - `shuffle-sharding`
+- `zone-stable-shuffle-sharding`
 
 The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants.
 
 The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This way, the number of store-gateway instances loading blocks of a single tenant is limited and the blast radius of any issue that could be introduced by the tenant's workload is limited to its shard instances.
 
 The shuffle sharding strategy can be enabled via `-store-gateway.sharding-strategy=shuffle-sharding` and requires the `-store-gateway.tenant-shard-size` flag (or their respective YAML config options) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis setting the `store_gateway_tenant_shard_size` in the limits overrides.
 
+The **`zone-stable-shuffle-sharding`** strategy achieves the same as the **`shuffle-sharding`** strategy, but using a different sharding algorithm. The new sharding algorithm ensures that when zone awareness is enabled, when shard size increases or decreases by one, the replicas for any block should only change at most by one instance. This is important for querying store gateway because a block can be retried at most 3 times.
+
+Zone stable shuffle sharding can be enabled via `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag.
+
+It will become the default shuffle sharding strategy for store gateway in `v1.17.0` release and the previous shuffle sharding algorithm will be removed in `v1.18.0` release.
+
 _Please check out the [shuffle sharding documentation](../guides/shuffle-sharding.md) for more information about how it works._
 
 ### Auto-forget
```
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md

```diff
@@ -105,3 +105,6 @@ Currently experimental features are:
 - `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
 - `-ingester.out-of-order-time-window` (duration) CLI flag
 - `out_of_order_time_window` (duration) field in runtime config file
+- Store Gateway Zone Stable Shuffle Sharding
+  - `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag
+  - `zone_stable_shuffle_sharding` (boolean) field in config file
```
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go

```diff
@@ -784,6 +784,11 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ring.ReadRing {
 	return args.Get(0).(ring.ReadRing)
 }
 
+func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ring.ReadRing {
+	args := r.Called(identifier, size)
+	return args.Get(0).(ring.ReadRing)
+}
+
 func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) {
 	args := r.Called(instanceID)
 	return args.Get(0).(ring.InstanceState), args.Error(1)
```
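`RingMock` is a `testify` mock, so tests can stub the new method the same way as `ShuffleShard`. A hedged sketch of such a stub (the test name and `mockedSubring` are illustrative, and the test file's existing `ring`, `mock`, and `require` imports are assumed):

```go
func TestRingMock_ShuffleShardWithZoneStability(t *testing.T) {
	ringMock := &RingMock{}
	mockedSubring := &RingMock{} // stand-in for the subring the real ring would build

	// Return the canned subring when the zone-stable variant is asked
	// for tenant "user-1" with a shard size of 3.
	ringMock.On("ShuffleShardWithZoneStability", "user-1", 3).Return(mockedSubring)

	subring := ringMock.ShuffleShardWithZoneStability("user-1", 3)
	require.Equal(t, ring.ReadRing(mockedSubring), subring)
	ringMock.AssertExpectations(t)
}
```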
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go

```diff
@@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
 		return nil, errors.Wrap(err, "failed to create store-gateway ring client")
 	}
 
-	stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled)
+	stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled, gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create store set")
 	}
```
20 changes: 12 additions & 8 deletions pkg/querier/blocks_store_replicated_set.go

```diff
@@ -37,7 +37,8 @@ type blocksStoreReplicationSet struct {
 	balancingStrategy loadBalancingStrategy
 	limits            BlocksStoreLimits
 
-	zoneAwarenessEnabled bool
+	zoneAwarenessEnabled      bool
+	zoneStableShuffleSharding bool
 
 	// Subservices manager.
 	subservices *services.Manager
@@ -53,14 +54,17 @@ func newBlocksStoreReplicationSet(
 	logger log.Logger,
 	reg prometheus.Registerer,
 	zoneAwarenessEnabled bool,
+	zoneStableShuffleSharding bool,
 ) (*blocksStoreReplicationSet, error) {
 	s := &blocksStoreReplicationSet{
-		storesRing:           storesRing,
-		clientsPool:          newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
-		shardingStrategy:     shardingStrategy,
-		balancingStrategy:    balancingStrategy,
-		limits:               limits,
-		zoneAwarenessEnabled: zoneAwarenessEnabled,
+		storesRing:        storesRing,
+		clientsPool:       newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
+		shardingStrategy:  shardingStrategy,
+		balancingStrategy: balancingStrategy,
+		limits:            limits,
+
+		zoneAwarenessEnabled:      zoneAwarenessEnabled,
+		zoneStableShuffleSharding: zoneStableShuffleSharding,
 	}
 
 	var err error
@@ -106,7 +110,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
 	// otherwise we just use the full ring.
 	var userRing ring.ReadRing
 	if s.shardingStrategy == util.ShardingStrategyShuffle {
-		userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
+		userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits, s.zoneStableShuffleSharding)
 	} else {
 		userRing = s.storesRing
 	}
```
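The `GetShuffleShardingSubring` helper that receives the new boolean lives in the store-gateway package and is not shown in this diff. Based on its call site here and the ring API added below, a plausible sketch of the selection logic (the `ShardingLimits` accessor name is assumed):

```go
// GetShuffleShardingSubring sketch: returns the subring for a given tenant,
// picking the zone-stable algorithm when it has been enabled via the CLI flag.
func GetShuffleShardingSubring(storesRing *ring.Ring, userID string, limits ShardingLimits, zoneStableShardingEnabled bool) ring.ReadRing {
	shardSize := limits.StoreGatewayTenantShardSize(userID) // assumed accessor name

	// A shard size <= 0 disables shuffle sharding for the tenant: use the whole ring.
	if shardSize <= 0 {
		return storesRing
	}

	if zoneStableShardingEnabled {
		return storesRing.ShuffleShardWithZoneStability(userID, shardSize)
	}
	return storesRing.ShuffleShard(userID, shardSize)
}
```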
6 changes: 3 additions & 3 deletions pkg/querier/blocks_store_replicated_set_test.go

```diff
@@ -585,7 +585,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
 			}
 
 			reg := prometheus.NewPedanticRegistry()
-			s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled)
+			s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled, true)
 			require.NoError(t, err)
 			require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 			defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
@@ -647,7 +647,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin
 
 	limits := &blocksStoreLimitsMock{}
 	reg := prometheus.NewPedanticRegistry()
-	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false)
+	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false, false)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 	defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
@@ -716,7 +716,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ZoneAwareness(t *testing.T) {
 
 	limits := &blocksStoreLimitsMock{}
 	reg := prometheus.NewPedanticRegistry()
-	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true)
+	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true, false)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
 	defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
```
77 changes: 53 additions & 24 deletions pkg/ring/ring.go

```diff
@@ -68,6 +68,12 @@ type ReadRing interface {
 	// and size (number of instances).
 	ShuffleShard(identifier string, size int) ReadRing
 
+	// ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm.
+	// It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one
+	// shard size at a time, at most 1 instance can be changed.
+	// It is only used in Store Gateway for now.
+	ShuffleShardWithZoneStability(identifier string, size int) ReadRing
+
 	// GetInstanceState returns the current state of an instance or an error if the
 	// instance does not exist in the ring.
 	GetInstanceState(instanceID string) (InstanceState, error)
```

Review comment from @harry671003 (Contributor, Jul 27, 2023): Do we need to put this behind a feature flag? Would it break some customers of Cortex who're using zone-awareness in store-gateways with shuffle-sharding?

Reply: By default, we use the old algorithm. Cortex has to be explicitly configured to use the new one. We can remove this feature flag after two releases. ;)
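The feature flag discussed above is the one named in the CHANGELOG entry. A minimal sketch of how it would be wired into the store-gateway ring config; the Go field name is inferred from `gatewayCfg.ShardingRing.ZoneStableShuffleSharding` in the querier change above, the YAML name from the `docs/configuration/v1-guarantees.md` change, and the surrounding struct is elided:

```go
// Sketch of the store-gateway ring config carrying the new flag.
type RingConfig struct {
	// ... other ring fields elided ...
	ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding"`
}

// RegisterFlags registers the CLI flag. It defaults to false, matching the
// reviewer discussion: the old algorithm stays in use unless explicitly enabled.
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.ZoneStableShuffleSharding,
		"store-gateway.sharding-ring.zone-stable-shuffle-sharding",
		false,
		"If enabled, use the zone-stable shuffle sharding algorithm to build the tenant subring.")
}
```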
```diff
@@ -200,6 +206,8 @@ type Ring struct {
 type subringCacheKey struct {
 	identifier string
 	shardSize  int
+
+	zoneStableSharding bool
 }
 
 // New creates a new Ring. Being a service, Ring needs to be started to do anything.
```
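The new `zoneStableSharding` field in the cache key matters because the same `(identifier, shardSize)` pair can now map to two different subrings, one per algorithm. A tiny self-contained illustration:

```go
package main

import "fmt"

type subringCacheKey struct {
	identifier         string
	shardSize          int
	zoneStableSharding bool
}

func main() {
	cache := map[subringCacheKey]string{}

	// The same tenant and shard size, cached once per algorithm.
	cache[subringCacheKey{"user-1", 3, false}] = "subring built by ShuffleShard"
	cache[subringCacheKey{"user-1", 3, true}] = "subring built by ShuffleShardWithZoneStability"

	// Without the extra field the second write would overwrite the first,
	// and a caller could be handed a subring built by the wrong algorithm.
	fmt.Println(cache[subringCacheKey{"user-1", 3, false}])
	fmt.Println(cache[subringCacheKey{"user-1", 3, true}])
}
```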
```diff
@@ -659,24 +667,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
 // - Stability: given the same ring, two invocations returns the same result.
 //
 // - Consistency: adding/removing 1 instance from the ring generates a resulting
-//   subring with no more then 1 difference.
+//   subring with no more than 1 difference.
 //
 // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
 //   set of instances, with a reduced number of overlapping instances between two identifiers.
 func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
-	// Nothing to do if the shard size is not smaller then the actual ring.
-	if size <= 0 || r.InstancesCount() <= size {
-		return r
-	}
-
-	if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
-		return cached
-	}
-
-	result := r.shuffleShard(identifier, size, 0, time.Now())
+	return r.shuffleShardWithCache(identifier, size, false)
+}
 
-	r.setCachedShuffledSubring(identifier, size, result)
-	return result
+func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
+	return r.shuffleShardWithCache(identifier, size, true)
 }
 
 // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
```
```diff
@@ -687,25 +687,49 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
 //
 // This function doesn't support caching.
 func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
-	// Nothing to do if the shard size is not smaller then the actual ring.
+	// Nothing to do if the shard size is not smaller than the actual ring.
 	if size <= 0 || r.InstancesCount() <= size {
 		return r
 	}
 
-	return r.shuffleShard(identifier, size, lookbackPeriod, now)
+	return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
 }
 
-func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring {
+func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
+	// Nothing to do if the shard size is not smaller than the actual ring.
+	if size <= 0 || r.InstancesCount() <= size {
+		return r
+	}
+
+	if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
+		return cached
+	}
+
+	result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)
+
+	r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
+	return result
+}
+
+func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
 	lookbackUntil := now.Add(-lookbackPeriod).Unix()
 
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
 
-	var numInstancesPerZone int
-	var actualZones []string
+	var (
+		numInstancesPerZone    int
+		actualZones            []string
+		zonesWithExtraInstance int
+	)
 
 	if r.cfg.ZoneAwarenessEnabled {
-		numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
+		if zoneStableSharding {
+			numInstancesPerZone = size / len(r.ringZones)
+			zonesWithExtraInstance = size % len(r.ringZones)
+		} else {
+			numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
+		}
 		actualZones = r.ringZones
 	} else {
 		numInstancesPerZone = size
```
```diff
@@ -735,7 +759,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
 		// To select one more instance while guaranteeing the "consistency" property,
 		// we do pick a random value from the generator and resolve uniqueness collisions
 		// (if any) continuing walking the ring.
-		for i := 0; i < numInstancesPerZone; i++ {
+		finalInstancesPerZone := numInstancesPerZone
+		if zonesWithExtraInstance > 0 {
+			zonesWithExtraInstance--
+			finalInstancesPerZone++
+		}
+		for i := 0; i < finalInstancesPerZone; i++ {
 			start := searchToken(tokens, random.Uint32())
 			iterations := 0
 			found := false
```
```diff
@@ -828,7 +857,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
 	return ok
 }
 
-func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
+func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
 	if r.cfg.SubringCacheDisabled {
 		return nil
 	}
```
```diff
@@ -837,7 +866,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
 	defer r.mtx.RUnlock()
 
 	// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
-	cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}]
+	cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
 	if cached == nil {
 		return nil
 	}
```
```diff
@@ -856,7 +885,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
 	return cached
 }
 
-func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ring) {
+func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
 	if subring == nil || r.cfg.SubringCacheDisabled {
 		return
 	}
```
```diff
@@ -868,7 +897,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
 	// (which can happen between releasing the read lock and getting read-write lock).
 	// Note that shuffledSubringCache can be only nil when set by test.
 	if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
-		r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring
+		r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
 	}
 }
```