From ac33d664ae79062e7ab3f311c6e749ada4784257 Mon Sep 17 00:00:00 2001 From: lisuo Date: Fri, 27 Nov 2020 13:03:50 +0800 Subject: [PATCH 1/8] add touch series limit Signed-off-by: lisuo --- cmd/thanos/store.go | 7 ++++++- pkg/store/bucket.go | 17 +++++++++++++++++ pkg/store/limiter.go | 16 ++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 0f8b6a775f..aa639bf822 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -67,6 +67,9 @@ func registerStore(app *extkingpin.App) { maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() + maxTouchSeriesCount := cmd.Flag("store.grpc.touch-series-limit", + "Maximum amount of touch series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). 
+ Default("0").Uint() maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() @@ -136,6 +139,7 @@ func registerStore(app *extkingpin.App) { uint64(*indexCacheSize), uint64(*chunkPoolSize), uint64(*maxSampleCount), + uint64(*maxTouchSeriesCount), *maxConcurrent, component.Store, debugLogging, @@ -173,7 +177,7 @@ func runStore( grpcGracePeriod time.Duration, grpcCert, grpcKey, grpcClientCA, httpBindAddr string, httpGracePeriod time.Duration, - indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64, + indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSeriesCount uint64, maxConcurrency int, component component.Component, verbose bool, @@ -303,6 +307,7 @@ func runStore( queriesGate, chunkPoolSizeBytes, store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. + store.NewSeriesLimiterFactory(maxSeriesCount), verbose, blockSyncConcurrency, filterConf, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1d0195c529..a501ffc158 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -105,6 +105,7 @@ type bucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesDropped prometheus.Counter + queriesSeriesDropped prometheus.Counter seriesRefetches prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec @@ -190,6 +191,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) + m.queriesSeriesDropped = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_queries_series_dropped_total", + Help: "Number of queries that were dropped due to the series limit.", + }) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: 
"thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), @@ -276,6 +281,8 @@ type BucketStore struct { // chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call. chunksLimiterFactory ChunksLimiterFactory + // seriesLimiterFactory creates a new limiter used to limit the number of touch series by each Series() call. + seriesLimiterFactory SeriesLimiterFactory partitioner partitioner filterConfig *FilterConfig @@ -300,6 +307,7 @@ func NewBucketStore( queryGate gate.Gate, maxChunkPoolBytes uint64, chunksLimiterFactory ChunksLimiterFactory, + seriesLimiterFactory SeriesLimiterFactory, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -333,6 +341,7 @@ func NewBucketStore( filterConfig: filterConfig, queryGate: queryGate, chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, @@ -683,6 +692,7 @@ func blockSeries( matchers []*labels.Matcher, req *storepb.SeriesRequest, chunksLimiter ChunksLimiter, + seriesLimiter SeriesLimiter, ) (storepb.SeriesSet, *queryStats, error) { ps, err := indexr.ExpandedPostings(matchers) if err != nil { @@ -693,6 +703,11 @@ func blockSeries( return storepb.EmptySeriesSet(), indexr.stats, nil } + // Reserve seriesLimiter + if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { + return nil, nil, errors.Wrap(err, "exceeded series limit") + } + // Preload all series index data. // TODO(bwplotka): Consider not keeping all series in memory all the time. // TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method. 
@@ -887,6 +902,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesSeriesDropped) ) if req.Hints != nil { @@ -942,6 +958,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie blockMatchers, req, chunksLimiter, + seriesLimiter, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index c60be901e9..5ec3df3d4b 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -18,10 +18,19 @@ type ChunksLimiter interface { Reserve(num uint64) error } +type SeriesLimiter interface { + // Reserve num series out of the total number of series enforced by the limiter. + // Returns an error if the limit has been exceeded. This function must be + // goroutine safe. + Reserve(num uint64) error +} + // ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for // projects depending on Thanos (eg. Cortex) which have dynamic limits. type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter +type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter + // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { limit uint64 @@ -57,3 +66,10 @@ func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory { return NewLimiter(limit, failedCounter) } } + +// NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a static limit. 
+func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory { + return func(failedCounter prometheus.Counter) SeriesLimiter { + return NewLimiter(limit, failedCounter) + } +} From 1cc998e6bdecb3bdd75347607419bad62bcb8102 Mon Sep 17 00:00:00 2001 From: lisuo Date: Fri, 27 Nov 2020 13:26:18 +0800 Subject: [PATCH 2/8] update CHANGELOG.md Signed-off-by: lisuo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fa87aa49a..563cceca7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific. - [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block. - +- [#3509](https://github.com/thanos-io/thanos/pull/3509) Store: Added touch series limit ### Fixed - From 854c2d05c66197a01d56c653fa5533639c0f420b Mon Sep 17 00:00:00 2001 From: lisuo Date: Fri, 27 Nov 2020 14:06:42 +0800 Subject: [PATCH 3/8] fix go-lint error Signed-off-by: lisuo --- docs/components/store.md | 4 ++++ pkg/store/bucket_e2e_test.go | 1 + pkg/store/bucket_test.go | 6 ++++++ 3 files changed, 11 insertions(+) diff --git a/docs/components/store.md b/docs/components/store.md index 2ff77a9225..c0754d18d1 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -101,6 +101,10 @@ Flags: samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit. + --store.grpc.touch-series-limit=0 + Maximum amount of touch series returned via a + single Series call. The Series call fails if + this limit is exceeded. 0 means no limit. 
--store.grpc.series-max-concurrency=20 Maximum number of concurrent Series calls. --objstore.config-file= diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 482ae959fc..6626fb41a8 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -164,6 +164,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m nil, 0, NewChunksLimiterFactory(maxChunksLimit), + NewSeriesLimiterFactory(0), false, 20, filterConf, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e6aa993816..77f9d80636 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -575,6 +575,7 @@ func TestBucketStore_Info(t *testing.T) { nil, 2e5, NewChunksLimiterFactory(0), + NewSeriesLimiterFactory(0), false, 20, allowAllFilterConf, @@ -826,6 +827,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul nil, 0, NewChunksLimiterFactory(0), + NewSeriesLimiterFactory(0), false, 20, allowAllFilterConf, @@ -1641,6 +1643,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { nil, 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), false, 10, nil, @@ -1734,6 +1737,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { nil, 1000000, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), false, 10, nil, @@ -1878,6 +1882,7 @@ func TestBlockWithLargeChunks(t *testing.T) { nil, 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), false, 10, nil, @@ -2038,6 +2043,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb nil, 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), false, 10, nil, From f7ab48dc41c9ac8ef5d860e9dd8bc325f6de2b36 Mon Sep 17 00:00:00 2001 From: lisuo Date: Sat, 28 Nov 2020 21:56:40 +0800 Subject: [PATCH 4/8] update flag name maxTouchSeriesCount to maxTouchedSeriesCount 
Signed-off-by: lisuo --- cmd/thanos/store.go | 4 ++-- docs/components/store.md | 2 +- pkg/store/bucket.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index aa639bf822..af9fc92ec5 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -67,7 +67,7 @@ func registerStore(app *extkingpin.App) { maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() - maxTouchSeriesCount := cmd.Flag("store.grpc.touch-series-limit", + maxTouchedSeriesCount := cmd.Flag("store.grpc.touched-series-limit", "Maximum amount of touch series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). Default("0").Uint() @@ -139,7 +139,7 @@ func registerStore(app *extkingpin.App) { uint64(*indexCacheSize), uint64(*chunkPoolSize), uint64(*maxSampleCount), - uint64(*maxTouchSeriesCount), + uint64(*maxTouchedSeriesCount), *maxConcurrent, component.Store, debugLogging, diff --git a/docs/components/store.md b/docs/components/store.md index c0754d18d1..088c5cd873 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -101,7 +101,7 @@ Flags: samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit. - --store.grpc.touch-series-limit=0 + --store.grpc.touched-series-limit=0 Maximum amount of touch series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. 
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a501ffc158..0e213c0447 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -703,7 +703,7 @@ func blockSeries( return storepb.EmptySeriesSet(), indexr.stats, nil } - // Reserve seriesLimiter + // Reserve series from the seriesLimiter. if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil { return nil, nil, errors.Wrap(err, "exceeded series limit") } From 7ee6c38a954e3a24daa93103848164716a5ad285 Mon Sep 17 00:00:00 2001 From: lisuo Date: Tue, 1 Dec 2020 10:28:36 +0800 Subject: [PATCH 5/8] Fix flag `store.grpc.touched-series-limit` help text: `touch` to `touched` Signed-off-by: lisuo --- cmd/thanos/store.go | 2 +- docs/components/store.md | 2 +- pkg/store/bucket.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index af9fc92ec5..451e79ae6d 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -68,7 +68,7 @@ func registerStore(app *extkingpin.App) { "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() maxTouchedSeriesCount := cmd.Flag("store.grpc.touched-series-limit", - "Maximum amount of touch series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). + "Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). 
Default("0").Uint() maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() diff --git a/docs/components/store.md b/docs/components/store.md index 088c5cd873..3579f3005f 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -102,7 +102,7 @@ Flags: number of samples might be lower, even though the maximum could be hit. --store.grpc.touched-series-limit=0 - Maximum amount of touch series returned via a + Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. --store.grpc.series-max-concurrency=20 diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 0e213c0447..9d9d0c0c6e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -281,7 +281,7 @@ type BucketStore struct { // chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call. chunksLimiterFactory ChunksLimiterFactory - // seriesLimiterFactory creates a new limiter used to limit the number of touch series by each Series() call. + // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call. seriesLimiterFactory SeriesLimiterFactory partitioner partitioner From b4bc7e77a418a39c39a45f0e31e8d6fef89cd769 Mon Sep 17 00:00:00 2001 From: lisuo Date: Tue, 1 Dec 2020 14:12:09 +0800 Subject: [PATCH 6/8] Add comment to SeriesLimiterFactory Signed-off-by: lisuo --- pkg/store/limiter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 5ec3df3d4b..266dbbf3b2 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -29,6 +29,7 @@ type SeriesLimiter interface { // projects depending on Thanos (eg. Cortex) which have dynamic limits. type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter +// SeriesLimiterFactory is used to create a new SeriesLimiter. 
type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter // Limiter is a simple mechanism for checking if something has passed a certain threshold. From 06dbfcf3a943f31f1c103b22d011ef4b619903de Mon Sep 17 00:00:00 2001 From: lisuo Date: Tue, 1 Dec 2020 20:23:57 +0800 Subject: [PATCH 7/8] Update `thanos_bucket_store_queries_dropped_total` metric type `Counter` to `CounterVec` Signed-off-by: lisuo --- pkg/store/bucket.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 9d9d0c0c6e..02dd356012 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -104,8 +104,7 @@ type bucketStoreMetrics struct { seriesMergeDuration prometheus.Histogram resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram - queriesDropped prometheus.Counter - queriesSeriesDropped prometheus.Counter + queriesDropped *prometheus.CounterVec seriesRefetches prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec @@ -187,14 +186,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }, }) - m.queriesDropped = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.queriesDropped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_bucket_store_queries_dropped_total", - Help: "Number of queries that were dropped due to the sample limit.", - }) - m.queriesSeriesDropped = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_bucket_store_queries_series_dropped_total", - Help: "Number of queries that were dropped due to the series limit.", - }) + Help: "Number of queries that were dropped due to the limit.", + }, []string{"reason"}) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), @@ -901,8 
+896,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesSeriesDropped) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) ) if req.Hints != nil { From 928cf15b0b0e529acd094f5f6fbb0dfaeb8a3900 Mon Sep 17 00:00:00 2001 From: lisuo Date: Mon, 28 Dec 2020 17:10:03 +0800 Subject: [PATCH 8/8] fix unit test Signed-off-by: lisuo --- pkg/store/bucket_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index dcd5686e8e..2c0b44d8d4 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1275,6 +1275,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer }, queryGate: noopGate{}, chunksLimiterFactory: NewChunksLimiterFactory(0), + seriesLimiterFactory: NewSeriesLimiterFactory(0), } for _, block := range blocks { @@ -1491,6 +1492,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { }, queryGate: noopGate{}, chunksLimiterFactory: NewChunksLimiterFactory(0), + seriesLimiterFactory: NewSeriesLimiterFactory(0), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {