Skip to content

Commit c00eaf1

Browse files
authored
Fixes @ modifier when splitting queries by time. (#4464)
* Fixes @ modifier when splitting queries by time. This will replace `start` and `end` at (`@`) modifier with the actual constant values based on the original queries. Meaning subqueries will not wrongly use their own query start and end time. Fixes #4463 Signed-off-by: Cyril Tovena <[email protected]> * Update changelog. Signed-off-by: Cyril Tovena <[email protected]>
1 parent 028c4b3 commit c00eaf1

File tree

3 files changed

+95
-10
lines changed

3 files changed

+95
-10
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
4040
* [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #4462
4141
* [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #4462
42+
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464
4243
* [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
4344
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
4445
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335

pkg/querier/queryrange/split_by_interval.go

+37-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/prometheus/client_golang/prometheus"
88
"github.com/prometheus/client_golang/prometheus/promauto"
9+
"github.com/prometheus/prometheus/promql/parser"
910
)
1011

1112
type IntervalFn func(r Request) time.Duration
@@ -40,7 +41,10 @@ type splitByInterval struct {
4041
func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
4142
// First we're going to build new requests, one for each day, taking care
4243
// to line up the boundaries with step.
43-
reqs := splitQuery(r, s.interval(r))
44+
reqs, err := splitQuery(r, s.interval(r))
45+
if err != nil {
46+
return nil, err
47+
}
4448
s.splitByCounter.Add(float64(len(reqs)))
4549

4650
reqResps, err := DoRequests(ctx, s.next, reqs, s.limits)
@@ -60,17 +64,46 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
6064
return response, nil
6165
}
6266

63-
func splitQuery(r Request, interval time.Duration) []Request {
67+
func splitQuery(r Request, interval time.Duration) ([]Request, error) {
68+
// Replace @ modifier function to their respective constant values in the query.
69+
// This way subqueries will be evaluated at the same time as the parent query.
70+
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
71+
if err != nil {
72+
return nil, err
73+
}
6474
var reqs []Request
6575
for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() {
6676
end := nextIntervalBoundary(start, r.GetStep(), interval)
6777
if end+r.GetStep() >= r.GetEnd() {
6878
end = r.GetEnd()
6979
}
7080

71-
reqs = append(reqs, r.WithStartEnd(start, end))
81+
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
7282
}
73-
return reqs
83+
return reqs, nil
84+
}
85+
86+
// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
87+
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
88+
// If the modifier is already a constant, it will be returned as is.
89+
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
90+
expr, err := parser.ParseExpr(query)
91+
if err != nil {
92+
return "", err
93+
}
94+
parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error {
95+
if selector, ok := n.(*parser.VectorSelector); ok {
96+
switch selector.StartOrEnd {
97+
case parser.START:
98+
selector.Timestamp = &start
99+
case parser.END:
100+
selector.Timestamp = &end
101+
}
102+
selector.StartOrEnd = 0
103+
}
104+
return nil
105+
})
106+
return expr.String(), err
74107
}
75108

76109
// Round up to the step before the next interval boundary.

pkg/querier/queryrange/split_by_interval_test.go

+57-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/prometheus/prometheus/promql/parser"
1314
"github.com/stretchr/testify/require"
1415
"github.com/weaveworks/common/middleware"
1516
"github.com/weaveworks/common/user"
@@ -135,20 +136,20 @@ func TestSplitQuery(t *testing.T) {
135136
Start: 0,
136137
End: 2 * 24 * 3600 * seconds,
137138
Step: 15 * seconds,
138-
Query: "foo",
139+
Query: "foo @ start()",
139140
},
140141
expected: []Request{
141142
&PrometheusRequest{
142143
Start: 0,
143144
End: (24 * 3600 * seconds) - (15 * seconds),
144145
Step: 15 * seconds,
145-
Query: "foo",
146+
Query: "foo @ 0.000",
146147
},
147148
&PrometheusRequest{
148149
Start: 24 * 3600 * seconds,
149150
End: 2 * 24 * 3600 * seconds,
150151
Step: 15 * seconds,
151-
Query: "foo",
152+
Query: "foo @ 0.000",
152153
},
153154
},
154155
interval: day,
@@ -236,14 +237,14 @@ func TestSplitQuery(t *testing.T) {
236237
},
237238
} {
238239
t.Run(strconv.Itoa(i), func(t *testing.T) {
239-
days := splitQuery(tc.input, tc.interval)
240+
days, err := splitQuery(tc.input, tc.interval)
241+
require.NoError(t, err)
240242
require.Equal(t, tc.expected, days)
241243
})
242244
}
243245
}
244246

245247
func TestSplitByDay(t *testing.T) {
246-
247248
mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse)
248249
require.NoError(t, err)
249250

@@ -260,7 +261,6 @@ func TestSplitByDay(t *testing.T) {
260261
{query, string(mergedHTTPResponseBody), 2},
261262
} {
262263
t.Run(strconv.Itoa(i), func(t *testing.T) {
263-
264264
var actualCount atomic.Int32
265265
s := httptest.NewServer(
266266
middleware.AuthenticateUser.Wrap(
@@ -298,3 +298,54 @@ func TestSplitByDay(t *testing.T) {
298298
})
299299
}
300300
}
301+
302+
func Test_evaluateAtModifier(t *testing.T) {
303+
const (
304+
start, end = int64(1546300800), int64(1646300800)
305+
)
306+
for _, tt := range []struct {
307+
in, expected string
308+
}{
309+
{"topk(5, rate(http_requests_total[1h] @ start()))", "topk(5, rate(http_requests_total[1h] @ 1546300.800))"},
310+
{"topk(5, rate(http_requests_total[1h] @ 0))", "topk(5, rate(http_requests_total[1h] @ 0.000))"},
311+
{"http_requests_total[1h] @ 10.001", "http_requests_total[1h] @ 10.001"},
312+
{
313+
`min_over_time(
314+
sum by(cluster) (
315+
rate(http_requests_total[5m] @ end())
316+
)[10m:]
317+
)
318+
or
319+
max_over_time(
320+
stddev_over_time(
321+
deriv(
322+
rate(http_requests_total[10m] @ start())
323+
[5m:1m])
324+
[2m:])
325+
[10m:])`,
326+
`min_over_time(
327+
sum by(cluster) (
328+
rate(http_requests_total[5m] @ 1646300.800)
329+
)[10m:]
330+
)
331+
or
332+
max_over_time(
333+
stddev_over_time(
334+
deriv(
335+
rate(http_requests_total[10m] @ 1546300.800)
336+
[5m:1m])
337+
[2m:])
338+
[10m:])`,
339+
},
340+
} {
341+
tt := tt
342+
t.Run(tt.in, func(t *testing.T) {
343+
t.Parallel()
344+
expectedExpr, err := parser.ParseExpr(tt.expected)
345+
require.NoError(t, err)
346+
out, err := evaluateAtModifierFunction(tt.in, start, end)
347+
require.NoError(t, err)
348+
require.Equal(t, expectedExpr.String(), out)
349+
})
350+
}
351+
}

0 commit comments

Comments
 (0)