Skip to content

Commit 6c2ac00

Browse files
amishra-utraversaro
authored and committed
Update ignored_error logic for circuit_breaker
When the digest size exceeds the max digest size configured by the remote cache, an "out_of_range" error is returned. These errors should not be considered API failures for the circuit breaker logic, as they do not indicate any issue with the remote-cache service. Similarly, there are other non-retriable errors, such as ALREADY_EXISTS, that should not be treated as server failures. This change treats non-retriable errors as user/client errors and records them as successes, while retriable errors such as `DEADLINE_EXCEEDED`, `UNKNOWN`, etc. are recorded as failures. Related PRs bazelbuild#18359 bazelbuild#18539 Closes bazelbuild#18613. PiperOrigin-RevId: 539948823 Change-Id: I5b51f6a3aecab7c17d73f78b8234d9a6da49fe6c
1 parent 455ca8b commit 6c2ac00

File tree

5 files changed

+93
-67
lines changed

5 files changed

+93
-67
lines changed

src/main/java/com/google/devtools/build/lib/remote/Retrier.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.util.concurrent.MoreExecutors;
2424
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
2525
import java.io.IOException;
26+
import java.util.Objects;
2627
import java.util.concurrent.Callable;
2728
import java.util.concurrent.RejectedExecutionException;
2829
import java.util.concurrent.TimeUnit;
@@ -100,7 +101,7 @@ enum State {
100101
State state();
101102

102103
/** Called after an execution failed. */
103-
void recordFailure(Exception e);
104+
void recordFailure();
104105

105106
/** Called after an execution succeeded. */
106107
void recordSuccess();
@@ -130,7 +131,7 @@ public State state() {
130131
}
131132

132133
@Override
133-
public void recordFailure(Exception e) {}
134+
public void recordFailure() {}
134135

135136
@Override
136137
public void recordSuccess() {}
@@ -245,12 +246,14 @@ public <T> T execute(Callable<T> call, Backoff backoff) throws Exception {
245246
circuitBreaker.recordSuccess();
246247
return r;
247248
} catch (Exception e) {
248-
circuitBreaker.recordFailure(e);
249249
Throwables.throwIfInstanceOf(e, InterruptedException.class);
250-
if (State.TRIAL_CALL.equals(circuitState)) {
250+
if (!shouldRetry.test(e)) {
251+
// A non-retriable error doesn't represent server failure.
252+
circuitBreaker.recordSuccess();
251253
throw e;
252254
}
253-
if (!shouldRetry.test(e)) {
255+
circuitBreaker.recordFailure();
256+
if (Objects.equals(circuitState, State.TRIAL_CALL)) {
254257
throw e;
255258
}
256259
final long delayMillis = backoff.nextDelayMillis(e);
@@ -297,11 +300,11 @@ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, Backoff backo
297300

298301
private <T> ListenableFuture<T> onExecuteAsyncFailure(
299302
Exception t, AsyncCallable<T> call, Backoff backoff, State circuitState) {
300-
circuitBreaker.recordFailure(t);
301-
if (circuitState.equals(State.TRIAL_CALL)) {
302-
return Futures.immediateFailedFuture(t);
303-
}
304303
if (isRetriable(t)) {
304+
circuitBreaker.recordFailure();
305+
if (circuitState.equals(State.TRIAL_CALL)) {
306+
return Futures.immediateFailedFuture(t);
307+
}
305308
long waitMillis = backoff.nextDelayMillis(t);
306309
if (waitMillis >= 0) {
307310
try {
@@ -315,6 +318,10 @@ private <T> ListenableFuture<T> onExecuteAsyncFailure(
315318
return Futures.immediateFailedFuture(t);
316319
}
317320
} else {
321+
// gRPC errors such as NOT_FOUND, OUT_OF_RANGE, ALREADY_EXISTS etc. are non-retriable, and they
322+
// don't represent an
323+
// issue in the server, so these errors are treated as successful API calls.
324+
circuitBreaker.recordSuccess();
318325
return Futures.immediateFailedFuture(t);
319326
}
320327
}

src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/CircuitBreakerFactory.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,11 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.circuitbreaker;
1515

16-
import com.google.common.collect.ImmutableSet;
1716
import com.google.devtools.build.lib.remote.Retrier;
18-
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
1917
import com.google.devtools.build.lib.remote.options.RemoteOptions;
2018

2119
/** Factory for {@link Retrier.CircuitBreaker} */
2220
public class CircuitBreakerFactory {
23-
24-
public static final ImmutableSet<Class<? extends Exception>> DEFAULT_IGNORED_ERRORS =
25-
ImmutableSet.of(CacheNotFoundException.class);
2621
public static final int DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE = 100;
2722

2823
private CircuitBreakerFactory() {}

src/main/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreaker.java

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote.circuitbreaker;
1515

16-
import com.google.common.collect.ImmutableSet;
1716
import com.google.devtools.build.lib.remote.Retrier;
1817
import java.util.concurrent.Executors;
1918
import java.util.concurrent.ScheduledExecutorService;
@@ -33,12 +32,10 @@ public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
3332
private State state;
3433
private final AtomicInteger successes;
3534
private final AtomicInteger failures;
36-
private final AtomicInteger ignoredFailures;
3735
private final int failureRateThreshold;
3836
private final int slidingWindowSize;
3937
private final int minCallCountToComputeFailureRate;
4038
private final ScheduledExecutorService scheduledExecutor;
41-
private final ImmutableSet<Class<? extends Exception>> ignoredErrors;
4239

4340
/**
4441
* Creates a {@link FailureCircuitBreaker}.
@@ -51,15 +48,13 @@ public class FailureCircuitBreaker implements Retrier.CircuitBreaker {
5148
public FailureCircuitBreaker(int failureRateThreshold, int slidingWindowSize) {
5249
this.failures = new AtomicInteger(0);
5350
this.successes = new AtomicInteger(0);
54-
this.ignoredFailures = new AtomicInteger(0);
5551
this.failureRateThreshold = failureRateThreshold;
5652
this.slidingWindowSize = slidingWindowSize;
5753
this.minCallCountToComputeFailureRate =
5854
CircuitBreakerFactory.DEFAULT_MIN_CALL_COUNT_TO_COMPUTE_FAILURE_RATE;
5955
this.state = State.ACCEPT_CALLS;
6056
this.scheduledExecutor =
6157
slidingWindowSize > 0 ? Executors.newSingleThreadScheduledExecutor() : null;
62-
this.ignoredErrors = CircuitBreakerFactory.DEFAULT_IGNORED_ERRORS;
6358
}
6459

6560
@Override
@@ -68,33 +63,24 @@ public State state() {
6863
}
6964

7065
@Override
71-
public void recordFailure(Exception e) {
72-
if (!ignoredErrors.contains(e.getClass())) {
73-
int failureCount = failures.incrementAndGet();
74-
int totalCallCount = successes.get() + failureCount + ignoredFailures.get();
75-
if (slidingWindowSize > 0) {
76-
var unused =
77-
scheduledExecutor.schedule(
78-
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
79-
}
66+
public void recordFailure() {
67+
int failureCount = failures.incrementAndGet();
68+
int totalCallCount = successes.get() + failureCount;
69+
if (slidingWindowSize > 0) {
70+
var unused =
71+
scheduledExecutor.schedule(
72+
failures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
73+
}
8074

81-
if (totalCallCount < minCallCountToComputeFailureRate) {
82-
// The remote call count is below the threshold required to calculate the failure rate.
83-
return;
84-
}
85-
double failureRate = (failureCount * 100.0) / totalCallCount;
75+
if (totalCallCount < minCallCountToComputeFailureRate) {
76+
// The remote call count is below the threshold required to calculate the failure rate.
77+
return;
78+
}
79+
double failureRate = (failureCount * 100.0) / totalCallCount;
8680

87-
// Since the state can only be changed to the open state, synchronization is not required.
88-
if (failureRate > this.failureRateThreshold) {
89-
this.state = State.REJECT_CALLS;
90-
}
91-
} else {
92-
ignoredFailures.incrementAndGet();
93-
if (slidingWindowSize > 0) {
94-
var unused =
95-
scheduledExecutor.schedule(
96-
ignoredFailures::decrementAndGet, slidingWindowSize, TimeUnit.MILLISECONDS);
97-
}
81+
// Since the state can only be changed to the open state, synchronization is not required.
82+
if (failureRate > this.failureRateThreshold) {
83+
this.state = State.REJECT_CALLS;
9884
}
9985
}
10086

src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import static com.google.common.truth.Truth.assertThat;
1818
import static org.junit.Assert.assertThrows;
19-
import static org.mockito.ArgumentMatchers.any;
2019
import static org.mockito.Mockito.never;
2120
import static org.mockito.Mockito.times;
2221
import static org.mockito.Mockito.verify;
@@ -31,6 +30,10 @@
3130
import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException;
3231
import com.google.devtools.build.lib.remote.Retrier.ZeroBackoff;
3332
import com.google.devtools.build.lib.testutil.TestUtils;
33+
import io.grpc.Status;
34+
import io.grpc.StatusRuntimeException;
35+
import java.util.Arrays;
36+
import java.util.List;
3437
import java.util.concurrent.ExecutionException;
3538
import java.util.concurrent.Executors;
3639
import java.util.concurrent.TimeUnit;
@@ -94,7 +97,7 @@ public void retryShouldWork_failure() throws Exception {
9497
assertThat(e).hasMessageThat().isEqualTo("call failed");
9598

9699
assertThat(numCalls.get()).isEqualTo(3);
97-
verify(alwaysOpen, times(3)).recordFailure(any(Exception.class));
100+
verify(alwaysOpen, times(3)).recordFailure();
98101
verify(alwaysOpen, never()).recordSuccess();
99102
}
100103

@@ -118,8 +121,8 @@ public void retryShouldWorkNoRetries_failure() throws Exception {
118121
assertThat(e).hasMessageThat().isEqualTo("call failed");
119122

120123
assertThat(numCalls.get()).isEqualTo(1);
121-
verify(alwaysOpen, times(1)).recordFailure(e);
122-
verify(alwaysOpen, never()).recordSuccess();
124+
verify(alwaysOpen, never()).recordFailure();
125+
verify(alwaysOpen, times(1)).recordSuccess();
123126
}
124127

125128
@Test
@@ -139,7 +142,7 @@ public void retryShouldWork_success() throws Exception {
139142
});
140143
assertThat(val).isEqualTo(1);
141144

142-
verify(alwaysOpen, times(2)).recordFailure(any(Exception.class));
145+
verify(alwaysOpen, times(2)).recordFailure();
143146
verify(alwaysOpen, times(1)).recordSuccess();
144147
}
145148

@@ -332,6 +335,46 @@ public void asyncRetryEmptyError() throws Exception {
332335
assertThat(e).hasCauseThat().hasMessageThat().isEqualTo("");
333336
}
334337

338+
@Test
339+
public void testCircuitBreakerFailureAndSuccessCallOnDifferentGrpcError() {
340+
int maxRetries = 2;
341+
Supplier<Backoff> s = () -> new ZeroBackoff(maxRetries);
342+
List<Status> retriableGrpcError =
343+
Arrays.asList(Status.ABORTED, Status.UNKNOWN, Status.DEADLINE_EXCEEDED);
344+
List<Status> nonRetriableGrpcError =
345+
Arrays.asList(Status.NOT_FOUND, Status.OUT_OF_RANGE, Status.ALREADY_EXISTS);
346+
TripAfterNCircuitBreaker cb =
347+
new TripAfterNCircuitBreaker(retriableGrpcError.size() * (maxRetries + 1));
348+
Retrier r = new Retrier(s, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService, cb);
349+
350+
int expectedConsecutiveFailures = 0;
351+
352+
for (Status status : retriableGrpcError) {
353+
ListenableFuture<Void> res =
354+
r.executeAsync(
355+
() -> {
356+
throw new StatusRuntimeException(status);
357+
});
358+
expectedConsecutiveFailures += maxRetries + 1;
359+
assertThrows(ExecutionException.class, res::get);
360+
assertThat(cb.consecutiveFailures).isEqualTo(expectedConsecutiveFailures);
361+
}
362+
363+
assertThat(cb.state).isEqualTo(State.REJECT_CALLS);
364+
cb.trialCall();
365+
366+
for (Status status : nonRetriableGrpcError) {
367+
ListenableFuture<Void> res =
368+
r.executeAsync(
369+
() -> {
370+
throw new StatusRuntimeException(status);
371+
});
372+
assertThat(cb.consecutiveFailures).isEqualTo(0);
373+
assertThrows(ExecutionException.class, res::get);
374+
}
375+
assertThat(cb.state).isEqualTo(State.ACCEPT_CALLS);
376+
}
377+
335378
/** Simple circuit breaker that trips after N consecutive failures. */
336379
@ThreadSafe
337380
private static class TripAfterNCircuitBreaker implements CircuitBreaker {
@@ -351,7 +394,7 @@ public synchronized State state() {
351394
}
352395

353396
@Override
354-
public synchronized void recordFailure(Exception e) {
397+
public synchronized void recordFailure() {
355398
consecutiveFailures++;
356399
if (consecutiveFailures >= maxConsecutiveFailures) {
357400
state = State.REJECT_CALLS;

src/test/java/com/google/devtools/build/lib/remote/circuitbreaker/FailureCircuitBreakerTest.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515

1616
import static com.google.common.truth.Truth.assertThat;
1717

18-
import build.bazel.remote.execution.v2.Digest;
1918
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
20-
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
2119
import java.util.ArrayList;
2220
import java.util.Collections;
2321
import java.util.List;
@@ -30,40 +28,37 @@
3028
public class FailureCircuitBreakerTest {
3129

3230
@Test
33-
public void testRecordFailure_withIgnoredErrors() throws InterruptedException {
31+
public void testRecordFailure_circuitTrips() throws InterruptedException {
3432
final int failureRateThreshold = 10;
3533
final int windowInterval = 100;
3634
FailureCircuitBreaker failureCircuitBreaker =
3735
new FailureCircuitBreaker(failureRateThreshold, windowInterval);
3836

39-
List<Exception> listOfExceptionThrownOnFailure = new ArrayList<>();
37+
List<Runnable> listOfSuccessAndFailureCalls = new ArrayList<>();
4038
for (int index = 0; index < failureRateThreshold; index++) {
41-
listOfExceptionThrownOnFailure.add(new Exception());
39+
listOfSuccessAndFailureCalls.add(failureCircuitBreaker::recordFailure);
4240
}
41+
4342
for (int index = 0; index < failureRateThreshold * 9; index++) {
44-
listOfExceptionThrownOnFailure.add(new CacheNotFoundException(Digest.newBuilder().build()));
43+
listOfSuccessAndFailureCalls.add(failureCircuitBreaker::recordSuccess);
4544
}
4645

47-
Collections.shuffle(listOfExceptionThrownOnFailure);
46+
Collections.shuffle(listOfSuccessAndFailureCalls);
4847

4948
// make calls equals to threshold number of not ignored failure calls in parallel.
50-
listOfExceptionThrownOnFailure.stream()
51-
.parallel()
52-
.forEach(failureCircuitBreaker::recordFailure);
49+
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
5350
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);
5451

5552
// Sleep for windowInterval + 1ms.
5653
Thread.sleep(windowInterval + 1 /*to compensate any delay*/);
5754

5855
// make calls equals to threshold number of not ignored failure calls in parallel.
59-
listOfExceptionThrownOnFailure.stream()
60-
.parallel()
61-
.forEach(failureCircuitBreaker::recordFailure);
56+
listOfSuccessAndFailureCalls.stream().parallel().forEach(Runnable::run);
6257
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);
6358

6459
// Sleep for less than windowInterval.
6560
Thread.sleep(windowInterval - 5);
66-
failureCircuitBreaker.recordFailure(new Exception());
61+
failureCircuitBreaker.recordFailure();
6762
assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS);
6863
}
6964

@@ -80,15 +75,15 @@ public void testRecordFailure_minCallCriteriaNotMet() throws InterruptedExceptio
8075
// minCallToComputeFailure.
8176
IntStream.range(0, minCallToComputeFailure >> 1)
8277
.parallel()
83-
.forEach(i -> failureCircuitBreaker.recordFailure(new Exception()));
78+
.forEach(i -> failureCircuitBreaker.recordFailure());
8479
IntStream.range(0, minCallToComputeFailure >> 1)
8580
.parallel()
8681
.forEach(i -> failureCircuitBreaker.recordSuccess());
8782
assertThat(failureCircuitBreaker.state()).isEqualTo(State.ACCEPT_CALLS);
8883

8984
// Sleep for less than windowInterval.
90-
Thread.sleep(windowInterval - 20);
91-
failureCircuitBreaker.recordFailure(new Exception());
85+
Thread.sleep(windowInterval - 50);
86+
failureCircuitBreaker.recordFailure();
9287
assertThat(failureCircuitBreaker.state()).isEqualTo(State.REJECT_CALLS);
9388
}
9489
}

0 commit comments

Comments
 (0)