Skip to content

Commit 9817d07

Browse files
committed
Using ingester ID on the distributor metrics
Signed-off-by: alanprot <[email protected]>
1 parent ee8d110 commit 9817d07

File tree

5 files changed

+99
-49
lines changed

5 files changed

+99
-49
lines changed

pkg/distributor/distributor.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -734,10 +734,12 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
734734
return
735735
}
736736

737-
ipsMap := map[string]struct{}{}
737+
idsMap := map[string]struct{}{}
738738

739739
for _, ing := range append(healthy, unhealthy...) {
740-
ipsMap[ing.Addr] = struct{}{}
740+
if id, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr); err == nil {
741+
idsMap[id] = struct{}{}
742+
}
741743
}
742744

743745
ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}
@@ -751,7 +753,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
751753
}
752754

753755
for _, lbls := range metrics {
754-
if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
756+
if _, ok := idsMap[lbls.Get("ingester")]; !ok {
755757
err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
756758
if err != nil {
757759
level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
@@ -956,6 +958,12 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
956958
if err != nil {
957959
return err
958960
}
961+
962+
id, err := d.ingestersRing.GetInstanceIdByAddr(ingester.Addr)
963+
if err != nil {
964+
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ingester.Addr, "err", err)
965+
}
966+
959967
c := h.(ingester_client.HealthAndIngesterClient)
960968

961969
req := cortexpb.PreallocWriteRequestFromPool()
@@ -972,15 +980,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
972980
}
973981

974982
if len(metadata) > 0 {
975-
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
983+
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
976984
if err != nil {
977-
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata, getErrorStatus(err)).Inc()
985+
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
978986
}
979987
}
980988
if len(timeseries) > 0 {
981-
d.ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
989+
d.ingesterAppends.WithLabelValues(id, typeSamples).Inc()
982990
if err != nil {
983-
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples, getErrorStatus(err)).Inc()
991+
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
984992
}
985993
}
986994

pkg/distributor/distributor_test.go

+40-37
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,12 @@ func TestDistributor_Push(t *testing.T) {
232232
expectedMetrics: `
233233
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
234234
# TYPE cortex_distributor_ingester_append_failures_total counter
235-
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="samples"} 1
235+
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="samples"} 1
236236
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
237237
# TYPE cortex_distributor_ingester_appends_total counter
238-
cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
239-
cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1
240-
cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1
238+
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="samples"} 1
239+
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="samples"} 1
240+
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="samples"} 1
241241
`,
242242
},
243243
"A push to ingesters should report the correct metrics with no samples": {
@@ -251,12 +251,12 @@ func TestDistributor_Push(t *testing.T) {
251251
expectedMetrics: `
252252
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
253253
# TYPE cortex_distributor_ingester_append_failures_total counter
254-
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="metadata"} 1
254+
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="metadata"} 1
255255
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
256256
# TYPE cortex_distributor_ingester_appends_total counter
257-
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
258-
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
259-
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
257+
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
258+
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
259+
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
260260
`,
261261
},
262262
"A push to overloaded ingesters should report the correct metrics": {
@@ -268,14 +268,14 @@ func TestDistributor_Push(t *testing.T) {
268268
expectedResponse: emptyResponse,
269269
ingesterError: httpgrpc.Errorf(http.StatusTooManyRequests, "Fail"),
270270
expectedMetrics: `
271-
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
272-
# TYPE cortex_distributor_ingester_append_failures_total counter
273-
cortex_distributor_ingester_append_failures_total{ingester="2",status="4xx",type="metadata"} 1
274271
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
275272
# TYPE cortex_distributor_ingester_appends_total counter
276-
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
277-
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
278-
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
273+
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
274+
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
275+
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
276+
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
277+
# TYPE cortex_distributor_ingester_append_failures_total counter
278+
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="4xx",type="metadata"} 1
279279
`,
280280
},
281281
"A push to 3 happy ingesters should succeed, histograms": {
@@ -436,14 +436,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
436436
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
437437

438438
h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
439-
d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
440-
d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
441-
d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
442-
d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
443-
d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
444-
d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
445-
d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
446-
d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()
439+
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
440+
ingId1, _ := r.GetInstanceIdByAddr(h[1].Addr)
441+
d.ingesterAppends.WithLabelValues(ingId0, typeMetadata).Inc()
442+
d.ingesterAppendFailures.WithLabelValues(ingId0, typeMetadata, "2xx").Inc()
443+
d.ingesterAppends.WithLabelValues(ingId1, typeMetadata).Inc()
444+
d.ingesterAppendFailures.WithLabelValues(ingId1, typeMetadata, "2xx").Inc()
445+
d.ingesterQueries.WithLabelValues(ingId0).Inc()
446+
d.ingesterQueries.WithLabelValues(ingId1).Inc()
447+
d.ingesterQueryFailures.WithLabelValues(ingId0).Inc()
448+
d.ingesterQueryFailures.WithLabelValues(ingId1).Inc()
447449

448450
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
449451
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
@@ -489,27 +491,27 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
489491
490492
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
491493
# TYPE cortex_distributor_ingester_append_failures_total counter
492-
cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
493-
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
494+
cortex_distributor_ingester_append_failures_total{ingester="ingester-0",status="2xx",type="metadata"} 1
495+
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
494496
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
495497
# TYPE cortex_distributor_ingester_appends_total counter
496-
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
497-
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
498+
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
499+
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
498500
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
499501
# TYPE cortex_distributor_ingester_queries_total counter
500-
cortex_distributor_ingester_queries_total{ingester="0"} 1
501-
cortex_distributor_ingester_queries_total{ingester="1"} 1
502+
cortex_distributor_ingester_queries_total{ingester="ingester-0"} 1
503+
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
502504
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
503505
# TYPE cortex_distributor_ingester_query_failures_total counter
504-
cortex_distributor_ingester_query_failures_total{ingester="0"} 1
505-
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
506+
cortex_distributor_ingester_query_failures_total{ingester="ingester-0"} 1
507+
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
506508
`), metrics...))
507509

508510
d.cleanupInactiveUser("userA")
509511

510512
err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
511513
r := in.(*ring.Desc)
512-
delete(r.Ingesters, "0")
514+
delete(r.Ingesters, "ingester-0")
513515
return in, true, nil
514516
})
515517

@@ -556,16 +558,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
556558
557559
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
558560
# TYPE cortex_distributor_ingester_append_failures_total counter
559-
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
561+
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
560562
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
561563
# TYPE cortex_distributor_ingester_appends_total counter
562-
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
564+
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
563565
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
564566
# TYPE cortex_distributor_ingester_queries_total counter
565-
cortex_distributor_ingester_queries_total{ingester="1"} 1
567+
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
566568
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
567569
# TYPE cortex_distributor_ingester_query_failures_total counter
568-
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
570+
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
569571
`), metrics...))
570572
}
571573

@@ -2717,8 +2719,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
27172719
} else {
27182720
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
27192721
}
2720-
addr := fmt.Sprintf("%d", i)
2721-
ingesterDescs[addr] = ring.InstanceDesc{
2722+
ingester := fmt.Sprintf("ingester-%d", i)
2723+
addr := fmt.Sprintf("ip-ingester-%d", i)
2724+
ingesterDescs[ingester] = ring.InstanceDesc{
27222725
Addr: addr,
27232726
Zone: "",
27242727
State: ring.ACTIVE,

pkg/distributor/query.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package distributor
22

33
import (
44
"context"
5+
"github.com/go-kit/log/level"
56
"io"
67
"sort"
78
"time"
@@ -166,9 +167,14 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
166167
}
167168

168169
resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req)
169-
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
170+
ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
170171
if err != nil {
171-
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
172+
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
173+
}
174+
175+
d.ingesterQueries.WithLabelValues(ingesterId).Inc()
176+
if err != nil {
177+
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
172178
return nil, err
173179
}
174180

@@ -225,11 +231,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
225231
if err != nil {
226232
return nil, err
227233
}
228-
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
234+
235+
ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
236+
if err != nil {
237+
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
238+
}
239+
240+
d.ingesterQueries.WithLabelValues(ingesterId).Inc()
229241

230242
stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req)
231243
if err != nil {
232-
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
244+
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
233245
return nil, err
234246
}
235247
defer stream.CloseSend() //nolint:errcheck
@@ -242,7 +254,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
242254
} else if err != nil {
243255
// Do not track a failure if the context was canceled.
244256
if !grpcutil.IsGRPCContextCanceled(err) {
245-
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
257+
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
246258
}
247259

248260
return nil, err

pkg/ring/model.go

+10
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,16 @@ func (d *Desc) getTokensByZone() map[string][]uint32 {
527527
return MergeTokensByZone(zones)
528528
}
529529

530+
// getInstancesByAddr returns instances id by its address
531+
func (d *Desc) getInstancesByAddr() map[string]string {
532+
instancesByAddMap := make(map[string]string, len(d.Ingesters))
533+
for id, instance := range d.Ingesters {
534+
instancesByAddMap[instance.Addr] = id
535+
}
536+
537+
return instancesByAddMap
538+
}
539+
530540
type CompareResult int
531541

532542
// CompareResult responses

pkg/ring/ring.go

+17
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ type ReadRing interface {
8080
// instance does not exist in the ring.
8181
GetInstanceState(instanceID string) (InstanceState, error)
8282

83+
// GetInstanceIdByAddr returns the instance id from its address or an error if the
84+
// // instance does not exist in the ring.
85+
GetInstanceIdByAddr(addr string) (string, error)
86+
8387
// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
8488
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
8589
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
@@ -186,6 +190,8 @@ type Ring struct {
186190
// change it is to create a new one and replace it).
187191
ringInstanceByToken map[uint32]instanceInfo
188192

193+
ringInstanceIdByAddr map[string]string
194+
189195
// When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp).
190196
lastTopologyChange time.Time
191197

@@ -338,6 +344,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
338344
ringTokens := ringDesc.GetTokens()
339345
ringTokensByZone := ringDesc.getTokensByZone()
340346
ringInstanceByToken := ringDesc.getTokensInfo()
347+
ringInstanceByAddr := ringDesc.getInstancesByAddr()
341348
ringZones := getZones(ringTokensByZone)
342349

343350
r.mtx.Lock()
@@ -346,6 +353,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
346353
r.ringTokens = ringTokens
347354
r.ringTokensByZone = ringTokensByZone
348355
r.ringInstanceByToken = ringInstanceByToken
356+
r.ringInstanceIdByAddr = ringInstanceByAddr
349357
r.ringZones = ringZones
350358
r.lastTopologyChange = now
351359
if r.shuffledSubringCache != nil {
@@ -895,6 +903,15 @@ func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
895903
return instance.GetState(), nil
896904
}
897905

906+
// GetInstanceIdByAddr implements ReadRing.
907+
func (r *Ring) GetInstanceIdByAddr(addr string) (string, error) {
908+
if i, ok := r.ringInstanceIdByAddr[addr]; ok {
909+
return i, nil
910+
}
911+
912+
return "notFound", ErrInstanceNotFound
913+
}
914+
898915
// HasInstance returns whether the ring contains an instance matching the provided instanceID.
899916
func (r *Ring) HasInstance(instanceID string) bool {
900917
r.mtx.RLock()

0 commit comments

Comments
 (0)