Skip to content

Commit 013efad

Browse files
authored
Hook up MetricsQueryService to main funcs (#3079)
1 parent e33977e commit 013efad

File tree

12 files changed

+335
-80
lines changed

12 files changed

+335
-80
lines changed

cmd/all-in-one/main.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package main
1717

1818
import (
19+
"fmt"
1920
"io"
2021
"log"
2122
"os"
@@ -44,10 +45,12 @@ import (
4445
"github.com/jaegertracing/jaeger/cmd/status"
4546
"github.com/jaegertracing/jaeger/pkg/config"
4647
"github.com/jaegertracing/jaeger/pkg/version"
48+
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
4749
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
4850
"github.com/jaegertracing/jaeger/plugin/storage"
4951
"github.com/jaegertracing/jaeger/ports"
5052
"github.com/jaegertracing/jaeger/storage/dependencystore"
53+
"github.com/jaegertracing/jaeger/storage/metricsstore"
5154
"github.com/jaegertracing/jaeger/storage/spanstore"
5255
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
5356
)
@@ -71,6 +74,12 @@ func main() {
7174
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
7275
}
7376

77+
fc := metricsPlugin.FactoryConfigFromEnv()
78+
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
79+
if err != nil {
80+
log.Fatalf("Cannot initialize metrics store factory: %v", err)
81+
}
82+
7483
v := viper.New()
7584
command := &cobra.Command{
7685
Use: "jaeger-all-in-one",
@@ -107,6 +116,11 @@ by default uses only in-memory database.`,
107116
logger.Fatal("Failed to create dependency reader", zap.Error(err))
108117
}
109118

119+
metricsReader, err := createMetricsReader(metricsReaderFactory, v, logger)
120+
if err != nil {
121+
logger.Fatal("Failed to create metrics reader", zap.Error(err))
122+
}
123+
110124
strategyStoreFactory.InitFromViper(v)
111125
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
112126
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
@@ -157,8 +171,8 @@ by default uses only in-memory database.`,
157171
// query
158172
querySrv := startQuery(
159173
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
160-
spanReader, dependencyReader,
161-
rootMetricsFactory, metricsFactory,
174+
spanReader, dependencyReader, metricsReader,
175+
metricsFactory,
162176
)
163177

164178
svc.RunAndThen(func() {
@@ -196,6 +210,7 @@ by default uses only in-memory database.`,
196210
collectorApp.AddFlags,
197211
queryApp.AddFlags,
198212
strategyStoreFactory.AddFlags,
213+
metricsReaderFactory.AddFlags,
199214
)
200215

201216
if err := command.Execute(); err != nil {
@@ -229,12 +244,13 @@ func startQuery(
229244
queryOpts *querysvc.QueryServiceOptions,
230245
spanReader spanstore.Reader,
231246
depReader dependencystore.Reader,
232-
rootFactory metrics.Factory,
247+
metricsReader metricsstore.Reader,
233248
baseFactory metrics.Factory,
234249
) *queryApp.Server {
235250
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
236251
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
237-
server, err := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
252+
mqs := querysvc.NewMetricsQueryService(metricsReader)
253+
server, err := queryApp.NewServer(svc.Logger, qs, mqs, qOpts, opentracing.GlobalTracer())
238254
if err != nil {
239255
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
240256
}
@@ -272,3 +288,13 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
272288
opentracing.SetGlobalTracer(tracer)
273289
return closer
274290
}
291+
292+
func createMetricsReader(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (metricsstore.Reader, error) {
293+
if err := factory.Initialize(logger); err != nil {
294+
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
295+
}
296+
297+
// Ensure default parameter values are loaded correctly.
298+
factory.InitFromViper(v)
299+
return factory.CreateMetricsReader()
300+
}

cmd/query/app/querysvc/metrics_query_service.go

+1-17
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,18 @@ package querysvc
1616

1717
import (
1818
"context"
19-
"errors"
2019
"time"
2120

2221
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
2322
"github.com/jaegertracing/jaeger/storage/metricsstore"
2423
)
2524

26-
// MetricsQueryService contains the underlying reader required for querying the metrics store.
25+
// MetricsQueryService provides a means of querying R.E.D metrics from an underlying metrics store.
2726
type MetricsQueryService struct {
2827
metricsReader metricsstore.Reader
2928
}
3029

31-
var errNilReader = errors.New("no reader defined for MetricsQueryService")
32-
3330
// NewMetricsQueryService returns a new MetricsQueryService.
34-
// A nil reader will result in a nil MetricsQueryService being returned.
3531
func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
3632
return &MetricsQueryService{
3733
metricsReader: reader,
@@ -40,32 +36,20 @@ func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
4036

4137
// GetLatencies is the queryService implementation of metricsstore.Reader.
4238
func (mqs MetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
43-
if mqs.metricsReader == nil {
44-
return nil, errNilReader
45-
}
4639
return mqs.metricsReader.GetLatencies(ctx, params)
4740
}
4841

4942
// GetCallRates is the queryService implementation of metricsstore.Reader.
5043
func (mqs MetricsQueryService) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
51-
if mqs.metricsReader == nil {
52-
return nil, errNilReader
53-
}
5444
return mqs.metricsReader.GetCallRates(ctx, params)
5545
}
5646

5747
// GetErrorRates is the queryService implementation of metricsstore.Reader.
5848
func (mqs MetricsQueryService) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
59-
if mqs.metricsReader == nil {
60-
return nil, errNilReader
61-
}
6249
return mqs.metricsReader.GetErrorRates(ctx, params)
6350
}
6451

6552
// GetMinStepDuration is the queryService implementation of metricsstore.Reader.
6653
func (mqs MetricsQueryService) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
67-
if mqs.metricsReader == nil {
68-
return 0, errNilReader
69-
}
7054
return mqs.metricsReader.GetMinStepDuration(ctx, params)
7155
}

cmd/query/app/querysvc/metrics_query_service_test.go

-34
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,9 @@ type testMetricsQueryService struct {
3434

3535
func initializeTestMetricsQueryService() *testMetricsQueryService {
3636
metricsReader := &metricsmocks.Reader{}
37-
3837
tqs := testMetricsQueryService{
3938
metricsReader: metricsReader,
4039
}
41-
4240
tqs.queryService = NewMetricsQueryService(metricsReader)
4341
return &tqs
4442
}
@@ -58,14 +56,6 @@ func TestGetLatencies(t *testing.T) {
5856
assert.Equal(t, expectedLatencies, actualLatencies)
5957
}
6058

61-
func TestGetLatenciesNilReader(t *testing.T) {
62-
qs := NewMetricsQueryService(nil)
63-
qParams := &metricsstore.LatenciesQueryParameters{}
64-
r, err := qs.GetLatencies(context.Background(), qParams)
65-
assert.Zero(t, r)
66-
assert.EqualError(t, err, errNilReader.Error())
67-
}
68-
6959
// Test QueryService.GetCallRates()
7060
func TestGetCallRates(t *testing.T) {
7161
tqs := initializeTestMetricsQueryService()
@@ -81,14 +71,6 @@ func TestGetCallRates(t *testing.T) {
8171
assert.Equal(t, expectedCallRates, actualCallRates)
8272
}
8373

84-
func TestGetCallRatesNilReader(t *testing.T) {
85-
qs := NewMetricsQueryService(nil)
86-
qParams := &metricsstore.CallRateQueryParameters{}
87-
r, err := qs.GetCallRates(context.Background(), qParams)
88-
assert.Zero(t, r)
89-
assert.EqualError(t, err, errNilReader.Error())
90-
}
91-
9274
// Test QueryService.GetErrorRates()
9375
func TestGetErrorRates(t *testing.T) {
9476
tqs := initializeTestMetricsQueryService()
@@ -101,14 +83,6 @@ func TestGetErrorRates(t *testing.T) {
10183
assert.Equal(t, expectedErrorRates, actualErrorRates)
10284
}
10385

104-
func TestGetErrorRatesNilReader(t *testing.T) {
105-
qs := NewMetricsQueryService(nil)
106-
qParams := &metricsstore.ErrorRateQueryParameters{}
107-
r, err := qs.GetErrorRates(context.Background(), qParams)
108-
assert.Zero(t, r)
109-
assert.EqualError(t, err, errNilReader.Error())
110-
}
111-
11286
// Test QueryService.GetMinStepDurations()
11387
func TestGetMinStepDurations(t *testing.T) {
11488
tqs := initializeTestMetricsQueryService()
@@ -120,11 +94,3 @@ func TestGetMinStepDurations(t *testing.T) {
12094
assert.NoError(t, err)
12195
assert.Equal(t, expectedMinStep, actualMinStep)
12296
}
123-
124-
func TestGetMinStepDurationsNilReader(t *testing.T) {
125-
qs := NewMetricsQueryService(nil)
126-
qParams := &metricsstore.MinStepDurationQueryParameters{}
127-
r, err := qs.GetMinStepDuration(context.Background(), qParams)
128-
assert.Zero(t, r)
129-
assert.EqualError(t, err, errNilReader.Error())
130-
}

cmd/query/app/server.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type Server struct {
5252
}
5353

5454
// NewServer creates and initializes Server
55-
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
55+
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
5656

5757
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
5858
if err != nil {
@@ -67,12 +67,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
6767
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
6868
}
6969

70-
grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
70+
grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer)
7171
if err != nil {
7272
return nil, err
7373
}
7474

75-
httpServer, err := createHTTPServer(querySvc, options, tracer, logger)
75+
httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
7676
if err != nil {
7777
return nil, err
7878
}
@@ -94,7 +94,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
9494
return s.unavailableChannel
9595
}
9696

97-
func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
97+
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
9898
var grpcOpts []grpc.ServerOption
9999

100100
if options.TLSGRPC.Enabled {
@@ -111,11 +111,15 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo
111111
server := grpc.NewServer(grpcOpts...)
112112

113113
handler := NewGRPCHandler(querySvc, logger, tracer)
114+
115+
// TODO: Register MetricsQueryService
114116
api_v2.RegisterQueryServiceServer(server, handler)
117+
115118
return server, nil
116119
}
117120

118-
func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
121+
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
122+
// TODO: Add HandlerOptions.MetricsQueryService
119123
apiHandlerOptions := []HandlerOption{
120124
HandlerOptions.Logger(logger),
121125
HandlerOptions.Tracer(tracer),

cmd/query/app/server_test.go

+18-12
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27-
opentracing "github.com/opentracing/opentracing-go"
27+
"github.com/opentracing/opentracing-go"
2828
"github.com/stretchr/testify/assert"
2929
"github.com/stretchr/testify/mock"
3030
"github.com/stretchr/testify/require"
@@ -64,7 +64,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
6464
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
6565
}
6666

67-
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
67+
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
6868
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
6969
assert.NotNil(t, err)
7070
}
@@ -77,7 +77,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
7777
ClientCAPath: "invalid/path",
7878
}
7979

80-
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
80+
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
8181
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{})
8282
assert.NotNil(t, err)
8383
}
@@ -90,7 +90,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
9090
ClientCAPath: "invalid/path",
9191
}
9292

93-
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
93+
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
9494
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
9595
assert.NotNil(t, err)
9696
}
@@ -331,7 +331,8 @@ func TestServerHTTPTLS(t *testing.T) {
331331
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
332332

333333
querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
334-
server, err := NewServer(flagsSvc.Logger, querySvc,
334+
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
335+
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
335336
serverOptions,
336337
opentracing.NoopTracer{})
337338
assert.Nil(t, err)
@@ -491,7 +492,8 @@ func TestServerGRPCTLS(t *testing.T) {
491492
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
492493

493494
querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
494-
server, err := NewServer(flagsSvc.Logger, querySvc,
495+
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
496+
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
495497
serverOptions,
496498
opentracing.NoopTracer{})
497499
assert.Nil(t, err)
@@ -545,12 +547,12 @@ func TestServerGRPCTLS(t *testing.T) {
545547

546548
}
547549
func TestServerBadHostPort(t *testing.T) {
548-
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
550+
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
549551
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
550552
opentracing.NoopTracer{})
551553

552554
assert.NotNil(t, err)
553-
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{},
555+
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
554556
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
555557
opentracing.NoopTracer{})
556558

@@ -576,6 +578,7 @@ func TestServerInUseHostPort(t *testing.T) {
576578
server, err := NewServer(
577579
zap.NewNop(),
578580
&querysvc.QueryService{},
581+
querysvc.NewMetricsQueryService(nil),
579582
&QueryOptions{
580583
HTTPHostPort: tc.httpHostPort,
581584
GRPCHostPort: tc.grpcHostPort,
@@ -608,8 +611,8 @@ func TestServerSinglePort(t *testing.T) {
608611
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
609612

610613
querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
611-
612-
server, err := NewServer(flagsSvc.Logger, querySvc,
614+
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
615+
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
613616
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
614617
opentracing.NoopTracer{})
615618
assert.Nil(t, err)
@@ -658,8 +661,10 @@ func TestServerGracefulExit(t *testing.T) {
658661
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)
659662

660663
querySvc := &querysvc.QueryService{}
664+
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
661665
tracer := opentracing.NoopTracer{}
662-
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
666+
667+
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
663668
assert.Nil(t, err)
664669
assert.NoError(t, server.Start())
665670
go func() {
@@ -685,8 +690,9 @@ func TestServerHandlesPortZero(t *testing.T) {
685690
flagsSvc.Logger = zap.New(zapCore)
686691

687692
querySvc := &querysvc.QueryService{}
693+
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
688694
tracer := opentracing.NoopTracer{}
689-
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
695+
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
690696
assert.Nil(t, err)
691697
assert.NoError(t, server.Start())
692698
server.Close()

0 commit comments

Comments
 (0)