Allow ingesters to stay in the ring during restart #3305

Merged · 3 commits · Nov 23, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
+* [FEATURE] Distributor/Ingester: Provide ability to not overflow writes in the presence of a leaving or unhealthy ingester. This allows for more efficient ingester rolling restarts. #3305
* [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone fail. #3414
* [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
- `cortex_ingester_tsdb_wal_corruptions_total`
2 changes: 1 addition & 1 deletion docs/api/_index.md
@@ -220,7 +220,7 @@ GET,POST /ingester/shutdown
GET,POST /shutdown
```

-Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200.
+Flushes in-memory time series data from ingester to the long-term storage, and shuts down the ingester service. Notice that the other Cortex services are still running, and the operator (or any automation) is expected to terminate the process with a `SIGINT` / `SIGTERM` signal after the shutdown endpoint returns. In the meantime, `/ready` will not return 200. This endpoint will unregister the ingester from the ring even if `-ingester.unregister-from-ring` is disabled.

_This API endpoint is usually used by scale down automations._

13 changes: 13 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -608,6 +608,13 @@ lifecycler:
# CLI flag: -distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]

+# Try writing to an additional ingester in the presence of an ingester not
+# in the ACTIVE state. It is useful to disable this along with
+# -ingester.unregister-from-ring=false in order to not spread samples to
+# extra ingesters during rolling restarts with consistent naming.
+# CLI flag: -distributor.extend-writes
+[extend_writes: <boolean> | default = true]

# Number of tokens for each ingester.
# CLI flag: -ingester.num-tokens
[num_tokens: <int> | default = 128]
@@ -648,6 +655,12 @@
# CLI flag: -ingester.availability-zone
[availability_zone: <string> | default = ""]

+# Unregister from the ring upon clean shutdown. It can be useful to disable
+# for rolling restarts with consistent naming in conjunction with
+# -distributor.extend-writes=false.
+# CLI flag: -ingester.unregister-from-ring
+[unregister_from_ring: <boolean> | default = true]

# Number of times to try and transfer chunks before falling back to flushing.
# Negative value or zero disables hand-over. This feature is supported only by
# the chunks storage.
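Taken together, the two new options above describe the intended rolling-restart setup. A minimal YAML sketch of the combination, assuming the `ingester.lifecycler` nesting used throughout this reference (both values deliberately override the defaults):

```yaml
# Sketch: ingesters with consistent naming (e.g. a StatefulSet) that should
# keep their ring entry across restarts.
ingester:
  lifecycler:
    ring:
      # Don't write to an extra ingester while one is non-ACTIVE.
      extend_writes: false
    # Keep the ring entry (and tokens) on clean shutdown.
    unregister_from_ring: false
```

The same pair can be expressed as CLI flags: `-distributor.extend-writes=false -ingester.unregister-from-ring=false`.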
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_ring.go
@@ -84,7 +84,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
-lc.SkipUnregister = false
+lc.UnregisterFromRing = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.ObservePeriod = 0
lc.JoinAfter = 0
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ring.go
@@ -76,7 +76,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
lc.Port = cfg.InstancePort
lc.ID = cfg.InstanceID
lc.InfNames = cfg.InstanceInterfaceNames
-lc.SkipUnregister = false
+lc.UnregisterFromRing = true
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
lc.ObservePeriod = 0
lc.NumTokens = 1
1 change: 1 addition & 0 deletions pkg/distributor/distributor_test.go
@@ -1105,6 +1105,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 3,
+ExtendWrites: true,
}, ring.IngesterRingKey, ring.IngesterRingKey, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
12 changes: 10 additions & 2 deletions pkg/ingester/ingester.go
@@ -392,11 +392,19 @@ func (i *Ingester) stopping(_ error) error {
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
-originalState := i.lifecycler.FlushOnShutdown()
+originalFlush := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)

+// In the case of an HTTP shutdown, we want to unregister no matter what.
+originalUnregister := i.lifecycler.ShouldUnregisterFromRing()
+i.lifecycler.SetUnregisterFromRing(true)

_ = services.StopAndAwaitTerminated(context.Background(), i)
-i.lifecycler.SetFlushOnShutdown(originalState)
+// Set state back to original.
+i.lifecycler.SetFlushOnShutdown(originalFlush)
+i.lifecycler.SetUnregisterFromRing(originalUnregister)

w.WriteHeader(http.StatusNoContent)
}

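The save/force/restore pattern the handler uses, shown in isolation. This is a simplified stand-in rather than the Cortex API; only the getter/setter names mirror the real lifecycler:

```go
// Sketch of ShutdownHandler's override-and-restore pattern using a
// simplified lifecycler stand-in (the real one lives in pkg/ring).
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type fakeLifecycler struct {
	flushOnShutdown    *atomic.Bool
	unregisterFromRing *atomic.Bool
}

func (l *fakeLifecycler) FlushOnShutdown() bool          { return l.flushOnShutdown.Load() }
func (l *fakeLifecycler) SetFlushOnShutdown(v bool)      { l.flushOnShutdown.Store(v) }
func (l *fakeLifecycler) ShouldUnregisterFromRing() bool { return l.unregisterFromRing.Load() }
func (l *fakeLifecycler) SetUnregisterFromRing(v bool)   { l.unregisterFromRing.Store(v) }

func shutdownHandler(l *fakeLifecycler, stop func()) {
	// Save the configured values, then force both behaviors on:
	// an explicit HTTP shutdown must always flush and unregister.
	originalFlush := l.FlushOnShutdown()
	originalUnregister := l.ShouldUnregisterFromRing()
	l.SetFlushOnShutdown(true)
	l.SetUnregisterFromRing(true)

	stop() // stand-in for services.StopAndAwaitTerminated

	// Restore the configured values afterwards.
	l.SetFlushOnShutdown(originalFlush)
	l.SetUnregisterFromRing(originalUnregister)
}

func main() {
	l := &fakeLifecycler{atomic.NewBool(false), atomic.NewBool(false)}
	shutdownHandler(l, func() { fmt.Println("stopping; unregister =", l.ShouldUnregisterFromRing()) })
	fmt.Println("restored; unregister =", l.ShouldUnregisterFromRing())
}
```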
35 changes: 32 additions & 3 deletions pkg/ingester/lifecycle_test.go
@@ -2,8 +2,11 @@ package ingester

import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"testing"
"time"

@@ -64,12 +67,12 @@ func TestIngesterRestart(t *testing.T) {
config := defaultIngesterTestConfig()
clientConfig := defaultClientTestConfig()
limits := defaultLimitsTestConfig()
-config.LifecyclerConfig.SkipUnregister = true
+config.LifecyclerConfig.UnregisterFromRing = false

{
_, ingester := newTestStore(t, config, clientConfig, limits, nil)
time.Sleep(100 * time.Millisecond)
-// doesn't actually unregister due to skipUnregister: true
+// Doesn't actually unregister due to UnregisterFromRing: false.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
}

@@ -80,7 +83,7 @@
{
_, ingester := newTestStore(t, config, clientConfig, limits, nil)
time.Sleep(100 * time.Millisecond)
-// doesn't actually unregister due to skipUnregister: true
+// Doesn't actually unregister due to UnregisterFromRing: false.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
}

@@ -91,6 +94,32 @@
})
}

+func TestIngester_ShutdownHandler(t *testing.T) {
+	for _, unregister := range []bool{false, true} {
+		t.Run(fmt.Sprintf("unregister=%t", unregister), func(t *testing.T) {
+			config := defaultIngesterTestConfig()
+			clientConfig := defaultClientTestConfig()
+			limits := defaultLimitsTestConfig()
+			config.LifecyclerConfig.UnregisterFromRing = unregister
+			_, ingester := newTestStore(t, config, clientConfig, limits, nil)
+
+			// Make sure the ingester has been added to the ring.
+			test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+
+			recorder := httptest.NewRecorder()
+			ingester.ShutdownHandler(recorder, nil)
+			require.Equal(t, http.StatusNoContent, recorder.Result().StatusCode)
+
+			// Make sure the ingester has been removed from the ring even when UnregisterFromRing is false.
+			test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
+				return testutils.NumTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey)
+			})
+		})
+	}
+}

func TestIngesterChunksTransfer(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
55 changes: 34 additions & 21 deletions pkg/ring/lifecycler.go
@@ -47,21 +47,21 @@ type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`

// Config for the ingester lifecycle control
-NumTokens        int           `yaml:"num_tokens"`
-HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
-ObservePeriod    time.Duration `yaml:"observe_period"`
-JoinAfter        time.Duration `yaml:"join_after"`
-MinReadyDuration time.Duration `yaml:"min_ready_duration"`
-InfNames         []string      `yaml:"interface_names"`
-FinalSleep       time.Duration `yaml:"final_sleep"`
-TokensFilePath   string        `yaml:"tokens_file_path"`
-Zone             string        `yaml:"availability_zone"`
+NumTokens          int           `yaml:"num_tokens"`
+HeartbeatPeriod    time.Duration `yaml:"heartbeat_period"`
+ObservePeriod      time.Duration `yaml:"observe_period"`
+JoinAfter          time.Duration `yaml:"join_after"`
+MinReadyDuration   time.Duration `yaml:"min_ready_duration"`
+InfNames           []string      `yaml:"interface_names"`
+FinalSleep         time.Duration `yaml:"final_sleep"`
+TokensFilePath     string        `yaml:"tokens_file_path"`
+Zone               string        `yaml:"availability_zone"`
+UnregisterFromRing bool          `yaml:"unregister_from_ring"`

// For testing, you can override the address and ID of this ingester
-Addr           string `yaml:"address" doc:"hidden"`
-Port           int    `doc:"hidden"`
-ID             string `doc:"hidden"`
-SkipUnregister bool   `yaml:"-"`
+Addr string `yaml:"address" doc:"hidden"`
+Port int    `doc:"hidden"`
+ID   string `doc:"hidden"`

// Injected internally
ListenPort int `yaml:"-"`
@@ -102,6 +102,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register in the ring.")
f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.")
+f.BoolVar(&cfg.UnregisterFromRing, prefix+"unregister-from-ring", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.")
}

// Lifecycler is responsible for managing the lifecycle of entries in the ring.
@@ -122,7 +123,8 @@ type Lifecycler struct {
Zone string

// Whether to flush if transfer fails on shutdown.
-flushOnShutdown *atomic.Bool
+flushOnShutdown    *atomic.Bool
+unregisterFromRing *atomic.Bool

// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
// goes away and comes back empty. The state changes during lifecycle of instance.
@@ -176,12 +178,13 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer: flushTransferer,
KVStore: store,

-Addr: fmt.Sprintf("%s:%d", addr, port),
-ID: cfg.ID,
-RingName: ringName,
-RingKey: ringKey,
-flushOnShutdown: atomic.NewBool(flushOnShutdown),
-Zone: zone,
+Addr:               fmt.Sprintf("%s:%d", addr, port),
+ID:                 cfg.ID,
+RingName:           ringName,
+RingKey:            ringKey,
+flushOnShutdown:    atomic.NewBool(flushOnShutdown),
+unregisterFromRing: atomic.NewBool(cfg.UnregisterFromRing),
+Zone:               zone,

actorChan: make(chan func()),

@@ -489,7 +492,7 @@ heartbeatLoop:
}
}

-if !i.cfg.SkipUnregister {
+if i.ShouldUnregisterFromRing() {
if err := i.unregister(context.Background()); err != nil {
return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
}
@@ -778,6 +781,16 @@ func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) {
i.flushOnShutdown.Store(flushOnShutdown)
}

+// ShouldUnregisterFromRing returns whether the lifecycler should unregister from the ring on shutdown.
+func (i *Lifecycler) ShouldUnregisterFromRing() bool {
+	return i.unregisterFromRing.Load()
+}
+
+// SetUnregisterFromRing enables/disables unregistering on shutdown.
+func (i *Lifecycler) SetUnregisterFromRing(unregisterFromRing bool) {
+	i.unregisterFromRing.Store(unregisterFromRing)
+}

func (i *Lifecycler) processShutdown(ctx context.Context) {
flushRequired := i.flushOnShutdown.Load()
transferStart := time.Now()
11 changes: 8 additions & 3 deletions pkg/ring/replication_strategy.go
@@ -17,7 +17,9 @@ type ReplicationStrategy interface {
ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
}

-type DefaultReplicationStrategy struct{}
+type DefaultReplicationStrategy struct {
+	ExtendWrites bool
+}

// Filter decides, given the set of ingesters eligible for a key,
// which ingesters you will try and write to and how many failures you will
@@ -70,8 +72,11 @@ func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDes
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// NB dead ingester will be filtered later by DefaultReplicationStrategy.Filter().
if op == Write && ingester.State != ACTIVE {
return true
if op == Write {
if s.ExtendWrites {
return ingester.State != ACTIVE
}
return false
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
return true
}
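The write-path decision the new branch encodes, as a standalone sketch. The `state` values are simplified stand-ins for the ring's ingester states, and `shouldExtendWrite` is a hypothetical helper, not a function from this PR:

```go
// Sketch of the write-path decision in ShouldExtendReplicaSet.
package main

import "fmt"

type state string

func shouldExtendWrite(extendWrites bool, s state) bool {
	if !extendWrites {
		// With -distributor.extend-writes=false, writes never spill over
		// to an extra ingester, whatever the instance state.
		return false
	}
	// Default behavior: any non-ACTIVE ingester extends the replica set.
	return s != "ACTIVE"
}

func main() {
	for _, extend := range []bool{true, false} {
		for _, s := range []state{"ACTIVE", "LEAVING", "PENDING"} {
			fmt.Printf("extendWrites=%-5t state=%-7s -> extend=%t\n",
				extend, s, shouldExtendWrite(extend, s))
		}
	}
}
```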
4 changes: 3 additions & 1 deletion pkg/ring/ring.go
@@ -112,6 +112,7 @@ type Config struct {
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
+ExtendWrites bool `yaml:"extend_writes"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
@@ -126,6 +127,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
f.BoolVar(&cfg.ExtendWrites, prefix+"distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-from-ring=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
}

// Ring holds the information about the members of the consistent hash ring.
@@ -178,7 +180,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{})
return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{ExtendWrites: cfg.ExtendWrites})
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
17 changes: 9 additions & 8 deletions pkg/ring/ring_test.go
@@ -1800,14 +1800,15 @@ func TestRingUpdates(t *testing.T) {

func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecyclerID int, zones int) *Lifecycler {
lcCfg := LifecyclerConfig{
-RingConfig: cfg,
-NumTokens: 16,
-HeartbeatPeriod: heartbeat,
-ObservePeriod: 0,
-JoinAfter: 0,
-Zone: fmt.Sprintf("zone-%d", lifecyclerID%zones),
-Addr: fmt.Sprintf("addr-%d", lifecyclerID),
-ID: fmt.Sprintf("ingester-%d", lifecyclerID),
+RingConfig:         cfg,
+NumTokens:          16,
+HeartbeatPeriod:    heartbeat,
+ObservePeriod:      0,
+JoinAfter:          0,
+Zone:               fmt.Sprintf("zone-%d", lifecyclerID%zones),
+Addr:               fmt.Sprintf("addr-%d", lifecyclerID),
+ID:                 fmt.Sprintf("ingester-%d", lifecyclerID),
+UnregisterFromRing: true,
}

lc, err := NewLifecycler(lcCfg, &noopFlushTransferer{}, "test", "test", false, nil)