Skip to content

Commit 0003b4f

Browse files
authored
weightedtarget: return erroring picker when no targets are configured (#8070)
1 parent 4b5608f commit 0003b4f

File tree

5 files changed

+271
-0
lines changed

5 files changed

+271
-0
lines changed

balancer/weightedtarget/weightedaggregator/aggregator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
package weightedaggregator
2727

2828
import (
29+
"errors"
2930
"fmt"
3031
"sync"
3132

@@ -251,6 +252,14 @@ func (wbsa *Aggregator) buildAndUpdateLocked() {
251252
func (wbsa *Aggregator) build() balancer.State {
252253
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
253254

255+
if len(wbsa.idToPickerState) == 0 {
256+
// This is the case when all sub-balancers are removed.
257+
return balancer.State{
258+
ConnectivityState: connectivity.TransientFailure,
259+
Picker: base.NewErrPicker(errors.New("weighted-target: no targets to pick from")),
260+
}
261+
}
262+
254263
// Make sure picker's return error is consistent with the aggregatedState.
255264
pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))
256265

balancer/weightedtarget/weightedtarget_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,23 @@ import (
2828
"time"
2929

3030
"github.com/google/go-cmp/cmp"
31+
"google.golang.org/grpc"
3132
"google.golang.org/grpc/attributes"
3233
"google.golang.org/grpc/balancer"
3334
"google.golang.org/grpc/balancer/roundrobin"
35+
"google.golang.org/grpc/codes"
3436
"google.golang.org/grpc/connectivity"
37+
"google.golang.org/grpc/credentials/insecure"
3538
"google.golang.org/grpc/internal/balancer/stub"
3639
"google.golang.org/grpc/internal/grpctest"
3740
"google.golang.org/grpc/internal/hierarchy"
3841
"google.golang.org/grpc/internal/testutils"
3942
"google.golang.org/grpc/resolver"
4043
"google.golang.org/grpc/serviceconfig"
44+
"google.golang.org/grpc/status"
45+
46+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
47+
testpb "google.golang.org/grpc/interop/grpc_testing"
4148
)
4249

4350
const (
@@ -163,6 +170,39 @@ func init() {
163170
NewRandomWRR = testutils.NewTestWRR
164171
}
165172

173+
// Tests the behavior of the weighted_target LB policy when there are no targets
174+
// configured. It verifies that the LB policy sets the overall channel state to
175+
// TRANSIENT_FAILURE and fails RPCs with an expected status code and message.
176+
func (s) TestWeightedTarget_NoTargets(t *testing.T) {
177+
dopts := []grpc.DialOption{
178+
grpc.WithTransportCredentials(insecure.NewCredentials()),
179+
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"weighted_target_experimental":{}}]}`),
180+
}
181+
cc, err := grpc.NewClient("passthrough:///test.server", dopts...)
182+
if err != nil {
183+
t.Fatalf("grpc.NewClient() failed: %v", err)
184+
}
185+
defer cc.Close()
186+
cc.Connect()
187+
188+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
189+
defer cancel()
190+
client := testgrpc.NewTestServiceClient(cc)
191+
_, err = client.EmptyCall(ctx, &testpb.Empty{})
192+
if err == nil {
193+
t.Error("EmptyCall() succeeded, want failure")
194+
}
195+
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
196+
t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode)
197+
}
198+
if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) {
199+
t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg)
200+
}
201+
if gotState, wantState := cc.GetState(), connectivity.TransientFailure; gotState != wantState {
202+
t.Errorf("cc.GetState() = %v, want %v", gotState, wantState)
203+
}
204+
}
205+
166206
// TestWeightedTarget covers the cases that a sub-balancer is added and a
167207
// sub-balancer is removed. It verifies that the addresses and balancer configs
168208
// are forwarded to the right sub-balancer. This test is intended to test the
@@ -326,6 +366,7 @@ func (s) TestWeightedTarget(t *testing.T) {
326366
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
327367
}
328368
}
369+
329370
// Update the Weighted Target Balancer with an empty address list and no
330371
// targets. This should cause a Transient Failure State update to the Client
331372
// Conn.
@@ -344,6 +385,11 @@ func (s) TestWeightedTarget(t *testing.T) {
344385
if state != connectivity.TransientFailure {
345386
t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
346387
}
388+
p = <-cc.NewPickerCh
389+
const wantErr = "no targets to pick from"
390+
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantErr) {
391+
t.Fatalf("Pick() returned error: %v, want: %v", err, wantErr)
392+
}
347393
}
348394

349395
// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we

internal/internal.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,20 @@ var (
168168
// other features, including the CSDS service.
169169
NewXDSResolverWithPoolForTesting any // func(*xdsclient.Pool) (resolver.Builder, error)
170170

171+
// NewXDSResolverWithClientForTesting creates a new xDS resolver builder
172+
// using the provided xDS client instead of creating a new one using the
173+
// bootstrap configuration specified by the supported environment variables.
174+
// The resolver.Builder is meant to be used in conjunction with the
175+
// grpc.WithResolvers DialOption. The resolver.Builder does not take
176+
// ownership of the provided xDS client and it is the responsibility of the
177+
// caller to close the client when no longer required.
178+
//
179+
// Testing Only
180+
//
181+
// This function should ONLY be used for testing and may not work with some
182+
// other features, including the CSDS service.
183+
NewXDSResolverWithClientForTesting any // func(xdsclient.XDSClient) (resolver.Builder, error)
184+
171185
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
172186
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
173187
// variable.

xds/internal/resolver/xds_resolver.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,24 @@ func newBuilderWithPoolForTesting(pool *xdsclient.Pool) (resolver.Builder, error
7979
}, nil
8080
}
8181

82+
// newBuilderWithClientForTesting creates a new xds resolver builder using the
83+
// specific xDS client, so that tests have complete control over the exact
84+
// specific xDS client being used.
85+
func newBuilderWithClientForTesting(client xdsclient.XDSClient) (resolver.Builder, error) {
86+
return &xdsResolverBuilder{
87+
newXDSClient: func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error) {
88+
// Returning an empty close func here means that the responsibility
89+
// of closing the client lies with the caller.
90+
return client, func() {}, nil
91+
},
92+
}, nil
93+
}
94+
8295
func init() {
8396
resolver.Register(&xdsResolverBuilder{})
8497
internal.NewXDSResolverWithConfigForTesting = newBuilderWithConfigForTesting
8598
internal.NewXDSResolverWithPoolForTesting = newBuilderWithPoolForTesting
99+
internal.NewXDSResolverWithClientForTesting = newBuilderWithClientForTesting
86100

87101
rinternal.NewWRR = wrr.NewRandom
88102
rinternal.NewXDSClient = xdsclient.DefaultPool.NewClient

xds/test/eds_resource_missing_test.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package xds_test
20+
21+
import (
22+
"context"
23+
"fmt"
24+
"strings"
25+
"testing"
26+
"time"
27+
28+
"github.com/google/uuid"
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/credentials/insecure"
32+
"google.golang.org/grpc/internal"
33+
"google.golang.org/grpc/internal/grpctest"
34+
"google.golang.org/grpc/internal/testutils/xds/e2e"
35+
"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
36+
"google.golang.org/grpc/internal/xds/bootstrap"
37+
"google.golang.org/grpc/resolver"
38+
"google.golang.org/grpc/status"
39+
"google.golang.org/grpc/xds/internal/xdsclient"
40+
41+
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
42+
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
43+
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
44+
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
45+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
46+
testpb "google.golang.org/grpc/interop/grpc_testing"
47+
48+
_ "google.golang.org/grpc/xds" // To register the xDS resolver and LB policies.
49+
)
50+
51+
const (
52+
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
53+
defaultTestTimeout = 5 * time.Second
54+
)
55+
56+
type s struct {
57+
grpctest.Tester
58+
}
59+
60+
func Test(t *testing.T) {
61+
grpctest.RunSubTests(t, s{})
62+
}
63+
64+
// Test verifies the xDS-enabled gRPC channel's behavior when the management
65+
// server fails to send an EDS resource referenced by a Cluster resource. The
66+
// expected outcome is an RPC failure with a status code Unavailable and a
67+
// message indicating the absence of available targets.
68+
func (s) TestEDS_MissingResource(t *testing.T) {
69+
// Start an xDS management server.
70+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
71+
72+
// Create bootstrap configuration pointing to the above management server.
73+
nodeID := uuid.New().String()
74+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
75+
config, err := bootstrap.NewConfigFromContents(bc)
76+
if err != nil {
77+
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
78+
}
79+
80+
// Create an xDS client with a short resource expiry timer.
81+
pool := xdsclient.NewPool(config)
82+
xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
83+
Name: t.Name(),
84+
WatchExpiryTimeout: defaultTestWatchExpiryTimeout,
85+
})
86+
if err != nil {
87+
t.Fatalf("Failed to create xDS client: %v", err)
88+
}
89+
defer xdsClose()
90+
91+
// Create an xDS resolver for the test that uses the above xDS client.
92+
resolverBuilder := internal.NewXDSResolverWithClientForTesting.(func(xdsclient.XDSClient) (resolver.Builder, error))
93+
xdsResolver, err := resolverBuilder(xdsC)
94+
if err != nil {
95+
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
96+
}
97+
98+
// Create resources on the management server. No EDS resource is configured.
99+
const serviceName = "my-service-client-side-xds"
100+
const routeConfigName = "route-" + serviceName
101+
const clusterName = "cluster-" + serviceName
102+
const endpointsName = "endpoints-" + serviceName
103+
resources := e2e.UpdateOptions{
104+
NodeID: nodeID,
105+
SkipValidation: true, // Cluster resource refers to an EDS resource that is not configured.
106+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
107+
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
108+
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)},
109+
}
110+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
111+
defer cancel()
112+
if err := mgmtServer.Update(ctx, resources); err != nil {
113+
t.Fatal(err)
114+
}
115+
116+
// Create a ClientConn with the xds:/// scheme.
117+
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver))
118+
if err != nil {
119+
t.Fatalf("Failed to create a grpc channel: %v", err)
120+
}
121+
defer cc.Close()
122+
123+
// Make an RPC and verify that it fails with the expected error.
124+
client := testgrpc.NewTestServiceClient(cc)
125+
_, err = client.EmptyCall(ctx, &testpb.Empty{})
126+
if err == nil {
127+
t.Fatal("EmptyCall() succeeded, want failure")
128+
}
129+
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
130+
t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode)
131+
}
132+
if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) {
133+
t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg)
134+
}
135+
}
136+
137+
// Test verifies the xDS-enabled gRPC channel's behavior when the management
138+
// server sends an EDS resource with no endpoints. The expected outcome is an
139+
// RPC failure with a status code Unavailable and a message indicating the
140+
// absence of available targets.
141+
func (s) TestEDS_NoEndpointsInResource(t *testing.T) {
142+
managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)
143+
144+
// Create resources on the management server, with the EDS resource
145+
// containing no endpoints.
146+
const serviceName = "my-service-client-side-xds"
147+
const routeConfigName = "route-" + serviceName
148+
const clusterName = "cluster-" + serviceName
149+
const endpointsName = "endpoints-" + serviceName
150+
resources := e2e.UpdateOptions{
151+
NodeID: nodeID,
152+
SkipValidation: true, // Cluster resource refers to an EDS resource that is not configured.
153+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
154+
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
155+
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)},
156+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
157+
e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
158+
ClusterName: "endpoints-" + serviceName,
159+
Host: "localhost",
160+
}),
161+
},
162+
}
163+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
164+
defer cancel()
165+
if err := managementServer.Update(ctx, resources); err != nil {
166+
t.Fatal(err)
167+
}
168+
169+
// Create a ClientConn with the xds:/// scheme.
170+
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver))
171+
if err != nil {
172+
t.Fatalf("Failed to create a grpc channel: %v", err)
173+
}
174+
defer cc.Close()
175+
176+
// Make an RPC and verify that it fails with the expected error.
177+
client := testgrpc.NewTestServiceClient(cc)
178+
_, err = client.EmptyCall(ctx, &testpb.Empty{})
179+
if err == nil {
180+
t.Fatal("EmptyCall() succeeded, want failure")
181+
}
182+
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
183+
t.Errorf("EmptyCall() failed with code = %v, want %s", gotCode, wantCode)
184+
}
185+
if gotMsg, wantMsg := err.Error(), "no targets to pick from"; !strings.Contains(gotMsg, wantMsg) {
186+
t.Errorf("EmptyCall() failed with message = %q, want to contain %q", gotMsg, wantMsg)
187+
}
188+
}

0 commit comments

Comments
 (0)