Skip to content

Commit 805dc82

Browse files
authored
Enable thanos series limiter in store-gateway (#4702)
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 98b8d8a commit 805dc82

File tree

3 files changed

+104
-4
lines changed

3 files changed

+104
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
66
* [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
7+
* [CHANGE] Enable Thanos series limiter in store-gateway. #4702
78

89
## 1.12.0 in progress
910

pkg/storegateway/bucket_stores.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
492492
fetcher,
493493
u.syncDirForUser(userID),
494494
newChunksLimiterFactory(u.limits, userID),
495-
store.NewSeriesLimiterFactory(0), // No series limiter.
495+
newSeriesLimiterFactory(u.limits, userID),
496496
u.partitioner,
497497
u.cfg.BucketStore.BlockSyncConcurrency,
498498
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
@@ -605,11 +605,11 @@ func (s spanSeriesServer) Context() context.Context {
605605
return s.ctx
606606
}
607607

608-
type chunkLimiter struct {
608+
type limiter struct {
609609
limiter *store.Limiter
610610
}
611611

612-
func (c *chunkLimiter) Reserve(num uint64) error {
612+
func (c *limiter) Reserve(num uint64) error {
613613
err := c.limiter.Reserve(num)
614614
if err != nil {
615615
return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error())
@@ -622,8 +622,18 @@ func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.
622622
return func(failedCounter prometheus.Counter) store.ChunksLimiter {
623623
// Since limit overrides could be live reloaded, we have to get the current user's limit
624624
// each time a new limiter is instantiated.
625-
return &chunkLimiter{
625+
return &limiter{
626626
limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter),
627627
}
628628
}
629629
}
630+
631+
func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory {
632+
return func(failedCounter prometheus.Counter) store.SeriesLimiter {
633+
// Since limit overrides could be live reloaded, we have to get the current user's limit
634+
// each time a new limiter is instantiated.
635+
return &limiter{
636+
limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter),
637+
}
638+
}
639+
}

pkg/storegateway/gateway_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,95 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
973973
}
974974
}
975975

976+
func TestStoreGateway_SeriesQueryingShouldEnforceMaxSeriesPerQueryLimit(t *testing.T) {
977+
const seriesQueried = 10
978+
979+
tests := map[string]struct {
980+
limit int
981+
expectedErr error
982+
}{
983+
"no limit enforced if zero": {
984+
limit: 0,
985+
expectedErr: nil,
986+
},
987+
"should return NO error if the actual number of queried series is <= limit": {
988+
limit: seriesQueried,
989+
expectedErr: nil,
990+
},
991+
"should return error if the actual number of queried series is > limit": {
992+
limit: seriesQueried - 1,
993+
expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded series limit: rpc error: code = Code(422) desc = limit %d violated (got %d)", seriesQueried-1, seriesQueried)),
994+
},
995+
}
996+
997+
ctx := context.Background()
998+
logger := log.NewNopLogger()
999+
userID := "user-1"
1000+
1001+
storageDir, err := ioutil.TempDir(os.TempDir(), "")
1002+
require.NoError(t, err)
1003+
defer os.RemoveAll(storageDir) //nolint:errcheck
1004+
1005+
// Generate 1 TSDB block with chunksQueried series. Since each mocked series contains only 1 sample,
1006+
// it will also only have 1 chunk.
1007+
now := time.Now()
1008+
minT := now.Add(-1*time.Hour).Unix() * 1000
1009+
maxT := now.Unix() * 1000
1010+
mockTSDB(t, path.Join(storageDir, userID), seriesQueried, 0, minT, maxT)
1011+
1012+
bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
1013+
require.NoError(t, err)
1014+
1015+
// Prepare the request to query back all series (1 chunk per series in this test).
1016+
req := &storepb.SeriesRequest{
1017+
MinTime: minT,
1018+
MaxTime: maxT,
1019+
Matchers: []storepb.LabelMatcher{
1020+
{Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"},
1021+
},
1022+
}
1023+
1024+
for testName, testData := range tests {
1025+
t.Run(testName, func(t *testing.T) {
1026+
// Customise the limits.
1027+
limits := defaultLimitsConfig()
1028+
limits.MaxFetchedSeriesPerQuery = testData.limit
1029+
overrides, err := validation.NewOverrides(limits, nil)
1030+
require.NoError(t, err)
1031+
1032+
// Create a store-gateway used to query back the series from the blocks.
1033+
gatewayCfg := mockGatewayConfig()
1034+
gatewayCfg.ShardingEnabled = false
1035+
storageCfg := mockStorageConfig(t)
1036+
1037+
g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, overrides, mockLoggingLevel(), logger, nil)
1038+
require.NoError(t, err)
1039+
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
1040+
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
1041+
1042+
// Query back all the series (1 chunk per series in this test).
1043+
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
1044+
err = g.Series(req, srv)
1045+
1046+
if testData.expectedErr != nil {
1047+
fmt.Println("Error: ", err.Error())
1048+
require.Error(t, err)
1049+
assert.IsType(t, testData.expectedErr, err)
1050+
s1, ok := status.FromError(errors.Cause(err))
1051+
assert.True(t, ok)
1052+
s2, ok := status.FromError(errors.Cause(testData.expectedErr))
1053+
assert.True(t, ok)
1054+
assert.True(t, strings.Contains(s1.Message(), s2.Message()))
1055+
assert.Equal(t, s1.Code(), s2.Code())
1056+
} else {
1057+
require.NoError(t, err)
1058+
assert.Empty(t, srv.Warnings)
1059+
assert.Len(t, srv.SeriesSet, seriesQueried)
1060+
}
1061+
})
1062+
}
1063+
}
1064+
9761065
func mockGatewayConfig() Config {
9771066
cfg := Config{}
9781067
flagext.DefaultValues(&cfg)

0 commit comments

Comments
 (0)