Implement max series limit in distributor for /series API #4683

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
* [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
* [CHANGE] Enable Thanos series limiter in store-gateway. #4702
* [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683

## 1.12.0 in progress

31 changes: 21 additions & 10 deletions pkg/distributor/distributor.go
@@ -7,6 +7,7 @@ import (
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
@@ -945,6 +946,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
if err != nil {
return nil, err
}
@@ -953,20 +955,29 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err != nil {
return nil, err
}

resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
return nil, err
}

mutex := sync.Mutex{}
metrics := map[model.Fingerprint]model.Metric{}
for _, resp := range resps {
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*ingester_client.MetricsForLabelMatchersResponse))

_, err = d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
return nil, err
}
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
for _, m := range ms {
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
return nil, err
}
mutex.Lock()
metrics[m.Fingerprint()] = m
mutex.Unlock()
}

return resp, nil
})

if err != nil {
return nil, err
}

result := make([]metric.Metric, 0, len(metrics))

Review thread on the locking above:

Reviewer (Contributor): For this, would you be able to write a `go test -bench` benchmark (or reuse an existing one, if any) and post before-and-after results?

Author (Contributor): I added a benchmark test, BenchmarkDistributor_MetricsForLabelMatchers.

Without the locking:

❯ go test -run=^$ -bench ^BenchmarkDistributor_MetricsForLabelMatchers$ github.com/cortexproject/cortex/pkg/distributor -count 10
goos: darwin
goarch: amd64
pkg: github.com/cortexproject/cortex/pkg/distributor
cpu: Intel(R) Core(TM) i7-8569U CPU @ 2.80GHz
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     775	   1553236 ns/op	  758952 B/op	    8580 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     724	   1551052 ns/op	  759031 B/op	    8581 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     898	   1281153 ns/op	  759056 B/op	    8581 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1064	   1145841 ns/op	  758964 B/op	    8580 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1102	   1074635 ns/op	  759169 B/op	    8581 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1123	   1068547 ns/op	  759050 B/op	    8580 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1108	   1080608 ns/op	  759064 B/op	    8581 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1129	   1052474 ns/op	  758646 B/op	    8578 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1129	   1123325 ns/op	  758817 B/op	    8579 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	    1084	   1360482 ns/op	  758859 B/op	    8580 allocs/op
PASS
ok  	github.com/cortexproject/cortex/pkg/distributor	14.253s

With the locking:

❯ go test -run=^$ -bench ^BenchmarkDistributor_MetricsForLabelMatchers$ github.com/cortexproject/cortex/pkg/distributor -count 10
goos: darwin
goarch: amd64
pkg: github.com/cortexproject/cortex/pkg/distributor
cpu: Intel(R) Core(TM) i7-8569U CPU @ 2.80GHz
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     847	   1519934 ns/op	  870517 B/op	    9174 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     804	   1789515 ns/op	  870984 B/op	    9178 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     542	   2008873 ns/op	  871077 B/op	    9179 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     715	   1493901 ns/op	  871080 B/op	    9179 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     793	   1619484 ns/op	  871042 B/op	    9179 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     712	   1502533 ns/op	  870682 B/op	    9176 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     872	   1594814 ns/op	  870655 B/op	    9176 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     652	   1608638 ns/op	  870481 B/op	    9175 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     806	   1452676 ns/op	  870680 B/op	    9176 allocs/op
BenchmarkDistributor_MetricsForLabelMatchers/get_series_within_limits-8         	     681	   1634886 ns/op	  870830 B/op	    9177 allocs/op
PASS
ok  	github.com/cortexproject/cortex/pkg/distributor	15.305s

I didn't see any significant differences.
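For context on the thread above: the rewritten MetricsForLabelMatchers runs one callback per ingester concurrently via ForReplicationSet and merges the partial results into a shared fingerprint-keyed map, which is why the map writes need the mutex while the QueryLimiter enforces the series cap. Below is a minimal, self-contained sketch of that pattern; the seriesLimiter type, string fingerprints, and response shapes are simplified assumptions for illustration, not the Cortex API.

package main

import (
	"fmt"
	"sync"
)

// seriesLimiter is a simplified stand-in for Cortex's QueryLimiter: it counts
// unique series fingerprints and rejects additions once the cap is exceeded.
type seriesLimiter struct {
	mu    sync.Mutex
	seen  map[string]struct{}
	limit int
}

func (l *seriesLimiter) addSeries(fp string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.seen[fp] = struct{}{} // de-duplicated: replicas of one series count once
	if l.limit > 0 && len(l.seen) > l.limit {
		return fmt.Errorf("the query hit the max number of series limit (limit: %d)", l.limit)
	}
	return nil
}

func main() {
	lim := &seriesLimiter{seen: map[string]struct{}{}, limit: 2}

	// Three "ingester" responses with overlapping series, as replicas produce.
	responses := [][]string{{"a", "b"}, {"b"}, {"a"}}

	var (
		mu       sync.Mutex
		merged   = map[string]struct{}{} // shared result map, hence the mutex
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)

	for _, resp := range responses {
		wg.Add(1)
		// One goroutine per ingester, mirroring the ForReplicationSet callback.
		go func(series []string) {
			defer wg.Done()
			for _, fp := range series {
				if err := lim.addSeries(fp); err != nil {
					errOnce.Do(func() { firstErr = err })
					return
				}
				mu.Lock()
				merged[fp] = struct{}{}
				mu.Unlock()
			}
		}(resp)
	}
	wg.Wait()

	fmt.Println(len(merged), firstErr) // prints: 2 <nil>
}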
158 changes: 146 additions & 12 deletions pkg/distributor/distributor_test.go
@@ -1863,13 +1863,17 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
matchers []*labels.Matcher
expectedResult []metric.Metric
expectedIngesters int
queryLimiter *limiter.QueryLimiter
expectedErr error
}{
"should return an empty response if no metric match": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "unknown"),
},
expectedResult: []metric.Metric{},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should filter metrics by single matcher": {
matchers: []*labels.Matcher{
@@ -1880,6 +1884,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should filter metrics by multiple matchers": {
matchers: []*labels.Matcher{
@@ -1890,6 +1896,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[0].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should return all matching metrics even if their FastFingerprint collide": {
matchers: []*labels.Matcher{
@@ -1900,6 +1908,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[4].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
shuffleShardEnabled: true,
@@ -1912,6 +1922,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: 3,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
shuffleShardEnabled: true,
@@ -1924,6 +1936,30 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
expectedErr: nil,
},
"should return err if series limit is exhausted": {
shuffleShardEnabled: true,
shuffleShardSize: 0,
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: nil,
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
},
"should not exhaust series limit when only one series is fetched": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
},
expectedResult: []metric.Metric{
{Metric: util.LabelsToMetric(fixtures[2].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: nil,
},
}

Review thread on the new limit test cases:

Reviewer (Contributor): Can we add a test where:

1. There are 5 ingesters.
2. The replication factor is 5.
3. There is exactly 1 time series replicated across the 5 ingesters.
4. The max series limit is set to 1.

In this case we shouldn't hit the limit. The purpose of this test is to ensure that the limit is applied against de-duplicated time series. I know the QueryLimiter does the de-duplication for you, but it's good to ensure the same from the distributor's perspective.

Author (Contributor): Thanks for reviewing. The replication factor is 3 by default. I added a test to cover the case where one series is queried with the limit set to 1.
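To make the reviewer's scenario concrete, here is an illustrative sketch of what such a test could look like; it is not part of this diff. It reuses prepare, prepConfig, mockWriteRequest, and mustNewMatcher from this file, and assumes replicationFactor may be raised to match the ingester count (limits wiring is omitted for brevity).

func TestDistributor_MetricsForLabelMatchers_DedupedSeriesLimit(t *testing.T) {
	// Illustrative only: 5 ingesters, replication factor 5, one series, limit 1.
	ds, _, _, _ := prepare(t, prepConfig{
		numIngesters:      5,
		happyIngesters:    5,
		numDistributors:   1,
		shardByAllLabels:  true,
		replicationFactor: 5, // every ingester holds a replica of the series
	})

	ctx := user.InjectOrgID(context.Background(), "test")
	// A limit of 1 unique series: the 5 replicas must de-dupe to a single series.
	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(1, 0, 0))

	lbls := labels.Labels{{Name: model.MetricNameLabel, Value: "test_1"}}
	_, err := ds[0].Push(ctx, mockWriteRequest([]labels.Labels{lbls}, 1, 100000))
	require.NoError(t, err)

	now := model.Now()
	metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now,
		mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"))
	require.NoError(t, err) // the limit is applied against de-duplicated series
	assert.Equal(t, 1, len(metrics))
}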

@@ -1943,6 +1979,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {

// Push fixtures
ctx := user.InjectOrgID(context.Background(), "test")
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)

for _, series := range fixtures {
req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)
@@ -1951,6 +1988,12 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}

metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)

if testData.expectedErr != nil {
assert.EqualError(t, err, testData.expectedErr.Error())
return
}

require.NoError(t, err)
assert.ElementsMatch(t, testData.expectedResult, metrics)

@@ -1963,6 +2006,97 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}
}

func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
const (
numIngesters = 100
numSeriesPerRequest = 100
)

tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
matchers []*labels.Matcher
queryLimiter *limiter.QueryLimiter
expectedErr error
}{
"get series within limits": {
prepareConfig: func(limits *validation.Limits) {},
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
metrics := make([]labels.Labels, numSeriesPerRequest)
samples := make([]cortexpb.Sample, numSeriesPerRequest)

for i := 0; i < numSeriesPerRequest; i++ {
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: fmt.Sprintf("foo_%d", i)}})
for j := 0; j < 10; j++ {
lbls.Set(fmt.Sprintf("name_%d", j), fmt.Sprintf("value_%d", j))
}

metrics[i] = lbls.Labels()
samples[i] = cortexpb.Sample{
Value: float64(i),
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
}
}

return metrics, samples
},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
},
queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
expectedErr: nil,
},
}

for testName, testData := range tests {
b.Run(testName, func(b *testing.B) {
// Create distributor
ds, ingesters, _, _ := prepare(b, prepConfig{
numIngesters: numIngesters,
happyIngesters: numIngesters,
numDistributors: 1,
shardByAllLabels: true,
shuffleShardEnabled: false,
shuffleShardSize: 0,
})

// Push fixtures
ctx := user.InjectOrgID(context.Background(), "test")
ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)

// Prepare the series to remote write before starting the benchmark.
metrics, samples := testData.prepareSeries()

if _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)); err != nil {
b.Fatalf("error pushing to distributor %v", err)
}

// Run the benchmark.
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
now := model.Now()
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)

if testData.expectedErr != nil {
assert.EqualError(b, err, testData.expectedErr.Error())
return
}

require.NoError(b, err)

// Check how many ingesters have been queried.
// Due to the quorum, the distributor may cancel the last request towards ingesters
// if all the other ones have already succeeded, so it's fine if either X or X-1
// ingesters have been queried.
assert.Contains(b, []int{numIngesters, numIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
assert.Equal(b, numSeriesPerRequest, len(metrics))
}
})
}
}
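One way to put numbers like those in the locking thread on firmer statistical footing is to capture both runs and compare them with benchstat (assuming the golang.org/x/perf/cmd/benchstat tool is installed; the branch name below is hypothetical):

❯ go test -run=^$ -bench ^BenchmarkDistributor_MetricsForLabelMatchers$ github.com/cortexproject/cortex/pkg/distributor -count 10 | tee before.txt
❯ git checkout series-limit-locking   # hypothetical branch carrying the change under test
❯ go test -run=^$ -bench ^BenchmarkDistributor_MetricsForLabelMatchers$ github.com/cortexproject/cortex/pkg/distributor -count 10 | tee after.txt
❯ benchstat before.txt after.txt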

func TestDistributor_MetricsMetadata(t *testing.T) {
const numIngesters = 5

@@ -2058,7 +2192,7 @@ type prepConfig struct {
errFail error
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
ingesters := []mockIngester{}
for i := 0; i < cfg.happyIngesters; i++ {
ingesters = append(ingesters, mockIngester{
@@ -2095,7 +2229,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })

err := kvStore.CAS(context.Background(), ingester.RingKey,
func(_ interface{}) (interface{}, bool, error) {
@@ -2104,7 +2238,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}, true, nil
},
)
require.NoError(t, err)
require.NoError(tb, err)

// Use a default replication factor of 3 if there isn't a provided replication factor.
rf := cfg.replicationFactor
@@ -2119,10 +2253,10 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
HeartbeatTimeout: 60 * time.Minute,
ReplicationFactor: rf,
}, ingester.RingKey, ingester.RingKey, nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
require.NoError(tb, err)
require.NoError(tb, services.StartAndAwaitRunning(context.Background(), ingestersRing))

test.Poll(t, time.Second, cfg.numIngesters, func() interface{} {
test.Poll(tb, time.Second, cfg.numIngesters, func() interface{} {
return ingestersRing.InstancesCount()
})

@@ -2163,7 +2297,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
if cfg.enableTracker {
codec := GetReplicaDescCodec()
ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
mock := kv.PrefixClient(ringStore, "prefix")
distributorCfg.HATrackerConfig = HATrackerConfig{
EnableHATracker: true,
@@ -2175,12 +2309,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
}

overrides, err := validation.NewOverrides(*cfg.limits, nil)
require.NoError(t, err)
require.NoError(tb, err)

reg := prometheus.NewPedanticRegistry()
d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, reg, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
require.NoError(tb, err)
require.NoError(tb, services.StartAndAwaitRunning(context.Background(), d))

distributors = append(distributors, d)
registries = append(registries, reg)
Expand All @@ -2189,12 +2323,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
// If the distributors ring is setup, wait until the first distributor
// updates to the expected size
if distributors[0].distributorsRing != nil {
test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
test.Poll(tb, time.Second, cfg.numDistributors, func() interface{} {
return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
})
}

t.Cleanup(func() { stopAll(distributors, ingestersRing) })
tb.Cleanup(func() { stopAll(distributors, ingestersRing) })

return distributors, ingesters, registries, ingestersRing
}
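The t → tb renames in this helper all follow from the signature change to testing.TB, the interface implemented by both *testing.T and *testing.B, which is what lets TestDistributor_MetricsForLabelMatchers and the new benchmark share the same setup. A minimal sketch of the pattern, with hypothetical names:

package example

import "testing"

// setupEnv is a hypothetical shared fixture: testing.TB is satisfied by both
// *testing.T and *testing.B, so tests and benchmarks can call the same helper.
func setupEnv(tb testing.TB) string {
	tb.Helper()
	return tb.TempDir() // registered for automatic cleanup on tb
}

func TestWithFixture(t *testing.T) {
	if dir := setupEnv(t); dir == "" {
		t.Fatal("expected a temp dir")
	}
}

func BenchmarkWithFixture(b *testing.B) {
	dir := setupEnv(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_ = dir // the benchmark body would use the fixture here
	}
}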