Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize ingester Push() on blocks storage when the per-user or per-metric series limit is reached #3971

Merged
merged 4 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func grpcForwardableError(userID string, code int, e error) error {
})
}

// Note: does not retain a reference to `err`
// wrapWithUser returns a new error whose message is err's message prefixed
// with "user=<userID>: ". The input error is rendered with %s instead of being
// wrapped, so the returned error does not retain a reference to err.
func wrapWithUser(err error, userID string) error {
	const format = "user=%s: %s"

	prefixed := fmt.Errorf(format, userID, err)
	return prefixed
}
15 changes: 9 additions & 6 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,11 @@ func TestIngesterUserLimitExceeded(t *testing.T) {
testLimits := func() {
// Append to two series, expect series-exceeded error.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, []cortexpb.Sample{sample2, sample3}, nil, cortexpb.API))
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusBadRequest {
t.Fatalf("expected error about exceeding metrics per user, got %v", err)
}
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
assert.Equal(t, wrapWithUser(makeLimitError(perUserSeriesLimit, ing.limiter.FormatError(userID, errMaxSeriesPerUserLimitExceeded)), userID).Error(), string(httpResp.Body))

// Append two metadata, expect no error since metadata is a best effort approach.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, cortexpb.API))
require.NoError(t, err)
Expand Down Expand Up @@ -699,9 +701,10 @@ func TestIngesterMetricLimitExceeded(t *testing.T) {
testLimits := func() {
// Append two series, expect series-exceeded error.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, []cortexpb.Sample{sample2, sample3}, nil, cortexpb.API))
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusBadRequest {
t.Fatalf("expected error about exceeding series per metric, got %v", err)
}
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
assert.Equal(t, wrapWithUser(makeMetricLimitError(perMetricSeriesLimit, labels3, ing.limiter.FormatError(userID, errMaxSeriesPerMetricLimitExceeded)), userID).Error(), string(httpResp.Body))

// Append two metadata for the same metric. Drop the second one, and expect no error since metadata is a best effort approach.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, cortexpb.API))
Expand Down
48 changes: 29 additions & 19 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {

// Total series limit.
if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil {
return makeLimitError(perUserSeriesLimit, err)
return err
}

// Series per metric name limit.
Expand All @@ -225,7 +225,7 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}
if err := u.seriesInMetric.canAddSeriesFor(u.userID, metricName); err != nil {
return makeMetricLimitError(perMetricSeriesLimit, metric, err)
return err
}

return nil
Expand Down Expand Up @@ -721,13 +721,14 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
newValueForTimestampCount = 0
otherDiscardedReasonsCount = map[string]int{}
succeededSamplesCount = 0
failedSamplesCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
)

// Walk the samples, appending them to the users database
Expand Down Expand Up @@ -782,6 +783,7 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
cause := errors.Cause(err)

if cause == storage.ErrOutOfBounds || cause == storage.ErrOutOfOrderSample || cause == storage.ErrDuplicateSampleForTimestamp {
if firstPartialErr == nil {
firstPartialErr = wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels)
Expand All @@ -797,15 +799,20 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
}

continue
}

var ve *validationError
if errors.As(cause, &ve) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because, following the code flow, I think the only errors that can be returned here are the per-user and per-metric series limit errors.

// Caused by limits.
if firstPartialErr == nil {
firstPartialErr = ve
} else if cause == errMaxSeriesPerUserLimitExceeded || cause == errMaxSeriesPerMetricLimitExceeded {
switch cause {
case errMaxSeriesPerUserLimitExceeded:
if firstPartialErr == nil {
firstPartialErr = makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause))
}
perUserSeriesLimitCount++
case errMaxSeriesPerMetricLimitExceeded:
if firstPartialErr == nil {
firstPartialErr = makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
}
perMetricSeriesLimitCount++
}
otherDiscardedReasonsCount[ve.errorType]++

continue
}

Expand Down Expand Up @@ -857,8 +864,11 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
if newValueForTimestampCount > 0 {
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
}
for reason, count := range otherDiscardedReasonsCount {
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(count))
if perUserSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
}
if perMetricSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}

switch req.Source {
Expand Down
99 changes: 68 additions & 31 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"fmt"
"math"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
errMaxSeriesPerMetricLimitExceeded = "per-metric series limit of %d exceeded, please contact administrator to raise it. (local limit: %d global limit: %d actual local limit: %d)"
errMaxSeriesPerUserLimitExceeded = "per-user series limit of %d exceeded, please contact administrator to raise it. (local limit: %d global limit: %d actual local limit: %d)"
errMaxMetadataPerMetricLimitExceeded = "per-metric metadata limit of %d exceeded, please contact administrator to raise it. (local limit: %d global limit: %d actual local limit: %d)"
errMaxMetadataPerUserLimitExceeded = "per-user metric metadata limit of %d exceeded, please contact administrator to raise it. (local limit: %d global limit: %d actual local limit: %d)"
var (
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
)

// RingCount is the interface exposed by a ring implementation which allows
Expand Down Expand Up @@ -56,66 +58,101 @@ func NewLimiter(
// AssertMaxSeriesPerMetric limit has not been reached compared to the current
// number of series in input and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerMetric(userID string, series int) error {
actualLimit := l.maxSeriesPerMetric(userID)
if series < actualLimit {
if actualLimit := l.maxSeriesPerMetric(userID); series < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)

return fmt.Errorf(errMaxSeriesPerMetricLimitExceeded, minNonZero(localLimit, globalLimit), localLimit, globalLimit, actualLimit)
return errMaxSeriesPerMetricLimitExceeded
}

// AssertMaxMetadataPerMetric limit has not been reached compared to the current
// number of metadata per metric in input and returns an error if so.
func (l *Limiter) AssertMaxMetadataPerMetric(userID string, metadata int) error {
actualLimit := l.maxMetadataPerMetric(userID)

if metadata < actualLimit {
if actualLimit := l.maxMetadataPerMetric(userID); metadata < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalMetadataPerMetric(userID)
globalLimit := l.limits.MaxGlobalMetadataPerMetric(userID)

return fmt.Errorf(errMaxMetadataPerMetricLimitExceeded, minNonZero(localLimit, globalLimit), localLimit, globalLimit, actualLimit)
return errMaxMetadataPerMetricLimitExceeded
}

// AssertMaxSeriesPerUser limit has not been reached compared to the current
// number of series in input and returns an error if so.
func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error {
actualLimit := l.maxSeriesPerUser(userID)
if series < actualLimit {
if actualLimit := l.maxSeriesPerUser(userID); series < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalSeriesPerUser(userID)
globalLimit := l.limits.MaxGlobalSeriesPerUser(userID)

return fmt.Errorf(errMaxSeriesPerUserLimitExceeded, minNonZero(localLimit, globalLimit), localLimit, globalLimit, actualLimit)
return errMaxSeriesPerUserLimitExceeded
}

// AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current
// number of metrics with metadata in input and returns an error if so.
func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error {
actualLimit := l.maxMetadataPerUser(userID)

if metrics < actualLimit {
if actualLimit := l.maxMetadataPerUser(userID); metrics < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalMetricsWithMetadataPerUser(userID)
globalLimit := l.limits.MaxGlobalMetricsWithMetadataPerUser(userID)

return fmt.Errorf(errMaxMetadataPerUserLimitExceeded, minNonZero(localLimit, globalLimit), localLimit, globalLimit, actualLimit)
return errMaxMetadataPerUserLimitExceeded
}

// MaxSeriesPerQuery reports the maximum number of series a single query from
// userID may touch, as configured in the underlying limits.
func (l *Limiter) MaxSeriesPerQuery(userID string) int {
	limit := l.limits.MaxSeriesPerQuery(userID)
	return limit
}

// FormatError turns one of the limiter's sentinel limit errors into a
// descriptive error that spells out the limits applied to userID.
// Any error that is not one of those sentinels is returned unchanged.
func (l *Limiter) FormatError(userID string, err error) error {
	if err == errMaxSeriesPerUserLimitExceeded {
		return l.formatMaxSeriesPerUserError(userID)
	}
	if err == errMaxSeriesPerMetricLimitExceeded {
		return l.formatMaxSeriesPerMetricError(userID)
	}
	if err == errMaxMetadataPerUserLimitExceeded {
		return l.formatMaxMetadataPerUserError(userID)
	}
	if err == errMaxMetadataPerMetricLimitExceeded {
		return l.formatMaxMetadataPerMetricError(userID)
	}

	// Unknown error: pass it through untouched.
	return err
}

// formatMaxSeriesPerUserError builds the detailed per-user series limit error
// for userID, reporting the configured local and global limits together with
// the limit actually enforced by this limiter.
func (l *Limiter) formatMaxSeriesPerUserError(userID string) error {
	const msg = "per-user series limit of %d exceeded, please contact administrator to raise it (local limit: %d global limit: %d actual local limit: %d)"

	var (
		enforced = l.maxSeriesPerUser(userID)
		local    = l.limits.MaxLocalSeriesPerUser(userID)
		global   = l.limits.MaxGlobalSeriesPerUser(userID)
	)

	return fmt.Errorf(msg, minNonZero(local, global), local, global, enforced)
}

// formatMaxSeriesPerMetricError builds the detailed per-metric series limit
// error for userID, reporting the configured local and global limits together
// with the limit actually enforced by this limiter.
func (l *Limiter) formatMaxSeriesPerMetricError(userID string) error {
	const msg = "per-metric series limit of %d exceeded, please contact administrator to raise it (local limit: %d global limit: %d actual local limit: %d)"

	var (
		enforced = l.maxSeriesPerMetric(userID)
		local    = l.limits.MaxLocalSeriesPerMetric(userID)
		global   = l.limits.MaxGlobalSeriesPerMetric(userID)
	)

	return fmt.Errorf(msg, minNonZero(local, global), local, global, enforced)
}

// formatMaxMetadataPerUserError builds the detailed per-user metric metadata
// limit error for userID, reporting the configured local and global limits
// together with the limit actually enforced by this limiter.
func (l *Limiter) formatMaxMetadataPerUserError(userID string) error {
	const msg = "per-user metric metadata limit of %d exceeded, please contact administrator to raise it (local limit: %d global limit: %d actual local limit: %d)"

	var (
		enforced = l.maxMetadataPerUser(userID)
		local    = l.limits.MaxLocalMetricsWithMetadataPerUser(userID)
		global   = l.limits.MaxGlobalMetricsWithMetadataPerUser(userID)
	)

	return fmt.Errorf(msg, minNonZero(local, global), local, global, enforced)
}

// formatMaxMetadataPerMetricError builds the detailed per-metric metadata
// limit error for userID, reporting the configured local and global limits
// together with the limit actually enforced by this limiter.
func (l *Limiter) formatMaxMetadataPerMetricError(userID string) error {
	const msg = "per-metric metadata limit of %d exceeded, please contact administrator to raise it (local limit: %d global limit: %d actual local limit: %d)"

	var (
		enforced = l.maxMetadataPerMetric(userID)
		local    = l.limits.MaxLocalMetadataPerMetric(userID)
		global   = l.limits.MaxGlobalMetadataPerMetric(userID)
	)

	return fmt.Errorf(msg, minNonZero(local, global), local, global, enforced)
}

func (l *Limiter) maxSeriesPerMetric(userID string) int {
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)
Expand Down
44 changes: 39 additions & 5 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ingester

import (
"fmt"
"errors"
"math"
"testing"

Expand Down Expand Up @@ -270,7 +270,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) {
ringIngesterCount: 10,
shardByAllLabels: true,
series: 300,
expected: fmt.Errorf(errMaxSeriesPerMetricLimitExceeded, 1000, 0, 1000, 300),
expected: errMaxSeriesPerMetricLimitExceeded,
},
}

Expand Down Expand Up @@ -332,7 +332,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) {
ringIngesterCount: 10,
shardByAllLabels: true,
metadata: 300,
expected: fmt.Errorf(errMaxMetadataPerMetricLimitExceeded, 1000, 0, 1000, 300),
expected: errMaxMetadataPerMetricLimitExceeded,
},
}

Expand Down Expand Up @@ -395,7 +395,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
ringIngesterCount: 10,
shardByAllLabels: true,
series: 300,
expected: fmt.Errorf(errMaxSeriesPerUserLimitExceeded, 1000, 0, 1000, 300),
expected: errMaxSeriesPerUserLimitExceeded,
},
}

Expand Down Expand Up @@ -458,7 +458,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
ringIngesterCount: 10,
shardByAllLabels: true,
metadata: 300,
expected: fmt.Errorf(errMaxMetadataPerUserLimitExceeded, 1000, 0, 1000, 300),
expected: errMaxMetadataPerUserLimitExceeded,
},
}

Expand Down Expand Up @@ -486,6 +486,40 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
}
}

// TestLimiter_FormatError verifies that FormatError expands each known limit
// sentinel into its detailed message and returns unknown errors unchanged.
func TestLimiter_FormatError(t *testing.T) {
	// Ring mock reporting 3 healthy instances in a single zone.
	ringMock := &ringCountMock{}
	ringMock.On("HealthyInstancesCount").Return(3)
	ringMock.On("ZonesCount").Return(1)

	// Only the global limits are configured; local limits keep their zero value.
	overrides, err := validation.NewOverrides(validation.Limits{
		MaxGlobalSeriesPerUser:              100,
		MaxGlobalSeriesPerMetric:            20,
		MaxGlobalMetricsWithMetadataPerUser: 10,
		MaxGlobalMetadataPerMetric:          3,
	}, nil)
	require.NoError(t, err)

	limiter := NewLimiter(overrides, ringMock, util.ShardingStrategyDefault, true, 3, false)

	// Each known sentinel must be expanded into its detailed message.
	knownErrors := []struct {
		input    error
		expected string
	}{
		{
			input:    errMaxSeriesPerUserLimitExceeded,
			expected: "per-user series limit of 100 exceeded, please contact administrator to raise it (local limit: 0 global limit: 100 actual local limit: 100)",
		},
		{
			input:    errMaxSeriesPerMetricLimitExceeded,
			expected: "per-metric series limit of 20 exceeded, please contact administrator to raise it (local limit: 0 global limit: 20 actual local limit: 20)",
		},
		{
			input:    errMaxMetadataPerUserLimitExceeded,
			expected: "per-user metric metadata limit of 10 exceeded, please contact administrator to raise it (local limit: 0 global limit: 10 actual local limit: 10)",
		},
		{
			input:    errMaxMetadataPerMetricLimitExceeded,
			expected: "per-metric metadata limit of 3 exceeded, please contact administrator to raise it (local limit: 0 global limit: 3 actual local limit: 3)",
		},
	}
	for _, tc := range knownErrors {
		assert.EqualError(t, limiter.FormatError("user-1", tc.input), tc.expected)
	}

	// An error the limiter does not know about must come back as-is.
	unknown := errors.New("unknown error")
	assert.Equal(t, unknown, limiter.FormatError("user-1", unknown))
}

func TestLimiter_minNonZero(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/user_metrics_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func (mm *userMetricsMetadata) add(metric string, metadata *cortexpb.MetricMetad
// Verify that the user can create more metric metadata given we don't have a set for that metric name.
if err := mm.limiter.AssertMaxMetricsWithMetadataPerUser(mm.userID, len(mm.metricToMetadata)); err != nil {
validation.DiscardedMetadata.WithLabelValues(mm.userID, perUserMetadataLimit).Inc()
return makeLimitError(perUserMetadataLimit, err)
return makeLimitError(perUserMetadataLimit, mm.limiter.FormatError(mm.userID, err))
}
set = metricMetadataSet{}
mm.metricToMetadata[metric] = set
}

if err := mm.limiter.AssertMaxMetadataPerMetric(mm.userID, len(set)); err != nil {
validation.DiscardedMetadata.WithLabelValues(mm.userID, perMetricMetadataLimit).Inc()
return makeLimitError(perMetricMetadataLimit, err)
return makeLimitError(perMetricMetadataLimit, mm.limiter.FormatError(mm.userID, err))
}

// if we have seen this metadata before, it is a no-op and we don't need to change our metrics.
Expand Down
Loading