Skip to content

Commit c5ec7ae

Browse files
committed
[otelgrpc] refactor otelgrpc to use grpc.StatsHandler
Signed-off-by: Ziqi Zhao <[email protected]>
1 parent 95167fd commit c5ec7ae

File tree

4 files changed

+901
-0
lines changed

4 files changed

+901
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1111
### Added
1212

1313
- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
14+
- [otelgrpc] refactor otelgrpc to use grpc.StatsHandler. (#3002)
1415

1516
## [1.17.0-rc.1/0.42.0-rc.1/0.11.0-rc.1] - 2023-05-17
1617

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
16+
17+
import (
18+
"context"
19+
"sync/atomic"
20+
21+
grpc_codes "google.golang.org/grpc/codes"
22+
"google.golang.org/grpc/stats"
23+
"google.golang.org/grpc/status"
24+
25+
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
26+
"go.opentelemetry.io/otel/attribute"
27+
"go.opentelemetry.io/otel/codes"
28+
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
29+
"go.opentelemetry.io/otel/trace"
30+
)
31+
32+
type gRPCContext struct {
33+
messagesReceived int64
34+
messagesSent int64
35+
}
36+
37+
// NewServerHandler creates a stats.Handler for gRPC server.
38+
func NewServerHandler(opts ...Option) stats.Handler {
39+
h := &serverHandler{
40+
config: newConfig(opts),
41+
}
42+
43+
h.tracer = h.config.TracerProvider.Tracer(
44+
instrumentationName,
45+
trace.WithInstrumentationVersion(SemVersion()),
46+
)
47+
return h
48+
}
49+
50+
type serverHandler struct {
51+
*config
52+
tracer trace.Tracer
53+
}
54+
55+
// TagRPC can attach some information to the given context.
56+
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
57+
ctx = extract(ctx, h.config.Propagators)
58+
59+
attrs := []attribute.KeyValue{RPCSystemGRPC}
60+
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
61+
attrs = append(attrs, mAttrs...)
62+
ctx, _ = h.tracer.Start(
63+
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
64+
name,
65+
trace.WithSpanKind(trace.SpanKindServer),
66+
trace.WithAttributes(attrs...),
67+
)
68+
69+
gctx := gRPCContext{}
70+
return context.WithValue(ctx, gRPCContext{}, &gctx)
71+
}
72+
73+
// HandleRPC processes the RPC stats.
74+
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
75+
handleRPC(ctx, rs)
76+
}
77+
78+
// TagConn can attach some information to the given context.
79+
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
80+
span := trace.SpanFromContext(ctx)
81+
attrs := peerAttr(peerFromCtx(ctx))
82+
span.SetAttributes(attrs...)
83+
return ctx
84+
}
85+
86+
// HandleConn processes the Conn stats.
87+
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
88+
}
89+
90+
// NewClientHandler creates a stats.Handler for gRPC client.
91+
func NewClientHandler(opts ...Option) stats.Handler {
92+
h := &clientHandler{
93+
config: newConfig(opts),
94+
}
95+
96+
h.tracer = h.config.TracerProvider.Tracer(
97+
instrumentationName,
98+
trace.WithInstrumentationVersion(SemVersion()),
99+
)
100+
101+
return h
102+
}
103+
104+
type clientHandler struct {
105+
*config
106+
tracer trace.Tracer
107+
}
108+
109+
// TagRPC can attach some information to the given context.
110+
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
111+
attrs := []attribute.KeyValue{RPCSystemGRPC}
112+
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
113+
attrs = append(attrs, mAttrs...)
114+
ctx, _ = h.tracer.Start(
115+
ctx,
116+
name,
117+
trace.WithSpanKind(trace.SpanKindClient),
118+
trace.WithAttributes(attrs...),
119+
)
120+
121+
gctx := gRPCContext{}
122+
123+
return inject(context.WithValue(ctx, gRPCContext{}, &gctx), h.config.Propagators)
124+
}
125+
126+
// HandleRPC processes the RPC stats.
127+
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
128+
handleRPC(ctx, rs)
129+
}
130+
131+
// TagConn can attach some information to the given context.
132+
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
133+
span := trace.SpanFromContext(ctx)
134+
attrs := peerAttr(cti.RemoteAddr.String())
135+
span.SetAttributes(attrs...)
136+
return ctx
137+
}
138+
139+
// HandleConn processes the Conn stats.
140+
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
141+
// no-op
142+
}
143+
144+
func handleRPC(ctx context.Context, rs stats.RPCStats) {
145+
span := trace.SpanFromContext(ctx)
146+
gctx, _ := ctx.Value(gRPCContext{}).(*gRPCContext)
147+
var messageId int64
148+
149+
switch rs := rs.(type) {
150+
case *stats.Begin:
151+
case *stats.InPayload:
152+
if gctx != nil {
153+
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
154+
}
155+
span.AddEvent("message",
156+
trace.WithAttributes(
157+
semconv.MessageTypeReceived,
158+
semconv.MessageIDKey.Int64(messageId),
159+
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
160+
semconv.MessageUncompressedSizeKey.Int(rs.Length),
161+
),
162+
)
163+
case *stats.OutPayload:
164+
if gctx != nil {
165+
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
166+
}
167+
168+
span.AddEvent("message",
169+
trace.WithAttributes(
170+
semconv.MessageTypeSent,
171+
semconv.MessageIDKey.Int64(messageId),
172+
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
173+
semconv.MessageUncompressedSizeKey.Int(rs.Length),
174+
),
175+
)
176+
case *stats.End:
177+
if rs.Error != nil {
178+
s, _ := status.FromError(rs.Error)
179+
span.SetStatus(codes.Error, s.Message())
180+
span.SetAttributes(statusCodeAttr(s.Code()))
181+
} else {
182+
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
183+
}
184+
span.End()
185+
default:
186+
return
187+
}
188+
}

0 commit comments

Comments
 (0)