Skip to content

Commit d45e65a

Browse files
fix issue where we are not returning all the available ingesters in the ring (#3977)
* Fix issue where we are not returning all the available ingesters in the ring (#3977)
  Signed-off-by: Roy Chiang <[email protected]>
* Handle PR comments and improved tests
  Signed-off-by: Roy Chiang <[email protected]>
* Update CHANGELOG.md
  Co-authored-by: Marco Pracucci <[email protected]>
  Signed-off-by: Roy Chiang <[email protected]>

Co-authored-by: Marco Pracucci <[email protected]>
1 parent 3475b71 commit d45e65a

File tree

3 files changed

+82
-1
lines changed

3 files changed

+82
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
2323
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
2424
* [BUGFIX] Querier: streamline tracing spans. #3924
25+
* [BUGFIX] Distributor: fix issue causing distributors to not extend the replication set because of failing instances when zone-aware replication is enabled. #3977
2526

2627
## 1.8.0 in progress
2728

pkg/ring/ring.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,6 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
347347
if util.StringsContain(distinctZones, info.Zone) {
348348
continue
349349
}
350-
distinctZones = append(distinctZones, info.Zone)
351350
}
352351

353352
distinctHosts = append(distinctHosts, info.InstanceID)
@@ -357,6 +356,10 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
357356
// this instance.
358357
if op.ShouldExtendReplicaSetOnState(instance.State) {
359358
n++
359+
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
360+
// We should only add the zone if we are not going to extend,
361+
// as we want to extend the instance in the same AZ.
362+
distinctZones = append(distinctZones, info.Zone)
360363
}
361364

362365
instances = append(instances, instance)

pkg/ring/ring_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,83 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) {
132132
require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens)
133133
}
134134

135+
// TestRing_Get_ZoneAwarenessWithIngesterLeaving checks that, with zone-awareness
// enabled and one instance in the LEAVING state, Ring.Get for a Write operation
// still returns the expected number of instances spread over the expected number
// of distinct zones, for every looked-up key.
func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
136+
const testCount = 10000 // number of keys looked up per sub-test
137+
138+
tests := map[string]struct {
139+
replicationFactor int // value assigned to Config.ReplicationFactor
140+
expectedInstances int // expected length of the returned set's Instances
141+
expectedZones int // expected number of distinct zones among those instances
142+
}{
143+
"should succeed if there are enough instances per zone on RF = 3": {
144+
replicationFactor: 3,
145+
expectedInstances: 3,
146+
expectedZones: 3,
147+
},
148+
"should succeed if there are enough instances per zone on RF = 2": {
149+
replicationFactor: 2,
150+
expectedInstances: 2,
151+
expectedZones: 2,
152+
},
153+
}
154+
155+
for testName, testData := range tests {
156+
t.Run(testName, func(t *testing.T) {
157+
r := NewDesc()
158+
// Two instances per zone; the only non-ACTIVE one is instance-5 (zone-c, LEAVING).
instances := map[string]InstanceDesc{
159+
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE},
160+
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE},
161+
"instance-3": {Addr: "127.0.0.3", Zone: "zone-b", State: ACTIVE},
162+
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", State: ACTIVE},
163+
"instance-5": {Addr: "127.0.0.5", Zone: "zone-c", State: LEAVING},
164+
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE},
165+
}
166+
var prevTokens []uint32 // tokens handed out so far, passed to each GenerateTokens call
167+
for id, instance := range instances {
168+
ingTokens := GenerateTokens(128, prevTokens) // 128 tokens per instance
169+
r.AddIngester(id, instance.Addr, instance.Zone, ingTokens, instance.State, time.Now())
170+
prevTokens = append(prevTokens, ingTokens...)
171+
}
172+
// Copy the ring's instances into a slice; it is passed below as Get's bufDescs argument.
instancesList := make([]InstanceDesc, 0, len(r.GetIngesters()))
173+
for _, v := range r.GetIngesters() {
174+
instancesList = append(instancesList, v)
175+
}
176+
177+
// Build the Ring directly from the descriptor, with zone-awareness enabled.
ring := Ring{
178+
cfg: Config{
179+
HeartbeatTimeout: time.Hour, // NOTE(review): presumably long enough that no instance is treated as unhealthy — confirm
180+
ReplicationFactor: testData.replicationFactor,
181+
ZoneAwarenessEnabled: true,
182+
},
183+
ringDesc: r,
184+
ringTokens: r.GetTokens(),
185+
ringTokensByZone: r.getTokensByZone(),
186+
ringInstanceByToken: r.getTokensInfo(),
187+
ringZones: getZones(r.getTokensByZone()),
188+
strategy: NewDefaultReplicationStrategy(),
189+
}
190+
191+
_, bufHosts, bufZones := MakeBuffersForGet()
192+
193+
// Reuse GenerateTokens to obtain testCount random uint32 values to use as lookup keys.
194+
testValues := GenerateTokens(testCount, nil)
195+
196+
for i := 0; i < testCount; i++ {
197+
set, err := ring.Get(testValues[i], Write, instancesList, bufHosts, bufZones)
198+
require.NoError(t, err)
199+
200+
// Count returned instances per zone; the map's length is the number of distinct zones.
distinctZones := map[string]int{}
201+
for _, instance := range set.Instances {
202+
distinctZones[instance.Zone]++
203+
}
204+
205+
assert.Len(t, set.Instances, testData.expectedInstances)
206+
assert.Len(t, distinctZones, testData.expectedZones)
207+
}
208+
})
209+
}
210+
}
211+
135212
func TestRing_Get_ZoneAwareness(t *testing.T) {
136213
// Number of tests to run.
137214
const testCount = 10000

0 commit comments

Comments
 (0)