Skip to content

Commit 351830d

Browse files
committed
Use react-query to query objectives and prometheus service
1 parent 1b11dfb commit 351830d

28 files changed

+3375
-866
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/prometheus/common v0.39.0
1818
github.com/prometheus/prometheus v0.41.0
1919
github.com/stretchr/testify v1.8.1
20+
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
2021
golang.org/x/net v0.4.0
2122
google.golang.org/protobuf v1.28.1
2223
k8s.io/api v0.26.0
@@ -68,7 +69,6 @@ require (
6869
go.uber.org/goleak v1.2.0 // indirect
6970
go.uber.org/multierr v1.6.0 // indirect
7071
go.uber.org/zap v1.24.0 // indirect
71-
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 // indirect
7272
golang.org/x/oauth2 v0.3.0 // indirect
7373
golang.org/x/sys v0.3.0 // indirect
7474
golang.org/x/term v0.3.0 // indirect

go.sum

-40
Large diffs are not rendered by default.

main.go

+79-13
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/go-kit/log/level"
2727
connectprometheus "github.com/polarsignals/connect-go-prometheus"
2828
"github.com/prometheus/client_golang/api"
29-
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
29+
prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
3030
"github.com/prometheus/client_golang/prometheus"
3131
"github.com/prometheus/client_golang/prometheus/collectors"
3232
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -40,6 +40,7 @@ import (
4040

4141
objectivesv1alpha1 "github.com/pyrra-dev/pyrra/proto/objectives/v1alpha1"
4242
"github.com/pyrra-dev/pyrra/proto/objectives/v1alpha1/objectivesv1alpha1connect"
43+
"github.com/pyrra-dev/pyrra/proto/prometheus/v1/prometheusv1connect"
4344
"github.com/pyrra-dev/pyrra/slo"
4445
)
4546

@@ -185,7 +186,7 @@ func cmdAPI(logger log.Logger, reg *prometheus.Registry, promClient api.Client,
185186
defer cache.Close()
186187
promAPI := &promCache{
187188
api: &promLogger{
188-
api: prometheusv1.NewAPI(promClient),
189+
api: prometheusapiv1.NewAPI(promClient),
189190
logger: logger,
190191
},
191192
cache: cache,
@@ -204,22 +205,32 @@ func cmdAPI(logger log.Logger, reg *prometheus.Registry, promClient api.Client,
204205

205206
r.Route(routePrefix, func(r chi.Router) {
206207
objectiveService := &objectiveServer{
207-
logger: logger,
208+
logger: log.WithPrefix(logger, "service", "objective"),
208209
promAPI: promAPI,
209210
client: objectivesv1alpha1connect.NewObjectiveBackendServiceClient(
210211
http.DefaultClient,
211212
apiURL.String(),
212213
connect.WithInterceptors(prometheusInterceptor),
213214
),
214215
}
216+
215217
objectivePath, objectiveHandler := objectivesv1alpha1connect.NewObjectiveServiceHandler(
216218
objectiveService,
217219
connect.WithInterceptors(prometheusInterceptor),
218220
)
221+
222+
prometheusService := &prometheusServer{
223+
logger: log.WithPrefix(logger, "service", "prometheus"),
224+
promAPI: promAPI,
225+
}
226+
prometheusPath, prometheusHandler := prometheusv1connect.NewPrometheusServiceHandler(prometheusService)
227+
219228
if routePrefix != "/" {
220229
r.Mount(objectivePath, http.StripPrefix(routePrefix, objectiveHandler))
230+
r.Mount(prometheusPath, http.StripPrefix(routePrefix, prometheusHandler))
221231
} else {
222232
r.Mount(objectivePath, objectiveHandler)
233+
r.Mount(prometheusPath, prometheusHandler)
223234
}
224235

225236
r.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
@@ -333,17 +344,17 @@ func (c *thanosClient) Do(ctx context.Context, r *http.Request) (*http.Response,
333344

334345
type prometheusAPI interface {
335346
// Query performs a query for the given time.
336-
Query(ctx context.Context, query string, ts time.Time, opts ...prometheusv1.Option) (model.Value, prometheusv1.Warnings, error)
347+
Query(ctx context.Context, query string, ts time.Time, opts ...prometheusapiv1.Option) (model.Value, prometheusapiv1.Warnings, error)
337348
// QueryRange performs a query for the given range.
338-
QueryRange(ctx context.Context, query string, r prometheusv1.Range, opts ...prometheusv1.Option) (model.Value, prometheusv1.Warnings, error)
349+
QueryRange(ctx context.Context, query string, r prometheusapiv1.Range, opts ...prometheusapiv1.Option) (model.Value, prometheusapiv1.Warnings, error)
339350
}
340351

341352
type promLogger struct {
342353
api prometheusAPI
343354
logger log.Logger
344355
}
345356

346-
func (l *promLogger) Query(ctx context.Context, query string, ts time.Time, opts ...prometheusv1.Option) (model.Value, prometheusv1.Warnings, error) {
357+
func (l *promLogger) Query(ctx context.Context, query string, ts time.Time, opts ...prometheusapiv1.Option) (model.Value, prometheusapiv1.Warnings, error) {
347358
level.Debug(l.logger).Log(
348359
"msg", "running instant query",
349360
"query", query,
@@ -352,7 +363,7 @@ func (l *promLogger) Query(ctx context.Context, query string, ts time.Time, opts
352363
return l.api.Query(ctx, query, ts, opts...)
353364
}
354365

355-
func (l *promLogger) QueryRange(ctx context.Context, query string, r prometheusv1.Range, opts ...prometheusv1.Option) (model.Value, prometheusv1.Warnings, error) {
366+
func (l *promLogger) QueryRange(ctx context.Context, query string, r prometheusapiv1.Range, opts ...prometheusapiv1.Option) (model.Value, prometheusapiv1.Warnings, error) {
356367
level.Debug(l.logger).Log(
357368
"msg", "running range query",
358369
"query", query,
@@ -383,7 +394,7 @@ func contextGetPromCache(ctx context.Context) time.Duration {
383394
return 0
384395
}
385396

386-
func (p *promCache) Query(ctx context.Context, query string, ts time.Time) (model.Value, prometheusv1.Warnings, error) {
397+
func (p *promCache) Query(ctx context.Context, query string, ts time.Time) (model.Value, prometheusapiv1.Warnings, error) {
387398
if value, exists := p.cache.Get(query); exists {
388399
return value.(model.Value), nil, nil
389400
}
@@ -410,7 +421,7 @@ func (p *promCache) Query(ctx context.Context, query string, ts time.Time) (mode
410421
return value, warnings, nil
411422
}
412423

413-
func (p *promCache) QueryRange(ctx context.Context, query string, r prometheusv1.Range) (model.Value, prometheusv1.Warnings, error) {
424+
func (p *promCache) QueryRange(ctx context.Context, query string, r prometheusapiv1.Range) (model.Value, prometheusapiv1.Warnings, error) {
414425
// Get the full time range of this query from start to end.
415426
// We round by 10s to adjust for small imperfections to increase cache hits.
416427
timeRange := r.End.Sub(r.Start).Round(10 * time.Second)
@@ -477,6 +488,61 @@ func (s *objectiveServer) List(ctx context.Context, req *connect.Request[objecti
477488
return nil, err
478489
}
479490

491+
groupingMatchers := map[string]*labels.Matcher{}
492+
if req.Msg.Grouping != "" {
493+
ms, err := parser.ParseMetricSelector(req.Msg.Grouping)
494+
if err != nil {
495+
return nil, connect.NewError(connect.CodeInvalidArgument, err)
496+
}
497+
for _, m := range ms {
498+
groupingMatchers[m.Name] = m
499+
}
500+
}
501+
502+
for _, o := range resp.Msg.Objectives {
503+
oi := objectivesv1alpha1.ToInternal(o)
504+
505+
// If specific grouping was selected we need to merge the label matchers for the queries.
506+
if len(groupingMatchers) > 0 {
507+
if oi.Indicator.Ratio != nil {
508+
for _, m := range oi.Indicator.Ratio.Errors.LabelMatchers {
509+
if rm, replace := groupingMatchers[m.Name]; replace {
510+
m.Type = rm.Type
511+
m.Value = rm.Value
512+
}
513+
}
514+
for _, m := range oi.Indicator.Ratio.Total.LabelMatchers {
515+
if rm, replace := groupingMatchers[m.Name]; replace {
516+
m.Type = rm.Type
517+
m.Value = rm.Value
518+
}
519+
}
520+
}
521+
if oi.Indicator.Latency != nil {
522+
for _, m := range oi.Indicator.Latency.Success.LabelMatchers {
523+
if rm, replace := groupingMatchers[m.Name]; replace {
524+
m.Type = rm.Type
525+
m.Value = rm.Value
526+
}
527+
}
528+
for _, m := range oi.Indicator.Latency.Total.LabelMatchers {
529+
if rm, replace := groupingMatchers[m.Name]; replace {
530+
m.Type = rm.Type
531+
m.Value = rm.Value
532+
}
533+
}
534+
}
535+
}
536+
537+
o.Queries = &objectivesv1alpha1.Queries{
538+
CountTotal: oi.QueryTotal(oi.Window),
539+
CountErrors: oi.QueryErrors(oi.Window),
540+
GraphErrorBudget: oi.QueryErrorBudget(),
541+
GraphRequests: oi.RequestRange(5 * time.Minute),
542+
GraphErrors: oi.ErrorsRange(5 * time.Minute),
543+
}
544+
}
545+
480546
return connect.NewResponse(&objectivesv1alpha1.ListResponse{
481547
Objectives: resp.Msg.Objectives,
482548
}), nil
@@ -647,7 +713,7 @@ func (s *objectiveServer) GraphErrorBudget(ctx context.Context, req *connect.Req
647713
step := end.Sub(start) / 1000
648714

649715
query := objective.QueryErrorBudget()
650-
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, 15*time.Second), query, prometheusv1.Range{
716+
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, 15*time.Second), query, prometheusapiv1.Range{
651717
Start: start,
652718
End: end,
653719
Step: step,
@@ -1022,7 +1088,7 @@ func (s *objectiveServer) GraphRate(ctx context.Context, req *connect.Request[ob
10221088

10231089
query := objective.RequestRange(timeRange)
10241090

1025-
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusv1.Range{
1091+
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusapiv1.Range{
10261092
Start: start,
10271093
End: end,
10281094
Step: step,
@@ -1117,7 +1183,7 @@ func (s *objectiveServer) GraphErrors(ctx context.Context, req *connect.Request[
11171183
cacheDuration := rangeCache(start, end)
11181184

11191185
query := objective.ErrorsRange(timeRange)
1120-
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusv1.Range{
1186+
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusapiv1.Range{
11211187
Start: start,
11221188
End: end,
11231189
Step: step,
@@ -1235,7 +1301,7 @@ func (s *objectiveServer) GraphDuration(ctx context.Context, req *connect.Reques
12351301
for _, percentile := range objectivePercentiles {
12361302
if objective.Target >= percentile {
12371303
query := objective.DurationRange(timeRange, percentile)
1238-
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusv1.Range{
1304+
value, _, err := s.promAPI.QueryRange(contextSetPromCache(ctx, cacheDuration), query, prometheusapiv1.Range{
12391305
Start: start,
12401306
End: end,
12411307
Step: step,

prometheus.go

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/bufbuild/connect-go"
8+
"github.com/go-kit/log"
9+
prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
10+
"github.com/prometheus/common/model"
11+
12+
v1 "github.com/pyrra-dev/pyrra/proto/prometheus/v1"
13+
)
14+
15+
type prometheusServer struct {
16+
logger log.Logger
17+
promAPI *promCache
18+
}
19+
20+
func (ps *prometheusServer) Query(ctx context.Context, req *connect.Request[v1.QueryRequest]) (*connect.Response[v1.QueryResponse], error) {
21+
value, warnings, err := ps.promAPI.Query(ctx, req.Msg.Query, time.Unix(req.Msg.Time, 0))
22+
if err != nil {
23+
return nil, err
24+
}
25+
26+
switch value.(type) {
27+
case *model.String:
28+
s := value.(*model.String)
29+
return connect.NewResponse(&v1.QueryResponse{
30+
Warnings: warnings,
31+
Options: &v1.QueryResponse_String_{
32+
String_: &v1.String{
33+
Time: s.Timestamp.Unix(),
34+
Value: s.Value,
35+
},
36+
},
37+
}), err
38+
case *model.Scalar:
39+
s := value.(*model.Scalar)
40+
return connect.NewResponse(&v1.QueryResponse{
41+
Warnings: warnings,
42+
Options: &v1.QueryResponse_Scalar{
43+
Scalar: &v1.SamplePair{
44+
Time: s.Timestamp.Unix(),
45+
Value: float64(s.Value),
46+
},
47+
},
48+
}), nil
49+
case model.Vector:
50+
vector := convertVector(value.(model.Vector))
51+
return connect.NewResponse(&v1.QueryResponse{
52+
Warnings: warnings,
53+
Options: &v1.QueryResponse_Vector{
54+
Vector: vector,
55+
},
56+
}), nil
57+
}
58+
59+
return connect.NewResponse(&v1.QueryResponse{
60+
Warnings: warnings,
61+
Options: nil,
62+
}), nil
63+
}
64+
65+
func (ps *prometheusServer) QueryRange(ctx context.Context, req *connect.Request[v1.QueryRangeRequest]) (*connect.Response[v1.QueryRangeResponse], error) {
66+
value, warnings, err := ps.promAPI.QueryRange(ctx, req.Msg.GetQuery(), prometheusapiv1.Range{
67+
Start: time.Unix(req.Msg.GetStart(), 0),
68+
End: time.Unix(req.Msg.GetEnd(), 0),
69+
Step: time.Duration(req.Msg.GetStep()) * time.Second,
70+
})
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
switch value.(type) {
76+
case model.Matrix:
77+
return connect.NewResponse(&v1.QueryRangeResponse{
78+
Warnings: warnings,
79+
Options: &v1.QueryRangeResponse_Matrix{
80+
Matrix: convertMatrix(value.(model.Matrix)),
81+
},
82+
}), nil
83+
}
84+
85+
return connect.NewResponse(&v1.QueryRangeResponse{
86+
Warnings: warnings,
87+
}), nil
88+
}
89+
90+
func convertVector(in model.Vector) *v1.Vector {
91+
samples := make([]*v1.Sample, 0, len(in))
92+
for _, si := range in {
93+
samples = append(samples,
94+
&v1.Sample{
95+
Metric: convertLabelSet(si.Metric),
96+
Time: si.Timestamp.Unix(),
97+
Value: float64(si.Value),
98+
},
99+
)
100+
}
101+
return &v1.Vector{Samples: samples}
102+
}
103+
104+
func convertLabelSet(metric model.Metric) map[string]string {
105+
out := make(map[string]string, len(metric))
106+
for name, value := range metric {
107+
out[string(name)] = string(value)
108+
}
109+
return out
110+
}
111+
112+
func convertMatrix(in model.Matrix) *v1.Matrix {
113+
samples := make([]*v1.SampleStream, in.Len())
114+
for i, sampleStream := range in {
115+
values := make([]*v1.SamplePair, len(sampleStream.Values))
116+
for j, pair := range sampleStream.Values {
117+
values[j] = &v1.SamplePair{
118+
Time: pair.Timestamp.Unix(),
119+
Value: float64(pair.Value),
120+
}
121+
}
122+
samples[i] = &v1.SampleStream{
123+
Metric: convertLabelSet(sampleStream.Metric),
124+
Values: values,
125+
}
126+
}
127+
return &v1.Matrix{Samples: samples}
128+
}

0 commit comments

Comments
 (0)