Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where cached entry size keeps increasing when making a tiny query repeatedly #3968

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ e2e_integration_test*
active-query-tracker
dist/

# Test generated data
pkg/alertmanager/templates
pkg/querier/chunks_head/

# Binaries built from ./cmd
/blocksconvert
/cortex
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Querier: streamline tracing spans. #3924
* [BUGFIX] Query-frontend: Fix issue where cached entry size keeps increasing when making a tiny query repeatedly. #3968

## 1.8.0 in progress

Expand Down
35 changes: 20 additions & 15 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,15 @@ func (s resultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime in
return response, extents, nil
}

func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent, maxCacheTime int64) (Response, []Extent, error) {
func (s resultsCache) handleHit(ctx context.Context, r Request, cachedExtents []Extent, maxCacheTime int64) (Response, []Extent, error) {
var (
reqResps []RequestResponse
err error
)
log, ctx := spanlogger.New(ctx, "handleHit")
defer log.Finish()

requests, responses, err := s.partition(r, extents)
requests, responses, usedExtents, err := s.partition(r, cachedExtents)
if err != nil {
return nil, nil, err
}
Expand All @@ -382,35 +382,35 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
if err != nil {
return nil, nil, err
}
extents = append(extents, extent)
usedExtents = append(usedExtents, extent)
}
sort.Slice(extents, func(i, j int) bool {
return extents[i].Start < extents[j].Start
sort.Slice(usedExtents, func(i, j int) bool {
return usedExtents[i].Start < usedExtents[j].Start
})

// Merge any extents - they're guaranteed not to overlap.
accumulator, err := newAccumulator(extents[0])
accumulator, err := newAccumulator(usedExtents[0])
if err != nil {
return nil, nil, err
}
mergedExtents := make([]Extent, 0, len(extents))
mergedExtents := make([]Extent, 0, len(usedExtents))

for i := 1; i < len(extents); i++ {
if accumulator.End+r.GetStep() < extents[i].Start {
for i := 1; i < len(usedExtents); i++ {
if accumulator.End+r.GetStep() < usedExtents[i].Start {
mergedExtents, err = merge(mergedExtents, accumulator)
if err != nil {
return nil, nil, err
}
accumulator, err = newAccumulator(extents[i])
accumulator, err = newAccumulator(usedExtents[i])
if err != nil {
return nil, nil, err
}
continue
}

accumulator.TraceId = jaegerTraceID(ctx)
accumulator.End = extents[i].End
currentRes, err := extents[i].toResponse()
accumulator.End = usedExtents[i].End
currentRes, err := usedExtents[i].toResponse()
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -474,10 +474,11 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) {

// partition calculates the required requests to satisfy req given the cached data.
// extents must be in order by start time.
func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, error) {
func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, []Extent, error) {
var requests []Request
var cachedResponses []Response
start := req.GetStart()
var usedExtent []Extent

for _, extent := range extents {
// If there is no overlap, ignore this extent.
Expand All @@ -501,11 +502,15 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
}
res, err := extent.toResponse()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
// extract the overlap from the cached extent.
cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res))
start = extent.End
if usedExtent == nil {
usedExtent = make([]Extent, 0, len(extents))
}
usedExtent = append(usedExtent, extent)
}

// Lastly, make a request for any data missing at the end.
Expand All @@ -520,7 +525,7 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
requests = append(requests, req)
}

return requests, cachedResponses, nil
return requests, cachedResponses, usedExtent, nil
}

func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Duration, extents []Extent) ([]Extent, error) {
Expand Down
91 changes: 74 additions & 17 deletions pkg/querier/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,52 +386,60 @@ func TestShouldCache(t *testing.T) {
}

func TestPartition(t *testing.T) {
for i, tc := range []struct {
input Request
prevCachedResponse []Extent
expectedRequests []Request
expectedCachedResponse []Response
for _, tc := range []struct {
name string
input Request
prevCachedResponse []Extent
expectedUsedCachedEntry []Extent
expectedRequests []Request
expectedCachedResponse []Response
}{
// 1. Test a complete hit.
{
name: "Test a complete hit.",
input: &PrometheusRequest{
Start: 0,
End: 100,
},
prevCachedResponse: []Extent{
mkExtent(0, 100),
},
expectedUsedCachedEntry: []Extent{
mkExtent(0, 100),
},
expectedCachedResponse: []Response{
mkAPIResponse(0, 100, 10),
},
},

// Test with a complete miss.
{
name: "Test with a complete miss.",
input: &PrometheusRequest{
Start: 0,
End: 100,
},
prevCachedResponse: []Extent{
mkExtent(110, 210),
},
expectedUsedCachedEntry: nil,
expectedRequests: []Request{
&PrometheusRequest{
Start: 0,
End: 100,
}},
expectedCachedResponse: nil,
},

// Test a partial hit.
{
name: "Test a partial hit.",
input: &PrometheusRequest{
Start: 0,
End: 100,
},
prevCachedResponse: []Extent{
mkExtent(50, 100),
},
expectedUsedCachedEntry: []Extent{
mkExtent(50, 100),
},
expectedRequests: []Request{
&PrometheusRequest{
Start: 0,
Expand All @@ -442,9 +450,8 @@ func TestPartition(t *testing.T) {
mkAPIResponse(50, 100, 10),
},
},

// Test multiple partial hits.
{
name: "Test multiple partial hits.",
input: &PrometheusRequest{
Start: 100,
End: 200,
Expand All @@ -453,6 +460,10 @@ func TestPartition(t *testing.T) {
mkExtent(50, 120),
mkExtent(160, 250),
},
expectedUsedCachedEntry: []Extent{
mkExtent(50, 120),
mkExtent(160, 250),
},
expectedRequests: []Request{
&PrometheusRequest{
Start: 120,
Expand All @@ -464,9 +475,8 @@ func TestPartition(t *testing.T) {
mkAPIResponse(160, 200, 10),
},
},

// Partial hits with tiny gap.
{
name: "Partial hits with tiny gap.",
input: &PrometheusRequest{
Start: 100,
End: 160,
Expand All @@ -475,6 +485,9 @@ func TestPartition(t *testing.T) {
mkExtent(50, 120),
mkExtent(122, 130),
},
expectedUsedCachedEntry: []Extent{
mkExtent(50, 120),
},
expectedRequests: []Request{
&PrometheusRequest{
Start: 120,
Expand All @@ -485,24 +498,25 @@ func TestPartition(t *testing.T) {
mkAPIResponse(100, 120, 10),
},
},
// Extent is outside the range and the request has a single step (same start and end).
{
name: "Extent is outside the range and the request has a single step (same start and end).",
input: &PrometheusRequest{
Start: 100,
End: 100,
},
prevCachedResponse: []Extent{
mkExtent(50, 90),
},
expectedUsedCachedEntry: nil,
expectedRequests: []Request{
&PrometheusRequest{
Start: 100,
End: 100,
},
},
},
// Test when hit has a large step and only a single sample extent.
{
name: "Test when hit has a large step and only a single sample extent.",
// If there is a only a single sample in the split interval, start and end will be the same.
input: &PrometheusRequest{
Start: 100,
Expand All @@ -511,24 +525,67 @@ func TestPartition(t *testing.T) {
prevCachedResponse: []Extent{
mkExtent(100, 100),
},
expectedUsedCachedEntry: []Extent{
mkExtent(100, 100),
},
expectedCachedResponse: []Response{
mkAPIResponse(100, 105, 10),
},
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s := resultsCache{
extractor: PrometheusResponseExtractor{},
minCacheExtent: 10,
}
reqs, resps, err := s.partition(tc.input, tc.prevCachedResponse)
reqs, resps, usedCachedEntry, err := s.partition(tc.input, tc.prevCachedResponse)
require.Nil(t, err)
require.Equal(t, tc.expectedRequests, reqs)
require.Equal(t, tc.expectedCachedResponse, resps)
require.Equal(t, tc.expectedUsedCachedEntry, usedCachedEntry)
})
}
}

// TestCacheHitShouldNotDuplicateSamplesForTinyRequest is a regression test for
// #3968: on a cache hit, an extent smaller than minCacheExtent must be dropped
// from the stored entry rather than merged with the freshly fetched response;
// otherwise the cached entry grows every time the same tiny query is repeated.
func TestCacheHitShouldNotDuplicateSamplesForTinyRequest(t *testing.T) {
	s := resultsCache{
		extractor:      PrometheusResponseExtractor{},
		minCacheExtent: 10,
		limits:         mockLimits{},
		merger:         PrometheusCodec,
		// Downstream handler invoked for the uncached range; always returns
		// the canned parsedResponse fixture.
		next: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
			return parsedResponse, nil
		}),
	}

	// A tiny query: its [Start, End] span is smaller than minCacheExtent.
	request := &PrometheusRequest{
		Start: 0,
		End:   5,
	}

	ctx := user.InjectOrgID(context.Background(), "1")

	parsedResponseAsAny, err := types.MarshalAny(parsedResponse)
	require.NoError(t, err)

	// The currently cached extent is smaller than minCacheExtent, so it should
	// not be used and should be dropped from the cache.
	currentlyCachedExtents := []Extent{{
		Start:    request.Start,
		End:      request.End + 1,
		Response: parsedResponseAsAny,
	}}
	_, updatedExtents, err := s.handleHit(ctx, request, currentlyCachedExtents, 0)
	require.NoError(t, err)

	// Only the extent built from the fresh downstream response should remain;
	// the too-small cached extent must not have been merged back in.
	expectedExtents := []Extent{{
		Start:    request.Start,
		End:      request.End,
		Response: parsedResponseAsAny,
	}}
	require.Equal(t, expectedExtents, updatedExtents)
}

func TestResultsCache(t *testing.T) {
calls := 0
cfg := ResultsCacheConfig{
Expand Down