Using ingester ID instead of address in the distributor metrics. #6078

Merged: 4 commits, Jul 11, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984
* [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056
* [CHANGE] Distributor/Ingester: Change `cortex_distributor_ingester_appends_total`, `cortex_distributor_ingester_append_failures_total`, `cortex_distributor_ingester_queries_total`, and `cortex_distributor_ingester_query_failures_total` metrics to use the ingester ID instead of its IP as the label value. #6078
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
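The label change in this PR's entry is breaking for any dashboard or alert that matches on the `ingester` label. A hedged before/after illustration, in the same exposition format as the test fixtures below (the address and ID values here are made up):

```
# before: the label value was the ingester's address
cortex_distributor_ingester_appends_total{ingester="10.0.0.1:9095",type="samples"} 1
# after: the label value is the ingester's ring ID
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="samples"} 1
```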
4 changes: 4 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
@@ -747,6 +747,10 @@ type RingMock struct {
mock.Mock
}

// GetInstanceIdByAddr is a stub to satisfy the expanded ReadRing interface.
func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) {
return "", nil
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}
22 changes: 15 additions & 7 deletions pkg/distributor/distributor.go
@@ -734,10 +734,12 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
return
}

ipsMap := map[string]struct{}{}
idsMap := map[string]struct{}{}

for _, ing := range append(healthy, unhealthy...) {
ipsMap[ing.Addr] = struct{}{}
if id, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr); err == nil {
idsMap[id] = struct{}{}
}
}

ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}
@@ -751,7 +753,7 @@ func (d *Distributor) cleanStaleIngesterMetrics() {
}

for _, lbls := range metrics {
if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
if _, ok := idsMap[lbls.Get("ingester")]; !ok {
err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
if err != nil {
level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
@@ -956,6 +958,12 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
if err != nil {
return err
}

id, err := d.ingestersRing.GetInstanceIdByAddr(ingester.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ingester.Addr, "err", err)
}

c := h.(ingester_client.HealthAndIngesterClient)

req := cortexpb.PreallocWriteRequestFromPool()
Expand All @@ -972,15 +980,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
}

if len(metadata) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeMetadata, getErrorStatus(err)).Inc()
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
}
}
if len(timeseries) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeSamples).Inc()
d.ingesterAppends.WithLabelValues(id, typeSamples).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr, typeSamples, getErrorStatus(err)).Inc()
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
}
}

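The `send()` hunk above resolves the ingester's ring ID once and keeps going (with a warning) when the address is unknown, so every subsequent metric increment uses the same label value. A minimal, self-contained sketch of that lookup-with-fallback pattern; `resolveIngesterID` and the inline `ringStub` are illustrative, not Cortex code:

```go
package main

import (
	"errors"
	"fmt"
)

var errInstanceNotFound = errors.New("instance not found in the ring")

// ringStub stands in for the ReadRing dependency used by the distributor.
type ringStub struct{ idByAddr map[string]string }

func (r *ringStub) GetInstanceIdByAddr(addr string) (string, error) {
	if id, ok := r.idByAddr[addr]; ok {
		return id, nil
	}
	// Mirrors the ring's fallback: a usable sentinel label value plus an error.
	return "notFound", errInstanceNotFound
}

// resolveIngesterID follows the diff's pattern: warn on a miss but keep the
// returned value, so metrics are still recorded under a stable label.
func resolveIngesterID(r *ringStub, addr string) string {
	id, err := r.GetInstanceIdByAddr(addr)
	if err != nil {
		fmt.Printf("level=warn msg=%q addr=%s\n", "instance not found in the ring", addr)
	}
	return id
}

func main() {
	r := &ringStub{idByAddr: map[string]string{"10.0.0.1:9095": "ingester-0"}}
	fmt.Println(resolveIngesterID(r, "10.0.0.1:9095")) // ingester-0
	fmt.Println(resolveIngesterID(r, "10.0.0.9:9095")) // notFound, after a warning
}
```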
90 changes: 45 additions & 45 deletions pkg/distributor/distributor_test.go
@@ -232,12 +232,12 @@ func TestDistributor_Push(t *testing.T) {
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="samples"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="samples"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="samples"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="samples"} 1
`,
},
"A push to ingesters should report the correct metrics with no samples": {
@@ -251,12 +251,12 @@
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="5xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="5xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
`,
},
"A push to overloaded ingesters should report the correct metrics": {
@@ -268,14 +268,14 @@
expectedResponse: emptyResponse,
ingesterError: httpgrpc.Errorf(http.StatusTooManyRequests, "Fail"),
expectedMetrics: `
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="2",status="4xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="2",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-2",type="metadata"} 1
# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="ingester-2",status="4xx",type="metadata"} 1
`,
},
"A push to 3 happy ingesters should succeed, histograms": {
@@ -436,14 +436,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)

h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
ingId1, _ := r.GetInstanceIdByAddr(h[1].Addr)
d.ingesterAppends.WithLabelValues(ingId0, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(ingId0, typeMetadata, "2xx").Inc()
d.ingesterAppends.WithLabelValues(ingId1, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(ingId1, typeMetadata, "2xx").Inc()
d.ingesterQueries.WithLabelValues(ingId0).Inc()
d.ingesterQueries.WithLabelValues(ingId1).Inc()
d.ingesterQueryFailures.WithLabelValues(ingId0).Inc()
d.ingesterQueryFailures.WithLabelValues(ingId1).Inc()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
@@ -489,27 +491,27 @@ func TestDistributor_MetricsCleanup(t *testing.T) {

# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-0",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="0"} 1
cortex_distributor_ingester_queries_total{ingester="1"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-0"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="0"} 1
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-0"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
`), metrics...))

d.cleanupInactiveUser("userA")

err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
r := in.(*ring.Desc)
delete(r.Ingesters, "0")
delete(r.Ingesters, "ingester-0")
return in, true, nil
})

@@ -556,16 +558,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {

# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="ingester-1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="ingester-1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="1"} 1
cortex_distributor_ingester_queries_total{ingester="ingester-1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
cortex_distributor_ingester_query_failures_total{ingester="ingester-1"} 1
`), metrics...))
}

@@ -757,23 +759,20 @@ func TestPush_QuorumError(t *testing.T) {

err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
r := in.(*ring.Desc)
ingester2 := r.Ingesters["2"]
ingester2 := r.Ingesters["ingester-2"]
ingester2.State = ring.LEFT
ingester2.Timestamp = time.Now().Unix()
r.Ingesters["2"] = ingester2
r.Ingesters["ingester-2"] = ingester2
return in, true, nil
})

require.NoError(t, err)

// Give the ring time to pick up the updated KV value
for {
test.Poll(t, 15*time.Second, true, func() interface{} {
replicationSet, _ := r.GetAllHealthy(ring.Read)
if len(replicationSet.Instances) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
return len(replicationSet.Instances) == 2
})

for i := 0; i < numberOfWrites; i++ {
request := makeWriteRequest(0, 30, 20, 10)
@@ -2717,8 +2716,9 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
} else {
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
}
addr := fmt.Sprintf("%d", i)
ingesterDescs[addr] = ring.InstanceDesc{
ingester := fmt.Sprintf("ingester-%d", i)
addr := fmt.Sprintf("ip-ingester-%d", i)
ingesterDescs[ingester] = ring.InstanceDesc{
Addr: addr,
Zone: "",
State: ring.ACTIVE,
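The `prepare()` change at the bottom of this file is what drives the new `ingester-N` label values in the expectations above: ring entries are now keyed by an instance ID that is deliberately different from the `Addr` field. A condensed sketch of that setup, assuming the real `ring` package types; `buildDescs` is a hypothetical helper, not part of the PR:

```go
package distributor_test

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

// buildDescs condenses the prepare() change: the map key is the instance ID,
// while Addr carries a distinct address-like value, so any code path that
// still labels metrics with the address shows up as a fixture mismatch.
func buildDescs(numIngesters int) map[string]ring.InstanceDesc {
	descs := make(map[string]ring.InstanceDesc, numIngesters)
	for i := 0; i < numIngesters; i++ {
		id := fmt.Sprintf("ingester-%d", i)      // ring key = instance ID
		addr := fmt.Sprintf("ip-ingester-%d", i) // deliberately different from the ID
		descs[id] = ring.InstanceDesc{Addr: addr, State: ring.ACTIVE}
	}
	return descs
}
```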
23 changes: 18 additions & 5 deletions pkg/distributor/query.go
@@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -165,10 +166,16 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
return nil, err
}

ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
}

resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()

d.ingesterQueries.WithLabelValues(ingesterId).Inc()
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
return nil, err
}

@@ -225,11 +232,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
if err != nil {
return nil, err
}
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()

ingesterId, err := d.ingestersRing.GetInstanceIdByAddr(ing.Addr)
if err != nil {
level.Warn(d.log).Log("msg", "instance not found in the ring", "addr", ing.Addr, "err", err)
}

d.ingesterQueries.WithLabelValues(ingesterId).Inc()

stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
return nil, err
}
defer stream.CloseSend() //nolint:errcheck
@@ -242,7 +255,7 @@
} else if err != nil {
// Do not track a failure if the context was canceled.
if !grpcutil.IsGRPCContextCanceled(err) {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(ingesterId).Inc()
}

return nil, err
10 changes: 10 additions & 0 deletions pkg/ring/model.go
@@ -527,6 +527,16 @@ func (d *Desc) getTokensByZone() map[string][]uint32 {
return MergeTokensByZone(zones)
}

// getInstancesByAddr returns a map from instance address to instance ID.
func (d *Desc) getInstancesByAddr() map[string]string {
instancesByAddrMap := make(map[string]string, len(d.Ingesters))
for id, instance := range d.Ingesters {
instancesByAddrMap[instance.Addr] = id
}

return instancesByAddrMap
}

type CompareResult int

// CompareResult responses
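`getInstancesByAddr` builds a reverse index (address to ID) over `Desc.Ingesters`, which is a plain map, so if two instance IDs ever carried the same address the surviving entry would be arbitrary; the ring is expected to keep addresses unique. A standalone sketch of the same idea with made-up data:

```go
package main

import "fmt"

func main() {
	// Stand-in for Desc.Ingesters: instance ID -> instance address.
	ingesters := map[string]string{
		"ingester-0": "10.0.0.1:9095",
		"ingester-1": "10.0.0.2:9095",
	}

	// Reverse index, as in getInstancesByAddr: address -> instance ID.
	byAddr := make(map[string]string, len(ingesters))
	for id, addr := range ingesters {
		byAddr[addr] = id
	}

	fmt.Println(byAddr["10.0.0.2:9095"]) // ingester-1
}
```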
17 changes: 17 additions & 0 deletions pkg/ring/ring.go
@@ -80,6 +80,10 @@ type ReadRing interface {
// instance does not exist in the ring.
GetInstanceState(instanceID string) (InstanceState, error)

// GetInstanceIdByAddr returns the instance ID from its address, or an error if the
// instance does not exist in the ring.
GetInstanceIdByAddr(addr string) (string, error)

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
// all instances that have been part of the identifier's shard since "now - lookbackPeriod".
ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing
@@ -186,6 +190,8 @@ type Ring struct {
// change it is to create a new one and replace it).
ringInstanceByToken map[uint32]instanceInfo

// Reverse lookup of instance ID by address, rebuilt whenever the ring changes.
ringInstanceIdByAddr map[string]string

// When did a set of instances change the last time (instance changing state or heartbeat is ignored for this timestamp).
lastTopologyChange time.Time

@@ -338,6 +344,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
ringTokens := ringDesc.GetTokens()
ringTokensByZone := ringDesc.getTokensByZone()
ringInstanceByToken := ringDesc.getTokensInfo()
ringInstanceByAddr := ringDesc.getInstancesByAddr()
ringZones := getZones(ringTokensByZone)

r.mtx.Lock()
@@ -346,6 +353,7 @@
r.ringTokens = ringTokens
r.ringTokensByZone = ringTokensByZone
r.ringInstanceByToken = ringInstanceByToken
r.ringInstanceIdByAddr = ringInstanceByAddr
r.ringZones = ringZones
r.lastTopologyChange = now
if r.shuffledSubringCache != nil {
@@ -895,6 +903,15 @@ func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) {
return instance.GetState(), nil
}

// GetInstanceIdByAddr implements ReadRing.
func (r *Ring) GetInstanceIdByAddr(addr string) (string, error) {
if i, ok := r.ringInstanceIdByAddr[addr]; ok {
return i, nil
}

return "notFound", ErrInstanceNotFound
}

// HasInstance returns whether the ring contains an instance matching the provided instanceID.
func (r *Ring) HasInstance(instanceID string) bool {
r.mtx.RLock()
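One nuance worth noting: unlike `HasInstance` just above, the merged `GetInstanceIdByAddr` reads `ringInstanceIdByAddr` without acquiring `r.mtx.RLock()`, even though `updateRingState` swaps that map under the write lock. A lock-guarded variant would look like the sketch below; this is an illustration of the alternative, not the merged code:

```go
// GetInstanceIdByAddr implements ReadRing, guarded like the ring's other
// accessors. Returning the "notFound" sentinel alongside the error lets
// callers that only log the error still record metrics under a stable label.
func (r *Ring) GetInstanceIdByAddr(addr string) (string, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if id, ok := r.ringInstanceIdByAddr[addr]; ok {
		return id, nil
	}

	return "notFound", ErrInstanceNotFound
}
```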
4 changes: 4 additions & 0 deletions pkg/ring/util_test.go
@@ -17,6 +17,10 @@ type RingMock struct {
mock.Mock
}

// GetInstanceIdByAddr is a stub to satisfy the expanded ReadRing interface.
func (r *RingMock) GetInstanceIdByAddr(addr string) (string, error) {
return "", nil
}

func (r *RingMock) Collect(ch chan<- prometheus.Metric) {}

func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {}