Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue where we are not returning all the available ingesters in the ring #3977

Merged
merged 3 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
* [BUGFIX] Querier: streamline tracing spans. #3924
* [BUGFIX] Distributor: fix issue causing distributors to not extend the replication set because of failing instances when zone-aware replication is enabled. #3977

## 1.8.0 in progress

Expand Down
5 changes: 4 additions & 1 deletion pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
if util.StringsContain(distinctZones, info.Zone) {
continue
}
distinctZones = append(distinctZones, info.Zone)
}

distinctHosts = append(distinctHosts, info.InstanceID)
Expand All @@ -357,6 +356,10 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// this instance.
if op.ShouldExtendReplicaSetOnState(instance.State) {
n++
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// We should only add the zone if we are not going to extend,
// as we want to extend the instance in the same AZ.
distinctZones = append(distinctZones, info.Zone)
}

instances = append(instances, instance)
Expand Down
77 changes: 77 additions & 0 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,83 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) {
require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens)
}

func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
const testCount = 10000

tests := map[string]struct {
replicationFactor int
expectedInstances int
expectedZones int
}{
"should succeed if there are enough instances per zone on RF = 3": {
replicationFactor: 3,
expectedInstances: 3,
expectedZones: 3,
},
"should succeed if there are enough instances per zone on RF = 2": {
replicationFactor: 2,
expectedInstances: 2,
expectedZones: 2,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
r := NewDesc()
instances := map[string]InstanceDesc{
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE},
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE},
"instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE},
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE},
"instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: LEAVING},
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE},
}
var prevTokens []uint32
for id, instance := range instances {
ingTokens := GenerateTokens(128, prevTokens)
r.AddIngester(id, instance.Addr, instance.Zone, ingTokens, instance.State, time.Now())
prevTokens = append(prevTokens, ingTokens...)
}
instancesList := make([]InstanceDesc, 0, len(r.GetIngesters()))
for _, v := range r.GetIngesters() {
instancesList = append(instancesList, v)
}

ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
ReplicationFactor: testData.replicationFactor,
ZoneAwarenessEnabled: true,
},
ringDesc: r,
ringTokens: r.GetTokens(),
ringTokensByZone: r.getTokensByZone(),
ringInstanceByToken: r.getTokensInfo(),
ringZones: getZones(r.getTokensByZone()),
strategy: NewDefaultReplicationStrategy(),
}

_, bufHosts, bufZones := MakeBuffersForGet()

// Use the GenerateTokens to get an array of random uint32 values.
testValues := GenerateTokens(testCount, nil)

for i := 0; i < testCount; i++ {
set, err := ring.Get(testValues[i], Write, instancesList, bufHosts, bufZones)
require.NoError(t, err)

distinctZones := map[string]int{}
for _, instance := range set.Instances {
distinctZones[instance.Zone]++
}

assert.Len(t, set.Instances, testData.expectedInstances)
assert.Len(t, distinctZones, testData.expectedZones)
}
})
}
}

func TestRing_Get_ZoneAwareness(t *testing.T) {
// Number of tests to run.
const testCount = 10000
Expand Down