Skip to content

grpc-js: Pass channel args to LB policies with updates #2854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import PickResult = grpc.experimental.PickResult;
import PickResultType = grpc.experimental.PickResultType;
import createChildChannelControlHelper = grpc.experimental.createChildChannelControlHelper;
import parseLoadBalancingConfig = grpc.experimental.parseLoadBalancingConfig;
import { ChannelOptions } from '@grpc/grpc-js';

grpc_xds.register();

Expand Down Expand Up @@ -88,7 +89,7 @@ const RPC_BEHAVIOR_CHILD_CONFIG = parseLoadBalancingConfig({round_robin: {}});
class RpcBehaviorLoadBalancer implements LoadBalancer {
private child: ChildLoadBalancerHandler;
private latestConfig: RpcBehaviorLoadBalancingConfig | null = null;
constructor(channelControlHelper: ChannelControlHelper, options: grpc.ChannelOptions) {
constructor(channelControlHelper: ChannelControlHelper) {
const childChannelControlHelper = createChildChannelControlHelper(channelControlHelper, {
updateState: (connectivityState, picker) => {
if (connectivityState === grpc.connectivityState.READY && this.latestConfig) {
Expand All @@ -97,14 +98,14 @@ class RpcBehaviorLoadBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
});
this.child = new ChildLoadBalancerHandler(childChannelControlHelper, options);
this.child = new ChildLoadBalancerHandler(childChannelControlHelper);
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof RpcBehaviorLoadBalancingConfig)) {
return;
}
this.latestConfig = lbConfig;
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, attributes);
this.child.updateAddressList(endpointList, RPC_BEHAVIOR_CHILD_CONFIG, options);
}
exitIdle(): void {
this.child.exitIdle();
Expand Down
19 changes: 11 additions & 8 deletions packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { XdsConfig } from './xds-dependency-manager';
import { LocalityEndpoint, PriorityChildRaw } from './load-balancer-priority';
import { Locality__Output } from './generated/envoy/config/core/v3/Locality';
import { AGGREGATE_CLUSTER_BACKWARDS_COMPAT, EXPERIMENTAL_OUTLIER_DETECTION } from './environment';
import { XDS_CONFIG_KEY } from './resolver-xds';

const TRACER_NAME = 'cds_balancer';

Expand Down Expand Up @@ -91,6 +92,8 @@ export function localityToName(locality: Locality__Output) {
return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`;
}

export const ROOT_CLUSTER_KEY = 'grpc.internal.root_cluster';

export class CdsLoadBalancer implements LoadBalancer {
private childBalancer: ChildLoadBalancerHandler;

Expand All @@ -99,8 +102,8 @@ export class CdsLoadBalancer implements LoadBalancer {
private priorityNames: string[] = [];
private nextPriorityChildNumber = 0;

constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper, options);
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(channelControlHelper);
}

private getNextPriorityName(cluster: string) {
Expand All @@ -110,14 +113,14 @@ export class CdsLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof CdsLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig, undefined, 2));
return;
}
trace('Received update with config ' + JSON.stringify(lbConfig, undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const clusterName = lbConfig.getCluster();
const maybeClusterConfig = xdsConfig.clusters.get(clusterName);
if (!maybeClusterConfig) {
Expand Down Expand Up @@ -165,7 +168,7 @@ export class CdsLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({code: status.UNAVAILABLE, details: `LB policy config parsing failed with error ${(e as Error).message}`, metadata: new Metadata()}));
return;
}
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...attributes, rootCluster: clusterName});
this.childBalancer.updateAddressList(endpointList, typedChildConfig, {...options, [ROOT_CLUSTER_KEY]: clusterName});
} else {
if (!clusterConfig.children.endpoints) {
trace('Received update with no resolved endpoints for cluster ' + clusterName);
Expand All @@ -180,8 +183,8 @@ export class CdsLoadBalancer implements LoadBalancer {
if (clusterConfig.cluster.type === 'EDS') {
endpointPickingPolicy = clusterConfig.cluster.lbPolicyConfig;
if (AGGREGATE_CLUSTER_BACKWARDS_COMPAT) {
if (typeof attributes.rootCluster === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(attributes.rootCluster);
if (typeof options[ROOT_CLUSTER_KEY] === 'string') {
const maybeRootClusterConfig = xdsConfig.clusters.get(options[ROOT_CLUSTER_KEY]);
if (maybeRootClusterConfig?.success) {
endpointPickingPolicy = maybeRootClusterConfig.value.cluster.lbPolicyConfig;
}
Expand Down Expand Up @@ -279,7 +282,7 @@ export class CdsLoadBalancer implements LoadBalancer {
return;
}
trace(JSON.stringify(typedChildConfig.toJsonObject(), undefined, 2));
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, attributes);
this.childBalancer.updateAddressList(childEndpointList, typedChildConfig, options);
}
}
exitIdle(): void {
Expand Down
14 changes: 7 additions & 7 deletions packages/grpc-js-xds/src/load-balancer-priority.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
this.parent.channelControlHelper.requestReresolution();
}
}
}), parent.options);
}));
this.picker = new QueuePicker(this.childBalancer);
this.startFailoverTimer();
}
Expand Down Expand Up @@ -307,7 +307,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
* The attributes object from the latest update, saved to be passed along to
* each new child as they are created
*/
private latestAttributes: { [key: string]: unknown } = {};
private latestOptions: ChannelOptions = {};
/**
* The latest load balancing policies and address lists for each child from
* the latest update
Expand All @@ -323,7 +323,7 @@ export class PriorityLoadBalancer implements LoadBalancer {

private updatesPaused = false;

constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private updateState(state: ConnectivityState, picker: Picker) {
trace(
Expand Down Expand Up @@ -392,7 +392,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
child.updateAddressList(
childUpdate.subchannelAddress,
childUpdate.lbConfig,
this.latestAttributes
this.latestOptions
);
} else {
/* We're going to try to use this child, so reactivate it if it has been
Expand Down Expand Up @@ -431,7 +431,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof PriorityLoadBalancingConfig)) {
// Reject a config of the wrong type
Expand Down Expand Up @@ -467,7 +467,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
}
childAddressList.push(childAddress);
}
this.latestAttributes = attributes;
this.latestOptions = options;
this.latestUpdates.clear();
this.priorities = lbConfig.getPriorities();
this.updatesPaused = true;
Expand All @@ -486,7 +486,7 @@ export class PriorityLoadBalancer implements LoadBalancer {
existingChild.updateAddressList(
childAddresses,
childConfig.config,
attributes
options
);
}
}
Expand Down
22 changes: 10 additions & 12 deletions packages/grpc-js-xds/src/load-balancer-ring-hash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ class RingHashLoadBalancer implements LoadBalancer {
private updatesPaused = false;
private currentState: connectivityState = connectivityState.IDLE;
private ring: RingEntry[] = [];
private ringHashSizeCap = DEFAULT_RING_SIZE_CAP;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {
constructor(private channelControlHelper: ChannelControlHelper) {
this.childChannelControlHelper = createChildChannelControlHelper(
channelControlHelper,
{
Expand Down Expand Up @@ -254,9 +253,6 @@ class RingHashLoadBalancer implements LoadBalancer {
},
}
);
if (options['grpc.lb.ring_hash.ring_size_cap'] !== undefined) {
this.ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'];
}
}

private calculateAndUpdateState() {
Expand Down Expand Up @@ -316,7 +312,8 @@ class RingHashLoadBalancer implements LoadBalancer {

private constructRing(
endpointList: Endpoint[],
config: RingHashLoadBalancingConfig
config: RingHashLoadBalancingConfig,
ringHashSizeCap: number
) {
this.ring = [];
const endpointWeights: EndpointWeight[] = [];
Expand All @@ -336,8 +333,8 @@ class RingHashLoadBalancer implements LoadBalancer {
minNormalizedWeight
);
}
const minRingSize = Math.min(config.getMinRingSize(), this.ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), this.ringHashSizeCap);
const minRingSize = Math.min(config.getMinRingSize(), ringHashSizeCap);
const maxRingSize = Math.min(config.getMaxRingSize(), ringHashSizeCap);
/* Calculate a scale factor that meets the following conditions:
* 1. The result is between minRingSize and maxRingSize, inclusive
* 2. The smallest normalized weight is scaled to a whole number, if it
Expand Down Expand Up @@ -390,7 +387,7 @@ class RingHashLoadBalancer implements LoadBalancer {
updateAddressList(
endpointList: Endpoint[],
lbConfig: TypedLoadBalancingConfig,
attributes: { [key: string]: unknown }
options: ChannelOptions
): void {
if (!(lbConfig instanceof RingHashLoadBalancingConfig)) {
trace('Discarding address update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand All @@ -403,11 +400,11 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const endpoint of endpointList) {
const leafBalancer = this.leafMap.get(endpoint);
if (leafBalancer) {
leafBalancer.updateEndpoint(endpoint);
leafBalancer.updateEndpoint(endpoint, options);
} else {
this.leafMap.set(
endpoint,
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, this.options)
new LeafLoadBalancer(endpoint, this.childChannelControlHelper, options)
);
}
const weight = this.leafWeightMap.get(endpoint);
Expand All @@ -420,8 +417,9 @@ class RingHashLoadBalancer implements LoadBalancer {
for (const leaf of removedLeaves) {
leaf.destroy();
}
const ringHashSizeCap = options['grpc.lb.ring_hash.ring_size_cap'] ?? DEFAULT_RING_SIZE_CAP
loadXxhashApi().then(() => {
this.constructRing(dedupedEndpointList, lbConfig);
this.constructRing(dedupedEndpointList, lbConfig, ringHashSizeCap);
this.updatesPaused = false;
this.calculateAndUpdateState();
});
Expand Down
12 changes: 6 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-weighted-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));

this.picker = new QueuePicker(this.childBalancer);
}
Expand All @@ -190,9 +190,9 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.parent.maybeUpdateState();
}

updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: WeightedTarget, options: ChannelOptions): void {
this.weight = lbConfig.weight;
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.child_policy, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down Expand Up @@ -243,7 +243,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
private targetList: string[] = [];
private updatesPaused = false;

constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private maybeUpdateState() {
if (!this.updatesPaused) {
Expand Down Expand Up @@ -319,7 +319,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, picker);
}

updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(addressList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof WeightedTargetLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand Down Expand Up @@ -365,7 +365,7 @@ export class WeightedTargetLoadBalancer implements LoadBalancer {
}
const targetEndpoints = childEndpointMap.get(targetName) ?? [];
trace('Assigning target ' + targetName + ' address list ' + targetEndpoints.map(endpoint => '(' + endpointToString(endpoint) + ' path=' + endpoint.localityPath + ')'));
target.updateAddressList(targetEndpoints, targetConfig, attributes);
target.updateAddressList(targetEndpoints, targetConfig, options);
}

// Deactivate targets that are not in the new config
Expand Down
13 changes: 7 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import UnavailablePicker = experimental.UnavailablePicker;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";
import { XDS_CLIENT_KEY, XDS_CONFIG_KEY } from "./resolver-xds";

const TRACER_NAME = 'xds_cluster_impl';

Expand Down Expand Up @@ -211,7 +212,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
private xdsClient: XdsClient | null = null;
private latestClusterConfig: ClusterConfig | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper, options: ChannelOptions) {
constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
Expand Down Expand Up @@ -248,15 +249,15 @@ class XdsClusterImplBalancer implements LoadBalancer {
channelControlHelper.updateState(connectivityState, picker);
}
}
}), options);
}));
}
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterImplLoadBalancingConfig)) {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const xdsConfig = options[XDS_CONFIG_KEY] as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
Expand All @@ -281,7 +282,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.latestClusterConfig = clusterConfig;
this.xdsClient = attributes.xdsClient as XdsClient;
this.xdsClient = options[XDS_CLIENT_KEY] as XdsClient;
if (clusterConfig.cluster.lrsLoadReportingServer) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
clusterConfig.cluster.lrsLoadReportingServer,
Expand All @@ -290,7 +291,7 @@ class XdsClusterImplBalancer implements LoadBalancer {
);
}

this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), attributes);
this.childBalancer.updateAddressList(endpointList, lbConfig.getChildPolicy(), options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand Down
12 changes: 6 additions & 6 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class XdsClusterManager implements LoadBalancer {
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
this.updateState(connectivityState, picker);
},
}), parent.options);
}));

this.picker = new QueuePicker(this.childBalancer);
}
Expand All @@ -142,8 +142,8 @@ class XdsClusterManager implements LoadBalancer {
this.picker = picker;
this.parent.maybeUpdateState();
}
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
this.childBalancer.updateAddressList(endpointList, childConfig, attributes);
updateAddressList(endpointList: Endpoint[], childConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
this.childBalancer.updateAddressList(endpointList, childConfig, options);
}
exitIdle(): void {
this.childBalancer.exitIdle();
Expand All @@ -167,7 +167,7 @@ class XdsClusterManager implements LoadBalancer {
// Shutdown is a placeholder value that will never appear in normal operation.
private currentState: ConnectivityState = ConnectivityState.SHUTDOWN;
private updatesPaused = false;
constructor(private channelControlHelper: ChannelControlHelper, private options: ChannelOptions) {}
constructor(private channelControlHelper: ChannelControlHelper) {}

private maybeUpdateState() {
if (!this.updatesPaused) {
Expand Down Expand Up @@ -207,7 +207,7 @@ class XdsClusterManager implements LoadBalancer {
this.channelControlHelper.updateState(connectivityState, new XdsClusterManagerPicker(pickerMap));
}

updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, attributes: { [key: string]: unknown; }): void {
updateAddressList(endpointList: Endpoint[], lbConfig: TypedLoadBalancingConfig, options: ChannelOptions): void {
if (!(lbConfig instanceof XdsClusterManagerLoadBalancingConfig)) {
// Reject a config of the wrong type
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
Expand All @@ -234,7 +234,7 @@ class XdsClusterManager implements LoadBalancer {
child = new this.XdsClusterManagerChildImpl(this, name);
this.children.set(name, child);
}
child.updateAddressList(endpointList, childConfig, attributes);
child.updateAddressList(endpointList, childConfig, options);
}
this.updatesPaused = false;
this.updateState();
Expand Down
Loading
Loading