Skip to content

Commit dea8426

Browse files
authored
feat: Implement awaitTermination() for MangedHttpJsonChannel (#1677)
* feat: Implement awaitTermination() for MangedHttpJsonChannel * chore: Check if there is time to await first * chore: Check that transport is shutdown as well * chore: Add comments for executor behavior * chore: Add type cast for default executor * chore: Implement shutdownNow() * chore: Add a tests InstantiatingHttpJsonChannelProviderTest for the channel logic * chore: Add comments for the tests * chore: Add a default multiplier for IO tasks * chore: Address code smell * chore: Clean up code * chore: Shutdown executor in test * chore: Address pr comments
1 parent ef4a7ff commit dea8426

File tree

5 files changed

+175
-21
lines changed

5 files changed

+175
-21
lines changed

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java

+2
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ private HttpJsonTransportChannel createChannel() throws IOException, GeneralSecu
186186
httpTransportToUse = createHttpTransport();
187187
}
188188

189+
// Pass the executor to the ManagedChannel. If no executor was provided (or null),
190+
// the channel will use a default executor for the calls.
189191
ManagedHttpJsonChannel channel =
190192
ManagedHttpJsonChannel.newBuilder()
191193
.setEndpoint(endpoint)

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java

+93-17
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.api.core.BetaApi;
3535
import com.google.api.gax.core.BackgroundResource;
3636
import com.google.api.gax.core.InstantiatingExecutorProvider;
37+
import com.google.common.annotations.VisibleForTesting;
3738
import com.google.common.base.Preconditions;
3839
import java.io.IOException;
3940
import java.util.concurrent.Executor;
@@ -47,23 +48,24 @@
4748
@BetaApi
4849
public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource {
4950

50-
private static final ExecutorService DEFAULT_EXECUTOR =
51-
InstantiatingExecutorProvider.newBuilder().build().getExecutor();
52-
5351
private final Executor executor;
52+
private final boolean usingDefaultExecutor;
5453
private final String endpoint;
5554
private final HttpTransport httpTransport;
5655
private final ScheduledExecutorService deadlineScheduledExecutorService;
57-
5856
private boolean isTransportShutdown;
5957

6058
protected ManagedHttpJsonChannel() {
61-
this(null, null, null);
59+
this(null, true, null, null);
6260
}
6361

6462
private ManagedHttpJsonChannel(
65-
Executor executor, String endpoint, @Nullable HttpTransport httpTransport) {
63+
Executor executor,
64+
boolean usingDefaultExecutor,
65+
String endpoint,
66+
@Nullable HttpTransport httpTransport) {
6667
this.executor = executor;
68+
this.usingDefaultExecutor = usingDefaultExecutor;
6769
this.endpoint = endpoint;
6870
this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport;
6971
this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -82,58 +84,120 @@ public <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newCall(
8284
deadlineScheduledExecutorService);
8385
}
8486

87+
@VisibleForTesting
88+
Executor getExecutor() {
89+
return executor;
90+
}
91+
8592
@Override
8693
public synchronized void shutdown() {
94+
// Calling shutdown/ shutdownNow() twice should no-op
8795
if (isTransportShutdown) {
8896
return;
8997
}
9098
try {
99+
// Only shutdown the executor if it was created by Gax. External executors
100+
// should be managed by the user.
101+
if (shouldManageExecutor()) {
102+
((ExecutorService) executor).shutdown();
103+
}
91104
deadlineScheduledExecutorService.shutdown();
92105
httpTransport.shutdown();
93106
isTransportShutdown = true;
94107
} catch (IOException e) {
95-
e.printStackTrace();
108+
// TODO: Log this scenario once we implemented the Cloud SDK logging.
109+
// Swallow error if httpTransport shutdown fails
96110
}
97111
}
98112

99113
@Override
100114
public boolean isShutdown() {
101-
return isTransportShutdown;
115+
// TODO(lawrenceqiu): Expose an isShutdown() method for HttpTransport
116+
boolean isShutdown = isTransportShutdown && deadlineScheduledExecutorService.isShutdown();
117+
// Check that the Gax's ExecutorService is shutdown as well
118+
if (shouldManageExecutor()) {
119+
isShutdown = isShutdown && ((ExecutorService) executor).isShutdown();
120+
}
121+
return isShutdown;
102122
}
103123

104124
@Override
105125
public boolean isTerminated() {
106-
return isTransportShutdown;
126+
boolean isTerminated = deadlineScheduledExecutorService.isTerminated();
127+
// Check that the Gax's ExecutorService is terminated as well
128+
if (shouldManageExecutor()) {
129+
isTerminated = isTerminated && ((ExecutorService) executor).isTerminated();
130+
}
131+
return isTerminated;
107132
}
108133

109134
@Override
110135
public void shutdownNow() {
111-
shutdown();
136+
// Calling shutdown/ shutdownNow() twice should no-op
137+
if (isTransportShutdown) {
138+
return;
139+
}
140+
try {
141+
// Only shutdown the executor if it was created by Gax. External executors
142+
// should be managed by the user.
143+
if (shouldManageExecutor()) {
144+
((ExecutorService) executor).shutdownNow();
145+
}
146+
deadlineScheduledExecutorService.shutdownNow();
147+
httpTransport.shutdown();
148+
isTransportShutdown = true;
149+
} catch (IOException e) {
150+
// TODO: Log this scenario once we implemented the Cloud SDK logging.
151+
// Swallow error if httpTransport shutdown fails
152+
}
112153
}
113154

114155
@Override
115156
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
116-
// TODO
117-
return false;
157+
long endTimeNanos = System.nanoTime() + unit.toNanos(duration);
158+
long awaitTimeNanos = endTimeNanos - System.nanoTime();
159+
if (awaitTimeNanos <= 0) {
160+
return false;
161+
}
162+
// Only awaitTermination for the executor if it was created by Gax. External executors
163+
// should be managed by the user.
164+
if (usingDefaultExecutor && executor instanceof ExecutorService) {
165+
boolean terminated = ((ExecutorService) executor).awaitTermination(awaitTimeNanos, unit);
166+
// Termination duration has elapsed
167+
if (!terminated) {
168+
return false;
169+
}
170+
}
171+
awaitTimeNanos = endTimeNanos - System.nanoTime();
172+
return deadlineScheduledExecutorService.awaitTermination(awaitTimeNanos, unit);
173+
}
174+
175+
private boolean shouldManageExecutor() {
176+
return usingDefaultExecutor && executor instanceof ExecutorService;
118177
}
119178

120179
@Override
121-
public void close() {}
180+
public void close() {
181+
shutdown();
182+
}
122183

123184
public static Builder newBuilder() {
124-
return new Builder().setExecutor(DEFAULT_EXECUTOR);
185+
return new Builder();
125186
}
126187

127188
public static class Builder {
128189

129190
private Executor executor;
130191
private String endpoint;
131192
private HttpTransport httpTransport;
193+
private boolean usingDefaultExecutor;
132194

133-
private Builder() {}
195+
private Builder() {
196+
this.usingDefaultExecutor = false;
197+
}
134198

135199
public Builder setExecutor(Executor executor) {
136-
this.executor = executor == null ? DEFAULT_EXECUTOR : executor;
200+
this.executor = executor;
137201
return this;
138202
}
139203

@@ -150,8 +214,20 @@ public Builder setHttpTransport(HttpTransport httpTransport) {
150214
public ManagedHttpJsonChannel build() {
151215
Preconditions.checkNotNull(endpoint);
152216

217+
// If the executor provided for this channel is null, gax will provide a
218+
// default executor to used for the calls. Only the default executor's
219+
// lifecycle will be managed by the channel. Any external executor needs to
220+
// managed by the user.
221+
if (executor == null) {
222+
executor = InstantiatingExecutorProvider.newIOBuilder().build().getExecutor();
223+
usingDefaultExecutor = true;
224+
}
225+
153226
return new ManagedHttpJsonChannel(
154-
executor, endpoint, httpTransport == null ? new NetHttpTransport() : httpTransport);
227+
executor,
228+
usingDefaultExecutor,
229+
endpoint,
230+
httpTransport == null ? new NetHttpTransport() : httpTransport);
155231
}
156232
}
157233
}

gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonInterceptorChannel.java

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
package com.google.api.gax.httpjson;
3131

3232
import com.google.api.core.BetaApi;
33+
import com.google.common.annotations.VisibleForTesting;
3334
import java.util.concurrent.TimeUnit;
3435

3536
@BetaApi
@@ -45,6 +46,11 @@ class ManagedHttpJsonInterceptorChannel extends ManagedHttpJsonChannel {
4546
this.interceptor = interceptor;
4647
}
4748

49+
@VisibleForTesting
50+
ManagedHttpJsonChannel getChannel() {
51+
return channel;
52+
}
53+
4854
@Override
4955
public <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newCall(
5056
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor, HttpJsonCallOptions callOptions) {

gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProviderTest.java

+57-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.io.IOException;
4040
import java.security.GeneralSecurityException;
4141
import java.util.Collections;
42+
import java.util.Map;
4243
import java.util.concurrent.Executor;
4344
import java.util.concurrent.ScheduledExecutorService;
4445
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -50,20 +51,22 @@
5051
@RunWith(JUnit4.class)
5152
public class InstantiatingHttpJsonChannelProviderTest extends AbstractMtlsTransportChannelTest {
5253

54+
private static final String DEFAULT_ENDPOINT = "localhost:8080";
55+
private static final Map<String, String> DEFAULT_HEADER_MAP = Collections.emptyMap();
56+
5357
@Test
5458
public void basicTest() throws IOException {
55-
String endpoint = "localhost:8080";
5659
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
5760
executor.shutdown();
5861

5962
TransportChannelProvider provider = InstantiatingHttpJsonChannelProvider.newBuilder().build();
6063

6164
assertThat(provider.needsEndpoint()).isTrue();
62-
provider = provider.withEndpoint(endpoint);
65+
provider = provider.withEndpoint(DEFAULT_ENDPOINT);
6366
assertThat(provider.needsEndpoint()).isFalse();
6467

6568
assertThat(provider.needsHeaders()).isTrue();
66-
provider = provider.withHeaders(Collections.<String, String>emptyMap());
69+
provider = provider.withHeaders(DEFAULT_HEADER_MAP);
6770
assertThat(provider.needsHeaders()).isFalse();
6871

6972
// Make sure getTransportChannel works without setting executor
@@ -103,6 +106,57 @@ public void basicTest() throws IOException {
103106
provider.getTransportChannel().shutdownNow();
104107
}
105108

109+
// Ensure that a default executor is created by the ManagedHttpJsonChannel even
110+
// if not provided by the TransportChannelProvider
111+
@Test
112+
public void managedChannelUsesDefaultChannelExecutor() throws IOException {
113+
InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider =
114+
InstantiatingHttpJsonChannelProvider.newBuilder().setEndpoint(DEFAULT_ENDPOINT).build();
115+
instantiatingHttpJsonChannelProvider =
116+
(InstantiatingHttpJsonChannelProvider)
117+
instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP);
118+
HttpJsonTransportChannel httpJsonTransportChannel =
119+
instantiatingHttpJsonChannelProvider.getTransportChannel();
120+
121+
// By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel
122+
ManagedHttpJsonInterceptorChannel interceptorChannel =
123+
(ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel();
124+
ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel();
125+
assertThat(managedHttpJsonChannel.getExecutor()).isNotNull();
126+
127+
// Clean up the resources (executor, deadlineScheduler, httpTransport)
128+
instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow();
129+
}
130+
131+
// Ensure that the user's executor is used by the ManagedHttpJsonChannel
132+
@Test
133+
public void managedChannelUsesCustomExecutor() throws IOException {
134+
// Custom executor to use -- Lifecycle must be managed by this test
135+
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
136+
executor.shutdown();
137+
138+
InstantiatingHttpJsonChannelProvider instantiatingHttpJsonChannelProvider =
139+
InstantiatingHttpJsonChannelProvider.newBuilder()
140+
.setEndpoint(DEFAULT_ENDPOINT)
141+
.setExecutor(executor)
142+
.build();
143+
instantiatingHttpJsonChannelProvider =
144+
(InstantiatingHttpJsonChannelProvider)
145+
instantiatingHttpJsonChannelProvider.withHeaders(DEFAULT_HEADER_MAP);
146+
HttpJsonTransportChannel httpJsonTransportChannel =
147+
instantiatingHttpJsonChannelProvider.getTransportChannel();
148+
149+
// By default, the channel will be wrapped with ManagedHttpJsonInterceptorChannel
150+
ManagedHttpJsonInterceptorChannel interceptorChannel =
151+
(ManagedHttpJsonInterceptorChannel) httpJsonTransportChannel.getManagedChannel();
152+
ManagedHttpJsonChannel managedHttpJsonChannel = interceptorChannel.getChannel();
153+
assertThat(managedHttpJsonChannel.getExecutor()).isNotNull();
154+
assertThat(managedHttpJsonChannel.getExecutor()).isEqualTo(executor);
155+
156+
// Clean up the resources (executor, deadlineScheduler, httpTransport)
157+
instantiatingHttpJsonChannelProvider.getTransportChannel().shutdownNow();
158+
}
159+
106160
@Override
107161
protected Object getMtlsObjectFromTransportChannel(MtlsProvider provider)
108162
throws IOException, GeneralSecurityException {

gax-java/gax/src/main/java/com/google/api/gax/core/InstantiatingExecutorProvider.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public Thread newThread(Runnable runnable) {
5454
return thread;
5555
}
5656
};
57+
private static final int MIN_THREAD_AMOUNT = 4;
58+
// Attempt to choose a reasonable default core pool multiplier for IO Bound operations
59+
private static final int IO_THREAD_MULTIPLIER = 50;
5760

5861
// Package-private constructor prevents others from subclassing.
5962
InstantiatingExecutorProvider() {}
@@ -76,9 +79,22 @@ public boolean shouldAutoClose() {
7679

7780
public abstract Builder toBuilder();
7881

82+
// Used for CPU Bound tasks as the thread count is at max the number of processors
83+
// Thread count minimum is at least `MIN_CPU_AMOUNT`
7984
public static Builder newBuilder() {
8085
int numCpus = Runtime.getRuntime().availableProcessors();
81-
int numThreads = Math.max(4, numCpus);
86+
int numThreads = Math.max(MIN_THREAD_AMOUNT, numCpus);
87+
88+
return new AutoValue_InstantiatingExecutorProvider.Builder()
89+
.setExecutorThreadCount(numThreads)
90+
.setThreadFactory(DEFAULT_THREAD_FACTORY);
91+
}
92+
93+
// Used for IO Bound tasks as the thread count scales with the number of processors
94+
// Thread count minimum is at least `MIN_CPU_AMOUNT` * `IO_THREAD_MULTIPLIER`
95+
public static Builder newIOBuilder() {
96+
int numCpus = Runtime.getRuntime().availableProcessors();
97+
int numThreads = IO_THREAD_MULTIPLIER * Math.max(MIN_THREAD_AMOUNT, numCpus);
8298

8399
return new AutoValue_InstantiatingExecutorProvider.Builder()
84100
.setExecutorThreadCount(numThreads)

0 commit comments

Comments
 (0)