@@ -72,10 +72,11 @@ type cdsBB struct{}
 // Build creates a new CDS balancer with the ClientConn.
 func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
 	b := &cdsBalancer{
-		cc:       cc,
-		bOpts:    opts,
-		updateCh: buffer.NewUnbounded(),
-		closed:   grpcsync.NewEvent(),
+		cc:          cc,
+		bOpts:       opts,
+		updateCh:    buffer.NewUnbounded(),
+		closed:      grpcsync.NewEvent(),
+		cancelWatch: func() {}, // No-op at this point.
 	}
 	b.logger = prefixLogger(b)
 	b.logger.Infof("Created")
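Note on the idiom above: initializing `cancelWatch` to a no-op closure is what lets the later hunks drop their `if b.cancelWatch != nil` guards. A minimal, standalone sketch of the pattern (the `watcher` type and method names here are hypothetical, not part of this PR):

```go
package main

import "fmt"

// watcher mirrors the pattern: a func-typed field that is always non-nil,
// starting out as a no-op, so callers never need a nil check.
type watcher struct {
	cancelWatch func()
}

func newWatcher() *watcher {
	// No-op at this point, just like cancelWatch in cdsBalancer.
	return &watcher{cancelWatch: func() {}}
}

func (w *watcher) startWatch(name string) {
	w.cancelWatch() // Cancels any previous watch; harmless if none exists.
	fmt.Println("watch started on", name)
	w.cancelWatch = func() { fmt.Println("watch cancelled on", name) }
}

func main() {
	w := newWatcher()
	w.cancelWatch()           // Safe even before any watch is registered.
	w.startWatch("cluster-a")
	w.startWatch("cluster-b") // Implicitly cancels the watch on cluster-a.
	w.cancelWatch()
}
```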
@@ -158,22 +159,15 @@ type cdsBalancer struct {
 // run is a long-running goroutine which handles all updates from gRPC. All
 // methods which are invoked directly by gRPC or xdsClient simply push an
 // update onto a channel which is read and acted upon right here.
-//
-// 1. Good clientConn updates lead to registration of a CDS watch. Updates with
-//    error lead to cancellation of existing watch and propagation of the same
-//    error to the edsBalancer.
-// 2. SubConn updates are passthrough and are simply handed over to the
-//    underlying edsBalancer.
-// 3. Watch API updates lead to clientConn updates being invoked on the
-//    underlying edsBalancer.
-// 4. Close results in cancellation of the CDS watch and closing of the
-//    underlying edsBalancer and is the only way to exit this goroutine.
 func (b *cdsBalancer) run() {
 	for {
 		select {
 		case u := <-b.updateCh.Get():
 			b.updateCh.Load()
 			switch update := u.(type) {
+			// Good clientConn updates lead to registration of a CDS watch.
+			// Updates with error lead to cancellation of existing watch and
+			// propagation of the same error to the edsBalancer.
 			case *ccUpdate:
 				// We first handle errors, if any, and then proceed with handling
 				// the update, only if the status quo has changed.
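For context, the run loop above follows a common single-goroutine ownership pattern: all mutable balancer state is touched only inside run, and other goroutines merely push updates. A simplified, self-contained sketch, with plain channels standing in for the grpc-internal `buffer.Unbounded` and `grpcsync.Event` types and illustrative update types:

```go
package main

import "fmt"

type ccUpdate struct{ clusterName string }
type scUpdate struct{ state string }

// run mimics the shape of cdsBalancer.run: one goroutine serializes
// updates pushed from other goroutines and owns all state.
func run(updateCh <-chan interface{}, closed <-chan struct{}) {
	for {
		select {
		case u := <-updateCh:
			switch update := u.(type) {
			case *ccUpdate:
				fmt.Println("register CDS watch for", update.clusterName)
			case *scUpdate:
				fmt.Println("pass SubConn state through:", update.state)
			}
		case <-closed:
			fmt.Println("cancel watch, close child balancer, exit")
			return
		}
	}
}

func main() {
	updateCh := make(chan interface{}) // unbuffered: each send is handed off to run
	closed := make(chan struct{})
	go func() {
		updateCh <- &ccUpdate{clusterName: "my-cluster"}
		updateCh <- &scUpdate{state: "READY"}
		close(closed) // both updates were received before this fires
	}()
	run(updateCh, closed)
}
```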
@@ -187,9 +181,7 @@ func (b *cdsBalancer) run() {
 					// Since the cdsBalancer doesn't own the xdsClient object, we
 					// don't have to bother about closing the old client here, but
 					// we still need to cancel the watch on the old client.
-					if b.cancelWatch != nil {
-						b.cancelWatch()
-					}
+					b.cancelWatch()
 					b.client = update.client
 				}
 				if update.clusterName != "" {
@@ -201,12 +193,18 @@ func (b *cdsBalancer) run() {
 				}
 				b.clusterToWatch = update.clusterName
 			}
+
+			// SubConn updates are passthrough and are simply handed over to the
+			// underlying edsBalancer.
 			case *scUpdate:
 				if b.edsLB == nil {
 					b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
 					break
 				}
 				b.edsLB.UpdateSubConnState(update.subConn, update.state)
+
+			// Watch API updates lead to clientConn updates being invoked on the
+			// underlying edsBalancer.
 			case *watchUpdate:
 				if err := update.err; err != nil {
 					b.logger.Warningf("Watch error from xds-client %p: %v", b.client, err)
@@ -243,11 +241,13 @@ func (b *cdsBalancer) run() {
 					b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
 				}
 			}
+
+		// Close results in cancellation of the CDS watch and closing of the
+		// underlying edsBalancer and is the only way to exit this goroutine.
 		case <-b.closed.Done():
-			if b.cancelWatch != nil {
-				b.cancelWatch()
-				b.cancelWatch = nil
-			}
+			b.cancelWatch()
+			b.cancelWatch = func() {}
+
 			if b.edsLB != nil {
 				b.edsLB.Close()
 				b.edsLB = nil
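The `b.closed` event driving this exit path is a one-shot signal from the grpc-internal `grpcsync` package. A rough equivalent, assuming only the fire-once/Done-channel behavior that this file relies on (the real `grpcsync.Event` has more methods):

```go
package main

import (
	"fmt"
	"sync"
)

// event is a one-shot signal in the spirit of grpcsync.Event: Fire is
// idempotent and Done returns a channel that is closed once fired.
type event struct {
	once sync.Once
	c    chan struct{}
}

func newEvent() *event { return &event{c: make(chan struct{})} }

func (e *event) Fire() { e.once.Do(func() { close(e.c) }) }

func (e *event) Done() <-chan struct{} { return e.c }

func main() {
	closed := newEvent()
	done := make(chan struct{})
	go func() {
		<-closed.Done() // blocks until Fire is called
		fmt.Println("shutting down")
		close(done)
	}()
	closed.Fire()
	closed.Fire() // Safe: subsequent fires are no-ops.
	<-done
}
```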
@@ -282,11 +282,8 @@ func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
 	//
 	// This is not necessary today, because xds client never sends connection
 	// errors.
-
 	if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
-		if b.cancelWatch != nil {
-			b.cancelWatch()
-		}
+		b.cancelWatch()
 	}
 	if b.edsLB != nil {
 		b.edsLB.ResolverError(err)
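A minimal sketch of the error classification above: only a resource-not-found error reported by the parent cancels the watch, while every error is still propagated to the child balancer. `errors.Is` with a sentinel stands in here for the grpc-internal `xdsclient.ErrType` check, and the type and names are illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

var errResourceNotFound = errors.New("resource not found")

type balancerState struct {
	cancelWatch func()
}

func (b *balancerState) handleError(err error, fromParent bool) {
	// Cancel the watch only when the parent reports that the watched
	// resource no longer exists; other errors keep the watch alive.
	if fromParent && errors.Is(err, errResourceNotFound) {
		b.cancelWatch()
	}
	fmt.Println("propagating to child balancer:", err)
}

func main() {
	b := &balancerState{cancelWatch: func() { fmt.Println("watch cancelled") }}
	b.handleError(fmt.Errorf("transient: %w", errors.New("connection reset")), true)
	b.handleError(fmt.Errorf("cds: %w", errResourceNotFound), true)
}
```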
@@ -319,7 +316,7 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
 		return errBalancerClosed
 	}
 
-	b.logger.Infof("Receive update from resolver, balancer config: %+v", state.BalancerConfig)
+	b.logger.Infof("Received update from resolver, balancer config: %+v", state.BalancerConfig)
 	// The errors checked here should ideally never happen because the
 	// ServiceConfig in this case is prepared by the xdsResolver and is not
 	// something that is received on the wire.
@@ -352,7 +349,6 @@ func (b *cdsBalancer) ResolverError(err error) {
 		b.logger.Warningf("xds: received resolver error {%v} after cdsBalancer was closed", err)
 		return
 	}
-
 	b.updateCh.Put(&ccUpdate{err: err})
 }