Skip to content

Commit 625f7ab

Browse files
mutianfigorbernstein2lqiu96
committed
fix: Fix watchdog to start with WAITING state (#2468)
Watchdog should start with WAITING state, and only switch to `idle` if auto flow control was disabled. Before the fix, when auto flow control was enabled, we wait for server to return a response without calling `onRequest()` and watchdog would report the timeout exception because of idle timeout, which is incorrect and causes confusion. Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/gapic-generator-java/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #2498 ☕️ --------- Co-authored-by: Igor Bernstein <[email protected]> Co-authored-by: Lawrence Qiu <[email protected]>
1 parent 6d154fe commit 625f7ab

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,11 @@ class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT>
193193
private final ResponseObserver<ResponseT> outerResponseObserver;
194194
private volatile StreamController innerController;
195195

196+
// When a stream is created it has automatic inbound flow control enabled. The stream
197+
// won't wait for the caller to request a message. Setting the default to WAITING
198+
// to reflect this state.
196199
@GuardedBy("lock")
197-
private State state = State.IDLE;
200+
private State state = State.WAITING;
198201

199202
@GuardedBy("lock")
200203
private int pendingCount = 0;
@@ -220,6 +223,16 @@ public void onStartImpl(StreamController controller) {
220223
public void disableAutoInboundFlowControl() {
221224
Preconditions.checkState(
222225
!hasStarted, "Can't disable automatic flow control after the stream has started");
226+
227+
// Adding the lock only to satisfy the annotation. It doesn't matter because before
228+
// the stream is started, this is only accessed by the caller.
229+
synchronized (lock) {
230+
// When auto flow control is disabled, caller needs to call onRequest() to request a
231+
// message. Setting the state to IDLE because now we're waiting for caller to call
232+
// onRequest().
233+
state = State.IDLE;
234+
}
235+
223236
autoAutoFlowControl = false;
224237
innerController.disableAutoInboundFlowControl();
225238
}

gax-java/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java

+51
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,31 @@ public void testTimedOutBeforeStart() throws InterruptedException {
154154
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
155155
}
156156

157+
@Test
158+
public void testTimedOutBeforeResponse() throws InterruptedException {
159+
MockServerStreamingCallable<String, String> autoFlowControlCallable =
160+
new MockServerStreamingCallable<>();
161+
AutoFlowControlObserver<String> downstreamObserver = new AutoFlowControlObserver<>();
162+
163+
autoFlowControlCallable.call("request", watchdog.watch(downstreamObserver, waitTime, idleTime));
164+
MockServerStreamingCall<String, String> call1 = autoFlowControlCallable.popLastCall();
165+
166+
clock.incrementNanoTime(idleTime.toNanos() + 1);
167+
watchdog.run();
168+
assertThat(downstreamObserver.done.isDone()).isFalse();
169+
assertThat(call1.getController().isCancelled()).isTrue();
170+
call1.getController().getObserver().onError(new CancellationException("cancelled"));
171+
172+
Throwable actualError = null;
173+
try {
174+
downstreamObserver.done.get();
175+
} catch (ExecutionException e) {
176+
actualError = e.getCause();
177+
}
178+
assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class);
179+
assertThat(actualError.getMessage()).contains("waiting for next response");
180+
}
181+
157182
@Test
158183
public void testMultiple() throws Exception {
159184
// Start stream1
@@ -310,4 +335,30 @@ public void onComplete() {
310335
done.set(null);
311336
}
312337
}
338+
339+
static class AutoFlowControlObserver<T> implements ResponseObserver<T> {
340+
SettableApiFuture<StreamController> controller = SettableApiFuture.create();
341+
Queue<T> responses = Queues.newLinkedBlockingDeque();
342+
SettableApiFuture<Void> done = SettableApiFuture.create();
343+
344+
@Override
345+
public void onStart(StreamController controller) {
346+
this.controller.set(controller);
347+
}
348+
349+
@Override
350+
public void onResponse(T response) {
351+
responses.add(response);
352+
}
353+
354+
@Override
355+
public void onError(Throwable t) {
356+
done.setException(t);
357+
}
358+
359+
@Override
360+
public void onComplete() {
361+
done.set(null);
362+
}
363+
}
313364
}

0 commit comments

Comments
 (0)