Skip to content

Commit 04a0483

Browse files
author
lisuo
committed
add touch series limit
1 parent c21337e commit 04a0483

File tree

3 files changed

+39
-1
lines changed

3 files changed

+39
-1
lines changed

cmd/thanos/store.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func registerStore(app *extkingpin.App) {
6767
maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
6868
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
6969
Default("0").Uint()
70+
maxTouchSeriesCount := cmd.Flag("store.grpc.touch-series-limit",
71+
"Maximum amount of touch series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
72+
Default("0").Uint()
7073

7174
maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
7275

@@ -136,6 +139,7 @@ func registerStore(app *extkingpin.App) {
136139
uint64(*indexCacheSize),
137140
uint64(*chunkPoolSize),
138141
uint64(*maxSampleCount),
142+
uint64(*maxTouchSeriesCount),
139143
*maxConcurrent,
140144
component.Store,
141145
debugLogging,
@@ -173,7 +177,7 @@ func runStore(
173177
grpcGracePeriod time.Duration,
174178
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
175179
httpGracePeriod time.Duration,
176-
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
180+
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSeriesCount uint64,
177181
maxConcurrency int,
178182
component component.Component,
179183
verbose bool,
@@ -303,6 +307,7 @@ func runStore(
303307
queriesGate,
304308
chunkPoolSizeBytes,
305309
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
310+
store.NewSeriesLimiterFactory(maxSeriesCount),
306311
verbose,
307312
blockSyncConcurrency,
308313
filterConf,

pkg/store/bucket.go

+17
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type bucketStoreMetrics struct {
105105
resultSeriesCount prometheus.Summary
106106
chunkSizeBytes prometheus.Histogram
107107
queriesDropped prometheus.Counter
108+
queriesSeriesDropped prometheus.Counter
108109
seriesRefetches prometheus.Counter
109110

110111
cachedPostingsCompressions *prometheus.CounterVec
@@ -190,6 +191,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
190191
Name: "thanos_bucket_store_queries_dropped_total",
191192
Help: "Number of queries that were dropped due to the sample limit.",
192193
})
194+
m.queriesSeriesDropped = promauto.With(reg).NewCounter(prometheus.CounterOpts{
195+
Name: "thanos_bucket_store_queries_series_dropped_total",
196+
Help: "Number of queries that were dropped due to the series limit.",
197+
})
193198
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
194199
Name: "thanos_bucket_store_series_refetches_total",
195200
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
@@ -276,6 +281,8 @@ type BucketStore struct {
276281

277282
// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
278283
chunksLimiterFactory ChunksLimiterFactory
284+
// seriesLimiterFactory creates a new limiter used to limit the number of series touched by each Series() call.
285+
seriesLimiterFactory SeriesLimiterFactory
279286
partitioner partitioner
280287

281288
filterConfig *FilterConfig
@@ -300,6 +307,7 @@ func NewBucketStore(
300307
queryGate gate.Gate,
301308
maxChunkPoolBytes uint64,
302309
chunksLimiterFactory ChunksLimiterFactory,
310+
seriesLimiterFactory SeriesLimiterFactory,
303311
debugLogging bool,
304312
blockSyncConcurrency int,
305313
filterConfig *FilterConfig,
@@ -333,6 +341,7 @@ func NewBucketStore(
333341
filterConfig: filterConfig,
334342
queryGate: queryGate,
335343
chunksLimiterFactory: chunksLimiterFactory,
344+
seriesLimiterFactory: seriesLimiterFactory,
336345
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
337346
enableCompatibilityLabel: enableCompatibilityLabel,
338347
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
@@ -683,6 +692,7 @@ func blockSeries(
683692
matchers []*labels.Matcher,
684693
req *storepb.SeriesRequest,
685694
chunksLimiter ChunksLimiter,
695+
seriesLimiter SeriesLimiter,
686696
) (storepb.SeriesSet, *queryStats, error) {
687697
ps, err := indexr.ExpandedPostings(matchers)
688698
if err != nil {
@@ -693,6 +703,11 @@ func blockSeries(
693703
return storepb.EmptySeriesSet(), indexr.stats, nil
694704
}
695705

706+
// Reserve one series per expanded posting against the series limit before preloading any series data.
707+
if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
708+
return nil, nil, errors.Wrap(err, "exceeded series limit")
709+
}
710+
696711
// Preload all series index data.
697712
// TODO(bwplotka): Consider not keeping all series in memory all the time.
698713
// TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method.
@@ -887,6 +902,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
887902
resHints = &hintspb.SeriesResponseHints{}
888903
reqBlockMatchers []*labels.Matcher
889904
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
905+
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesSeriesDropped)
890906
)
891907

892908
if req.Hints != nil {
@@ -942,6 +958,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
942958
blockMatchers,
943959
req,
944960
chunksLimiter,
961+
seriesLimiter,
945962
)
946963
if err != nil {
947964
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)

pkg/store/limiter.go

+16
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,19 @@ type ChunksLimiter interface {
1818
Reserve(num uint64) error
1919
}
2020

21+
type SeriesLimiter interface {
22+
// Reserve num series out of the total number of series enforced by the limiter.
23+
// Returns an error if the limit has been exceeded. This function must be
24+
// goroutine safe.
25+
Reserve(num uint64) error
26+
}
27+
2128
// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for
2229
// projects depending on Thanos (eg. Cortex) which have dynamic limits.
2330
type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter
2431

32+
type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter
33+
2534
// Limiter is a simple mechanism for checking if something has passed a certain threshold.
2635
type Limiter struct {
2736
limit uint64
@@ -57,3 +66,10 @@ func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
5766
return NewLimiter(limit, failedCounter)
5867
}
5968
}
69+
70+
// NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a static limit.
71+
func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
72+
return func(failedCounter prometheus.Counter) SeriesLimiter {
73+
return NewLimiter(limit, failedCounter)
74+
}
75+
}

0 commit comments

Comments
 (0)