
Commit f73e148

implement default partition for limits per labelset: a fallback limit entry that has no labels
Signed-off-by: Ben Ye <[email protected]>
1 parent: b72a536 · commit: f73e148

7 files changed: +549 −60 lines

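The change introduces a "default" partition for the per-labelset series limits: a limit entry whose label set is empty acts as a fallback and counts every series that does not match any explicitly configured label set. As a rough illustration (not code from this commit), a tenant's limits could declare such an entry alongside an explicit one; the Cortex validation import path and the MaxSeries values below are assumptions:

// Sketch only: declaring a default (empty label set) partition next to an
// explicit per-labelset limit. Import path and values are illustrative.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/cortexproject/cortex/pkg/util/validation" // assumed import path
)

func main() {
	perLabelSet := []validation.LimitsPerLabelSet{
		{
			// Explicit partition: only series with label1="value1" count against it.
			LabelSet: labels.FromStrings("label1", "value1"),
			Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 3},
		},
		{
			// Default partition: the empty label set catches every series that
			// matches none of the explicit partitions above.
			LabelSet: labels.EmptyLabels(),
			Limits:   validation.LimitsPerLabelSetEntry{MaxSeries: 2},
		},
	}
	fmt.Printf("%d partitions, default MaxSeries=%d\n", len(perLabelSet), perLabelSet[1].Limits.MaxSeries)
}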

pkg/ingester/ingester_test.go

+72 −4
@@ -366,22 +366,77 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 
-	// Should remove metrics when the limits is removed
+	// Add default partition -> no label set configured working as a fallback when a series
+	// doesn't match any existing label set limit.
+	emptyLabels := labels.EmptyLabels()
+	defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
+		Limits: validation.LimitsPerLabelSetEntry{
+			MaxSeries: 2,
+		},
+	}
+	limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
+	b, err = json.Marshal(limits)
+	require.NoError(t, err)
+	require.NoError(t, limits.UnmarshalJSON(b))
+	tenantLimits.setLimits(userID, &limits)
+
+	lbls = []string{labels.MetricName, "test_default"}
+	for i := 0; i < 2; i++ {
+		_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
+			[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
+		require.NoError(t, err)
+	}
+
+	// Max series limit for default partition is 2 so 1 more series will be throttled.
+	_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
+		[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
+	httpResp, ok = httpgrpc.HTTPResponseFromError(err)
+	require.True(t, ok, "returned error is not an httpgrpc response")
+	assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
+	require.ErrorContains(t, err, emptyLabels.String())
+
+	ing.updateActiveSeries(ctx)
+	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
+		# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
+		# TYPE cortex_ingester_limits_per_labelset gauge
+		cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
+		cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
+		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
+		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
+		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
+		# TYPE cortex_ingester_usage_per_labelset gauge
+		cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
+		cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
+	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
+
+	// Should remove metrics when the limits is removed, keep default partition limit
 	limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
+	limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
 	b, err = json.Marshal(limits)
 	require.NoError(t, err)
 	require.NoError(t, limits.UnmarshalJSON(b))
 	tenantLimits.setLimits(userID, &limits)
 	ing.updateActiveSeries(ctx)
+	// Default partition usage increased from 2 to 10 as some existing partitions got removed.
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
 		# TYPE cortex_ingester_limits_per_labelset gauge
 		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
 		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
 		# TYPE cortex_ingester_usage_per_labelset gauge
 		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 
 	// Should persist between restarts
@@ -396,10 +451,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		# TYPE cortex_ingester_limits_per_labelset gauge
 		cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
 		# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
 		# TYPE cortex_ingester_usage_per_labelset gauge
 		cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
 		cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
+		cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
 	`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
 	services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
 
@@ -420,6 +477,13 @@ func TestPushRace(t *testing.T) {
 				MaxSeries: 10e10,
 			},
 		},
+		{
+			// Default partition.
+			LabelSet: labels.EmptyLabels(),
+			Limits: validation.LimitsPerLabelSetEntry{
+				MaxSeries: 10e10,
+			},
+		},
 	}
 
 	dir := t.TempDir()
@@ -451,6 +515,10 @@ func TestPushRace(t *testing.T) {
 				defer wg.Done()
 				_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
 				require.NoError(t, err)
+
+				// Go to default partition.
+				_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
+				require.NoError(t, err)
 			}()
 		}
 	}
@@ -472,13 +540,13 @@ func TestPushRace(t *testing.T) {
 		err = ir.Series(p.At(), &builder, nil)
 		require.NoError(t, err)
 		lbls := builder.Labels()
-		require.Equal(t, "foo", lbls.Get(labels.MetricName))
+		require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
 		require.Equal(t, "1", lbls.Get("userId"))
 		require.NotEmpty(t, lbls.Get("k"))
 		builder.Reset()
 	}
-	require.Equal(t, numberOfSeries, total)
-	require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
+	require.Equal(t, 2*numberOfSeries, total)
+	require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
 }
 
 func TestIngesterUserLimitExceeded(t *testing.T) {

pkg/ingester/limiter.go

+6 −22
@@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
 
 // AssertMaxSeriesPerLabelSet limit has not been reached compared to the current
 // number of metrics with metadata in input and returns an error if so.
-func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
-	m := l.limitsPerLabelSets(userID, metric)
-	for _, limit := range m {
+func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
+	limits := l.limits.LimitsPerLabelSet(userID)
+	matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
+	for _, limit := range matchedLimits {
 		maxSeriesFunc := func(string) int {
 			return limit.Limits.MaxSeries
 		}
 		local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
-		if u, err := f(limit); err != nil {
+		if u, err := f(limits, limit); err != nil {
 			return err
 		} else if u >= local {
 			return errMaxSeriesPerLabelSetLimitExceeded{
@@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim
 
 func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
 	m := l.limits.LimitsPerLabelSet(userID)
-
-	// returning early to not have any overhead
-	if len(m) == 0 {
-		return nil
-	}
-
-	r := make([]validation.LimitsPerLabelSet, 0, len(m))
-outer:
-	for _, lbls := range m {
-		for _, lbl := range lbls.LabelSet {
-			// We did not find some of the labels on the set
-			if v := metric.Get(lbl.Name); v != lbl.Value {
-				continue outer
-			}
-		}
-		r = append(r, lbls)
-	}
-	return r
+	return validation.LimitsPerLabelSetsForSeries(m, metric)
 }
 
 func (l *Limiter) maxSeriesPerMetric(userID string) int {

pkg/ingester/limiter_test.go

+1 −1
@@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
 			require.NoError(t, err)
 
 			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
-			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
+			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
 				return testData.series, nil
 			})
 
pkg/ingester/user_state.go

+105 −33
@@ -6,6 +6,7 @@ import (
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/segmentio/fasthash/fnv1a"
 
@@ -114,59 +115,113 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
 }
 
 func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
-	return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.LimitsPerLabelSet) (int, error) {
-		s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
+	return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
+		s := m.shards[util.HashFP(model.Fingerprint(limit.Hash))%numMetricCounterShards]
 		s.RLock()
-		if r, ok := s.valuesCounter[set.Hash]; ok {
+		if r, ok := s.valuesCounter[limit.Hash]; ok {
 			defer s.RUnlock()
 			return r.count, nil
 		}
 		s.RUnlock()
 
 		// We still dont keep track of this label value so we need to backfill
-		return m.backFillLimit(ctx, u, set, s)
+		return m.backFillLimit(ctx, u, false, allLimits, limit, s)
 	})
 }
 
-func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
+func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, forceBackfill bool, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet, s *labelSetCounterShard) (int, error) {
+	s.Lock()
+	// If not force backfill, use existing counter value.
+	if !forceBackfill {
+		if r, ok := s.valuesCounter[limit.Hash]; ok {
+			s.Unlock()
+			return r.count, nil
+		}
+	}
+
 	ir, err := u.db.Head().Index()
 	if err != nil {
 		return 0, err
 	}
 
 	defer ir.Close()
 
-	s.Lock()
-	defer s.Unlock()
-	if r, ok := s.valuesCounter[limit.Hash]; !ok {
-		postings := make([]index.Postings, 0, len(limit.LabelSet))
-		for _, lbl := range limit.LabelSet {
-			p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
-			if err != nil {
-				return 0, err
-			}
-			postings = append(postings, p)
-		}
+	totalCount, err := getCardinalityForLimitsPerLabelSet(ctx, ir, allLimits, limit)
+	if err != nil {
+		return 0, err
+	}
 
-		p := index.Intersect(postings...)
+	s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
+		count:  totalCount,
+		labels: limit.LabelSet,
+	}
+	s.Unlock()
+	return totalCount, nil
+}
 
-		totalCount := 0
-		for p.Next() {
-			totalCount++
+func getCardinalityForLimitsPerLabelSet(ctx context.Context, ir tsdb.IndexReader, allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
+	// Easy path with explicit labels.
+	if limit.LabelSet.Len() > 0 {
+		p, err := getPostingForLabels(ctx, ir, limit.LabelSet)
+		if err != nil {
+			return 0, err
 		}
+		return getPostingCardinality(p)
+	}
 
-		if p.Err() != nil {
-			return 0, p.Err()
+	// Default partition needs to get cardinality of all series that doesn't belong to any existing partitions.
+	postings := make([]index.Postings, 0, len(allLimits)-1)
+	for _, l := range allLimits {
+		if l.Hash == limit.Hash {
+			continue
 		}
+		p, err := getPostingForLabels(ctx, ir, l.LabelSet)
+		if err != nil {
+			return 0, err
+		}
+		postings = append(postings, p)
+	}
+	mergedCardinality, err := getPostingCardinality(index.Merge(ctx, postings...))
+	if err != nil {
+		return 0, err
+	}
+
+	name, value := index.AllPostingsKey()
+	// Don't expand all postings but get length directly instead.
+	allPostings, err := ir.Postings(ctx, name, value)
+	if err != nil {
+		return 0, err
+	}
+	allCardinality, err := getPostingCardinality(allPostings)
+	if err != nil {
+		return 0, err
+	}
+	return allCardinality - mergedCardinality, nil
+}
 
-		s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
-			count:  totalCount,
-			labels: limit.LabelSet,
+func getPostingForLabels(ctx context.Context, ir tsdb.IndexReader, lbls labels.Labels) (index.Postings, error) {
+	postings := make([]index.Postings, 0, len(lbls))
+	for _, lbl := range lbls {
+		p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
+		if err != nil {
+			return nil, err
 		}
-		return totalCount, nil
-	} else {
-		return r.count, nil
+		postings = append(postings, p)
 	}
+
+	return index.Intersect(postings...), nil
+}
+
+func getPostingCardinality(p index.Postings) (int, error) {
+	totalCount := 0
+	for p.Next() {
+		totalCount++
+	}
+
+	if p.Err() != nil {
+		return 0, p.Err()
+	}
+	return totalCount, nil
 }
 
 func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -200,10 +255,13 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
 
 func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics *ingesterMetrics) error {
 	currentLbsLimitHash := map[uint64]validation.LimitsPerLabelSet{}
-	for _, l := range m.limiter.limits.LimitsPerLabelSet(u.userID) {
+	limits := m.limiter.limits.LimitsPerLabelSet(u.userID)
+	for _, l := range limits {
 		currentLbsLimitHash[l.Hash] = l
 	}
 
+	nonDefaultPartitionRemoved := false
+	var defaultPartitionHash uint64
 	for i := 0; i < numMetricCounterShards; i++ {
 		s := m.shards[i]
 		s.RLock()
@@ -215,17 +273,31 @@ func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, metrics
 				metrics.limitsPerLabelSet.DeleteLabelValues(u.userID, "max_series", lbls)
 				continue
 			}
-			metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
-			metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
-			delete(currentLbsLimitHash, h)
+			// Delay deletion of default partition from current label limits as if
+			// another label set is removed then we need to backfill default partition again.
+			if entry.labels.Len() > 0 {
+				metrics.usagePerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(entry.count))
+				metrics.limitsPerLabelSet.WithLabelValues(u.userID, "max_series", lbls).Set(float64(currentLbsLimitHash[h].Limits.MaxSeries))
+				delete(currentLbsLimitHash, h)
+				nonDefaultPartitionRemoved = true
+			} else {
+				defaultPartitionHash = h
+			}
 		}
 		s.RUnlock()
 	}
 
+	// No partitions with label sets configured got removed, no need to backfill default partition.
+	if !nonDefaultPartitionRemoved {
+		delete(currentLbsLimitHash, defaultPartitionHash)
+	}
+
 	// Backfill all limits that are not being tracked yet
 	for _, l := range currentLbsLimitHash {
 		s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
-		count, err := m.backFillLimit(ctx, u, l, s)
+		// Force backfill is enabled to make sure we update the counter for the default partition
+		// when other limits got removed.
+		count, err := m.backFillLimit(ctx, u, true, limits, l, s)
 		if err != nil {
 			return err
 		}
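For the default partition, the new getCardinalityForLimitsPerLabelSet above counts usage by subtraction: the cardinality of all head postings minus the cardinality of the merged postings of every explicit partition. The following stand-alone sketch replays that arithmetic with plain string sets instead of index.Postings; the series names and counts are invented for illustration:

// Illustrative sketch of the default-partition arithmetic:
// usage(default) = |all series| - |series matched by any explicit partition|.
// Plain sets stand in for TSDB postings; the data is made up.
package main

import "fmt"

func main() {
	allSeries := []string{"s1", "s2", "s3", "s4", "s5", "s6"}

	// Union ("merge") of the series matched by the explicit label set partitions.
	// It is a subset of allSeries, mirroring merged postings vs. all postings.
	matchedByExplicit := map[string]bool{"s1": true, "s2": true, "s3": true, "s4": true}

	defaultUsage := len(allSeries) - len(matchedByExplicit) // 6 - 4
	fmt.Println(defaultUsage) // 2 series fall into the default (empty label set) partition
}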
