Skip to content

Commit 63a15e6

Browse files
committed
grpc-js: Add a single-subchannel channel implementation
1 parent d9c8ac3 commit 63a15e6

File tree

1 file changed

+177
-0
lines changed

1 file changed

+177
-0
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright 2025 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import { AuthContext } from "./auth-context";
19+
import { CallCredentials } from "./call-credentials";
20+
import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface";
21+
import { getNextCallNumber } from "./call-number";
22+
import { Channel } from "./channel";
23+
import { ChannelOptions } from "./channel-options";
24+
import { ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, unregisterChannelzRef } from "./channelz";
25+
import { ConnectivityState } from "./connectivity-state";
26+
import { Propagate, Status } from "./constants";
27+
import { Deadline, getRelativeTimeout } from "./deadline";
28+
import { Metadata } from "./metadata";
29+
import { getDefaultAuthority } from "./resolver";
30+
import { Subchannel } from "./subchannel";
31+
import { SubchannelCall } from "./subchannel-call";
32+
import { GrpcUri, uriToString } from "./uri-parser";
33+
34+
class SubchannelCallWrapper implements Call {
35+
private childCall: SubchannelCall | null = null;
36+
private pendingMessage: { context: MessageContext; message: Buffer } | null =
37+
null;
38+
private readPending = false;
39+
private halfClosePending = false;
40+
private pendingStatus: StatusObject | null = null;
41+
constructor(private subchannel: Subchannel, private method: string, private options: CallStreamOptions, private callNumber: number) {
42+
const timeout = getRelativeTimeout(options.deadline);
43+
if (timeout !== Infinity) {
44+
if (timeout <= 0) {
45+
this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
46+
} else {
47+
setTimeout(() => {
48+
this.cancelWithStatus(Status.DEADLINE_EXCEEDED, 'Deadline exceeded');
49+
}, timeout);
50+
}
51+
}
52+
}
53+
54+
cancelWithStatus(status: Status, details: string): void {
55+
if (this.childCall) {
56+
this.childCall.cancelWithStatus(status, details);
57+
} else {
58+
this.pendingStatus = {
59+
code: status,
60+
details: details,
61+
metadata: new Metadata()
62+
};
63+
}
64+
65+
}
66+
getPeer(): string {
67+
return this.childCall?.getPeer() ?? this.subchannel.getAddress();
68+
}
69+
start(metadata: Metadata, listener: InterceptingListener): void {
70+
if (this.pendingStatus) {
71+
listener.onReceiveStatus(this.pendingStatus);
72+
return;
73+
}
74+
if (this.subchannel.getConnectivityState() !== ConnectivityState.READY) {
75+
listener.onReceiveStatus({
76+
code: Status.UNAVAILABLE,
77+
details: 'Subchannel not ready',
78+
metadata: new Metadata()
79+
});
80+
return;
81+
}
82+
this.childCall = this.subchannel.createCall(metadata, this.options.host, this.method, listener);
83+
if (this.readPending) {
84+
this.childCall.startRead();
85+
}
86+
if (this.pendingMessage) {
87+
this.childCall.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message);
88+
}
89+
if (this.halfClosePending) {
90+
this.childCall.halfClose();
91+
}
92+
}
93+
sendMessageWithContext(context: MessageContext, message: Buffer): void {
94+
if (this.childCall) {
95+
this.childCall.sendMessageWithContext(context, message);
96+
} else {
97+
this.pendingMessage = { context, message };
98+
}
99+
}
100+
startRead(): void {
101+
if (this.childCall) {
102+
this.childCall.startRead();
103+
} else {
104+
this.readPending = true;
105+
}
106+
}
107+
halfClose(): void {
108+
if (this.childCall) {
109+
this.childCall.halfClose();
110+
} else {
111+
this.halfClosePending = true;
112+
}
113+
}
114+
getCallNumber(): number {
115+
return this.callNumber;
116+
}
117+
setCredentials(credentials: CallCredentials): void {
118+
throw new Error("Method not implemented.");
119+
}
120+
getAuthContext(): AuthContext | null {
121+
if (this.childCall) {
122+
return this.childCall.getAuthContext();
123+
} else {
124+
return null;
125+
}
126+
}
127+
}
128+
129+
export class SingleSubchannelChannel implements Channel {
130+
private channelzRef: ChannelRef;
131+
private channelzEnabled = false;
132+
private channelzTrace = new ChannelzTrace();
133+
private callTracker = new ChannelzCallTracker();
134+
private childrenTracker = new ChannelzChildrenTracker();
135+
constructor(private subchannel: Subchannel, private target: GrpcUri, options: ChannelOptions) {
136+
this.channelzEnabled = options['grpc.enable_channelz'] !== 0;
137+
this.channelzRef = registerChannelzChannel(uriToString(target), () => ({
138+
target: `${uriToString(target)} (${subchannel.getAddress()})`,
139+
state: this.subchannel.getConnectivityState(),
140+
trace: this.channelzTrace,
141+
callTracker: this.callTracker,
142+
children: this.childrenTracker.getChildLists()
143+
}), this.channelzEnabled);
144+
if (this.channelzEnabled) {
145+
this.childrenTracker.refChild(subchannel.getChannelzRef());
146+
}
147+
}
148+
149+
close(): void {
150+
if (this.channelzEnabled) {
151+
this.childrenTracker.unrefChild(this.subchannel.getChannelzRef());
152+
}
153+
unregisterChannelzRef(this.channelzRef);
154+
}
155+
156+
getTarget(): string {
157+
return uriToString(this.target);
158+
}
159+
getConnectivityState(tryToConnect: boolean): ConnectivityState {
160+
throw new Error("Method not implemented.");
161+
}
162+
watchConnectivityState(currentState: ConnectivityState, deadline: Date | number, callback: (error?: Error) => void): void {
163+
throw new Error("Method not implemented.");
164+
}
165+
getChannelzRef(): ChannelRef {
166+
return this.channelzRef;
167+
}
168+
createCall(method: string, deadline: Deadline): Call {
169+
const callOptions: CallStreamOptions = {
170+
deadline: deadline,
171+
host: getDefaultAuthority(this.target),
172+
flags: Propagate.DEFAULTS,
173+
parentCall: null
174+
};
175+
return new SubchannelCallWrapper(this.subchannel, method, callOptions, getNextCallNumber());
176+
}
177+
}

0 commit comments

Comments
 (0)