Skip to content

Commit 7a7574d

Browse files
authored
Add MessageConsumerImpl class, implement pullAsync, add tests (#1043)
1 parent d487307 commit 7a7574d

File tree

12 files changed

+1076
-29
lines changed

12 files changed

+1076
-29
lines changed

gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.io.ObjectInputStream;
3030
import java.util.Objects;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.ScheduledThreadPoolExecutor;
3334
import java.util.concurrent.TimeUnit;
@@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions<ServiceT extends Service<OptionsT>, Ser
5051
private final double timeoutMultiplier;
5152
private final int maxTimeout;
5253

53-
private transient ExecutorFactory executorFactory;
54+
private transient ExecutorFactory<ScheduledExecutorService> executorFactory;
5455

5556
/**
5657
* Shared thread pool executor.
@@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) {
7374
};
7475

7576
/**
76-
* An interface for {@link ScheduledExecutorService} factories. Implementations of this interface
77-
* can be used to provide an user-defined scheduled executor to execute requests. Any
78-
* implementation of this interface must override the {@code get()} method to return the desired
79-
* executor. The {@code release(executor)} method should be overriden to free resources used by
80-
* the executor (if needed) according to application's logic.
77+
* An interface for {@link ExecutorService} factories. Implementations of this interface can be
78+
* used to provide an user-defined executor to execute requests. Any implementation of this
79+
* interface must override the {@code get()} method to return the desired executor. The
80+
* {@code release(executor)} method should be overriden to free resources used by the executor (if
81+
* needed) according to application's logic.
8182
*
8283
* <p>Implementation must provide a public no-arg constructor. Loading of a factory implementation
8384
* is done via {@link java.util.ServiceLoader}.
85+
*
86+
* @param <T> the {@link ExecutorService} subclass created by this factory
8487
*/
85-
public interface ExecutorFactory {
88+
public interface ExecutorFactory<T extends ExecutorService> {
8689

8790
/**
88-
* Gets a scheduled executor service instance.
91+
* Gets an executor service instance.
8992
*/
90-
ScheduledExecutorService get();
93+
T get();
9194

9295
/**
9396
* Releases resources used by the executor and possibly shuts it down.
9497
*/
95-
void release(ScheduledExecutorService executor);
98+
void release(T executor);
9699
}
97100

98101
@VisibleForTesting
99-
static class DefaultExecutorFactory implements ExecutorFactory {
102+
static class DefaultExecutorFactory implements ExecutorFactory<ScheduledExecutorService> {
100103

101104
private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory();
102105

@@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
148151
*
149152
* @return the builder
150153
*/
151-
public B executorFactory(ExecutorFactory executorFactory) {
154+
public B executorFactory(ExecutorFactory<ScheduledExecutorService> executorFactory) {
152155
this.executorFactory = executorFactory;
153156
return self();
154157
}
@@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) {
192195
}
193196
}
194197

198+
@SuppressWarnings("unchecked")
195199
protected GrpcServiceOptions(
196200
Class<? extends ServiceFactory<ServiceT, OptionsT>> serviceFactoryClass,
197201
Class<? extends ServiceRpcFactory<ServiceRpcT, OptionsT>> rpcFactoryClass, Builder<ServiceT,
@@ -208,7 +212,7 @@ protected GrpcServiceOptions(
208212
/**
209213
* Returns a scheduled executor service provider.
210214
*/
211-
protected ExecutorFactory executorFactory() {
215+
protected ExecutorFactory<ScheduledExecutorService> executorFactory() {
212216
return executorFactory;
213217
}
214218

gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void testBaseHashCode() {
211211

212212
@Test
213213
public void testDefaultExecutorFactory() {
214-
ExecutorFactory executorFactory = new DefaultExecutorFactory();
214+
ExecutorFactory<ScheduledExecutorService> executorFactory = new DefaultExecutorFactory();
215215
ScheduledExecutorService executorService = executorFactory.get();
216216
assertSame(executorService, executorFactory.get());
217217
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable {
4848

4949
private final PubSub pubsub;
5050
private final ScheduledExecutorService executor;
51-
private final ExecutorFactory executorFactory;
51+
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
5252
private final Clock clock;
5353
private final Queue<Message> messageQueue;
5454
private final Map<MessageId, Long> messageDeadlines;

0 commit comments

Comments
 (0)