Skip to content

Commit 8ea3460

Browse files
authored
balancer: fix logic to prevent producer streams before READY is reported (#7651)
1 parent 6c48e47 commit 8ea3460

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

balancer_wrapper.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ type acBalancerWrapper struct {
262262

263263
// updateState is invoked by grpc to push a subConn state update to the
264264
// underlying balancer.
265-
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
265+
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error, readyChan chan struct{}) {
266266
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
267267
if ctx.Err() != nil || acbw.ccb.balancer == nil {
268268
return
@@ -278,12 +278,11 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
278278
acbw.ac.mu.Lock()
279279
defer acbw.ac.mu.Unlock()
280280
if s == connectivity.Ready {
281-
// When changing states to READY, reset stateReadyChan. Wait until
281+
// When changing states to READY, close stateReadyChan. Wait until
282282
// after we notify the LB policy's listener(s) in order to prevent
283283
// ac.getTransport() from unblocking before the LB policy starts
284284
// tracking the subchannel as READY.
285-
close(acbw.ac.stateReadyChan)
286-
acbw.ac.stateReadyChan = make(chan struct{})
285+
close(readyChan)
287286
}
288287
})
289288
}

clientconn.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,14 +1193,22 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
11931193
if ac.state == s {
11941194
return
11951195
}
1196+
if ac.state == connectivity.Ready {
1197+
// When leaving ready, re-create the ready channel.
1198+
ac.stateReadyChan = make(chan struct{})
1199+
}
1200+
if s == connectivity.Shutdown {
1201+
// Wake any producer waiting to create a stream on the transport.
1202+
close(ac.stateReadyChan)
1203+
}
11961204
ac.state = s
11971205
ac.channelz.ChannelMetrics.State.Store(&s)
11981206
if lastErr == nil {
11991207
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
12001208
} else {
12011209
channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
12021210
}
1203-
ac.acbw.updateState(s, ac.curAddr, lastErr)
1211+
ac.acbw.updateState(s, ac.curAddr, lastErr, ac.stateReadyChan)
12041212
}
12051213

12061214
// adjustParams updates parameters used to create transports upon
@@ -1510,18 +1518,20 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
15101518
func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
15111519
for ctx.Err() == nil {
15121520
ac.mu.Lock()
1513-
t, state, sc := ac.transport, ac.state, ac.stateReadyChan
1521+
t, state, readyChan := ac.transport, ac.state, ac.stateReadyChan
15141522
ac.mu.Unlock()
1515-
if state == connectivity.Ready {
1516-
return t, nil
1517-
}
15181523
if state == connectivity.Shutdown {
1524+
// Return an error immediately in only this case since a connection
1525+
// will never occur.
15191526
return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
15201527
}
15211528

15221529
select {
15231530
case <-ctx.Done():
1524-
case <-sc:
1531+
case <-readyChan:
1532+
if state == connectivity.Ready {
1533+
return t, nil
1534+
}
15251535
}
15261536
}
15271537
return nil, status.FromContextError(ctx.Err()).Err()

producer_ext_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
5151
}
5252
}
5353

54-
func (p *producer) TestStreamStart(t *testing.T, streamStarted chan<- struct{}) {
54+
func (p *producer) testStreamStart(t *testing.T, streamStarted chan<- struct{}) {
5555
go func() {
5656
defer close(p.stopped)
5757
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -68,8 +68,11 @@ var producerBuilderSingleton = &producerBuilder{}
6868
// TestProducerStreamStartsAfterReady ensures producer streams only start after
6969
// the subchannel reports as READY to the LB policy.
7070
func (s) TestProducerStreamStartsAfterReady(t *testing.T) {
71+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
72+
defer cancel()
7173
name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "")
7274
producerCh := make(chan balancer.Producer)
75+
var producerClose func()
7376
streamStarted := make(chan struct{})
7477
done := make(chan struct{})
7578
bf := stub.BalancerFuncs{
@@ -90,7 +93,8 @@ func (s) TestProducerStreamStartsAfterReady(t *testing.T) {
9093
if err != nil {
9194
return err
9295
}
93-
producer, _ := sc.GetOrBuildProducer(producerBuilderSingleton)
96+
var producer balancer.Producer
97+
producer, producerClose = sc.GetOrBuildProducer(producerBuilderSingleton)
9498
producerCh <- producer
9599
sc.Connect()
96100
return nil
@@ -119,7 +123,15 @@ func (s) TestProducerStreamStartsAfterReady(t *testing.T) {
119123

120124
go cc.Connect()
121125
p := <-producerCh
122-
p.(*producer).TestStreamStart(t, streamStarted)
126+
p.(*producer).testStreamStart(t, streamStarted)
123127

124-
<-done
128+
select {
129+
case <-done:
130+
// Wait for the stream to start before exiting; otherwise the ClientConn
131+
// will close and cause stream creation to fail.
132+
<-streamStarted
133+
producerClose()
134+
case <-ctx.Done():
135+
t.Error("Timed out waiting for test to complete")
136+
}
125137
}

0 commit comments

Comments
 (0)