Skip to content

Commit bc8a4ad

Browse files
authored
feat: make stream wait timeout a first class citizen (#1473)
1 parent aa7c8bd commit bc8a4ad

File tree

8 files changed

+115
-31
lines changed

8 files changed

+115
-31
lines changed

gax-java/gax-grpc/src/test/java/com/google/api/gax/grpc/TimeoutTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,12 @@ public void testNonRetryServerStreamingSettingsContextWithRetry() {
242242
.build();
243243
Duration newTimeout = Duration.ofSeconds(5);
244244
RetrySettings contextRetrySettings =
245-
retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
245+
retrySettings
246+
.toBuilder()
247+
.setInitialRpcTimeout(newTimeout)
248+
.setMaxRpcTimeout(newTimeout)
249+
.setMaxAttempts(3)
250+
.build();
246251
GrpcCallContext retryingContext =
247252
defaultCallContext
248253
.withRetrySettings(contextRetrySettings)

gax-java/gax/src/main/java/com/google/api/gax/rpc/Callables.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
115115
callable.withDefaultCallContext(
116116
clientContext
117117
.getDefaultCallContext()
118-
.withStreamIdleTimeout(callSettings.getIdleTimeout()));
118+
.withStreamIdleTimeout(callSettings.getIdleTimeout())
119+
.withStreamWaitTimeout(callSettings.getWaitTimeout()));
119120

120121
return callable;
121122
}

gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.concurrent.Callable;
3838
import java.util.concurrent.CancellationException;
3939
import javax.annotation.concurrent.GuardedBy;
40-
import org.threeten.bp.Duration;
4140

4241
/**
4342
* A callable that generates Server Streaming attempts. At any one time, it is responsible for at
@@ -181,15 +180,6 @@ public void cancel() {
181180
}
182181
isStarted = true;
183182

184-
// Propagate the totalTimeout as the overall stream deadline, so long as the user
185-
// has not provided a timeout via the ApiCallContext. If they have, retain it.
186-
Duration totalTimeout =
187-
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();
188-
189-
if (totalTimeout != null && context != null && context.getTimeout() == null) {
190-
context = context.withTimeout(totalTimeout);
191-
}
192-
193183
// Call the inner callable
194184
call();
195185
}
@@ -218,13 +208,10 @@ public Void call() {
218208

219209
ApiCallContext attemptContext = context;
220210

221-
// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
222-
// does not already have a timeout set by a user via withStreamWaitTimeout.
223211
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
224-
&& attemptContext.getStreamWaitTimeout() == null) {
212+
&& attemptContext.getTimeout() == null) {
225213
attemptContext =
226-
attemptContext.withStreamWaitTimeout(
227-
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
214+
attemptContext.withTimeout(outerRetryingFuture.getAttemptSettings().getRpcTimeout());
228215
}
229216

230217
attemptContext

gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java

+38-7
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@
4747
* <p>This class includes settings that are applicable to all server streaming calls, which
4848
* currently just includes retries and watchdog timers.
4949
*
50-
* <p>The watchdog timer is configured via {@code idleTimeout}. The watchdog will terminate any
51-
* stream that has not has seen any demand (via {@link StreamController#request(int)}) in the
52-
* configured interval. To turn off idle checks, set the interval to {@link Duration#ZERO}.
50+
* <p>The watchdog timer is configured via {@code idleTimeout} and {@code waitTimeout}. The watchdog
51+
* will terminate any stream that has not has seen any demand (via {@link
52+
* StreamController#request(int)}) in the configured interval or has not seen a message from the
53+
* server in {@code waitTimeout}. To turn off idle checks, set the interval to {@link
54+
* Duration#ZERO}.
5355
*
5456
* <p>Retry configuration allows for the stream to be restarted and resumed. It is composed of 3
5557
* parts: the retryable codes, the retry settings and the stream resumption strategy. The retryable
@@ -79,12 +81,14 @@ public final class ServerStreamingCallSettings<RequestT, ResponseT>
7981
@Nonnull private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;
8082

8183
@Nonnull private final Duration idleTimeout;
84+
@Nonnull private final Duration waitTimeout;
8285

8386
private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {
8487
this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes);
8588
this.retrySettings = builder.retrySettingsBuilder.build();
8689
this.resumptionStrategy = builder.resumptionStrategy;
8790
this.idleTimeout = builder.idleTimeout;
91+
this.waitTimeout = builder.waitTimeout;
8892
}
8993

9094
/**
@@ -123,6 +127,15 @@ public Duration getIdleTimeout() {
123127
return idleTimeout;
124128
}
125129

130+
/**
131+
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
132+
* the {@link #waitTimeout} does.
133+
*/
134+
@Nonnull
135+
public Duration getWaitTimeout() {
136+
return waitTimeout;
137+
}
138+
126139
public Builder<RequestT, ResponseT> toBuilder() {
127140
return new Builder<>(this);
128141
}
@@ -135,6 +148,7 @@ public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
135148
public String toString() {
136149
return MoreObjects.toStringHelper(this)
137150
.add("idleTimeout", idleTimeout)
151+
.add("waitTimeout", waitTimeout)
138152
.add("retryableCodes", retryableCodes)
139153
.add("retrySettings", retrySettings)
140154
.toString();
@@ -148,13 +162,16 @@ public static class Builder<RequestT, ResponseT>
148162

149163
@Nonnull private Duration idleTimeout;
150164

165+
@Nonnull private Duration waitTimeout;
166+
151167
/** Initialize the builder with default settings */
152168
private Builder() {
153169
this.retryableCodes = ImmutableSet.of();
154170
this.retrySettingsBuilder = RetrySettings.newBuilder();
155171
this.resumptionStrategy = new SimpleStreamResumptionStrategy<>();
156172

157173
this.idleTimeout = Duration.ZERO;
174+
this.waitTimeout = Duration.ZERO;
158175
}
159176

160177
private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
@@ -164,6 +181,7 @@ private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
164181
this.resumptionStrategy = settings.resumptionStrategy;
165182

166183
this.idleTimeout = settings.idleTimeout;
184+
this.waitTimeout = settings.waitTimeout;
167185
}
168186

169187
/**
@@ -233,9 +251,9 @@ public Builder<RequestT, ResponseT> setSimpleTimeoutNoRetries(@Nonnull Duration
233251
.setInitialRetryDelay(Duration.ZERO)
234252
.setRetryDelayMultiplier(1)
235253
.setMaxRetryDelay(Duration.ZERO)
236-
.setInitialRpcTimeout(Duration.ZERO)
254+
.setInitialRpcTimeout(timeout)
237255
.setRpcTimeoutMultiplier(1)
238-
.setMaxRpcTimeout(Duration.ZERO)
256+
.setMaxRpcTimeout(timeout)
239257
.setMaxAttempts(1)
240258
.build());
241259

@@ -264,14 +282,27 @@ public Duration getIdleTimeout() {
264282
}
265283

266284
/**
267-
* See the class documentation of {@link ServerStreamingCallSettings} for a description of what
268-
* the {@link #idleTimeout} does. {@link Duration#ZERO} disables the watchdog.
285+
* Set how long to wait before considering the stream orphaned by the user and closing it.
286+
* {@link Duration#ZERO} disables the check for abandoned streams.
269287
*/
270288
public Builder<RequestT, ResponseT> setIdleTimeout(@Nonnull Duration idleTimeout) {
271289
this.idleTimeout = Preconditions.checkNotNull(idleTimeout);
272290
return this;
273291
}
274292

293+
@Nonnull
294+
public Duration getWaitTimeout() {
295+
return waitTimeout;
296+
}
297+
298+
/**
299+
* Set the maximum amount of time to wait for the next message from the server. {@link
300+
* Duration#ZERO} disables the check for abandoned streams.
301+
*/
302+
public void setWaitTimeout(@Nonnull Duration waitTimeout) {
303+
this.waitTimeout = waitTimeout;
304+
}
305+
275306
@Override
276307
public ServerStreamingCallSettings<RequestT, ResponseT> build() {
277308
return new ServerStreamingCallSettings<>(this);

gax-java/gax/src/test/java/com/google/api/gax/rpc/CallableTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void testNonRetriedServerStreamingCallableWithRetrySettings() throws Exce
137137
ServerStreamingCallable<Object, Object> callable =
138138
Callables.retrying(innerServerStreamingCallable, callSettings, clientContext);
139139

140-
Duration timeout = retrySettings.getTotalTimeout();
140+
Duration timeout = retrySettings.getInitialRpcTimeout();
141141

142142
callable.call("Is your refrigerator running?", callContextWithRetrySettings);
143143

gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingAttemptCallableTest.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class ServerStreamingAttemptCallableTest {
6262
private FakeRetryingFuture fakeRetryingFuture;
6363
private StreamResumptionStrategy<String, String> resumptionStrategy;
6464
private static Duration totalTimeout = Duration.ofHours(1);
65+
private static final Duration attemptTimeout = Duration.ofMinutes(1);
6566
private FakeCallContext mockedCallContext;
6667

6768
@Before
@@ -100,7 +101,6 @@ public void testUserProvidedContextTimeout() {
100101
// Ensure that the callable did not overwrite the user provided timeouts
101102
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
102103
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
103-
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
104104
Mockito.verify(mockedCallContext, Mockito.never())
105105
.withStreamWaitTimeout(Mockito.any(Duration.class));
106106

@@ -128,7 +128,7 @@ public void testNoUserProvidedContextTimeout() {
128128
Mockito.doReturn(BaseApiTracer.getInstance()).when(mockedCallContext).getTracer();
129129
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
130130
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
131-
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
131+
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(attemptTimeout);
132132
Mockito.doReturn(mockedCallContext)
133133
.when(mockedCallContext)
134134
.withStreamWaitTimeout(Mockito.any(Duration.class));
@@ -139,10 +139,7 @@ public void testNoUserProvidedContextTimeout() {
139139
// Ensure that the callable configured the timeouts via the Settings in the
140140
// absence of user-defined timeouts.
141141
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
142-
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
143-
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
144-
Mockito.verify(mockedCallContext, Mockito.times(1))
145-
.withStreamWaitTimeout(Mockito.any(Duration.class));
142+
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(attemptTimeout);
146143

147144
// Should notify outer observer
148145
Truth.assertThat(observer.controller).isNotNull();

gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamingCallSettingsTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,20 @@ public void idleTimeoutIsNotLost() {
9898
assertThat(builder.build().toBuilder().getIdleTimeout()).isEqualTo(idleTimeout);
9999
}
100100

101+
@Test
102+
public void waitTimeoutIsNotLost() {
103+
Duration waitTimeout = Duration.ofSeconds(5);
104+
105+
ServerStreamingCallSettings.Builder<Object, Object> builder =
106+
ServerStreamingCallSettings.newBuilder();
107+
108+
builder.setWaitTimeout(waitTimeout);
109+
110+
assertThat(builder.getWaitTimeout()).isEqualTo(waitTimeout);
111+
assertThat(builder.build().getWaitTimeout()).isEqualTo(waitTimeout);
112+
assertThat(builder.build().toBuilder().getWaitTimeout()).isEqualTo(waitTimeout);
113+
}
114+
101115
@Test
102116
public void testRetrySettingsBuilder() {
103117
RetrySettings initialSettings =

showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java

+49
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@
1919
import static com.google.common.truth.Truth.assertThat;
2020
import static org.junit.Assert.assertThrows;
2121

22+
import com.google.api.gax.core.NoCredentialsProvider;
2223
import com.google.api.gax.rpc.CancelledException;
2324
import com.google.api.gax.rpc.ServerStream;
2425
import com.google.api.gax.rpc.StatusCode;
26+
import com.google.api.gax.rpc.WatchdogTimeoutException;
2527
import com.google.common.collect.ImmutableList;
2628
import com.google.rpc.Status;
2729
import com.google.showcase.v1beta1.EchoClient;
2830
import com.google.showcase.v1beta1.EchoResponse;
31+
import com.google.showcase.v1beta1.EchoSettings;
2932
import com.google.showcase.v1beta1.ExpandRequest;
3033
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
34+
import io.grpc.ManagedChannelBuilder;
3135
import java.util.ArrayList;
3236
import java.util.Iterator;
3337
import java.util.stream.Collectors;
3438
import org.junit.After;
39+
import org.junit.Assert;
3540
import org.junit.Before;
3641
import org.junit.Ignore;
3742
import org.junit.Test;
43+
import org.threeten.bp.Duration;
3844

3945
public class ITServerSideStreaming {
4046

@@ -104,6 +110,49 @@ public void testGrpc_serverError_receiveErrorAfterLastWordInStream() {
104110
assertThat(cancelledException.getStatusCode().getCode()).isEqualTo(StatusCode.Code.CANCELLED);
105111
}
106112

113+
@Test
114+
public void testGrpc_serverWaitTimeout_watchdogCancelsStream() throws Exception {
115+
EchoSettings.Builder settings =
116+
EchoSettings.newBuilder()
117+
.setCredentialsProvider(NoCredentialsProvider.create())
118+
.setTransportChannelProvider(
119+
EchoSettings.defaultGrpcTransportProviderBuilder()
120+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
121+
.build());
122+
123+
settings
124+
.expandSettings()
125+
.setIdleTimeout(Duration.ofMillis(100))
126+
.setWaitTimeout(Duration.ofMillis(100));
127+
128+
settings.getStubSettingsBuilder().setStreamWatchdogCheckInterval(Duration.ofMillis(50));
129+
130+
EchoClient echoClient = EchoClient.create(settings.build());
131+
132+
String content = "The rain in Spain stays mainly on the plain!";
133+
ServerStream<EchoResponse> responseStream =
134+
echoClient
135+
.expandCallable()
136+
.call(
137+
ExpandRequest.newBuilder()
138+
.setContent(content)
139+
// Configure server interval for returning the next response
140+
.setStreamWaitTime(
141+
com.google.protobuf.Duration.newBuilder().setSeconds(1).build())
142+
.build());
143+
ArrayList<String> responses = new ArrayList<>();
144+
try {
145+
for (EchoResponse response : responseStream) {
146+
responses.add(response.getContent());
147+
}
148+
Assert.fail("No exception was thrown");
149+
} catch (WatchdogTimeoutException e) {
150+
assertThat(e).hasMessageThat().contains("Canceled due to timeout waiting for next response");
151+
} finally {
152+
echoClient.close();
153+
}
154+
}
155+
107156
@Test
108157
public void testHttpJson_receiveStreamedContent() {
109158
String content = "The rain in Spain stays mainly on the plain!";

0 commit comments

Comments
 (0)