Optimize metrics tracking on ingester v2Push() errors #3969

Merged · 6 commits · Mar 18, 2021

Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [ENHANCEMENT] Ingester: reduce CPU and memory utilization when a high number of errors is returned by the ingester on the write path with the blocks storage. #3969
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Querier: streamline tracing spans. #3924

35 changes: 28 additions & 7 deletions pkg/ingester/ingester_v2.go
@@ -720,9 +720,15 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor

// Keep track of some stats which are tracked only if the samples will be
// successfully committed
succeededSamplesCount := 0
failedSamplesCount := 0
startAppend := time.Now()
var (
succeededSamplesCount = 0
failedSamplesCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
newValueForTimestampCount = 0
otherDiscardedReasonsCount = map[string]int{}
)

// Walk the samples, appending them to the users database
app := db.Appender(ctx)
@@ -783,11 +789,11 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor

switch cause {
case storage.ErrOutOfBounds:
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Inc()
sampleOutOfBoundsCount++
case storage.ErrOutOfOrderSample:
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Inc()
sampleOutOfOrderCount++
case storage.ErrDuplicateSampleForTimestamp:
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Inc()
newValueForTimestampCount++
}

continue
@@ -799,7 +805,7 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
if firstPartialErr == nil {
firstPartialErr = ve
}
validation.DiscardedSamples.WithLabelValues(ve.errorType, userID).Inc()
otherDiscardedReasonsCount[ve.errorType]++
continue
}

@@ -842,6 +848,21 @@ func (i *Ingester) v2Push(ctx context.Context, req *cortexpb.WriteRequest) (*cor
i.metrics.ingestedSamples.Add(float64(succeededSamplesCount))
i.metrics.ingestedSamplesFail.Add(float64(failedSamplesCount))

if sampleOutOfBoundsCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
}
if sampleOutOfOrderCount > 0 {
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
}
if newValueForTimestampCount > 0 {
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
}
if len(otherDiscardedReasonsCount) > 0 {
for reason, count := range otherDiscardedReasonsCount {
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(count))
}
}

switch req.Source {
case cortexpb.RULE:
db.ingestedRuleSamples.add(int64(succeededSamplesCount))
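Review note: the change above swaps a per-sample `validation.DiscardedSamples.WithLabelValues(...).Inc()` for plain local counters that are flushed with a single `Add()` per discard reason after the append loop. Each `WithLabelValues` call hashes the label values and performs an atomic add, so on an error-heavy request those per-sample calls dominate CPU. A minimal sketch of the pattern — the metric and reason names here are illustrative placeholders, not Cortex's actual ones:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Illustrative counter; the real one in Cortex is validation.DiscardedSamples.
var discardedSamples = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "discarded_samples_total", Help: "Samples discarded per reason."},
	[]string{"reason", "user"},
)

func main() {
	const userID = "user-1"

	// Hot path: count failures in plain ints. Incrementing the counter here
	// would cost one label-hash lookup plus one atomic add per sample.
	outOfBounds, outOfOrder := 0, 0
	for i := 0; i < 1000; i++ {
		if i%2 == 0 {
			outOfBounds++
		} else {
			outOfOrder++
		}
	}

	// Flush once per reason: a single lookup and one atomic add per batch.
	if outOfBounds > 0 {
		discardedSamples.WithLabelValues("sample-out-of-bounds", userID).Add(float64(outOfBounds))
	}
	if outOfOrder > 0 {
		discardedSamples.WithLabelValues("sample-out-of-order", userID).Add(float64(outOfOrder))
	}

	fmt.Println(testutil.ToFloat64(discardedSamples.WithLabelValues("sample-out-of-bounds", userID))) // 500
}
```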
261 changes: 200 additions & 61 deletions pkg/ingester/ingester_v2_test.go
@@ -606,70 +606,206 @@ func TestIngester_v2Push_DecreaseInactiveSeries(t *testing.T) {
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
}

func Benchmark_Ingester_v2PushOnOutOfBoundsSamplesWithHighConcurrency(b *testing.B) {
const (
numSamplesPerRequest = 1000
numRequestsPerClient = 10
numConcurrentClients = 10000
func Benchmark_Ingester_v2PushOnError(b *testing.B) {
var (
ctx = user.InjectOrgID(context.Background(), userID)
sampleTimestamp = int64(100)
metricName = "test"
)

registry := prometheus.NewRegistry()
ctx := user.InjectOrgID(context.Background(), userID)
scenarios := map[string]struct {
numSeriesPerRequest int
numConcurrentClients int
}{
"no concurrency": {
numSeriesPerRequest: 1000,
numConcurrentClients: 1,
},
"low concurrency": {
numSeriesPerRequest: 1000,
numConcurrentClients: 100,
},
"high concurrency": {
numSeriesPerRequest: 1000,
numConcurrentClients: 1000,
},
}

// Create a mocked ingester
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0
tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
beforeBenchmark func(b *testing.B, ingester *Ingester, numSeriesPerRequest int)
runBenchmark func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample)
}{
"out of bound samples": {
prepareConfig: func(limits *validation.Limits) {},
beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
// Push a single time series to set the TSDB min time.
currTimeReq := cortexpb.ToWriteRequest(
[]labels.Labels{{{Name: labels.MetricName, Value: metricName}}},
[]cortexpb.Sample{{Value: 1, TimestampMs: util.TimeToMillis(time.Now())}},
nil,
cortexpb.API)
_, err := ingester.v2Push(ctx, currTimeReq)
require.NoError(b, err)
},
runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
expectedErr := storage.ErrOutOfBounds.Error()

ingester, err := prepareIngesterWithBlocksStorage(b, cfg, registry)
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester))
defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck
// Push out of bound samples.
for n := 0; n < b.N; n++ {
_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ingester.lifecycler.GetState()
})
if !strings.Contains(err.Error(), expectedErr) {
b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
}
}
},
},
"out of order samples": {
prepareConfig: func(limits *validation.Limits) {},
beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
// For each series, push a single sample with a timestamp greater than the one used in subsequent pushes.
for i := 0; i < numSeriesPerRequest; i++ {
currTimeReq := cortexpb.ToWriteRequest(
[]labels.Labels{{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}}},
[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
nil,
cortexpb.API)

_, err := ingester.v2Push(ctx, currTimeReq)
require.NoError(b, err)
}
},
runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
expectedErr := storage.ErrOutOfOrderSample.Error()

// Push a single time series to set the TSDB min time.
metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
// Push out of order samples.
for n := 0; n < b.N; n++ {
_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck

currTimeReq := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: util.TimeToMillis(time.Now())}},
nil,
cortexpb.API)
_, err = ingester.v2Push(ctx, currTimeReq)
require.NoError(b, err)
if !strings.Contains(err.Error(), expectedErr) {
b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
}
}
},
},
"per-user series limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLocalSeriesPerUser = 1
},
beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
// Push a series with a metric name different than the one used during the benchmark.
metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "another"}}
metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)

// Prepare a request containing out of bound samples.
metrics := make([]labels.Labels, 0, numSamplesPerRequest)
samples := make([]cortexpb.Sample, 0, numSamplesPerRequest)
for i := 0; i < numSamplesPerRequest; i++ {
metrics = append(metrics, metricLabels)
samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: 0})
}
outOfBoundReq := cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)
currTimeReq := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
nil,
cortexpb.API)
_, err := ingester.v2Push(ctx, currTimeReq)
require.NoError(b, err)
},
runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
expectedErr := "per-user series limit"

// Run the benchmark.
wg := sync.WaitGroup{}
wg.Add(numConcurrentClients)
start := make(chan struct{})
// Push series with a different name than the one already pushed.
for n := 0; n < b.N; n++ {
_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck

for c := 0; c < numConcurrentClients; c++ {
go func() {
defer wg.Done()
<-start
if !strings.Contains(err.Error(), expectedErr) {
b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
}
}
},
},
"per-metric series limit reached": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxLocalSeriesPerMetric = 1
},
beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) {
// Push a series with the same metric name but different labels than the one used during the benchmark.
metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: "another"}}
metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)

for n := 0; n < numRequestsPerClient; n++ {
ingester.v2Push(ctx, outOfBoundReq) // nolint:errcheck
}
}()
currTimeReq := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 1, TimestampMs: sampleTimestamp + 1}},
nil,
cortexpb.API)
_, err := ingester.v2Push(ctx, currTimeReq)
require.NoError(b, err)
},
runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) {
expectedErr := "per-metric series limit"

// Push series with different labels than the one already pushed.
for n := 0; n < b.N; n++ {
_, err := ingester.v2Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)) // nolint:errcheck

if !strings.Contains(err.Error(), expectedErr) {
b.Fatalf("unexpected error. expected: %s actual: %s", expectedErr, err.Error())
}
}
},
},
}

b.ResetTimer()
close(start)
wg.Wait()
for testName, testData := range tests {
for scenarioName, scenario := range scenarios {
b.Run(fmt.Sprintf("failure: %s, scenario: %s", testName, scenarioName), func(b *testing.B) {
registry := prometheus.NewRegistry()

// Create a mocked ingester
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0

limits := defaultLimitsTestConfig()
testData.prepareConfig(&limits)

ingester, err := prepareIngesterWithBlocksStorageAndLimits(b, cfg, limits, "", registry)
require.NoError(b, err)
require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingester))
defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(b, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ingester.lifecycler.GetState()
})

testData.beforeBenchmark(b, ingester, scenario.numSeriesPerRequest)

// Prepare the request.
metrics := make([]labels.Labels, 0, scenario.numSeriesPerRequest)
samples := make([]cortexpb.Sample, 0, scenario.numSeriesPerRequest)
for i := 0; i < scenario.numSeriesPerRequest; i++ {
metrics = append(metrics, labels.Labels{{Name: labels.MetricName, Value: metricName}, {Name: "cardinality", Value: strconv.Itoa(i)}})
samples = append(samples, cortexpb.Sample{Value: float64(i), TimestampMs: sampleTimestamp})
}

// Run the benchmark.
wg := sync.WaitGroup{}
wg.Add(scenario.numConcurrentClients)
start := make(chan struct{})

b.ReportAllocs()
b.ResetTimer()

for c := 0; c < scenario.numConcurrentClients; c++ {
go func() {
defer wg.Done()
<-start

testData.runBenchmark(b, ingester, metrics, samples)
}()
}

b.ResetTimer()
close(start)
wg.Wait()
})
}
}
}
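
Review note: the synchronization scaffolding in the benchmark above (the `start` channel plus `sync.WaitGroup`) is what makes it measure pushes under real lock contention rather than goroutine startup. A stripped-down sketch of the same fan-out pattern, where `doPush` and `numClients` are illustrative stand-ins:

```go
package ingester

import (
	"sync"
	"testing"
)

// benchmarkConcurrent parks numClients goroutines on a channel, resets the
// timer, then releases them all at once, so setup cost is excluded from the
// measurement and every iteration runs under contention.
func benchmarkConcurrent(b *testing.B, numClients int, doPush func()) {
	wg := sync.WaitGroup{}
	wg.Add(numClients)
	start := make(chan struct{})

	for c := 0; c < numClients; c++ {
		go func() {
			defer wg.Done()
			<-start // block until every client goroutine is ready

			for n := 0; n < b.N; n++ {
				doPush()
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer()
	close(start) // release all clients simultaneously
	wg.Wait()
}
```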

func Test_Ingester_v2LabelNames(t *testing.T) {
@@ -1729,19 +1865,22 @@ func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestamp
}

func prepareIngesterWithBlocksStorage(t testing.TB, ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, error) {
dataDir, err := ioutil.TempDir("", "ingester")
if err != nil {
return nil, err
}

t.Cleanup(func() {
require.NoError(t, os.RemoveAll(dataDir))
})

return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), dataDir, registerer)
return prepareIngesterWithBlocksStorageAndLimits(t, ingesterCfg, defaultLimitsTestConfig(), "", registerer)
}

func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config, limits validation.Limits, dataDir string, registerer prometheus.Registerer) (*Ingester, error) {
// Create a data dir if none has been provided.
if dataDir == "" {
var err error
if dataDir, err = ioutil.TempDir("", "ingester"); err != nil {
return nil, err
}

t.Cleanup(func() {
require.NoError(t, os.RemoveAll(dataDir))
})
}

bucketDir, err := ioutil.TempDir("", "bucket")
if err != nil {
return nil, err
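
Review note: the helper refactor above makes the data directory optional — callers pass `""` to get a per-run temp dir that `t.Cleanup` removes, which is why `prepareIngesterWithBlocksStorage` could drop its own temp-dir handling. A sketch of the lazy-creation pattern, with an illustrative function name:

```go
package ingester

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/stretchr/testify/require"
)

// ensureDataDir is an illustrative helper: it returns the caller's directory
// unchanged, or lazily creates a temp dir that is removed when the test (or
// benchmark, via testing.TB) finishes.
func ensureDataDir(t testing.TB, dataDir string) (string, error) {
	if dataDir != "" {
		return dataDir, nil // caller-managed directory; caller cleans up
	}

	dataDir, err := ioutil.TempDir("", "ingester")
	if err != nil {
		return "", err
	}

	// t.Cleanup runs for both tests and benchmarks, so the per-run
	// directory is always removed.
	t.Cleanup(func() {
		require.NoError(t, os.RemoveAll(dataDir))
	})

	return dataDir, nil
}
```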