Skip to content

xdsclient: support fallback within an authority #7701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 173 additions & 4 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,90 @@
}
}

// TODO(easwars-fallback): Trigger fallback here if conditions for fallback
// are met.
// Two conditions need to be met for fallback to be triggered:
// 1. There is a connectivity failure on the ADS stream, as described in
// gRFC A57. For us, this means that the ADS stream was closed before the
// first server response was received. We already checked that condition
// earlier in this method.
// 2. There is at least one watcher for a resource that is not cached.
// Cached resources include ones that
// - have been successfully received and can be used.
// - are considered non-existent according to xDS Protocol Specification.
if !a.uncachedWatcherExists() {
if a.logger.V(2) {
a.logger.Infof("No watchers for uncached resources. Not triggering fallback")
}

Check warning on line 215 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L214-L215

Added lines #L214 - L215 were not covered by tests
return
}
a.triggerFallbackOnStreamFailure(serverConfig)
}

// Determines the server to fallback to and triggers fallback to the same. If
// required, creates an xdsChannel to that server, and re-subscribes to all
// existing resources.
//
// Only executed in the context of a serializer callback.
func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstrap.ServerConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this function is simply falling back and not checking any stream failures. I guess that's done by caller? In that case name can be simply "triggerFallback". Also, in docstring we should mention it switches to next fallback server from the list (i.e. next lower priority than current failing one) and if no more servers below current then it returns as no-op. May be name can be more explicit "switchToNextFallbackServerIfAvailable"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to go with fallbackToNextServerIfPossible since Available is a term that can be viewed differently in this context, i.e. that the server is actually available for traffic, which is not what we check here.

if a.logger.V(2) {
a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig)
}

Check warning on line 229 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L228-L229

Added lines #L228 - L229 were not covered by tests

// The server to fallback to is the next server on the list. If the current
// server is the last server, then there is nothing that can be done.
currentServerIdx := 0
for _, cfg := range a.xdsChannelConfigs {
if cfg.sc.Equal(failingServerConfig) {
break
}
currentServerIdx++
}
if currentServerIdx == len(a.xdsChannelConfigs)-1 {
if a.logger.V(2) {
a.logger.Infof("No more servers to fallback to")
}

Check warning on line 243 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L242-L243

Added lines #L242 - L243 were not covered by tests
return
}
fallbackServerIdx := currentServerIdx + 1
fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx]

// If the server to fallback to already has an xdsChannel, it means that
// this connectivity error is from a server with a higher priority. There
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't quite understand this comment. If there is an error from higher priority server, we should fallback to a lower priority server. How is having an existing channel for fallback server makes any difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets say the authority has two servers: primary and fallback. We start off with primary, and say it fails and therefore we switch to fallback. And lets say the connection to fallback works and we get all resources from it and we are happy. Now, we get another error from the primary (in fact, we will keep getting stream errors from it since we retry the stream with backoff). At this point, we will see that we already have a channel to the next server in the list which is the fallback server, and therefore we have nothing to do here.

// is not much we can do here.
if fallbackChannel.xc != nil {
if a.logger.V(2) {
a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.sc)
}

Check warning on line 255 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L254-L255

Added lines #L254 - L255 were not covered by tests
return
}

// Create an xdsChannel for the fallback server.
if a.logger.V(2) {
a.logger.Infof("Initiating fallback to server %s", fallbackChannel.sc)
}

Check warning on line 262 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L261-L262

Added lines #L261 - L262 were not covered by tests
xc, cleanup, err := a.getChannelForADS(fallbackChannel.sc, a)
if err != nil {
a.logger.Errorf("Failed to create XDS channel: %v", err)
return
}

Check warning on line 267 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L265-L267

Added lines #L265 - L267 were not covered by tests
fallbackChannel.xc = xc
fallbackChannel.cleanup = cleanup
a.activeXDSChannel = fallbackChannel

// Subscribe to all existing resources from the new management server.
for typ, resources := range a.resources {
for name, state := range resources {
if a.logger.V(2) {
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name)
}

Check warning on line 277 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L276-L277

Added lines #L276 - L277 were not covered by tests
xc.subscribe(typ, name)

// Add the fallback channel to the list of xdsChannels from which
// this resource has been requested from. Retain the cached resource
// and the set of existing watchers (and other metadata fields) in
// the resource state.
state.xdsChannelConfigs = append(state.xdsChannelConfigs, fallbackChannel)
}
}
}

// adsResourceUpdate is called to notify the authority about a resource update
Expand All @@ -218,13 +300,15 @@
// handleADSResourceUpdate processes an update from the xDS client, updating the
// resource cache and notifying any registered watchers of the update.
//
// If the update is received from a higher priority xdsChannel that was
// previously down, we revert to it and close all lower priority xdsChannels.
//
// Once the update has been processed by all watchers, the authority is expected
// to invoke the onDone callback.
//
// Only executed in the context of a serializer callback.
func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
// TODO(easwars-fallback): Trigger reverting to a higher priority server if
// the update is from one.
a.handleRevertingToPrimaryOnUpdate(serverConfig)

// We build a list of callback funcs to invoke, and invoke them at the end
// of this method instead of inline (when handling the update for a
Expand Down Expand Up @@ -416,6 +500,76 @@
}
}

// handleRevertingToPrimaryOnUpdate is called when a resource update is received
// from a server that is not the current active server. This method ensures that
// all lower priority servers are closed and the active server is reverted to
// the highest priority server that has sent an update.
//
// This method is only executed in the context of a serializer callback.
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) {
if a.activeXDSChannel != nil && a.activeXDSChannel.sc.Equal(serverConfig) {
// If the resource update is from the current active server, nothing
// needs to be done from fallback point of view.
return
}

if a.logger.V(2) {
a.logger.Infof("Received update from non-active server %q", serverConfig)
}

Check warning on line 518 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L517-L518

Added lines #L517 - L518 were not covered by tests

// If the resource update is not from the current active server, it means
// that we have received an update from a higher priority server and we need
// to revert back to it. This method guarantees that when an update is
// received from a server, all lower priority servers are closed.
serverIdx := 0
for _, cfg := range a.xdsChannelConfigs {
if cfg.sc.Equal(serverConfig) {
break
}
serverIdx++

Check warning on line 529 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L529

Added line #L529 was not covered by tests
}
if serverIdx == len(a.xdsChannelConfigs) {
// This can never happen.
a.logger.Errorf("Received update from an unknown server: %v", serverConfig)
return
}

Check warning on line 535 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L532-L535

Added lines #L532 - L535 were not covered by tests
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx]

// Close all lower priority channel.
for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ {
cfg := a.xdsChannelConfigs[i]

// Unsubscribe any resources that were subscribed to, on this channel
// and remove it from the resource cache. When a ref to a channel is
// being released, there should be no more references to it from the
// resource cache.
for rType, rState := range a.resources {
for resourceName, state := range rState {
idx := 0
for _, xc := range state.xdsChannelConfigs {
if xc != cfg {
state.xdsChannelConfigs[idx] = xc
idx++
continue
}
xc.xc.unsubscribe(rType, resourceName)
}
state.xdsChannelConfigs = state.xdsChannelConfigs[:idx]
}
}

// Release the reference to the channel.
if cfg.cleanup != nil {
if a.logger.V(2) {
a.logger.Infof("Closing lower priority server %q", cfg.sc)
}

Check warning on line 565 in xds/internal/xdsclient/authority.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/authority.go#L564-L565

Added lines #L564 - L565 were not covered by tests
cfg.cleanup()
cfg.cleanup = nil
}
cfg.xc = nil
}
}

// watchResource registers a new watcher for the specified resource type and
// name. It returns a function that can be called to cancel the watch.
//
Expand Down Expand Up @@ -580,6 +734,21 @@
a.activeXDSChannel = nil
}

// uncachedWatcherExists returns true if there is at least one watcher for a
// resource that has not yet been cached.
//
// Only executed in the context of a serializer callback.
func (a *authority) uncachedWatcherExists() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: name seems to convey "if any uncached watcher exist" where it should convey if there is a watcher that is looking for an uncached resource. May be rename to something like "uncachedResourceExistToWatch" or flip the bool to "allWatchableResourceCached" or simply "areAllRequestedResourceCached"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for _, resourceStates := range a.resources {
for _, state := range resourceStates {
if state.md.Status == xdsresource.ServiceStatusRequested {
return true
}
}
}
return false
}

// dumpResources returns a dump of the resource configuration cached by this
// authority, for CSDS purposes.
func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig {
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
type OptionsForTesting struct {
// Name is a unique name for this xDS client.
Name string

// Contents contain a JSON representation of the bootstrap configuration to
// be used when creating the xDS client.
Contents []byte
Expand Down Expand Up @@ -180,7 +181,6 @@ func GetForTesting(name string) (XDSClient, func(), error) {
func init() {
internal.TriggerXDSResourceNotFoundForTesting = triggerXDSResourceNotFoundForTesting
xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting

}

func triggerXDSResourceNotFoundForTesting(client XDSClient, typ xdsresource.Type, name string) error {
Expand Down
Loading