From 1c4d2ec05bd9a750889563881e4d4430d40f1dac Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Mon, 13 Nov 2023 15:56:53 -0800 Subject: [PATCH 1/8] fix data race from stats_handler.go (#1) ``` WARNING: DATA RACE Read at 0x00c000bef1d8 by goroutine 890: go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.(*config).handleRPC() /go/pkg/mod/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc@v0.46.0/stats_handler.go:199 +0x190b go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc.(*clientHandler).HandleRPC() /go/pkg/mod/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc@v0.46.0/stats_handler.go:124 +0x65 google.golang.org/grpc.(*csAttempt).finish() /go/pkg/mod/google.golang.org/grpc@v1.59.0/stream.go:1171 +0x650 google.golang.org/grpc.(*clientStream).finish() /go/pkg/mod/google.golang.org/grpc@v1.59.0/stream.go:988 +0x34b google.golang.org/grpc.(*clientStream).RecvMsg() /go/pkg/mod/google.golang.org/grpc@v1.59.0/stream.go:940 +0x2d2 github.com/prodvana/prodvana-public/go/prodvana-sdk/proto/prodvana/agent.(*agentInteractionProxyAPIServerClient).Recv() /code/go-sdk/proto/prodvana/agent/agent_interaction_grpc.pb.go:181 +0x67 prodvana/grpc/connwrapper.ConnectConnAndStreamingServer[...].func2.2() grpc/connwrapper/conn.go:141 +0xbc ``` --- .../google.golang.org/grpc/otelgrpc/stats_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index 0211e55e003..8e13bb54e87 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -195,8 +195,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { metricAttrs = append(metricAttrs, rpcStatusAttr) c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) - c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) - c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...)) + c.rpcRequestsPerRPC.Record(wctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...)) + c.rpcResponsesPerRPC.Record(wctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...)) default: return From 0321279a9e8bcb45530ec181c11b706e061e06e9 Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Mon, 13 Nov 2023 19:45:05 -0800 Subject: [PATCH 2/8] add test for stats_handler (#2) --- .../grpc/otelgrpc/stats_handler_test.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go new file mode 100644 index 00000000000..c5795663051 --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go @@ -0,0 +1,40 @@ +package otelgrpc + +import ( + "context" + "sync" + "testing" + + "google.golang.org/grpc/stats" +) + +func TestHandleRPC(t *testing.T) { + const iteration = 100 + wg := &sync.WaitGroup{} + ctx := context.Background() + h := NewClientHandler() + ctx = h.TagRPC(ctx, &stats.RPCTagInfo{ + FullMethodName: "test/method", + }) + for i := 0; i < iteration; i++ { + wg.Add(1) + go func() { + defer wg.Done() + h.HandleRPC(ctx, &stats.Begin{}) + wg.Add(3) + go func() { + defer wg.Done() + h.HandleRPC(ctx, &stats.InPayload{}) + }() + go func() { + defer wg.Done() + h.HandleRPC(ctx, &stats.OutPayload{}) + }() + go func() { + defer wg.Done() + h.HandleRPC(ctx, &stats.End{}) + }() + }() + } + wg.Wait() +} From fc217ae67f62f9080faa8641d3d4ccc31a5ada1d Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Tue, 14 Nov 2023 18:25:30 +0000 Subject: [PATCH 3/8] do full test --- .../grpc/otelgrpc/stats_handler_test.go | 40 ------------------- .../otelgrpc/test/grpc_stats_handler_test.go | 25 ++++++++++++ 2 files changed, 25 insertions(+), 40 deletions(-) delete mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go deleted file mode 100644 index c5795663051..00000000000 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package otelgrpc - -import ( - "context" - "sync" - "testing" - - "google.golang.org/grpc/stats" -) - -func TestHandleRPC(t *testing.T) { - const iteration = 100 - wg := &sync.WaitGroup{} - ctx := context.Background() - h := NewClientHandler() - ctx = h.TagRPC(ctx, &stats.RPCTagInfo{ - FullMethodName: "test/method", - }) - for i := 0; i < iteration; i++ { - wg.Add(1) - go func() { - defer wg.Done() - h.HandleRPC(ctx, &stats.Begin{}) - wg.Add(3) - go func() { - defer wg.Done() - h.HandleRPC(ctx, &stats.InPayload{}) - }() - go func() { - defer wg.Done() - h.HandleRPC(ctx, &stats.OutPayload{}) - }() - go func() { - defer wg.Done() - h.HandleRPC(ctx, &stats.End{}) - }() - }() - } - wg.Wait() -} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index e6fd212f904..888b6a13269 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -17,6 +17,7 @@ package test import ( "context" "net" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -1316,3 +1317,27 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } + +func TestStatsHandlerConcurrentSafe(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to open port") + client := newGrpcTest(t, listener, + []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + }, + []grpc.ServerOption{ + grpc.StatsHandler(otelgrpc.NewServerHandler()), + }, + ) + + wg := &sync.WaitGroup{} + const n = 100 + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + doCalls(client) + }() + } + wg.Wait() +} From e70689ca45a7f7570b5862070450b26577358651 Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Wed, 15 Nov 2023 01:21:05 +0000 Subject: [PATCH 4/8] simulate data race in test for real --- .../otelgrpc/test/grpc_stats_handler_test.go | 73 ++++++++++++++++--- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 888b6a13269..eb4179b0213 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -16,6 +16,8 @@ package test import ( "context" + "fmt" + "io" "net" "sync" "testing" @@ -24,6 +26,8 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/interop" + "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" @@ -35,6 +39,7 @@ import ( "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + testpb "google.golang.org/grpc/interop/grpc_testing" ) func TestStatsHandler(t *testing.T) { @@ -1318,7 +1323,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } -func TestStatsHandlerConcurrentSafe(t *testing.T) { +// Ensure there is no data race for the following scenario: +// Bidirectional streaming + client cancels context in the middle of streaming. +func TestStatsHandlerConcurrentSafeContextCancellation(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to open port") client := newGrpcTest(t, listener, @@ -1330,14 +1337,60 @@ func TestStatsHandlerConcurrentSafe(t *testing.T) { }, ) - wg := &sync.WaitGroup{} - const n = 100 - for i := 0; i < n; i++ { - wg.Add(1) - go func() { - defer wg.Done() - doCalls(client) - }() + const testCount = 100 + for i := 0; i < testCount; i++ { + t.Run(fmt.Sprintf("run_%d", i), func(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + ctx, cancel := context.WithCancel(context.Background()) + stream, err := client.FullDuplexCall(ctx) + require.NoError(t, err) + const messageCount = 100 + go func() { + defer wg.Done() + sendWg := &sync.WaitGroup{} + for i := 0; i < messageCount; i++ { + sendWg.Add(1) + go func() { + defer sendWg.Done() + const reqSize = 10 + pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) + respParam := []*testpb.ResponseParameters{ + { + Size: reqSize, + }, + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseParameters: respParam, + Payload: pl, + } + err := stream.Send(req) + if err == io.EOF { // possible due to context cancellation + require.ErrorIs(t, ctx.Err(), context.Canceled) + } else { + require.NoError(t, err) + } + }() + } + sendWg.Wait() + require.NoError(t, stream.CloseSend()) + }() + go func() { + defer wg.Done() + for i := 0; i < messageCount; i++ { + _, err := stream.Recv() + if i > messageCount/2 { + cancel() + } + // must continue to receive messages until server acknowledges the cancellation, to ensure no data race happens there too + if status.Code(err) == codes.Canceled { + return + } + require.NoError(t, err) + } + }() + wg.Wait() + }) } - wg.Wait() } From 150d4f1a5ad5da5f4999933ce08f5fba8c1a05c9 Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Wed, 15 Nov 2023 06:35:27 -0800 Subject: [PATCH 5/8] Update grpc_stats_handler_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert PajÄ…k --- .../otelgrpc/test/grpc_stats_handler_test.go | 101 +++++++++--------- 1 file changed, 49 insertions(+), 52 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index eb4179b0213..fb9f61972b5 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -1337,60 +1337,57 @@ func TestStatsHandlerConcurrentSafeContextCancellation(t *testing.T) { }, ) - const testCount = 100 - for i := 0; i < testCount; i++ { - t.Run(fmt.Sprintf("run_%d", i), func(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(2) - ctx, cancel := context.WithCancel(context.Background()) - stream, err := client.FullDuplexCall(ctx) - require.NoError(t, err) - const messageCount = 100 - go func() { - defer wg.Done() - sendWg := &sync.WaitGroup{} - for i := 0; i < messageCount; i++ { - sendWg.Add(1) - go func() { - defer sendWg.Done() - const reqSize = 10 - pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) - respParam := []*testpb.ResponseParameters{ - { - Size: reqSize, - }, - } - req := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseParameters: respParam, - Payload: pl, - } - err := stream.Send(req) - if err == io.EOF { // possible due to context cancellation - require.ErrorIs(t, ctx.Err(), context.Canceled) - } else { - require.NoError(t, err) - } - }() + const n = 10 + for i := 0; i < n; i++ { + ctx, cancel := context.WithCancel(context.Background()) + stream, err := client.FullDuplexCall(ctx) + require.NoError(t, err) + + const messageCount = 10 + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < messageCount; i++ { + const reqSize = 1 + pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) + respParam := []*testpb.ResponseParameters{ + { + Size: reqSize, + }, + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseParameters: respParam, + Payload: pl, } - sendWg.Wait() - require.NoError(t, stream.CloseSend()) - }() - go func() { - defer wg.Done() - for i := 0; i < messageCount; i++ { - _, err := stream.Recv() - if i > messageCount/2 { - cancel() - } - // must continue to receive messages until server acknowledges the cancellation, to ensure no data race happens there too - if status.Code(err) == codes.Canceled { - return - } + err := stream.Send(req) + if err == io.EOF { // possible due to context cancellation + require.ErrorIs(t, ctx.Err(), context.Canceled) + } else { require.NoError(t, err) } - }() - wg.Wait() - }) + } + require.NoError(t, stream.CloseSend()) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < messageCount; i++ { + _, err := stream.Recv() + if i > messageCount/2 { + cancel() + } + // must continue to receive messages until server acknowledges the cancellation, to ensure no data race happens there too + if status.Code(err) == codes.Canceled { + return + } + require.NoError(t, err) + } + }() + + wg.Wait() } } From 40a6ab783b1c39686f8c973af43a9c35c229bab0 Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Wed, 15 Nov 2023 17:58:44 +0000 Subject: [PATCH 6/8] fix fmt import --- .../grpc/otelgrpc/test/grpc_stats_handler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index fb9f61972b5..90e663ffa97 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -16,7 +16,6 @@ package test import ( "context" - "fmt" "io" "net" "sync" From 1e42c207d9b129841c248f39ba63fe1be9f7e620 Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Wed, 15 Nov 2023 17:59:19 +0000 Subject: [PATCH 7/8] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c0eb929792..275d1f4be14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537) +- Fix data race in stats handlers when processing messages received and sent metrics in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4577) ## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10 From d590a3b5068d75ba7104722dce20de1f331c5cea Mon Sep 17 00:00:00 2001 From: Naphat Sanguansin Date: Wed, 15 Nov 2023 21:47:19 +0000 Subject: [PATCH 8/8] ran make --- .../grpc/otelgrpc/test/grpc_stats_handler_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 90e663ffa97..f8dd8871072 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -35,10 +35,11 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + testpb "google.golang.org/grpc/interop/grpc_testing" + "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - testpb "google.golang.org/grpc/interop/grpc_testing" ) func TestStatsHandler(t *testing.T) {