Skip to content

Commit 3b5fa74

Browse files
authored
balancer/least_request : Fix panic while handling resolver errors (#8333) (#8339)
1 parent edf643f commit 3b5fa74

File tree

2 files changed

+81
-7
lines changed

2 files changed

+81
-7
lines changed

balancer/leastrequest/leastrequest.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,8 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
9797
}
9898

9999
type leastRequestBalancer struct {
100-
// Embeds balancer.Balancer because needs to intercept UpdateClientConnState
101-
// to learn about choiceCount.
102-
balancer.Balancer
103-
// Embeds balancer.ClientConn because needs to intercept UpdateState calls
104-
// from the child balancer.
100+
// Embeds balancer.ClientConn because we need to intercept UpdateState
101+
// calls from the child balancer.
105102
balancer.ClientConn
106103
child balancer.Balancer
107104
logger *internalgrpclog.PrefixLogger
@@ -118,6 +115,21 @@ func (lrb *leastRequestBalancer) Close() {
118115
lrb.endpointRPCCounts = nil
119116
}
120117

118+
func (lrb *leastRequestBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
119+
lrb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
120+
}
121+
122+
func (lrb *leastRequestBalancer) ResolverError(err error) {
123+
// Will cause inline picker update from endpoint sharding.
124+
lrb.child.ResolverError(err)
125+
}
126+
127+
func (lrb *leastRequestBalancer) ExitIdle() {
128+
if ei, ok := lrb.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding.
129+
ei.ExitIdle()
130+
}
131+
}
132+
121133
func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
122134
lrCfg, ok := ccs.BalancerConfig.(*LBConfig)
123135
if !ok {

balancer/leastrequest/balancer_test.go renamed to balancer/leastrequest/leastrequest_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ import (
2727
"time"
2828

2929
"github.com/google/go-cmp/cmp"
30-
3130
"google.golang.org/grpc"
31+
"google.golang.org/grpc/connectivity"
3232
"google.golang.org/grpc/credentials/insecure"
3333
"google.golang.org/grpc/internal"
3434
"google.golang.org/grpc/internal/grpctest"
3535
"google.golang.org/grpc/internal/stubserver"
36+
"google.golang.org/grpc/internal/testutils"
3637
testgrpc "google.golang.org/grpc/interop/grpc_testing"
3738
testpb "google.golang.org/grpc/interop/grpc_testing"
3839
"google.golang.org/grpc/peer"
@@ -42,7 +43,8 @@ import (
4243
)
4344

4445
const (
45-
defaultTestTimeout = 5 * time.Second
46+
defaultTestTimeout = 5 * time.Second
47+
defaultTestShortTimeout = 10 * time.Millisecond
4648
)
4749

4850
type s struct {
@@ -706,3 +708,63 @@ func (s) TestLeastRequestEndpoints_MultipleAddresses(t *testing.T) {
706708
t.Fatalf("error in expected round robin: %v", err)
707709
}
708710
}
711+
712+
// Test tests that the least request balancer properly surfaces resolver
713+
// errors.
714+
func (s) TestLeastRequestEndpoints_ResolverError(t *testing.T) {
715+
const sc = `{"loadBalancingConfig": [{"least_request_experimental": {}}]}`
716+
mr := manual.NewBuilderWithScheme("lr-e2e")
717+
defer mr.Close()
718+
719+
cc, err := grpc.NewClient(
720+
mr.Scheme()+":///",
721+
grpc.WithResolvers(mr),
722+
grpc.WithTransportCredentials(insecure.NewCredentials()),
723+
grpc.WithDefaultServiceConfig(sc),
724+
)
725+
if err != nil {
726+
t.Fatalf("grpc.NewClient() failed: %v", err)
727+
}
728+
defer cc.Close()
729+
730+
// We need to pass an endpoint with a valid address to the resolver before
731+
// reporting an error - otherwise endpointsharding does not report the
732+
// error through.
733+
lis, err := testutils.LocalTCPListener()
734+
if err != nil {
735+
t.Fatalf("net.Listen() failed: %v", err)
736+
}
737+
// Act like a server that closes the connection without sending a server
738+
// preface.
739+
go func() {
740+
conn, err := lis.Accept()
741+
if err != nil {
742+
t.Errorf("Unexpected error when accepting a connection: %v", err)
743+
}
744+
conn.Close()
745+
}()
746+
mr.UpdateState(resolver.State{
747+
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}},
748+
})
749+
cc.Connect()
750+
751+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
752+
defer cancel()
753+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
754+
755+
// Report an error through the resolver
756+
resolverErr := fmt.Errorf("simulated resolver error")
757+
mr.CC().ReportError(resolverErr)
758+
759+
// Ensure the client returns the expected resolver error.
760+
testServiceClient := testgrpc.NewTestServiceClient(cc)
761+
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
762+
_, err = testServiceClient.EmptyCall(ctx, &testpb.Empty{})
763+
if strings.Contains(err.Error(), resolverErr.Error()) {
764+
break
765+
}
766+
}
767+
if ctx.Err() != nil {
768+
t.Fatalf("Timeout when waiting for RPCs to fail with error containing %s. Last error: %v", resolverErr, err)
769+
}
770+
}

0 commit comments

Comments
 (0)