Skip to content

Commit 3588c55

Browse files
Fix issue where cached entry size keeps increasing when making tiny query repeatedly (#3968)
* Fix issue where cached entry size keeps increasing when making tiny query repeatedly Signed-off-by: Alvin Lin <[email protected]> * Keep the non-overlapping extents and add more tests Signed-off-by: Alvin Lin <[email protected]> * rever .gitignore change Signed-off-by: Alvin Lin <[email protected]> * Re-setup test to be more proper Signed-off-by: Alvin Lin <[email protected]> * small optimization for small queries Signed-off-by: Alvin Lin <[email protected]> * Add comment Signed-off-by: Alvin Lin <[email protected]> * English fix Signed-off-by: Alvin Lin <[email protected]> * Ok, another approach without using temporary slice Signed-off-by: Alvin Lin <[email protected]> * just some refactoring Signed-off-by: Alvin Lin <[email protected]> * Off by one error fix Signed-off-by: Alvin Lin <[email protected]> * Address PR comments Signed-off-by: Alvin Lin <[email protected]> * Update CHANGELOG.md Fix duplicate CHANGELOG.md entry. Signed-off-by: Alvin Lin <[email protected]> * Make comment more precise Signed-off-by: Alvin Lin <[email protected]> * Alwasy return subslice, never new slice Signed-off-by: Alvin Lin <[email protected]> * Update pkg/querier/queryrange/query_range.go Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent fa5ac42 commit 3588c55

File tree

5 files changed

+295
-27
lines changed

5 files changed

+295
-27
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* [BUGFIX] Querier: streamline tracing spans. #3924
2828
* [BUGFIX] Ruler Storage: ignore objects with empty namespace or group in the name. #3999
2929
* [BUGFIX] Distributor: fix issue causing distributors to not extend the replication set because of failing instances when zone-aware replication is enabled. #3977
30+
* [BUGFIX] Query-frontend: Fix issue where cached entry size keeps increasing when making tiny queries repeatedly. #3968
3031

3132
## Blocksconvert
3233

pkg/querier/queryrange/query_range.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,15 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
358358
// We need to make sure we don't repeat samples. This causes some visualisations to be broken in Grafana.
359359
// The prometheus API is inclusive of start and end timestamps.
360360
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
361-
if existing.Samples[len(existing.Samples)-1].TimestampMs == stream.Samples[0].TimestampMs {
361+
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
362+
if existingEndTs == stream.Samples[0].TimestampMs {
363+
// Typically this is the case where only 1 sample point overlaps,
364+
// so optimize with simple code.
362365
stream.Samples = stream.Samples[1:]
363-
}
366+
} else if existingEndTs > stream.Samples[0].TimestampMs {
367+
// Overlap might be big, use heavier algorithm to remove overlap.
368+
stream.Samples = sliceSamples(stream.Samples, existingEndTs)
369+
} // else there is no overlap, yay!
364370
}
365371
existing.Samples = append(existing.Samples, stream.Samples...)
366372
output[metric] = existing
@@ -381,6 +387,27 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
381387
return result
382388
}
383389

390+
// sliceSamples assumes given samples are sorted by timestamp in ascending order and
391+
// return a sub slice whose first element's is the smallest timestamp that is strictly
392+
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
393+
// timestamps in samples.
394+
func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
395+
396+
if len(samples) <= 0 || minTs < samples[0].TimestampMs {
397+
return samples
398+
}
399+
400+
if len(samples) > 0 && minTs > samples[len(samples)-1].TimestampMs {
401+
return samples[len(samples):]
402+
}
403+
404+
searchResult := sort.Search(len(samples), func(i int) bool {
405+
return samples[i].TimestampMs > minTs
406+
})
407+
408+
return samples[searchResult:]
409+
}
410+
384411
func parseDurationMs(s string) (int64, error) {
385412
if d, err := strconv.ParseFloat(s, 64); err == nil {
386413
ts := d * float64(time.Second/time.Millisecond)

pkg/querier/queryrange/query_range_test.go

+59-8
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,13 @@ func TestResponse(t *testing.T) {
110110
}
111111

112112
func TestMergeAPIResponses(t *testing.T) {
113-
for i, tc := range []struct {
113+
for _, tc := range []struct {
114+
name string
114115
input []Response
115116
expected Response
116117
}{
117-
// No responses shouldn't panic and return a non-null result and result type.
118118
{
119+
name: "No responses shouldn't panic and return a non-null result and result type.",
119120
input: []Response{},
120121
expected: &PrometheusResponse{
121122
Status: StatusSuccess,
@@ -126,8 +127,8 @@ func TestMergeAPIResponses(t *testing.T) {
126127
},
127128
},
128129

129-
// A single empty response shouldn't panic.
130130
{
131+
name: "A single empty response shouldn't panic.",
131132
input: []Response{
132133
&PrometheusResponse{
133134
Data: PrometheusData{
@@ -145,8 +146,8 @@ func TestMergeAPIResponses(t *testing.T) {
145146
},
146147
},
147148

148-
// Multiple empty responses shouldn't panic.
149149
{
150+
name: "Multiple empty responses shouldn't panic.",
150151
input: []Response{
151152
&PrometheusResponse{
152153
Data: PrometheusData{
@@ -170,8 +171,8 @@ func TestMergeAPIResponses(t *testing.T) {
170171
},
171172
},
172173

173-
// Basic merging of two responses.
174174
{
175+
name: "Basic merging of two responses.",
175176
input: []Response{
176177
&PrometheusResponse{
177178
Data: PrometheusData{
@@ -221,8 +222,8 @@ func TestMergeAPIResponses(t *testing.T) {
221222
},
222223
},
223224

224-
// Merging of responses when labels are in different order.
225225
{
226+
name: "Merging of responses when labels are in different order.",
226227
input: []Response{
227228
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[0,"0"],[1,"1"]]}]}}`),
228229
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
@@ -245,8 +246,9 @@ func TestMergeAPIResponses(t *testing.T) {
245246
},
246247
},
247248
},
248-
// Merging of samples where there is overlap.
249+
249250
{
251+
name: "Merging of samples where there is single overlap.",
250252
input: []Response{
251253
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"]]}]}}`),
252254
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
@@ -267,8 +269,57 @@ func TestMergeAPIResponses(t *testing.T) {
267269
},
268270
},
269271
},
272+
},
273+
{
274+
name: "Merging of samples where there is multiple partial overlaps.",
275+
input: []Response{
276+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`),
277+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
278+
},
279+
expected: &PrometheusResponse{
280+
Status: StatusSuccess,
281+
Data: PrometheusData{
282+
ResultType: matrix,
283+
Result: []SampleStream{
284+
{
285+
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
286+
Samples: []cortexpb.Sample{
287+
{Value: 1, TimestampMs: 1000},
288+
{Value: 2, TimestampMs: 2000},
289+
{Value: 3, TimestampMs: 3000},
290+
{Value: 4, TimestampMs: 4000},
291+
{Value: 5, TimestampMs: 5000},
292+
},
293+
},
294+
},
295+
},
296+
},
297+
},
298+
{
299+
name: "Merging of samples where there is complete overlap.",
300+
input: []Response{
301+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}]}}`),
302+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
303+
},
304+
expected: &PrometheusResponse{
305+
Status: StatusSuccess,
306+
Data: PrometheusData{
307+
ResultType: matrix,
308+
Result: []SampleStream{
309+
{
310+
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
311+
Samples: []cortexpb.Sample{
312+
{Value: 2, TimestampMs: 2000},
313+
{Value: 3, TimestampMs: 3000},
314+
{Value: 4, TimestampMs: 4000},
315+
{Value: 5, TimestampMs: 5000},
316+
},
317+
},
318+
},
319+
},
320+
},
270321
}} {
271-
t.Run(strconv.Itoa(i), func(t *testing.T) {
322+
t.Run(tc.name, func(t *testing.T) {
272323
output, err := PrometheusCodec.MergeResponse(tc.input...)
273324
require.NoError(t, err)
274325
require.Equal(t, tc.expected, output)

pkg/querier/queryrange/results_cache.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
388388
return extents[i].Start < extents[j].Start
389389
})
390390

391-
// Merge any extents - they're guaranteed not to overlap.
391+
// Merge any extents - potentially overlapping
392392
accumulator, err := newAccumulator(extents[0])
393393
if err != nil {
394394
return nil, nil, err
@@ -408,6 +408,10 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
408408
continue
409409
}
410410

411+
if accumulator.End >= extents[i].End {
412+
continue
413+
}
414+
411415
accumulator.TraceId = jaegerTraceID(ctx)
412416
accumulator.End = extents[i].End
413417
currentRes, err := extents[i].toResponse()
@@ -485,12 +489,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
485489
continue
486490
}
487491

488-
// If this extent is tiny, discard it: more efficient to do a few larger queries.
492+
// If this extent is tiny and request is not tiny, discard it: more efficient to do a few larger queries.
493+
// Hopefully tiny request can make tiny extent into not-so-tiny extent.
489494

490495
// However if the step is large enough, the split_query_by_interval middleware would generate a query with same start and end.
491496
// For example, if the step size is more than 12h and the interval is 24h.
492497
// This means the extent's start and end time would be same, even if the timerange covers several hours.
493-
if (req.GetStart() != req.GetEnd()) && (extent.End-extent.Start < s.minCacheExtent) {
498+
if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) {
494499
continue
495500
}
496501

0 commit comments

Comments
 (0)