Skip to content

Commit a9601d9

Browse files
authored
xds: update nonce even if the ACK/NACK is not sent on wire (#3497)
This can happen when the watch is canceled while the response is on wire. Also, tag ACK/NACK with the stream so nonce for a new stream doesn't get updated by a ACK from the previous stream.
1 parent 66e9dfe commit a9601d9

File tree

3 files changed

+137
-9
lines changed

3 files changed

+137
-9
lines changed

xds/internal/client/types.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,12 @@ func (wi *watchInfo) stopTimer() {
7272

7373
type ackInfo struct {
7474
typeURL string
75-
version string // Nack if version is an empty string.
75+
version string // NACK if version is an empty string.
7676
nonce string
77+
// ACK/NACK are tagged with the stream it's for. When the stream is down,
78+
// all the ACK/NACK for this stream will be dropped, and the version/nonce
79+
// won't be updated.
80+
stream adsStream
7781
}
7882

7983
type ldsUpdate struct {

xds/internal/client/v2client.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -255,25 +255,36 @@ func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, v
255255
// processAckInfo pulls the fields needed by the ack request from a ackInfo.
256256
//
257257
// If no active watch is found for this ack, it returns false for send.
258-
func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
258+
func (v2c *v2Client) processAckInfo(t *ackInfo, stream adsStream) (target []string, typeURL, version, nonce string, send bool) {
259+
if t.stream != stream {
260+
// If ACK's stream isn't the current sending stream, this means the ACK
261+
// was pushed to queue before the old stream broke, and a new stream has
262+
// been started since. Return immediately here so we don't update the
263+
// nonce for the new stream.
264+
return
265+
}
259266
typeURL = t.typeURL
260267

261268
v2c.mu.Lock()
262269
defer v2c.mu.Unlock()
270+
271+
// Update the nonce no matter if we are going to send the ACK request on
272+
// wire. We may not send the request if the watch is canceled. But the nonce
273+
// needs to be updated so the next request will have the right nonce.
274+
nonce = t.nonce
275+
v2c.nonceMap[typeURL] = nonce
276+
263277
wi, ok := v2c.watchMap[typeURL]
264278
if !ok {
265279
// We don't send the request ack if there's no active watch (this can be
266280
// either the server sends responses before any request, or the watch is
267281
// canceled while the ackInfo is in queue), because there's no resource
268282
// name. And if we send a request with empty resource name list, the
269283
// server may treat it as a wild card and send us everything.
270-
return // This returns all zero values, and false for send.
284+
return nil, "", "", "", false
271285
}
272286
send = true
273-
274287
version = t.version
275-
nonce = t.nonce
276-
target = wi.target
277288
if version == "" {
278289
// This is a nack, get the previous acked version.
279290
version = v2c.versionMap[typeURL]
@@ -283,8 +294,8 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi
283294
} else {
284295
v2c.versionMap[typeURL] = version
285296
}
286-
v2c.nonceMap[typeURL] = nonce
287-
return
297+
target = wi.target
298+
return target, typeURL, version, nonce, send
288299
}
289300

290301
// send is a separate goroutine for sending watch requests on the xds stream.
@@ -327,7 +338,7 @@ func (v2c *v2Client) send() {
327338
case *watchInfo:
328339
target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
329340
case *ackInfo:
330-
target, typeURL, version, nonce, send = v2c.processAckInfo(t)
341+
target, typeURL, version, nonce, send = v2c.processAckInfo(t, stream)
331342
}
332343
if !send {
333344
continue
@@ -381,6 +392,7 @@ func (v2c *v2Client) recv(stream adsStream) bool {
381392
typeURL: typeURL,
382393
version: "",
383394
nonce: resp.GetNonce(),
395+
stream: stream,
384396
})
385397
v2c.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce(), respHandleErr)
386398
continue
@@ -389,6 +401,7 @@ func (v2c *v2Client) recv(stream adsStream) bool {
389401
typeURL: typeURL,
390402
version: resp.GetVersionInfo(),
391403
nonce: resp.GetNonce(),
404+
stream: stream,
392405
})
393406
v2c.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce())
394407
success = true

xds/internal/client/v2client_ack_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Ch
9898
//
9999
// It also waits and checks that the ack request contains the given version, and
100100
// the generated nonce.
101+
//
102+
// TODO: make this and other helper function either consistently return error,
103+
// and fatal() in the test code, or all call t.Fatal(), and mark them as
104+
// helper().
101105
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (nonce string) {
102106
nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
103107
t.Logf("Good %s response pushed to fakeServer...", xdsname)
@@ -263,3 +267,110 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
263267
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
264268
versionLDS++
265269
}
270+
271+
// TestV2ClientAckNewWatchAfterCancel verifies the new request for a new watch
272+
// after the previous watch is canceled, has the right version.
273+
func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
274+
var versionCDS = 3000
275+
276+
fakeServer, cc, cleanup := startServerAndGetCC(t)
277+
defer cleanup()
278+
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
279+
defer v2c.close()
280+
t.Log("Started xds v2Client...")
281+
282+
// Start a CDS watch.
283+
callbackCh := testutils.NewChannel()
284+
cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
285+
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
286+
callbackCh.Send(struct{}{})
287+
})
288+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
289+
t.Fatal(err)
290+
}
291+
t.Logf("FakeServer received %s request...", "CDS")
292+
293+
// Send a good CDS response, this function waits for the ACK with the right
294+
// version.
295+
nonce := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
296+
297+
// Cancel the CDS watch, and start a new one. The new watch should have the
298+
// version from the response above.
299+
cancel()
300+
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
301+
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
302+
callbackCh.Send(struct{}{})
303+
})
304+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
305+
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
306+
}
307+
versionCDS++
308+
309+
// Send a bad response with the next version.
310+
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
311+
versionCDS++
312+
313+
// send another good response, and check for ack, with the new version.
314+
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
315+
versionCDS++
316+
}
317+
318+
// TestV2ClientAckCancelResponseRace verifies if the response and ACK request
319+
// race with cancel (which means the ACK request will not be sent on wire,
320+
// because there's no active watch), the nonce will still be updated, and the
321+
// new request with the new watch will have the correct nonce.
322+
func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
323+
var versionCDS = 3000
324+
325+
fakeServer, cc, cleanup := startServerAndGetCC(t)
326+
defer cleanup()
327+
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
328+
defer v2c.close()
329+
t.Log("Started xds v2Client...")
330+
331+
// Start a CDS watch.
332+
callbackCh := testutils.NewChannel()
333+
cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
334+
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
335+
callbackCh.Send(struct{}{})
336+
})
337+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
338+
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
339+
}
340+
t.Logf("FakeServer received %s request...", "CDS")
341+
342+
// send another good response, and check for ack, with the new version.
343+
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
344+
versionCDS++
345+
346+
// Cancel the watch before the next response is sent. This mimics the case
347+
// watch is canceled while response is on wire.
348+
cancel()
349+
350+
// Send a good response.
351+
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodCDSResponse1, versionCDS)
352+
t.Logf("Good %s response pushed to fakeServer...", "CDS")
353+
354+
// Expect no ACK because watch was canceled.
355+
if req, err := fakeServer.XDSRequestChan.Receive(); err != testutils.ErrRecvTimeout {
356+
t.Fatalf("Got unexpected xds request after watch is canceled: %v", req)
357+
}
358+
359+
// Start a new watch. The new watch should have the nonce from the response
360+
// above, and version from the first good response.
361+
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
362+
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err)
363+
callbackCh.Send(struct{}{})
364+
})
365+
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
366+
t.Fatalf("Failed to receive %s request: %v", "CDS", err)
367+
}
368+
369+
// Send a bad response with the next version.
370+
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
371+
versionCDS++
372+
373+
// send another good response, and check for ack, with the new version.
374+
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh)
375+
versionCDS++
376+
}

0 commit comments

Comments
 (0)