Skip to content

Commit b5a92e4

Browse files
feat: add Batcher#close(timeout) and Batcher#cancelOutstanding (#3141)
There have been reports of batcher.close() hanging every once in a while. Currently it is impossible to debug because we don't expose any internal state to analyze. This PR adds two additional methods that should help in diagnosing issues:

1. close(timeout) will try to close the batcher, but if any of the underlying batch operations fail, the exception message will contain a wealth of information describing the underlying state of operations as provided by #3140
2. cancelOutstanding: this allows for remediation when close(timeout) throws an exception.

The intended use case is the Dataflow connector's FinishBundle:

```java
try {
  batcher.close(Duration.ofMinutes(1));
} catch(TimeoutException e) {
  // log details why the batch failed to close with the help of #3140
  logger.error(e);
  batcher.cancelOutstanding();
  batcher.close(Duration.ofMinutes(1));
}
```

Example exception message:

> Exception in thread "main" com.google.api.gax.batching.BatchingException: Timed out trying to close batcher after PT1S. Batch request prototype: com.google.cloud.bigtable.data.v2.models.BulkMutation@2bac9ba. Outstanding batches: Batch{operation=CallbackChainRetryingFuture{super=null, latestCompletedAttemptResult=ImmediateFailedFuture@6a9d5dff[status=FAILURE, cause=[com.google.cloud.bigtable.data.v2.models.MutateRowsException: Some mutations failed to apply]], attemptResult=null, attemptSettings=TimedAttemptSettings{globalSettings=RetrySettings{totalTimeout=PT10M, initialRetryDelay=PT0.01S, retryDelayMultiplier=2.0, maxRetryDelay=PT1M, maxAttempts=0, jittered=true, initialRpcTimeout=PT1M, rpcTimeoutMultiplier=1.0, maxRpcTimeout=PT1M}, retryDelay=PT1.28S, rpcTimeout=PT1M, randomizedRetryDelay=PT0.877S, attemptCount=8, overallAttemptCount=8, firstAttemptStartTimeNanos=646922035424541}}, elements=com.google.cloud.bigtable.data.v2.models.RowMutationEntry@7a344b65}

Co-authored-by: Blake Li <[email protected]>
1 parent e2f8b8b commit b5a92e4

File tree

4 files changed

+228
-13
lines changed

4 files changed

+228
-13
lines changed

gax-java/gax/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,10 @@
105105
<className>com/google/api/gax/tracing/MetricsTracer</className>
106106
<field>*</field>
107107
</difference>
108+
<!-- Ignore method additions to an InternalExtensionOnly interface-->
109+
<difference>
110+
<differenceType>7012</differenceType>
111+
<className>com/google/api/gax/batching/Batcher</className>
112+
<method>*</method>
113+
</difference>
108114
</differences>

gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import com.google.api.core.ApiFuture;
3333
import com.google.api.core.InternalExtensionOnly;
3434
import com.google.api.gax.rpc.ApiCallContext;
35+
import java.time.Duration;
36+
import java.util.concurrent.TimeoutException;
37+
import javax.annotation.Nullable;
3538

3639
/**
3740
* Represents a batching context where individual elements will be accumulated and flushed in a
@@ -77,13 +80,25 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
7780
*/
7881
void sendOutstanding();
7982

83+
/** Cancels all outstanding batch RPCs. */
84+
void cancelOutstanding();
85+
8086
/**
81-
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
82-
* elements.
87+
* Closes this Batcher by preventing new elements from being added, then flushing the existing
88+
* elements and waiting for all the outstanding work to be resolved.
8389
*/
8490
@Override
8591
void close() throws InterruptedException;
8692

93+
/**
94+
* Closes this Batcher by preventing new elements from being added, then flushing the existing
95+
* elements and waiting for all the outstanding work to be resolved. If all of the outstanding
96+
* work has not been resolved, then a {@link BatchingException} will be thrown with details of the
97+
* remaining work. The batcher will remain in a closed state and will not allow additional
98+
* elements to be added.
99+
*/
100+
void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException;
101+
87102
/**
88103
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
89104
* elements. The returned future will be resolved when the last element completes

gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

+78-11
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,29 @@
4242
import com.google.api.gax.rpc.ApiCallContext;
4343
import com.google.api.gax.rpc.UnaryCallable;
4444
import com.google.common.annotations.VisibleForTesting;
45+
import com.google.common.base.MoreObjects;
4546
import com.google.common.base.Preconditions;
4647
import com.google.common.base.Stopwatch;
4748
import com.google.common.util.concurrent.Futures;
4849
import java.lang.ref.Reference;
4950
import java.lang.ref.ReferenceQueue;
5051
import java.lang.ref.SoftReference;
5152
import java.lang.ref.WeakReference;
53+
import java.time.Duration;
5254
import java.util.ArrayList;
5355
import java.util.List;
56+
import java.util.Optional;
57+
import java.util.StringJoiner;
5458
import java.util.concurrent.ConcurrentHashMap;
5559
import java.util.concurrent.ConcurrentMap;
5660
import java.util.concurrent.ExecutionException;
5761
import java.util.concurrent.Future;
5862
import java.util.concurrent.ScheduledExecutorService;
5963
import java.util.concurrent.TimeUnit;
60-
import java.util.concurrent.atomic.AtomicInteger;
64+
import java.util.concurrent.TimeoutException;
6165
import java.util.logging.Level;
6266
import java.util.logging.Logger;
67+
import javax.annotation.Nonnull;
6368
import javax.annotation.Nullable;
6469

6570
/**
@@ -86,7 +91,8 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
8691
private final BatcherReference currentBatcherReference;
8792

8893
private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
89-
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
94+
private final ConcurrentMap<Batch<ElementT, ElementResultT, RequestT, ResponseT>, Boolean>
95+
outstandingBatches = new ConcurrentHashMap<>();
9096
private final Object flushLock = new Object();
9197
private final Object elementLock = new Object();
9298
private final Future<?> scheduledFuture;
@@ -297,8 +303,10 @@ public void sendOutstanding() {
297303
} catch (Exception ex) {
298304
batchResponse = ApiFutures.immediateFailedFuture(ex);
299305
}
306+
accumulatedBatch.setResponseFuture(batchResponse);
307+
308+
outstandingBatches.put(accumulatedBatch, Boolean.TRUE);
300309

301-
numOfOutstandingBatches.incrementAndGet();
302310
ApiFutures.addCallback(
303311
batchResponse,
304312
new ApiFutureCallback<ResponseT>() {
@@ -310,7 +318,7 @@ public void onSuccess(ResponseT response) {
310318
accumulatedBatch.resource.getByteCount());
311319
accumulatedBatch.onBatchSuccess(response);
312320
} finally {
313-
onBatchCompletion();
321+
onBatchCompletion(accumulatedBatch);
314322
}
315323
}
316324

@@ -322,18 +330,19 @@ public void onFailure(Throwable throwable) {
322330
accumulatedBatch.resource.getByteCount());
323331
accumulatedBatch.onBatchFailure(throwable);
324332
} finally {
325-
onBatchCompletion();
333+
onBatchCompletion(accumulatedBatch);
326334
}
327335
}
328336
},
329337
directExecutor());
330338
}
331339

332-
private void onBatchCompletion() {
340+
private void onBatchCompletion(Batch<ElementT, ElementResultT, RequestT, ResponseT> batch) {
333341
boolean shouldClose = false;
334342

335343
synchronized (flushLock) {
336-
if (numOfOutstandingBatches.decrementAndGet() == 0) {
344+
outstandingBatches.remove(batch);
345+
if (outstandingBatches.isEmpty()) {
337346
flushLock.notifyAll();
338347
shouldClose = closeFuture != null;
339348
}
@@ -349,22 +358,43 @@ private void onBatchCompletion() {
349358
}
350359

351360
private void awaitAllOutstandingBatches() throws InterruptedException {
352-
while (numOfOutstandingBatches.get() > 0) {
361+
while (!outstandingBatches.isEmpty()) {
353362
synchronized (flushLock) {
354363
// Check again under lock to avoid racing with onBatchCompletion
355-
if (numOfOutstandingBatches.get() == 0) {
364+
if (outstandingBatches.isEmpty()) {
356365
break;
357366
}
358367
flushLock.wait();
359368
}
360369
}
361370
}
362371

372+
@Override
373+
public void cancelOutstanding() {
374+
for (Batch<?, ?, ?, ?> batch : outstandingBatches.keySet()) {
375+
batch.cancel();
376+
}
377+
}
363378
/** {@inheritDoc} */
364379
@Override
365380
public void close() throws InterruptedException {
366381
try {
367-
closeAsync().get();
382+
close(null);
383+
} catch (TimeoutException e) {
384+
// should never happen with a null timeout
385+
throw new IllegalStateException(
386+
"Unexpected timeout exception when trying to close the batcher without a timeout", e);
387+
}
388+
}
389+
390+
@Override
391+
public void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException {
392+
try {
393+
if (timeout != null) {
394+
closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
395+
} else {
396+
closeAsync().get();
397+
}
368398
} catch (ExecutionException e) {
369399
// Original stacktrace of a batching exception is not useful, so rethrow the error with
370400
// the caller stacktrace
@@ -374,6 +404,17 @@ public void close() throws InterruptedException {
374404
} else {
375405
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
376406
}
407+
} catch (TimeoutException e) {
408+
StringJoiner batchesStr = new StringJoiner(",");
409+
for (Batch<ElementT, ElementResultT, RequestT, ResponseT> batch :
410+
outstandingBatches.keySet()) {
411+
batchesStr.add(batch.toString());
412+
}
413+
String msg = "Timed out trying to close batcher after " + timeout + ".";
414+
msg += " Batch request prototype: " + prototype + ".";
415+
msg += " Outstanding batches: " + batchesStr;
416+
417+
throw new TimeoutException(msg);
377418
}
378419
}
379420

@@ -393,7 +434,7 @@ public ApiFuture<Void> closeAsync() {
393434
// prevent admission of new elements
394435
closeFuture = SettableApiFuture.create();
395436
// check if we can close immediately
396-
closeImmediately = numOfOutstandingBatches.get() == 0;
437+
closeImmediately = outstandingBatches.isEmpty();
397438
}
398439

399440
// Clean up accounting
@@ -435,6 +476,8 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
435476
private long totalThrottledTimeMs = 0;
436477
private BatchResource resource;
437478

479+
private volatile ApiFuture<ResponseT> responseFuture;
480+
438481
private Batch(
439482
RequestT prototype,
440483
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
@@ -457,6 +500,17 @@ void add(
457500
totalThrottledTimeMs += throttledTimeMs;
458501
}
459502

503+
void setResponseFuture(@Nonnull ApiFuture<ResponseT> responseFuture) {
504+
Preconditions.checkNotNull(responseFuture);
505+
this.responseFuture = responseFuture;
506+
}
507+
508+
void cancel() {
509+
if (this.responseFuture != null) {
510+
this.responseFuture.cancel(true);
511+
}
512+
}
513+
460514
void onBatchSuccess(ResponseT response) {
461515
try {
462516
descriptor.splitResponse(response, entries);
@@ -480,6 +534,19 @@ void onBatchFailure(Throwable throwable) {
480534
boolean isEmpty() {
481535
return resource.getElementCount() == 0;
482536
}
537+
538+
@Override
539+
public String toString() {
540+
StringJoiner elementsStr = new StringJoiner(",");
541+
for (BatchEntry<ElementT, ElementResultT> entry : entries) {
542+
elementsStr.add(
543+
Optional.ofNullable(entry.getElement()).map(Object::toString).orElse("null"));
544+
}
545+
return MoreObjects.toStringHelper(this)
546+
.add("responseFuture", responseFuture)
547+
.add("elements", elementsStr)
548+
.toString();
549+
}
483550
}
484551

485552
/**

0 commit comments

Comments
 (0)