Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where cached entry size keeps increasing when making tiny query repeatedly #3968

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Querier: streamline tracing spans. #3924
* [BUGFIX] Query-frontend: Fix issue where cached entry size keeps increasing when making tiny query repeatedly. #3968

## 1.8.0 in progress

Expand Down
24 changes: 22 additions & 2 deletions pkg/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,15 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
// We need to make sure we don't repeat samples. This causes some visualisations to be broken in Grafana.
// The prometheus API is inclusive of start and end timestamps.
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
if existing.Samples[len(existing.Samples)-1].TimestampMs == stream.Samples[0].TimestampMs {
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
if existingEndTs == stream.Samples[0].TimestampMs {
// Typically this is the case where only 1 sample point overlaps,
// so optimize with simple code.
stream.Samples = stream.Samples[1:]
}
} else if existingEndTs > stream.Samples[0].TimestampMs {
// Overlap might be big, use heavier algorithm to remove overlap.
stream.Samples = chopOffOverlapPortion(stream.Samples, existingEndTs)
} // else there is no overlap, yay!
}
existing.Samples = append(existing.Samples, stream.Samples...)
output[metric] = existing
Expand All @@ -372,6 +378,20 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
return result
}

func chopOffOverlapPortion(samples []cortexpb.Sample, choppingPointTs int64) []cortexpb.Sample {

// assuming stream.Samples is sorted by timestamp in ascending order.
searchResult := sort.Search(len(samples), func(i int) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should never happen with the current logic but, to be on the safe side, I would handle the case sort.Search() returns -1 (in this case I believe we should return the input samples).

Copy link
Contributor Author

@alvinlin123 alvinlin123 Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, better to be more defensive. But I took a look at the doc https://golang.org/pkg/sort/#Search — sort.Search() never returns -1; it returns a value in [0, n], where a return value of n means not found.

func main() {

	x := []int{3, 5, 6, 7}
	
        // both prints "4"
	fmt.Println(sort.Search(len(x), func(i int) bool {return x[i] > 100}))  
	fmt.Println(sort.Search(len(x), func(i int) bool {return x[i] < 0})) 
}

https://play.golang.org/p/HBtNWh3_MsR

But I think I get your point, I will add special handling if minTs is smaller than samples[0].

return samples[i].TimestampMs > choppingPointTs
})

if searchResult < len(samples) {
return samples[searchResult:]
}

return nil
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
Expand Down
67 changes: 59 additions & 8 deletions pkg/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ func TestResponse(t *testing.T) {
}

func TestMergeAPIResponses(t *testing.T) {
for i, tc := range []struct {
for _, tc := range []struct {
name string
input []Response
expected Response
}{
// No responses shouldn't panic and return a non-null result and result type.
{
name: "No responses shouldn't panic and return a non-null result and result type.",
input: []Response{},
expected: &PrometheusResponse{
Status: StatusSuccess,
Expand All @@ -125,8 +126,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// A single empty response shouldn't panic.
{
name: "A single empty response shouldn't panic.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand All @@ -144,8 +145,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Multiple empty responses shouldn't panic.
{
name: "Multiple empty responses shouldn't panic.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand All @@ -169,8 +170,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Basic merging of two responses.
{
name: "Basic merging of two responses.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
Expand Down Expand Up @@ -220,8 +221,8 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

// Merging of responses when labels are in different order.
{
name: "Merging of responses when labels are in different order.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[0,"0"],[1,"1"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
Expand All @@ -244,8 +245,9 @@ func TestMergeAPIResponses(t *testing.T) {
},
},
},
// Merging of samples where there is overlap.

{
name: "Merging of samples where there is single overlap.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
Expand All @@ -266,8 +268,57 @@ func TestMergeAPIResponses(t *testing.T) {
},
},
},
},
{
name: "Merging of samples where there is multiple partial overlaps.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
},
expected: &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: matrix,
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
{Value: 2, TimestampMs: 2000},
{Value: 3, TimestampMs: 3000},
{Value: 4, TimestampMs: 4000},
{Value: 5, TimestampMs: 5000},
},
},
},
},
},
},
{
name: "Merging of samples where there is complete overlap.",
input: []Response{
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}]}}`),
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
},
expected: &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: matrix,
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
Samples: []cortexpb.Sample{
{Value: 2, TimestampMs: 2000},
{Value: 3, TimestampMs: 3000},
{Value: 4, TimestampMs: 4000},
{Value: 5, TimestampMs: 5000},
},
},
},
},
},
}} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
Expand Down
11 changes: 8 additions & 3 deletions pkg/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return extents[i].Start < extents[j].Start
})

// Merge any extents - they're guaranteed not to overlap.
// Merge any extents - they are potentially overlapping.
accumulator, err := newAccumulator(extents[0])
if err != nil {
return nil, nil, err
Expand All @@ -408,6 +408,10 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
continue
}

if accumulator.End >= extents[i].End {
continue
}

accumulator.TraceId = jaegerTraceID(ctx)
accumulator.End = extents[i].End
currentRes, err := extents[i].toResponse()
Expand Down Expand Up @@ -485,12 +489,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
continue
}

// If this extent is tiny, discard it: more efficient to do a few larger queries.
// If this extent is tiny and request is not tiny, discard it: more efficient to do a few larger queries.
// Hopefully the tiny request can grow the tiny extent into a not-so-tiny extent.

// However if the step is large enough, the split_query_by_interval middleware would generate a query with same start and end.
// For example, if the step size is more than 12h and the interval is 24h.
// This means the extent's start and end time would be same, even if the timerange covers several hours.
if (req.GetStart() != req.GetEnd()) && (extent.End-extent.Start < s.minCacheExtent) {
if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) {
continue
}

Expand Down
Loading