Skip to content

Commit ff4c0a2

Browse files
committed
Refactor MessageConsumerImpl
1 parent 5260293 commit ff4c0a2

File tree

3 files changed

+39
-61
lines changed

3 files changed

+39
-61
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,10 @@ public void close(ScheduledExecutorService instance) {
7474
private final AtomicInteger queuedCallbacks;
7575
private final int maxQueuedCallbacks;
7676
private final Object futureLock = new Object();
77-
private final Runnable scheduleRunnable;
77+
private final Runnable consumerRunnable;
7878
private boolean closed;
7979
private Future<?> scheduledFuture;
8080
private PullFuture pullerFuture;
81-
private boolean stopped = true;
8281

8382
/**
8483
* Default executor factory for the message processor executor. By default a single-threaded
@@ -99,6 +98,37 @@ public void release(ExecutorService executor) {
9998
}
10099
}
101100

101+
class ConsumerRunnable implements Runnable {
102+
103+
@Override
104+
public void run() {
105+
if (closed) {
106+
return;
107+
}
108+
pullerFuture = pubsubRpc.pull(createPullRequest());
109+
pullerFuture.addCallback(new PullCallback() {
110+
@Override
111+
public void success(PullResponse response) {
112+
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
113+
queuedCallbacks.addAndGet(messages.size());
114+
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
115+
deadlineRenewer.add(subscription, message.getAckId());
116+
ReceivedMessage receivedMessage = ReceivedMessage.fromPb(pubsub, subscription, message);
117+
executor.execute(ackingRunnable(receivedMessage));
118+
}
119+
nextPull();
120+
}
121+
122+
@Override
123+
public void failure(Throwable error) {
124+
if (!(error instanceof CancellationException)) {
125+
nextPull();
126+
}
127+
}
128+
});
129+
}
130+
}
131+
102132
private MessageConsumerImpl(Builder builder) {
103133
this.pubsubOptions = builder.pubsubOptions;
104134
this.subscription = builder.subscription;
@@ -111,17 +141,7 @@ private MessageConsumerImpl(Builder builder) {
111141
this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory());
112142
this.executor = executorFactory.get();
113143
this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS);
114-
this.scheduleRunnable = new Runnable() {
115-
@Override
116-
public void run() {
117-
synchronized (futureLock) {
118-
if (closed) {
119-
return;
120-
}
121-
pull();
122-
}
123-
}
124-
};
144+
this.consumerRunnable = new ConsumerRunnable();
125145
nextPull();
126146
}
127147

@@ -155,51 +175,23 @@ private PullRequest createPullRequest() {
155175

156176
private void scheduleNextPull(long delay, TimeUnit timeUnit) {
157177
synchronized (futureLock) {
158-
if (!closed && stopped) {
159-
scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit);
178+
if (closed || scheduledFuture != null) {
179+
return;
160180
}
181+
scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit);
161182
}
162183
}
163184

164185
private void nextPull() {
165186
synchronized (futureLock) {
166-
if (closed) {
187+
if (closed || queuedCallbacks.get() == maxQueuedCallbacks) {
188+
scheduledFuture = null;
167189
return;
168190
}
169-
if (queuedCallbacks.get() == maxQueuedCallbacks) {
170-
stopped = true;
171-
} else {
172-
stopped = false;
173-
scheduledFuture = timer.submit(scheduleRunnable);
174-
}
191+
scheduledFuture = timer.submit(consumerRunnable);
175192
}
176193
}
177194

178-
private void pull() {
179-
pullerFuture = pubsubRpc.pull(createPullRequest());
180-
pullerFuture.addCallback(new PullCallback() {
181-
@Override
182-
public void success(PullResponse response) {
183-
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
184-
queuedCallbacks.addAndGet(messages.size());
185-
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
186-
deadlineRenewer.add(subscription, message.getAckId());
187-
final ReceivedMessage receivedMessage =
188-
ReceivedMessage.fromPb(pubsub, subscription, message);
189-
executor.execute(ackingRunnable(receivedMessage));
190-
}
191-
nextPull();
192-
}
193-
194-
@Override
195-
public void failure(Throwable error) {
196-
if (!(error instanceof CancellationException)) {
197-
nextPull();
198-
}
199-
}
200-
});
201-
}
202-
203195
@Override
204196
public void close() {
205197
synchronized (futureLock) {
@@ -268,14 +260,4 @@ static Builder builder(PubSubOptions pubsubOptions, String subscription,
268260
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
269261
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
270262
}
271-
272-
/**
273-
* Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from
274-
* which messages must be pulled, the acknowledge deadline renewer and a message processor used to
275-
* process messages.
276-
*/
277-
static Builder of(PubSubOptions pubsubOptions, String subscription,
278-
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
279-
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
280-
}
281263
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.google.cloud.pubsub;
1818

19-
import static com.google.common.base.Preconditions.checkArgument;
20-
2119
import com.google.cloud.AsyncPage;
2220
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2321
import com.google.cloud.Page;

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.easymock.EasyMock;
2727
import org.junit.Test;
2828

29-
import java.util.concurrent.ExecutorService;
30-
3129
public class PubSubTest {
3230

3331
private static final int PAGE_SIZE = 42;

0 commit comments

Comments
 (0)