
Commit 0cbc752

Fix issue where cached entry size keeps increasing when making tiny query repeatedly
Signed-off-by: Alvin Lin <[email protected]>
1 parent ec958eb commit 0cbc752
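
For context on the fix below: partition previously returned only the sub-requests and cached responses, and handleHit appended the freshly fetched extents to the full list of previously cached extents before merging and writing the entry back. Extents that partition had skipped (for example, ones shorter than minCacheExtent) were therefore folded back into the stored entry on every request, so repeating the same tiny query kept growing it. After this change, partition also returns the extents it actually used, and only those (plus the fresh fetches) are merged and cached. The toy program below is a simplified illustration of that difference, not the actual Cortex code: the extent type, the usable helper, and the concrete numbers are stand-ins, and the real code additionally merges overlapping extents, which is where the duplicated samples came from.

package main

import "fmt"

// extent is a stand-in for the real queryrange Extent: just a time range.
type extent struct{ start, end int64 }

const minCacheExtent = 10 // same idea as resultsCache.minCacheExtent

// usable mimics partition's filtering: extents that are too small, or that do
// not overlap the request, are not used to answer it.
func usable(reqStart, reqEnd int64, cached []extent) []extent {
	var used []extent
	for _, e := range cached {
		if e.end-e.start < minCacheExtent {
			continue // too small to be worth extracting from
		}
		if e.end < reqStart || e.start > reqEnd {
			continue // no overlap with the request
		}
		used = append(used, e)
	}
	return used
}

func main() {
	// A tiny query [0, 5] is answered from scratch because the only cached
	// extent ([0, 6]) is below minCacheExtent.
	cached := []extent{{start: 0, end: 6}}
	fetched := extent{start: 0, end: 5}

	// Old behaviour (conceptually): the fresh extent was appended to *all*
	// previously cached extents, so the skipped tiny extent survived and the
	// entry grew every time the same query was repeated.
	oldEntry := append(append([]extent{}, cached...), fetched)

	// New behaviour: only the extents that were actually used are kept.
	newEntry := append(usable(0, 5, cached), fetched)

	fmt.Println("old entry:", oldEntry) // [{0 6} {0 5}] - unused extent kept
	fmt.Println("new entry:", newEntry) // [{0 5}]       - unused extent dropped
}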

File tree

4 files changed, +99 -32 lines changed:

.gitignore (+4)
CHANGELOG.md (+1)
pkg/querier/queryrange/results_cache.go (+20 -15)
pkg/querier/queryrange/results_cache_test.go (+74 -17)


.gitignore (+4)

@@ -14,6 +14,10 @@ e2e_integration_test*
 active-query-tracker
 dist/
 
+# Test generated data
+pkg/alertmanager/templates
+pkg/querier/chunks_head/
+
 # Binaries built from ./cmd
 /blocksconvert
 /cortex

CHANGELOG.md (+1)

@@ -18,6 +18,7 @@
 * [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Querier: streamline tracing spans. #3924
+* [BUGFIX] Query-frontend: Fix issue where cached entry size keeps increasing when making tiny query repeatedly. #3968
 
 ## 1.8.0 in progress
 

pkg/querier/queryrange/results_cache.go (+20 -15)

@@ -350,15 +350,15 @@ func (s resultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime in
 	return response, extents, nil
 }
 
-func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent, maxCacheTime int64) (Response, []Extent, error) {
+func (s resultsCache) handleHit(ctx context.Context, r Request, cachedExtents []Extent, maxCacheTime int64) (Response, []Extent, error) {
 	var (
 		reqResps []RequestResponse
 		err      error
 	)
 	log, ctx := spanlogger.New(ctx, "handleHit")
 	defer log.Finish()
 
-	requests, responses, err := s.partition(r, extents)
+	requests, responses, usedExtents, err := s.partition(r, cachedExtents)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -382,35 +382,35 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
 		if err != nil {
 			return nil, nil, err
 		}
-		extents = append(extents, extent)
+		usedExtents = append(usedExtents, extent)
 	}
-	sort.Slice(extents, func(i, j int) bool {
-		return extents[i].Start < extents[j].Start
+	sort.Slice(usedExtents, func(i, j int) bool {
+		return usedExtents[i].Start < usedExtents[j].Start
 	})
 
 	// Merge any extents - they're guaranteed not to overlap.
-	accumulator, err := newAccumulator(extents[0])
+	accumulator, err := newAccumulator(usedExtents[0])
 	if err != nil {
 		return nil, nil, err
 	}
-	mergedExtents := make([]Extent, 0, len(extents))
+	mergedExtents := make([]Extent, 0, len(usedExtents))
 
-	for i := 1; i < len(extents); i++ {
-		if accumulator.End+r.GetStep() < extents[i].Start {
+	for i := 1; i < len(usedExtents); i++ {
+		if accumulator.End+r.GetStep() < usedExtents[i].Start {
 			mergedExtents, err = merge(mergedExtents, accumulator)
 			if err != nil {
 				return nil, nil, err
 			}
-			accumulator, err = newAccumulator(extents[i])
+			accumulator, err = newAccumulator(usedExtents[i])
 			if err != nil {
 				return nil, nil, err
 			}
 			continue
 		}
 
 		accumulator.TraceId = jaegerTraceID(ctx)
-		accumulator.End = extents[i].End
-		currentRes, err := extents[i].toResponse()
+		accumulator.End = usedExtents[i].End
+		currentRes, err := usedExtents[i].toResponse()
 		if err != nil {
 			return nil, nil, err
 		}
@@ -474,10 +474,11 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) {
 
 // partition calculates the required requests to satisfy req given the cached data.
 // extents must be in order by start time.
-func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, error) {
+func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, []Extent, error) {
 	var requests []Request
 	var cachedResponses []Response
 	start := req.GetStart()
+	var usedExtent []Extent
 
 	for _, extent := range extents {
 		// If there is no overlap, ignore this extent.
@@ -501,11 +502,15 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
 		}
 		res, err := extent.toResponse()
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 		// extract the overlap from the cached extent.
 		cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res))
 		start = extent.End
+		if usedExtent == nil {
+			usedExtent = make([]Extent, 0, len(extents))
+		}
+		usedExtent = append(usedExtent, extent)
 	}
 
 	// Lastly, make a request for any data missing at the end.
@@ -520,7 +525,7 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
 		requests = append(requests, req)
 	}
 
-	return requests, cachedResponses, nil
+	return requests, cachedResponses, usedExtent, nil
 }
 
 func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Duration, extents []Extent) ([]Extent, error) {
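One detail of the partition change worth noting: the usedExtent slice is allocated lazily, only once the first used extent is found, so a request that uses nothing from the cache returns a nil slice rather than an allocated empty one. The test cases below rely on that distinction (expectedUsedCachedEntry: nil for the complete-miss cases), because require.Equal follows reflect.DeepEqual semantics and tells a nil slice apart from an empty one. A minimal standalone illustration (not Cortex code):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	var lazy []int          // stays nil until something is appended
	eager := make([]int, 0) // allocated, but empty

	fmt.Println(lazy == nil, eager == nil) // true false
	// reflect.DeepEqual (which require.Equal builds on) distinguishes the two,
	// which is why the complete-miss test cases can expect a plain nil.
	fmt.Println(reflect.DeepEqual(lazy, eager)) // false
	fmt.Println(len(lazy), len(eager))          // 0 0
}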

pkg/querier/queryrange/results_cache_test.go (+74 -17)

@@ -386,52 +386,60 @@ func TestShouldCache(t *testing.T) {
 }
 
 func TestPartition(t *testing.T) {
-	for i, tc := range []struct {
-		input                  Request
-		prevCachedResponse     []Extent
-		expectedRequests       []Request
-		expectedCachedResponse []Response
+	for _, tc := range []struct {
+		name                    string
+		input                   Request
+		prevCachedResponse      []Extent
+		expectedUsedCachedEntry []Extent
+		expectedRequests        []Request
+		expectedCachedResponse  []Response
 	}{
-		// 1. Test a complete hit.
 		{
+			name: "Test a complete hit.",
 			input: &PrometheusRequest{
 				Start: 0,
 				End:   100,
 			},
 			prevCachedResponse: []Extent{
 				mkExtent(0, 100),
 			},
+			expectedUsedCachedEntry: []Extent{
+				mkExtent(0, 100),
+			},
 			expectedCachedResponse: []Response{
 				mkAPIResponse(0, 100, 10),
 			},
 		},
 
-		// Test with a complete miss.
 		{
+			name: "Test with a complete miss.",
 			input: &PrometheusRequest{
 				Start: 0,
 				End:   100,
 			},
 			prevCachedResponse: []Extent{
 				mkExtent(110, 210),
 			},
+			expectedUsedCachedEntry: nil,
 			expectedRequests: []Request{
 				&PrometheusRequest{
 					Start: 0,
 					End:   100,
 				}},
 			expectedCachedResponse: nil,
 		},
-
-		// Test a partial hit.
 		{
+			name: "Test a partial hit.",
 			input: &PrometheusRequest{
 				Start: 0,
 				End:   100,
 			},
 			prevCachedResponse: []Extent{
 				mkExtent(50, 100),
 			},
+			expectedUsedCachedEntry: []Extent{
+				mkExtent(50, 100),
+			},
 			expectedRequests: []Request{
 				&PrometheusRequest{
 					Start: 0,
@@ -442,9 +450,8 @@ func TestPartition(t *testing.T) {
 				mkAPIResponse(50, 100, 10),
 			},
 		},
-
-		// Test multiple partial hits.
 		{
+			name: "Test multiple partial hits.",
 			input: &PrometheusRequest{
 				Start: 100,
 				End:   200,
@@ -453,6 +460,10 @@ func TestPartition(t *testing.T) {
 				mkExtent(50, 120),
 				mkExtent(160, 250),
 			},
+			expectedUsedCachedEntry: []Extent{
+				mkExtent(50, 120),
+				mkExtent(160, 250),
+			},
 			expectedRequests: []Request{
 				&PrometheusRequest{
 					Start: 120,
@@ -464,9 +475,8 @@ func TestPartition(t *testing.T) {
 				mkAPIResponse(160, 200, 10),
 			},
 		},
-
-		// Partial hits with tiny gap.
 		{
+			name: "Partial hits with tiny gap.",
 			input: &PrometheusRequest{
 				Start: 100,
 				End:   160,
@@ -475,6 +485,9 @@ func TestPartition(t *testing.T) {
 				mkExtent(50, 120),
 				mkExtent(122, 130),
 			},
+			expectedUsedCachedEntry: []Extent{
+				mkExtent(50, 120),
+			},
 			expectedRequests: []Request{
 				&PrometheusRequest{
 					Start: 120,
@@ -485,24 +498,25 @@ func TestPartition(t *testing.T) {
 				mkAPIResponse(100, 120, 10),
 			},
 		},
-		// Extent is outside the range and the request has a single step (same start and end).
 		{
+			name: "Extent is outside the range and the request has a single step (same start and end).",
 			input: &PrometheusRequest{
 				Start: 100,
 				End:   100,
 			},
 			prevCachedResponse: []Extent{
 				mkExtent(50, 90),
 			},
+			expectedUsedCachedEntry: nil,
 			expectedRequests: []Request{
 				&PrometheusRequest{
 					Start: 100,
 					End:   100,
 				},
 			},
 		},
-		// Test when hit has a large step and only a single sample extent.
 		{
+			name: "Test when hit has a large step and only a single sample extent.",
 			// If there is a only a single sample in the split interval, start and end will be the same.
 			input: &PrometheusRequest{
 				Start: 100,
@@ -511,24 +525,67 @@ func TestPartition(t *testing.T) {
 			prevCachedResponse: []Extent{
 				mkExtent(100, 100),
 			},
+			expectedUsedCachedEntry: []Extent{
+				mkExtent(100, 100),
+			},
 			expectedCachedResponse: []Response{
 				mkAPIResponse(100, 105, 10),
 			},
 		},
 	} {
-		t.Run(strconv.Itoa(i), func(t *testing.T) {
+		t.Run(tc.name, func(t *testing.T) {
 			s := resultsCache{
 				extractor:      PrometheusResponseExtractor{},
 				minCacheExtent: 10,
 			}
-			reqs, resps, err := s.partition(tc.input, tc.prevCachedResponse)
+			reqs, resps, usedCachedEntry, err := s.partition(tc.input, tc.prevCachedResponse)
 			require.Nil(t, err)
 			require.Equal(t, tc.expectedRequests, reqs)
 			require.Equal(t, tc.expectedCachedResponse, resps)
+			require.Equal(t, tc.expectedUsedCachedEntry, usedCachedEntry)
 		})
 	}
 }
 
+func TestCacheHitShouldNotDuplicateSamplesForTinyRequest(t *testing.T) {
+	s := resultsCache{
+		extractor:      PrometheusResponseExtractor{},
+		minCacheExtent: 10,
+		limits:         mockLimits{},
+		merger:         PrometheusCodec,
+		next: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
+			return parsedResponse, nil
+		}),
+	}
+
+	request := &PrometheusRequest{
+		Start: 0,
+		End:   5,
+	}
+
+	ctx := user.InjectOrgID(context.Background(), "1")
+
+	parsedResponseAsAny, err := types.MarshalAny(parsedResponse)
+	require.NoError(t, err)
+
+	// current cached extents is smaller than minCacheExtent, so it should not be used and should
+	// be dropped from the cache.
+	currentlyCachedExtents := []Extent{{
+		Start:    request.Start,
+		End:      request.End + 1,
+		Response: parsedResponseAsAny,
+	}}
+	_, updatedExtents, err := s.handleHit(ctx, request, currentlyCachedExtents, 0)
+	require.NoError(t, err)
+
+	expectedExtents := []Extent{{
+		Start:    request.Start,
+		End:      request.End,
+		Response: parsedResponseAsAny,
+	}}
+	require.Equal(t, expectedExtents, updatedExtents)
+}
+
 func TestResultsCache(t *testing.T) {
 	calls := 0
 	cfg := ResultsCacheConfig{
