Skip to content

Commit b177d9e

Browse files
authored
fix: Cancel the Timeout Task for HttpJson (#2360)
Fixes: googleapis/google-cloud-java#10220 Currently, the executorService will wait for any previously submitted task to finish execution before being able to terminate. This PR attempts to fix the issue by cancelling the outstanding timeout task. If a response has been received prior to the timeout, the timeout task will be cancelled and the client should be able to terminate immediately afterwards. This fixes an issue for Clients that have RPCs with a large timeout value (i.e. 10 min). The client would wait 10 minutes before being able to terminate completely. ## Local Testing Running this with SNAPSHOT Gax version (2.41.1-SNAPSHOT) and latest Compute from Libraries-Bom v26.31.0 Logs: ``` } at com.google.api.client.http.HttpResponseException$Builder.build(HttpResponseException.java:293) at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1118) at com.google.api.gax.httpjson.HttpRequestRunnable.run(HttpRequestRunnable.java:115) ... 6 more Process finished with exit code 1 ``` Running this with latest Gax version (fix not included) and latest Compute from Libraries-Bom v26.31.0: ``` } } at com.google.api.client.http.HttpResponseException$Builder.build(HttpResponseException.java:293) at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1118) at com.google.api.gax.httpjson.HttpRequestRunnable.run(HttpRequestRunnable.java:115) ... 6 more ``` Missing the termination and exit code. It shows up after 10 minutes.
1 parent 6c9127c commit b177d9e

File tree

3 files changed

+328
-9
lines changed

3 files changed

+328
-9
lines changed

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java

+31-9
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.CancellationException;
4747
import java.util.concurrent.Executor;
4848
import java.util.concurrent.ScheduledExecutorService;
49+
import java.util.concurrent.ScheduledFuture;
4950
import java.util.concurrent.TimeUnit;
5051
import javax.annotation.Nullable;
5152

@@ -121,6 +122,13 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
121122
@GuardedBy("lock")
122123
private volatile boolean closed;
123124

125+
// Store the timeout future created by the deadline schedule executor. The future
126+
// can be cancelled if a response (either an error or valid payload) has been
127+
// received before the timeout. This value may be null if the RPC does not have a
128+
// timeout.
129+
@GuardedBy("lock")
130+
private volatile ScheduledFuture<?> timeoutFuture;
131+
124132
HttpJsonClientCallImpl(
125133
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor,
126134
String endpoint,
@@ -167,16 +175,20 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
167175
Preconditions.checkState(this.listener == null, "The call is already started");
168176
this.listener = responseListener;
169177
this.requestHeaders = requestHeaders;
170-
}
171178

172-
// Use the timeout duration value instead of calculating the future Instant
173-
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
174-
Duration timeout = callOptions.getTimeout();
175-
if (timeout != null) {
176-
// The future timeout value is guaranteed to not be a negative value as the
177-
// RetryAlgorithm will not retry
178-
long timeoutMs = timeout.toMillis();
179-
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
179+
// Use the timeout duration value instead of calculating the future Instant
180+
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
181+
Duration timeout = callOptions.getTimeout();
182+
if (timeout != null) {
183+
// The future timeout value is guaranteed to not be a negative value as the
184+
// RetryAlgorithm will not retry
185+
long timeoutMs = timeout.toMillis();
186+
// Assign the scheduled future so that it can be cancelled if the timeout task
187+
// is not needed (response received prior to timeout)
188+
timeoutFuture =
189+
this.deadlineCancellationExecutor.schedule(
190+
this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
191+
}
180192
}
181193
}
182194

@@ -430,6 +442,16 @@ private void close(
430442
return;
431443
}
432444
closed = true;
445+
446+
// Cancel the timeout future if there is a timeout associated with the RPC
447+
if (timeoutFuture != null) {
448+
// The timeout method also invokes close() and the second invocation of close()
449+
// will be guarded by the closed check above. No need to interrupt the timeout
450+
// task as running the timeout task is quick.
451+
timeoutFuture.cancel(false);
452+
timeoutFuture = null;
453+
}
454+
433455
// Best effort task cancellation (to not be confused with task's thread interruption).
434456
// If the task is in blocking I/O waiting for the server response, it will keep waiting for
435457
// the response from the server, but once response is received the task will exit silently.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.httpjson;
31+
32+
import com.google.api.client.http.HttpTransport;
33+
import com.google.common.truth.Truth;
34+
import com.google.protobuf.TypeRegistry;
35+
import java.io.ByteArrayInputStream;
36+
import java.io.InputStream;
37+
import java.io.Reader;
38+
import java.time.Duration;
39+
import java.util.concurrent.Executor;
40+
import java.util.concurrent.ScheduledThreadPoolExecutor;
41+
import java.util.concurrent.TimeUnit;
42+
import org.junit.Test;
43+
import org.junit.runner.RunWith;
44+
import org.mockito.Mock;
45+
import org.mockito.Mockito;
46+
import org.mockito.junit.MockitoJUnitRunner;
47+
48+
@RunWith(MockitoJUnitRunner.class)
49+
public class HttpJsonClientCallImplTest {
50+
@Mock private ApiMethodDescriptor apiMethodDescriptor;
51+
@Mock private HttpResponseParser httpResponseParser;
52+
@Mock private HttpJsonCallOptions httpJsonCallOptions;
53+
@Mock private TypeRegistry typeRegistry;
54+
@Mock private HttpTransport httpTransport;
55+
@Mock private Executor executor;
56+
@Mock private HttpJsonClientCall.Listener listener;
57+
58+
@Test
59+
public void responseReceived_noCancellationTask() {
60+
ScheduledThreadPoolExecutor deadlineSchedulerExecutor = new ScheduledThreadPoolExecutor(1);
61+
// Null timeout means no timeout task created
62+
Mockito.when(httpJsonCallOptions.getTimeout()).thenReturn(null);
63+
64+
HttpJsonClientCallImpl httpJsonClientCall =
65+
new HttpJsonClientCallImpl<>(
66+
apiMethodDescriptor,
67+
"",
68+
httpJsonCallOptions,
69+
httpTransport,
70+
executor,
71+
deadlineSchedulerExecutor);
72+
httpJsonClientCall.start(listener, HttpJsonMetadata.newBuilder().build());
73+
// No timeout task in the work queue
74+
Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0);
75+
// Follows the numMessages requested from HttpJsonClientCalls.futureUnaryCall()
76+
httpJsonClientCall.request(2);
77+
httpJsonClientCall.setResult(
78+
HttpRequestRunnable.RunnableResult.builder()
79+
.setStatusCode(200)
80+
.setTrailers(HttpJsonMetadata.newBuilder().build())
81+
.build());
82+
Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0);
83+
deadlineSchedulerExecutor.shutdown();
84+
// Scheduler is not waiting for any task and should terminate immediately
85+
Truth.assertThat(deadlineSchedulerExecutor.isTerminated()).isTrue();
86+
}
87+
88+
@Test
89+
public void responseReceived_cancellationTaskExists_isCancelledProperly()
90+
throws InterruptedException {
91+
ScheduledThreadPoolExecutor deadlineSchedulerExecutor = new ScheduledThreadPoolExecutor(1);
92+
// SetRemoveOnCancelPolicy will immediately remove the task from the work queue
93+
// when the task is cancelled
94+
deadlineSchedulerExecutor.setRemoveOnCancelPolicy(true);
95+
96+
// Setting a timeout for this call will enqueue a timeout task
97+
Mockito.when(httpJsonCallOptions.getTimeout()).thenReturn(Duration.ofMinutes(10));
98+
99+
String response = "Content";
100+
InputStream inputStream = new ByteArrayInputStream(response.getBytes());
101+
Mockito.when(httpJsonCallOptions.getTypeRegistry()).thenReturn(typeRegistry);
102+
Mockito.when(apiMethodDescriptor.getResponseParser()).thenReturn(httpResponseParser);
103+
Mockito.when(
104+
httpResponseParser.parse(Mockito.any(Reader.class), Mockito.any(TypeRegistry.class)))
105+
.thenReturn(response);
106+
HttpJsonClientCallImpl httpJsonClientCall =
107+
new HttpJsonClientCallImpl<>(
108+
apiMethodDescriptor,
109+
"",
110+
httpJsonCallOptions,
111+
httpTransport,
112+
executor,
113+
deadlineSchedulerExecutor);
114+
httpJsonClientCall.start(listener, HttpJsonMetadata.newBuilder().build());
115+
// The timeout task is scheduled for 10 minutes from invocation. The task should be
116+
// populated in the work queue, scheduled to run, but not active yet.
117+
Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(1);
118+
// Follows the numMessages requested from HttpJsonClientCalls.futureUnaryCall()
119+
httpJsonClientCall.request(2);
120+
httpJsonClientCall.setResult(
121+
HttpRequestRunnable.RunnableResult.builder()
122+
.setStatusCode(200)
123+
.setTrailers(HttpJsonMetadata.newBuilder().build())
124+
.setResponseContent(inputStream)
125+
.build());
126+
// After the result is received, `close()` should have run and removed the timeout task
127+
// Expect that there are no tasks in the queue and no active tasks
128+
Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0);
129+
deadlineSchedulerExecutor.shutdown();
130+
131+
// Ideally, this test wouldn't need to awaitTermination. Given the machine this test
132+
// is running on, we can't guarantee that isTerminated is true immediately. The point
133+
// of this test is that it doesn't wait the full timeout duration (10 min) to terminate
134+
// and rather is able to terminate after we invoke shutdown on the deadline scheduler.
135+
deadlineSchedulerExecutor.awaitTermination(5, TimeUnit.SECONDS);
136+
// Scheduler is not waiting for any task and should terminate quickly
137+
Truth.assertThat(deadlineSchedulerExecutor.isTerminated()).isTrue();
138+
}
139+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.showcase.v1beta1.it;
17+
18+
import com.google.api.gax.retrying.RetrySettings;
19+
import com.google.api.gax.rpc.StatusCode;
20+
import com.google.common.collect.ImmutableSet;
21+
import com.google.common.truth.Truth;
22+
import com.google.showcase.v1beta1.BlockRequest;
23+
import com.google.showcase.v1beta1.BlockResponse;
24+
import com.google.showcase.v1beta1.EchoClient;
25+
import com.google.showcase.v1beta1.EchoRequest;
26+
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
27+
import org.junit.Test;
28+
import org.threeten.bp.Duration;
29+
30+
public class ITClientShutdown {
31+
32+
private static final long DEFAULT_RPC_TIMEOUT_MS = 15000L;
33+
private static final long DEFAULT_CLIENT_TERMINATION_MS = 5000L;
34+
35+
// Test to ensure the client can close + terminate properly
36+
@Test(timeout = 15000L)
37+
public void testGrpc_closeClient() throws Exception {
38+
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();
39+
assertClientTerminated(grpcClient);
40+
}
41+
42+
// Test to ensure the client can close + terminate properly
43+
@Test(timeout = 15000L)
44+
public void testHttpJson_closeClient() throws Exception {
45+
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();
46+
assertClientTerminated(httpjsonClient);
47+
}
48+
49+
// Test to ensure the client can close + terminate after a quick RPC invocation
50+
@Test(timeout = 15000L)
51+
public void testGrpc_rpcInvoked_closeClient() throws Exception {
52+
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();
53+
// Response is ignored for this test
54+
grpcClient.echo(EchoRequest.newBuilder().setContent("Test").build());
55+
assertClientTerminated(grpcClient);
56+
}
57+
58+
// Test to ensure the client can close + terminate after a quick RPC invocation
59+
@Test(timeout = 15000L)
60+
public void testHttpJson_rpcInvoked_closeClient() throws Exception {
61+
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();
62+
// Response is ignored for this test
63+
httpjsonClient.echo(EchoRequest.newBuilder().setContent("Test").build());
64+
assertClientTerminated(httpjsonClient);
65+
}
66+
67+
// This test is to ensure that the client is able to close + terminate any resources
68+
// once a response has been received. Set a max test duration of 15s to ensure that
69+
// the test does not continue on forever.
70+
@Test(timeout = 15000L)
71+
public void testGrpc_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
72+
throws Exception {
73+
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
74+
// invocation should time out in 15s, but the client will receive a response in 2s.
75+
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
76+
// received so the client can properly terminate.
77+
RetrySettings defaultRetrySettings =
78+
RetrySettings.newBuilder()
79+
.setInitialRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
80+
.setMaxRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
81+
.setTotalTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
82+
.setMaxAttempts(1)
83+
.build();
84+
EchoClient grpcClient =
85+
TestClientInitializer.createGrpcEchoClientCustomBlockSettings(
86+
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));
87+
88+
BlockRequest blockRequest =
89+
BlockRequest.newBuilder()
90+
.setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_2sDelay"))
91+
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
92+
.build();
93+
94+
// Response is ignored for this test
95+
grpcClient.block(blockRequest);
96+
97+
assertClientTerminated(grpcClient);
98+
}
99+
100+
// This test is to ensure that the client is able to close + terminate any resources
101+
// once a response has been received. Set a max test duration of 15s to ensure that
102+
// the test does not continue on forever.
103+
@Test(timeout = 15000L)
104+
public void testHttpJson_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
105+
throws Exception {
106+
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
107+
// invocation should time out in 15s, but the client will receive a response in 2s.
108+
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
109+
// received so the client can properly terminate.
110+
RetrySettings defaultRetrySettings =
111+
RetrySettings.newBuilder()
112+
.setInitialRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
113+
.setMaxRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
114+
.setTotalTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS))
115+
.setMaxAttempts(1)
116+
.build();
117+
EchoClient httpjsonClient =
118+
TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings(
119+
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));
120+
121+
BlockRequest blockRequest =
122+
BlockRequest.newBuilder()
123+
.setSuccess(BlockResponse.newBuilder().setContent("httpjsonBlockContent_2sDelay"))
124+
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
125+
.build();
126+
127+
// Response is ignored for this test
128+
httpjsonClient.block(blockRequest);
129+
130+
assertClientTerminated(httpjsonClient);
131+
}
132+
133+
// This helper method asserts that the client is able to terminate within
134+
// `AWAIT_TERMINATION_SECONDS`
135+
private void assertClientTerminated(EchoClient echoClient) throws InterruptedException {
136+
long start = System.currentTimeMillis();
137+
// Intentionally do not run echoClient.awaitTermination(...) as this test will
138+
// check that everything is properly terminated after close() is called.
139+
echoClient.close();
140+
141+
// Loop until the client has terminated successfully. For tests that use this,
142+
// try to ensure there is a timeout associated, otherwise this may run forever.
143+
// Future enhancement: Use awaitility instead of busy waiting
144+
while (!echoClient.isTerminated()) {
145+
Thread.sleep(500L);
146+
}
147+
// The busy-wait time won't be accurate, so account for a bit of buffer
148+
long end = System.currentTimeMillis();
149+
150+
Truth.assertThat(echoClient.isShutdown()).isTrue();
151+
152+
// Check the termination time. If all the tasks/ resources are closed successfully,
153+
// the termination time should only occur shortly after `close()` was invoked. The
154+
// `DEFAULT_TERMINATION_MS` value should include a bit of buffer.
155+
long terminationTime = end - start;
156+
Truth.assertThat(terminationTime).isLessThan(DEFAULT_CLIENT_TERMINATION_MS);
157+
}
158+
}

0 commit comments

Comments
 (0)