Skip to content

Commit 87e84da

Browse files
committed
Add --query-range.request-downsampled flag to Query Frontend (thanos-io#2641)
Signed-off-by: Vladimir Kononov <[email protected]>
1 parent c7a171f commit 87e84da

File tree

6 files changed

+121
-1
lines changed

6 files changed

+121
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
1515
### Added
1616

1717
- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks
18+
- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request.
1819

1920
### Changed
2021

cmd/thanos/query_frontend.go

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ func registerQueryFrontend(app *extkingpin.App) {
6565
cmd.Flag("query-range.align-range-with-step", "Mutate incoming queries to align their start and end with their step for better cache-ability. Note: Grafana dashboards do that by default.").
6666
Default("true").BoolVar(&cfg.QueryRangeConfig.AlignRangeWithStep)
6767

68+
cmd.Flag("query-range.request-downsampled", "Make additional query for downsampled data in case of empty or incomplete response to range request.").
69+
Default("true").BoolVar(&cfg.QueryRangeConfig.RequestDownsampled)
70+
6871
cmd.Flag("query-range.split-interval", "Split query range requests by an interval and execute in parallel, it should be greater than 0 when query-range.response-cache-config is configured.").
6972
Default("24h").DurationVar(&cfg.QueryRangeConfig.SplitQueriesByInterval)
7073

docs/components/query-frontend.md

+4
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ Flags:
147147
and end with their step for better
148148
cache-ability. Note: Grafana dashboards do that
149149
by default.
150+
--query-range.request-downsampled
151+
Make additional query for downsampled data in
152+
case of empty or incomplete response to range
153+
request.
150154
--query-range.split-interval=24h
151155
Split query range requests by an interval and
152156
execute in parallel, it should be greater than

pkg/queryfrontend/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ type QueryRangeConfig struct {
162162
CachePathOrContent extflag.PathOrContent
163163

164164
AlignRangeWithStep bool
165+
RequestDownsampled bool
165166
SplitQueriesByInterval time.Duration
166167
MaxRetries int
167168
Limits *cortexvalidation.Limits

pkg/queryfrontend/downsampled.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package queryfrontend
5+
6+
import (
7+
"context"
8+
9+
"github.com/cortexproject/cortex/pkg/querier/queryrange"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/promauto"
12+
"github.com/thanos-io/thanos/pkg/compact/downsample"
13+
)
14+
15+
// DownsampledMiddleware creates a new Middleware that requests downsampled data
16+
// should response to original request with auto max_source_resolution not contain data points.
17+
func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
18+
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
19+
return downsampled{
20+
next: next,
21+
merger: merger,
22+
extraCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
23+
Namespace: "thanos",
24+
Name: "frontend_downsampled_extra_queries_total",
25+
Help: "Total number of extra queries for downsampled data",
26+
}),
27+
}
28+
})
29+
}
30+
31+
type downsampled struct {
32+
next queryrange.Handler
33+
merger queryrange.Merger
34+
35+
// Metrics.
36+
extraCounter prometheus.Counter
37+
}
38+
39+
var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2}
40+
41+
func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
42+
tqrr, ok := req.(*ThanosQueryRangeRequest)
43+
if !ok || !tqrr.AutoDownsampling {
44+
return d.next.Do(ctx, req)
45+
}
46+
47+
var (
48+
resps = make([]queryrange.Response, 0)
49+
resp queryrange.Response
50+
err error
51+
i int
52+
)
53+
54+
forLoop:
55+
for i < len(resolutions) {
56+
if i > 0 {
57+
d.extraCounter.Inc()
58+
}
59+
r := *tqrr
60+
resp, err = d.next.Do(ctx, &r)
61+
if err != nil {
62+
return nil, err
63+
}
64+
resps = append(resps, resp)
65+
// Set MaxSourceResolution for next request, if any.
66+
for i < len(resolutions) {
67+
if tqrr.MaxSourceResolution < resolutions[i] {
68+
tqrr.AutoDownsampling = false
69+
tqrr.MaxSourceResolution = resolutions[i]
70+
break
71+
}
72+
i++
73+
}
74+
m := minResponseTime(resp)
75+
switch m {
76+
case tqrr.Start: // Response not impacted by retention policy.
77+
break forLoop
78+
case -1: // Empty response, retry with higher MaxSourceResolution.
79+
continue
80+
default: // Data partially present, query for empty part with higher MaxSourceResolution.
81+
tqrr.End = m - tqrr.Step
82+
}
83+
if tqrr.Start > tqrr.End {
84+
break forLoop
85+
}
86+
}
87+
response, err := d.merger.MergeResponse(resps...)
88+
if err != nil {
89+
return nil, err
90+
}
91+
return response, nil
92+
}
93+
94+
func minResponseTime(r queryrange.Response) int64 {
95+
var res = r.(*queryrange.PrometheusResponse).Data.Result
96+
if len(res) == 0 {
97+
return -1
98+
}
99+
if len(res[0].Samples) == 0 {
100+
return -1
101+
}
102+
return res[0].Samples[0].TimestampMs
103+
}

pkg/queryfrontend/roundtrip.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func getOperation(r *http.Request) string {
131131
}
132132

133133
// newQueryRangeTripperware returns a Tripperware for range queries configured with middlewares of
134-
// limit, step align, split by interval, cache requests and retry.
134+
// limit, step align, downsampled, split by interval, cache requests and retry.
135135
func newQueryRangeTripperware(
136136
config QueryRangeConfig,
137137
limits queryrange.Limits,
@@ -151,6 +151,14 @@ func newQueryRangeTripperware(
151151
)
152152
}
153153

154+
if config.RequestDownsampled {
155+
queryRangeMiddleware = append(
156+
queryRangeMiddleware,
157+
queryrange.InstrumentMiddleware("downsampled", m),
158+
DownsampledMiddleware(codec, reg),
159+
)
160+
}
161+
154162
queryIntervalFn := func(_ queryrange.Request) time.Duration {
155163
return config.SplitQueriesByInterval
156164
}

0 commit comments

Comments
 (0)