@@ -102,6 +102,7 @@ type resourceTypeState struct {
102
102
nonce string // Last received nonce. Should be reset when the stream breaks.
103
103
bufferedRequests chan struct {} // Channel to buffer requests when writing is blocked.
104
104
subscribedResources map [string ]* ResourceWatchState // Map of subscribed resource names to their state.
105
+ pendingWrite bool // True if there is a pending write for this resource type.
105
106
}
106
107
107
108
// StreamImpl provides the functionality associated with an ADS (Aggregated
@@ -203,6 +204,7 @@ func (s *StreamImpl) Subscribe(typ xdsresource.Type, name string) {
203
204
// Create state for the newly subscribed resource. The watch timer will
204
205
// be started when a request for this resource is actually sent out.
205
206
state .subscribedResources [name ] = & ResourceWatchState {State : ResourceWatchStateStarted }
207
+ state .pendingWrite = true
206
208
207
209
// Send a request for the resource type with updated subscriptions.
208
210
s .requestCh .Put (typ )
@@ -233,6 +235,7 @@ func (s *StreamImpl) Unsubscribe(typ xdsresource.Type, name string) {
233
235
rs .ExpiryTimer .Stop ()
234
236
}
235
237
delete (state .subscribedResources , name )
238
+ state .pendingWrite = true
236
239
237
240
// Send a request for the resource type with updated subscriptions.
238
241
s .requestCh .Put (typ )
@@ -346,17 +349,7 @@ func (s *StreamImpl) sendNew(stream transport.StreamingCall, typ xdsresource.Typ
346
349
return nil
347
350
}
348
351
349
- names := resourceNames (state .subscribedResources )
350
- if err := s .sendMessageLocked (stream , names , typ .TypeURL (), state .version , state .nonce , nil ); err != nil {
351
- return err
352
-
353
- }
354
- select {
355
- case <- state .bufferedRequests :
356
- default :
357
- }
358
- s .startWatchTimersLocked (typ , names )
359
- return nil
352
+ return s .sendMessageIfWritePendingLocked (stream , typ , state )
360
353
}
361
354
362
355
// sendExisting sends out discovery requests for existing resources when
@@ -385,18 +378,10 @@ func (s *StreamImpl) sendExisting(stream transport.StreamingCall) error {
385
378
continue
386
379
}
387
380
388
- names := resourceNames (state .subscribedResources )
389
- if s .logger .V (2 ) {
390
- s .logger .Infof ("Re-requesting resources %v of type %q, as the stream has been recreated" , names , typ .TypeURL ())
391
- }
392
- if err := s .sendMessageLocked (stream , names , typ .TypeURL (), state .version , state .nonce , nil ); err != nil {
381
+ state .pendingWrite = true
382
+ if err := s .sendMessageIfWritePendingLocked (stream , typ , state ); err != nil {
393
383
return err
394
384
}
395
- select {
396
- case <- state .bufferedRequests :
397
- default :
398
- }
399
- s .startWatchTimersLocked (typ , names )
400
385
}
401
386
return nil
402
387
}
@@ -413,11 +398,9 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error {
413
398
for typ , state := range s .resourceTypeState {
414
399
select {
415
400
case <- state .bufferedRequests :
416
- names := resourceNames (state .subscribedResources )
417
- if err := s .sendMessageLocked (stream , names , typ .TypeURL (), state .version , state .nonce , nil ); err != nil {
401
+ if err := s .sendMessageIfWritePendingLocked (stream , typ , state ); err != nil {
418
402
return err
419
403
}
420
- s .startWatchTimersLocked (typ , names )
421
404
default :
422
405
// No buffered request.
423
406
continue
@@ -426,6 +409,38 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error {
426
409
return nil
427
410
}
428
411
412
+ // sendMessageIfWritePendingLocked attempts to sends a discovery request to the
413
+ // server, if there is a pending write for the given resource type.
414
+ //
415
+ // If the request is successfully sent, the pending write field is cleared and
416
+ // watch timers are started for the resources in the request.
417
+ //
418
+ // Caller needs to hold c.mu.
419
+ func (s * StreamImpl ) sendMessageIfWritePendingLocked (stream transport.StreamingCall , typ xdsresource.Type , state * resourceTypeState ) error {
420
+ if ! state .pendingWrite {
421
+ if s .logger .V (2 ) {
422
+ s .logger .Infof ("Skipping sending request for type %q, because all subscribed resources were already sent" , typ .TypeURL ())
423
+ }
424
+ return nil
425
+ }
426
+
427
+ names := resourceNames (state .subscribedResources )
428
+ if err := s .sendMessageLocked (stream , names , typ .TypeURL (), state .version , state .nonce , nil ); err != nil {
429
+ return err
430
+ }
431
+ state .pendingWrite = false
432
+
433
+ // Drain the buffered requests channel because we just sent a request for this
434
+ // resource type.
435
+ select {
436
+ case <- state .bufferedRequests :
437
+ default :
438
+ }
439
+
440
+ s .startWatchTimersLocked (typ , names )
441
+ return nil
442
+ }
443
+
429
444
// sendMessageLocked sends a discovery request to the server, populating the
430
445
// different fields of the message with the given parameters. Returns a non-nil
431
446
// error if the request could not be sent.
0 commit comments