-
Notifications
You must be signed in to change notification settings - Fork 4.5k
xdsclient: support fallback within an authority #7701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -200,8 +200,101 @@ | |
} | ||
} | ||
|
||
// TODO(easwars-fallback): Trigger fallback here if conditions for fallback | ||
// are met. | ||
// Two conditions need to be met for fallback to be triggered: | ||
// 1. There is a connectivity failure on the ADS stream, as described in | ||
// gRFC A57. For us, this means that the ADS stream was closed before the | ||
// first server response was received. We already checked that condition | ||
// earlier in this method. | ||
// 2. There is at least one watcher for a resource that is not cached. | ||
// Cached resources include ones that | ||
// - have been successfully received and can be used. | ||
// - are considered non-existent according to xDS Protocol Specification. | ||
if !a.uncachedWatcherExists() { | ||
if a.logger.V(2) { | ||
a.logger.Infof("No watchers for uncached resources. Not triggering fallback") | ||
} | ||
return | ||
} | ||
a.triggerFallbackOnStreamFailure(serverConfig) | ||
} | ||
|
||
// serverIndexForConfig returns the index of the xdsChannelConfig that matches | ||
// the provided ServerConfig. If no match is found, it returns the length of the | ||
// xdsChannelConfigs slice, which represents the index of a non-existent config. | ||
func (a *authority) serverIndexForConfig(sc *bootstrap.ServerConfig) int { | ||
for i, cfg := range a.xdsChannelConfigs { | ||
if cfg.sc.Equal(sc) { | ||
return i | ||
} | ||
} | ||
return len(a.xdsChannelConfigs) | ||
} | ||
|
||
// Determines the server to fallback to and triggers fallback to the same. If | ||
// required, creates an xdsChannel to that server, and re-subscribes to all | ||
// existing resources. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) triggerFallbackOnStreamFailure(failingServerConfig *bootstrap.ServerConfig) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this function is simply falling back and not checking any stream failures. I guess that's done by caller? In that case name can be simply "triggerFallback". Also, in docstring we should mention it switches to next fallback server from the list (i.e. next lower priority than current failing one) and if no more servers below current then it returns as no-op. May be name can be more explicit "switchToNextFallbackServerIfAvailable"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Decided to go with |
||
if a.logger.V(2) { | ||
a.logger.Infof("Attempting to initiate fallback after failure from server %q", failingServerConfig) | ||
} | ||
|
||
// The server to fallback to is the next server on the list. If the current | ||
// server is the last server, then there is nothing that can be done. | ||
currentServerIdx := a.serverIndexForConfig(failingServerConfig) | ||
if currentServerIdx == len(a.xdsChannelConfigs) { | ||
// This can never happen. | ||
a.logger.Errorf("Received error from an unknown server: %s", failingServerConfig) | ||
return | ||
} | ||
if currentServerIdx == len(a.xdsChannelConfigs)-1 { | ||
if a.logger.V(2) { | ||
a.logger.Infof("No more servers to fallback to") | ||
} | ||
return | ||
} | ||
fallbackServerIdx := currentServerIdx + 1 | ||
fallbackChannel := a.xdsChannelConfigs[fallbackServerIdx] | ||
|
||
// If the server to fallback to already has an xdsChannel, it means that | ||
// this connectivity error is from a server with a higher priority. There | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't quite understand this comment. If there is an error from higher priority server, we should fallback to a lower priority server. How is having an existing channel for fallback server makes any difference? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets say the authority has two servers: primary and fallback. We start off with primary, and say it fails and therefore we switch to fallback. And lets say the connection to fallback works and we get all resources from it and we are happy. Now, we get another error from the primary (in fact, we will keep getting stream errors from it since we retry the stream with backoff). At this point, we will see that we already have a channel to the next server in the list which is the fallback server, and therefore we have nothing to do here. |
||
// is not much we can do here. | ||
if fallbackChannel.xc != nil { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Channel to the next server in the list %q already exists", fallbackChannel.sc) | ||
} | ||
return | ||
} | ||
|
||
// Create an xdsChannel for the fallback server. | ||
if a.logger.V(2) { | ||
a.logger.Infof("Initiating fallback to server %s", fallbackChannel.sc) | ||
} | ||
xc, cleanup, err := a.getChannelForADS(fallbackChannel.sc, a) | ||
if err != nil { | ||
a.logger.Errorf("Failed to create XDS channel: %v", err) | ||
return | ||
} | ||
fallbackChannel.xc = xc | ||
fallbackChannel.cleanup = cleanup | ||
a.activeXDSChannel = fallbackChannel | ||
|
||
// Subscribe to all existing resources from the new management server. | ||
for typ, resources := range a.resources { | ||
for name, state := range resources { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName(), name) | ||
} | ||
xc.subscribe(typ, name) | ||
|
||
// Add the fallback channel to the list of xdsChannels from which | ||
// this resource has been requested from. Retain the cached resource | ||
// and the set of existing watchers (and other metadata fields) in | ||
// the resource state. | ||
state.xdsChannelConfigs = append(state.xdsChannelConfigs, fallbackChannel) | ||
} | ||
} | ||
} | ||
|
||
// adsResourceUpdate is called to notify the authority about a resource update | ||
|
@@ -218,13 +311,15 @@ | |
// handleADSResourceUpdate processes an update from the xDS client, updating the | ||
// resource cache and notifying any registered watchers of the update. | ||
// | ||
// If the update is received from a higher priority xdsChannel that was | ||
// previously down, we revert to it and close all lower priority xdsChannels. | ||
// | ||
// Once the update has been processed by all watchers, the authority is expected | ||
// to invoke the onDone callback. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) handleADSResourceUpdate(serverConfig *bootstrap.ServerConfig, rType xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) { | ||
// TODO(easwars-fallback): Trigger reverting to a higher priority server if | ||
// the update is from one. | ||
a.handleRevertingToPrimaryOnUpdate(serverConfig) | ||
|
||
// We build a list of callback funcs to invoke, and invoke them at the end | ||
// of this method instead of inline (when handling the update for a | ||
|
@@ -416,6 +511,73 @@ | |
} | ||
} | ||
|
||
// handleRevertingToPrimaryOnUpdate is called when a resource update is received | ||
// from the xDS client. | ||
// | ||
// If the update is from the currently active server, nothing is done. Else, all | ||
// lower priority servers are closed and the active server is reverted to the | ||
// highest priority server that sent the update. | ||
// | ||
// This method is only executed in the context of a serializer callback. | ||
func (a *authority) handleRevertingToPrimaryOnUpdate(serverConfig *bootstrap.ServerConfig) { | ||
if a.activeXDSChannel != nil && a.activeXDSChannel.sc.Equal(serverConfig) { | ||
// If the resource update is from the current active server, nothing | ||
// needs to be done from fallback point of view. | ||
return | ||
} | ||
|
||
if a.logger.V(2) { | ||
a.logger.Infof("Received update from non-active server %q", serverConfig) | ||
} | ||
|
||
// If the resource update is not from the current active server, it means | ||
// that we have received an update from a higher priority server and we need | ||
// to revert back to it. This method guarantees that when an update is | ||
// received from a server, all lower priority servers are closed. | ||
serverIdx := a.serverIndexForConfig(serverConfig) | ||
if serverIdx == len(a.xdsChannelConfigs) { | ||
// This can never happen. | ||
a.logger.Errorf("Received update from an unknown server: %s", serverConfig) | ||
return | ||
} | ||
a.activeXDSChannel = a.xdsChannelConfigs[serverIdx] | ||
|
||
// Close all lower priority channels. | ||
// | ||
// But before closing any channel, we need to unsubscribe from any resources | ||
// that were subscribed to on this channel. Resources could be subscribed to | ||
// from multiple channels as we fallback to lower priority servers. But when | ||
// a higher priority one comes back up, we need to unsubscribe from all | ||
// lower priority ones before releasing the reference to them. | ||
for i := serverIdx + 1; i < len(a.xdsChannelConfigs); i++ { | ||
cfg := a.xdsChannelConfigs[i] | ||
|
||
for rType, rState := range a.resources { | ||
for resourceName, state := range rState { | ||
for idx, xc := range state.xdsChannelConfigs { | ||
// If the current resource is subscribed to on this channel, | ||
// unsubscribe, and remove the channel from the list of | ||
// channels that this resource is subscribed to. | ||
if xc == cfg { | ||
state.xdsChannelConfigs = append(state.xdsChannelConfigs[:idx], state.xdsChannelConfigs[idx+1:]...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this panic due to idx + 1 because of all these mutations? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah if it hits the last index does the index+1: panic, or is it just like nil or something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Decided to go with a map instead of a slice and that significantly simplifies this piece of code. |
||
xc.xc.unsubscribe(rType, resourceName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I still get confused by xc.xc, can we switch the top level symbol to xcc or something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, in a few other places as well. Also, I renames the struct fields to be more descriptive. |
||
} | ||
} | ||
} | ||
} | ||
|
||
// Release the reference to the channel. | ||
if cfg.cleanup != nil { | ||
if a.logger.V(2) { | ||
a.logger.Infof("Closing lower priority server %q", cfg.sc) | ||
} | ||
cfg.cleanup() | ||
cfg.cleanup = nil | ||
} | ||
cfg.xc = nil | ||
} | ||
} | ||
|
||
// watchResource registers a new watcher for the specified resource type and | ||
// name. It returns a function that can be called to cancel the watch. | ||
// | ||
|
@@ -580,6 +742,21 @@ | |
a.activeXDSChannel = nil | ||
} | ||
|
||
// uncachedWatcherExists returns true if there is at least one watcher for a | ||
// resource that has not yet been cached. | ||
// | ||
// Only executed in the context of a serializer callback. | ||
func (a *authority) uncachedWatcherExists() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: name seems to convey "if any uncached watcher exist" where it should convey if there is a watcher that is looking for an uncached resource. May be rename to something like "uncachedResourceExistToWatch" or flip the bool to "allWatchableResourceCached" or simply "areAllRequestedResourceCached" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
for _, resourceStates := range a.resources { | ||
for _, state := range resourceStates { | ||
if state.md.Status == xdsresource.ServiceStatusRequested { | ||
return true | ||
} | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// dumpResources returns a dump of the resource configuration cached by this | ||
// authority, for CSDS purposes. | ||
func (a *authority) dumpResources() []*v3statuspb.ClientConfig_GenericXdsConfig { | ||
|
Uh oh!
There was an error while loading. Please reload this page.