diff --git a/CHANGELOG.md b/CHANGELOG.md
index a56a7828df7..cee0559d68c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
+* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305
 * [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone will fail. #3414
 * [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
   - `cortex_ingester_tsdb_wal_corruptions_total`
diff --git a/docs/api/_index.md b/docs/api/_index.md
index 9ce6ba83ee4..5c3c0eb91d9 100644
--- a/docs/api/_index.md
+++ b/docs/api/_index.md
@@ -220,7 +220,7 @@ GET,POST /ingester/shutdown
 GET,POST /shutdown
 ```
 
-Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200.
+Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200. This endpoint will unregister the ingester from the ring even if `-ingester.unregister-from-ring` is disabled.
 
 _This API endpoint is usually used by scale down automations._
 
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index a8ffaa2a688..40c85ea3b29 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -608,6 +608,13 @@
     # CLI flag: -distributor.zone-awareness-enabled
     [zone_awareness_enabled: <boolean> | default = false]
 
+    # Try writing to an additional ingester in the presence of an ingester not
+    # in the ACTIVE state. It is useful to disable this along with
+    # -ingester.unregister-from-ring=false in order to not spread samples to
+    # extra ingesters during rolling restarts with consistent naming.
+    # CLI flag: -distributor.extend-writes
+    [extend_writes: <boolean> | default = true]
+
   # Number of tokens for each ingester.
   # CLI flag: -ingester.num-tokens
   [num_tokens: <int> | default = 128]
@@ -648,6 +655,12 @@
   # CLI flag: -ingester.availability-zone
   [availability_zone: <string> | default = ""]
 
+  # Unregister from the ring upon clean shutdown. It can be useful to disable
+  # for rolling restarts with consistent naming in conjunction with
+  # -distributor.extend-writes=false.
+  # CLI flag: -ingester.unregister-from-ring
+  [unregister_from_ring: <boolean> | default = true]
+
 # Number of times to try and transfer chunks before falling back to flushing.
 # Negative value or zero disables hand-over. This feature is supported only by
 # the chunks storage.
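The two new options documented above are designed to be flipped together for rolling restarts with consistent naming (for example, ingesters run as a StatefulSet). Below is a minimal sketch of the resulting ingester configuration; the `ingester:` / `lifecycler:` / `ring:` nesting is an assumption based on the reference excerpts above, and the equivalent CLI flags are `-distributor.extend-writes=false` and `-ingester.unregister-from-ring=false`:

```yaml
# Sketch only: keep ring entries and shard assignment stable across a rolling restart.
ingester:
  lifecycler:
    ring:
      # Do not spill writes to an extra ingester while a replica is not ACTIVE.
      extend_writes: false
    # Keep this ingester's tokens registered in the ring on clean shutdown.
    unregister_from_ring: false
```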
diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index 3c2fa0d418d..e06792760dc 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -84,7 +84,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
 	lc.Port = cfg.InstancePort
 	lc.ID = cfg.InstanceID
 	lc.InfNames = cfg.InstanceInterfaceNames
-	lc.SkipUnregister = false
+	lc.UnregisterFromRing = true
 	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
 	lc.ObservePeriod = 0
 	lc.JoinAfter = 0
diff --git a/pkg/distributor/distributor_ring.go b/pkg/distributor/distributor_ring.go
index 6eb7e9f5f29..9d60b527bfa 100644
--- a/pkg/distributor/distributor_ring.go
+++ b/pkg/distributor/distributor_ring.go
@@ -76,7 +76,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
 	lc.Port = cfg.InstancePort
 	lc.ID = cfg.InstanceID
 	lc.InfNames = cfg.InstanceInterfaceNames
-	lc.SkipUnregister = false
+	lc.UnregisterFromRing = true
 	lc.HeartbeatPeriod = cfg.HeartbeatPeriod
 	lc.ObservePeriod = 0
 	lc.NumTokens = 1
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 7826f37e538..79c70897b1a 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -1105,6 +1105,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 		},
 		HeartbeatTimeout:  60 * time.Minute,
 		ReplicationFactor: 3,
+		ExtendWrites:      true,
 	}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 32848e7fcb1..906c08b7571 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -392,11 +392,19 @@ func (i *Ingester) stopping(_ error) error {
 // * Change the state of ring to stop accepting writes.
 // * Flush all the chunks.
 func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
-	originalState := i.lifecycler.FlushOnShutdown()
+	originalFlush := i.lifecycler.FlushOnShutdown()
 	// We want to flush the chunks if transfer fails irrespective of original flag.
 	i.lifecycler.SetFlushOnShutdown(true)
+
+	// In the case of an HTTP shutdown, we want to unregister no matter what.
+	originalUnregister := i.lifecycler.ShouldUnregisterFromRing()
+	i.lifecycler.SetUnregisterFromRing(true)
+
 	_ = services.StopAndAwaitTerminated(context.Background(), i)
-	i.lifecycler.SetFlushOnShutdown(originalState)
+	// Set state back to original.
+	i.lifecycler.SetFlushOnShutdown(originalFlush)
+	i.lifecycler.SetUnregisterFromRing(originalUnregister)
+
 	w.WriteHeader(http.StatusNoContent)
 }
 
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 7bf91933987..ba4189dc1a8 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -2,8 +2,11 @@ package ingester
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"math"
+	"net/http"
+	"net/http/httptest"
 	"testing"
 	"time"
 
@@ -64,12 +67,12 @@ func TestIngesterRestart(t *testing.T) {
 	config := defaultIngesterTestConfig()
 	clientConfig := defaultClientTestConfig()
 	limits := defaultLimitsTestConfig()
-	config.LifecyclerConfig.SkipUnregister = true
+	config.LifecyclerConfig.UnregisterFromRing = false
 
 	{
 		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
-		// doesn't actually unregister due to skipUnregister: true
+		// Doesn't actually unregister due to UnregisterFromRing: false.
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
 	}
 
@@ -80,7 +83,7 @@
 	{
 		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
-		// doesn't actually unregister due to skipUnregister: true
+		// Doesn't actually unregister due to UnregisterFromRing: false.
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
 	}
 
@@ -91,6 +94,32 @@
 	})
 }
 
+func TestIngester_ShutdownHandler(t *testing.T) {
+	for _, unregister := range []bool{false, true} {
+		t.Run(fmt.Sprintf("unregister=%t", unregister), func(t *testing.T) {
+			config := defaultIngesterTestConfig()
+			clientConfig := defaultClientTestConfig()
+			limits := defaultLimitsTestConfig()
+			config.LifecyclerConfig.UnregisterFromRing = unregister
+			_, ingester := newTestStore(t, config, clientConfig, limits, nil)
+
+			// Make sure the ingester has been added to the ring.
+			test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+
+			recorder := httptest.NewRecorder()
+			ingester.ShutdownHandler(recorder, nil)
+			require.Equal(t, http.StatusNoContent, recorder.Result().StatusCode)
+
+			// Make sure the ingester has been removed from the ring even when UnregisterFromRing is false.
+			test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+		})
+	}
+}
+
 func TestIngesterChunksTransfer(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index a9e2f344cf0..49fe106860a 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -47,21 +47,21 @@ type LifecyclerConfig struct {
 	RingConfig Config `yaml:"ring"`
 
 	// Config for the ingester lifecycle control
-	NumTokens        int           `yaml:"num_tokens"`
-	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
-	ObservePeriod    time.Duration `yaml:"observe_period"`
-	JoinAfter        time.Duration `yaml:"join_after"`
-	MinReadyDuration time.Duration `yaml:"min_ready_duration"`
-	InfNames         []string      `yaml:"interface_names"`
-	FinalSleep       time.Duration `yaml:"final_sleep"`
-	TokensFilePath   string        `yaml:"tokens_file_path"`
-	Zone             string        `yaml:"availability_zone"`
+	NumTokens          int           `yaml:"num_tokens"`
+	HeartbeatPeriod    time.Duration `yaml:"heartbeat_period"`
+	ObservePeriod      time.Duration `yaml:"observe_period"`
+	JoinAfter          time.Duration `yaml:"join_after"`
+	MinReadyDuration   time.Duration `yaml:"min_ready_duration"`
+	InfNames           []string      `yaml:"interface_names"`
+	FinalSleep         time.Duration `yaml:"final_sleep"`
+	TokensFilePath     string        `yaml:"tokens_file_path"`
+	Zone               string        `yaml:"availability_zone"`
+	UnregisterFromRing bool          `yaml:"unregister_from_ring"`
 
 	// For testing, you can override the address and ID of this ingester
-	Addr           string `yaml:"address" doc:"hidden"`
-	Port           int    `doc:"hidden"`
-	ID             string `doc:"hidden"`
-	SkipUnregister bool   `yaml:"-"`
+	Addr string `yaml:"address" doc:"hidden"`
+	Port int    `doc:"hidden"`
+	ID   string `doc:"hidden"`
 
 	// Injected internally
 	ListenPort int `yaml:"-"`
@@ -102,6 +102,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
 	f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
server.grpc-listen-port).") f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register in the ring.") f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.") + f.BoolVar(&cfg.UnregisterFromRing, prefix+"unregister-from-ring", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. @@ -122,7 +123,8 @@ type Lifecycler struct { Zone string // Whether to flush if transfer fails on shutdown. - flushOnShutdown *atomic.Bool + flushOnShutdown *atomic.Bool + unregisterFromRing *atomic.Bool // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. @@ -176,12 +178,13 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa flushTransferer: flushTransferer, KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, - flushOnShutdown: atomic.NewBool(flushOnShutdown), - Zone: zone, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: atomic.NewBool(flushOnShutdown), + unregisterFromRing: atomic.NewBool(cfg.UnregisterFromRing), + Zone: zone, actorChan: make(chan func()), @@ -489,7 +492,7 @@ heartbeatLoop: } } - if !i.cfg.SkipUnregister { + if i.ShouldUnregisterFromRing() { if err := i.unregister(context.Background()); err != nil { return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) } @@ -778,6 +781,16 @@ func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) { i.flushOnShutdown.Store(flushOnShutdown) } +// ShouldUnregisterFromRing returns if unregistering should be skipped on shutdown. +func (i *Lifecycler) ShouldUnregisterFromRing() bool { + return i.unregisterFromRing.Load() +} + +// SetUnregisterFromRing enables/disables unregistering on shutdown. +func (i *Lifecycler) SetUnregisterFromRing(unregisterFromRing bool) { + i.unregisterFromRing.Store(unregisterFromRing) +} + func (i *Lifecycler) processShutdown(ctx context.Context) { flushRequired := i.flushOnShutdown.Load() transferStart := time.Now() diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 67d96c30ed4..b13fd7a3c61 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -17,7 +17,9 @@ type ReplicationStrategy interface { ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool } -type DefaultReplicationStrategy struct{} +type DefaultReplicationStrategy struct { + ExtendWrites bool +} // Filter decides, given the set of ingesters eligible for a key, // which ingesters you will try and write to and how many failures you will @@ -70,8 +72,11 @@ func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDes // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. // NB dead ingester will be filtered later by DefaultReplicationStrategy.Filter(). 
-	if op == Write && ingester.State != ACTIVE {
-		return true
+	if op == Write {
+		if s.ExtendWrites {
+			return ingester.State != ACTIVE
+		}
+		return false
 	} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
 		return true
 	}
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index a0d8723e2c1..ecb4a44c29c 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -112,6 +112,7 @@ type Config struct {
 	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout"`
 	ReplicationFactor    int           `yaml:"replication_factor"`
 	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
+	ExtendWrites         bool          `yaml:"extend_writes"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
@@ -126,6 +127,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
 	f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 	f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
+	f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-from-ring=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
 }
 
 // Ring holds the information about the members of the consistent hash ring.
@@ -178,7 +180,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
 		return nil, err
 	}
 
-	return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{})
+	return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{ExtendWrites: cfg.ExtendWrites})
 }
 
 func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index 7b36347d078..136eff26b7a 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -1800,14 +1800,15 @@ func TestRingUpdates(t *testing.T) {
 
 func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecyclerID int, zones int) *Lifecycler {
 	lcCfg := LifecyclerConfig{
-		RingConfig:      cfg,
-		NumTokens:       16,
-		HeartbeatPeriod: heartbeat,
-		ObservePeriod:   0,
-		JoinAfter:       0,
-		Zone:            fmt.Sprintf("zone-%d", lifecyclerID%zones),
-		Addr:            fmt.Sprintf("addr-%d", lifecyclerID),
-		ID:              fmt.Sprintf("ingester-%d", lifecyclerID),
+		RingConfig:         cfg,
+		NumTokens:          16,
+		HeartbeatPeriod:    heartbeat,
+		ObservePeriod:      0,
+		JoinAfter:          0,
+		Zone:               fmt.Sprintf("zone-%d", lifecyclerID%zones),
+		Addr:               fmt.Sprintf("addr-%d", lifecyclerID),
+		ID:                 fmt.Sprintf("ingester-%d", lifecyclerID),
+		UnregisterFromRing: true,
 	}
 
 	lc, err := NewLifecycler(lcCfg, &noopFlushTransferer{}, "test", "test", false, nil)
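To make the behavioural change easier to see in isolation, here is a self-contained sketch of the new write-path logic in `ShouldExtendReplicaSet`. The trimmed-down types below (`Operation`, `IngesterState`, `IngesterDesc`) are illustrative stand-ins for the real `pkg/ring` definitions, not the actual package; only the method body mirrors the hunk above.

```go
package main

import "fmt"

// Illustrative stand-ins for the ring package types (simplified assumptions).
type Operation int

const (
	Read Operation = iota
	Write
)

type IngesterState int

const (
	ACTIVE IngesterState = iota
	LEAVING
)

type IngesterDesc struct{ State IngesterState }

type DefaultReplicationStrategy struct{ ExtendWrites bool }

// ShouldExtendReplicaSet mirrors the patched logic: writes only spill over to
// an additional ingester when ExtendWrites is enabled.
func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool {
	if op == Write {
		if s.ExtendWrites {
			return ingester.State != ACTIVE
		}
		return false
	} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
		return true
	}
	return false
}

func main() {
	leaving := IngesterDesc{State: LEAVING}

	def := &DefaultReplicationStrategy{ExtendWrites: true} // the default behaviour
	fmt.Println(def.ShouldExtendReplicaSet(leaving, Write)) // true: write spills to an extra ingester

	restartFriendly := &DefaultReplicationStrategy{ExtendWrites: false}
	fmt.Println(restartFriendly.ShouldExtendReplicaSet(leaving, Write)) // false: keep writing to the LEAVING ingester
}
```

With `ExtendWrites: false`, a LEAVING ingester stays in the write replica set, so a restarting ingester that kept its ring entry (via `-ingester.unregister-from-ring=false`) does not push samples onto extra ingesters.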