Skip to content

add context cancellation checks on merging GetLabel slices #5837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
* [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
Expand Down
10 changes: 8 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,10 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
for i, resp := range resps {
values[i] = resp.([]string)
}
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
if err != nil {
return nil, err
}
span.SetTag("result_length", len(r))
return r, nil
}
Expand Down Expand Up @@ -1043,7 +1046,10 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
for i, resp := range resps {
values[i] = resp.([]string)
}
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
if err != nil {
return nil, err
}
span.SetTag("result_length", len(r))

return r, nil
Expand Down
35 changes: 25 additions & 10 deletions pkg/util/strings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"context"
"sync"
"unsafe"

Expand Down Expand Up @@ -37,17 +38,18 @@ func StringsClone(s string) string {

// MergeSlicesParallel merge sorted slices in parallel
// using the MergeSortedSlices function
func MergeSlicesParallel(parallelism int, a ...[]string) []string {
func MergeSlicesParallel(ctx context.Context, parallelism int, a ...[]string) ([]string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test case that returns an error due to context check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not sure how to add proper test for context cancellation case.

  1. I could mock and add some wait time on iteration, and cancel context within this time. But adding wait time on tests? I'm not sure.
  2. Cancel context before calling the function? but in this one we are not actually testing the real case.

There should be better way than the above two. I will add second one, but let me know if we have any other option to properly test it.

Copy link
Contributor

@yeya24 yeya24 Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancel context before calling the function? but in this one we are not actually testing the real case.

I was thinking about in the middle of the call. If it is not easy to mock that, It is fine to skip.

if parallelism <= 1 {
return MergeSortedSlices(a...)
return MergeSortedSlices(ctx, a...)
}
if len(a) == 0 {
return nil
return nil, nil
}
if len(a) == 1 {
return a[0]
return a[0], nil
}
c := make(chan []string, len(a))
errCh := make(chan error, 1)
wg := sync.WaitGroup{}
var r [][]string
p := min(parallelism, len(a)/2)
Expand All @@ -57,21 +59,31 @@ func MergeSlicesParallel(parallelism int, a ...[]string) []string {
wg.Add(1)
go func(i int) {
m := min(len(a), i+batchSize)
c <- MergeSortedSlices(a[i:m]...)
r, e := MergeSortedSlices(ctx, a[i:m]...)
if e != nil {
errCh <- e
wg.Done()
return
}
c <- r
wg.Done()
}(i)
}

go func() {
wg.Wait()
close(c)
close(errCh)
}()

if err := <-errCh; err != nil {
return nil, err
}
for s := range c {
r = append(r, s)
}

return MergeSortedSlices(r...)
return MergeSortedSlices(ctx, r...)
}

func NewStringListIter(s []string) *StringListIter {
Expand All @@ -98,9 +110,9 @@ var MAX_STRING = string([]byte{0xff})

// MergeSortedSlices merges a set of sorted string slices into a single ones
// while removing all duplicates.
func MergeSortedSlices(a ...[]string) []string {
func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {
if len(a) == 1 {
return a[0]
return a[0], nil
}
its := make([]*StringListIter, 0, len(a))
sumLengh := 0
Expand All @@ -111,16 +123,19 @@ func MergeSortedSlices(a ...[]string) []string {
lt := loser.New(its, MAX_STRING)

if sumLengh == 0 {
return []string{}
return []string{}, nil
}

r := make([]string, 0, sumLengh*2/10)
var current string
for lt.Next() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if lt.At() != current {
current = lt.At()
r = append(r, current)
}
}
return r
return r, nil
}
5 changes: 4 additions & 1 deletion pkg/util/strings_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"context"
"fmt"
"math/rand"
"sort"
Expand Down Expand Up @@ -96,12 +97,14 @@ func BenchmarkMergeSlicesParallel(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
var r []string
var err error
for i := 0; i < b.N; i++ {
if p == usingMap {
r = sortUsingMap(input...)
require.NotEmpty(b, r)
} else {
r = MergeSlicesParallel(int(p), input...)
r, err = MergeSlicesParallel(context.Background(), int(p), input...)
require.NoError(b, err)
require.NotEmpty(b, r)
}
}
Expand Down
Loading