Skip to content

Commit b205df6

Browse files
menghanlGarrettGutierrez1
authored andcommitted
xdsclient: populate error details for NACK (#3975)
1 parent 75e2768 commit b205df6

File tree

5 files changed

+57
-30
lines changed

5 files changed

+57
-30
lines changed

xds/internal/client/client_xds.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (ma
4545
update := make(map[string]ListenerUpdate)
4646
for _, r := range resources {
4747
if !IsListenerResource(r.GetTypeUrl()) {
48-
return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
48+
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl())
4949
}
5050
lis := &v3listenerpb.Listener{}
5151
if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
@@ -69,7 +69,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
6969
}
7070
apiLisAny := lis.GetApiListener().GetApiListener()
7171
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
72-
return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
72+
return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
7373
}
7474
apiLis := &v3httppb.HttpConnectionManager{}
7575
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
@@ -106,7 +106,7 @@ func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpcl
106106
update := make(map[string]RouteConfigUpdate)
107107
for _, r := range resources {
108108
if !IsRouteConfigResource(r.GetTypeUrl()) {
109-
return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
109+
return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl())
110110
}
111111
rc := &v3routepb.RouteConfiguration{}
112112
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
@@ -370,7 +370,7 @@ func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map
370370
update := make(map[string]ClusterUpdate)
371371
for _, r := range resources {
372372
if !IsClusterResource(r.GetTypeUrl()) {
373-
return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
373+
return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl())
374374
}
375375

376376
cluster := &v3clusterpb.Cluster{}
@@ -418,7 +418,7 @@ func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (m
418418
update := make(map[string]EndpointsUpdate)
419419
for _, r := range resources {
420420
if !IsEndpointsResource(r.GetTypeUrl()) {
421-
return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
421+
return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl())
422422
}
423423

424424
cla := &v3endpointpb.ClusterLoadAssignment{}

xds/internal/client/transport_helper.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type VersionedClient interface {
5151

5252
// SendRequest constructs and sends out a DiscoveryRequest message specific
5353
// to the underlying transport protocol version.
54-
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
54+
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error
5555

5656
// RecvResponse uses the provided stream to receive a response specific to
5757
// the underlying transport protocol version.
@@ -246,10 +246,10 @@ func (t *TransportHelper) send(ctx context.Context) {
246246
t.sendCh.Load()
247247

248248
var (
249-
target []string
250-
rType ResourceType
251-
version, nonce string
252-
send bool
249+
target []string
250+
rType ResourceType
251+
version, nonce, errMsg string
252+
send bool
253253
)
254254
switch update := u.(type) {
255255
case *watchAction:
@@ -259,6 +259,7 @@ func (t *TransportHelper) send(ctx context.Context) {
259259
if !send {
260260
continue
261261
}
262+
errMsg = update.errMsg
262263
}
263264
if stream == nil {
264265
// There's no stream yet. Skip the request. This request
@@ -267,7 +268,7 @@ func (t *TransportHelper) send(ctx context.Context) {
267268
// sending response back).
268269
continue
269270
}
270-
if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
271+
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
271272
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
272273
// send failed, clear the current stream.
273274
stream = nil
@@ -292,7 +293,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
292293
t.nonceMap = make(map[ResourceType]string)
293294

294295
for rType, s := range t.watchMap {
295-
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
296+
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
296297
t.logger.Errorf("ADS request failed: %v", err)
297298
return false
298299
}
@@ -321,6 +322,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
321322
rType: rType,
322323
version: "",
323324
nonce: nonce,
325+
errMsg: err.Error(),
324326
stream: stream,
325327
})
326328
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
@@ -387,6 +389,7 @@ type ackAction struct {
387389
rType ResourceType
388390
version string // NACK if version is an empty string.
389391
nonce string
392+
errMsg string // Empty unless it's a NACK.
390393
// ACK/NACK are tagged with the stream it's for. When the stream is down,
391394
// all the ACK/NACK for this stream will be dropped, and the version/nonce
392395
// won't be updated.

xds/internal/client/v2/client.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/golang/protobuf/proto"
2828
"google.golang.org/grpc"
29+
"google.golang.org/grpc/codes"
2930
"google.golang.org/grpc/internal/grpclog"
3031
xdsclient "google.golang.org/grpc/xds/internal/client"
3132
"google.golang.org/grpc/xds/internal/client/load"
@@ -34,6 +35,7 @@ import (
3435
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
3536
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
3637
v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
38+
statuspb "google.golang.org/genproto/googleapis/rpc/status"
3739
)
3840

3941
func init() {
@@ -155,7 +157,7 @@ func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
155157
// - If this is an ack, version will be the version from the response.
156158
// - If this is a nack, version will be the previous acked version (from
157159
// versionMap). If there was no ack before, it will be empty.
158-
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
160+
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
159161
stream, ok := s.(adsStream)
160162
if !ok {
161163
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -166,7 +168,11 @@ func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
166168
ResourceNames: resourceNames,
167169
VersionInfo: version,
168170
ResponseNonce: nonce,
169-
// TODO: populate ErrorDetails for nack.
171+
}
172+
if errMsg != "" {
173+
req.ErrorDetail = &statuspb.Status{
174+
Code: int32(codes.InvalidArgument), Message: errMsg,
175+
}
170176
}
171177
if err := stream.Send(req); err != nil {
172178
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)

xds/internal/client/v2/client_ack_test.go

+26-14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
anypb "github.com/golang/protobuf/ptypes/any"
3030
"github.com/google/go-cmp/cmp"
3131
"google.golang.org/grpc"
32+
"google.golang.org/grpc/codes"
3233
"google.golang.org/grpc/internal/testutils"
3334
xdsclient "google.golang.org/grpc/xds/internal/client"
3435
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
@@ -73,7 +74,7 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb
7374
}
7475

7576
// compareXDSRequest reads requests from channel, compare it with want.
76-
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error {
77+
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error {
7778
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
7879
defer cancel()
7980
val, err := ch.Receive(ctx)
@@ -84,11 +85,22 @@ func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver,
8485
if req.Err != nil {
8586
return fmt.Errorf("unexpected error from request: %v", req.Err)
8687
}
88+
89+
xdsReq := req.Req.(*xdspb.DiscoveryRequest)
90+
if (xdsReq.ErrorDetail != nil) != wantErr {
91+
return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr)
92+
}
93+
// All NACK request.ErrorDetails have hardcoded status code InvalidArguments.
94+
if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) {
95+
return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument)
96+
}
97+
98+
xdsReq.ErrorDetail = nil // Clear the error details field before comparing.
8799
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
88100
wantClone.VersionInfo = ver
89101
wantClone.ResponseNonce = nonce
90-
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
91-
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
102+
if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) {
103+
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal)))
92104
}
93105
return nil
94106
}
@@ -118,7 +130,7 @@ func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan *
118130
}
119131
v2c.AddWatch(rType, nameToWatch)
120132

121-
if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
133+
if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil {
122134
t.Fatalf("Failed to receive %v request: %v", rType, err)
123135
}
124136
t.Logf("FakeServer received %v request...", rType)
@@ -133,7 +145,7 @@ func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakese
133145
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
134146
t.Logf("Good %v response pushed to fakeServer...", rType)
135147

136-
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
148+
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil {
137149
return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
138150
}
139151
t.Logf("Good %v response acked", rType)
@@ -168,7 +180,7 @@ func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeser
168180
TypeUrl: typeURL,
169181
}, ver)
170182
t.Logf("Bad %v response pushed to fakeServer...", rType)
171-
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
183+
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil {
172184
return fmt.Errorf("failed to receive %v request: %v", rType, err)
173185
}
174186
t.Logf("Bad %v response nacked", rType)
@@ -274,7 +286,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {
274286

275287
// The expected version string is an empty string, because this is the first
276288
// response, and it's nacked (so there's no previous ack version).
277-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
289+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil {
278290
t.Errorf("Failed to receive request: %v", err)
279291
}
280292
t.Logf("Bad response nacked")
@@ -314,7 +326,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
314326
t.Logf("Bad response pushed to fakeServer...")
315327

316328
// The expected version string is the previous acked version.
317-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
329+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil {
318330
t.Errorf("Failed to receive request: %v", err)
319331
}
320332
t.Logf("Bad response nacked")
@@ -339,7 +351,7 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
339351

340352
// Start a CDS watch.
341353
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
342-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
354+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
343355
t.Fatal(err)
344356
}
345357
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -356,12 +368,12 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
356368
// Wait for a request with no resource names, because the only watch was
357369
// removed.
358370
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
359-
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
371+
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
360372
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
361373
}
362374
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
363375
// Wait for a request with correct resource names and version.
364-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
376+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil {
365377
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
366378
}
367379
versionCDS++
@@ -394,7 +406,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
394406

395407
// Start a CDS watch.
396408
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
397-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
409+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
398410
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
399411
}
400412
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
@@ -410,7 +422,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
410422
// Wait for a request with no resource names, because the only watch was
411423
// removed.
412424
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
413-
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
425+
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
414426
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
415427
}
416428
versionCDS++
@@ -440,7 +452,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
440452
// Start a new watch. The new watch should have the nonce from the response
441453
// above, and version from the first good response.
442454
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
443-
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
455+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil {
444456
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
445457
}
446458

xds/internal/client/v3/client.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
"sync"
2626

2727
"github.com/golang/protobuf/proto"
28+
statuspb "google.golang.org/genproto/googleapis/rpc/status"
2829
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
2931
"google.golang.org/grpc/internal/grpclog"
3032
xdsclient "google.golang.org/grpc/xds/internal/client"
3133
"google.golang.org/grpc/xds/internal/client/load"
@@ -153,7 +155,7 @@ func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
153155
// - If this is an ack, version will be the version from the response.
154156
// - If this is a nack, version will be the previous acked version (from
155157
// versionMap). If there was no ack before, it will be empty.
156-
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
158+
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
157159
stream, ok := s.(adsStream)
158160
if !ok {
159161
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
@@ -164,7 +166,11 @@ func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
164166
ResourceNames: resourceNames,
165167
VersionInfo: version,
166168
ResponseNonce: nonce,
167-
// TODO: populate ErrorDetails for nack.
169+
}
170+
if errMsg != "" {
171+
req.ErrorDetail = &statuspb.Status{
172+
Code: int32(codes.InvalidArgument), Message: errMsg,
173+
}
168174
}
169175
if err := stream.Send(req); err != nil {
170176
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)

0 commit comments

Comments
 (0)