
Ensure queries return correctly during rolling upgrades of a stateful cluster with RF 3 and only 3 nodes. #2503


Merged: 10 commits, Apr 22, 2020
89 changes: 38 additions & 51 deletions pkg/distributor/distributor_test.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"sort"
@@ -407,7 +408,7 @@ func TestDistributor_PushQuery(t *testing.T) {
for _, shardByAllLabels := range []bool{true, false} {

// Test with between 2 and 10 ingesters.
for numIngesters := 3; numIngesters < 10; numIngesters++ {
for numIngesters := 2; numIngesters < 10; numIngesters++ {

// Test with between 0 and numIngesters "happy" ingesters.
for happyIngesters := 0; happyIngesters <= numIngesters; happyIngesters++ {
@@ -426,6 +427,20 @@ func TestDistributor_PushQuery(t *testing.T) {
continue
}

// When we have fewer ingesters than the replication factor, any failed ingester
// will cause a failure.
if shardByAllLabels && numIngesters < 3 && happyIngesters < 2 {
testcases = append(testcases, testcase{
name: fmt.Sprintf("ExpectFail(shardByAllLabels=%v,numIngester=%d,happyIngester=%d)", shardByAllLabels, numIngesters, happyIngesters),
numIngesters: numIngesters,
happyIngesters: happyIngesters,
matchers: []*labels.Matcher{nameMatcher, barMatcher},
expectedError: promql.ErrStorage{Err: errFail},
shardByAllLabels: shardByAllLabels,
})
continue
}

// If we're sharding by metric name and we have failed ingesters, we can't
// tell ahead of time if the query will succeed, as we don't know which
// ingesters will hold the results for the query.
@@ -823,25 +838,36 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
})
}

// Mock the ingesters ring
ingesterDescs := []ring.IngesterDesc{}
// Use a real ring with a mock KV store to test ring RF logic.
ingesterDescs := map[string]ring.IngesterDesc{}
ingestersByAddr := map[string]*mockIngester{}
for i := range ingesters {
addr := fmt.Sprintf("%d", i)
ingesterDescs = append(ingesterDescs, ring.IngesterDesc{
ingesterDescs[addr] = ring.IngesterDesc{
Addr: addr,
Zone: addr,
State: ring.ACTIVE,
Timestamp: time.Now().Unix(),
})
Tokens: []uint32{uint32((math.MaxUint32 / numIngesters) * i)},
}
ingestersByAddr[addr] = &ingesters[i]
}

ingestersRing := mockRing{
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
replicationFactor: 3,
}
store := consul.NewInMemoryClient(ring.GetCodec())
err := store.Put(context.Background(), ring.IngesterRingKey, &ring.Desc{
Ingesters: ingesterDescs,
})
require.NoError(t, err)

ingestersRing, err := ring.New(ring.Config{
KVStore: kv.Config{
Mock: store,
},
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: 3,
}, ring.IngesterRingKey, ring.IngesterRingKey)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))

factory := func(addr string) (ring_client.PoolClient, error) {
return ingestersByAddr[addr], nil
@@ -959,45 +985,6 @@ func mustEqualMatcher(k, v string) *labels.Matcher {
return m
}

// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor
// ingesters.
type mockRing struct {
prometheus.Counter
ingesters []ring.IngesterDesc
replicationFactor uint32
}

func (r mockRing) Subring(key uint32, n int) (ring.ReadRing, error) {
return nil, fmt.Errorf("unimplemented")
}

func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
Ingesters: buf[:0],
}
for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
result.Ingesters = append(result.Ingesters, r.ingesters[n])
}
return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
}, nil
}

func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}

func (r mockRing) IngesterCount() int {
return len(r.ingesters)
}

type mockIngester struct {
sync.Mutex
client.IngesterClient
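The new ExpectFail cases above encode a quorum rule: with shardByAllLabels set, a query still needs a majority of RF responses even when fewer than RF ingesters exist, so with RF=3 and only 2 ingesters a single unhappy ingester is fatal. A minimal standalone sketch of that rule (illustration only, not the distributor's actual code path; expectQueryFailure is a made-up name):

package main

import "fmt"

// expectQueryFailure mirrors the condition guarding the new ExpectFail test
// cases: fewer ingesters than the replication factor, combined with fewer
// happy ingesters than a quorum (RF/2 + 1), means the query must fail.
func expectQueryFailure(numIngesters, happyIngesters, rf int) bool {
	minSuccess := rf/2 + 1
	return numIngesters < rf && happyIngesters < minSuccess
}

func main() {
	// With RF=3 and a 2-ingester ring, only the fully happy case survives.
	for _, tc := range []struct{ ingesters, happy int }{{2, 2}, {2, 1}, {2, 0}} {
		fmt.Printf("ingesters=%d happy=%d expectFail=%v\n",
			tc.ingesters, tc.happy, expectQueryFailure(tc.ingesters, tc.happy, 3))
	}
}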
16 changes: 16 additions & 0 deletions pkg/ring/kv/consul/client.go
@@ -92,6 +92,22 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) {
return c, nil
}

// Put is mostly here for testing.
func (c *Client) Put(ctx context.Context, key string, value interface{}) error {
bytes, err := c.codec.Encode(value)
if err != nil {
return err
}

return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, err := c.kv.Put(&consul.KVPair{
Key: key,
Value: bytes,
}, nil)
return err
})
}

// CAS atomically modifies a value in a callback.
// If value doesn't exist you'll get nil as an argument to your callback.
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
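The new Put is what the distributor test above uses to seed the ring key before starting a real ring. A hedged usage sketch (the cortex import paths and the test name are assumptions inferred from the package layout in this diff):

package example

import (
	"context"
	"testing"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
	"github.com/stretchr/testify/require"
)

// TestSeedRing follows the pattern from prepare() above: write a ring.Desc
// under the ingester ring key so a real ring.Ring can be built on top of it.
func TestSeedRing(t *testing.T) {
	store := consul.NewInMemoryClient(ring.GetCodec())
	err := store.Put(context.Background(), ring.IngesterRingKey, &ring.Desc{
		Ingesters: map[string]ring.IngesterDesc{
			"1": {Addr: "1", State: ring.ACTIVE, Timestamp: time.Now().Unix()},
		},
	})
	require.NoError(t, err)
}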
8 changes: 4 additions & 4 deletions pkg/ring/kv/consul/client_test.go
@@ -21,7 +21,7 @@ func writeValuesToKV(client *Client, key string, start, end int, sleep time.Dura
defer close(ch)
for i := start; i <= end; i++ {
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
_, _ = client.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
_, _ = client.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
time.Sleep(sleep)
}
}()
@@ -92,7 +92,7 @@ func TestReset(t *testing.T) {
defer close(ch)
for i := 0; i <= max; i++ {
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
_, _ = c.kv.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
if i == 1 {
c.kv.(*mockKV).ResetIndex()
}
@@ -142,11 +142,11 @@ func TestWatchKeyWithNoStartValue(t *testing.T) {

go func() {
time.Sleep(100 * time.Millisecond)
_, err := c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
_, err := c.kv.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
_, err = c.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil)
_, err = c.kv.Put(&consul.KVPair{Key: key, Value: []byte("end")}, nil)
require.NoError(t, err)
}()

20 changes: 12 additions & 8 deletions pkg/ring/ring.go
@@ -256,24 +256,28 @@ func (r *Ring) GetAll() (ReplicationSet, error) {
return ReplicationSet{}, ErrEmptyRing
}

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
maxErrors := r.cfg.ReplicationFactor / 2
// Calculate the number of required ingesters;
// ensure we always require at least RF-1 when RF=3.
numRequired := len(r.ringDesc.Ingesters)
if numRequired < r.cfg.ReplicationFactor {
numRequired = r.cfg.ReplicationFactor
}
numRequired -= r.cfg.ReplicationFactor / 2

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
for _, ingester := range r.ringDesc.Ingesters {
if !r.IsHealthy(&ingester, Read) {
maxErrors--
continue
if r.IsHealthy(&ingester, Read) {
ingesters = append(ingesters, ingester)
}
ingesters = append(ingesters, ingester)
}

if maxErrors < 0 {
if len(ingesters) < numRequired {
return ReplicationSet{}, fmt.Errorf("too many failed ingesters")
}

return ReplicationSet{
Ingesters: ingesters,
MaxErrors: maxErrors,
MaxErrors: len(ingesters) - numRequired,
}, nil
}

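The rewritten GetAll drives the PR's headline behaviour: the number of required responses is max(ingester count, RF) minus RF/2, it fails when fewer ingesters are healthy, and MaxErrors is whatever headroom remains. A standalone hedged sketch of that arithmetic (illustration only; getAllQuorum is a made-up name):

package main

import "fmt"

// getAllQuorum reproduces the new GetAll arithmetic: required responses are
// max(totalIngesters, RF) - RF/2; it errors when fewer ingesters are healthy,
// and otherwise reports the remaining headroom as MaxErrors.
func getAllQuorum(totalIngesters, healthyIngesters, rf int) (int, error) {
	numRequired := totalIngesters
	if numRequired < rf {
		numRequired = rf
	}
	numRequired -= rf / 2

	if healthyIngesters < numRequired {
		return 0, fmt.Errorf("too many failed ingesters")
	}
	return healthyIngesters - numRequired, nil
}

func main() {
	// RF=3 on a 3-node cluster: all healthy leaves MaxErrors=1, one node
	// restarting during a rolling upgrade still succeeds with MaxErrors=0,
	// and two nodes down is an error.
	for healthy := 3; healthy >= 1; healthy-- {
		maxErrors, err := getAllQuorum(3, healthy, 3)
		fmt.Printf("healthy=%d maxErrors=%d err=%v\n", healthy, maxErrors, err)
	}
}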