Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable thanos series limiter in store-gateway #4702

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
* [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
* [CHANGE] Enable Thanos series limiter in store-gateway. #4702

## 1.12.0 in progress

Expand Down
18 changes: 14 additions & 4 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
fetcher,
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
store.NewSeriesLimiterFactory(0), // No series limiter.
newSeriesLimiterFactory(u.limits, userID),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure that querier will not retry when this limit is enforced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down Expand Up @@ -605,11 +605,11 @@ func (s spanSeriesServer) Context() context.Context {
return s.ctx
}

type chunkLimiter struct {
type limiter struct {
limiter *store.Limiter
}

func (c *chunkLimiter) Reserve(num uint64) error {
func (c *limiter) Reserve(num uint64) error {
err := c.limiter.Reserve(num)
if err != nil {
return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error())
Expand All @@ -622,8 +622,18 @@ func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.
return func(failedCounter prometheus.Counter) store.ChunksLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &chunkLimiter{
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter),
}
}
}

func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) store.SeriesLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter),
}
}
}
89 changes: 89 additions & 0 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,95 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testi
}
}

func TestStoreGateway_SeriesQueryingShouldEnforceMaxSeriesPerQueryLimit(t *testing.T) {
const seriesQueried = 10

tests := map[string]struct {
limit int
expectedErr error
}{
"no limit enforced if zero": {
limit: 0,
expectedErr: nil,
},
"should return NO error if the actual number of queried series is <= limit": {
limit: seriesQueried,
expectedErr: nil,
},
"should return error if the actual number of queried series is > limit": {
limit: seriesQueried - 1,
expectedErr: status.Error(http.StatusUnprocessableEntity, fmt.Sprintf("exceeded series limit: rpc error: code = Code(422) desc = limit %d violated (got %d)", seriesQueried-1, seriesQueried)),
},
}

ctx := context.Background()
logger := log.NewNopLogger()
userID := "user-1"

storageDir, err := ioutil.TempDir(os.TempDir(), "")
require.NoError(t, err)
defer os.RemoveAll(storageDir) //nolint:errcheck

// Generate 1 TSDB block with chunksQueried series. Since each mocked series contains only 1 sample,
// it will also only have 1 chunk.
now := time.Now()
minT := now.Add(-1*time.Hour).Unix() * 1000
maxT := now.Unix() * 1000
mockTSDB(t, path.Join(storageDir, userID), seriesQueried, 0, minT, maxT)

bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

// Prepare the request to query back all series (1 chunk per series in this test).
req := &storepb.SeriesRequest{
MinTime: minT,
MaxTime: maxT,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Customise the limits.
limits := defaultLimitsConfig()
limits.MaxFetchedSeriesPerQuery = testData.limit
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

// Create a store-gateway used to query back the series from the blocks.
gatewayCfg := mockGatewayConfig()
gatewayCfg.ShardingEnabled = false
storageCfg := mockStorageConfig(t)

g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, overrides, mockLoggingLevel(), logger, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck

// Query back all the series (1 chunk per series in this test).
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
err = g.Series(req, srv)

if testData.expectedErr != nil {
fmt.Println("Error: ", err.Error())
require.Error(t, err)
assert.IsType(t, testData.expectedErr, err)
s1, ok := status.FromError(errors.Cause(err))
assert.True(t, ok)
s2, ok := status.FromError(errors.Cause(testData.expectedErr))
assert.True(t, ok)
assert.True(t, strings.Contains(s1.Message(), s2.Message()))
assert.Equal(t, s1.Code(), s2.Code())
} else {
require.NoError(t, err)
assert.Empty(t, srv.Warnings)
assert.Len(t, srv.SeriesSet, seriesQueried)
}
})
}
}

func mockGatewayConfig() Config {
cfg := Config{}
flagext.DefaultValues(&cfg)
Expand Down