Skip to content

Commit 98af28d

Browse files
committed
Ok, another approach without using temporary slice
Signed-off-by: Alvin Lin <[email protected]>
1 parent 4a9d8b0 commit 98af28d

File tree

4 files changed

+98
-51
lines changed

4 files changed

+98
-51
lines changed

pkg/querier/queryrange/query_range.go

+18
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,11 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
350350
// The prometheus API is inclusive of start and end timestamps.
351351
if len(existing.Samples) > 0 && len(stream.Samples) > 0 {
352352
if existing.Samples[len(existing.Samples)-1].TimestampMs == stream.Samples[0].TimestampMs {
353+
// Typically this is the case, so optimize by not doing a full-blown search.
353354
stream.Samples = stream.Samples[1:]
355+
} else {
356+
existingEndTs := existing.Samples[len(existing.Samples)-1].TimestampMs
357+
stream.Samples = searchFirstBiggerTimestamp(existingEndTs, stream.Samples)
354358
}
355359
}
356360
existing.Samples = append(existing.Samples, stream.Samples...)
@@ -372,6 +376,20 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
372376
return result
373377
}
374378

379+
func searchFirstBiggerTimestamp(targetTs int64, samples []cortexpb.Sample) []cortexpb.Sample {
380+
381+
// assuming stream.Samples is sorted by timestamp in ascending order.
382+
searchResult := sort.Search(len(samples), func(i int) bool {
383+
return samples[i].TimestampMs > targetTs
384+
})
385+
386+
if searchResult < len(samples) {
387+
return samples[searchResult:]
388+
}
389+
390+
return nil
391+
}
392+
375393
func parseDurationMs(s string) (int64, error) {
376394
if d, err := strconv.ParseFloat(s, 64); err == nil {
377395
ts := d * float64(time.Second/time.Millisecond)

pkg/querier/queryrange/query_range_test.go

+59-8
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,13 @@ func TestResponse(t *testing.T) {
109109
}
110110

111111
func TestMergeAPIResponses(t *testing.T) {
112-
for i, tc := range []struct {
112+
for _, tc := range []struct {
113+
name string
113114
input []Response
114115
expected Response
115116
}{
116-
// No responses shouldn't panic and return a non-null result and result type.
117117
{
118+
name: "No responses shouldn't panic and return a non-null result and result type.",
118119
input: []Response{},
119120
expected: &PrometheusResponse{
120121
Status: StatusSuccess,
@@ -125,8 +126,8 @@ func TestMergeAPIResponses(t *testing.T) {
125126
},
126127
},
127128

128-
// A single empty response shouldn't panic.
129129
{
130+
name: "A single empty response shouldn't panic.",
130131
input: []Response{
131132
&PrometheusResponse{
132133
Data: PrometheusData{
@@ -144,8 +145,8 @@ func TestMergeAPIResponses(t *testing.T) {
144145
},
145146
},
146147

147-
// Multiple empty responses shouldn't panic.
148148
{
149+
name: "Multiple empty responses shouldn't panic.",
149150
input: []Response{
150151
&PrometheusResponse{
151152
Data: PrometheusData{
@@ -169,8 +170,8 @@ func TestMergeAPIResponses(t *testing.T) {
169170
},
170171
},
171172

172-
// Basic merging of two responses.
173173
{
174+
name: "Basic merging of two responses.",
174175
input: []Response{
175176
&PrometheusResponse{
176177
Data: PrometheusData{
@@ -220,8 +221,8 @@ func TestMergeAPIResponses(t *testing.T) {
220221
},
221222
},
222223

223-
// Merging of responses when labels are in different order.
224224
{
225+
name: "Merging of responses when labels are in different order.",
225226
input: []Response{
226227
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[0,"0"],[1,"1"]]}]}}`),
227228
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
@@ -244,8 +245,9 @@ func TestMergeAPIResponses(t *testing.T) {
244245
},
245246
},
246247
},
247-
// Merging of samples where there is overlap.
248+
248249
{
250+
name: "Merging of samples where there is single overlap.",
249251
input: []Response{
250252
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"]]}]}}`),
251253
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"]]}]}}`),
@@ -266,8 +268,57 @@ func TestMergeAPIResponses(t *testing.T) {
266268
},
267269
},
268270
},
271+
},
272+
{
273+
name: "Merging of samples where there are multiple partial overlaps.",
274+
input: []Response{
275+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`),
276+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
277+
},
278+
expected: &PrometheusResponse{
279+
Status: StatusSuccess,
280+
Data: PrometheusData{
281+
ResultType: matrix,
282+
Result: []SampleStream{
283+
{
284+
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
285+
Samples: []cortexpb.Sample{
286+
{Value: 1, TimestampMs: 1000},
287+
{Value: 2, TimestampMs: 2000},
288+
{Value: 3, TimestampMs: 3000},
289+
{Value: 4, TimestampMs: 4000},
290+
{Value: 5, TimestampMs: 5000},
291+
},
292+
},
293+
},
294+
},
295+
},
296+
},
297+
{
298+
name: "Merging of samples where there is complete overlap.",
299+
input: []Response{
300+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}]}}`),
301+
mustParse(t, `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"c":"d","a":"b"},"values":[[2,"2"],[3,"3"],[4,"4"],[5,"5"]]}]}}`),
302+
},
303+
expected: &PrometheusResponse{
304+
Status: StatusSuccess,
305+
Data: PrometheusData{
306+
ResultType: matrix,
307+
Result: []SampleStream{
308+
{
309+
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
310+
Samples: []cortexpb.Sample{
311+
{Value: 2, TimestampMs: 2000},
312+
{Value: 3, TimestampMs: 3000},
313+
{Value: 4, TimestampMs: 4000},
314+
{Value: 5, TimestampMs: 5000},
315+
},
316+
},
317+
},
318+
},
319+
},
269320
}} {
270-
t.Run(strconv.Itoa(i), func(t *testing.T) {
321+
t.Run(tc.name, func(t *testing.T) {
271322
output, err := PrometheusCodec.MergeResponse(tc.input...)
272323
require.NoError(t, err)
273324
require.Equal(t, tc.expected, output)

pkg/querier/queryrange/results_cache.go

+20-21
Original file line numberDiff line numberDiff line change
@@ -350,15 +350,15 @@ func (s resultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime in
350350
return response, extents, nil
351351
}
352352

353-
func (s resultsCache) handleHit(ctx context.Context, r Request, cachedExtents []Extent, maxCacheTime int64) (Response, []Extent, error) {
353+
func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent, maxCacheTime int64) (Response, []Extent, error) {
354354
var (
355355
reqResps []RequestResponse
356356
err error
357357
)
358358
log, ctx := spanlogger.New(ctx, "handleHit")
359359
defer log.Finish()
360360

361-
requests, responses, updatedExtents, err := s.partition(r, cachedExtents)
361+
requests, responses, err := s.partition(r, extents)
362362
if err != nil {
363363
return nil, nil, err
364364
}
@@ -382,35 +382,39 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, cachedExtents []
382382
if err != nil {
383383
return nil, nil, err
384384
}
385-
updatedExtents = append(updatedExtents, extent)
385+
extents = append(extents, extent)
386386
}
387-
sort.Slice(updatedExtents, func(i, j int) bool {
388-
return updatedExtents[i].Start < updatedExtents[j].Start
387+
sort.Slice(extents, func(i, j int) bool {
388+
return extents[i].Start < extents[j].Start
389389
})
390390

391-
// Merge any extents - they're guaranteed not to overlap.
392-
accumulator, err := newAccumulator(updatedExtents[0])
391+
// Merge any extents - potentially overlapping
392+
accumulator, err := newAccumulator(extents[0])
393393
if err != nil {
394394
return nil, nil, err
395395
}
396-
mergedExtents := make([]Extent, 0, len(updatedExtents))
396+
mergedExtents := make([]Extent, 0, len(extents))
397397

398-
for i := 1; i < len(updatedExtents); i++ {
399-
if accumulator.End+r.GetStep() < updatedExtents[i].Start {
398+
for i := 1; i < len(extents); i++ {
399+
if accumulator.End+r.GetStep() < extents[i].Start {
400400
mergedExtents, err = merge(mergedExtents, accumulator)
401401
if err != nil {
402402
return nil, nil, err
403403
}
404-
accumulator, err = newAccumulator(updatedExtents[i])
404+
accumulator, err = newAccumulator(extents[i])
405405
if err != nil {
406406
return nil, nil, err
407407
}
408408
continue
409409
}
410410

411+
if accumulator.End > extents[i].End {
412+
continue
413+
}
414+
411415
accumulator.TraceId = jaegerTraceID(ctx)
412-
accumulator.End = updatedExtents[i].End
413-
currentRes, err := updatedExtents[i].toResponse()
416+
accumulator.End = extents[i].End
417+
currentRes, err := extents[i].toResponse()
414418
if err != nil {
415419
return nil, nil, err
416420
}
@@ -474,16 +478,14 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) {
474478

475479
// partition calculates the required requests to satisfy req given the cached data.
476480
// extents must be in order by start time.
477-
func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, []Extent, error) {
481+
func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, error) {
478482
var requests []Request
479483
var cachedResponses []Response
480484
start := req.GetStart()
481-
updatedExtents := make([]Extent, 0, len(extents))
482485

483486
for _, extent := range extents {
484487
// If there is no overlap, ignore this extent.
485488
if extent.GetEnd() < start || extent.Start > req.GetEnd() {
486-
updatedExtents = append(updatedExtents, extent)
487489
continue
488490
}
489491

@@ -494,8 +496,6 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
494496
// For example, if the step size is more than 12h and the interval is 24h.
495497
// This means the extent's start and end time would be same, even if the timerange covers several hours.
496498
if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) {
497-
// Not appending this extent to updatedExtents because we want to drop this tiny extents from the
498-
// cache, and replace it with larger extent
499499
continue
500500
}
501501

@@ -506,12 +506,11 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
506506
}
507507
res, err := extent.toResponse()
508508
if err != nil {
509-
return nil, nil, nil, err
509+
return nil, nil, err
510510
}
511511
// extract the overlap from the cached extent.
512512
cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res))
513513
start = extent.End
514-
updatedExtents = append(updatedExtents, extent)
515514
}
516515

517516
// Lastly, make a request for any data missing at the end.
@@ -526,7 +525,7 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res
526525
requests = append(requests, req)
527526
}
528527

529-
return requests, cachedResponses, updatedExtents, nil
528+
return requests, cachedResponses, nil
530529
}
531530

532531
func (s resultsCache) filterRecentExtents(req Request, maxCacheFreshness time.Duration, extents []Extent) ([]Extent, error) {

pkg/querier/queryrange/results_cache_test.go

+1-22
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,6 @@ func TestPartition(t *testing.T) {
407407
prevCachedResponse: []Extent{
408408
mkExtent(0, 100),
409409
},
410-
expectedUpdateCachedEntry: []Extent{
411-
mkExtent(0, 100),
412-
},
413410
expectedCachedResponse: []Response{
414411
mkAPIResponse(0, 100, 10),
415412
},
@@ -432,7 +429,6 @@ func TestPartition(t *testing.T) {
432429
Start: 0,
433430
End: 100,
434431
}},
435-
expectedCachedResponse: nil,
436432
},
437433
{
438434
name: "Test a partial hit.",
@@ -443,9 +439,6 @@ func TestPartition(t *testing.T) {
443439
prevCachedResponse: []Extent{
444440
mkExtent(50, 100),
445441
},
446-
expectedUpdateCachedEntry: []Extent{
447-
mkExtent(50, 100),
448-
},
449442
expectedRequests: []Request{
450443
&PrometheusRequest{
451444
Start: 0,
@@ -466,10 +459,6 @@ func TestPartition(t *testing.T) {
466459
mkExtent(50, 120),
467460
mkExtent(160, 250),
468461
},
469-
expectedUpdateCachedEntry: []Extent{
470-
mkExtent(50, 120),
471-
mkExtent(160, 250),
472-
},
473462
expectedRequests: []Request{
474463
&PrometheusRequest{
475464
Start: 120,
@@ -491,9 +480,6 @@ func TestPartition(t *testing.T) {
491480
mkExtent(50, 120),
492481
mkExtent(122, 130),
493482
},
494-
expectedUpdateCachedEntry: []Extent{
495-
mkExtent(50, 120),
496-
},
497483
expectedRequests: []Request{
498484
&PrometheusRequest{
499485
Start: 120,
@@ -513,9 +499,6 @@ func TestPartition(t *testing.T) {
513499
prevCachedResponse: []Extent{
514500
mkExtent(50, 90),
515501
},
516-
expectedUpdateCachedEntry: []Extent{
517-
mkExtent(50, 90),
518-
},
519502
expectedRequests: []Request{
520503
&PrometheusRequest{
521504
Start: 100,
@@ -533,9 +516,6 @@ func TestPartition(t *testing.T) {
533516
prevCachedResponse: []Extent{
534517
mkExtent(100, 100),
535518
},
536-
expectedUpdateCachedEntry: []Extent{
537-
mkExtent(100, 100),
538-
},
539519
expectedCachedResponse: []Response{
540520
mkAPIResponse(100, 105, 10),
541521
},
@@ -546,11 +526,10 @@ func TestPartition(t *testing.T) {
546526
extractor: PrometheusResponseExtractor{},
547527
minCacheExtent: 10,
548528
}
549-
reqs, resps, usedCachedEntry, err := s.partition(tc.input, tc.prevCachedResponse)
529+
reqs, resps, err := s.partition(tc.input, tc.prevCachedResponse)
550530
require.Nil(t, err)
551531
require.Equal(t, tc.expectedRequests, reqs)
552532
require.Equal(t, tc.expectedCachedResponse, resps)
553-
require.Equal(t, tc.expectedUpdateCachedEntry, usedCachedEntry)
554533
})
555534
}
556535
}

0 commit comments

Comments
 (0)