Skip to content

Commit ab2264b

Browse files
authored
Merge branch 'open-telemetry:main' into main
2 parents 0321279 + 7469f61 commit ab2264b

File tree

5 files changed

+58
-99
lines changed

5 files changed

+58
-99
lines changed

CHANGELOG.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1212

1313
- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
1414

15+
### Fixed
16+
17+
- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537)
18+
1519
## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10
1620

1721
### Added
@@ -41,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
4145
- The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045)
4246
- The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now sets gRPC status code correctly for the `rpc.server.duration` metric. (#4481)
4347
- The `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now honor `otelgrpc.WithMessageEvents` options. (#4536)
48+
- The `net.sock.peer.*` and `net.peer.*` high cardinality attributes are removed from the metrics generated by `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4322)
4449

4550
## [1.20.0/0.45.0/0.14.0] - 2023-09-28
4651

@@ -55,10 +60,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
5560
- Upgrade dependencies of OpenTelemetry Go to use the new [`v1.19.0`/`v0.42.0`/`v0.0.7` release](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.19.0).
5661
- Use `grpc.StatsHandler` for gRPC instrumentation in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/example`. (#4325)
5762

58-
### Removed
59-
60-
- The `net.sock.peer.*` and `net.peer.*` high cardinality attributes are removed from the metrics generated by `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4322)
61-
6263
## [1.19.0/0.44.0/0.13.0] - 2023-09-12
6364

6465
### Added

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

+20-71
Original file line numberDiff line numberDiff line change
@@ -125,27 +125,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
125125
}
126126
}
127127

128-
type streamEventType int
129-
130-
type streamEvent struct {
131-
Type streamEventType
132-
Err error
133-
}
134-
135-
const (
136-
receiveEndEvent streamEventType = iota
137-
errorEvent
138-
)
139-
140128
// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
141129
// SendMsg method call.
142130
type clientStream struct {
143131
grpc.ClientStream
132+
desc *grpc.StreamDesc
144133

145-
desc *grpc.StreamDesc
146-
events chan streamEvent
147-
eventsDone chan struct{}
148-
finished chan error
134+
span trace.Span
149135

150136
receivedEvent bool
151137
sentEvent bool
@@ -160,11 +146,11 @@ func (w *clientStream) RecvMsg(m interface{}) error {
160146
err := w.ClientStream.RecvMsg(m)
161147

162148
if err == nil && !w.desc.ServerStreams {
163-
w.sendStreamEvent(receiveEndEvent, nil)
149+
w.endSpan(nil)
164150
} else if err == io.EOF {
165-
w.sendStreamEvent(receiveEndEvent, nil)
151+
w.endSpan(nil)
166152
} else if err != nil {
167-
w.sendStreamEvent(errorEvent, err)
153+
w.endSpan(err)
168154
} else {
169155
w.receivedMessageID++
170156

@@ -186,7 +172,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
186172
}
187173

188174
if err != nil {
189-
w.sendStreamEvent(errorEvent, err)
175+
w.endSpan(err)
190176
}
191177

192178
return err
@@ -195,7 +181,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
195181
func (w *clientStream) Header() (metadata.MD, error) {
196182
md, err := w.ClientStream.Header()
197183
if err != nil {
198-
w.sendStreamEvent(errorEvent, err)
184+
w.endSpan(err)
199185
}
200186

201187
return md, err
@@ -204,54 +190,32 @@ func (w *clientStream) Header() (metadata.MD, error) {
204190
func (w *clientStream) CloseSend() error {
205191
err := w.ClientStream.CloseSend()
206192
if err != nil {
207-
w.sendStreamEvent(errorEvent, err)
193+
w.endSpan(err)
208194
}
209195

210196
return err
211197
}
212198

213-
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
214-
events := make(chan streamEvent)
215-
eventsDone := make(chan struct{})
216-
finished := make(chan error)
217-
218-
go func() {
219-
defer close(eventsDone)
220-
221-
for {
222-
select {
223-
case event := <-events:
224-
switch event.Type {
225-
case receiveEndEvent:
226-
finished <- nil
227-
return
228-
case errorEvent:
229-
finished <- event.Err
230-
return
231-
}
232-
case <-ctx.Done():
233-
finished <- ctx.Err()
234-
return
235-
}
236-
}
237-
}()
238-
199+
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
239200
return &clientStream{
240201
ClientStream: s,
202+
span: span,
241203
desc: desc,
242-
events: events,
243-
eventsDone: eventsDone,
244-
finished: finished,
245204
receivedEvent: cfg.ReceivedEvent,
246205
sentEvent: cfg.SentEvent,
247206
}
248207
}
249208

250-
func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
251-
select {
252-
case <-w.eventsDone:
253-
case w.events <- streamEvent{Type: eventType, Err: err}:
209+
func (w *clientStream) endSpan(err error) {
210+
if err != nil {
211+
s, _ := status.FromError(err)
212+
w.span.SetStatus(codes.Error, s.Message())
213+
w.span.SetAttributes(statusCodeAttr(s.Code()))
214+
} else {
215+
w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
254216
}
217+
218+
w.span.End()
255219
}
256220

257221
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
@@ -306,22 +270,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
306270
span.End()
307271
return s, err
308272
}
309-
stream := wrapClientStream(ctx, s, desc, cfg)
310-
311-
go func() {
312-
err := <-stream.finished
313-
314-
if err != nil {
315-
s, _ := status.FromError(err)
316-
span.SetStatus(codes.Error, s.Message())
317-
span.SetAttributes(statusCodeAttr(s.Code()))
318-
} else {
319-
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
320-
}
321-
322-
span.End()
323-
}()
324-
273+
stream := wrapClientStream(ctx, s, desc, span, cfg)
325274
return stream, nil
326275
}
327276
}

instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ func TestStatsHandler(t *testing.T) {
4949

5050
listener, err := net.Listen("tcp", "127.0.0.1:0")
5151
require.NoError(t, err, "failed to open port")
52-
err = newGrpcTest(
53-
listener,
52+
client := newGrpcTest(t, listener,
5453
[]grpc.DialOption{
5554
grpc.WithStatsHandler(otelgrpc.NewClientHandler(
5655
otelgrpc.WithTracerProvider(clientTP),
@@ -66,7 +65,7 @@ func TestStatsHandler(t *testing.T) {
6665
),
6766
},
6867
)
69-
require.NoError(t, err)
68+
doCalls(client)
7069

7170
t.Run("ClientSpans", func(t *testing.T) {
7271
checkClientSpans(t, clientSR.Ended())

instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go

+13-21
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"net"
2020
"strconv"
2121
"testing"
22-
"time"
2322

2423
"github.com/stretchr/testify/assert"
2524
"github.com/stretchr/testify/require"
@@ -46,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{
4645
Version: otelgrpc.Version(),
4746
}
4847

49-
// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down.
50-
func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
48+
// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup.
49+
func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient {
5150
grpcServer := grpc.NewServer(sOpt...)
5251
pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
5352
errCh := make(chan error)
5453
go func() {
5554
errCh <- grpcServer.Serve(listener)
5655
}()
56+
t.Cleanup(func() {
57+
grpcServer.Stop()
58+
assert.NoError(t, <-errCh)
59+
})
5760
ctx := context.Background()
5861

5962
cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
@@ -68,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv
6871
listener.Addr().String(),
6972
cOpt...,
7073
)
71-
if err != nil {
72-
return err
73-
}
74-
client := pb.NewTestServiceClient(conn)
75-
76-
doCalls(client)
77-
78-
conn.Close()
79-
grpcServer.Stop()
74+
require.NoError(t, err)
75+
t.Cleanup(func() {
76+
assert.NoError(t, conn.Close())
77+
})
8078

81-
return <-errCh
79+
return pb.NewTestServiceClient(conn)
8280
}
8381

8482
func doCalls(client pb.TestServiceClient) {
@@ -106,7 +104,7 @@ func TestInterceptors(t *testing.T) {
106104

107105
listener, err := net.Listen("tcp", "127.0.0.1:0")
108106
require.NoError(t, err, "failed to open port")
109-
err = newGrpcTest(listener,
107+
client := newGrpcTest(t, listener,
110108
[]grpc.DialOption{
111109
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
112110
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
@@ -133,19 +131,13 @@ func TestInterceptors(t *testing.T) {
133131
)),
134132
},
135133
)
136-
require.NoError(t, err)
134+
doCalls(client)
137135

138136
t.Run("UnaryClientSpans", func(t *testing.T) {
139137
checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String())
140138
})
141139

142140
t.Run("StreamClientSpans", func(t *testing.T) {
143-
// StreamClientInterceptor ends the spans asynchronously.
144-
// We need to wait for all spans before asserting them.
145-
require.EventuallyWithT(t, func(c *assert.CollectT) {
146-
assert.Len(c, clientStreamSR.Ended(), 3)
147-
}, 5*time.Second, 100*time.Millisecond)
148-
149141
checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
150142
})
151143

instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"google.golang.org/grpc"
4141
grpc_codes "google.golang.org/grpc/codes"
4242
"google.golang.org/grpc/credentials/insecure"
43+
"google.golang.org/grpc/interop"
4344
"google.golang.org/grpc/interop/grpc_testing"
4445
"google.golang.org/grpc/metadata"
4546
"google.golang.org/grpc/status"
@@ -1128,3 +1129,20 @@ func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name s
11281129
require.Len(t, rm.ScopeMetrics, 1)
11291130
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
11301131
}
1132+
1133+
func BenchmarkStreamClientInterceptor(b *testing.B) {
1134+
listener, err := net.Listen("tcp", "127.0.0.1:0")
1135+
require.NoError(b, err, "failed to open port")
1136+
client := newGrpcTest(b, listener,
1137+
[]grpc.DialOption{
1138+
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
1139+
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
1140+
},
1141+
[]grpc.ServerOption{},
1142+
)
1143+
1144+
b.ResetTimer()
1145+
for i := 0; i < b.N; i++ {
1146+
interop.DoClientStreaming(client)
1147+
}
1148+
}

0 commit comments

Comments
 (0)