Skip to content

Commit 17d133b

Browse files
authored
fix: Fix race condition in GrpcDirectStreamController (#1537)
* fix: Fix race condition in GrpcDirectStreamController * add copyright
1 parent af86664 commit 17d133b

File tree

2 files changed

+160
-4
lines changed

2 files changed

+160
-4
lines changed

gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectStreamController.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void run() {}
5353
private final ClientCall<RequestT, ResponseT> clientCall;
5454
private final ResponseObserver<ResponseT> responseObserver;
5555
private final Runnable onReady;
56-
private boolean hasStarted;
56+
private volatile boolean hasStarted;
5757
private boolean autoflowControl = true;
5858
private int numRequested;
5959
private volatile CancellationException cancellationException;
@@ -88,7 +88,6 @@ public void disableAutoInboundFlowControl() {
8888
@Override
8989
public void request(int count) {
9090
Preconditions.checkState(!autoflowControl, "Autoflow control is enabled.");
91-
9291
// Buffer the requested count in case the consumer requested responses in the onStart()
9392
if (!hasStarted) {
9493
numRequested += count;
@@ -110,10 +109,10 @@ void startBidi() {
110109
private void startCommon() {
111110
responseObserver.onStart(this);
112111

113-
this.hasStarted = true;
114-
115112
clientCall.start(new ResponseObserverAdapter(), new Metadata());
116113

114+
this.hasStarted = true;
115+
117116
if (autoflowControl) {
118117
clientCall.request(1);
119118
} else if (numRequested > 0) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.grpc;
31+
32+
import com.google.api.gax.core.NoCredentialsProvider;
33+
import com.google.api.gax.grpc.testing.FakeServiceGrpc;
34+
import com.google.api.gax.retrying.RetrySettings;
35+
import com.google.api.gax.retrying.StreamResumptionStrategy;
36+
import com.google.api.gax.rpc.Callables;
37+
import com.google.api.gax.rpc.ClientContext;
38+
import com.google.api.gax.rpc.DeadlineExceededException;
39+
import com.google.api.gax.rpc.FixedTransportChannelProvider;
40+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
41+
import com.google.api.gax.rpc.ServerStreamingCallable;
42+
import com.google.api.gax.rpc.StatusCode;
43+
import com.google.api.gax.rpc.StubSettings;
44+
import com.google.type.Color;
45+
import com.google.type.Money;
46+
import io.grpc.ManagedChannel;
47+
import io.grpc.ManagedChannelBuilder;
48+
import io.grpc.Server;
49+
import io.grpc.ServerBuilder;
50+
import io.grpc.Status;
51+
import io.grpc.stub.StreamObserver;
52+
import javax.annotation.Nonnull;
53+
import javax.annotation.Nullable;
54+
import org.junit.Test;
55+
import org.junit.runner.RunWith;
56+
import org.junit.runners.JUnit4;
57+
import org.threeten.bp.Duration;
58+
59+
@RunWith(JUnit4.class)
60+
public class GrpcDirectStreamControllerTest {
61+
62+
@Test
63+
public void testRetryNoRaceCondition() throws Exception {
64+
Server server = ServerBuilder.forPort(1234).addService(new FakeService()).build();
65+
server.start();
66+
67+
ManagedChannel channel =
68+
ManagedChannelBuilder.forAddress("localhost", 1234).usePlaintext().build();
69+
70+
StreamResumptionStrategy<Color, Money> resumptionStrategy =
71+
new StreamResumptionStrategy<Color, Money>() {
72+
@Nonnull
73+
@Override
74+
public StreamResumptionStrategy<Color, Money> createNew() {
75+
return this;
76+
}
77+
78+
@Nonnull
79+
@Override
80+
public Money processResponse(Money response) {
81+
return response;
82+
}
83+
84+
@Nullable
85+
@Override
86+
public Color getResumeRequest(Color originalRequest) {
87+
return originalRequest;
88+
}
89+
90+
@Override
91+
public boolean canResume() {
92+
return true;
93+
}
94+
};
95+
96+
// Set up retry settings. Set total timeout to 1 minute to limit the total runtime of this test.
97+
// Set retry delay to 1 ms so the retries will be scheduled in a loop with no delays.
98+
// Set max attempt to max so there could be as many retries as possible.
99+
ServerStreamingCallSettings<Color, Money> callSettigs =
100+
ServerStreamingCallSettings.<Color, Money>newBuilder()
101+
.setResumptionStrategy(resumptionStrategy)
102+
.setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED)
103+
.setRetrySettings(
104+
RetrySettings.newBuilder()
105+
.setTotalTimeout(Duration.ofMinutes(1))
106+
.setMaxAttempts(Integer.MAX_VALUE)
107+
.setInitialRetryDelay(Duration.ofMillis(1))
108+
.setMaxRetryDelay(Duration.ofMillis(1))
109+
.build())
110+
.build();
111+
112+
StubSettings.Builder builder =
113+
new StubSettings.Builder() {
114+
@Override
115+
public StubSettings build() {
116+
return new StubSettings(this) {
117+
@Override
118+
public Builder toBuilder() {
119+
throw new IllegalStateException();
120+
}
121+
};
122+
}
123+
};
124+
125+
builder
126+
.setEndpoint("localhost:1234")
127+
.setCredentialsProvider(NoCredentialsProvider.create())
128+
.setTransportChannelProvider(
129+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)));
130+
131+
ServerStreamingCallable<Color, Money> callable =
132+
GrpcCallableFactory.createServerStreamingCallable(
133+
GrpcCallSettings.create(FakeServiceGrpc.METHOD_SERVER_STREAMING_RECOGNIZE),
134+
callSettigs,
135+
ClientContext.create(builder.build()));
136+
137+
ServerStreamingCallable<Color, Money> retrying =
138+
Callables.retrying(callable, callSettigs, ClientContext.create(builder.build()));
139+
140+
Color request = Color.newBuilder().getDefaultInstanceForType();
141+
142+
try {
143+
for (Money money : retrying.call(request, GrpcCallContext.createDefault())) {}
144+
145+
} catch (DeadlineExceededException e) {
146+
// Ignore this error
147+
}
148+
}
149+
150+
class FakeService extends FakeServiceGrpc.FakeServiceImplBase {
151+
@Override
152+
public void serverStreamingRecognize(Color request, StreamObserver<Money> responseObserver) {
153+
responseObserver.onNext(Money.getDefaultInstance());
154+
responseObserver.onError(Status.DEADLINE_EXCEEDED.asException());
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)