cds: stop child policies on resource-not-found errors #8122

Merged
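Summary of the change: when the top-level cluster resource is reported as not found (by the xDS resolver or by the CDS watcher), the cds LB policy now closes its child policy instead of forwarding the error to it, and moves the channel to TRANSIENT_FAILURE with an error picker, so RPCs fail with codes.Unavailable and a "cluster %q not found" message. The snippet below is a minimal, runnable sketch of the error-picker mechanism the diff relies on; it uses only the public balancer/base API, and the cluster name in the error string is a made-up example, not something from this PR.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

func main() {
	// base.NewErrPicker returns a picker that fails every Pick with the
	// supplied error; installing it together with TRANSIENT_FAILURE makes
	// RPCs fail with codes.Unavailable carrying that error text.
	picker := base.NewErrPicker(errors.New(`cluster "cluster-a" not found`))
	if _, err := picker.Pick(balancer.PickInfo{}); err != nil {
		fmt.Println("Pick failed:", err) // Pick failed: cluster "cluster-a" not found
	}
}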
36 changes: 24 additions & 12 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
@@ -344,10 +344,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) error
// ResolverError handles errors reported by the xdsResolver.
func (b *cdsBalancer) ResolverError(err error) {
b.serializer.TrySchedule(func(context.Context) {
// Resource not found error is reported by the resolver when the
// top-level cluster resource is removed by the management server.
// Missing Listener or RouteConfiguration on the management server
// results in a 'resource not found' error from the xDS resolver. In
// these cases, we should stop watching all of the current clusters
// being watched.
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
b.closeAllWatchers()
b.closeChildPolicyAndReportTF(err)
return
}
var root string
if b.lbCfg != nil {
@@ -372,6 +376,22 @@ func (b *cdsBalancer) closeAllWatchers() {
}
}

// closeChildPolicyAndReportTF closes the child policy, if it exists, and
// updates the connectivity state of the channel to TransientFailure with an
// error picker.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) closeChildPolicyAndReportTF(err error) {
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
}
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
}

// Close cancels the CDS watch, closes the child policy and closes the
// cdsBalancer.
func (b *cdsBalancer) Close() {
@@ -537,16 +557,8 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
// If child balancer was never created, fail the RPCs with errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
}
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "cluster %q not found", name)
b.closeChildPolicyAndReportTF(err)
}

// Generates discovery mechanisms for the cluster graph rooted at `name`. This
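Since this view flattens deleted and added lines together, the last hunk above (onClusterResourceNotFound) is easier to follow as a before/after contrast. The sketch below is self-contained and runnable, but the types child, cc, and bal and the functions onNotFoundOld and onNotFoundNew are simplified stand-ins invented for illustration; the real balancer carries far more state.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
)

type child struct{}

func (*child) Close()                  { fmt.Println("child policy closed") }
func (*child) ResolverError(err error) { fmt.Println("child policy got error:", err) }

type cc struct{}

func (cc) UpdateState(s balancer.State) { fmt.Println("channel ->", s.ConnectivityState) }

type bal struct {
	childLB *child
	ccw     cc
}

// Old behavior: forward a cluster-not-found error to the child policy when
// one exists; report TRANSIENT_FAILURE directly only when there is no child.
func (b *bal) onNotFoundOld(err error) {
	if b.childLB != nil {
		b.childLB.ResolverError(err)
		return
	}
	b.ccw.UpdateState(balancer.State{
		ConnectivityState: connectivity.TransientFailure,
		Picker:            base.NewErrPicker(err),
	})
}

// New behavior: always close the child policy and fail RPCs with the error.
func (b *bal) onNotFoundNew(err error) {
	if b.childLB != nil {
		b.childLB.Close()
		b.childLB = nil
	}
	b.ccw.UpdateState(balancer.State{
		ConnectivityState: connectivity.TransientFailure,
		Picker:            base.NewErrPicker(err),
	})
}

func main() {
	err := errors.New(`cluster "cluster-a" not found`)
	(&bal{childLB: &child{}}).onNotFoundOld(err) // child policy got error: ...
	(&bal{childLB: &child{}}).onNotFoundNew(err) // child policy closed; channel -> TRANSIENT_FAILURE
}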
125 changes: 88 additions & 37 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -672,9 +672,6 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
// - when a bad cluster resource update is received after a previous good
// update from the management server, the cds LB policy is expected to
// continue using the previous good update.
// - when the cluster resource is removed after a previous good
// update from the management server, the cds LB policy is expected to put
// the channel in TRANSIENT_FAILURE.
func (s) TestClusterUpdate_Failure(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
@@ -778,34 +775,6 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
case <-ctx.Done():
t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
}

// Remove the cluster resource from the management server, triggering a
// resource-not-found error.
resources = e2e.UpdateOptions{
NodeID: nodeID,
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify that the watch for the cluster resource is not cancelled.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-cdsResourceCanceledCh:
t.Fatal("Watch for cluster resource is cancelled when not expected to")
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Ensure RPC fails with Unavailable. The actual error message depends on
// the picker returned from the priority LB policy, and therefore not
// checking for it here.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
}
}

// Tests the following scenarios for resolver errors:
@@ -822,7 +791,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) {
// is expected to push the error down the child policy and put the channel in
// TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
func (s) TestResolverError(t *testing.T) {
_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
_, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
lis := testutils.NewListenerWrapper(t, nil)
mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis)

@@ -938,12 +907,9 @@ func (s) TestResolverError(t *testing.T) {

// Verify that the child policy is closed.
select {
case err := <-resolverErrCh:
if err != resolverErr {
t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
}
case <-childPolicyCloseCh:
case <-ctx.Done():
t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
t.Fatal("Timeout when waiting for child policy to be closed")
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
@@ -956,6 +922,91 @@
}
}

// Tests scenarios involving removal of a cluster resource from the management
// server.
//
// - when the cluster resource is removed after a previous good
// update from the management server, the cds LB policy is expected to put
// the channel in TRANSIENT_FAILURE.
// - when the cluster resource is re-sent by the management server, RPCs
// should start succeeding.
func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) {
mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)

// Verify that the specified cluster resource is requested.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wantNames := []string{clusterName}
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
t.Fatal(err)
}

// Start a test service backend.
server := stubserver.StartTestService(t, nil)
t.Cleanup(server.Stop)

// Configure cluster and endpoints resources in the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

// Remove the cluster resource from the management server, triggering a
// resource-not-found error.
resources.Clusters = nil
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify that the watch for the cluster resource is not cancelled.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-cdsResourceCanceledCh:
t.Fatal("Watch for cluster resource is cancelled when not expected to")
}

testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Ensure RPC fails with Unavailable.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if status.Code(err) != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
}
if !strings.Contains(err.Error(), wantErr) {
t.Fatalf("EmptyCall() failed with error: %v, want %v", err, wantErr)
}

// Re-add the cluster resource to the management server.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Verify that a successful RPC can be made.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
}

// Tests that closing the cds LB policy results in the child policy being
// closed.
func (s) TestClose(t *testing.T) {
@@ -278,11 +278,12 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}

// Ensure that RPCs start to fail with expected error.
wantErr := fmt.Sprintf("cluster %q not found", clusterName)
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
_, err := client.EmptyCall(sCtx, &testpb.Empty{})
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") {
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), wantErr) {
break
}
if err != nil {