Skip to content

query/querier: fix sum() inflated values problem #1278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 27, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"math"
"math/rand"
"strconv"
"testing"

"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/tsdb/chunkenc"
)
Expand All @@ -38,15 +40,70 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {

}

// Tests E2E how PromQL works with downsampled data.
func TestQuerier_DownsampledData(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
testProxy := &storeServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a"), int(resAggrSum), []sample{{99, 1}, {199, 5}}), // SUM chunk from Store
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b"), int(resAggrSum), []sample{{99, 3}, {199, 8}}), // SUM chunk from Store
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "c"), int(resAggrSum), []sample{{99, 5}, {199, 15}}), // SUM chunk from Store
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "d"), -1, []sample{{22, 5}, {44, 8}, {199, 15}}), // RAW one from Sidecar
},
}

q := NewQueryableCreator(nil, testProxy, "")(false, 9999999, false, nil)

engine := promql.NewEngine(
promql.EngineOpts{
MaxConcurrent: 10,
MaxSamples: math.MaxInt32,
Timeout: 10 * time.Second,
},
)

// Minimal function to parse time.Time.
ptm := func(in string) time.Time {
fl, _ := strconv.ParseFloat(in, 64)
s, ns := math.Modf(fl)
return time.Unix(int64(s), int64(ns*float64(time.Second)))
}

st := ptm("0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be clear in variable names here.

ed := ptm("0.2")
qry, err := engine.NewRangeQuery(
q,
"sum(a)",
st,
ed,
100*time.Millisecond,
)
testutil.Ok(t, err)

res := qry.Exec(context.Background())
testutil.Ok(t, res.Err)
m, err := res.Matrix()
testutil.Ok(t, err)
ser := []promql.Series(m)

testutil.Assert(t, len(ser) == 1, "should return 1 series (got %d)", len(ser))
testutil.Assert(t, ser[0].Points[0].T == 100, "expected first point to be at 100ms (got %v)", ser[0].Points[0].T)
testutil.Assert(t, ser[0].Points[0].V == 22, "expected first point to be 22 (got %v)", ser[0].Points[0].V)

testutil.Assert(t, ser[0].Points[1].T == 200, "expected second point to be at 200ms", ser[0].Points[1].T)
testutil.Assert(t, ser[0].Points[1].V == 43, "expected second point to be 43 (got %v)", ser[0].Points[1].V)

}

func TestQuerier_Series(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

testProxy := &storeServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), -1, []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), -1, []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), -1, []sample{{100, 1}, {300, 3}, {400, 4}}),
},
}

Expand Down Expand Up @@ -401,7 +458,7 @@ func (s *storeServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesS
return nil
}

func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse {
func storeSeriesResponse(t testing.TB, lset labels.Labels, aggr int, smplChunks ...[]sample) *storepb.SeriesResponse {
var s storepb.Series

for _, l := range lset {
Expand All @@ -416,11 +473,20 @@ func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sampl
for _, smpl := range smpls {
a.Append(smpl.t, smpl.v)
}
s.Chunks = append(s.Chunks, storepb.AggrChunk{

ch := storepb.AggrChunk{
MinTime: smpls[0].t,
MaxTime: smpls[len(smpls)-1].t,
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()},
})
}

switch aggr {
case int(resAggrSum):
ch.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}
default:
ch.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}
}

s.Chunks = append(s.Chunks, ch)
}
return storepb.NewSeriesResponse(&s)
}