Skip to content

Commit 5edab9e

Browse files
authored
xdsclient: add grpc.xds_client.server_failure counter mertric (#8203)
1 parent 78ba661 commit 5edab9e

File tree

2 files changed

+178
-1
lines changed

2 files changed

+178
-1
lines changed

xds/internal/xdsclient/clientimpl.go

+11
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ var (
7676
Labels: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
7777
Default: false,
7878
})
79+
xdsClientServerFailureMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
80+
Name: "grpc.xds_client.server_failure",
81+
Description: "A counter of xDS servers going from healthy to unhealthy. A server goes unhealthy when we have a connectivity failure or when the ADS stream fails without seeing a response message, as per gRFC A57.",
82+
Unit: "failure",
83+
Labels: []string{"grpc.target", "grpc.xds.server"},
84+
Default: false,
85+
})
7986
)
8087

8188
// clientImpl is the real implementation of the xDS client. The exported Client
@@ -417,6 +424,10 @@ func (cs *channelState) adsStreamFailure(err error) {
417424
return
418425
}
419426

427+
if xdsresource.ErrType(err) != xdsresource.ErrTypeStreamFailedAfterRecv {
428+
xdsClientServerFailureMetric.Record(cs.parent.metricsRecorder, 1, cs.parent.target, cs.serverConfig.ServerURI())
429+
}
430+
420431
cs.parent.channelsMu.Lock()
421432
defer cs.parent.channelsMu.Unlock()
422433
for authority := range cs.interestedAuthorities {

xds/internal/xdsclient/metrics_test.go

+167-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ func (s) TestResourceUpdateMetrics(t *testing.T) {
6060
if err != nil {
6161
t.Fatalf("net.Listen() failed: %v", err)
6262
}
63-
6463
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l})
6564
const listenerResourceName = "test-listener-resource"
6665
const routeConfigurationName = "test-route-configuration-resource"
@@ -147,3 +146,170 @@ func (s) TestResourceUpdateMetrics(t *testing.T) {
147146
t.Fatalf("Unexpected data for metric \"grpc.xds_client.resource_updates_invalid\", got: %v, want: %v", got, 1)
148147
}
149148
}
149+
150+
// TestServerFailureMetrics_BeforeResponseRecv configures an xDS client, and a
151+
// management server. It then register a watcher and stops the management
152+
// server before sending a resource update, and verifies that the expected
153+
// metrics for server failure are emitted.
154+
func (s) TestServerFailureMetrics_BeforeResponseRecv(t *testing.T) {
155+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
156+
defer cancel()
157+
158+
tmr := stats.NewTestMetricsRecorder()
159+
l, err := testutils.LocalTCPListener()
160+
if err != nil {
161+
t.Fatalf("net.Listen() failed: %v", err)
162+
}
163+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: l})
164+
nodeID := uuid.New().String()
165+
166+
bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
167+
Servers: []byte(fmt.Sprintf(`[{
168+
"server_uri": %q,
169+
"channel_creds": [{"type": "insecure"}]
170+
}]`, mgmtServer.Address)),
171+
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
172+
Authorities: map[string]json.RawMessage{
173+
"authority": []byte("{}"),
174+
},
175+
})
176+
if err != nil {
177+
t.Fatalf("Failed to create bootstrap configuration: %v", err)
178+
}
179+
180+
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
181+
if err != nil {
182+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
183+
}
184+
pool := NewPool(config)
185+
client, close, err := pool.NewClientForTesting(OptionsForTesting{
186+
Name: t.Name(),
187+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
188+
MetricsRecorder: tmr,
189+
})
190+
if err != nil {
191+
t.Fatalf("Failed to create an xDS client: %v", err)
192+
}
193+
defer close()
194+
195+
const listenerResourceName = "test-listener-resource"
196+
197+
// Watch for the listener on the above management server.
198+
xdsresource.WatchListener(client, listenerResourceName, noopListenerWatcher{})
199+
200+
// Close the listener and ensure that the ADS stream breaks. This should
201+
// cause a server failure count to emit eventually.
202+
l.Close()
203+
select {
204+
case <-ctx.Done():
205+
t.Fatal("Timeout when waiting for ADS stream to close")
206+
default:
207+
}
208+
209+
mdWant := stats.MetricsData{
210+
Handle: xdsClientServerFailureMetric.Descriptor(),
211+
IntIncr: 1,
212+
LabelKeys: []string{"grpc.target", "grpc.xds.server"},
213+
LabelVals: []string{"Test/ServerFailureMetrics_BeforeResponseRecv", mgmtServer.Address},
214+
}
215+
if err := tmr.WaitForInt64Count(ctx, mdWant); err != nil {
216+
t.Fatal(err.Error())
217+
}
218+
}
219+
220+
// TestServerFailureMetrics_AfterResponseRecv configures an xDS client, and a
221+
// management server to send a valid LDS updates, and verifies that the
222+
// server failure metric is not emitted. It then closes the management server
223+
// listener to close the ADS stream and verifies that the server failure metric
224+
// is still not emitted because the the ADS stream was closed after having
225+
// received a response on the stream.
226+
func (s) TestServerFailureMetrics_AfterResponseRecv(t *testing.T) {
227+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
228+
defer cancel()
229+
230+
tmr := stats.NewTestMetricsRecorder()
231+
l, err := testutils.LocalTCPListener()
232+
if err != nil {
233+
t.Fatalf("net.Listen() failed: %v", err)
234+
}
235+
lis := testutils.NewRestartableListener(l)
236+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis})
237+
const listenerResourceName = "test-listener-resource"
238+
const routeConfigurationName = "test-route-configuration-resource"
239+
nodeID := uuid.New().String()
240+
resources := e2e.UpdateOptions{
241+
NodeID: nodeID,
242+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
243+
SkipValidation: true,
244+
}
245+
if err := mgmtServer.Update(ctx, resources); err != nil {
246+
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
247+
}
248+
249+
bootstrapContents, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
250+
Servers: []byte(fmt.Sprintf(`[{
251+
"server_uri": %q,
252+
"channel_creds": [{"type": "insecure"}]
253+
}]`, mgmtServer.Address)),
254+
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
255+
Authorities: map[string]json.RawMessage{
256+
"authority": []byte("{}"),
257+
},
258+
})
259+
if err != nil {
260+
t.Fatalf("Failed to create bootstrap configuration: %v", err)
261+
}
262+
263+
config, err := bootstrap.NewConfigFromContents(bootstrapContents)
264+
if err != nil {
265+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bootstrapContents), err)
266+
}
267+
pool := NewPool(config)
268+
client, close, err := pool.NewClientForTesting(OptionsForTesting{
269+
Name: t.Name(),
270+
MetricsRecorder: tmr,
271+
})
272+
if err != nil {
273+
t.Fatalf("Failed to create an xDS client: %v", err)
274+
}
275+
defer close()
276+
277+
// Watch the valid listener configured on the management server. This should
278+
// cause a resource updates valid count to emit eventually.
279+
xdsresource.WatchListener(client, listenerResourceName, noopListenerWatcher{})
280+
mdWant := stats.MetricsData{
281+
Handle: xdsClientResourceUpdatesValidMetric.Descriptor(),
282+
IntIncr: 1,
283+
LabelKeys: []string{"grpc.target", "grpc.xds.server", "grpc.xds.resource_type"},
284+
LabelVals: []string{"Test/ServerFailureMetrics_AfterResponseRecv", mgmtServer.Address, "ListenerResource"},
285+
}
286+
if err := tmr.WaitForInt64Count(ctx, mdWant); err != nil {
287+
t.Fatal(err.Error())
288+
}
289+
// Server failure should have no recording point.
290+
if got, _ := tmr.Metric("grpc.xds_client.server_failure"); got != 0 {
291+
t.Fatalf("Unexpected data for metric \"grpc.xds_client.server_failure\", got: %v, want: %v", got, 0)
292+
}
293+
294+
// Close the listener and ensure that the ADS stream breaks. This should
295+
// cause a server failure count to emit eventually.
296+
lis.Stop()
297+
select {
298+
case <-ctx.Done():
299+
t.Fatal("Timeout when waiting for ADS stream to close")
300+
default:
301+
}
302+
// Restart to prevent the attempt to create a new ADS stream after back off.
303+
lis.Restart()
304+
305+
mdWant = stats.MetricsData{
306+
Handle: xdsClientServerFailureMetric.Descriptor(),
307+
IntIncr: 1,
308+
LabelKeys: []string{"grpc.target", "grpc.xds.server"},
309+
LabelVals: []string{"Test/ServerFailureMetrics_AfterResponseRecv", mgmtServer.Address},
310+
}
311+
// Server failure should still have no recording point.
312+
if err := tmr.WaitForInt64Count(ctx, mdWant); err == nil {
313+
t.Fatal("tmr.WaitForInt64Count(ctx, mdWant) succeeded when expected to timeout.")
314+
}
315+
}

0 commit comments

Comments
 (0)