Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit c5f1f46

Browse files
committed
metrics: the exporter can now convert from OpenCensus-Go stats to metrics
We now have a fully functioning view.Exporter --> metricspb.Exporter converter. Depends on #32 Updates #31
1 parent fec85ef commit c5f1f46

File tree

3 files changed

+299
-6
lines changed

3 files changed

+299
-6
lines changed

nodeinfo.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import (
2121
"go.opencensus.io"
2222
)
2323

24-
func createNodeInfo(nodeName string) *commonpb.Node {
24+
// NodeWithStartTime creates a node using nodeName and derives:
25+
// Hostname from the environment
26+
// Pid from the current process
27+
// StartTimestamp from the start time of this process
28+
// Language and library information.
29+
func NodeWithStartTime(nodeName string) *commonpb.Node {
2530
return &commonpb.Node{
2631
Identifier: &commonpb.ProcessIdentifier{
2732
HostName: os.Getenv("HOSTNAME"),

ocagent.go

+106-5
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ import (
2424
"google.golang.org/api/support/bundler"
2525
"google.golang.org/grpc"
2626

27+
"go.opencensus.io/stats/view"
2728
"go.opencensus.io/trace"
2829

29-
agentcommonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
30+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
31+
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
3032
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
33+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
3134
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3235
)
3336

@@ -41,6 +44,7 @@ func init() {
4144
}
4245

4346
var _ trace.Exporter = (*Exporter)(nil)
47+
var _ view.Exporter = (*Exporter)(nil)
4448

4549
type Exporter struct {
4650
connectionState int32
@@ -53,7 +57,8 @@ type Exporter struct {
5357
serviceName string
5458
canDialInsecure bool
5559
traceExporter agenttracepb.TraceService_ExportClient
56-
nodeInfo *agentcommonpb.Node
60+
metricsExporter agentmetricspb.MetricsService_ExportClient
61+
nodeInfo *commonpb.Node
5762
grpcClientConn *grpc.ClientConn
5863
reconnectionPeriod time.Duration
5964

@@ -64,6 +69,11 @@ type Exporter struct {
6469
backgroundConnectionDoneCh chan bool
6570

6671
traceBundler *bundler.Bundler
72+
73+
// viewDataBundler is the bundler to enable conversion
74+
// from OpenCensus-Go view.Data to metricspb.Metric.
75+
// Please do not confuse it with metricsBundler!
76+
viewDataBundler *bundler.Bundler
6777
}
6878

6979
func NewExporter(opts ...ExporterOption) (*Exporter, error) {
@@ -90,7 +100,14 @@ func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) {
90100
traceBundler.DelayThreshold = 2 * time.Second
91101
traceBundler.BundleCountThreshold = spanDataBufferSize
92102
e.traceBundler = traceBundler
93-
e.nodeInfo = createNodeInfo(e.serviceName)
103+
104+
viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
105+
e.uploadViewData(bundle.([]*view.Data))
106+
})
107+
viewDataBundler.DelayThreshold = 2 * time.Second
108+
viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable.
109+
e.viewDataBundler = viewDataBundler
110+
e.nodeInfo = NodeWithStartTime(e.serviceName)
94111

95112
return e, nil
96113
}
@@ -155,25 +172,36 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
155172
ae.grpcClientConn = cc
156173
ae.mu.Unlock()
157174

175+
if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
176+
return err
177+
}
178+
179+
return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
180+
}
181+
182+
func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
158183
// Initiate the trace service by sending over node identifier info.
159184
traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
160185
traceExporter, err := traceSvcClient.Export(context.Background())
161186
if err != nil {
162187
return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
163188
}
164189

165-
firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{Node: nodeInfo}
190+
firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{Node: node}
166191
if err := traceExporter.Send(firstTraceMessage); err != nil {
167192
return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
168193
}
194+
195+
ae.mu.Lock()
169196
ae.traceExporter = traceExporter
197+
ae.mu.Unlock()
170198

171199
// Initiate the config service by sending over node identifier info.
172200
configStream, err := traceSvcClient.Config(context.Background())
173201
if err != nil {
174202
return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err)
175203
}
176-
firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: nodeInfo}
204+
firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node}
177205
if err := configStream.Send(firstCfgMessage); err != nil {
178206
return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
179207
}
@@ -185,6 +213,26 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
185213
return nil
186214
}
187215

216+
func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
217+
metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc)
218+
metricsExporter, err := metricsSvcClient.Export(context.Background())
219+
if err != nil {
220+
return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err)
221+
}
222+
// Initiate the metrics service by sending over the first message just containing the Node.
223+
firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{Node: node}
224+
if err := metricsExporter.Send(firstMetricsMessage); err != nil {
225+
return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err)
226+
}
227+
228+
ae.mu.Lock()
229+
ae.metricsExporter = metricsExporter
230+
ae.mu.Unlock()
231+
232+
// With that we are good to go and can start sending metrics
233+
return nil
234+
}
235+
188236
func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
189237
addr := ae.prepareAgentAddress()
190238
var dialOpts []grpc.DialOption
@@ -278,6 +326,13 @@ func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
278326
_ = ae.traceBundler.Add(sd, 1)
279327
}
280328

329+
func (ae *Exporter) ExportView(vd *view.Data) {
330+
if vd == nil {
331+
return
332+
}
333+
_ = ae.viewDataBundler.Add(vd, 1)
334+
}
335+
281336
func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
282337
if len(sdl) == 0 {
283338
return nil
@@ -314,6 +369,52 @@ func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
314369
}
315370
}
316371

372+
func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
373+
if len(vdl) == 0 {
374+
return nil
375+
}
376+
metrics := make([]*metricspb.Metric, 0, len(vdl))
377+
for _, vd := range vdl {
378+
if vd != nil {
379+
vmetrics, err := viewDataToMetrics(vd)
380+
if err == nil && vmetrics != nil {
381+
// TODO: (@odeke-em) somehow report this error.
382+
metrics = append(metrics, vmetrics)
383+
}
384+
}
385+
}
386+
return metrics
387+
}
388+
389+
func (ae *Exporter) uploadViewData(vdl []*view.Data) {
390+
select {
391+
case <-ae.stopCh:
392+
return
393+
394+
default:
395+
if !ae.connected() {
396+
println("not connected!")
397+
return
398+
}
399+
400+
protoMetrics := ocViewDataToPbMetrics(vdl)
401+
if len(protoMetrics) == 0 {
402+
return
403+
}
404+
err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{
405+
Metrics: protoMetrics,
406+
// TODO:(@odeke-em)
407+
// a) Figure out how to derive a Node from the environment
408+
// b) Figure out how to derive a Resource from the environment
409+
// or better letting users of the exporter configure it.
410+
})
411+
if err != nil {
412+
ae.setStateDisconnected()
413+
}
414+
}
415+
}
416+
317417
func (ae *Exporter) Flush() {
318418
ae.traceBundler.Flush()
419+
ae.viewDataBundler.Flush()
319420
}

viewdata_to_metrics_test.go

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ocagent_test
16+
17+
import (
18+
"encoding/json"
19+
"errors"
20+
"net"
21+
"reflect"
22+
"sync"
23+
"testing"
24+
"time"
25+
26+
"github.com/golang/protobuf/ptypes/timestamp"
27+
"google.golang.org/grpc"
28+
29+
"contrib.go.opencensus.io/exporter/ocagent"
30+
"go.opencensus.io/stats"
31+
"go.opencensus.io/stats/view"
32+
33+
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
34+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
35+
)
36+
37+
type metricsAgent struct {
38+
mu sync.RWMutex
39+
metrics []*agentmetricspb.ExportMetricsServiceRequest
40+
}
41+
42+
func TestExportMetrics_conversionFromViewData(t *testing.T) {
43+
ln, err := net.Listen("tcp", ":0")
44+
if err != nil {
45+
t.Fatalf("Failed to get an available TCP address: %v", err)
46+
}
47+
defer ln.Close()
48+
49+
_, agentPortStr, _ := net.SplitHostPort(ln.Addr().String())
50+
ma := new(metricsAgent)
51+
srv := grpc.NewServer()
52+
agentmetricspb.RegisterMetricsServiceServer(srv, ma)
53+
defer srv.Stop()
54+
go func() {
55+
_ = srv.Serve(ln)
56+
}()
57+
58+
reconnectionPeriod := 2 * time.Millisecond
59+
ocexp, err := ocagent.NewExporter(ocagent.WithInsecure(),
60+
ocagent.WithAddress(":"+agentPortStr),
61+
ocagent.WithReconnectionPeriod(reconnectionPeriod))
62+
if err != nil {
63+
t.Fatalf("Failed to create the ocagent exporter: %v", err)
64+
}
65+
<-time.After(5 * reconnectionPeriod)
66+
ocexp.Flush()
67+
68+
startTime := time.Date(2018, 11, 25, 15, 38, 18, 997, time.UTC)
69+
endTime := startTime.Add(100 * time.Millisecond)
70+
71+
mLatencyMs := stats.Float64("latency", "The latency for various methods", "ms")
72+
73+
ocexp.ExportView(&view.Data{
74+
Start: startTime,
75+
End: endTime,
76+
View: &view.View{
77+
Name: "ocagent.io/latency",
78+
Description: "The latency of the various methods",
79+
Aggregation: view.Count(),
80+
Measure: mLatencyMs,
81+
},
82+
Rows: []*view.Row{
83+
{
84+
Data: &view.CountData{Value: 4},
85+
},
86+
},
87+
})
88+
89+
for i := 0; i < 5; i++ {
90+
ocexp.Flush()
91+
}
92+
93+
<-time.After(100 * time.Millisecond)
94+
95+
var received []*agentmetricspb.ExportMetricsServiceRequest
96+
ma.forEachRequest(func(req *agentmetricspb.ExportMetricsServiceRequest) {
97+
received = append(received, req)
98+
})
99+
100+
// Now compare them with what we expect
101+
want := []*agentmetricspb.ExportMetricsServiceRequest{
102+
{
103+
Node: ocagent.NodeWithStartTime(""), // The first message identifying this application.
104+
Metrics: nil,
105+
},
106+
{
107+
Metrics: []*metricspb.Metric{
108+
{
109+
Descriptor_: &metricspb.Metric_MetricDescriptor{
110+
MetricDescriptor: &metricspb.MetricDescriptor{
111+
Name: "ocagent.io/latency",
112+
Description: "The latency of the various methods",
113+
Unit: "ms", // Derived from the measure
114+
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
115+
LabelKeys: nil,
116+
},
117+
},
118+
Timeseries: []*metricspb.TimeSeries{
119+
{
120+
StartTimestamp: &timestamp.Timestamp{
121+
Seconds: 1543160298,
122+
Nanos: 997,
123+
},
124+
LabelValues: nil,
125+
Points: []*metricspb.Point{
126+
{
127+
Timestamp: &timestamp.Timestamp{
128+
Seconds: 1543160298,
129+
Nanos: 100000997,
130+
},
131+
Value: &metricspb.Point_Int64Value{Int64Value: 4},
132+
},
133+
},
134+
},
135+
},
136+
},
137+
},
138+
},
139+
}
140+
141+
if !reflect.DeepEqual(received, want) {
142+
gj, _ := json.MarshalIndent(received, "", " ")
143+
wj, _ := json.MarshalIndent(want, "", " ")
144+
if string(gj) != string(wj) {
145+
t.Errorf("Got:\n%s\nWant:\n%s", gj, wj)
146+
}
147+
}
148+
}
149+
150+
func (ma *metricsAgent) Export(mes agentmetricspb.MetricsService_ExportServer) error {
151+
// Expecting the first message to contain the Node information
152+
firstMetric, err := mes.Recv()
153+
if err != nil {
154+
return err
155+
}
156+
157+
if firstMetric == nil || firstMetric.Node == nil {
158+
return errors.New("Expecting a non-nil Node in the first message")
159+
}
160+
161+
ma.addMetric(firstMetric)
162+
163+
for {
164+
msg, err := mes.Recv()
165+
if err != nil {
166+
return err
167+
}
168+
ma.addMetric(msg)
169+
}
170+
171+
return nil
172+
}
173+
174+
func (ma *metricsAgent) addMetric(metric *agentmetricspb.ExportMetricsServiceRequest) {
175+
ma.mu.Lock()
176+
ma.metrics = append(ma.metrics, metric)
177+
ma.mu.Unlock()
178+
}
179+
180+
func (ma *metricsAgent) forEachRequest(fn func(*agentmetricspb.ExportMetricsServiceRequest)) {
181+
ma.mu.RLock()
182+
defer ma.mu.RUnlock()
183+
184+
for _, req := range ma.metrics {
185+
fn(req)
186+
}
187+
}

0 commit comments

Comments
 (0)