Skip to content

Commit 0093ac3

Browse files
committed
Make PubSub and PubSubRpc extends AutoCloseable, fix grpc settings
1 parent 215a5df commit 0093ac3

File tree

4 files changed

+74
-21
lines changed

4 files changed

+74
-21
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*
3232
* @see <a href="https://cloud.google.com/pubsub/">Google Cloud Pub/Sub</a>
3333
*/
34-
public interface PubSub extends Service<PubSubOptions> {
34+
public interface PubSub extends AutoCloseable, Service<PubSubOptions> {
3535

3636
/**
3737
* Class for specifying options for listing topics and subscriptions.

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -278,4 +278,9 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
278278
Iterable<String> ackIds) {
279279
return null;
280280
}
281+
282+
@Override
283+
public void close() throws Exception {
284+
rpc.close();
285+
}
281286
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

+64-19
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.api.gax.core.RetrySettings;
2020
import com.google.api.gax.grpc.ApiCallSettings;
2121
import com.google.api.gax.grpc.ApiException;
22+
import com.google.auth.oauth2.GoogleCredentials;
23+
import com.google.cloud.AuthCredentials;
2224
import com.google.cloud.RetryParams;
2325
import com.google.cloud.pubsub.PubSubException;
2426
import com.google.cloud.pubsub.PubSubOptions;
@@ -27,8 +29,10 @@
2729
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
2830
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
2931
import com.google.common.base.Function;
32+
import com.google.common.collect.Sets;
3033
import com.google.common.util.concurrent.Futures;
3134
import com.google.common.util.concurrent.ListenableFuture;
35+
import com.google.common.util.concurrent.MoreExecutors;
3236
import com.google.protobuf.Empty;
3337
import com.google.pubsub.v1.AcknowledgeRequest;
3438
import com.google.pubsub.v1.DeleteSubscriptionRequest;
@@ -50,48 +54,78 @@
5054
import com.google.pubsub.v1.Subscription;
5155
import com.google.pubsub.v1.Topic;
5256

57+
import io.grpc.ManagedChannel;
58+
import io.grpc.Status.Code;
59+
import io.grpc.netty.NegotiationType;
60+
import io.grpc.netty.NettyChannelBuilder;
61+
5362
import org.joda.time.Duration;
5463

5564
import java.io.IOException;
5665
import java.util.Set;
5766
import java.util.concurrent.Future;
58-
59-
import autovalue.shaded.com.google.common.common.collect.Sets;
60-
import io.grpc.Status.Code;
67+
import java.util.concurrent.ScheduledExecutorService;
68+
import java.util.concurrent.ScheduledThreadPoolExecutor;
6169

6270
public class DefaultPubSubRpc implements PubSubRpc {
6371

6472
private final PublisherApi publisherApi;
6573
private final SubscriberApi subscriberApi;
74+
private final ScheduledExecutorService executor =
75+
MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(8));
6676

6777
public DefaultPubSubRpc(PubSubOptions options) throws IOException {
6878
try {
69-
// Provide (and use a common thread-pool).
70-
// This depends on https://github.com/googleapis/gax-java/issues/73
71-
PublisherSettings.Builder pbuilder =
72-
PublisherSettings.defaultBuilder()
73-
.provideChannelWith(options.authCredentials().credentials())
74-
.applyToAllApiMethods(apiCallSettings(options));
75-
publisherApi = PublisherApi.create(pbuilder.build());
76-
SubscriberSettings.Builder sBuilder =
77-
SubscriberSettings.defaultBuilder()
78-
.provideChannelWith(options.authCredentials().credentials())
79-
.applyToAllApiMethods(apiCallSettings(options));
80-
subscriberApi = SubscriberApi.create(sBuilder.build());
79+
PublisherSettings.Builder pubBuilder =
80+
PublisherSettings.defaultBuilder().provideExecutorWith(executor, false);
81+
SubscriberSettings.Builder subBuilder =
82+
SubscriberSettings.defaultBuilder().provideExecutorWith(executor, false);
83+
// todo(mziccard): PublisherSettings should support null/absent credentials for testing
84+
if (options.host().contains("localhost")
85+
|| options.authCredentials().equals(AuthCredentials.noAuth())) {
86+
ManagedChannel channel = NettyChannelBuilder.forTarget(options.host())
87+
.negotiationType(NegotiationType.PLAINTEXT)
88+
.build();
89+
pubBuilder.provideChannelWith(channel, true);
90+
subBuilder.provideChannelWith(channel, true);
91+
} else {
92+
GoogleCredentials credentials = options.authCredentials().credentials();
93+
pubBuilder.provideChannelWith(
94+
credentials.createScoped(PublisherSettings.DEFAULT_SERVICE_SCOPES));
95+
subBuilder.provideChannelWith(
96+
credentials.createScoped(SubscriberSettings.DEFAULT_SERVICE_SCOPES));
97+
}
98+
pubBuilder.applyToAllApiMethods(apiCallSettings(options));
99+
subBuilder.applyToAllApiMethods(apiCallSettings(options));
100+
publisherApi = PublisherApi.create(pubBuilder.build());
101+
subscriberApi = SubscriberApi.create(subBuilder.build());
81102
} catch (Exception ex) {
82103
throw new IOException(ex);
83104
}
84105
}
85106

107+
private static long translateTimeout(long timeout) {
108+
if (timeout < 0) {
109+
return 20000;
110+
} else if (timeout == 0) {
111+
return Long.MAX_VALUE;
112+
}
113+
return timeout;
114+
}
115+
86116
private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
87117
// TODO: specify timeout these settings:
88118
// retryParams.retryMaxAttempts(), retryParams.retryMinAttempts()
89119
RetryParams retryParams = options.retryParams();
120+
long connectTimeout = translateTimeout(options.connectTimeout());
121+
long readTimeout = translateTimeout(options.readTimeout());
122+
long maxTimeout = connectTimeout == Long.MAX_VALUE || readTimeout == Long.MAX_VALUE
123+
? Long.MAX_VALUE : connectTimeout + readTimeout;
90124
final RetrySettings.Builder builder = RetrySettings.newBuilder()
91125
.setTotalTimeout(Duration.millis(retryParams.totalRetryPeriodMillis()))
92-
.setInitialRpcTimeout(Duration.millis(options.connectTimeout()))
126+
.setInitialRpcTimeout(Duration.millis(connectTimeout))
93127
.setRpcTimeoutMultiplier(1.5)
94-
.setMaxRpcTimeout(Duration.millis(options.connectTimeout() + options.readTimeout()))
128+
.setMaxRpcTimeout(Duration.millis(maxTimeout))
95129
.setInitialRetryDelay(Duration.millis(retryParams.initialRetryDelayMillis()))
96130
.setRetryDelayMultiplier(retryParams.retryDelayBackoffFactor())
97131
.setMaxRetryDelay(Duration.millis(retryParams.maxRetryDelayMillis()));
@@ -117,7 +151,7 @@ public V apply(ApiException exception) {
117151

118152
@Override
119153
public Future<Topic> create(Topic topic) {
120-
// TODO: it would be nice if we can get the idempotent inforamtion from the ApiCallSettings
154+
// TODO: it would be nice if we can get the idempotent information from the ApiCallSettings
121155
// or from the exception
122156
return translate(publisherApi.createTopicCallable().futureCall(topic), true);
123157
}
@@ -149,7 +183,6 @@ public Future<ListTopicSubscriptionsResponse> list(ListTopicSubscriptionsRequest
149183

150184
@Override
151185
public Future<Empty> delete(DeleteTopicRequest request) {
152-
// TODO: check if null is not going to work for Empty
153186
return translate(publisherApi.deleteTopicCallable().futureCall(request), true,
154187
Code.NOT_FOUND.value());
155188
}
@@ -195,4 +228,16 @@ public Future<PullResponse> pull(PullRequest request) {
195228
public Future<Empty> modify(ModifyPushConfigRequest request) {
196229
return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false);
197230
}
231+
232+
@Override
233+
public ScheduledExecutorService executor() {
234+
return executor;
235+
}
236+
237+
@Override
238+
public void close() throws Exception {
239+
subscriberApi.close();
240+
publisherApi.close();
241+
executor.shutdown();
242+
}
198243
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
import com.google.pubsub.v1.Topic;
3939

4040
import java.util.concurrent.Future;
41+
import java.util.concurrent.ScheduledExecutorService;
4142

42-
public interface PubSubRpc {
43+
public interface PubSubRpc extends AutoCloseable {
4344

4445
// in all cases root cause of ExecutionException is PubSubException
4546
Future<Topic> create(Topic topic);
@@ -69,4 +70,6 @@ public interface PubSubRpc {
6970
Future<PullResponse> pull(PullRequest request);
7071

7172
Future<Empty> modify(ModifyPushConfigRequest request);
73+
74+
ScheduledExecutorService executor();
7275
}

0 commit comments

Comments
 (0)