Skip to content

Commit 1652076

Browse files
committed
Add MessageConsumerImpl class, implement pullAsync, add tests
1 parent 2fd2d56 commit 1652076

File tree

11 files changed

+974
-28
lines changed

11 files changed

+974
-28
lines changed

gcloud-java-core/src/main/java/com/google/cloud/GrpcServiceOptions.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.io.ObjectInputStream;
3030
import java.util.Objects;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.ScheduledThreadPoolExecutor;
3334
import java.util.concurrent.TimeUnit;
@@ -50,7 +51,7 @@ public abstract class GrpcServiceOptions<ServiceT extends Service<OptionsT>, Ser
5051
private final double timeoutMultiplier;
5152
private final int maxTimeout;
5253

53-
private transient ExecutorFactory executorFactory;
54+
private transient ExecutorFactory<ScheduledExecutorService> executorFactory;
5455

5556
/**
5657
* Shared thread pool executor.
@@ -73,30 +74,32 @@ public void close(ScheduledExecutorService instance) {
7374
};
7475

7576
/**
76-
* An interface for {@link ScheduledExecutorService} factories. Implementations of this interface
77-
* can be used to provide an user-defined scheduled executor to execute requests. Any
78-
* implementation of this interface must override the {@code get()} method to return the desired
79-
* executor. The {@code release(executor)} method should be overriden to free resources used by
80-
* the executor (if needed) according to application's logic.
77+
* An interface for {@link ExecutorService} factories. Implementations of this interface can be
78+
* used to provide an user-defined executor to execute requests. Any implementation of this
79+
* interface must override the {@code get()} method to return the desired executor. The
80+
* {@code release(executor)} method should be overriden to free resources used by the executor (if
81+
* needed) according to application's logic.
8182
*
8283
* <p>Implementation must provide a public no-arg constructor. Loading of a factory implementation
8384
* is done via {@link java.util.ServiceLoader}.
85+
*
86+
* @param <T> the {@link ExecutorService} subclass created by this factory
8487
*/
85-
public interface ExecutorFactory {
88+
public interface ExecutorFactory<T extends ExecutorService> {
8689

8790
/**
88-
* Gets a scheduled executor service instance.
91+
* Gets an executor service instance.
8992
*/
90-
ScheduledExecutorService get();
93+
T get();
9194

9295
/**
9396
* Releases resources used by the executor and possibly shuts it down.
9497
*/
95-
void release(ScheduledExecutorService executor);
98+
void release(T executor);
9699
}
97100

98101
@VisibleForTesting
99-
static class DefaultExecutorFactory implements ExecutorFactory {
102+
static class DefaultExecutorFactory implements ExecutorFactory<ScheduledExecutorService> {
100103

101104
private static final DefaultExecutorFactory INSTANCE = new DefaultExecutorFactory();
102105

@@ -148,7 +151,7 @@ protected Builder(GrpcServiceOptions<ServiceT, ServiceRpcT, OptionsT> options) {
148151
*
149152
* @return the builder
150153
*/
151-
public B executorFactory(ExecutorFactory executorFactory) {
154+
public B executorFactory(ExecutorFactory<ScheduledExecutorService> executorFactory) {
152155
this.executorFactory = executorFactory;
153156
return self();
154157
}
@@ -192,6 +195,7 @@ public B maxTimeout(int maxTimeout) {
192195
}
193196
}
194197

198+
@SuppressWarnings("unchecked")
195199
protected GrpcServiceOptions(
196200
Class<? extends ServiceFactory<ServiceT, OptionsT>> serviceFactoryClass,
197201
Class<? extends ServiceRpcFactory<ServiceRpcT, OptionsT>> rpcFactoryClass, Builder<ServiceT,
@@ -208,7 +212,7 @@ protected GrpcServiceOptions(
208212
/**
209213
* Returns a scheduled executor service provider.
210214
*/
211-
protected ExecutorFactory executorFactory() {
215+
protected ExecutorFactory<ScheduledExecutorService> executorFactory() {
212216
return executorFactory;
213217
}
214218

gcloud-java-core/src/test/java/com/google/cloud/GrpcServiceOptionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public void testBaseHashCode() {
211211

212212
@Test
213213
public void testDefaultExecutorFactory() {
214-
ExecutorFactory executorFactory = new DefaultExecutorFactory();
214+
ExecutorFactory<ScheduledExecutorService> executorFactory = new DefaultExecutorFactory();
215215
ScheduledExecutorService executorService = executorFactory.get();
216216
assertSame(executorService, executorFactory.get());
217217
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/AckDeadlineRenewer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class AckDeadlineRenewer implements AutoCloseable {
4848

4949
private final PubSub pubsub;
5050
private final ScheduledExecutorService executor;
51-
private final ExecutorFactory executorFactory;
51+
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
5252
private final Clock clock;
5353
private final Queue<Message> messageQueue;
5454
private final Map<MessageId, Long> messageDeadlines;
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import static com.google.cloud.pubsub.spi.v1.SubscriberApi.formatSubscriptionName;
20+
import static com.google.common.base.MoreObjects.firstNonNull;
21+
22+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
23+
import com.google.cloud.pubsub.PubSub.MessageConsumer;
24+
import com.google.cloud.pubsub.PubSub.MessageProcessor;
25+
import com.google.cloud.pubsub.spi.PubSubRpc;
26+
import com.google.pubsub.v1.PullRequest;
27+
import com.google.pubsub.v1.PullResponse;
28+
29+
import io.grpc.internal.SharedResourceHolder;
30+
31+
import java.util.List;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.Future;
35+
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.ScheduledThreadPoolExecutor;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
40+
/**
41+
* Default implementation for a message consumer.
42+
*/
43+
final class MessageConsumerImpl implements MessageConsumer {
44+
45+
private static final int MAX_QUEUED_CALLBACKS = 100;
46+
// shared scheduled executor, used to schedule pulls
47+
private static final SharedResourceHolder.Resource<ScheduledExecutorService> TIMER =
48+
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
49+
@Override
50+
public ScheduledExecutorService create() {
51+
ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
52+
timer.setRemoveOnCancelPolicy(true);
53+
return timer;
54+
}
55+
56+
@Override
57+
public void close(ScheduledExecutorService instance) {
58+
instance.shutdown();
59+
}
60+
};
61+
62+
private final PubSubOptions pubsubOptions;
63+
private final PubSubRpc pubsubRpc;
64+
private final PubSub pubsub;
65+
private final AckDeadlineRenewer deadlineRenewer;
66+
private final String subscription;
67+
private final MessageProcessor messageProcessor;
68+
private final ScheduledExecutorService timer;
69+
private final ExecutorFactory<ExecutorService> executorFactory;
70+
private final ExecutorService executor;
71+
private final AtomicInteger queuedCallbacks;
72+
private final int maxQueuedCallbacks;
73+
private final Object futureLock = new Object();
74+
private final Runnable puller;
75+
private Future<?> pullerFuture;
76+
private boolean closed;
77+
78+
/**
79+
* Default executor factory for the message processor executor. By default a single-threaded
80+
* executor is used.
81+
*/
82+
static class DefaultExecutorFactory implements ExecutorFactory<ExecutorService> {
83+
84+
private final ExecutorService executor = Executors.newSingleThreadExecutor();
85+
86+
@Override
87+
public ExecutorService get() {
88+
return executor;
89+
}
90+
91+
@Override
92+
public void release(ExecutorService executor) {
93+
executor.shutdownNow();
94+
}
95+
}
96+
97+
private MessageConsumerImpl(Builder builder) {
98+
this.pubsubOptions = builder.pubsubOptions;
99+
this.subscription = builder.subscription;
100+
this.messageProcessor = builder.messageProcessor;
101+
this.pubsubRpc = pubsubOptions.rpc();
102+
this.pubsub = pubsubOptions.service();
103+
this.deadlineRenewer = builder.deadlineRenewer;
104+
this.queuedCallbacks = new AtomicInteger();
105+
this.timer = SharedResourceHolder.get(TIMER);
106+
this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory());
107+
this.executor = executorFactory.get();
108+
this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS);
109+
this.puller = new Runnable() {
110+
@Override
111+
public void run() {
112+
pull();
113+
}
114+
};
115+
this.pullerFuture = timer.submit(puller);
116+
}
117+
118+
private Runnable ackingRunnable(final ReceivedMessage receivedMessage) {
119+
return new Runnable() {
120+
@Override
121+
public void run() {
122+
try {
123+
messageProcessor.process(receivedMessage);
124+
pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId());
125+
} catch (Exception ex) {
126+
pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId());
127+
} finally {
128+
deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId());
129+
queuedCallbacks.decrementAndGet();
130+
// We can now pull more messages. We do not pull immediately to possibly wait for other
131+
// callbacks to end
132+
synchronized (futureLock) {
133+
if (pullerFuture == null) {
134+
pullerFuture = timer.schedule(puller, 500, TimeUnit.MILLISECONDS);
135+
}
136+
}
137+
}
138+
}
139+
};
140+
}
141+
142+
private PullRequest createPullRequest() {
143+
return PullRequest.newBuilder()
144+
.setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription))
145+
.setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get())
146+
.setReturnImmediately(false)
147+
.build();
148+
}
149+
150+
private void pull() {
151+
// if we exceeded the maximum number of concurrent callbacks
152+
if (queuedCallbacks.get() >= maxQueuedCallbacks) {
153+
return;
154+
}
155+
Future<PullResponse> response = pubsubRpc.pull(createPullRequest());
156+
try {
157+
List<com.google.pubsub.v1.ReceivedMessage> messages =
158+
response.get().getReceivedMessagesList();
159+
queuedCallbacks.addAndGet(messages.size());
160+
161+
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
162+
deadlineRenewer.add(subscription, message.getAckId());
163+
final ReceivedMessage receivedMessage =
164+
ReceivedMessage.fromPb(pubsub, subscription, message);
165+
executor.execute(ackingRunnable(receivedMessage));
166+
}
167+
} catch (Exception ex) {
168+
// ignore
169+
} finally {
170+
// if we can still queue messages we pull some more
171+
synchronized (futureLock) {
172+
if (queuedCallbacks.get() < maxQueuedCallbacks) {
173+
pullerFuture = timer.submit(puller);
174+
} else {
175+
pullerFuture = null;
176+
}
177+
}
178+
}
179+
}
180+
181+
@Override
182+
public void close() {
183+
if (closed) {
184+
return;
185+
}
186+
closed = true;
187+
synchronized (futureLock) {
188+
if (pullerFuture != null) {
189+
pullerFuture.cancel(true);
190+
}
191+
}
192+
SharedResourceHolder.release(TIMER, timer);
193+
executorFactory.release(executor);
194+
}
195+
196+
static final class Builder {
197+
private final PubSubOptions pubsubOptions;
198+
private final String subscription;
199+
private final AckDeadlineRenewer deadlineRenewer;
200+
private final MessageProcessor messageProcessor;
201+
private Integer maxQueuedCallbacks;
202+
private ExecutorFactory<ExecutorService> executorFactory;
203+
204+
Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer,
205+
MessageProcessor messageProcessor) {
206+
this.pubsubOptions = pubsubOptions;
207+
this.subscription = subscription;
208+
this.deadlineRenewer = deadlineRenewer;
209+
this.messageProcessor = messageProcessor;
210+
}
211+
212+
/**
213+
* Sets the maximum number of callbacks either being executed or waiting for execution.
214+
*/
215+
Builder maxQueuedCallbacks(Integer maxQueuedCallbacks) {
216+
this.maxQueuedCallbacks = maxQueuedCallbacks;
217+
return this;
218+
}
219+
220+
/**
221+
* Sets the executor factory, used to manage the executor that will run message processor
222+
* callbacks message consumer.
223+
*/
224+
Builder executorFactory(ExecutorFactory<ExecutorService> executorFactory) {
225+
this.executorFactory = executorFactory;
226+
return this;
227+
}
228+
229+
/**
230+
* Creates a {@code MessageConsumerImpl} object.
231+
*/
232+
MessageConsumerImpl build() {
233+
return new MessageConsumerImpl(this);
234+
}
235+
}
236+
237+
/**
238+
* Returns a builder for {@code MessageConsumerImpl} objects given the service options, the
239+
* subscription from which messages must be pulled, the acknowledge deadline renewer and a message
240+
* processor used to process messages.
241+
*/
242+
static Builder builder(PubSubOptions pubsubOptions, String subscription,
243+
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
244+
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
245+
}
246+
247+
/**
248+
* Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from
249+
* which messages must be pulled, the acknowledge deadline renewer and a message processor used to
250+
* process messages.
251+
*/
252+
static Builder of(PubSubOptions pubsubOptions, String subscription,
253+
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
254+
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
255+
}
256+
}

0 commit comments

Comments
 (0)