Skip to content

Commit bcf5ed8

Browse files
fix(batcher): exceptions in unaryCaller bubble up (#1166)
Co-authored-by: Blake Li <[email protected]>
1 parent 1b1a9a1 commit bcf5ed8

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,13 @@ public void sendOutstanding() {
276276
callContextWithOption =
277277
callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs);
278278
}
279-
final ApiFuture<ResponseT> batchResponse =
280-
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption);
279+
ApiFuture<ResponseT> batchResponse;
280+
try {
281+
batchResponse =
282+
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption);
283+
} catch (Exception ex) {
284+
batchResponse = ApiFutures.immediateFailedFuture(ex);
285+
}
281286

282287
numOfOutstandingBatches.incrementAndGet();
283288
ApiFutures.addCallback(

gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java

+77
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
4747
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
4848
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
49+
import com.google.api.gax.rpc.testing.FakeCallContext;
4950
import com.google.common.base.Stopwatch;
5051
import com.google.common.collect.ImmutableList;
5152
import com.google.common.collect.Queues;
@@ -931,6 +932,82 @@ public void testThrottlingNonBlocking() throws Exception {
931932
}
932933
}
933934

935+
/**
936+
* If the batcher's unary callable throws an exception when obtaining a response, then the
937+
* response .get() should throw the exception
938+
*/
939+
@Test
940+
public void testAddDoesNotHangIfExceptionThrowStartingACall() {
941+
BatchingDescriptor<Object, Object, Object, Object> batchingDescriptor =
942+
new BatchingDescriptor<Object, Object, Object, Object>() {
943+
@Override
944+
public BatchingRequestBuilder<Object, Object> newRequestBuilder(Object o) {
945+
return new BatchingRequestBuilder<Object, Object>() {
946+
@Override
947+
public void add(Object o) {}
948+
949+
@Override
950+
public Object build() {
951+
return new Object();
952+
}
953+
};
954+
}
955+
956+
@Override
957+
public void splitResponse(Object o, List<BatchEntry<Object, Object>> list) {
958+
for (BatchEntry<Object, Object> e : list) {
959+
e.getResultFuture().set(new Object());
960+
}
961+
}
962+
963+
@Override
964+
public void splitException(Throwable throwable, List<BatchEntry<Object, Object>> list) {
965+
for (BatchEntry<Object, Object> e : list) {
966+
e.getResultFuture().setException(new RuntimeException("fake"));
967+
}
968+
}
969+
970+
@Override
971+
public long countBytes(Object o) {
972+
return 1;
973+
}
974+
};
975+
976+
UnaryCallable<Object, Object> unaryCallable =
977+
new UnaryCallable<Object, Object>() {
978+
@Override
979+
public ApiFuture<Object> futureCall(Object o, ApiCallContext apiCallContext) {
980+
throw new RuntimeException("this should bubble up");
981+
}
982+
};
983+
Object prototype = new Object();
984+
BatchingSettings batchingSettings =
985+
BatchingSettings.newBuilder()
986+
.setDelayThreshold(Duration.ofSeconds(1))
987+
.setElementCountThreshold(100L)
988+
.setRequestByteThreshold(100L)
989+
.setFlowControlSettings(FlowControlSettings.getDefaultInstance())
990+
.build();
991+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
992+
FlowController flowController = new FlowController(batchingSettings.getFlowControlSettings());
993+
ApiCallContext callContext = FakeCallContext.createDefault();
994+
995+
BatcherImpl<Object, Object, Object, Object> batcher =
996+
new BatcherImpl<>(
997+
batchingDescriptor,
998+
unaryCallable,
999+
prototype,
1000+
batchingSettings,
1001+
executor,
1002+
flowController,
1003+
callContext);
1004+
1005+
ApiFuture<Object> f = batcher.add(new Object());
1006+
Assert.assertThrows(ExecutionException.class, f::get);
1007+
// bubbles up
1008+
Assert.assertThrows(RuntimeException.class, batcher::close);
1009+
}
1010+
9341011
private void testElementTriggers(BatchingSettings settings) throws Exception {
9351012
underTest = createDefaultBatcherImpl(settings, null);
9361013
Future<Integer> result = underTest.add(4);

0 commit comments

Comments
 (0)