Skip to content

Commit 7f04a7f

Browse files
committed
fix: Cancel the Timeout Task for HttpJson
1 parent 4d043de commit 7f04a7f

File tree

2 files changed

+197
-9
lines changed

2 files changed

+197
-9
lines changed

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.CancellationException;
4747
import java.util.concurrent.Executor;
4848
import java.util.concurrent.ScheduledExecutorService;
49+
import java.util.concurrent.ScheduledFuture;
4950
import java.util.concurrent.TimeUnit;
5051
import javax.annotation.Nullable;
5152

@@ -121,6 +122,11 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
121122
@GuardedBy("lock")
122123
private volatile boolean closed;
123124

125+
// Store the timeout future created by the deadline schedule executor. The future
126+
// can be cancelled if a response has been received before the timeout.
127+
@GuardedBy("lock")
128+
private ScheduledFuture<?> timeoutFuture;
129+
124130
HttpJsonClientCallImpl(
125131
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor,
126132
String endpoint,
@@ -167,16 +173,20 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
167173
Preconditions.checkState(this.listener == null, "The call is already started");
168174
this.listener = responseListener;
169175
this.requestHeaders = requestHeaders;
170-
}
171176

172-
// Use the timeout duration value instead of calculating the future Instant
173-
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
174-
Duration timeout = callOptions.getTimeout();
175-
if (timeout != null) {
176-
// The future timeout value is guaranteed to not be a negative value as the
177-
// RetryAlgorithm will not retry
178-
long timeoutMs = timeout.toMillis();
179-
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
177+
// Use the timeout duration value instead of calculating the future Instant
178+
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
179+
Duration timeout = callOptions.getTimeout();
180+
if (timeout != null) {
181+
// The future timeout value is guaranteed to not be a negative value as the
182+
// RetryAlgorithm will not retry
183+
long timeoutMs = timeout.toMillis();
184+
// Assign the scheduled future so that it can be cancelled if the timeout task
185+
// is not needed (response received prior to timeout)
186+
timeoutFuture =
187+
this.deadlineCancellationExecutor.schedule(
188+
this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
189+
}
180190
}
181191
}
182192

@@ -430,6 +440,14 @@ private void close(
430440
return;
431441
}
432442
closed = true;
443+
444+
// Cancel the timeout future if there is a timeout task created
445+
if (timeoutFuture != null) {
446+
// timeout() invokes close(), but cancelling a completed task should no-op
447+
timeoutFuture.cancel(true);
448+
timeoutFuture = null;
449+
}
450+
433451
// Best effort task cancellation (to not be confused with task's thread interruption).
434452
// If the task is in blocking I/O waiting for the server response, it will keep waiting for
435453
// the response from the server, but once response is received the task will exit silently.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.showcase.v1beta1.it;
17+
18+
import com.google.api.gax.retrying.RetrySettings;
19+
import com.google.api.gax.rpc.StatusCode;
20+
import com.google.common.collect.ImmutableSet;
21+
import com.google.common.truth.Truth;
22+
import com.google.showcase.v1beta1.BlockRequest;
23+
import com.google.showcase.v1beta1.BlockResponse;
24+
import com.google.showcase.v1beta1.EchoClient;
25+
import com.google.showcase.v1beta1.EchoRequest;
26+
import com.google.showcase.v1beta1.it.util.TestClientInitializer;
27+
import java.util.concurrent.TimeUnit;
28+
import org.junit.Test;
29+
import org.threeten.bp.Duration;
30+
31+
public class ITClientShutdown {
32+
33+
@Test
34+
public void testGrpc_closeClient() throws Exception {
35+
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();
36+
grpcClient.close();
37+
grpcClient.awaitTermination(TestClientInitializer.AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
38+
Truth.assertThat(grpcClient.isShutdown()).isTrue();
39+
Truth.assertThat(grpcClient.isTerminated()).isTrue();
40+
}
41+
42+
@Test
43+
public void testHttpJson_closeClient() throws Exception {
44+
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();
45+
httpjsonClient.close();
46+
httpjsonClient.awaitTermination(
47+
TestClientInitializer.AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
48+
Truth.assertThat(httpjsonClient.isShutdown()).isTrue();
49+
Truth.assertThat(httpjsonClient.isTerminated()).isTrue();
50+
}
51+
52+
@Test
53+
public void testGrpc_rpcInvoked_closeClient() throws Exception {
54+
EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient();
55+
56+
grpcClient.echo(EchoRequest.newBuilder().setContent("Test").build());
57+
58+
grpcClient.close();
59+
grpcClient.awaitTermination(TestClientInitializer.AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
60+
Truth.assertThat(grpcClient.isShutdown()).isTrue();
61+
Truth.assertThat(grpcClient.isTerminated()).isTrue();
62+
}
63+
64+
@Test
65+
public void testHttpJson_rpcInvoked_closeClient() throws Exception {
66+
EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient();
67+
68+
httpjsonClient.echo(EchoRequest.newBuilder().setContent("Test").build());
69+
70+
httpjsonClient.close();
71+
httpjsonClient.awaitTermination(
72+
TestClientInitializer.AWAIT_TERMINATION_SECONDS, TimeUnit.SECONDS);
73+
Truth.assertThat(httpjsonClient.isShutdown()).isTrue();
74+
Truth.assertThat(httpjsonClient.isTerminated()).isTrue();
75+
}
76+
77+
// This test is to ensure that the client is able to close + terminate any resources
78+
// once a response has been received. Set a max test duration of 15s to ensure that
79+
// the test does not continue on forever.
80+
@Test(timeout = 15000L)
81+
public void testGrpc_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
82+
throws Exception {
83+
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
84+
// invocation should time out in 15s, but the client will receive a response in 2s.
85+
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
86+
// received so the client can properly terminate.
87+
RetrySettings defaultRetrySettings =
88+
RetrySettings.newBuilder()
89+
.setInitialRpcTimeout(Duration.ofMillis(15000L))
90+
.setMaxRpcTimeout(Duration.ofMillis(15000L))
91+
.setTotalTimeout(Duration.ofMillis(15000L))
92+
.setMaxAttempts(1)
93+
.build();
94+
EchoClient grpcClient =
95+
TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings(
96+
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));
97+
98+
BlockRequest blockRequest =
99+
BlockRequest.newBuilder()
100+
.setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_2sDelay"))
101+
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
102+
.build();
103+
104+
long start = System.currentTimeMillis();
105+
BlockResponse response = grpcClient.block(blockRequest);
106+
Truth.assertThat(response.getContent()).isEqualTo("gRPCBlockContent_2sDelay");
107+
grpcClient.close();
108+
// Loop until the client has terminated successfully
109+
// Future enhancement: Use awaitility instead of busy waiting
110+
while (!grpcClient.isTerminated()) ;
111+
long end = System.currentTimeMillis();
112+
Truth.assertThat(grpcClient.isShutdown()).isTrue();
113+
Truth.assertThat(grpcClient.isTerminated()).isTrue();
114+
115+
// Check the termination time. If all the tasks/ resources are closed successfully,
116+
// the termination time should only take about 2s (time to receive a response) + time
117+
// to close the client. Check that this takes less than 5s (2s request time + 3s
118+
// buffer time).
119+
long terminationTime = end - start;
120+
Truth.assertThat(terminationTime).isLessThan(5000L);
121+
}
122+
123+
// This test is to ensure that the client is able to close + terminate any resources
124+
// once a response has been received. Set a max test duration of 15s to ensure that
125+
// the test does not continue on forever.
126+
@Test(timeout = 15000L)
127+
public void testHttpJson_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived()
128+
throws Exception {
129+
// Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC
130+
// invocation should time out in 15s, but the client will receive a response in 2s.
131+
// Any outstanding tasks (timeout tasks) should be cancelled once a response has been
132+
// received so the client can properly terminate.
133+
RetrySettings defaultRetrySettings =
134+
RetrySettings.newBuilder()
135+
.setInitialRpcTimeout(Duration.ofMillis(15000L))
136+
.setMaxRpcTimeout(Duration.ofMillis(15000L))
137+
.setTotalTimeout(Duration.ofMillis(15000L))
138+
.setMaxAttempts(1)
139+
.build();
140+
EchoClient httpjsonClient =
141+
TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings(
142+
defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED));
143+
144+
BlockRequest blockRequest =
145+
BlockRequest.newBuilder()
146+
.setSuccess(BlockResponse.newBuilder().setContent("httpjsonBlockContent_2sDelay"))
147+
.setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build())
148+
.build();
149+
150+
long start = System.currentTimeMillis();
151+
BlockResponse response = httpjsonClient.block(blockRequest);
152+
Truth.assertThat(response.getContent()).isEqualTo("httpjsonBlockContent_2sDelay");
153+
httpjsonClient.close();
154+
// Loop until the client has terminated successfully
155+
// Future enhancement: Use awaitility instead of busy waiting
156+
while (!httpjsonClient.isTerminated()) {
157+
Thread.sleep(1000L);
158+
}
159+
long end = System.currentTimeMillis();
160+
Truth.assertThat(httpjsonClient.isShutdown()).isTrue();
161+
Truth.assertThat(httpjsonClient.isTerminated()).isTrue();
162+
163+
// Check the termination time. If all the tasks/ resources are closed successfully,
164+
// the termination time should only take about 2s (time to receive a response) + time
165+
// to close the client. Check that this takes less than 5s (2s request time + 3s
166+
// buffer time).
167+
long terminationTime = end - start;
168+
Truth.assertThat(terminationTime).isLessThan(5000L);
169+
}
170+
}

0 commit comments

Comments
 (0)