From a5ba9f4958b434325b684eb7e05ac57c3204bd9d Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Fri, 9 Oct 2020 03:22:39 +0000
Subject: [PATCH 1/3] Allow ingesters to stay in the ring during restart

Keeping an ingester in the ring, and not writing to additional ingesters
while it is being restarted, keeps small series from being created. This
allows users running with blocks storage or a WAL to avoid excess load and
memory usage during a rolling restart.

Signed-off-by: Chris Marchbanks
---
 CHANGELOG.md                                |  1 +
 docs/configuration/config-file-reference.md | 10 ++++++++++
 pkg/distributor/distributor_test.go         |  1 +
 pkg/ring/lifecycler.go                      |  9 +++++----
 pkg/ring/replication_strategy.go            | 11 ++++++++---
 pkg/ring/ring.go                            |  4 +++-
 6 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a56a7828df7..34873d2ab20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
+* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts.
 * [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone will fail. #3414
 * [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
   - `cortex_ingester_tsdb_wal_corruptions_total`
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index a8ffaa2a688..da077a35c81 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -608,6 +608,11 @@ lifecycler:
     # CLI flag: -distributor.zone-awareness-enabled
     [zone_awareness_enabled: <boolean> | default = false]
 
+    # Try writing to an additional ingester in the presence of an ingester not
+    # in the ACTIVE state.
+    # CLI flag: -distributor.extend-writes
+    [extend_writes: <boolean> | default = true]
+
   # Number of tokens for each ingester.
   # CLI flag: -ingester.num-tokens
   [num_tokens: <int> | default = 128]
@@ -648,6 +653,11 @@ lifecycler:
   # CLI flag: -ingester.availability-zone
   [availability_zone: <string> | default = ""]
 
+  # Leave the instance in the ring upon removal. Useful for rolling restarts
+  # with consistent naming.
+  # CLI flag: -ingester.skip-unregister
+  [skip_unregister: <boolean> | default = false]
+
 # Number of times to try and transfer chunks before falling back to flushing.
 # Negative value or zero disables hand-over. This feature is supported only by
 # the chunks storage.
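(Editor's note, not part of the patch.) A minimal Go sketch of how the two options introduced above are intended to be combined for a rolling restart with stable instance names. The struct fields come from this series (a later patch in the series renames `skip_unregister` to `unregister_from_ring`); the import path and the surrounding wiring are assumptions.

```go
package example

import (
	"time"

	// Assumed import path for the ring package modified by this patch.
	"github.com/cortexproject/cortex/pkg/ring"
)

// rollingRestartConfig builds a ring and lifecycler config for a rolling
// restart with consistent naming: the distributor does not spill writes to
// an extra ingester while one is restarting (ExtendWrites: false), and the
// ingester keeps its ring entry on shutdown (SkipUnregister: true).
func rollingRestartConfig() (ring.Config, ring.LifecyclerConfig) {
	ringCfg := ring.Config{
		HeartbeatTimeout:  time.Minute,
		ReplicationFactor: 3,
		ExtendWrites:      false, // added by this patch; defaults to true
	}
	lcCfg := ring.LifecyclerConfig{
		RingConfig:     ringCfg,
		NumTokens:      128,
		SkipUnregister: true, // added by this patch; defaults to false
	}
	return ringCfg, lcCfg
}
```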
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 7826f37e538..79c70897b1a 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -1105,6 +1105,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 		},
 		HeartbeatTimeout:  60 * time.Minute,
 		ReplicationFactor: 3,
+		ExtendWrites:      true,
 	}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index a9e2f344cf0..89550c6d28b 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -56,12 +56,12 @@ type LifecyclerConfig struct {
 	FinalSleep       time.Duration `yaml:"final_sleep"`
 	TokensFilePath   string        `yaml:"tokens_file_path"`
 	Zone             string        `yaml:"availability_zone"`
+	SkipUnregister   bool          `yaml:"skip_unregister"`
 
 	// For testing, you can override the address and ID of this ingester
-	Addr           string `yaml:"address" doc:"hidden"`
-	Port           int    `doc:"hidden"`
-	ID             string `doc:"hidden"`
-	SkipUnregister bool   `yaml:"-"`
+	Addr string `yaml:"address" doc:"hidden"`
+	Port int    `doc:"hidden"`
+	ID   string `doc:"hidden"`
 
 	// Injected internally
 	ListenPort int `yaml:"-"`
@@ -102,6 +102,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
 	f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
 	f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register in the ring.")
 	f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.")
+	f.BoolVar(&cfg.SkipUnregister, prefix+"skip-unregister", false, "Leave the instance in the ring upon removal. Useful for rolling restarts with consistent naming.")
 }
 
 // Lifecycler is responsible for managing the lifecycle of entries in the ring.
diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go
index 67d96c30ed4..b13fd7a3c61 100644
--- a/pkg/ring/replication_strategy.go
+++ b/pkg/ring/replication_strategy.go
@@ -17,7 +17,9 @@ type ReplicationStrategy interface {
 	ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
 }
 
-type DefaultReplicationStrategy struct{}
+type DefaultReplicationStrategy struct {
+	ExtendWrites bool
+}
 
 // Filter decides, given the set of ingesters eligible for a key,
 // which ingesters you will try and write to and how many failures you will
@@ -70,8 +72,11 @@ func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDes
 	// size of the replica set for read, but we can read from Leaving ingesters,
 	// so don't skip it in this case.
 	// NB dead ingester will be filtered later by DefaultReplicationStrategy.Filter().
-	if op == Write && ingester.State != ACTIVE {
-		return true
+	if op == Write {
+		if s.ExtendWrites {
+			return ingester.State != ACTIVE
+		}
+		return false
 	} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
 		return true
 	}
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index a0d8723e2c1..4f1d7c5adea 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -112,6 +112,7 @@ type Config struct {
 	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout"`
 	ReplicationFactor    int           `yaml:"replication_factor"`
 	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
+	ExtendWrites         bool          `yaml:"extend_writes"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
@@ -126,6 +127,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
 	f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
+	f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state.")
 }
 
 // Ring holds the information about the members of the consistent hash ring.
@@ -178,7 +180,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
 		return nil, err
 	}
 
-	return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{})
+	return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{ExtendWrites: cfg.ExtendWrites})
 }
 
 func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {

From db96936079871ee9fdc8edc93358ac356824d63c Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Sat, 17 Oct 2020 18:30:07 +0000
Subject: [PATCH 2/3] Always unregister upon HTTP shutdown

Signed-off-by: Chris Marchbanks
---
 CHANGELOG.md                   |  2 +-
 pkg/ingester/ingester.go       | 12 ++++++++++--
 pkg/ingester/lifecycle_test.go | 29 +++++++++++++++++++++++++++++
 pkg/ring/lifecycler.go         | 14 +++++++++++++-
 4 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34873d2ab20..cee0559d68c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,7 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
-* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts.
+* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305
 * [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone will fail. #3414
 * [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
   - `cortex_ingester_tsdb_wal_corruptions_total`
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 32848e7fcb1..f2acc3f5213 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -392,11 +392,19 @@ func (i *Ingester) stopping(_ error) error {
 // * Change the state of ring to stop accepting writes.
 // * Flush all the chunks.
 func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
-	originalState := i.lifecycler.FlushOnShutdown()
+	originalFlush := i.lifecycler.FlushOnShutdown()
 	// We want to flush the chunks if transfer fails irrespective of original flag.
 	i.lifecycler.SetFlushOnShutdown(true)
+
+	// In the case of an HTTP shutdown, we want to unregister no matter what.
+	originalUnregister := i.lifecycler.SkipUnregister()
+	i.lifecycler.SetSkipUnregister(false)
+
 	_ = services.StopAndAwaitTerminated(context.Background(), i)
-	i.lifecycler.SetFlushOnShutdown(originalState)
+
+	// Set state back to original.
+	i.lifecycler.SetFlushOnShutdown(originalFlush)
+	i.lifecycler.SetSkipUnregister(originalUnregister)
+
 	w.WriteHeader(http.StatusNoContent)
 }
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 7bf91933987..372163e66be 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -2,8 +2,11 @@ package ingester
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"math"
+	"net/http"
+	"net/http/httptest"
 	"testing"
 	"time"
 
@@ -91,6 +94,32 @@ func TestIngesterRestart(t *testing.T) {
 	})
 }
 
+func TestIngester_ShutdownHandler(t *testing.T) {
+	for _, skipUnregister := range []bool{false, true} {
+		t.Run(fmt.Sprintf("SkipUnregister=%t", skipUnregister), func(t *testing.T) {
+			config := defaultIngesterTestConfig()
+			clientConfig := defaultClientTestConfig()
+			limits := defaultLimitsTestConfig()
+			config.LifecyclerConfig.SkipUnregister = skipUnregister
+			_, ingester := newTestStore(t, config, clientConfig, limits, nil)
+
+			// Make sure the ingester has been added to the ring.
+			test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+
+			recorder := httptest.NewRecorder()
+			ingester.ShutdownHandler(recorder, nil)
+			require.Equal(t, http.StatusNoContent, recorder.Result().StatusCode)
+
+			// Make sure the ingester has been removed from the ring even when skipUnregister is true.
+			test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+		})
+	}
+}
+
 func TestIngesterChunksTransfer(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index 89550c6d28b..be6d273828f 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -124,6 +124,7 @@ type Lifecycler struct {
 
 	// Whether to flush if transfer fails on shutdown.
 	flushOnShutdown *atomic.Bool
+	skipUnregister  *atomic.Bool
 
 	// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
 	// goes away and comes back empty. The state changes during lifecycle of instance.
@@ -182,6 +183,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
 		RingName:        ringName,
 		RingKey:         ringKey,
 		flushOnShutdown: atomic.NewBool(flushOnShutdown),
+		skipUnregister:  atomic.NewBool(cfg.SkipUnregister),
 		Zone:            zone,
 
 		actorChan: make(chan func()),
@@ -490,7 +492,7 @@ heartbeatLoop:
 		}
 	}
 
-	if !i.cfg.SkipUnregister {
+	if !i.skipUnregister.Load() {
 		if err := i.unregister(context.Background()); err != nil {
 			return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
 		}
@@ -779,6 +781,16 @@ func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) {
 	i.flushOnShutdown.Store(flushOnShutdown)
 }
 
+// SkipUnregister returns if unregistering is skipped on shutdown.
+func (i *Lifecycler) SkipUnregister() bool {
+	return i.skipUnregister.Load()
+}
+
+// SetSkipUnregister enables/disables unregistering on shutdown.
+func (i *Lifecycler) SetSkipUnregister(skipUnregister bool) {
+	i.skipUnregister.Store(skipUnregister)
+}
+
 func (i *Lifecycler) processShutdown(ctx context.Context) {
 	flushRequired := i.flushOnShutdown.Load()
 	transferStart := time.Now()

From 1823579ac091aca05cbf6c433fdd28f34bb00fcd Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Thu, 12 Nov 2020 21:27:19 +0000
Subject: [PATCH 3/3] Rename skip_unregister to unregister_from_ring

Signed-off-by: Chris Marchbanks
---
 docs/api/_index.md                          |  2 +-
 docs/configuration/config-file-reference.md | 13 +++--
 pkg/compactor/compactor_ring.go             |  2 +-
 pkg/distributor/distributor_ring.go         |  2 +-
 pkg/ingester/ingester.go                    |  6 +--
 pkg/ingester/lifecycle_test.go              | 14 +++---
 pkg/ring/lifecycler.go                      | 54 ++++++++++-----------
 pkg/ring/ring.go                            |  2 +-
 pkg/ring/ring_test.go                       | 17 ++++---
 9 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/docs/api/_index.md b/docs/api/_index.md
index 9ce6ba83ee4..5c3c0eb91d9 100644
--- a/docs/api/_index.md
+++ b/docs/api/_index.md
@@ -220,7 +220,7 @@ GET,POST /ingester/shutdown
 GET,POST /shutdown
 ```
 
-Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200.
+Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200. This endpoint will unregister the ingester from the ring even if `-ingester.unregister-from-ring` is disabled.
 
 _This API endpoint is usually used by scale down automations._
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index da077a35c81..40c85ea3b29 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -609,7 +609,9 @@ lifecycler:
     [zone_awareness_enabled: <boolean> | default = false]
 
     # Try writing to an additional ingester in the presence of an ingester not
-    # in the ACTIVE state.
+    # in the ACTIVE state. It is useful to disable this along with
+    # -ingester.unregister-from-ring=false in order to not spread samples to
+    # extra ingesters during rolling restarts with consistent naming.
     # CLI flag: -distributor.extend-writes
     [extend_writes: <boolean> | default = true]
 
@@ -653,10 +655,11 @@ lifecycler:
   # CLI flag: -ingester.availability-zone
   [availability_zone: <string> | default = ""]
 
-  # Leave the instance in the ring upon removal. Useful for rolling restarts
-  # with consistent naming.
-  # CLI flag: -ingester.skip-unregister
-  [skip_unregister: <boolean> | default = false]
+  # Unregister from the ring upon clean shutdown. It can be useful to disable
+  # for rolling restarts with consistent naming in conjunction with
+  # -distributor.extend-writes=false.
+  # CLI flag: -ingester.unregister-from-ring
+  [unregister_from_ring: <boolean> | default = true]
 
 # Number of times to try and transfer chunks before falling back to flushing.
 # Negative value or zero disables hand-over. This feature is supported only by
diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index 3c2fa0d418d..e06792760dc 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -84,7 +84,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
 	lc.Port = cfg.InstancePort
 	lc.ID = cfg.InstanceID
 	lc.InfNames = cfg.InstanceInterfaceNames
-	lc.SkipUnregister = false
+	lc.UnregisterFromRing = true
 	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
 	lc.ObservePeriod = 0
 	lc.JoinAfter = 0
diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go
index 6eb7e9f5f29..9d60b527bfa 100644
--- a/pkg/distributor/distributor_ring.go
+++ b/pkg/distributor/distributor_ring.go
@@ -76,7 +76,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
 	lc.Port = cfg.InstancePort
 	lc.ID = cfg.InstanceID
 	lc.InfNames = cfg.InstanceInterfaceNames
-	lc.SkipUnregister = false
+	lc.UnregisterFromRing = true
 	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
 	lc.ObservePeriod = 0
 	lc.NumTokens = 1
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index f2acc3f5213..906c08b7571 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -397,13 +397,13 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
 	i.lifecycler.SetFlushOnShutdown(true)
 
 	// In the case of an HTTP shutdown, we want to unregister no matter what.
-	originalUnregister := i.lifecycler.SkipUnregister()
-	i.lifecycler.SetSkipUnregister(false)
+	originalUnregister := i.lifecycler.ShouldUnregisterFromRing()
+	i.lifecycler.SetUnregisterFromRing(true)
 
 	_ = services.StopAndAwaitTerminated(context.Background(), i)
 
 	// Set state back to original.
 	i.lifecycler.SetFlushOnShutdown(originalFlush)
-	i.lifecycler.SetSkipUnregister(originalUnregister)
+	i.lifecycler.SetUnregisterFromRing(originalUnregister)
 
 	w.WriteHeader(http.StatusNoContent)
 }
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 372163e66be..ba4189dc1a8 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -67,12 +67,12 @@ func TestIngesterRestart(t *testing.T) {
 	config := defaultIngesterTestConfig()
 	clientConfig := defaultClientTestConfig()
 	limits := defaultLimitsTestConfig()
-	config.LifecyclerConfig.SkipUnregister = true
+	config.LifecyclerConfig.UnregisterFromRing = false
 
 	{
 		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
-		// doesn't actually unregister due to skipUnregister: true
+		// Doesn't actually unregister due to UnregisterFromRing: false.
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
 	}
 
@@ -83,7 +83,7 @@ func TestIngesterRestart(t *testing.T) {
 	{
 		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
-		// doesn't actually unregister due to skipUnregister: true
+		// Doesn't actually unregister due to UnregisterFromRing: false.
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
 	}
 
@@ -95,12 +95,12 @@ func TestIngesterRestart(t *testing.T) {
 }
 
 func TestIngester_ShutdownHandler(t *testing.T) {
-	for _, skipUnregister := range []bool{false, true} {
-		t.Run(fmt.Sprintf("SkipUnregister=%t", skipUnregister), func(t *testing.T) {
+	for _, unregister := range []bool{false, true} {
+		t.Run(fmt.Sprintf("unregister=%t", unregister), func(t *testing.T) {
 			config := defaultIngesterTestConfig()
 			clientConfig := defaultClientTestConfig()
 			limits := defaultLimitsTestConfig()
-			config.LifecyclerConfig.SkipUnregister = skipUnregister
+			config.LifecyclerConfig.UnregisterFromRing = unregister
 			_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 
 			// Make sure the ingester has been added to the ring.
@@ -112,7 +112,7 @@ func TestIngester_ShutdownHandler(t *testing.T) {
 			ingester.ShutdownHandler(recorder, nil)
 			require.Equal(t, http.StatusNoContent, recorder.Result().StatusCode)
 
-			// Make sure the ingester has been removed from the ring even when skipUnregister is true.
+			// Make sure the ingester has been removed from the ring even when UnregisterFromRing is false.
 			test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
 				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
 			})
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index be6d273828f..49fe106860a 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -47,16 +47,16 @@ type LifecyclerConfig struct {
 	RingConfig Config `yaml:"ring"`
 
 	// Config for the ingester lifecycle control
-	NumTokens        int           `yaml:"num_tokens"`
-	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
-	ObservePeriod    time.Duration `yaml:"observe_period"`
-	JoinAfter        time.Duration `yaml:"join_after"`
-	MinReadyDuration time.Duration `yaml:"min_ready_duration"`
-	InfNames         []string      `yaml:"interface_names"`
-	FinalSleep       time.Duration `yaml:"final_sleep"`
-	TokensFilePath   string        `yaml:"tokens_file_path"`
-	Zone             string        `yaml:"availability_zone"`
-	SkipUnregister   bool          `yaml:"skip_unregister"`
+	NumTokens          int           `yaml:"num_tokens"`
+	HeartbeatPeriod    time.Duration `yaml:"heartbeat_period"`
+	ObservePeriod      time.Duration `yaml:"observe_period"`
+	JoinAfter          time.Duration `yaml:"join_after"`
+	MinReadyDuration   time.Duration `yaml:"min_ready_duration"`
+	InfNames           []string      `yaml:"interface_names"`
+	FinalSleep         time.Duration `yaml:"final_sleep"`
+	TokensFilePath     string        `yaml:"tokens_file_path"`
+	Zone               string        `yaml:"availability_zone"`
+	UnregisterFromRing bool          `yaml:"unregister_from_ring"`
 
 	// For testing, you can override the address and ID of this ingester
 	Addr string `yaml:"address" doc:"hidden"`
@@ -102,7 +102,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
 	f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
 	f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register in the ring.")
 	f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.")
-	f.BoolVar(&cfg.SkipUnregister, prefix+"skip-unregister", false, "Leave the instance in the ring upon removal. Useful for rolling restarts with consistent naming.")
+	f.BoolVar(&cfg.UnregisterFromRing, prefix+"unregister-from-ring", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.")
 }
 
 // Lifecycler is responsible for managing the lifecycle of entries in the ring.
@@ -123,8 +123,8 @@ type Lifecycler struct {
 	Zone     string
 
 	// Whether to flush if transfer fails on shutdown.
-	flushOnShutdown *atomic.Bool
-	skipUnregister  *atomic.Bool
+	flushOnShutdown    *atomic.Bool
+	unregisterFromRing *atomic.Bool
 
 	// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
 	// goes away and comes back empty. The state changes during lifecycle of instance.
@@ -178,13 +178,13 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
 		flushTransferer: flushTransferer,
 		KVStore:         store,
 
-		Addr:            fmt.Sprintf("%s:%d", addr, port),
-		ID:              cfg.ID,
-		RingName:        ringName,
-		RingKey:         ringKey,
-		flushOnShutdown: atomic.NewBool(flushOnShutdown),
-		skipUnregister:  atomic.NewBool(cfg.SkipUnregister),
-		Zone:            zone,
+		Addr:               fmt.Sprintf("%s:%d", addr, port),
+		ID:                 cfg.ID,
+		RingName:           ringName,
+		RingKey:            ringKey,
+		flushOnShutdown:    atomic.NewBool(flushOnShutdown),
+		unregisterFromRing: atomic.NewBool(cfg.UnregisterFromRing),
+		Zone:               zone,
 
 		actorChan: make(chan func()),
 
@@ -492,7 +492,7 @@ heartbeatLoop:
 		}
 	}
 
-	if !i.skipUnregister.Load() {
+	if i.ShouldUnregisterFromRing() {
 		if err := i.unregister(context.Background()); err != nil {
 			return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
 		}
@@ -781,14 +781,14 @@ func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) {
 	i.flushOnShutdown.Store(flushOnShutdown)
 }
 
-// SkipUnregister returns if unregistering is skipped on shutdown.
-func (i *Lifecycler) SkipUnregister() bool {
-	return i.skipUnregister.Load()
+// ShouldUnregisterFromRing returns whether the lifecycler will unregister from the ring on shutdown.
+func (i *Lifecycler) ShouldUnregisterFromRing() bool {
+	return i.unregisterFromRing.Load()
 }
 
-// SetSkipUnregister enables/disables unregistering on shutdown.
+// SetUnregisterFromRing enables/disables unregistering on shutdown.
-func (i *Lifecycler) SetSkipUnregister(skipUnregister bool) {
-	i.skipUnregister.Store(skipUnregister)
+func (i *Lifecycler) SetUnregisterFromRing(unregisterFromRing bool) {
+	i.unregisterFromRing.Store(unregisterFromRing)
 }
 
 func (i *Lifecycler) processShutdown(ctx context.Context) {
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index 4f1d7c5adea..ecb4a44c29c 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -127,7 +127,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
 	f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
-	f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state.")
+	f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-from-ring=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
 }
 
 // Ring holds the information about the members of the consistent hash ring.
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index 7b36347d078..136eff26b7a 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -1800,14 +1800,15 @@ func TestRingUpdates(t *testing.T) {
 
 func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecyclerID int, zones int) *Lifecycler {
 	lcCfg := LifecyclerConfig{
-		RingConfig:      cfg,
-		NumTokens:       16,
-		HeartbeatPeriod: heartbeat,
-		ObservePeriod:   0,
-		JoinAfter:       0,
-		Zone:            fmt.Sprintf("zone-%d", lifecyclerID%zones),
-		Addr:            fmt.Sprintf("addr-%d", lifecyclerID),
-		ID:              fmt.Sprintf("ingester-%d", lifecyclerID),
+		RingConfig:         cfg,
+		NumTokens:          16,
+		HeartbeatPeriod:    heartbeat,
+		ObservePeriod:      0,
+		JoinAfter:          0,
+		Zone:               fmt.Sprintf("zone-%d", lifecyclerID%zones),
+		Addr:               fmt.Sprintf("addr-%d", lifecyclerID),
+		ID:                 fmt.Sprintf("ingester-%d", lifecyclerID),
+		UnregisterFromRing: true,
 	}
 
 	lc, err := NewLifecycler(lcCfg, &noopFlushTransferer{}, "test", "test", false, nil)
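(Editor's note, not part of the patch.) To summarize the end state of the series, here is a condensed, dependency-free restatement of the extend-writes decision implemented in `DefaultReplicationStrategy.ShouldExtendReplicaSet`. The `State` and `Operation` types below are simplified stand-ins for the ring package's own; only the boolean logic mirrors the patch.

```go
package example

// State is a simplified stand-in for the ring package's instance state.
type State int

const (
	ACTIVE State = iota
	LEAVING
)

// Operation is a simplified stand-in for the ring package's operation type.
type Operation int

const (
	Read Operation = iota
	Write
)

// shouldExtendReplicaSet mirrors the logic after this series: writes spill to
// an additional ingester only when extend-writes is enabled and the instance
// is not ACTIVE; reads extend past instances that are neither ACTIVE nor
// LEAVING, because LEAVING ingesters can still serve reads.
func shouldExtendReplicaSet(extendWrites bool, state State, op Operation) bool {
	switch op {
	case Write:
		return extendWrites && state != ACTIVE
	case Read:
		return state != ACTIVE && state != LEAVING
	}
	return false
}
```

Disabling extend-writes together with `-ingester.unregister-from-ring=false` keeps the set of ingesters owning a series stable across a restart, which is the rolling-restart behaviour this series targets.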