Skip to content

Commit a787582

Browse files
authored
Middleware: gRPC support (part 4/4) (#12846)
#### Description Adds the gRPC middleware support from #12842. #### Link to tracking issue Part of #12603. #### Testing Yes. #### Documentation Added.
1 parent 97130f0 commit a787582

File tree

8 files changed

+472
-6
lines changed

8 files changed

+472
-6
lines changed

.chloggen/middleware-grpc.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: configgrpc
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add gRPC middleware support.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12603, 9591]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

config/configgrpc/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ README](../configtls/README.md).
2727
- [`read_buffer_size`](https://godoc.org/google.golang.org/grpc#ReadBufferSize)
2828
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
2929
- [`auth`](../configauth/README.md)
30+
- [`middlewares`](../configmiddleware/README.md)
3031

3132
Please note that [`per_rpc_auth`](https://pkg.go.dev/google.golang.org/grpc#PerRPCCredentials) which allows the credentials to send for every RPC is now moved to become an [extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/extension/bearertokenauthextension). Note that this feature isn't about sending the headers only during the initial connection as an `authorization` header under the `headers` would do: this is sent for every RPC performed during an established connection.
3233

@@ -111,3 +112,4 @@ see [confignet README](../confignet/README.md).
111112
- [`tls`](../configtls/README.md)
112113
- [`write_buffer_size`](https://godoc.org/google.golang.org/grpc#WriteBufferSize)
113114
- [`auth`](../configauth/README.md)
115+
- [`middlewares`](../configmiddleware/README.md)
+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package configgrpc
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/metadata"
16+
17+
"go.opentelemetry.io/collector/component"
18+
"go.opentelemetry.io/collector/component/componenttest"
19+
"go.opentelemetry.io/collector/config/configmiddleware"
20+
"go.opentelemetry.io/collector/config/confignet"
21+
"go.opentelemetry.io/collector/config/configtls"
22+
"go.opentelemetry.io/collector/extension"
23+
"go.opentelemetry.io/collector/extension/extensionmiddleware"
24+
"go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest"
25+
)
26+
27+
// testlientMiddleware is a mock implementation of a middleware extension
28+
type testClientMiddleware struct {
29+
extension.Extension
30+
extensionmiddleware.GetGRPCClientOptionsFunc
31+
}
32+
33+
func newTestMiddlewareConfig(name string) configmiddleware.Config {
34+
return configmiddleware.Config{
35+
ID: component.MustNewID(name),
36+
}
37+
}
38+
39+
func newTestClientMiddleware(name string) extension.Extension {
40+
return &testClientMiddleware{
41+
Extension: extensionmiddlewaretest.NewNop(),
42+
GetGRPCClientOptionsFunc: func() ([]grpc.DialOption, error) {
43+
return []grpc.DialOption{
44+
grpc.WithChainUnaryInterceptor(
45+
func(
46+
ctx context.Context,
47+
method string,
48+
req, reply any,
49+
cc *grpc.ClientConn,
50+
invoker grpc.UnaryInvoker,
51+
opts ...grpc.CallOption,
52+
) error {
53+
// Get existing metadata or create new metadata
54+
md, ok := metadata.FromOutgoingContext(ctx)
55+
if !ok {
56+
md = metadata.New(nil)
57+
} else {
58+
// Clone the metadata to avoid modifying the real metadata map
59+
md = md.Copy()
60+
}
61+
62+
// Check if there's already a middleware sequence header
63+
sequence := ""
64+
if values := md.Get("middleware-sequence"); len(values) > 0 {
65+
sequence = values[0]
66+
}
67+
68+
// Append this middleware's ID to the sequence
69+
if sequence == "" {
70+
sequence = name
71+
} else {
72+
sequence = fmt.Sprintf("%s,%s", sequence, name)
73+
}
74+
75+
// Set the updated sequence
76+
md.Set("middleware-sequence", sequence)
77+
78+
// Create a new context with the updated metadata
79+
newCtx := metadata.NewOutgoingContext(ctx, md)
80+
81+
// Continue the call with our updated context
82+
return invoker(newCtx, method, req, reply, cc, opts...)
83+
}),
84+
}, nil
85+
},
86+
}
87+
}
88+
89+
// TestClientMiddlewareOrdering verifies that client middleware
90+
// interceptors are called in the right order.
91+
func TestClientMiddlewareOrdering(t *testing.T) {
92+
// Create a middleware tracking header that will be modified by our middleware interceptors
93+
const middlewareTrackingHeader = "middleware-sequence"
94+
95+
// Create middleware extensions that will modify the metadata to track their execution order
96+
mockMiddleware1 := newTestClientMiddleware("middleware-1")
97+
mockMiddleware2 := newTestClientMiddleware("middleware-2")
98+
99+
mockExt := map[component.ID]component.Component{
100+
component.MustNewID("middleware1"): mockMiddleware1,
101+
component.MustNewID("middleware2"): mockMiddleware2,
102+
}
103+
104+
// Start a gRPC server that will record the incoming metadata
105+
server := &grpcTraceServer{}
106+
srv, addr := server.startTestServer(t, ServerConfig{
107+
NetAddr: confignet.AddrConfig{
108+
Endpoint: "localhost:0",
109+
Transport: confignet.TransportTypeTCP,
110+
},
111+
})
112+
defer srv.Stop()
113+
114+
// Create client config with middleware extensions
115+
clientConfig := ClientConfig{
116+
Endpoint: addr,
117+
TLSSetting: configtls.ClientConfig{
118+
Insecure: true,
119+
},
120+
Middlewares: []configmiddleware.Config{
121+
newTestMiddlewareConfig("middleware1"),
122+
newTestMiddlewareConfig("middleware2"),
123+
},
124+
}
125+
126+
// Create a test host with our mock extensions
127+
host := &mockHost{ext: mockExt}
128+
129+
// Send a request using the client with middleware
130+
resp, err := sendTestRequestWithHost(t, clientConfig, host)
131+
require.NoError(t, err)
132+
assert.NotNil(t, resp)
133+
134+
// Verify that the middleware order was respected as recorded in the metadata
135+
ictx, ok := metadata.FromIncomingContext(server.recordedContext)
136+
require.True(t, ok, "middleware tracking header not found in metadata")
137+
md := ictx[middlewareTrackingHeader]
138+
require.Len(t, md, 1, "expected exactly one middleware tracking header value")
139+
140+
// The sequence should be "middleware-1,middleware-2" as that's the order they were registered
141+
expectedSequence := "middleware-1,middleware-2"
142+
assert.Equal(t, expectedSequence, md[0])
143+
}
144+
145+
// TestClientMiddlewareToClientErrors tests failure cases for the ToClient method
146+
// specifically related to middleware resolution and API calls.
147+
func TestClientMiddlewareToClientErrors(t *testing.T) {
148+
tests := []struct {
149+
name string
150+
host component.Host
151+
config ClientConfig
152+
errText string
153+
}{
154+
{
155+
name: "extension_not_found",
156+
host: &mockHost{
157+
ext: map[component.ID]component.Component{},
158+
},
159+
config: ClientConfig{
160+
Endpoint: "localhost:1234",
161+
TLSSetting: configtls.ClientConfig{
162+
Insecure: true,
163+
},
164+
Middlewares: []configmiddleware.Config{
165+
{
166+
ID: component.MustNewID("nonexistent"),
167+
},
168+
},
169+
},
170+
errText: "failed to resolve middleware \"nonexistent\": middleware not found",
171+
},
172+
{
173+
name: "get_client_options_fails",
174+
host: &mockHost{
175+
ext: map[component.ID]component.Component{
176+
component.MustNewID("errormw"): extensionmiddlewaretest.NewErr(errors.New("get options failed")),
177+
},
178+
},
179+
config: ClientConfig{
180+
Endpoint: "localhost:1234",
181+
TLSSetting: configtls.ClientConfig{
182+
Insecure: true,
183+
},
184+
Middlewares: []configmiddleware.Config{
185+
{
186+
ID: component.MustNewID("errormw"),
187+
},
188+
},
189+
},
190+
errText: "get options failed",
191+
},
192+
}
193+
194+
for _, tc := range tests {
195+
t.Run(tc.name, func(t *testing.T) {
196+
// Test creating the client with middleware errors
197+
_, err := tc.config.ToClientConn(context.Background(), tc.host, componenttest.NewNopTelemetrySettings())
198+
require.Error(t, err)
199+
assert.Contains(t, err.Error(), tc.errText)
200+
})
201+
}
202+
}

config/configgrpc/configgrpc.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"go.opentelemetry.io/collector/component"
3232
"go.opentelemetry.io/collector/config/configauth"
3333
"go.opentelemetry.io/collector/config/configcompression"
34+
"go.opentelemetry.io/collector/config/configmiddleware"
3435
"go.opentelemetry.io/collector/config/confignet"
3536
"go.opentelemetry.io/collector/config/configopaque"
3637
"go.opentelemetry.io/collector/config/configtls"
@@ -105,6 +106,9 @@ type ClientConfig struct {
105106

106107
// Auth configuration for outgoing RPCs.
107108
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`
109+
110+
// Middlewares for the gRPC client.
111+
Middlewares []configmiddleware.Config `mapstructure:"middlewares,omitempty"`
108112
}
109113

110114
// NewDefaultClientConfig returns a new instance of ClientConfig with default values.
@@ -197,6 +201,10 @@ type ServerConfig struct {
197201

198202
// Include propagates the incoming connection's metadata to downstream consumers.
199203
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`
204+
205+
// Middlewares for the gRPC server.
206+
Middlewares []configmiddleware.Config `mapstructure:"middlewares,omitempty"`
207+
200208
// prevent unkeyed literal initialization
201209
_ struct{}
202210
}
@@ -372,6 +380,15 @@ func (gcs *ClientConfig) getGrpcDialOptions(
372380
)
373381
}
374382

383+
// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
384+
for _, middleware := range gcs.Middlewares {
385+
middlewareOptions, err := middleware.GetGRPCClientOptions(ctx, host.GetExtensions())
386+
if err != nil {
387+
return nil, fmt.Errorf("failed to get gRPC client options from middleware: %w", err)
388+
}
389+
opts = append(opts, middlewareOptions...)
390+
}
391+
375392
for _, opt := range extraOpts {
376393
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
377394
opts = append(opts, wrapper.opt)
@@ -414,19 +431,20 @@ func (grpcServerOptionWrapper) isToServerOption() {}
414431

415432
// ToServer returns a [grpc.Server] for the configuration.
416433
func (gss *ServerConfig) ToServer(
417-
_ context.Context,
434+
ctx context.Context,
418435
host component.Host,
419436
settings component.TelemetrySettings,
420437
extraOpts ...ToServerOption,
421438
) (*grpc.Server, error) {
422-
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
439+
grpcOpts, err := gss.getGrpcServerOptions(ctx, host, settings, extraOpts)
423440
if err != nil {
424441
return nil, err
425442
}
426443
return grpc.NewServer(grpcOpts...), nil
427444
}
428445

429446
func (gss *ServerConfig) getGrpcServerOptions(
447+
ctx context.Context,
430448
host component.Host,
431449
settings component.TelemetrySettings,
432450
extraOpts []ToServerOption,
@@ -515,6 +533,15 @@ func (gss *ServerConfig) getGrpcServerOptions(
515533

516534
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))
517535

536+
// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
537+
for _, middleware := range gss.Middlewares {
538+
middlewareOptions, err := middleware.GetGRPCServerOptions(ctx, host.GetExtensions())
539+
if err != nil {
540+
return nil, fmt.Errorf("failed to get gRPC server options from middleware: %w", err)
541+
}
542+
opts = append(opts, middlewareOptions...)
543+
}
544+
518545
for _, opt := range extraOpts {
519546
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
520547
opts = append(opts, wrapper.opt)

config/configgrpc/configgrpc_test.go

+31-4
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
299299
Endpoint: "0.0.0.0:1234",
300300
},
301301
}
302-
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
302+
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
303303
require.NoError(t, err)
304304
assert.Len(t, opts, 3)
305305
}
@@ -312,6 +312,7 @@ func TestGrpcServerExtraOption(t *testing.T) {
312312
}
313313
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
314314
opts, err := gss.getGrpcServerOptions(
315+
context.Background(),
315316
componenttest.NewNopHost(),
316317
componenttest.NewNopTelemetrySettings(),
317318
[]ToServerOption{WithGrpcServerOption(extraOpt)},
@@ -401,7 +402,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
401402
},
402403
},
403404
}
404-
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
405+
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
405406
require.NoError(t, err)
406407
assert.Len(t, opts, 10)
407408
}
@@ -1144,9 +1145,13 @@ func (gts *grpcTraceServer) Export(ctx context.Context, _ ptraceotlp.ExportReque
11441145
}
11451146

11461147
func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*grpc.Server, string) {
1148+
return gts.startTestServerWithHost(t, gss, componenttest.NewNopHost())
1149+
}
1150+
1151+
func (gts *grpcTraceServer) startTestServerWithHost(t *testing.T, gss ServerConfig, host component.Host, opts ...ToServerOption) (*grpc.Server, string) {
11471152
listener, err := gss.NetAddr.Listen(context.Background())
11481153
require.NoError(t, err)
1149-
server, err := gss.ToServer(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
1154+
server, err := gss.ToServer(context.Background(), host, componenttest.NewNopTelemetrySettings(), opts...)
11501155
require.NoError(t, err)
11511156
ptraceotlp.RegisterGRPCServer(server, gts)
11521157
go func() {
@@ -1155,8 +1160,30 @@ func (gts *grpcTraceServer) startTestServer(t *testing.T, gss ServerConfig) (*gr
11551160
return server, listener.Addr().String()
11561161
}
11571162

1163+
func (gts *grpcTraceServer) startTestServerWithHostError(_ *testing.T, gss ServerConfig, host component.Host, opts ...ToServerOption) (*grpc.Server, error) {
1164+
listener, err := gss.NetAddr.Listen(context.Background())
1165+
if err != nil {
1166+
return nil, err
1167+
}
1168+
defer listener.Close()
1169+
1170+
server, err := gss.ToServer(context.Background(), host, componenttest.NewNopTelemetrySettings(), opts...)
1171+
if err != nil {
1172+
return nil, err
1173+
}
1174+
1175+
ptraceotlp.RegisterGRPCServer(server, gts)
1176+
return server, nil
1177+
}
1178+
1179+
// sendTestRequest issues a ptraceotlp export request and captures metadata.
11581180
func sendTestRequest(t *testing.T, gcs ClientConfig) (ptraceotlp.ExportResponse, error) {
1159-
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
1181+
return sendTestRequestWithHost(t, gcs, componenttest.NewNopHost())
1182+
}
1183+
1184+
// sendTestRequestWithHost is similar to sendTestRequest but allows specifying the host
1185+
func sendTestRequestWithHost(t *testing.T, gcs ClientConfig, host component.Host) (ptraceotlp.ExportResponse, error) {
1186+
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), host, componenttest.NewNopTelemetrySettings())
11601187
require.NoError(t, errClient)
11611188
defer func() { assert.NoError(t, grpcClientConn.Close()) }()
11621189
c := ptraceotlp.NewGRPCClient(grpcClientConn)

0 commit comments

Comments
 (0)