Skip to content

Commit 7399a1f

Browse files
committed
Addressing comments
1 parent b0039f7 commit 7399a1f

File tree

6 files changed

+88
-15
lines changed

6 files changed

+88
-15
lines changed

pkg/distributor/query.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
210210
}
211211

212212
// Enforce the max chunks limits.
213-
matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
214-
if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount(), matchers); chunkLimitErr != nil {
213+
if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil {
215214
return nil, validation.LimitError(chunkLimitErr.Error())
216215
}
217216

pkg/querier/blocks_store_queryable.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ const (
5454
)
5555

5656
var (
57-
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
57+
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
58+
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
5859
)
5960

6061
// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -402,11 +403,14 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
402403
resSeriesSets = []storage.SeriesSet(nil)
403404
resWarnings = storage.Warnings(nil)
404405

406+
maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID)
407+
leftChunksLimit = maxChunksLimit
408+
405409
resultMtx sync.Mutex
406410
)
407411

408412
queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
409-
seriesSets, queriedBlocks, warnings, _, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers)
413+
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
410414
if err != nil {
411415
return nil, err
412416
}
@@ -416,6 +420,11 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
416420
resSeriesSets = append(resSeriesSets, seriesSets...)
417421
resWarnings = append(resWarnings, warnings...)
418422

423+
// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
424+
// chunks we can still read before hitting the limit (max == 0 means disabled).
425+
if maxChunksLimit > 0 {
426+
leftChunksLimit -= numChunks
427+
}
419428
resultMtx.Unlock()
420429

421430
return queriedBlocks, nil
@@ -543,6 +552,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
543552
maxT int64,
544553
matchers []*labels.Matcher,
545554
convertedMatchers []storepb.LabelMatcher,
555+
maxChunksLimit int,
556+
leftChunksLimit int,
546557
) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) {
547558
var (
548559
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
@@ -609,17 +620,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
609620
}
610621

611622
// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
612-
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks), matchers); chunkLimitErr != nil {
613-
return validation.LimitError(chunkLimitErr.Error())
623+
if maxChunksLimit > 0 {
624+
actual := numChunks.Add(int32(len(s.Chunks)))
625+
if actual > int32(leftChunksLimit) {
626+
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
627+
}
614628
}
615-
616629
chunksSize := 0
617630
for _, c := range s.Chunks {
618631
chunksSize += c.Size()
619632
}
620633
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
621634
return validation.LimitError(chunkBytesLimitErr.Error())
622635
}
636+
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
637+
return validation.LimitError(chunkLimitErr.Error())
638+
}
623639
}
624640

625641
if w := resp.GetWarning(); w != "" {

pkg/querier/blocks_store_queryable_test.go

+58-2
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,26 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
451451
},
452452
},
453453
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1},
454+
queryLimiter: noOpQueryLimiter,
455+
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
456+
},
457+
"max chunks per query limit hit while fetching chunks at first attempt - global limit": {
458+
finderResult: bucketindex.Blocks{
459+
{ID: block1},
460+
{ID: block2},
461+
},
462+
storeSetResponses: []interface{}{
463+
map[BlocksStoreClient][]ulid.ULID{
464+
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
465+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
466+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
467+
mockHintsResponse(block1, block2),
468+
}}: {block1, block2},
469+
},
470+
},
471+
limits: &blocksStoreLimitsMock{},
454472
queryLimiter: limiter.NewQueryLimiter(0, 0, 1),
455-
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
473+
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)),
456474
},
457475
"max chunks per query limit hit while fetching chunks during subsequent attempts": {
458476
finderResult: bucketindex.Blocks{
@@ -489,8 +507,46 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
489507
},
490508
},
491509
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3},
510+
queryLimiter: noOpQueryLimiter,
511+
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
512+
},
513+
"max chunks per query limit hit while fetching chunks during subsequent attempts - global": {
514+
finderResult: bucketindex.Blocks{
515+
{ID: block1},
516+
{ID: block2},
517+
{ID: block3},
518+
{ID: block4},
519+
},
520+
storeSetResponses: []interface{}{
521+
// First attempt returns a client whose response does not include all expected blocks.
522+
map[BlocksStoreClient][]ulid.ULID{
523+
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
524+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
525+
mockHintsResponse(block1),
526+
}}: {block1, block3},
527+
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
528+
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2),
529+
mockHintsResponse(block2),
530+
}}: {block2, block4},
531+
},
532+
// Second attempt returns 1 missing block.
533+
map[BlocksStoreClient][]ulid.ULID{
534+
&storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{
535+
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
536+
mockHintsResponse(block3),
537+
}}: {block3, block4},
538+
},
539+
// Third attempt returns the last missing block.
540+
map[BlocksStoreClient][]ulid.ULID{
541+
&storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{
542+
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3),
543+
mockHintsResponse(block4),
544+
}}: {block4},
545+
},
546+
},
547+
limits: &blocksStoreLimitsMock{},
492548
queryLimiter: limiter.NewQueryLimiter(0, 0, 3),
493-
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
549+
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)),
494550
},
495551
"max series per query limit hit while fetching chunks": {
496552
finderResult: bucketindex.Blocks{

pkg/querier/querier.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
224224
return nil, err
225225
}
226226

227-
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQueryFromStore(userID)))
227+
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID)))
228228

229229
mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
230230
if err == errEmptyTimeRange {

pkg/util/limiter/query_limiter.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ import (
66
"sync"
77

88
"github.com/prometheus/common/model"
9-
"github.com/prometheus/prometheus/pkg/labels"
109
"go.uber.org/atomic"
1110

1211
"github.com/cortexproject/cortex/pkg/cortexpb"
1312
"github.com/cortexproject/cortex/pkg/ingester/client"
14-
"github.com/cortexproject/cortex/pkg/util"
1513
)
1614

1715
type queryLimiterCtxKey struct{}
@@ -20,7 +18,7 @@ var (
2018
ctxKey = &queryLimiterCtxKey{}
2119
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
2220
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
23-
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
21+
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)"
2422
)
2523

2624
type QueryLimiter struct {
@@ -100,13 +98,13 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error {
10098
return nil
10199
}
102100

103-
func (ql *QueryLimiter) AddChunks(count int, matchers []*labels.Matcher) error {
101+
func (ql *QueryLimiter) AddChunks(count int) error {
104102
if ql.maxChunksPerQuery == 0 {
105103
return nil
106104
}
107105

108106
if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
109-
return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), ql.maxChunksPerQuery))
107+
return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery))
110108
}
111109
return nil
112110
}

pkg/util/validation/limits.go

+4
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {
388388
return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore
389389
}
390390

391+
func (o *Overrides) MaxChunksPerQuery(userID string) int {
392+
return o.getOverridesForUser(userID).MaxChunksPerQuery
393+
}
394+
391395
// MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching
392396
// chunks from ingesters and blocks storage.
393397
func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int {

0 commit comments

Comments
 (0)