Skip to content

Commit e2b9a42

Browse files
Googlercopybara-github
Googler
authored andcommitted
Remote: gRPC load balancing. (Part 2)
Add a TokenBucket which is used later for rate limiting in connection pool. PiperOrigin-RevId: 357946500
1 parent af3a556 commit e2b9a42

File tree

6 files changed

+358
-0
lines changed

6 files changed

+358
-0
lines changed

src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ java_library(
1616
name = "grpc",
1717
srcs = glob(["*.java"]),
1818
deps = [
19+
"//src/main/java/com/google/devtools/build/lib/concurrent",
20+
"//third_party:guava",
1921
"//third_party:rxjava3",
2022
"//third_party/grpc:grpc-jar",
2123
],

src/main/java/com/google/devtools/build/lib/remote/grpc/ConnectionFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.grpc;
1515

16+
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
1617
import io.reactivex.rxjava3.core.Single;
1718

1819
/**
@@ -25,7 +26,10 @@
2526
*
2627
* <p>Connection creation must be cancellable. Canceling connection creation must release (“close”)
2728
* the connection and all associated resources.
29+
*
30+
* <p>Implementations must be thread-safe.
2831
*/
32+
@ThreadSafe
2933
public interface ConnectionFactory {
3034
/** Creates a new {@link Connection}. */
3135
Single<? extends Connection> create();
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2021 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.grpc;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
18+
import io.reactivex.rxjava3.annotations.NonNull;
19+
import io.reactivex.rxjava3.core.Observer;
20+
import io.reactivex.rxjava3.core.Single;
21+
import io.reactivex.rxjava3.disposables.Disposable;
22+
import io.reactivex.rxjava3.subjects.BehaviorSubject;
23+
import java.io.Closeable;
24+
import java.io.IOException;
25+
import java.util.Collection;
26+
import java.util.concurrent.ConcurrentLinkedDeque;
27+
28+
/** A container for tokens which is used for rate limiting. */
29+
@ThreadSafe
30+
public class TokenBucket<T> implements Closeable {
31+
private final ConcurrentLinkedDeque<T> tokens;
32+
private final BehaviorSubject<T> tokenBehaviorSubject;
33+
34+
public TokenBucket() {
35+
this(ImmutableList.of());
36+
}
37+
38+
public TokenBucket(Collection<T> initialTokens) {
39+
tokens = new ConcurrentLinkedDeque<>(initialTokens);
40+
tokenBehaviorSubject = BehaviorSubject.create();
41+
42+
if (!tokens.isEmpty()) {
43+
tokenBehaviorSubject.onNext(tokens.getFirst());
44+
}
45+
}
46+
47+
/** Add a token to the bucket. */
48+
public void addToken(T token) {
49+
tokens.addLast(token);
50+
tokenBehaviorSubject.onNext(token);
51+
}
52+
53+
/** Returns current number of tokens in the bucket. */
54+
public int size() {
55+
return tokens.size();
56+
}
57+
58+
/**
59+
* Returns a cold {@link Single} which will start the token acquisition process upon subscription.
60+
*/
61+
public Single<T> acquireToken() {
62+
return Single.create(
63+
downstream ->
64+
tokenBehaviorSubject.subscribe(
65+
new Observer<T>() {
66+
Disposable upstream;
67+
68+
@Override
69+
public void onSubscribe(@NonNull Disposable d) {
70+
upstream = d;
71+
downstream.setDisposable(d);
72+
}
73+
74+
@Override
75+
public void onNext(@NonNull T ignored) {
76+
if (!downstream.isDisposed()) {
77+
T token = tokens.pollFirst();
78+
if (token != null) {
79+
downstream.onSuccess(token);
80+
}
81+
}
82+
}
83+
84+
@Override
85+
public void onError(@NonNull Throwable e) {
86+
downstream.onError(new IllegalStateException(e));
87+
}
88+
89+
@Override
90+
public void onComplete() {
91+
if (!downstream.isDisposed()) {
92+
downstream.onError(new IllegalStateException("closed"));
93+
}
94+
}
95+
}));
96+
}
97+
98+
/**
99+
* Closes the bucket and release all the tokens.
100+
*
101+
* <p>Subscriptions after closed to the Single returned by {@link TokenBucket#acquireToken()} will
102+
* emit error.
103+
*/
104+
@Override
105+
public void close() throws IOException {
106+
tokens.clear();
107+
tokenBehaviorSubject.onComplete();
108+
}
109+
}

src/test/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ filegroup(
1111
srcs = glob(["**"]) + [
1212
"//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs",
1313
"//src/test/java/com/google/devtools/build/lib/remote/http:srcs",
14+
"//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs",
1415
"//src/test/java/com/google/devtools/build/lib/remote/logging:srcs",
1516
"//src/test/java/com/google/devtools/build/lib/remote/merkletree:srcs",
1617
"//src/test/java/com/google/devtools/build/lib/remote/options:srcs",
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@rules_java//java:defs.bzl", "java_test")
2+
3+
package(
4+
default_testonly = 1,
5+
default_visibility = ["//src:__subpackages__"],
6+
)
7+
8+
filegroup(
9+
name = "srcs",
10+
testonly = 0,
11+
srcs = glob(["**"]),
12+
visibility = ["//src/test/java/com/google/devtools/build/lib/remote:__pkg__"],
13+
)
14+
15+
java_test(
16+
name = "grpc",
17+
srcs = glob(["*.java"]),
18+
tags = [
19+
"requires-network",
20+
],
21+
test_class = "com.google.devtools.build.lib.AllTests",
22+
deps = [
23+
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
24+
"//src/test/java/com/google/devtools/build/lib:test_runner",
25+
"//third_party:guava",
26+
"//third_party:junit4",
27+
"//third_party:mockito",
28+
"//third_party:rxjava3",
29+
"//third_party:truth",
30+
],
31+
)
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Copyright 2021 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.grpc;
15+
16+
import static com.google.common.truth.Truth.assertThat;
17+
18+
import com.google.common.collect.ImmutableList;
19+
import io.reactivex.rxjava3.core.Single;
20+
import io.reactivex.rxjava3.observers.TestObserver;
21+
import java.io.IOException;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.junit.runners.JUnit4;
25+
26+
/** Tests for {@link TokenBucket} */
27+
@RunWith(JUnit4.class)
28+
public class TokenBucketTest {
29+
30+
@Test
31+
public void acquireToken_smoke() {
32+
TokenBucket<Integer> bucket = new TokenBucket<>();
33+
assertThat(bucket.size()).isEqualTo(0);
34+
bucket.addToken(0);
35+
assertThat(bucket.size()).isEqualTo(1);
36+
37+
TestObserver<Integer> observer = bucket.acquireToken().test();
38+
39+
observer.assertValue(0).assertComplete();
40+
assertThat(bucket.size()).isEqualTo(0);
41+
}
42+
43+
@Test
44+
public void acquireToken_releaseInitialTokens() {
45+
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0));
46+
assertThat(bucket.size()).isEqualTo(1);
47+
48+
TestObserver<Integer> observer = bucket.acquireToken().test();
49+
50+
observer.assertValue(0).assertComplete();
51+
assertThat(bucket.size()).isEqualTo(0);
52+
}
53+
54+
@Test
55+
public void acquireToken_multipleInitialTokens_releaseFirstToken() {
56+
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0, 1));
57+
assertThat(bucket.size()).isEqualTo(2);
58+
59+
TestObserver<Integer> observer = bucket.acquireToken().test();
60+
61+
observer.assertValue(0).assertComplete();
62+
assertThat(bucket.size()).isEqualTo(1);
63+
}
64+
65+
@Test
66+
public void acquireToken_multipleInitialTokens_releaseSecondToken() {
67+
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0, 1));
68+
assertThat(bucket.size()).isEqualTo(2);
69+
bucket.acquireToken().test().assertValue(0).assertComplete();
70+
71+
TestObserver<Integer> observer = bucket.acquireToken().test();
72+
73+
observer.assertValue(1).assertComplete();
74+
assertThat(bucket.size()).isEqualTo(0);
75+
}
76+
77+
@Test
78+
public void acquireToken_releaseTokenToPreviousObserver() {
79+
TokenBucket<Integer> bucket = new TokenBucket<>();
80+
TestObserver<Integer> observer = bucket.acquireToken().test();
81+
observer.assertEmpty();
82+
83+
bucket.addToken(0);
84+
85+
observer.assertValue(0).assertComplete();
86+
assertThat(bucket.size()).isEqualTo(0);
87+
}
88+
89+
@Test
90+
public void acquireToken_notReleaseTokenToDisposedObserver() {
91+
TokenBucket<Integer> bucket = new TokenBucket<>();
92+
TestObserver<Integer> observer = bucket.acquireToken().test();
93+
94+
observer.dispose();
95+
bucket.addToken(0);
96+
97+
observer.assertEmpty();
98+
assertThat(bucket.size()).isEqualTo(1);
99+
}
100+
101+
@Test
102+
public void acquireToken_disposeAfterTokenAcquired() {
103+
TokenBucket<Integer> bucket = new TokenBucket<>();
104+
TestObserver<Integer> observer = bucket.acquireToken().test();
105+
106+
bucket.addToken(0);
107+
bucket.addToken(1);
108+
109+
observer.assertValue(0).assertComplete();
110+
assertThat(bucket.size()).isEqualTo(1);
111+
}
112+
113+
@Test
114+
public void acquireToken_multipleObservers_onlyOneCanAcquire() {
115+
TokenBucket<Integer> bucket = new TokenBucket<>();
116+
TestObserver<Integer> observer1 = bucket.acquireToken().test();
117+
TestObserver<Integer> observer2 = bucket.acquireToken().test();
118+
119+
bucket.addToken(0);
120+
121+
if (!observer1.values().isEmpty()) {
122+
observer1.assertValue(0).assertComplete();
123+
observer2.assertEmpty();
124+
125+
bucket.addToken(1);
126+
observer2.assertValue(1).assertComplete();
127+
} else {
128+
observer1.assertEmpty();
129+
observer2.assertValue(0).assertComplete();
130+
131+
bucket.addToken(1);
132+
observer1.assertValue(1).assertComplete();
133+
}
134+
}
135+
136+
@Test
137+
public void acquireToken_reSubscription_waitAvailableToken() {
138+
TokenBucket<Integer> bucket = new TokenBucket<>();
139+
bucket.addToken(0);
140+
Single<Integer> tokenSingle = bucket.acquireToken();
141+
142+
TestObserver<Integer> observer1 = tokenSingle.test();
143+
TestObserver<Integer> observer2 = tokenSingle.test();
144+
145+
observer1.assertValue(0).assertComplete();
146+
observer2.assertEmpty();
147+
}
148+
149+
@Test
150+
public void acquireToken_reSubscription_acquireNewToken() {
151+
TokenBucket<Integer> bucket = new TokenBucket<>();
152+
bucket.addToken(0);
153+
Single<Integer> tokenSingle = bucket.acquireToken();
154+
TestObserver<Integer> observer1 = tokenSingle.test();
155+
TestObserver<Integer> observer2 = tokenSingle.test();
156+
157+
bucket.addToken(1);
158+
159+
observer1.assertValue(0).assertComplete();
160+
observer2.assertValue(1).assertComplete();
161+
}
162+
163+
@Test
164+
public void acquireToken_reSubscription_acquireNextToken() {
165+
TokenBucket<Integer> bucket = new TokenBucket<>();
166+
bucket.addToken(0);
167+
bucket.addToken(1);
168+
Single<Integer> tokenSingle = bucket.acquireToken();
169+
170+
TestObserver<Integer> observer1 = tokenSingle.test();
171+
TestObserver<Integer> observer2 = tokenSingle.test();
172+
173+
observer1.assertValue(0).assertComplete();
174+
observer2.assertValue(1).assertComplete();
175+
}
176+
177+
@Test
178+
public void acquireToken_disposed_tokenRemains() {
179+
TokenBucket<Integer> bucket = new TokenBucket<>();
180+
TestObserver<Integer> observer = bucket.acquireToken().test();
181+
observer.assertEmpty();
182+
183+
observer.dispose();
184+
bucket.addToken(0);
185+
186+
assertThat(bucket.size()).isEqualTo(1);
187+
}
188+
189+
@Test
190+
public void close_errorAfterClose() throws IOException {
191+
TokenBucket<Integer> bucket = new TokenBucket<>();
192+
bucket.addToken(0);
193+
bucket.close();
194+
195+
TestObserver<Integer> observer = bucket.acquireToken().test();
196+
197+
observer.assertError(
198+
e -> e instanceof IllegalStateException && e.getMessage().contains("closed"));
199+
}
200+
201+
@Test
202+
public void close_errorPreviousObservers() throws IOException {
203+
TokenBucket<Integer> bucket = new TokenBucket<>();
204+
TestObserver<Integer> observer = bucket.acquireToken().test();
205+
206+
bucket.close();
207+
208+
observer.assertError(
209+
e -> e instanceof IllegalStateException && e.getMessage().contains("closed"));
210+
}
211+
}

0 commit comments

Comments
 (0)