Implement max series limit in distributor for /series API (#4683)
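This change adds a per-query limit on the number of series the distributor will return for the /series API (MetricsForLabelMatchers). The tests below drive the limit through a limiter.QueryLimiter attached to the query context. The fragment that follows is only a sketch of that flow, reusing identifiers that appear in the diff; the assumption (inferred from the test fixtures, not confirmed elsewhere on this page) is that limiter.NewQueryLimiter takes the max series, max chunk bytes and max chunks per query, in that order.

// Sketch of the flow exercised by the tests below (identifiers taken from the diff).
// Assumption: limiter.NewQueryLimiter(maxSeries, maxChunkBytes, maxChunks).
ctx := user.InjectOrgID(context.Background(), "test")
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(1, 0, 0))

// With a limit of 1 and more than one de-duplicated series matching,
// the distributor is expected to return
// validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)).
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)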
@@ -1863,13 +1863,17 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers []*labels.Matcher
expectedResult []metric.Metric
expectedIngesters int
queryLimiter *limiter.QueryLimiter
expectedErr error
}{
"should return an empty response if no metric match": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
},
expectedResult: []metric.Metric{},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should filter metrics by single matcher": {
matchers: []*labels.Matcher{

@@ -1880,6 +1884,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should filter metrics by multiple matchers": {
matchers: []*labels.Matcher{

@@ -1890,6 +1896,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[0].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should return all matching metrics even if their FastFingerprint collide": {
matchers: []*labels.Matcher{

@@ -1900,6 +1908,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[4].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
shuffleShardEnabled: true,

@@ -1912,6 +1922,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: 3,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
shuffleShardEnabled: true,

@@ -1924,6 +1936,30 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should return err if series limit is exhausted": {
Reviewer: Can we add a test where the limit is set to 1, a single series is queried back, and the replication factor is greater than 1? In this case we shouldn't hit the limit. The purpose of this test is to ensure that the limit is applied against de-duped time series. I know the …

Author: Thanks for reviewing. The replication factor is 3 by default. I added a test to cover the case where one series is queried with the limit set to 1.
shuffleShardEnabled: true,
shuffleShardSize: 0,
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: nil,
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
},
"should not exhaust series limit when only one series is fetched": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
},
expectedResult: []metric.Metric{
{Metric: util.LabelsToMetric(fixtures[2].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: nil,
},
}
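The review thread above is about de-duplication: with the default replication factor of 3, the same series is returned by up to three ingesters, so the limit has to be applied to unique series rather than to raw responses, which is what the "should not exhaust series limit when only one series is fetched" case checks. The following is a self-contained toy sketch of that idea; it is not Cortex's limiter.QueryLimiter, and every name in it is made up for illustration.

// Toy sketch only (assumed structure, not Cortex's limiter.QueryLimiter):
// the limit is enforced against de-duplicated series, so the same series
// reported by all 3 replicas counts once.
package main

import (
    "errors"
    "fmt"
    "sync"
)

type toySeriesLimiter struct {
    mu        sync.Mutex
    maxSeries int
    unique    map[string]struct{} // keyed by a stringified label set
}

func (l *toySeriesLimiter) addSeries(labelSet string) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.unique[labelSet] = struct{}{} // a duplicate from another replica is a no-op
    if len(l.unique) > l.maxSeries {
        return errors.New("the query hit the max number of series limit")
    }
    return nil
}

func main() {
    l := &toySeriesLimiter{maxSeries: 1, unique: map[string]struct{}{}}
    // The same series returned by 3 replicas must not exhaust a limit of 1.
    for i := 0; i < 3; i++ {
        if err := l.addSeries(`{__name__="test_2"}`); err != nil {
            fmt.Println("unexpected error:", err)
            return
        }
    }
    fmt.Println("unique series counted:", len(l.unique)) // prints 1
}

Because duplicates collapse into a single map entry, three replicas reporting the same series never push the count past a limit of 1.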
@@ -1943,6 +1979,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {

// Push fixtures
ctx := user.InjectOrgID(context.Background(), "test")
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)

for _, series := range fixtures {
req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)

@@ -1951,6 +1988,12 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}

metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)

if testData.expectedErr != nil {
assert.EqualError(t, err, testData.expectedErr.Error())
return
}

require.NoError(t, err)
assert.ElementsMatch(t, testData.expectedResult, metrics)

@@ -1963,6 +2006,97 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}
}

func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
const (
numIngesters = 100
numSeriesPerRequest = 100
)

tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
matchers []*labels.Matcher
queryLimiter *limiter.QueryLimiter
expectedErr error
}{
"get series within limits": {
prepareConfig: func(limits *validation.Limits) {},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: fmt.Sprintf("foo_%d", i)}})
for i := 0; i < 10; i++ {
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
},
queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
expectedErr: nil,
},
}

for testName, testData := range tests {
b.Run(testName, func(b *testing.B) {
// Create distributor
ds, ingesters, _, _ := prepare(b, prepConfig{
numIngesters: numIngesters,
happyIngesters: numIngesters,
numDistributors: 1,
shardByAllLabels: true,
shuffleShardEnabled: false,
shuffleShardSize: 0,
})

// Push fixtures
ctx := user.InjectOrgID(context.Background(), "test")
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)

// Prepare the series to remote write before starting the benchmark.
metrics, samples := testData.prepareSeries()

if _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)); err != nil {
b.Fatalf("error pushing to distributor %v", err)
}

// Run the benchmark.
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
now := model.Now()
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)

if testData.expectedErr != nil {
assert.EqualError(b, err, testData.expectedErr.Error())
return
}

require.NoError(b, err)

// Check how many ingesters have been queried.
// Due to the quorum, the distributor could cancel the last request towards ingesters
// if all the other ones succeeded, so either X or X-1 ingesters may have been queried.
assert.Contains(b, []int{numIngesters, numIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
assert.Equal(b, numSeriesPerRequest, len(metrics))
}
})
}
}

func TestDistributor_MetricsMetadata(t *testing.T) {
const numIngesters = 5

@@ -2058,7 +2192,7 @@ type prepConfig struct {
errFail error
}

-func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
+func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
ingesters := []mockIngester{}
for i := 0; i < cfg.happyIngesters; i++ {
ingesters = append(ingesters, mockIngester{

@@ -2095,7 +2229,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
-t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })

err := kvStore.CAS(context.Background(), ingester.RingKey,
func(_ interface{}) (interface{}, bool, error) {

@@ -2104,7 +2238,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}, true, nil
},
)
-require.NoError(t, err)
+require.NoError(tb, err)

// Use a default replication factor of 3 if there isn't a provided replication factor.
rf := cfg.replicationFactor

@@ -2119,10 +2253,10 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: rf,
}, ingester.RingKey, ingester.RingKey, nil, nil)
-require.NoError(t, err)
-require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
+require.NoError(tb, err)
+require.NoError(tb, services.StartAndAwaitRunning(context.Background(), ingestersRing))

-test.Poll(t, time.Second, cfg.numIngesters, func() interface{} {
+test.Poll(tb, time.Second, cfg.numIngesters, func() interface{} {
return ingestersRing.InstancesCount()
})

@@ -2163,7 +2297,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
if cfg.enableTracker {
codec := GetReplicaDescCodec()
ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
-t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
mock := kv.PrefixClient(ringStore, "prefix")
distributorCfg.HATrackerConfig = HATrackerConfig{
EnableHATracker: true,

@@ -2175,12 +2309,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}

overrides, err := validation.NewOverrides(*cfg.limits, nil)
-require.NoError(t, err)
+require.NoError(tb, err)

reg := prometheus.NewPedanticRegistry()
d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, reg, log.NewNopLogger())
-require.NoError(t, err)
-require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
+require.NoError(tb, err)
+require.NoError(tb, services.StartAndAwaitRunning(context.Background(), d))

distributors = append(distributors, d)
registries = append(registries, reg)

@@ -2189,12 +2323,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
// If the distributors ring is setup, wait until the first distributor
// updates to the expected size
if distributors[0].distributorsRing != nil {
-test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
+test.Poll(tb, time.Second, cfg.numDistributors, func() interface{} {
return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
})
}

-t.Cleanup(func() { stopAll(distributors, ingestersRing) })
+tb.Cleanup(func() { stopAll(distributors, ingestersRing) })

return distributors, ingesters, registries, ingestersRing
}
Reviewer: For this, would you be able to write a go test -bench benchmark (or use an existing one, if any) and post before-and-after results?

Author: I added a benchmark test, BenchmarkDistributor_MetricsForLabelMatchers. Comparing the runs without the locking and with the locking, I didn't see any significant differences.
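A before/after comparison of the kind requested above can be reproduced by running the new benchmark on both revisions and diffing the results; the package path below is an assumption about the repository layout, and benchstat is an optional external tool:

go test -run '^$' -bench 'BenchmarkDistributor_MetricsForLabelMatchers' -benchmem -count 5 ./pkg/distributor/ | tee after.txt
# run the same command on the baseline revision into before.txt, then:
benchstat before.txt after.txt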