Skip to content

Commit 27d68c2

Browse files
authored
Add default partition when no matching labelset limits (#6435)
* implement default partition for limits per labelset but without any labels Signed-off-by: Ben Ye <[email protected]> * unlock properly Signed-off-by: Ben Ye <[email protected]> * handle new partition added and address comments Signed-off-by: Ben Ye <[email protected]> * changelog Signed-off-by: Ben Ye <[email protected]> * update doc Signed-off-by: Ben Ye <[email protected]> * update doc Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent e359f50 commit 27d68c2

File tree

9 files changed

+597
-63
lines changed

9 files changed

+597
-63
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@
4747
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
4848
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
4949
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay. #6277
50+
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
51+
* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that don't match any other label set entries. #6435
5052
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
5153
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
5254
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
5355
* [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398
54-
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
5556
* [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409
5657

5758
## 1.18.1 2024-10-14

docs/configuration/config-file-reference.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -5770,7 +5770,9 @@ limits:
57705770
# would not enforce any limits.
57715771
[max_series: <int> | default = ]
57725772
5773-
# LabelSet which the limit should be applied.
5773+
# LabelSet to which the limit should be applied. If no labels are provided, it
5774+
# becomes the default partition which matches any series that doesn't match any
5775+
# other explicitly defined label sets.
57745776
[label_set: <map of string (labelName) to string (labelValue)> | default = []]
57755777
```
57765778

pkg/ingester/ingester_test.go

+112-4
Original file line numberDiff line numberDiff line change
@@ -366,22 +366,117 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
366366
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
367367
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
368368

369-
// Should remove metrics when the limits is removed
369+
// Add default partition -> no label set configured working as a fallback when a series
370+
// doesn't match any existing label set limit.
371+
emptyLabels := labels.EmptyLabels()
372+
defaultPartitionLimits := validation.LimitsPerLabelSet{LabelSet: emptyLabels,
373+
Limits: validation.LimitsPerLabelSetEntry{
374+
MaxSeries: 2,
375+
},
376+
}
377+
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
378+
b, err = json.Marshal(limits)
379+
require.NoError(t, err)
380+
require.NoError(t, limits.UnmarshalJSON(b))
381+
tenantLimits.setLimits(userID, &limits)
382+
383+
lbls = []string{labels.MetricName, "test_default"}
384+
for i := 0; i < 2; i++ {
385+
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
386+
[]labels.Labels{labels.FromStrings(append(lbls, "series", strconv.Itoa(i))...)}, samples, nil, nil, cortexpb.API))
387+
require.NoError(t, err)
388+
}
389+
390+
// Max series limit for default partition is 2 so 1 more series will be throttled.
391+
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(
392+
[]labels.Labels{labels.FromStrings(append(lbls, "extraLabel", "extraValueUpdate2")...)}, samples, nil, nil, cortexpb.API))
393+
httpResp, ok = httpgrpc.HTTPResponseFromError(err)
394+
require.True(t, ok, "returned error is not an httpgrpc response")
395+
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
396+
require.ErrorContains(t, err, emptyLabels.String())
397+
398+
ing.updateActiveSeries(ctx)
399+
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
400+
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
401+
# TYPE cortex_ingester_limits_per_labelset gauge
402+
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
403+
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
404+
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
405+
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
406+
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
407+
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
408+
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
409+
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
410+
# TYPE cortex_ingester_usage_per_labelset gauge
411+
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
412+
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
413+
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
414+
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
415+
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
416+
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
417+
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 2
418+
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
419+
420+
// Add a new label set limit.
421+
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet,
422+
validation.LimitsPerLabelSet{LabelSet: labels.FromMap(map[string]string{
423+
"series": "0",
424+
}),
425+
Limits: validation.LimitsPerLabelSetEntry{
426+
MaxSeries: 3,
427+
},
428+
},
429+
)
430+
b, err = json.Marshal(limits)
431+
require.NoError(t, err)
432+
require.NoError(t, limits.UnmarshalJSON(b))
433+
tenantLimits.setLimits(userID, &limits)
434+
ing.updateActiveSeries(ctx)
435+
// Default partition usage reduced from 2 to 1 as one series in default partition
436+
// now counted into the new partition.
437+
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
438+
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
439+
# TYPE cortex_ingester_limits_per_labelset gauge
440+
cortex_ingester_limits_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
441+
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
442+
cortex_ingester_limits_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 10
443+
cortex_ingester_limits_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 10
444+
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
445+
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
446+
cortex_ingester_limits_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 3
447+
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
448+
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
449+
# TYPE cortex_ingester_usage_per_labelset gauge
450+
cortex_ingester_usage_per_labelset{labelset="{__name__=\"metric_name\", comp2=\"compValue2\"}",limit="max_series",user="1"} 3
451+
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
452+
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
453+
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",limit="max_series",user="1"} 2
454+
cortex_ingester_usage_per_labelset{labelset="{comp1=\"compValue1\"}",limit="max_series",user="1"} 7
455+
cortex_ingester_usage_per_labelset{labelset="{comp2=\"compValue2\"}",limit="max_series",user="1"} 3
456+
cortex_ingester_usage_per_labelset{labelset="{series=\"0\"}",limit="max_series",user="1"} 1
457+
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 1
458+
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
459+
460+
// Should remove metrics when the limits are removed, keeping the default partition limit
370461
limits.LimitsPerLabelSet = limits.LimitsPerLabelSet[:2]
462+
limits.LimitsPerLabelSet = append(limits.LimitsPerLabelSet, defaultPartitionLimits)
371463
b, err = json.Marshal(limits)
372464
require.NoError(t, err)
373465
require.NoError(t, limits.UnmarshalJSON(b))
374466
tenantLimits.setLimits(userID, &limits)
375467
ing.updateActiveSeries(ctx)
468+
// Default partition usage increased from 1 to 10 as some existing partitions got removed.
376469
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
377470
# HELP cortex_ingester_limits_per_labelset Limits per user and labelset.
378471
# TYPE cortex_ingester_limits_per_labelset gauge
379472
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
380473
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
474+
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
381475
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
382476
# TYPE cortex_ingester_usage_per_labelset gauge
383477
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
384478
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
479+
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
385480
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
386481

387482
// Should persist between restarts
@@ -396,10 +491,12 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
396491
# TYPE cortex_ingester_limits_per_labelset gauge
397492
cortex_ingester_limits_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
398493
cortex_ingester_limits_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
494+
cortex_ingester_limits_per_labelset{labelset="{}",limit="max_series",user="1"} 2
399495
# HELP cortex_ingester_usage_per_labelset Current usage per user and labelset.
400496
# TYPE cortex_ingester_usage_per_labelset gauge
401497
cortex_ingester_usage_per_labelset{labelset="{label1=\"value1\"}",limit="max_series",user="1"} 3
402498
cortex_ingester_usage_per_labelset{labelset="{label2=\"value2\"}",limit="max_series",user="1"} 2
499+
cortex_ingester_usage_per_labelset{labelset="{}",limit="max_series",user="1"} 10
403500
`), "cortex_ingester_usage_per_labelset", "cortex_ingester_limits_per_labelset"))
404501
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
405502

@@ -420,6 +517,13 @@ func TestPushRace(t *testing.T) {
420517
MaxSeries: 10e10,
421518
},
422519
},
520+
{
521+
// Default partition.
522+
LabelSet: labels.EmptyLabels(),
523+
Limits: validation.LimitsPerLabelSetEntry{
524+
MaxSeries: 10e10,
525+
},
526+
},
423527
}
424528

425529
dir := t.TempDir()
@@ -451,6 +555,10 @@ func TestPushRace(t *testing.T) {
451555
defer wg.Done()
452556
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
453557
require.NoError(t, err)
558+
559+
// Go to default partition.
560+
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "bar", "userId", userID, "k", strconv.Itoa(k))}, []cortexpb.Sample{sample1}, nil, nil, cortexpb.API))
561+
require.NoError(t, err)
454562
}()
455563
}
456564
}
@@ -472,13 +580,13 @@ func TestPushRace(t *testing.T) {
472580
err = ir.Series(p.At(), &builder, nil)
473581
require.NoError(t, err)
474582
lbls := builder.Labels()
475-
require.Equal(t, "foo", lbls.Get(labels.MetricName))
583+
require.True(t, lbls.Get(labels.MetricName) == "foo" || lbls.Get(labels.MetricName) == "bar")
476584
require.Equal(t, "1", lbls.Get("userId"))
477585
require.NotEmpty(t, lbls.Get("k"))
478586
builder.Reset()
479587
}
480-
require.Equal(t, numberOfSeries, total)
481-
require.Equal(t, uint64(numberOfSeries), db.Head().NumSeries())
588+
require.Equal(t, 2*numberOfSeries, total)
589+
require.Equal(t, uint64(2*numberOfSeries), db.Head().NumSeries())
482590
}
483591

484592
func TestIngesterUserLimitExceeded(t *testing.T) {

pkg/ingester/limiter.go

+6-22
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,15 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
107107

108108
// AssertMaxSeriesPerLabelSet checks, for every label set limit matching the metric, that the
109109
// current usage returned by f has not reached that limit, and returns an error if it has.
110-
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.LimitsPerLabelSet) (int, error)) error {
111-
m := l.limitsPerLabelSets(userID, metric)
112-
for _, limit := range m {
110+
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(allLimits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error)) error {
111+
limits := l.limits.LimitsPerLabelSet(userID)
112+
matchedLimits := validation.LimitsPerLabelSetsForSeries(limits, metric)
113+
for _, limit := range matchedLimits {
113114
maxSeriesFunc := func(string) int {
114115
return limit.Limits.MaxSeries
115116
}
116117
local := l.maxByLocalAndGlobal(userID, maxSeriesFunc, maxSeriesFunc)
117-
if u, err := f(limit); err != nil {
118+
if u, err := f(limits, limit); err != nil {
118119
return err
119120
} else if u >= local {
120121
return errMaxSeriesPerLabelSetLimitExceeded{
@@ -191,24 +192,7 @@ func (l *Limiter) formatMaxSeriesPerLabelSetError(err errMaxSeriesPerLabelSetLim
191192

192193
func (l *Limiter) limitsPerLabelSets(userID string, metric labels.Labels) []validation.LimitsPerLabelSet {
193194
m := l.limits.LimitsPerLabelSet(userID)
194-
195-
// returning early to not have any overhead
196-
if len(m) == 0 {
197-
return nil
198-
}
199-
200-
r := make([]validation.LimitsPerLabelSet, 0, len(m))
201-
outer:
202-
for _, lbls := range m {
203-
for _, lbl := range lbls.LabelSet {
204-
// We did not find some of the labels on the set
205-
if v := metric.Get(lbl.Name); v != lbl.Value {
206-
continue outer
207-
}
208-
}
209-
r = append(r, lbls)
210-
}
211-
return r
195+
return validation.LimitsPerLabelSetsForSeries(m, metric)
212196
}
213197

214198
func (l *Limiter) maxSeriesPerMetric(userID string) int {

pkg/ingester/limiter_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
500500
require.NoError(t, err)
501501

502502
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
503-
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(set validation.LimitsPerLabelSet) (int, error) {
503+
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
504504
return testData.series, nil
505505
})
506506

0 commit comments

Comments
 (0)