Skip to content

Commit 4f82673

Browse files
authored
Implement max series limit in distributor for /series API (#4683)
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 805dc82 commit 4f82673

File tree

3 files changed

+168
-22
lines changed

3 files changed

+168
-22
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
66
* [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
77
* [CHANGE] Enable Thanos series limiter in store-gateway. #4702
8+
* [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683
89

910
## 1.12.0 in progress
1011

pkg/distributor/distributor.go

+21-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"sort"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/go-kit/log"
@@ -945,6 +946,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
945946
// MetricsForLabelMatchers gets the metrics that match said matchers
946947
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
947948
replicationSet, err := d.GetIngestersForMetadata(ctx)
949+
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
948950
if err != nil {
949951
return nil, err
950952
}
@@ -953,20 +955,29 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
953955
if err != nil {
954956
return nil, err
955957
}
956-
957-
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
958-
return client.MetricsForLabelMatchers(ctx, req)
959-
})
960-
if err != nil {
961-
return nil, err
962-
}
963-
958+
mutex := sync.Mutex{}
964959
metrics := map[model.Fingerprint]model.Metric{}
965-
for _, resp := range resps {
966-
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*ingester_client.MetricsForLabelMatchersResponse))
960+
961+
_, err = d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
962+
resp, err := client.MetricsForLabelMatchers(ctx, req)
963+
if err != nil {
964+
return nil, err
965+
}
966+
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
967967
for _, m := range ms {
968+
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
969+
return nil, err
970+
}
971+
mutex.Lock()
968972
metrics[m.Fingerprint()] = m
973+
mutex.Unlock()
969974
}
975+
976+
return resp, nil
977+
})
978+
979+
if err != nil {
980+
return nil, err
970981
}
971982

972983
result := make([]metric.Metric, 0, len(metrics))

pkg/distributor/distributor_test.go

+146-12
Original file line numberDiff line numberDiff line change
@@ -1863,13 +1863,17 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
18631863
matchers []*labels.Matcher
18641864
expectedResult []metric.Metric
18651865
expectedIngesters int
1866+
queryLimiter *limiter.QueryLimiter
1867+
expectedErr error
18661868
}{
18671869
"should return an empty response if no metric match": {
18681870
matchers: []*labels.Matcher{
18691871
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
18701872
},
18711873
expectedResult: []metric.Metric{},
18721874
expectedIngesters: numIngesters,
1875+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1876+
expectedErr: nil,
18731877
},
18741878
"should filter metrics by single matcher": {
18751879
matchers: []*labels.Matcher{
@@ -1880,6 +1884,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
18801884
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
18811885
},
18821886
expectedIngesters: numIngesters,
1887+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1888+
expectedErr: nil,
18831889
},
18841890
"should filter metrics by multiple matchers": {
18851891
matchers: []*labels.Matcher{
@@ -1890,6 +1896,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
18901896
{Metric: util.LabelsToMetric(fixtures[0].lbls)},
18911897
},
18921898
expectedIngesters: numIngesters,
1899+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1900+
expectedErr: nil,
18931901
},
18941902
"should return all matching metrics even if their FastFingerprint collide": {
18951903
matchers: []*labels.Matcher{
@@ -1900,6 +1908,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19001908
{Metric: util.LabelsToMetric(fixtures[4].lbls)},
19011909
},
19021910
expectedIngesters: numIngesters,
1911+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1912+
expectedErr: nil,
19031913
},
19041914
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
19051915
shuffleShardEnabled: true,
@@ -1912,6 +1922,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19121922
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
19131923
},
19141924
expectedIngesters: 3,
1925+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1926+
expectedErr: nil,
19151927
},
19161928
"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
19171929
shuffleShardEnabled: true,
@@ -1924,6 +1936,30 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19241936
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
19251937
},
19261938
expectedIngesters: numIngesters,
1939+
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
1940+
expectedErr: nil,
1941+
},
1942+
"should return err if series limit is exhausted": {
1943+
shuffleShardEnabled: true,
1944+
shuffleShardSize: 0,
1945+
matchers: []*labels.Matcher{
1946+
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
1947+
},
1948+
expectedResult: nil,
1949+
expectedIngesters: numIngesters,
1950+
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
1951+
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
1952+
},
1953+
"should not exhaust series limit when only one series is fetched": {
1954+
matchers: []*labels.Matcher{
1955+
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
1956+
},
1957+
expectedResult: []metric.Metric{
1958+
{Metric: util.LabelsToMetric(fixtures[2].lbls)},
1959+
},
1960+
expectedIngesters: numIngesters,
1961+
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
1962+
expectedErr: nil,
19271963
},
19281964
}
19291965

@@ -1943,6 +1979,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19431979

19441980
// Push fixtures
19451981
ctx := user.InjectOrgID(context.Background(), "test")
1982+
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)
19461983

19471984
for _, series := range fixtures {
19481985
req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)
@@ -1951,6 +1988,12 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19511988
}
19521989

19531990
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
1991+
1992+
if testData.expectedErr != nil {
1993+
assert.EqualError(t, err, testData.expectedErr.Error())
1994+
return
1995+
}
1996+
19541997
require.NoError(t, err)
19551998
assert.ElementsMatch(t, testData.expectedResult, metrics)
19561999

@@ -1963,6 +2006,97 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19632006
}
19642007
}
19652008

2009+
func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
2010+
const (
2011+
numIngesters = 100
2012+
numSeriesPerRequest = 100
2013+
)
2014+
2015+
tests := map[string]struct {
2016+
prepareConfig func(limits *validation.Limits)
2017+
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
2018+
matchers []*labels.Matcher
2019+
queryLimiter *limiter.QueryLimiter
2020+
expectedErr error
2021+
}{
2022+
"get series within limits": {
2023+
prepareConfig: func(limits *validation.Limits) {},
2024+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
2025+
metrics := make([]labels.Labels, numSeriesPerRequest)
2026+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
2027+
2028+
for i := 0; i < numSeriesPerRequest; i++ {
2029+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: fmt.Sprintf("foo_%d", i)}})
2030+
for i := 0; i < 10; i++ {
2031+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
2032+
}
2033+
2034+
metrics[i] = lbls.Labels()
2035+
samples[i] = cortexpb.Sample{
2036+
Value: float64(i),
2037+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
2038+
}
2039+
}
2040+
2041+
return metrics, samples
2042+
},
2043+
matchers: []*labels.Matcher{
2044+
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
2045+
},
2046+
queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
2047+
expectedErr: nil,
2048+
},
2049+
}
2050+
2051+
for testName, testData := range tests {
2052+
b.Run(testName, func(b *testing.B) {
2053+
// Create distributor
2054+
ds, ingesters, _, _ := prepare(b, prepConfig{
2055+
numIngesters: numIngesters,
2056+
happyIngesters: numIngesters,
2057+
numDistributors: 1,
2058+
shardByAllLabels: true,
2059+
shuffleShardEnabled: false,
2060+
shuffleShardSize: 0,
2061+
})
2062+
2063+
// Push fixtures
2064+
ctx := user.InjectOrgID(context.Background(), "test")
2065+
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)
2066+
2067+
// Prepare the series to remote-write before starting the benchmark.
2068+
metrics, samples := testData.prepareSeries()
2069+
2070+
if _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)); err != nil {
2071+
b.Fatalf("error pushing to distributor %v", err)
2072+
}
2073+
2074+
// Run the benchmark.
2075+
b.ReportAllocs()
2076+
b.ResetTimer()
2077+
2078+
for n := 0; n < b.N; n++ {
2079+
now := model.Now()
2080+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
2081+
2082+
if testData.expectedErr != nil {
2083+
assert.EqualError(b, err, testData.expectedErr.Error())
2084+
return
2085+
}
2086+
2087+
require.NoError(b, err)
2088+
2089+
// Check how many ingesters have been queried.
2090+
// Due to the quorum, the distributor could cancel the last request towards ingesters
2091+
// if all the other ones are successful, so it's fine if either X or X-1
2092+
// ingesters have been queried.
2093+
assert.Contains(b, []int{numIngesters, numIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
2094+
assert.Equal(b, numSeriesPerRequest, len(metrics))
2095+
}
2096+
})
2097+
}
2098+
}
2099+
19662100
func TestDistributor_MetricsMetadata(t *testing.T) {
19672101
const numIngesters = 5
19682102

@@ -2058,7 +2192,7 @@ type prepConfig struct {
20582192
errFail error
20592193
}
20602194

2061-
func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
2195+
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
20622196
ingesters := []mockIngester{}
20632197
for i := 0; i < cfg.happyIngesters; i++ {
20642198
ingesters = append(ingesters, mockIngester{
@@ -2095,7 +2229,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
20952229
}
20962230

20972231
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
2098-
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
2232+
tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
20992233

21002234
err := kvStore.CAS(context.Background(), ingester.RingKey,
21012235
func(_ interface{}) (interface{}, bool, error) {
@@ -2104,7 +2238,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21042238
}, true, nil
21052239
},
21062240
)
2107-
require.NoError(t, err)
2241+
require.NoError(tb, err)
21082242

21092243
// Use a default replication factor of 3 if there isn't a provided replication factor.
21102244
rf := cfg.replicationFactor
@@ -2119,10 +2253,10 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21192253
HeartbeatTimeout: 60 * time.Minute,
21202254
ReplicationFactor: rf,
21212255
}, ingester.RingKey, ingester.RingKey, nil, nil)
2122-
require.NoError(t, err)
2123-
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
2256+
require.NoError(tb, err)
2257+
require.NoError(tb, services.StartAndAwaitRunning(context.Background(), ingestersRing))
21242258

2125-
test.Poll(t, time.Second, cfg.numIngesters, func() interface{} {
2259+
test.Poll(tb, time.Second, cfg.numIngesters, func() interface{} {
21262260
return ingestersRing.InstancesCount()
21272261
})
21282262

@@ -2163,7 +2297,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21632297
if cfg.enableTracker {
21642298
codec := GetReplicaDescCodec()
21652299
ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
2166-
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
2300+
tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
21672301
mock := kv.PrefixClient(ringStore, "prefix")
21682302
distributorCfg.HATrackerConfig = HATrackerConfig{
21692303
EnableHATracker: true,
@@ -2175,12 +2309,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21752309
}
21762310

21772311
overrides, err := validation.NewOverrides(*cfg.limits, nil)
2178-
require.NoError(t, err)
2312+
require.NoError(tb, err)
21792313

21802314
reg := prometheus.NewPedanticRegistry()
21812315
d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, reg, log.NewNopLogger())
2182-
require.NoError(t, err)
2183-
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
2316+
require.NoError(tb, err)
2317+
require.NoError(tb, services.StartAndAwaitRunning(context.Background(), d))
21842318

21852319
distributors = append(distributors, d)
21862320
registries = append(registries, reg)
@@ -2189,12 +2323,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21892323
// If the distributors ring is setup, wait until the first distributor
21902324
// updates to the expected size
21912325
if distributors[0].distributorsRing != nil {
2192-
test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
2326+
test.Poll(tb, time.Second, cfg.numDistributors, func() interface{} {
21932327
return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
21942328
})
21952329
}
21962330

2197-
t.Cleanup(func() { stopAll(distributors, ingestersRing) })
2331+
tb.Cleanup(func() { stopAll(distributors, ingestersRing) })
21982332

21992333
return distributors, ingesters, registries, ingestersRing
22002334
}

0 commit comments

Comments
 (0)