Skip to content

Commit 9afb49d

Browse files
authored
endpointsharding: cast EndpointMap values to *balancerWrapper instead of Balancer (#8069)
1 parent 267a09b commit 9afb49d

File tree

2 files changed

+50
-7
lines changed

2 files changed

+50
-7
lines changed

balancer/endpointsharding/endpointsharding.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (es *endpointSharding) ResolverError(err error) {
189189
}()
190190
children := es.children.Load()
191191
for _, child := range children.Values() {
192-
child.(balancer.Balancer).ResolverError(err)
192+
child.(*balancerWrapper).resolverErrorLocked(err)
193193
}
194194
}
195195

@@ -349,9 +349,10 @@ func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnSt
349349
// closeLocked closes the child balancer. Callers must hold the child mutext of
350350
// the parent endpointsharding balancer.
351351
func (bw *balancerWrapper) closeLocked() {
352-
if bw.isClosed {
353-
return
354-
}
355352
bw.child.Close()
356353
bw.isClosed = true
357354
}
355+
356+
func (bw *balancerWrapper) resolverErrorLocked(err error) {
357+
bw.child.ResolverError(err)
358+
}

balancer/endpointsharding/endpointsharding_test.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,27 @@ package endpointsharding_test
2121
import (
2222
"context"
2323
"encoding/json"
24+
"errors"
2425
"fmt"
2526
"log"
2627
"strings"
2728
"testing"
2829
"time"
2930

3031
"google.golang.org/grpc"
32+
"google.golang.org/grpc/backoff"
3133
"google.golang.org/grpc/balancer"
3234
"google.golang.org/grpc/balancer/endpointsharding"
3335
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
3436
"google.golang.org/grpc/codes"
37+
"google.golang.org/grpc/connectivity"
3538
"google.golang.org/grpc/credentials/insecure"
3639
"google.golang.org/grpc/grpclog"
3740
"google.golang.org/grpc/internal"
3841
"google.golang.org/grpc/internal/balancer/stub"
3942
"google.golang.org/grpc/internal/grpctest"
4043
"google.golang.org/grpc/internal/stubserver"
44+
"google.golang.org/grpc/internal/testutils"
4145
"google.golang.org/grpc/internal/testutils/roundrobin"
4246
"google.golang.org/grpc/peer"
4347
"google.golang.org/grpc/resolver"
@@ -125,7 +129,9 @@ func (fp *fakePetiole) UpdateState(state balancer.State) {
125129
// special picker, so it should fallback to the default behavior, which is to
126130
// round_robin amongst the endpoint children that are in the aggregated state.
127131
// It also verifies the petiole has access to the raw child state in case it
128-
// wants to implement a custom picker.
132+
// wants to implement a custom picker. The test sends a resolver error to the
133+
// endpointsharding balancer and verifies an error picker from the children
134+
// is used while making an RPC.
129135
func (s) TestEndpointShardingBasic(t *testing.T) {
130136
backend1 := stubserver.StartTestService(t, nil)
131137
defer backend1.Stop()
@@ -135,7 +141,7 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
135141
mr := manual.NewBuilderWithScheme("e2e-test")
136142
defer mr.Close()
137143

138-
json := `{"loadBalancingConfig": [{"fake_petiole":{}}]}`
144+
json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, fakePetioleName)
139145
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
140146
mr.InitialState(resolver.State{
141147
Endpoints: []resolver.Endpoint{
@@ -145,7 +151,20 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
145151
ServiceConfig: sc,
146152
})
147153

148-
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
154+
dOpts := []grpc.DialOption{
155+
grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()),
156+
// Use a large backoff delay to avoid the error picker being updated
157+
// too quickly.
158+
grpc.WithConnectParams(grpc.ConnectParams{
159+
Backoff: backoff.Config{
160+
BaseDelay: 2 * defaultTestTimeout,
161+
Multiplier: float64(0),
162+
Jitter: float64(0),
163+
MaxDelay: 2 * defaultTestTimeout,
164+
},
165+
}),
166+
}
167+
cc, err := grpc.NewClient(mr.Scheme()+":///", dOpts...)
149168
if err != nil {
150169
log.Fatalf("Failed to create new client: %v", err)
151170
}
@@ -159,6 +178,29 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
159178
if err = roundrobin.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: backend1.Address}, {Addr: backend2.Address}}); err != nil {
160179
t.Fatalf("error in expected round robin: %v", err)
161180
}
181+
182+
// Stopping both the backends should make the channel enter
183+
// TransientFailure.
184+
backend1.Stop()
185+
backend2.Stop()
186+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
187+
188+
// When the resolver reports an error, the picker should get updated to
189+
// return the resolver error.
190+
mr.ReportError(errors.New("test error"))
191+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
192+
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
193+
_, err := client.EmptyCall(ctx, &testpb.Empty{})
194+
if err == nil {
195+
t.Fatalf("EmptyCall succeeded when expected to fail with %q", "test error")
196+
}
197+
if strings.Contains(err.Error(), "test error") {
198+
break
199+
}
200+
}
201+
if ctx.Err() != nil {
202+
t.Fatalf("Context timed out waiting for picker with resolver error.")
203+
}
162204
}
163205

164206
// Tests that endpointsharding doesn't automatically re-connect IDLE children.

0 commit comments

Comments
 (0)