Skip to content

Commit 8a61249

Browse files
authored
fix: Handle cancel in ReleasingClientCall and rethrow the exception in start (#1221)
* fix: Handle cancel in ReleasingClientCall and rethrow the exception in start * address comments
1 parent 84a1355 commit 8a61249

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java

+13
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.io.IOException;
4646
import java.util.ArrayList;
4747
import java.util.List;
48+
import java.util.concurrent.CancellationException;
4849
import java.util.concurrent.Executors;
4950
import java.util.concurrent.ScheduledExecutorService;
5051
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,7 @@
5354
import java.util.concurrent.atomic.AtomicReference;
5455
import java.util.logging.Level;
5556
import java.util.logging.Logger;
57+
import javax.annotation.Nullable;
5658
import org.threeten.bp.Duration;
5759

5860
/**
@@ -517,6 +519,7 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
517519

518520
/** ClientCall wrapper that makes sure to decrement the outstanding RPC count on completion. */
519521
static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
522+
@Nullable private CancellationException cancellationException;
520523
final Entry entry;
521524

522525
public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry) {
@@ -526,6 +529,9 @@ public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry) {
526529

527530
@Override
528531
public void start(Listener<RespT> responseListener, Metadata headers) {
532+
if (cancellationException != null) {
533+
throw new IllegalStateException("Call is already cancelled", cancellationException);
534+
}
529535
try {
530536
super.start(
531537
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@@ -542,7 +548,14 @@ public void onClose(Status status, Metadata trailers) {
542548
} catch (Exception e) {
543549
// In case start failed, make sure to release
544550
entry.release();
551+
throw e;
545552
}
546553
}
554+
555+
@Override
556+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
557+
this.cancellationException = new CancellationException(message);
558+
super.cancel(message, cause);
559+
}
547560
}
548561
}

gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/ChannelPoolTest.java

+54
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,17 @@
2929
*/
3030
package com.google.api.gax.grpc;
3131

32+
import static com.google.api.gax.grpc.testing.FakeServiceGrpc.METHOD_SERVER_STREAMING_RECOGNIZE;
3233
import static com.google.common.truth.Truth.assertThat;
3334

3435
import com.google.api.gax.grpc.testing.FakeChannelFactory;
3536
import com.google.api.gax.grpc.testing.FakeMethodDescriptor;
3637
import com.google.api.gax.grpc.testing.FakeServiceGrpc;
38+
import com.google.api.gax.rpc.ClientContext;
39+
import com.google.api.gax.rpc.ResponseObserver;
40+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
41+
import com.google.api.gax.rpc.ServerStreamingCallable;
42+
import com.google.api.gax.rpc.StreamController;
3743
import com.google.common.collect.ImmutableList;
3844
import com.google.common.collect.Lists;
3945
import com.google.type.Color;
@@ -49,12 +55,14 @@
4955
import java.util.ArrayList;
5056
import java.util.Arrays;
5157
import java.util.List;
58+
import java.util.concurrent.CancellationException;
5259
import java.util.concurrent.ExecutorService;
5360
import java.util.concurrent.Executors;
5461
import java.util.concurrent.ScheduledExecutorService;
5562
import java.util.concurrent.ScheduledFuture;
5663
import java.util.concurrent.TimeUnit;
5764
import java.util.concurrent.atomic.AtomicInteger;
65+
import org.junit.Assert;
5866
import org.junit.Test;
5967
import org.junit.runner.RunWith;
6068
import org.junit.runners.JUnit4;
@@ -595,4 +603,50 @@ public void removedActiveChannelsAreShutdown() throws Exception {
595603
// Now the channel should be closed
596604
Mockito.verify(channels.get(1), Mockito.times(1)).shutdown();
597605
}
606+
607+
@Test
608+
public void testReleasingClientCallCancelEarly() throws IOException {
609+
ClientCall mockClientCall = Mockito.mock(ClientCall.class);
610+
Mockito.doAnswer(invocation -> null).when(mockClientCall).cancel(Mockito.any(), Mockito.any());
611+
ManagedChannel fakeChannel = Mockito.mock(ManagedChannel.class);
612+
Mockito.when(fakeChannel.newCall(Mockito.any(), Mockito.any())).thenReturn(mockClientCall);
613+
ChannelPoolSettings channelPoolSettings = ChannelPoolSettings.staticallySized(1);
614+
ChannelFactory factory = new FakeChannelFactory(ImmutableList.of(fakeChannel));
615+
ChannelPool channelPool = ChannelPool.create(channelPoolSettings, factory);
616+
ClientContext context =
617+
ClientContext.newBuilder()
618+
.setTransportChannel(GrpcTransportChannel.create(channelPool))
619+
.setDefaultCallContext(GrpcCallContext.of(channelPool, CallOptions.DEFAULT))
620+
.build();
621+
ServerStreamingCallSettings settings =
622+
ServerStreamingCallSettings.<Color, Money>newBuilder().build();
623+
ServerStreamingCallable streamingCallable =
624+
GrpcCallableFactory.createServerStreamingCallable(
625+
GrpcCallSettings.create(METHOD_SERVER_STREAMING_RECOGNIZE), settings, context);
626+
Color request = Color.newBuilder().setRed(0.5f).build();
627+
628+
IllegalStateException e =
629+
Assert.assertThrows(
630+
IllegalStateException.class,
631+
() ->
632+
streamingCallable.call(
633+
request,
634+
new ResponseObserver() {
635+
@Override
636+
public void onStart(StreamController controller) {
637+
controller.cancel();
638+
}
639+
640+
@Override
641+
public void onResponse(Object response) {}
642+
643+
@Override
644+
public void onError(Throwable t) {}
645+
646+
@Override
647+
public void onComplete() {}
648+
}));
649+
assertThat(e.getCause()).isInstanceOf(CancellationException.class);
650+
assertThat(e.getMessage()).isEqualTo("Call is already cancelled");
651+
}
598652
}

0 commit comments

Comments
 (0)