@@ -36,6 +36,7 @@ import (
36
36
"github.com/cortexproject/cortex/pkg/tenant"
37
37
"github.com/cortexproject/cortex/pkg/util"
38
38
"github.com/cortexproject/cortex/pkg/util/extract"
39
+ "github.com/cortexproject/cortex/pkg/util/labelset"
39
40
"github.com/cortexproject/cortex/pkg/util/limiter"
40
41
util_log "github.com/cortexproject/cortex/pkg/util/log"
41
42
util_math "github.com/cortexproject/cortex/pkg/util/math"
@@ -130,7 +131,7 @@ type Distributor struct {
130
131
asyncExecutor util.AsyncExecutor
131
132
132
133
// Map to track label sets from user.
133
- labelSetTracker * labelSetTracker
134
+ labelSetTracker * labelset. LabelSetTracker
134
135
}
135
136
136
137
// Config contains the configuration required to
@@ -388,7 +389,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
388
389
asyncExecutor : util .NewNoOpExecutor (),
389
390
}
390
391
391
- d .labelSetTracker = newLabelSetTracker ( d . receivedSamplesPerLabelSet )
392
+ d .labelSetTracker = labelset . NewLabelSetTracker ( )
392
393
393
394
if cfg .NumPushWorkers > 0 {
394
395
util_log .WarnExperimentalUse ("Distributor: using goroutine worker pool" )
@@ -810,7 +811,16 @@ func (d *Distributor) updateLabelSetMetrics() {
810
811
}
811
812
}
812
813
813
- d .labelSetTracker .updateMetrics (activeUserSet )
814
+ d .labelSetTracker .UpdateMetrics (activeUserSet , func (user , labelSetStr string , removeUser bool ) {
815
+ if removeUser {
816
+ if err := util .DeleteMatchingLabels (d .receivedSamplesPerLabelSet , map [string ]string {"user" : user }); err != nil {
817
+ level .Warn (d .log ).Log ("msg" , "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user" , "user" , user , "err" , err )
818
+ }
819
+ return
820
+ }
821
+ d .receivedSamplesPerLabelSet .DeleteLabelValues (user , sampleMetricTypeFloat , labelSetStr )
822
+ d .receivedSamplesPerLabelSet .DeleteLabelValues (user , sampleMetricTypeHistogram , labelSetStr )
823
+ })
814
824
}
815
825
816
826
func (d * Distributor ) cleanStaleIngesterMetrics () {
@@ -913,6 +923,12 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
913
923
return metadataKeys , validatedMetadata , firstPartialErr
914
924
}
915
925
926
+ type samplesLabelSetEntry struct {
927
+ floatSamples int64
928
+ histogramSamples int64
929
+ labels labels.Labels
930
+ }
931
+
916
932
func (d * Distributor ) prepareSeriesKeys (ctx context.Context , req * cortexpb.WriteRequest , userID string , limits * validation.Limits , removeReplica bool ) ([]uint32 , []cortexpb.PreallocTimeseries , int , int , int , error , error ) {
917
933
pSpan , _ := opentracing .StartSpanFromContext (ctx , "prepareSeriesKeys" )
918
934
defer pSpan .Finish ()
@@ -1070,8 +1086,16 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
1070
1086
validatedExemplars += len (ts .Exemplars )
1071
1087
}
1072
1088
for h , counter := range labelSetCounters {
1073
- d .labelSetTracker .increaseSamplesLabelSet (userID , h , counter .labels , counter .floatSamples , counter .histogramSamples )
1089
+ d .labelSetTracker .Track (userID , h , counter .labels )
1090
+ labelSetStr := counter .labels .String ()
1091
+ if counter .floatSamples > 0 {
1092
+ d .receivedSamplesPerLabelSet .WithLabelValues (userID , sampleMetricTypeFloat , labelSetStr ).Add (float64 (counter .floatSamples ))
1093
+ }
1094
+ if counter .histogramSamples > 0 {
1095
+ d .receivedSamplesPerLabelSet .WithLabelValues (userID , sampleMetricTypeHistogram , labelSetStr ).Add (float64 (counter .histogramSamples ))
1096
+ }
1074
1097
}
1098
+
1075
1099
return seriesKeys , validatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , nil
1076
1100
}
1077
1101
0 commit comments