Add "wait ring stability" to store-gateway and fix cold start issue #4271

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@
* `memberlist_client_messages_to_broadcast_dropped_total`
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-dispatcher-aggregation-groups` option to control max number of active dispatcher groups in Alertmanager (per tenant, also overridable). When the limit is reached, the Dispatcher produces a log message and increases the `cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total` metric. #4254
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-alerts-count` and `-alertmanager.max-alerts-size-bytes` to control the max number of alerts and the total size of alerts that a single user can have in Alertmanager's memory. Attempts to add more alerts will fail with a log message and increment the `cortex_alertmanager_alerts_insert_limited_total` metric (per user). These limits can be overridden using per-tenant overrides. Current values are tracked in the `cortex_alertmanager_alerts_limiter_current_alerts` and `cortex_alertmanager_alerts_limiter_current_alerts_size_bytes` metrics. #4253
* [ENHANCEMENT] Store-gateway: added `-store-gateway.sharding-ring.wait-stability-min-duration` and `-store-gateway.sharding-ring.wait-stability-max-duration` support to store-gateway, to wait for ring stability at startup. #4271
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
* [BUGFIX] Alertmanager: fix Alertmanager status page if clustering via gossip is disabled or sharding is enabled. #4184
@@ -49,6 +50,7 @@
* [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
* [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
* [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
* [BUGFIX] Memberlist: fix setting of the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269

## Blocksconvert
2 changes: 1 addition & 1 deletion docs/blocks-storage/compactor.md
@@ -223,7 +223,7 @@ compactor:
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the compactor ring
-# keep changing after this period of time, the compactor will start anyway.
+# keeps changing after this period of time, the compactor will start anyway.
# CLI flag: -compactor.ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

18 changes: 18 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -81,6 +81,14 @@ The store-gateway replication optionally supports [zone-awareness](../guides/zon
2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set on store-gateways, queriers and rulers.
3. Roll out store-gateways, queriers and rulers to apply the new configuration

### Waiting for stable ring at startup

In the event of a cluster cold start, or a scale up of 2+ store-gateway instances at the same time, we may end up in a situation where each new store-gateway instance starts at a slightly different time, and thus each one runs the initial blocks sync based on a different state of the ring. For example, in the case of a cold start, the first store-gateway joining the ring may load all blocks, since the sharding logic runs against the current state of the ring, which contains a single store-gateway.
Contributor: Technically, shouldn't this be "greater than or equal to `replication_factor` store-gateway instances at the same time"?

Contributor (author): It depends. For a cold start, yes (because if you have a number of replicas <= RF, then all replicas load all blocks). For the scale-up case, you may have RF=3 and scale up by +2, and this PR still improves it, because the 2 new replicas will not load extra blocks that they won't need once they're both ACTIVE in the ring (after the initial sync completes).


To reduce the likelihood this could happen, the store-gateway waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from it for at least `-store-gateway.sharding-ring.wait-stability-min-duration`. If the ring keeps changing after `-store-gateway.sharding-ring.wait-stability-max-duration`, the store-gateway stops waiting and proceeds with its normal startup.

To disable this waiting logic, you can start the store-gateway with `-store-gateway.sharding-ring.wait-stability-min-duration=0`.
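
The waiting logic is essentially a debounce on ring topology: reset a timer whenever the set of instances changes, and proceed once it has been quiet for long enough (or once the max wait expires). The sketch below is a minimal illustration of that algorithm, not the implementation this PR actually calls (`ring.WaitRingStability`, visible in the `pkg/storegateway/gateway.go` hunk later in this diff); the `snapshot` parameter is an assumed helper returning a comparable encoding of the ring's current instance set.

```go
import (
	"context"
	"errors"
	"time"
)

// waitRingStability blocks until the ring topology has been unchanged for
// minStable, giving up after maxWait. snapshot is an assumed helper that
// returns a comparable encoding of the ring's current instance set.
func waitRingStability(ctx context.Context, snapshot func() string, minStable, maxWait time.Duration) error {
	deadline := time.NewTimer(maxWait)
	defer deadline.Stop()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	last := snapshot()
	stableSince := time.Now()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return errors.New("ring still changing after max wait")
		case <-ticker.C:
			if curr := snapshot(); curr != last {
				// Topology changed: restart the stability clock.
				last, stableSince = curr, time.Now()
			} else if time.Since(stableSince) >= minStable {
				// Quiet for the minimum duration: ring is stable.
				return nil
			}
		}
	}
}
```

Note that the store-gateway treats the max-wait timeout as a warning rather than a failure: it starts anyway, matching the flag description above.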

## Blocks index-header

The [index-header](./binary-index-header.md) is a subset of the block index which the store-gateway downloads from the object storage and keeps on the local disk in order to speed up queries.
@@ -250,6 +258,16 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the store-gateway
# ring keeps changing after this period of time, the store-gateway will
# start anyway.
# CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

# Name of network interface to read address from.
# CLI flag: -store-gateway.sharding-ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
8 changes: 8 additions & 0 deletions docs/blocks-storage/store-gateway.template
@@ -81,6 +81,14 @@ The store-gateway replication optionally supports [zone-awareness](../guides/zon
2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set on store-gateways, queriers and rulers.
3. Roll out store-gateways, queriers and rulers to apply the new configuration

### Waiting for stable ring at startup

In the event of a cluster cold start, or a scale up of 2+ store-gateway instances at the same time, we may end up in a situation where each new store-gateway instance starts at a slightly different time, and thus each one runs the initial blocks sync based on a different state of the ring. For example, in the case of a cold start, the first store-gateway joining the ring may load all blocks, since the sharding logic runs against the current state of the ring, which contains a single store-gateway.
Contributor: Same as above.


To reduce the likelihood this could happen, the store-gateway waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from it for at least `-store-gateway.sharding-ring.wait-stability-min-duration`. If the ring keeps changing after `-store-gateway.sharding-ring.wait-stability-max-duration`, the store-gateway stops waiting and proceeds with its normal startup.

To disable this waiting logic, you can start the store-gateway with `-store-gateway.sharding-ring.wait-stability-min-duration=0`.

## Blocks index-header

The [index-header](./binary-index-header.md) is a subset of the block index which the store-gateway downloads from the object storage and keeps on the local disk in order to speed up queries.
12 changes: 11 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -5175,7 +5175,7 @@ sharding_ring:
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the compactor ring
-# keep changing after this period of time, the compactor will start anyway.
+# keeps changing after this period of time, the compactor will start anyway.
# CLI flag: -compactor.ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

@@ -5260,6 +5260,16 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the store-gateway
# ring keeps changing after this period of time, the store-gateway will start
# anyway.
# CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

# Name of network interface to read address from.
# CLI flag: -store-gateway.sharding-ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
22 changes: 18 additions & 4 deletions integration/backward_compatibility_test.go
@@ -26,10 +26,10 @@ var (
"quay.io/cortexproject/cortex:v1.3.0": preCortex14Flags,
"quay.io/cortexproject/cortex:v1.4.0": preCortex16Flags,
"quay.io/cortexproject/cortex:v1.5.0": preCortex16Flags,
"quay.io/cortexproject/cortex:v1.6.0": nil,
"quay.io/cortexproject/cortex:v1.7.0": nil,
"quay.io/cortexproject/cortex:v1.8.0": nil,
"quay.io/cortexproject/cortex:v1.9.0": nil,
"quay.io/cortexproject/cortex:v1.6.0": preCortex110Flags,
"quay.io/cortexproject/cortex:v1.7.0": preCortex110Flags,
"quay.io/cortexproject/cortex:v1.8.0": preCortex110Flags,
"quay.io/cortexproject/cortex:v1.9.0": preCortex110Flags,
}
)

@@ -42,13 +42,27 @@ func preCortex14Flags(flags map[string]string) map[string]string {
"-store-gateway.sharding-ring.replication-factor": "",
// Query-scheduler has been introduced in 1.6.0
"-frontend.scheduler-dns-lookup-period": "",
// Store-gateway "wait ring stability" has been introduced in 1.10.0
"-store-gateway.sharding-ring.wait-stability-min-duration": "",
"-store-gateway.sharding-ring.wait-stability-max-duration": "",
})
}

func preCortex16Flags(flags map[string]string) map[string]string {
return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
// Query-scheduler has been introduced in 1.6.0
"-frontend.scheduler-dns-lookup-period": "",
// Store-gateway "wait ring stability" has been introduced in 1.10.0
"-store-gateway.sharding-ring.wait-stability-min-duration": "",
"-store-gateway.sharding-ring.wait-stability-max-duration": "",
})
}

func preCortex110Flags(flags map[string]string) map[string]string {
return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
// Store-gateway "wait ring stability" has been introduced in 1.10.0
"-store-gateway.sharding-ring.wait-stability-min-duration": "",
"-store-gateway.sharding-ring.wait-stability-max-duration": "",
})
}
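
Why set these flags to the empty string? In these integration tests, a `""` value conventionally marks a flag for removal before launching an image that predates it. A rough sketch of the assumed helper semantics follows (the real `MergeFlags`/`MergeFlagsWithoutRemovingEmpty` live in the `e2e` package and may differ in detail):

```go
// mergeFlagsWithoutRemovingEmpty merges override flags into base, keeping
// "" values so a later, final merge step can strip those flags before
// starting an old image that would reject them. Illustrative only.
func mergeFlagsWithoutRemovingEmpty(base, overrides map[string]string) map[string]string {
	out := make(map[string]string, len(base)+len(overrides))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range overrides {
		out[k] = v // keep "" entries: they mark flags unknown to old images
	}
	return out
}
```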

6 changes: 6 additions & 0 deletions integration/e2ecortex/services.go
@@ -121,6 +121,9 @@ func NewStoreGatewayWithConfigFile(name, consulAddress, configFile string, flags
"-store-gateway.sharding-ring.store": "consul",
"-store-gateway.sharding-ring.consul.hostname": consulAddress,
"-store-gateway.sharding-ring.replication-factor": "1",
// Start up quickly.
"-store-gateway.sharding-ring.wait-stability-min-duration": "0",
"-store-gateway.sharding-ring.wait-stability-max-duration": "0",
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
@@ -301,6 +304,9 @@ func NewSingleBinary(name string, flags map[string]string, image string, otherPo
"-ingester.concurrent-flushes": "10",
"-ingester.max-transfer-retries": "10",
"-ingester.num-tokens": "512",
// Start up quickly.
"-store-gateway.sharding-ring.wait-stability-min-duration": "0",
"-store-gateway.sharding-ring.wait-stability-max-duration": "0",
}, flags))...),
e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
httpPort,
20 changes: 11 additions & 9 deletions integration/getting_started_with_gossiped_ring_test.go
@@ -33,15 +33,17 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
// We don't care for storage part too much here. Both Cortex instances will write new blocks to /tmp, but that's fine.
flags := map[string]string{
// decrease timeouts to make test faster. should still be fine with two instances only
"-ingester.join-after": "0s", // join quickly
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
"-blocks-storage.backend": "s3",
"-blocks-storage.s3.bucket-name": bucketName,
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-blocks-storage.s3.insecure": "true",
"-ingester.join-after": "0s", // join quickly
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
"-blocks-storage.backend": "s3",
"-blocks-storage.s3.bucket-name": bucketName,
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-blocks-storage.s3.insecure": "true",
"-store-gateway.sharding-ring.wait-stability-min-duration": "0", // start quickly
"-store-gateway.sharding-ring.wait-stability-max-duration": "0", // start quickly
}

// This cortex will fail to join the cluster configured in yaml file. That's fine.
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -400,7 +400,7 @@ func (c *Compactor) starting(ctx context.Context) error {

// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
// time, we may end up in a situation where each new compactor instance starts at a slightly
-// different time and thus each one starts with on a different state of the ring. It's better
+// different time and thus each one starts with a different state of the ring. It's better
// to just wait the ring stability for a short time.
if c.compactorCfg.ShardingRing.WaitStabilityMinDuration > 0 {
minWaiting := c.compactorCfg.ShardingRing.WaitStabilityMinDuration
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
@@ -51,7 +51,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
-f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keep changing after this period of time, the compactor will start anyway.")
+f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keeps changing after this period of time, the compactor will start anyway.")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
10 changes: 10 additions & 0 deletions pkg/storegateway/bucket_store_inmemory_server.go
@@ -3,8 +3,10 @@ package storegateway
import (
"context"

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

@@ -19,6 +21,7 @@ type bucketStoreSeriesServer struct {

SeriesSet []*storepb.Series
Warnings storage.Warnings
Hints hintspb.SeriesResponseHints
}

func newBucketStoreSeriesServer(ctx context.Context) *bucketStoreSeriesServer {
@@ -30,6 +33,13 @@ func (s *bucketStoreSeriesServer) Send(r *storepb.SeriesResponse) error {
s.Warnings = append(s.Warnings, errors.New(r.GetWarning()))
}

if rawHints := r.GetHints(); rawHints != nil {
Contributor: Is this change related?

Contributor (author): Yes. It's just used by tests. I've added the ability to read hints too.

// We expect only 1 hints entry so we just keep 1.
if err := types.UnmarshalAny(rawHints, &s.Hints); err != nil {
return errors.Wrap(err, "failed to unmarshal series hints")
}
}

if recvSeries := r.GetSeries(); recvSeries != nil {
// Thanos uses a pool for the chunks and may use other pools in the future.
// Given we need to retain the reference after the pooled slices are recycled,
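
As the review thread above notes, the new `Hints` field exists so tests can assert on sharding behavior, for example which blocks a store-gateway actually queried. A hypothetical test fragment follows; the `store`, `req`, and `expectedBlockIDs` fixtures are assumed, as are the `QueriedBlocks`/`Id` field names from the Thanos hints proto:

```go
func TestSeriesReportsQueriedBlocks(t *testing.T) {
	srv := newBucketStoreSeriesServer(context.Background())

	// store is an assumed, already-configured bucket store, and req an
	// assumed storepb.SeriesRequest covering the blocks under test.
	require.NoError(t, store.Series(req, srv))

	queried := make([]string, 0, len(srv.Hints.QueriedBlocks))
	for _, b := range srv.Hints.QueriedBlocks {
		queried = append(queried, b.Id)
	}

	// Only the blocks owned by this replica should have been queried.
	require.ElementsMatch(t, expectedBlockIDs, queried)
}
```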
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_test.go
@@ -589,7 +589,7 @@ func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string
return u.users
}

-func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, synced *extprom.TxGaugeVec) error {
+func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
if util.StringsContain(u.users, userID) {
return nil
}
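
The extra `loaded` parameter threaded through `FilterBlocks` above is what lets a sharding strategy distinguish a cold start (nothing loaded yet, so it can filter strictly by ring ownership) from a later re-sync where some blocks are already in memory. A hypothetical strategy using it (not the one this PR ships) could look like the sketch below; `owns` is an assumed helper backed by ring hashing:

```go
import (
	"context"

	"github.com/oklog/ulid"
	thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/extprom"
)

// ownedBlocksStrategy keeps blocks owned by this replica and defers the
// unload of blocks that are no longer owned but still loaded, reducing
// churn while the ring is settling. Illustrative sketch only.
type ownedBlocksStrategy struct {
	owns func(id ulid.ULID) bool
}

func (s *ownedBlocksStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
	for id := range metas {
		if s.owns(id) {
			continue // owned by this replica: keep it
		}
		if _, isLoaded := loaded[id]; isLoaded {
			continue // not owned but already loaded: drop it on a later sync
		}
		delete(metas, id) // not owned and not loaded: never load it
	}
	return nil
}
```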
20 changes: 18 additions & 2 deletions pkg/storegateway/gateway.go
@@ -231,6 +231,22 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
return err
}
level.Info(g.logger).Log("msg", "store-gateway is JOINING in the ring")

// In the event of a cluster cold start or scale up of 2+ store-gateway instances at the same
// time, we may end up in a situation where each new store-gateway instance starts at a slightly
// different time and thus each one starts with a different state of the ring. It's better
// to just wait for ring stability for a short time.
if g.gatewayCfg.ShardingRing.WaitStabilityMinDuration > 0 {
minWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMinDuration
maxWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMaxDuration

level.Info(g.logger).Log("msg", "waiting until store-gateway ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
if err := ring.WaitRingStability(ctx, g.ring, BlocksOwnerSync, minWaiting, maxWaiting); err != nil {
level.Warn(g.logger).Log("msg", "store-gateway ring topology is not stable after the max waiting time, proceeding anyway")
} else {
level.Info(g.logger).Log("msg", "store-gateway ring topology is stable")
}
}
}

// At this point, if sharding is enabled, the instance is registered with some tokens
@@ -271,7 +287,7 @@ func (g *StoreGateway) running(ctx context.Context) error {
defer syncTicker.Stop()

if g.gatewayCfg.ShardingEnabled {
-ringLastState, _ = g.ring.GetAllHealthy(BlocksSync) // nolint:errcheck
+ringLastState, _ = g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck
ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2))
defer ringTicker.Stop()
ringTickerChan = ringTicker.C
@@ -284,7 +300,7 @@
case <-ringTickerChan:
// We ignore the error because in case of error it will return an empty
// replication set which we use to compare with the previous state.
-currRingState, _ := g.ring.GetAllHealthy(BlocksSync) // nolint:errcheck
+currRingState, _ := g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck

if ring.HasReplicationSetChanged(ringLastState, currRingState) {
ringLastState = currRingState