-
Notifications
You must be signed in to change notification settings - Fork 4.5k
xdsclient: support fallback within an authority #7701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -200,8 +200,90 @@ | |
} | ||
} | ||
|
||
// TODO(easwars-fallback): Trigger fallback here if conditions for fallback | ||
// are met. | ||
// Two conditions need to be met for fallback to be triggered: | ||
// 1. There is a connectivity failure on the ADS stream, as described in | ||
// gRFC A57. For us, this means that the ADS stream was closed before the | ||
// first server response was received. We already checked that condition | ||
// earlier in this method. | ||
// 2. There is at least one watcher for a resource that is not cached. | ||
// Cached resources include ones that | ||
// - have been successfully received and can be used. | ||
// - are considered non-existent according to xDS Protocol Specification. | ||
if !a.uncachedWatcherExists() { | ||
if a.logger.V(2) { | ||
a.logger.Infof("No watchers for uncached resources. Not triggering fallback") | ||
} | ||
return | ||
} | ||
a.triggerFallbackOnStreamFailure(serverConfig) | ||
} | ||
|
||
// Determines the server to fallback to and triggers fallback to the same. If | ||
// required, creates an xdsChannel to that server, and re-subscribes to all | ||
// existing resources. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstrap.ServerConfig) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this function is simply falling back and not checking any stream failures. I guess that's done by caller? In that case name can be simply "triggerFallback". Also, in docstring we should mention it switches to next fallback server from the list (i.e. next lower priority than current failing one) and if no more servers below current then it returns as no-op. May be name can be more explicit "switchToNextFallbackServerIfAvailable"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Decided to go with |
||
if a.logger.V(2) { | ||
a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig) | ||
} | ||
|
||
// The server to fallback to is the next server on the list. If the current | ||
// server is the last server, then there is nothing that can be done. | ||
currentServerIdx := 0 | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, cfg := range a.xdsChannelConfigs { | ||
if cfg.sc.Equal(failingServerConfig) { | ||
break | ||
} | ||
currentServerIdx++ | ||
} | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if currentServerIdx == len(a.xdsChannelConfigs)-1 { | ||
if a.logger.V(2) { | ||
a.logger.Infof("No more servers to fallback to") | ||
} | ||
return | ||
} | ||
fallbackServerIdx := currentServerIdx + 1 | ||
fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx] | ||
|
||
// If the server to fallback to already has an xdsChannel, it means that | ||
// this connectivity error is from a server with a higher priority. There | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't quite understand this comment. If there is an error from higher priority server, we should fallback to a lower priority server. How is having an existing channel for fallback server makes any difference? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets say the authority has two servers: primary and fallback. We start off with primary, and say it fails and therefore we switch to fallback. And lets say the connection to fallback works and we get all resources from it and we are happy. Now, we get another error from the primary (in fact, we will keep getting stream errors from it since we retry the stream with backoff). At this point, we will see that we already have a channel to the next server in the list which is the fallback server, and therefore we have nothing to do here. |
||
// is not much we can do here. | ||
if fallbackChannel.xc != nil { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.sc) | ||
} | ||
return | ||
} | ||
|
||
// Create an xdsChannel for the fallback server. | ||
if a.logger.V(2) { | ||
a.logger.Infof("Initiating fallback to server %s", fallbackChannel.sc) | ||
} | ||
xc, cleanup, err := a.getChannelForADS(fallbackChannel.sc, a) | ||
if err != nil { | ||
a.logger.Errorf("Failed to create XDS channel: %v", err) | ||
return | ||
} | ||
fallbackChannel.xc = xc | ||
fallbackChannel.cleanup = cleanup | ||
a.activeXDSChannel = fallbackChannel | ||
|
||
// Subscribe to all existing resources from the new management server. | ||
for typ, resources := range a.resources { | ||
for name, state := range resources { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name) | ||
} | ||
xc.subscribe(typ, name) | ||
|
||
// Add the fallback channel to the list of xdsChannels from which | ||
// this resource has been requested from. Retain the cached resource | ||
// and the set of existing watchers (and other metadata fields) in | ||
// the resource state. | ||
state.xdsChannelConfigs = append(state.xdsChannelConfigs, fallbackChannel) | ||
} | ||
} | ||
} | ||
|
||
// adsResourceUpdate is called to notify the authority about a resource update | ||
|
@@ -218,13 +300,15 @@ | |
// handleADSResourceUpdate processes an update from the xDS client, updating the | ||
// resource cache and notifying any registered watchers of the update. | ||
// | ||
// If the update is received from a higher priority xdsChannel that was | ||
// previously down, we revert to it and close all lower priority xdsChannels. | ||
// | ||
// Once the update has been processed by all watchers, the authority is expected | ||
// to invoke the onDone callback. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) { | ||
// TODO(easwars-fallback): Trigger reverting to a higher priority server if | ||
// the update is from one. | ||
a.handleRevertingToPrimaryOnUpdate(serverConfig) | ||
|
||
// We build a list of callback funcs to invoke, and invoke them at the end | ||
// of this method instead of inline (when handling the update for a | ||
|
@@ -416,6 +500,76 @@ | |
} | ||
} | ||
|
||
// handleRevertingToPrimaryOnUpdate is called when a resource update is received | ||
// from a server that is not the current active server. This method ensures that | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// all lower priority servers are closed and the active server is reverted to | ||
// the highest priority server that has sent an update. | ||
// | ||
// This method is only executed in the context of a serializer callback. | ||
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) { | ||
if a.activeXDSChannel != nil && a.activeXDSChannel.sc.Equal(serverConfig) { | ||
// If the resource update is from the current active server, nothing | ||
// needs to be done from fallback point of view. | ||
return | ||
} | ||
|
||
if a.logger.V(2) { | ||
a.logger.Infof("Received update from non-active server %q", serverConfig) | ||
} | ||
|
||
// If the resource update is not from the current active server, it means | ||
// that we have received an update from a higher priority server and we need | ||
// to revert back to it. This method guarantees that when an update is | ||
// received from a server, all lower priority servers are closed. | ||
serverIdx := 0 | ||
for _, cfg := range a.xdsChannelConfigs { | ||
if cfg.sc.Equal(serverConfig) { | ||
break | ||
} | ||
serverIdx++ | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if serverIdx == len(a.xdsChannelConfigs) { | ||
// This can never happen. | ||
a.logger.Errorf("Received update from an unknown server: %v", serverConfig) | ||
return | ||
} | ||
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx] | ||
|
||
// Close all lower priority channel. | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ { | ||
cfg := a.xdsChannelConfigs[i] | ||
|
||
// Unsubscribe any resources that were subscribed to, on this channel | ||
// and remove it from the resource cache. When a ref to a channel is | ||
// being released, there should be no more references to it from the | ||
// resource cache. | ||
for rType, rState := range a.resources { | ||
for resourceName, state := range rState { | ||
idx := 0 | ||
for _, xc := range state.xdsChannelConfigs { | ||
if xc != cfg { | ||
state.xdsChannelConfigs[idx] = xc | ||
idx++ | ||
continue | ||
} | ||
xc.xc.unsubscribe(rType, resourceName) | ||
} | ||
state.xdsChannelConfigs = state.xdsChannelConfigs[:idx] | ||
zasweq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
// Release the reference to the channel. | ||
if cfg.cleanup != nil { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Closing lower priority server %q", cfg.sc) | ||
} | ||
cfg.cleanup() | ||
cfg.cleanup = nil | ||
} | ||
cfg.xc = nil | ||
} | ||
} | ||
|
||
// watchResource registers a new watcher for the specified resource type and | ||
// name. It returns a function that can be called to cancel the watch. | ||
// | ||
|
@@ -580,6 +734,21 @@ | |
a.activeXDSChannel = nil | ||
} | ||
|
||
// uncachedWatcherExists returns true if there is at least one watcher for a | ||
// resource that has not yet been cached. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) uncachedWatcherExists() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: name seems to convey "if any uncached watcher exist" where it should convey if there is a watcher that is looking for an uncached resource. May be rename to something like "uncachedResourceExistToWatch" or flip the bool to "allWatchableResourceCached" or simply "areAllRequestedResourceCached" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
for _, resourceStates := range a.resources { | ||
for _, state := range resourceStates { | ||
if state.md.Status == xdsresource.ServiceStatusRequested { | ||
return true | ||
} | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// dumpResources returns a dump of the resource configuration cached by this | ||
// authority, for CSDS purposes. | ||
func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig { | ||
|
Uh oh!
There was an error while loading. Please reload this page.