Skip to content

Commit 6667ad7

Browse files
Googlercopybara-github
Googler
authored andcommitted
Remote: gRPC load balancing. (Part 3)
Implement SharedConnectionFactory which applys rate limiting on top of one connection. PiperOrigin-RevId: 358084865
1 parent f3c2ee2 commit 6667ad7

File tree

4 files changed

+533
-0
lines changed

4 files changed

+533
-0
lines changed

src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ java_library(
1818
deps = [
1919
"//src/main/java/com/google/devtools/build/lib/concurrent",
2020
"//third_party:guava",
21+
"//third_party:jsr305",
2122
"//third_party:rxjava3",
2223
"//third_party/grpc:grpc-jar",
2324
],

src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionPool.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.grpc;
1515

16+
import io.reactivex.rxjava3.core.Single;
1617
import java.io.Closeable;
1718
import java.io.IOException;
1819

@@ -24,6 +25,13 @@
2425
* <p>Connections must be closed with {@link Connection#close()} in order to be reused later.
2526
*/
2627
public interface ConnectionPool extends ConnectionFactory, Closeable {
28+
/**
29+
* Reuses a {@link Connection} in the pool and will potentially create a new connection depends on
30+
* implementation.
31+
*/
32+
@Override
33+
Single<? extends Connection> create();
34+
2735
/** Closes the connection pool and closes all the underlying connections */
2836
@Override
2937
void close() throws IOException;
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2021 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.grpc;
15+
16+
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
17+
import io.grpc.CallOptions;
18+
import io.grpc.ClientCall;
19+
import io.grpc.MethodDescriptor;
20+
import io.reactivex.rxjava3.core.Single;
21+
import io.reactivex.rxjava3.disposables.Disposable;
22+
import io.reactivex.rxjava3.functions.Action;
23+
import io.reactivex.rxjava3.subjects.AsyncSubject;
24+
import java.io.IOException;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.concurrent.locks.ReentrantLock;
29+
import javax.annotation.Nullable;
30+
import javax.annotation.concurrent.GuardedBy;
31+
32+
/**
33+
* A {@link ConnectionPool} that creates one connection using provided {@link ConnectionFactory} and
34+
* shares the connection upto {@code maxConcurrency}.
35+
*
36+
* <p>This is useful if underlying connection maintains a connection pool internally. (such as
37+
* {@code Channel} in gRPC)
38+
*
39+
* <p>Connections must be closed with {@link Connection#close()} in order to be reused later.
40+
*/
41+
@ThreadSafe
42+
public class SharedConnectionFactory implements ConnectionPool {
43+
private final TokenBucket<Integer> tokenBucket;
44+
private final ConnectionFactory factory;
45+
46+
@Nullable
47+
@GuardedBy("connectionLock")
48+
private AsyncSubject<Connection> connectionAsyncSubject = null;
49+
50+
private final ReentrantLock connectionLock = new ReentrantLock();
51+
private final AtomicReference<Disposable> connectionCreationDisposable =
52+
new AtomicReference<>(null);
53+
54+
public SharedConnectionFactory(ConnectionFactory factory, int maxConcurrency) {
55+
this.factory = factory;
56+
57+
List<Integer> initialTokens = new ArrayList<>(maxConcurrency);
58+
for (int i = 0; i < maxConcurrency; ++i) {
59+
initialTokens.add(i);
60+
}
61+
this.tokenBucket = new TokenBucket<>(initialTokens);
62+
}
63+
64+
@Override
65+
public void close() throws IOException {
66+
tokenBucket.close();
67+
68+
Disposable d = connectionCreationDisposable.getAndSet(null);
69+
if (d != null && !d.isDisposed()) {
70+
d.dispose();
71+
}
72+
73+
try {
74+
connectionLock.lockInterruptibly();
75+
76+
if (connectionAsyncSubject != null) {
77+
Connection connection = connectionAsyncSubject.getValue();
78+
if (connection != null) {
79+
connection.close();
80+
}
81+
82+
if (!connectionAsyncSubject.hasComplete()) {
83+
connectionAsyncSubject.onError(new IllegalStateException("closed"));
84+
}
85+
}
86+
} catch (InterruptedException e) {
87+
throw new IOException(e);
88+
} finally {
89+
connectionLock.unlock();
90+
}
91+
}
92+
93+
private AsyncSubject<Connection> createUnderlyingConnectionIfNot() throws InterruptedException {
94+
connectionLock.lockInterruptibly();
95+
try {
96+
if (connectionAsyncSubject == null || connectionAsyncSubject.hasThrowable()) {
97+
connectionAsyncSubject =
98+
factory
99+
.create()
100+
.doOnSubscribe(connectionCreationDisposable::set)
101+
.toObservable()
102+
.subscribeWith(AsyncSubject.create());
103+
}
104+
105+
return connectionAsyncSubject;
106+
} finally {
107+
connectionLock.unlock();
108+
}
109+
}
110+
111+
private Single<? extends Connection> acquireConnection() {
112+
return Single.fromCallable(this::createUnderlyingConnectionIfNot)
113+
.flatMap(Single::fromObservable);
114+
}
115+
116+
/**
117+
* Reuses the underlying {@link Connection} and wait for it to be released if is exceeding {@code
118+
* maxConcurrency}.
119+
*/
120+
@Override
121+
public Single<SharedConnection> create() {
122+
return tokenBucket
123+
.acquireToken()
124+
.flatMap(
125+
token ->
126+
acquireConnection()
127+
.doOnError(ignored -> tokenBucket.addToken(token))
128+
.doOnDispose(() -> tokenBucket.addToken(token))
129+
.map(
130+
conn ->
131+
new SharedConnection(
132+
conn, /* onClose= */ () -> tokenBucket.addToken(token))));
133+
}
134+
135+
/** Returns current number of available connections. */
136+
public int numAvailableConnections() {
137+
return tokenBucket.size();
138+
}
139+
140+
/** A {@link Connection} which wraps an underlying connection and is shared between consumers. */
141+
public static class SharedConnection implements Connection {
142+
private final Connection connection;
143+
private final Action onClose;
144+
145+
public SharedConnection(Connection connection, Action onClose) {
146+
this.connection = connection;
147+
this.onClose = onClose;
148+
}
149+
150+
@Override
151+
public <ReqT, RespT> ClientCall<ReqT, RespT> call(
152+
MethodDescriptor<ReqT, RespT> method, CallOptions options) {
153+
return connection.call(method, options);
154+
}
155+
156+
@Override
157+
public void close() throws IOException {
158+
try {
159+
onClose.run();
160+
} catch (Throwable t) {
161+
throw new IOException(t);
162+
}
163+
}
164+
165+
/** Returns the underlying connection this shared connection built on */
166+
public Connection getUnderlyingConnection() {
167+
return connection;
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)