diff --git a/nodeinfo.go b/nodeinfo.go index 5b9a058..dd41c34 100644 --- a/nodeinfo.go +++ b/nodeinfo.go @@ -21,7 +21,12 @@ import ( "go.opencensus.io" ) -func createNodeInfo(nodeName string) *commonpb.Node { +// NodeWithStartTime creates a node using nodeName and derives: +// Hostname from the environment +// Pid from the current process +// StartTimestamp from the start time of this process +// Language and library information. +func NodeWithStartTime(nodeName string) *commonpb.Node { return &commonpb.Node{ Identifier: &commonpb.ProcessIdentifier{ HostName: os.Getenv("HOSTNAME"), diff --git a/ocagent.go b/ocagent.go index 02b50d9..338f667 100644 --- a/ocagent.go +++ b/ocagent.go @@ -24,10 +24,13 @@ import ( "google.golang.org/api/support/bundler" "google.golang.org/grpc" + "go.opencensus.io/stats/view" "go.opencensus.io/trace" - agentcommonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" ) @@ -41,6 +44,7 @@ func init() { } var _ trace.Exporter = (*Exporter)(nil) +var _ view.Exporter = (*Exporter)(nil) type Exporter struct { connectionState int32 @@ -53,7 +57,8 @@ type Exporter struct { serviceName string canDialInsecure bool traceExporter agenttracepb.TraceService_ExportClient - nodeInfo *agentcommonpb.Node + metricsExporter agentmetricspb.MetricsService_ExportClient + nodeInfo *commonpb.Node grpcClientConn *grpc.ClientConn reconnectionPeriod time.Duration @@ -64,6 +69,11 @@ type Exporter struct { backgroundConnectionDoneCh chan bool traceBundler *bundler.Bundler + + // viewDataBundler is the bundler to enable conversion + // from OpenCensus-Go view.Data to metricspb.Metric. + // Please do not confuse it with metricsBundler! + viewDataBundler *bundler.Bundler } func NewExporter(opts ...ExporterOption) (*Exporter, error) { @@ -90,7 +100,14 @@ func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) { traceBundler.DelayThreshold = 2 * time.Second traceBundler.BundleCountThreshold = spanDataBufferSize e.traceBundler = traceBundler - e.nodeInfo = createNodeInfo(e.serviceName) + + viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) { + e.uploadViewData(bundle.([]*view.Data)) + }) + viewDataBundler.DelayThreshold = 2 * time.Second + viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable. + e.viewDataBundler = viewDataBundler + e.nodeInfo = NodeWithStartTime(e.serviceName) return e, nil } @@ -155,6 +172,14 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error { ae.grpcClientConn = cc ae.mu.Unlock() + if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil { + return err + } + + return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo) +} + +func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { // Initiate the trace service by sending over node identifier info. traceSvcClient := agenttracepb.NewTraceServiceClient(cc) traceExporter, err := traceSvcClient.Export(context.Background()) @@ -162,18 +187,21 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error { return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err) } - firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{Node: nodeInfo} + firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{Node: node} if err := traceExporter.Send(firstTraceMessage); err != nil { return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) } + + ae.mu.Lock() ae.traceExporter = traceExporter + ae.mu.Unlock() // Initiate the config service by sending over node identifier info. configStream, err := traceSvcClient.Config(context.Background()) if err != nil { return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err) } - firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: nodeInfo} + firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node} if err := configStream.Send(firstCfgMessage); err != nil { return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) } @@ -185,6 +213,26 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error { return nil } +func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { + metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc) + metricsExporter, err := metricsSvcClient.Export(context.Background()) + if err != nil { + return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err) + } + // Initiate the metrics service by sending over the first message just containing the Node. + firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{Node: node} + if err := metricsExporter.Send(firstMetricsMessage); err != nil { + return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err) + } + + ae.mu.Lock() + ae.metricsExporter = metricsExporter + ae.mu.Unlock() + + // With that we are good to go and can start sending metrics + return nil +} + func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) { addr := ae.prepareAgentAddress() var dialOpts []grpc.DialOption @@ -278,6 +326,13 @@ func (ae *Exporter) ExportSpan(sd *trace.SpanData) { _ = ae.traceBundler.Add(sd, 1) } +func (ae *Exporter) ExportView(vd *view.Data) { + if vd == nil { + return + } + _ = ae.viewDataBundler.Add(vd, 1) +} + func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span { if len(sdl) == 0 { return nil @@ -314,6 +369,51 @@ func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) { } } +func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric { + if len(vdl) == 0 { + return nil + } + metrics := make([]*metricspb.Metric, 0, len(vdl)) + for _, vd := range vdl { + if vd != nil { + vmetric, err := viewDataToMetric(vd) + // TODO: (@odeke-em) somehow report this error, if it is non-nil. + if err == nil && vmetric != nil { + metrics = append(metrics, vmetric) + } + } + } + return metrics +} + +func (ae *Exporter) uploadViewData(vdl []*view.Data) { + select { + case <-ae.stopCh: + return + + default: + if !ae.connected() { + return + } + + protoMetrics := ocViewDataToPbMetrics(vdl) + if len(protoMetrics) == 0 { + return + } + err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{ + Metrics: protoMetrics, + // TODO:(@odeke-em) + // a) Figure out how to derive a Node from the environment + // b) Figure out how to derive a Resource from the environment + // or better letting users of the exporter configure it. + }) + if err != nil { + ae.setStateDisconnected() + } + } +} + func (ae *Exporter) Flush() { ae.traceBundler.Flush() + ae.viewDataBundler.Flush() } diff --git a/viewdata_to_metrics_test.go b/viewdata_to_metrics_test.go new file mode 100644 index 0000000..6e0d616 --- /dev/null +++ b/viewdata_to_metrics_test.go @@ -0,0 +1,185 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ocagent_test + +import ( + "encoding/json" + "errors" + "net" + "reflect" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/ptypes/timestamp" + "google.golang.org/grpc" + + "contrib.go.opencensus.io/exporter/ocagent" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + + agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" +) + +type metricsAgent struct { + mu sync.RWMutex + metrics []*agentmetricspb.ExportMetricsServiceRequest +} + +func TestExportMetrics_conversionFromViewData(t *testing.T) { + ln, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to get an available TCP address: %v", err) + } + defer ln.Close() + + _, agentPortStr, _ := net.SplitHostPort(ln.Addr().String()) + ma := new(metricsAgent) + srv := grpc.NewServer() + agentmetricspb.RegisterMetricsServiceServer(srv, ma) + defer srv.Stop() + go func() { + _ = srv.Serve(ln) + }() + + reconnectionPeriod := 2 * time.Millisecond + ocexp, err := ocagent.NewExporter(ocagent.WithInsecure(), + ocagent.WithAddress(":"+agentPortStr), + ocagent.WithReconnectionPeriod(reconnectionPeriod)) + if err != nil { + t.Fatalf("Failed to create the ocagent exporter: %v", err) + } + <-time.After(5 * reconnectionPeriod) + ocexp.Flush() + + startTime := time.Date(2018, 11, 25, 15, 38, 18, 997, time.UTC) + endTime := startTime.Add(100 * time.Millisecond) + + mLatencyMs := stats.Float64("latency", "The latency for various methods", "ms") + + ocexp.ExportView(&view.Data{ + Start: startTime, + End: endTime, + View: &view.View{ + Name: "ocagent.io/latency", + Description: "The latency of the various methods", + Aggregation: view.Count(), + Measure: mLatencyMs, + }, + Rows: []*view.Row{ + { + Data: &view.CountData{Value: 4}, + }, + }, + }) + + for i := 0; i < 5; i++ { + ocexp.Flush() + } + + <-time.After(100 * time.Millisecond) + + var received []*agentmetricspb.ExportMetricsServiceRequest + ma.forEachRequest(func(req *agentmetricspb.ExportMetricsServiceRequest) { + received = append(received, req) + }) + + // Now compare them with what we expect + want := []*agentmetricspb.ExportMetricsServiceRequest{ + { + Node: ocagent.NodeWithStartTime(""), // The first message identifying this application. + Metrics: nil, + }, + { + Metrics: []*metricspb.Metric{ + { + Descriptor_: &metricspb.Metric_MetricDescriptor{ + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "ocagent.io/latency", + Description: "The latency of the various methods", + Unit: "ms", // Derived from the measure + Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, + LabelKeys: nil, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + StartTimestamp: ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 997, + }, + LabelValues: nil, + Points: []*metricspb.Point{ + { + Timestamp: ×tamp.Timestamp{ + Seconds: 1543160298, + Nanos: 100000997, + }, + Value: &metricspb.Point_Int64Value{Int64Value: 4}, + }, + }, + }, + }, + }, + }, + }, + } + + if !reflect.DeepEqual(received, want) { + gj, _ := json.MarshalIndent(received, "", " ") + wj, _ := json.MarshalIndent(want, "", " ") + if string(gj) != string(wj) { + t.Errorf("Got:\n%s\nWant:\n%s", gj, wj) + } + } +} + +func (ma *metricsAgent) Export(mes agentmetricspb.MetricsService_ExportServer) error { + // Expecting the first message to contain the Node information + firstMetric, err := mes.Recv() + if err != nil { + return err + } + + if firstMetric == nil || firstMetric.Node == nil { + return errors.New("Expecting a non-nil Node in the first message") + } + + ma.addMetric(firstMetric) + + for { + msg, err := mes.Recv() + if err != nil { + return err + } + ma.addMetric(msg) + } +} + +func (ma *metricsAgent) addMetric(metric *agentmetricspb.ExportMetricsServiceRequest) { + ma.mu.Lock() + ma.metrics = append(ma.metrics, metric) + ma.mu.Unlock() +} + +func (ma *metricsAgent) forEachRequest(fn func(*agentmetricspb.ExportMetricsServiceRequest)) { + ma.mu.RLock() + defer ma.mu.RUnlock() + + for _, req := range ma.metrics { + fn(req) + } +}