Skip to content

Commit 843d4f4

Browse files
committed
fix issue where we are not returning all the available ingesters in the ring
Signed-off-by: Roy Chiang <[email protected]>
1 parent 3fedc11 commit 843d4f4

File tree

3 files changed

+72
-9
lines changed

3 files changed

+72
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* `cortex_ruler_client_request_duration_seconds`
1818
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
1919
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
20+
* [BUGFIX] Distributor: fix issue where we are not returning all the available ingesters in the ring. #3977
2021

2122
## 1.8.0 in progress
2223

pkg/ring/ring.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -342,23 +342,29 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
342342
continue
343343
}
344344

345+
instance := r.ringDesc.Ingesters[info.InstanceID]
346+
347+
// Check whether the replica set should be extended given we're including
348+
// this instance.
349+
shouldExtendReplicaSet := op.ShouldExtendReplicaSetOnState(instance.State)
350+
if shouldExtendReplicaSet {
351+
n++
352+
}
353+
345354
// Ignore if the instances don't have a zone set.
346355
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
347356
if util.StringsContain(distinctZones, info.Zone) {
348357
continue
349358
}
350-
distinctZones = append(distinctZones, info.Zone)
351-
}
352359

353-
distinctHosts = append(distinctHosts, info.InstanceID)
354-
instance := r.ringDesc.Ingesters[info.InstanceID]
355-
356-
// Check whether the replica set should be extended given we're including
357-
// this instance.
358-
if op.ShouldExtendReplicaSetOnState(instance.State) {
359-
n++
360+
// We should only add instance zone if we are not going to extend,
361+
// as we want to extend the instance in the same AZ.
362+
if !shouldExtendReplicaSet {
363+
distinctZones = append(distinctZones, info.Zone)
364+
}
360365
}
361366

367+
distinctHosts = append(distinctHosts, info.InstanceID)
362368
instances = append(instances, instance)
363369
}
364370

pkg/ring/ring_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,62 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) {
132132
require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens)
133133
}
134134

135+
func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
136+
const testCount = 10000
137+
138+
r := NewDesc()
139+
instances := map[string]InstanceDesc{
140+
"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil), State: ACTIVE},
141+
"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil), State: ACTIVE},
142+
"instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil), State: ACTIVE},
143+
"instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil), State: ACTIVE},
144+
"instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil), State: LEAVING},
145+
"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil), State: ACTIVE},
146+
}
147+
var prevTokens []uint32
148+
for id, instance := range instances {
149+
ingTokens := GenerateTokens(128, prevTokens)
150+
r.AddIngester(id, instance.Addr, instance.Zone, ingTokens, instance.State, time.Now())
151+
prevTokens = append(prevTokens, ingTokens...)
152+
}
153+
instancesList := make([]InstanceDesc, 0, len(r.GetIngesters()))
154+
for _, v := range r.GetIngesters() {
155+
instancesList = append(instancesList, v)
156+
}
157+
158+
ring := Ring{
159+
cfg: Config{
160+
HeartbeatTimeout: time.Hour,
161+
ReplicationFactor: 3,
162+
ZoneAwarenessEnabled: true,
163+
},
164+
ringDesc: r,
165+
ringTokens: r.GetTokens(),
166+
ringTokensByZone: r.getTokensByZone(),
167+
ringInstanceByToken: r.getTokensInfo(),
168+
ringZones: getZones(r.getTokensByZone()),
169+
strategy: NewDefaultReplicationStrategy(),
170+
}
171+
172+
_, bufHosts, bufZones := MakeBuffersForGet()
173+
174+
// Use the GenerateTokens to get an array of random uint32 values.
175+
testValues := GenerateTokens(testCount, nil)
176+
177+
for i := 0; i < testCount; i++ {
178+
set, err := ring.Get(testValues[i], Write, instancesList, bufHosts, bufZones)
179+
require.NoError(t, err)
180+
181+
distinctZones := map[string]int{}
182+
for _, instance := range set.Instances {
183+
distinctZones[instance.Zone]++
184+
}
185+
186+
assert.Equal(t, len(set.Instances), 3)
187+
assert.Equal(t, len(distinctZones), 3)
188+
}
189+
}
190+
135191
func TestRing_Get_ZoneAwareness(t *testing.T) {
136192
// Number of tests to run.
137193
const testCount = 10000

0 commit comments

Comments
 (0)