Skip to content

Commit 6014843

Browse files
committed
interceptor/opencensus: record number of received spans
Instrument interceptor/opencensus with stats to record the number of received spans. This is accomplished by a helper to process spans that sends over spans even if the number of spans is 0. Also record with tag_key "opencensus_interceptor" whose value is the name of the respective interceptor. The test to ensure that 0 length spans are also added is currently disabled because: * Issue census-instrumentation/opencensus-go#862 is not yet implemented which requests that the OpenCensus-Go stats worker provide a method Flush to flush all data. Without it, metrics will contain stale data from previous tests. Add tests to lock-in this behavior. Also replace grpc.NewServer with a stats enabled server helper call. This enables tracing and metrics for each gRPC server by replacing naked usages of grpc.NewServer with the new helper function: internal.GRPCServerWithObservabilityEnabled Updates census-instrumentation#63
1 parent 8e8eef9 commit 6014843

File tree

6 files changed

+321
-13
lines changed

6 files changed

+321
-13
lines changed

cmd/ocagent/main.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626
"os/signal"
2727
"time"
2828

29-
"google.golang.org/grpc"
30-
3129
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3230
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/exporterparser"
3331
"github.com/census-instrumentation/opencensus-service/exporter"
3432
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
33+
"github.com/census-instrumentation/opencensus-service/internal"
3534
"github.com/census-instrumentation/opencensus-service/spanreceiver"
35+
"go.opencensus.io/plugin/ocgrpc"
36+
"go.opencensus.io/stats/view"
3637
)
3738

3839
func main() {
@@ -84,7 +85,14 @@ func runOCInterceptor(ocInterceptorPort int, sr spanreceiver.SpanReceiver) (done
8485
if err != nil {
8586
return nil, fmt.Errorf("Cannot bind to address %q: %v", addr, err)
8687
}
87-
srv := grpc.NewServer()
88+
srv := internal.GRPCServerWithObservabilityEnabled()
89+
if err := view.Register(internal.AllViews...); err != nil {
90+
return nil, fmt.Errorf("Failed to register internal.AllViews: %v", err)
91+
}
92+
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
93+
return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err)
94+
}
95+
8896
agenttracepb.RegisterTraceServiceServer(srv, oci)
8997
go func() {
9098
log.Printf("Running OpenCensus interceptor as a gRPC service at %q", addr)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ocinterceptor_test
16+
17+
import (
18+
"fmt"
19+
"reflect"
20+
"strings"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"contrib.go.opencensus.io/exporter/ocagent"
26+
"go.opencensus.io/stats/view"
27+
"go.opencensus.io/tag"
28+
"go.opencensus.io/trace"
29+
30+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
31+
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
32+
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
33+
"github.com/census-instrumentation/opencensus-service/internal"
34+
)
35+
36+
// Ensure that if we add a metrics exporter that our target metrics
37+
// will be recorded but also with the proper tag keys and values.
38+
// See Issue https://github.com/census-instrumentation/opencensus-service/issues/63
39+
//
40+
// Note: we are intentionally skipping the ocgrpc.ServerDefaultViews as this
41+
// test is to ensure exactness, but with the mentioned views registered, the
42+
// output will be quite noisy.
43+
func TestEnsureRecordedMetrics(t *testing.T) {
44+
sappender := newSpanAppender()
45+
46+
_, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond))
47+
defer doneFn()
48+
49+
// Now the opencensus-agent exporter.
50+
oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure())
51+
if err != nil {
52+
t.Fatalf("Failed to create the ocagent-exporter: %v", err)
53+
}
54+
trace.RegisterExporter(oce)
55+
defer func() {
56+
oce.Stop()
57+
trace.UnregisterExporter(oce)
58+
}()
59+
60+
// Now for the stats exporter
61+
if err := view.Register(internal.AllViews...); err != nil {
62+
t.Fatalf("Failed to register all views: %v", err)
63+
}
64+
defer view.Unregister(internal.AllViews...)
65+
66+
metricsReportingPeriod := 5 * time.Millisecond
67+
view.SetReportingPeriod(metricsReportingPeriod)
68+
// On exit, revert the metrics reporting period.
69+
defer view.SetReportingPeriod(60 * time.Second)
70+
71+
cme := newCountMetricsExporter()
72+
view.RegisterExporter(cme)
73+
defer view.UnregisterExporter(cme)
74+
75+
n := 20
76+
// Now it is time to send over some spans
77+
// and we'll count the numbers received.
78+
for i := 0; i < n; i++ {
79+
now := time.Now().UTC()
80+
oce.ExportSpan(&trace.SpanData{
81+
StartTime: now.Add(-10 * time.Second),
82+
EndTime: now.Add(20 * time.Second),
83+
SpanContext: trace.SpanContext{
84+
TraceID: trace.TraceID{byte(0x20 + i), 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41},
85+
SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78},
86+
TraceOptions: trace.TraceOptions(i & 0x01),
87+
},
88+
ParentSpanID: trace.SpanID{byte(0x01 + i), 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37},
89+
Name: fmt.Sprintf("Span-%d", i),
90+
Status: trace.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"},
91+
})
92+
}
93+
94+
// Give them some time to be exported.
95+
// say n * metricsReportingPeriod
96+
<-time.After(time.Duration(n) * metricsReportingPeriod)
97+
oce.Flush()
98+
99+
checkCountMetricsExporterResults(t, cme, n, 1)
100+
}
101+
102+
func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) {
103+
t.Skipf("Currently disabled, enable this test when the following are fixed:\nIssue %s\nPR %s",
104+
"https://github.com/census-instrumentation/opencensus-go/issues/862",
105+
"https://github.com/census-instrumentation/opencensus-go/pull/922",
106+
)
107+
sappender := newSpanAppender()
108+
109+
_, port, doneFn := ocInterceptorOnGRPCServer(t, sappender, ocinterceptor.WithSpanBufferPeriod(2*time.Millisecond))
110+
defer doneFn()
111+
112+
// Now the opencensus-agent exporter.
113+
oce, err := ocagent.NewExporter(ocagent.WithPort(uint16(port)), ocagent.WithInsecure())
114+
if err != nil {
115+
t.Fatalf("Failed to create the ocagent-exporter: %v", err)
116+
}
117+
trace.RegisterExporter(oce)
118+
defer func() {
119+
oce.Stop()
120+
trace.UnregisterExporter(oce)
121+
}()
122+
123+
// Now for the stats exporter
124+
if err := view.Register(internal.AllViews...); err != nil {
125+
t.Fatalf("Failed to register all views: %v", err)
126+
}
127+
defer view.Unregister(internal.AllViews...)
128+
129+
metricsReportingPeriod := 10 * time.Millisecond
130+
view.SetReportingPeriod(metricsReportingPeriod)
131+
// On exit, revert the metrics reporting period.
132+
defer view.SetReportingPeriod(60 * time.Second)
133+
134+
cme := newCountMetricsExporter()
135+
view.RegisterExporter(cme)
136+
defer view.UnregisterExporter(cme)
137+
138+
n := 20
139+
// Now for the traceExporter that sends 0 length spans
140+
traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port)
141+
if err != nil {
142+
t.Fatalf("Failed to create the trace service client: %v", err)
143+
}
144+
defer traceSvcDoneFn()
145+
for i := 0; i <= n; i++ {
146+
_ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: nil, Node: &commonpb.Node{}})
147+
}
148+
<-time.After(time.Duration(n) * metricsReportingPeriod)
149+
checkCountMetricsExporterResults(t, cme, n, 0)
150+
}
151+
152+
func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) {
153+
cme.mu.Lock()
154+
defer cme.mu.Unlock()
155+
156+
// The only tags that we are expecting are "opencensus_interceptor": "opencensus" * n
157+
wantTagKey, _ := tag.NewKey("opencensus_interceptor")
158+
valuesPlusBlank := strings.Split(strings.Repeat("opencensus,opencensus,", n/2), ",")
159+
wantValues := valuesPlusBlank[:len(valuesPlusBlank)-1]
160+
wantTags := map[tag.Key][]string{
161+
wantTagKey: wantValues,
162+
}
163+
164+
gotTags := cme.tags
165+
if !reflect.DeepEqual(gotTags, wantTags) {
166+
t.Errorf("\nGotTags:\n\t%#v\n\nWantTags:\n\t%#v\n", gotTags, wantTags)
167+
}
168+
169+
// The only data types we are expecting are:
170+
// * DistributionData
171+
for key, aggregation := range cme.data {
172+
switch agg := aggregation.(type) {
173+
case *view.DistributionData:
174+
if g, w := agg.Count, int64(1); g != w {
175+
t.Errorf("Data point #%d GotCount %d Want %d", key, g, w)
176+
}
177+
default:
178+
t.Errorf("Data point #%d Got %T want %T", key, agg, (*view.DistributionData)(nil))
179+
}
180+
}
181+
}
182+
183+
type countMetricsExporter struct {
184+
mu sync.Mutex
185+
tags map[tag.Key][]string
186+
data map[int]view.AggregationData
187+
}
188+
189+
func newCountMetricsExporter() *countMetricsExporter {
190+
return &countMetricsExporter{
191+
tags: make(map[tag.Key][]string),
192+
data: make(map[int]view.AggregationData),
193+
}
194+
}
195+
196+
func (cme *countMetricsExporter) clear() {
197+
cme.mu.Lock()
198+
defer cme.mu.Unlock()
199+
200+
cme.data = make(map[int]view.AggregationData)
201+
cme.tags = make(map[tag.Key][]string)
202+
}
203+
204+
var _ view.Exporter = (*countMetricsExporter)(nil)
205+
206+
func (cme *countMetricsExporter) ExportView(vd *view.Data) {
207+
cme.mu.Lock()
208+
defer cme.mu.Unlock()
209+
210+
for _, row := range vd.Rows {
211+
cme.data[len(cme.data)] = row.Data
212+
for _, tag_ := range row.Tags {
213+
cme.tags[tag_.Key] = append(cme.tags[tag_.Key], tag_.Value)
214+
}
215+
}
216+
}

interceptor/opencensus/opencensus.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
2525
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2626
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
27+
"github.com/census-instrumentation/opencensus-service/internal"
2728
"github.com/census-instrumentation/opencensus-service/spanreceiver"
2829
)
2930

@@ -89,20 +90,28 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
8990
return errTraceExportProtocolViolation
9091
}
9192

92-
var lastNonNilNode *commonpb.Node
93+
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), "opencensus")
94+
95+
processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
96+
// Firstly, we'll add them to the bundler.
97+
if len(recv.Spans) > 0 {
98+
bundlerPayload := &spansAndNode{node: ni, spans: recv.Spans}
99+
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
100+
}
93101

102+
// We MUST unconditionally record metrics from this reception.
103+
spansMetricsFn(ni, recv.Spans)
104+
}
105+
106+
var lastNonNilNode *commonpb.Node
94107
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
95108
for {
96109
// If a Node has been sent from downstream, save and use it.
97110
if recv.Node != nil {
98111
lastNonNilNode = recv.Node
99112
}
100113

101-
// Otherwise add them to the bundler.
102-
if len(recv.Spans) > 0 {
103-
bundlerPayload := &spansAndNode{node: lastNonNilNode, spans: recv.Spans}
104-
traceBundler.Add(bundlerPayload, len(bundlerPayload.spans))
105-
}
114+
processReceivedSpans(lastNonNilNode, recv.Spans)
106115

107116
recv, err = tes.Recv()
108117
if err != nil {

interceptor/opencensus/opencensus_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3939
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
4040
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
41+
"github.com/census-instrumentation/opencensus-service/internal"
4142
"github.com/census-instrumentation/opencensus-service/spanreceiver"
4243
"go.opencensus.io/trace"
4344
"go.opencensus.io/trace/tracestate"
@@ -483,7 +484,7 @@ func ocInterceptorOnGRPCServer(t *testing.T, sr spanreceiver.SpanReceiver, opts
483484
}
484485

485486
// Now run it as a gRPC server
486-
srv := grpc.NewServer()
487+
srv := internal.GRPCServerWithObservabilityEnabled()
487488
agenttracepb.RegisterTraceServiceServer(srv, oci)
488489
go func() {
489490
_ = srv.Serve(ln)

interceptor/opencensus/trace_interceptor.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2727
"github.com/census-instrumentation/opencensus-service/interceptor"
28+
"github.com/census-instrumentation/opencensus-service/internal"
2829
"github.com/census-instrumentation/opencensus-service/spanreceiver"
2930
)
3031

@@ -86,9 +87,7 @@ func (ocih *ocInterceptorHandler) startInternal(ctx context.Context, sr spanrece
8687
return err
8788
}
8889

89-
// TODO: (@odeke-em) in the future, also add OpenCensus
90-
// stats handlers to start this gPRC server.
91-
srv := grpc.NewServer()
90+
srv := internal.GRPCServerWithObservabilityEnabled()
9291

9392
agenttracepb.RegisterTraceServiceServer(srv, oci)
9493
go func() {

0 commit comments

Comments
 (0)