Skip to content

Commit bbdfc73

Browse files
committed
Add --query-range.request-downsampled flag to Query Frontend (thanos-io#2641)
Signed-off-by: Vladimir Kononov <[email protected]>
1 parent c7a171f commit bbdfc73

File tree

6 files changed

+119
-1
lines changed

6 files changed

+119
-1
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
1515
### Added
1616

1717
- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks
18+
- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request.
1819

1920
### Changed
2021

cmd/thanos/query_frontend.go

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ func registerQueryFrontend(app *extkingpin.App) {
6565
cmd.Flag("query-range.align-range-with-step", "Mutate incoming queries to align their start and end with their step for better cache-ability. Note: Grafana dashboards do that by default.").
6666
Default("true").BoolVar(&cfg.QueryRangeConfig.AlignRangeWithStep)
6767

68+
cmd.Flag("query-range.request-downsampled", "Make additional query for downsampled data in case of empty or incomplete response to range request.").
69+
Default("true").BoolVar(&cfg.QueryRangeConfig.RequestDownsampled)
70+
6871
cmd.Flag("query-range.split-interval", "Split query range requests by an interval and execute in parallel, it should be greater than 0 when query-range.response-cache-config is configured.").
6972
Default("24h").DurationVar(&cfg.QueryRangeConfig.SplitQueriesByInterval)
7073

docs/components/query-frontend.md

+4
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ Flags:
147147
and end with their step for better
148148
cache-ability. Note: Grafana dashboards do that
149149
by default.
150+
--query-range.request-downsampled
151+
Make additional query for downsampled data in
152+
case of empty or incomplete response to range
153+
request.
150154
--query-range.split-interval=24h
151155
Split query range requests by an interval and
152156
execute in parallel, it should be greater than

pkg/queryfrontend/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ type QueryRangeConfig struct {
162162
CachePathOrContent extflag.PathOrContent
163163

164164
AlignRangeWithStep bool
165+
RequestDownsampled bool
165166
SplitQueriesByInterval time.Duration
166167
MaxRetries int
167168
Limits *cortexvalidation.Limits

pkg/queryfrontend/downsampled.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package queryfrontend
2+
3+
import (
4+
"context"
5+
"github.com/cortexproject/cortex/pkg/querier/queryrange"
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
"github.com/thanos-io/thanos/pkg/compact/downsample"
9+
)
10+
11+
// DownsampledMiddleware creates a new Middleware that requests downsampled data
12+
// should response to original request with auto max_source_resolution not contain data points.
13+
func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
14+
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
15+
return downsampled{
16+
next: next,
17+
merger: merger,
18+
extraCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
19+
Namespace: "thanos",
20+
Name: "frontend_downsampled_extra_queries_total",
21+
Help: "Total number of extra queries for downsampled data",
22+
}),
23+
}
24+
})
25+
}
26+
27+
type downsampled struct {
28+
next queryrange.Handler
29+
merger queryrange.Merger
30+
31+
// Metrics.
32+
extraCounter prometheus.Counter
33+
}
34+
35+
var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2}
36+
37+
func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
38+
tqrr, ok := req.(*ThanosQueryRangeRequest)
39+
if !ok || !tqrr.AutoDownsampling {
40+
return d.next.Do(ctx, req)
41+
}
42+
43+
var (
44+
resps = make([]queryrange.Response, 0)
45+
resp queryrange.Response
46+
err error
47+
i int
48+
)
49+
50+
forLoop:
51+
for {
52+
if i >= len(resolutions) {
53+
break
54+
} else if i > 0 {
55+
d.extraCounter.Inc()
56+
}
57+
r := *tqrr
58+
resp, err = d.next.Do(ctx, &r)
59+
if err != nil {
60+
return nil, err
61+
}
62+
resps = append(resps, resp)
63+
// Set MaxSourceResolution for next request, if any.
64+
for i < len(resolutions) {
65+
if tqrr.MaxSourceResolution < resolutions[i] {
66+
tqrr.AutoDownsampling = false
67+
tqrr.MaxSourceResolution = resolutions[i]
68+
break
69+
}
70+
i++
71+
}
72+
m := minResponseTime(resp)
73+
switch m {
74+
case tqrr.Start: // Response not impacted by retention policy.
75+
break forLoop
76+
case -1: // Empty response, retry with higher MaxSourceResolution.
77+
continue
78+
default: // Data partially present, query for empty part with higher MaxSourceResolution.
79+
tqrr.End = m - tqrr.Step
80+
}
81+
if tqrr.Start > tqrr.End {
82+
break forLoop
83+
}
84+
}
85+
response, err := d.merger.MergeResponse(resps...)
86+
if err != nil {
87+
return nil, err
88+
}
89+
return response, nil
90+
}
91+
92+
func minResponseTime(r queryrange.Response) int64 {
93+
var res = r.(*queryrange.PrometheusResponse).Data.Result
94+
if len(res) == 0 {
95+
return -1
96+
}
97+
if len(res[0].Samples) == 0 {
98+
return -1
99+
}
100+
return res[0].Samples[0].TimestampMs
101+
}

pkg/queryfrontend/roundtrip.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func getOperation(r *http.Request) string {
131131
}
132132

133133
// newQueryRangeTripperware returns a Tripperware for range queries configured with middlewares of
134-
// limit, step align, split by interval, cache requests and retry.
134+
// limit, step align, downsampled, split by interval, cache requests and retry.
135135
func newQueryRangeTripperware(
136136
config QueryRangeConfig,
137137
limits queryrange.Limits,
@@ -151,6 +151,14 @@ func newQueryRangeTripperware(
151151
)
152152
}
153153

154+
if config.RequestDownsampled {
155+
queryRangeMiddleware = append(
156+
queryRangeMiddleware,
157+
queryrange.InstrumentMiddleware("downsampled", m),
158+
DownsampledMiddleware(codec, reg),
159+
)
160+
}
161+
154162
queryIntervalFn := func(_ queryrange.Request) time.Duration {
155163
return config.SplitQueriesByInterval
156164
}

0 commit comments

Comments
 (0)