Skip to content

Commit d7ba0aa

Browse files
committed
Implement consumer restart based on processed callbacks
1 parent ff4c0a2 commit d7ba0aa

File tree

1 file changed

+76
-33
lines changed

1 file changed

+76
-33
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.concurrent.Future;
3838
import java.util.concurrent.ScheduledExecutorService;
3939
import java.util.concurrent.ScheduledThreadPoolExecutor;
40-
import java.util.concurrent.TimeUnit;
4140
import java.util.concurrent.atomic.AtomicInteger;
4241

4342
/**
@@ -75,10 +74,39 @@ public void close(ScheduledExecutorService instance) {
7574
private final int maxQueuedCallbacks;
7675
private final Object futureLock = new Object();
7776
private final Runnable consumerRunnable;
77+
private final RestartPolicy restartPolicy;
7878
private boolean closed;
7979
private Future<?> scheduledFuture;
8080
private PullFuture pullerFuture;
8181

82+
/**
83+
* Interface for policies according to which the consumer should be restarted.
84+
*/
85+
interface RestartPolicy {
86+
87+
boolean shouldRestart(int queuedCallbacks);
88+
}
89+
90+
/**
91+
* Default restart policy. Restarts the consumer once {@code restartThreshold} messages out of
92+
* {@code maxQueuedCallbacks} have already been processed.
93+
*/
94+
static class DefaultRestartPolicy implements RestartPolicy {
95+
96+
final int maxQueuedCallbacks;
97+
final int restartThreshold;
98+
99+
DefaultRestartPolicy(int maxQueuedCallbacks, int restartThreshold) {
100+
this.maxQueuedCallbacks = maxQueuedCallbacks;
101+
this.restartThreshold = restartThreshold;
102+
}
103+
104+
@Override
105+
public boolean shouldRestart(int queuedCallbacks) {
106+
return (maxQueuedCallbacks - queuedCallbacks) >= restartThreshold;
107+
}
108+
}
109+
82110
/**
83111
* Default executor factory for the message processor executor. By default a single-threaded
84112
* executor is used.
@@ -127,6 +155,33 @@ public void failure(Throwable error) {
127155
}
128156
});
129157
}
158+
159+
private PullRequest createPullRequest() {
160+
return PullRequest.newBuilder()
161+
.setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription))
162+
.setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get())
163+
.setReturnImmediately(false)
164+
.build();
165+
}
166+
167+
private Runnable ackingRunnable(final ReceivedMessage receivedMessage) {
168+
return new Runnable() {
169+
@Override
170+
public void run() {
171+
try {
172+
messageProcessor.process(receivedMessage);
173+
pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId());
174+
} catch (Exception ex) {
175+
pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId());
176+
} finally {
177+
deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId());
178+
queuedCallbacks.decrementAndGet();
179+
// We can now pull more messages, according to the restart policy.
180+
restartIfNeeded();
181+
}
182+
}
183+
};
184+
}
130185
}
131186

132187
private MessageConsumerImpl(Builder builder) {
@@ -138,47 +193,24 @@ private MessageConsumerImpl(Builder builder) {
138193
this.deadlineRenewer = builder.deadlineRenewer;
139194
this.queuedCallbacks = new AtomicInteger();
140195
this.timer = SharedResourceHolder.get(TIMER);
141-
this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory());
196+
this.executorFactory =
197+
builder.executorFactory != null ? builder.executorFactory : new DefaultExecutorFactory();
142198
this.executor = executorFactory.get();
143199
this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS);
144200
this.consumerRunnable = new ConsumerRunnable();
201+
int restartThreshold = builder.restartThreshold != null ? builder.restartThreshold
202+
: this.maxQueuedCallbacks / 2;
203+
this.restartPolicy = new DefaultRestartPolicy(maxQueuedCallbacks, restartThreshold);
145204
nextPull();
146205
}
147206

148-
private Runnable ackingRunnable(final ReceivedMessage receivedMessage) {
149-
return new Runnable() {
150-
@Override
151-
public void run() {
152-
try {
153-
messageProcessor.process(receivedMessage);
154-
pubsub.ackAsync(receivedMessage.subscription(), receivedMessage.ackId());
155-
} catch (Exception ex) {
156-
pubsub.nackAsync(receivedMessage.subscription(), receivedMessage.ackId());
157-
} finally {
158-
deadlineRenewer.remove(receivedMessage.subscription(), receivedMessage.ackId());
159-
queuedCallbacks.decrementAndGet();
160-
// We can now pull more messages. We do not pull immediately to possibly wait for other
161-
// callbacks to end
162-
scheduleNextPull(500, TimeUnit.MILLISECONDS);
163-
}
164-
}
165-
};
166-
}
167-
168-
private PullRequest createPullRequest() {
169-
return PullRequest.newBuilder()
170-
.setSubscription(formatSubscriptionName(pubsubOptions.projectId(), subscription))
171-
.setMaxMessages(maxQueuedCallbacks - queuedCallbacks.get())
172-
.setReturnImmediately(false)
173-
.build();
174-
}
175-
176-
private void scheduleNextPull(long delay, TimeUnit timeUnit) {
207+
private void restartIfNeeded() {
177208
synchronized (futureLock) {
178-
if (closed || scheduledFuture != null) {
209+
if (closed || scheduledFuture != null
210+
|| !restartPolicy.shouldRestart(queuedCallbacks.get())) {
179211
return;
180212
}
181-
scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit);
213+
scheduledFuture = timer.submit(consumerRunnable);
182214
}
183215
}
184216

@@ -217,6 +249,7 @@ static final class Builder {
217249
private final MessageProcessor messageProcessor;
218250
private Integer maxQueuedCallbacks;
219251
private ExecutorFactory<ExecutorService> executorFactory;
252+
private Integer restartThreshold;
220253

221254
Builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer,
222255
MessageProcessor messageProcessor) {
@@ -243,6 +276,16 @@ Builder executorFactory(ExecutorFactory<ExecutorService> executorFactory) {
243276
return this;
244277
}
245278

279+
/**
280+
* Sets the restart threshold. If the consumer was interrupted for reaching the maximum number
281+
* of queued callbacks, it will be restarted only once at least {@code restartThreshold}
282+
* callbacks have completed their execution.
283+
*/
284+
Builder restartThreshold(Integer restartThreshold) {
285+
this.restartThreshold = restartThreshold;
286+
return this;
287+
}
288+
246289
/**
247290
* Creates a {@code MessageConsumerImpl} object.
248291
*/

0 commit comments

Comments
 (0)