Skip to content

Commit 5575ff2

Browse files
amishra-ucopybara-github
authored andcommitted
feat: Implement failure circuit breaker
Copy of #18120: I accidentally closed #18120 during rebase and doesn't have permission to reopen. ### Issue We have noticed that any problems with the remote cache have a detrimental effect on build times. On investigation we found that the interface for the circuit breaker was left unimplemented. ### Solution To address this issue, implemented a failure circuit breaker, which includes three new Bazel flags: 1) experimental_circuitbreaker_strategy, 2) experimental_remote_failure_threshold, and 3) experimental_emote_failure_window. In this implementation, I have implemented failure strategy for circuit breaker and used failure count to trip the circuit. Reasoning behind using failure count instead of failure rate : To measure failure rate I also need the success count. While both the failure and success count need to be an AtomicInteger as both will be modified concurrently by multiple threads. Even though getAndIncrement is very light weight operation, at very high request it might contribute to latency. Reasoning behind using failure circuit breaker : A new instance of Retrier.CircuitBreaker is created for each build. Therefore, if the circuit breaker trips during a build, the remote cache will be disabled for that build. However, it will be enabled again for the next build as a new instance of Retrier.CircuitBreaker will be created. If needed in the future we may add cool down strategy also. e.g. failure_and_cool_down_startegy. closes #18136 Closes #18359. PiperOrigin-RevId: 536349954 Change-Id: I5e1c57d4ad0ce07ddc4808bf1f327bc5df6ce704
1 parent 468c056 commit 5575ff2

File tree

16 files changed

+554
-176
lines changed

16 files changed

+554
-176
lines changed

src/main/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package(
88
filegroup(
99
name = "srcs",
1010
srcs = glob(["*"]) + [
11+
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
1112
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
1213
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
1314
"//src/main/java/com/google/devtools/build/lib/remote/downloader:srcs",
@@ -85,6 +86,7 @@ java_library(
8586
"//src/main/java/com/google/devtools/build/lib/exec/local",
8687
"//src/main/java/com/google/devtools/build/lib/packages/semantics",
8788
"//src/main/java/com/google/devtools/build/lib/profiler",
89+
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
8890
"//src/main/java/com/google/devtools/build/lib/remote/common",
8991
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
9092
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,4 +500,8 @@ ListenableFuture<Void> uploadChunker(
500500
MoreExecutors.directExecutor());
501501
return f;
502502
}
503+
504+
Retrier getRetrier() {
505+
return this.retrier;
506+
}
503507
}

src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,8 @@ public void close() {
252252
}
253253
channel.release();
254254
}
255+
256+
RemoteRetrier getRetrier() {
257+
return this.retrier;
258+
}
255259
}

src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.google.devtools.build.lib.exec.ModuleActionContextRegistry;
6161
import com.google.devtools.build.lib.exec.SpawnStrategyRegistry;
6262
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
63+
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
6364
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
6465
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
6566
import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader;
@@ -510,12 +511,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
510511
GoogleAuthUtils.newCallCredentialsProvider(credentials);
511512
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
512513

514+
Retrier.CircuitBreaker circuitBreaker =
515+
CircuitBreakerFactory.createCircuitBreaker(remoteOptions);
513516
RemoteRetrier retrier =
514517
new RemoteRetrier(
515-
remoteOptions,
516-
RemoteRetrier.RETRIABLE_GRPC_ERRORS,
517-
retryScheduler,
518-
Retrier.ALLOW_ALL_CALLS);
518+
remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryScheduler, circuitBreaker);
519519

520520
// We only check required capabilities for a given endpoint.
521521
//
@@ -636,7 +636,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
636636
remoteOptions,
637637
RemoteRetrier.RETRIABLE_GRPC_ERRORS, // Handle NOT_FOUND internally
638638
retryScheduler,
639-
Retrier.ALLOW_ALL_CALLS);
639+
circuitBreaker);
640640
remoteExecutor =
641641
new ExperimentalGrpcRemoteExecutor(
642642
executionCapabilities,
@@ -650,7 +650,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
650650
remoteOptions,
651651
RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS,
652652
retryScheduler,
653-
Retrier.ALLOW_ALL_CALLS);
653+
circuitBreaker);
654654
remoteExecutor =
655655
new GrpcRemoteExecutor(
656656
executionCapabilities, execChannel.retain(), callCredentialsProvider, execRetrier);

src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.google.devtools.build.lib.profiler.SilentCloseable;
5858
import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult;
5959
import com.google.devtools.build.lib.remote.RemoteExecutionService.ServerLogs;
60+
import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory;
6061
import com.google.devtools.build.lib.remote.common.BulkTransferException;
6162
import com.google.devtools.build.lib.remote.common.OperationObserver;
6263
import com.google.devtools.build.lib.remote.options.RemoteOptions;
@@ -655,6 +656,8 @@ private void report(Event evt) {
655656
private static RemoteRetrier createExecuteRetrier(
656657
RemoteOptions options, ListeningScheduledExecutorService retryService) {
657658
return new ExecuteRetrier(
658-
options.remoteMaxRetryAttempts, retryService, Retrier.ALLOW_ALL_CALLS);
659+
options.remoteMaxRetryAttempts,
660+
retryService,
661+
CircuitBreakerFactory.createCircuitBreaker(options));
659662
}
660663
}

src/main/java/com/google/devtools/build/lib/remote/Retrier.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ enum State {
100100
State state();
101101

102102
/** Called after an execution failed. */
103-
void recordFailure();
103+
void recordFailure(Exception e);
104104

105105
/** Called after an execution succeeded. */
106106
void recordSuccess();
@@ -130,7 +130,7 @@ public State state() {
130130
}
131131

132132
@Override
133-
public void recordFailure() {}
133+
public void recordFailure(Exception e) {}
134134

135135
@Override
136136
public void recordSuccess() {}
@@ -245,7 +245,7 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
245245
circuitBreaker.recordSuccess();
246246
return r;
247247
} catch (Exception e) {
248-
circuitBreaker.recordFailure();
248+
circuitBreaker.recordFailure(e);
249249
Throwables.throwIfInstanceOf(e, InterruptedException.class);
250250
if (State.TRIAL_CALL.equals(circuitState)) {
251251
throw e;
@@ -272,19 +272,35 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
272272
* backoff.
273273
*/
274274
public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backoff) {
275+
final State circuitState = circuitBreaker.state();
276+
if (State.REJECT_CALLS.equals(circuitState)) {
277+
return Futures.immediateFailedFuture(new CircuitBreakerException());
278+
}
275279
try {
280+
ListenableFuture<T> future =
281+
Futures.transformAsync(
282+
call.call(),
283+
(f) -> {
284+
circuitBreaker.recordSuccess();
285+
return Futures.immediateFuture(f);
286+
},
287+
MoreExecutors.directExecutor());
276288
return Futures.catchingAsync(
277-
call.call(),
289+
future,
278290
Exception.class,
279-
t -> onExecuteAsyncFailure(t, call, backoff),
291+
t -> onExecuteAsyncFailure(t, call, backoff, circuitState),
280292
MoreExecutors.directExecutor());
281293
} catch (Exception e) {
282-
return onExecuteAsyncFailure(e, call, backoff);
294+
return onExecuteAsyncFailure(e, call, backoff, circuitState);
283295
}
284296
}
285297

286298
private <T> ListenableFuture<T> onExecuteAsyncFailure(
287-
Exception t, AsyncCallable<T> call, Backoff backoff) {
299+
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
300+
circuitBreaker.recordFailure(t);
301+
if (circuitState.equals(State.TRIAL_CALL)) {
302+
return Futures.immediateFailedFuture(t);
303+
}
288304
if (isRetriable(t)) {
289305
long waitMillis = backoff.nextDelayMillis(t);
290306
if (waitMillis >= 0) {
@@ -310,4 +326,8 @@ public Backoff newBackoff() {
310326
public boolean isRetriable(Exception e) {
311327
return shouldRetry.test(e);
312328
}
329+
330+
CircuitBreaker getCircuitBreaker() {
331+
return this.circuitBreaker;
332+
}
313333
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
load("@rules_java//java:defs.bzl", "java_library")
2+
3+
package(
4+
default_applicable_licenses = ["//:license"],
5+
default_visibility = ["//src:__subpackages__"],
6+
)
7+
8+
filegroup(
9+
name = "srcs",
10+
srcs = glob(["*"]),
11+
visibility = ["//src:__subpackages__"],
12+
)
13+
14+
java_library(
15+
name = "circuitbreaker",
16+
srcs = glob(["*.java"]),
17+
deps = [
18+
"//src/main/java/com/google/devtools/build/lib/remote:Retrier",
19+
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
20+
"//src/main/java/com/google/devtools/build/lib/remote/options",
21+
"//third_party:guava",
22+
],
23+
)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2023 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.circuitbreaker;
15+
16+
import com.google.common.collect.ImmutableSet;
17+
import com.google.devtools.build.lib.remote.Retrier;
18+
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
19+
import com.google.devtools.build.lib.remote.options.RemoteOptions;
20+
21+
/** Factory for {@link Retrier.CircuitBreaker} */
22+
public class CircuitBreakerFactory {
23+
24+
public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
25+
ImmutableSet.of(CacheNotFoundException.class);
26+
27+
private CircuitBreakerFactory() {}
28+
29+
/**
30+
* Creates the instance of the {@link Retrier.CircuitBreaker} as per the strategy defined in
31+
* {@link RemoteOptions}. In case of undefined strategy defaults to {@link
32+
* Retrier.ALLOW_ALL_CALLS} implementation.
33+
*
34+
* @param remoteOptions The configuration for the CircuitBreaker implementation.
35+
* @return an instance of CircuitBreaker.
36+
*/
37+
public static Retrier.CircuitBreaker createCircuitBreaker(final RemoteOptions remoteOptions) {
38+
if (remoteOptions.circuitBreakerStrategy == RemoteOptions.CircuitBreakerStrategy.FAILURE) {
39+
return new FailureCircuitBreaker(
40+
remoteOptions.remoteFailureThreshold,
41+
(int) remoteOptions.remoteFailureWindowInterval.toMillis());
42+
}
43+
return Retrier.ALLOW_ALL_CALLS;
44+
}
45+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2023 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.circuitbreaker;
15+
16+
import com.google.common.collect.ImmutableSet;
17+
import com.google.devtools.build.lib.remote.Retrier;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
/**
24+
* The {@link FailureCircuitBreaker} implementation of the {@link Retrier.CircuitBreaker} prevents
25+
* further calls to a remote cache once the number of failures within a given window exceeds a
26+
* specified threshold for a build. In the context of Bazel, a new instance of {@link
27+
* Retrier.CircuitBreaker} is created for each build. Therefore, if the circuit breaker trips during
28+
* a build, the remote cache will be disabled for that build. However, it will be enabled again for
29+
* the next build as a new instance of {@link Retrier.CircuitBreaker} will be created.
30+
*/
31+
public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
32+
33+
private State state;
34+
private final AtomicInteger failures;
35+
private final int failureThreshold;
36+
private final int slidingWindowSize;
37+
private final ScheduledExecutorService scheduledExecutor;
38+
private final ImmutableSet<Class<? extends Exception>> ignoredErrors;
39+
40+
/**
41+
* Creates a {@link FailureCircuitBreaker}.
42+
*
43+
* @param failureThreshold is used to set the number of failures required to trip the circuit
44+
* breaker in given time window.
45+
* @param slidingWindowSize the size of the sliding window in milliseconds to calculate the number
46+
* of failures.
47+
*/
48+
public FailureCircuitBreaker(int failureThreshold, int slidingWindowSize) {
49+
this.failureThreshold = failureThreshold;
50+
this.failures = new AtomicInteger(0);
51+
this.slidingWindowSize = slidingWindowSize;
52+
this.state = State.ACCEPT_CALLS;
53+
this.scheduledExecutor =
54+
slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
55+
this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
56+
}
57+
58+
@Override
59+
public State state() {
60+
return this.state;
61+
}
62+
63+
@Override
64+
public void recordFailure(Exception e) {
65+
if (!ignoredErrors.contains(e.getClass())) {
66+
int failureCount = failures.incrementAndGet();
67+
if (slidingWindowSize > 0) {
68+
var unused =
69+
scheduledExecutor.schedule(
70+
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
71+
}
72+
// Since the state can only be changed to the open state, synchronization is not required.
73+
if (failureCount > this.failureThreshold) {
74+
this.state = State.REJECT_CALLS;
75+
}
76+
}
77+
}
78+
79+
@Override
80+
public void recordSuccess() {
81+
// do nothing, implement if we need to set threshold on failure rate instead of count.
82+
}
83+
}

src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,43 @@ public RemoteOutputsStrategyConverter() {
660660
+ "cache misses and retries.")
661661
public boolean remoteDiscardMerkleTrees;
662662

663+
@Option(
664+
name = "experimental_circuit_breaker_strategy",
665+
documentationCategory = OptionDocumentationCategory.REMOTE,
666+
defaultValue = "null",
667+
effectTags = {OptionEffectTag.EXECUTION},
668+
converter = CircuitBreakerStrategy.Converter.class,
669+
help =
670+
"Specifies the strategy for the circuit breaker to use. Available strategies are"
671+
+ " \"failure\". On invalid value for the option the behavior same as the option is"
672+
+ " not set.")
673+
public CircuitBreakerStrategy circuitBreakerStrategy;
674+
675+
@Option(
676+
name = "experimental_remote_failure_threshold",
677+
defaultValue = "100",
678+
documentationCategory = OptionDocumentationCategory.REMOTE,
679+
effectTags = {OptionEffectTag.EXECUTION},
680+
help =
681+
"Sets the allowed number of failures in a specific time window after which it stops"
682+
+ " calling to the remote cache/executor. By default the value is 100. Setting this"
683+
+ " to 0 or negative means no limitation.")
684+
public int remoteFailureThreshold;
685+
686+
@Option(
687+
name = "experimental_remote_failure_window_interval",
688+
defaultValue = "60s",
689+
documentationCategory = OptionDocumentationCategory.REMOTE,
690+
effectTags = {OptionEffectTag.EXECUTION},
691+
converter = RemoteDurationConverter.class,
692+
help =
693+
"The interval in which the failure count of the remote requests are computed. On zero or"
694+
+ " negative value the failure duration is computed the whole duration of the"
695+
+ " execution.Following units can be used: Days (d), hours (h), minutes (m), seconds"
696+
+ " (s), and milliseconds (ms). If the unit is omitted, the value is interpreted as"
697+
+ " seconds.")
698+
public Duration remoteFailureWindowInterval;
699+
663700
// The below options are not configurable by users, only tests.
664701
// This is part of the effort to reduce the overall number of flags.
665702

@@ -749,4 +786,16 @@ public boolean shouldPrintMessages(boolean success) {
749786
|| this == ExecutionMessagePrintMode.ALL);
750787
}
751788
}
789+
790+
/** An enum for specifying different strategy for circuit breaker. */
791+
public enum CircuitBreakerStrategy {
792+
FAILURE;
793+
794+
/** Converts to {@link CircuitBreakerStrategy}. */
795+
public static class Converter extends EnumConverter<CircuitBreakerStrategy> {
796+
public Converter() {
797+
super(CircuitBreakerStrategy.class, "CircuitBreaker strategy");
798+
}
799+
}
800+
}
752801
}

0 commit comments

Comments
 (0)