Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where cached entry size keeps increasing when making tiny query repeatedly #3968

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [BUGFIX] Querier: streamline tracing spans. #3924
* [BUGFIX] Ruler Storage: ignore objects with empty namespace or group in the name. #3999
* [BUGFIX] Distributor: fix issue causing distributors to not extend the replication set because of failing instances when zone-aware replication is enabled. #3977
* [BUGFIX] Query-frontend: Fix issue where cached entry size keeps increasing when making tiny query repeatedly. #3968

## Blocksconvert

Expand Down
31 changes: 29 additions & 2 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,15 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
// We need to make sure we don't repeat samples. This causes some visualisations to be broken in Grafana.
// The prometheus API is inclusive of start and end timestamps.
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
if existing.Samples[len(existing.Samples)-1].TimestampMs == stream.Samples[0].TimestampMs {
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
if existingEndTs == stream.Samples[0].TimestampMs {
// Typically this is the case where only 1 sample point overlaps,
// so optimize with simple code.
stream.Samples = stream.Samples[1:]
}
} else if existingEndTs > stream.Samples[0].TimestampMs {
// Overlap might be big, use heavier algorithm to remove overlap.
stream.Samples = sliceSamples(stream.Samples, existingEndTs)
} // else there is no overlap, yay!
}
existing.Samples = append(existing.Samples, stream.Samples...)
output[metric] = existing
Expand All @@ -381,6 +387,27 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
return result
}

// sliceSamples assumes the given samples are sorted by timestamp in ascending
// order and returns a sub slice whose first element's timestamp is the smallest
// one that is strictly bigger than the given minTs. An empty slice is returned
// if minTs is bigger than or equal to all the timestamps in samples.
func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
	// Fast path: no overlap — every sample is already strictly after minTs
	// (or there is nothing to slice).
	if len(samples) == 0 || minTs < samples[0].TimestampMs {
		return samples
	}

	// Fast path: complete overlap — every sample is at or before minTs.
	// (The original `>` comparison reached this result via the binary search
	// below for the `==` case; `>=` is behavior-identical and skips the search.)
	if minTs >= samples[len(samples)-1].TimestampMs {
		return samples[len(samples):]
	}

	// Binary search for the first sample strictly after minTs.
	searchResult := sort.Search(len(samples), func(i int) bool {
		return samples[i].TimestampMs > minTs
	})

	return samples[searchResult:]
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
Expand Down
67 changes: 59 additions & 8 deletions pkg/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ func TestResponse(t *testing.T) {
}

func TestMergeAPIResponses(t *testing.T) {
for i, tc := range []struct {
for _, tc := range []struct {
name string
input []Response
expected Response
}{
// No responses shouldn't panic and return a non-null result and result type.
{
name: "No responses shouldn't panic and return a non-null result and result type.",
input: []Response{},
expected: &PrometheusResponse{
Status: StatusSuccess,
Expand All @@ -126,8 +127,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// A single empty response shouldn't panic.
{
name: "A single empty response shouldn't panic.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand All @@ -145,8 +146,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Multiple empty responses shouldn't panic.
{
name: "Multiple empty responses shouldn't panic.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand All @@ -170,8 +171,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Basic merging of two responses.
{
name: "Basic merging of two responses.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand Down Expand Up @@ -221,8 +222,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Merging of responses when labels are in different order.
{
name: "Merging of responses when labels are in different order.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[0,"0"],[1,"1"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
Expand All @@ -245,8 +246,9 @@ func TestMergeAPIResponses(t *testing.T) {
},
},
},
// Merging of samples where there is overlap.

{
name: "Merging of samples where there is single overlap.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
Expand All @@ -267,8 +269,57 @@ func TestMergeAPIResponses(t *testing.T) {
},
},
},
},
{
name: "Merging of samples where there is multiple partial overlaps.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
},
expected: &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: matrix,
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
{Value: 2, TimestampMs: 2000},
{Value: 3, TimestampMs: 3000},
{Value: 4, TimestampMs: 4000},
{Value: 5, TimestampMs: 5000},
},
},
},
},
},
},
{
name: "Merging of samples where there is complete overlap.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
},
expected: &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: matrix,
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
Samples: []cortexpb.Sample{
{Value: 2, TimestampMs: 2000},
{Value: 3, TimestampMs: 3000},
{Value: 4, TimestampMs: 4000},
{Value: 5, TimestampMs: 5000},
},
},
},
},
},
}} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
Expand Down
11 changes: 8 additions & 3 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return extents[i].Start < extents[j].Start
})

// Merge any extents - they're guaranteed not to overlap.
// Merge any extents - they are potentially overlapping.
accumulator, err := newAccumulator(extents[0])
if err != nil {
return nil, nil, err
Expand All @@ -408,6 +408,10 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
continue
}

if accumulator.End >= extents[i].End {
continue
}

accumulator.TraceId = jaegerTraceID(ctx)
accumulator.End = extents[i].End
currentRes, err := extents[i].toResponse()
Expand Down Expand Up @@ -485,12 +489,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
continue
}

// If this extent is tiny, discard it: more efficient to do a few larger queries.
// If this extent is tiny and request is not tiny, discard it: more efficient to do a few larger queries.
// Hopefully a tiny request can grow the tiny extent into a not-so-tiny extent.

// However if the step is large enough, the split_query_by_interval middleware would generate a query with same start and end.
// For example, if the step size is more than 12h and the interval is 24h.
// This means the extent's start and end time would be same, even if the timerange covers several hours.
if (req.GetStart() != req.GetEnd()) && (extent.End-extent.Start < s.minCacheExtent) {
if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) {
continue
}

Expand Down
Loading