Skip to content

Commit f4e4e26

Browse files
committed
Add AckDeadlineRenewer class for automatic ack deadline renewal (googleapis#1031)
* Add AckDeadlineRenewer class for automatic ack deadline renewal * Refactor renewer tests and wake up renewer only when needed * Skip removed/re-added messages when scheduling new renewal * Better tuning of ack deadline in renewer
1 parent 174849f commit f4e4e26

File tree

3 files changed

+603
-0
lines changed

3 files changed

+603
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import com.google.cloud.Clock;
20+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
21+
import com.google.common.base.MoreObjects;
22+
import com.google.common.collect.LinkedListMultimap;
23+
import com.google.common.collect.ListMultimap;
24+
import com.google.common.collect.Multimaps;
25+
26+
import java.util.HashMap;
27+
import java.util.LinkedList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Objects;
31+
import java.util.Queue;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.TimeUnit;
35+
36+
/**
37+
* Class for an automatic ack deadline renewer. An ack deadline renewer automatically renews the
38+
* acknowledge deadline of messages added to it (via {@link #add(String, String)} or
39+
* {@link #add(String, Iterable)}. The acknowledge deadlines of added messages are renewed until the
40+
* messages are explicitly removed using {@link #remove(String, String)}.
41+
*/
42+
class AckDeadlineRenewer implements AutoCloseable {
43+
44+
private static final int MIN_DEADLINE_MILLIS = 10_000;
45+
private static final int DEADLINE_SLACK_MILLIS = 1_000;
46+
private static final int RENEW_THRESHOLD_MILLIS = 3_000;
47+
private static final int NEXT_RENEWAL_THRESHOLD_MILLIS = 1_000;
48+
49+
private final PubSub pubsub;
50+
private final ScheduledExecutorService executor;
51+
private final ExecutorFactory executorFactory;
52+
private final Clock clock;
53+
private final Queue<Message> messageQueue;
54+
private final Map<MessageId, Long> messageDeadlines;
55+
private final Object lock = new Object();
56+
private final Object futureLock = new Object();
57+
private Future<?> renewerFuture;
58+
private boolean closed;
59+
60+
/**
61+
* This class holds the identity of a message to renew: subscription and acknowledge id.
62+
*/
63+
private static class MessageId {
64+
65+
private final String subscription;
66+
private final String ackId;
67+
68+
MessageId(String subscription, String ackId) {
69+
this.subscription = subscription;
70+
this.ackId = ackId;
71+
}
72+
73+
String subscription() {
74+
return subscription;
75+
}
76+
77+
String ackId() {
78+
return ackId;
79+
}
80+
81+
@Override
82+
public boolean equals(Object obj) {
83+
if (obj == this) {
84+
return true;
85+
}
86+
if (!(obj instanceof MessageId)) {
87+
return false;
88+
}
89+
MessageId other = (MessageId) obj;
90+
return Objects.equals(other.subscription, this.subscription)
91+
&& Objects.equals(other.ackId, this.ackId);
92+
}
93+
94+
@Override
95+
public int hashCode() {
96+
return Objects.hash(subscription, ackId);
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return MoreObjects.toStringHelper(this)
102+
.add("subscription", subscription)
103+
.add("ackId", ackId)
104+
.toString();
105+
}
106+
}
107+
108+
/**
109+
* This class holds the identity of a message to renew and its expected ack deadline.
110+
*/
111+
private static final class Message {
112+
113+
private final MessageId messageId;
114+
private final Long deadline;
115+
116+
Message(MessageId messageId, Long deadline) {
117+
this.messageId = messageId;
118+
this.deadline = deadline;
119+
}
120+
121+
MessageId messageId() {
122+
return messageId;
123+
}
124+
125+
Long expectedDeadline() {
126+
return deadline;
127+
}
128+
129+
@Override
130+
public boolean equals(Object obj) {
131+
if (obj == this) {
132+
return true;
133+
}
134+
if (!(obj instanceof Message)) {
135+
return false;
136+
}
137+
Message other = (Message) obj;
138+
return Objects.equals(other.messageId, this.messageId)
139+
&& Objects.equals(other.deadline, this.deadline);
140+
}
141+
142+
@Override
143+
public int hashCode() {
144+
return Objects.hash(messageId, deadline);
145+
}
146+
147+
@Override
148+
public String toString() {
149+
return MoreObjects.toStringHelper(this)
150+
.add("messageId", messageId)
151+
.add("expectedDeadline", deadline)
152+
.toString();
153+
}
154+
}
155+
156+
AckDeadlineRenewer(PubSub pubsub) {
157+
PubSubOptions options = pubsub.options();
158+
this.pubsub = pubsub;
159+
this.executorFactory = options.executorFactory();
160+
this.executor = executorFactory.get();
161+
this.clock = options.clock();
162+
this.messageQueue = new LinkedList<>();
163+
this.messageDeadlines = new HashMap<>();
164+
}
165+
166+
private void unsetAndScheduleNextRenewal() {
167+
synchronized (futureLock) {
168+
renewerFuture = null;
169+
scheduleNextRenewal();
170+
}
171+
}
172+
173+
private void scheduleNextRenewal() {
174+
// Schedules next renewal if there are still messages to process and no renewals scheduled that
175+
// could handle them, otherwise does nothing
176+
Message nextMessage;
177+
synchronized (lock) {
178+
Message peek = messageQueue.peek();
179+
// We remove from the queue messages that were removed from the ack deadline renewer (and
180+
// possibly re-added)
181+
while (peek != null && (!messageDeadlines.containsKey(peek.messageId())
182+
|| messageDeadlines.get(peek.messageId()) > peek.expectedDeadline())) {
183+
messageQueue.poll();
184+
peek = messageQueue.peek();
185+
}
186+
nextMessage = peek;
187+
}
188+
synchronized (futureLock) {
189+
if (renewerFuture == null && nextMessage != null) {
190+
long delay =
191+
(nextMessage.expectedDeadline() - clock.millis()) - NEXT_RENEWAL_THRESHOLD_MILLIS;
192+
renewerFuture = executor.schedule(new Runnable() {
193+
@Override
194+
public void run() {
195+
renewAckDeadlines();
196+
}
197+
}, delay, TimeUnit.MILLISECONDS);
198+
}
199+
}
200+
}
201+
202+
private void renewAckDeadlines() {
203+
ListMultimap<String, String> messagesToRenewNext = LinkedListMultimap.create();
204+
// At every activation we renew all ack deadlines that will expire in the following
205+
// RENEW_THRESHOLD_MILLIS
206+
long threshold = clock.millis() + RENEW_THRESHOLD_MILLIS;
207+
Message message;
208+
while ((message = nextMessageToRenew(threshold)) != null) {
209+
// If the expected deadline is null the message was removed and we must ignore it, otherwise
210+
// we renew its ack deadline
211+
if (message.expectedDeadline() != null) {
212+
messagesToRenewNext.put(message.messageId().subscription(), message.messageId().ackId());
213+
}
214+
}
215+
for (Map.Entry<String, List<String>> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) {
216+
// We send all ack deadline renewals for a subscription
217+
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLIS, TimeUnit.MILLISECONDS,
218+
entry.getValue());
219+
}
220+
unsetAndScheduleNextRenewal();
221+
}
222+
223+
private Message nextMessageToRenew(long threshold) {
224+
synchronized (lock) {
225+
Message message = messageQueue.peek();
226+
// if the queue is empty or the next expected deadline is after threshold we stop
227+
if (message == null || message.expectedDeadline() > threshold) {
228+
return null;
229+
}
230+
MessageId messageId = messageQueue.poll().messageId();
231+
// Check if the next expected deadline changed. This can happen if the message was removed
232+
// from the ack deadline renewer or if it was nacked and then pulled again
233+
Long deadline = messageDeadlines.get(messageId);
234+
if (deadline == null || deadline > threshold) {
235+
// the message was removed (deadline == null) or removed and then added back
236+
// (deadline > threshold), we should not renew its deadline (yet)
237+
return new Message(messageId, null);
238+
} else {
239+
// Message deadline must be renewed, we must submit it again to the renewer
240+
add(messageId.subscription(), messageId.ackId());
241+
return new Message(messageId, deadline);
242+
}
243+
}
244+
}
245+
246+
/**
247+
* Adds a new message for which the acknowledge deadline should be automatically renewed. The
248+
* message is identified by the subscription from which it was pulled and its acknowledge id.
249+
* Auto-renewal will take place until the message is removed (see
250+
* {@link #remove(String, String)}).
251+
*
252+
* @param subscription the subscription from which the message has been pulled
253+
* @param ackId the message's acknowledge id
254+
*/
255+
void add(String subscription, String ackId) {
256+
synchronized (lock) {
257+
long deadline = clock.millis() + MIN_DEADLINE_MILLIS - DEADLINE_SLACK_MILLIS;
258+
Message message = new Message(new MessageId(subscription, ackId), deadline);
259+
messageQueue.add(message);
260+
messageDeadlines.put(message.messageId(), deadline);
261+
}
262+
scheduleNextRenewal();
263+
}
264+
265+
/**
266+
* Adds new messages for which the acknowledge deadlined should be automatically renewed. The
267+
* messages are identified by the subscription from which they were pulled and their
268+
* acknowledge id. Auto-renewal will take place until the messages are removed (see
269+
* {@link #remove(String, String)}).
270+
*
271+
* @param subscription the subscription from which the messages have been pulled
272+
* @param ackIds the acknowledge ids of the messages
273+
*/
274+
void add(String subscription, Iterable<String> ackIds) {
275+
synchronized (lock) {
276+
long deadline = clock.millis() + MIN_DEADLINE_MILLIS - DEADLINE_SLACK_MILLIS;
277+
for (String ackId : ackIds) {
278+
Message message = new Message(new MessageId(subscription, ackId), deadline);
279+
messageQueue.add(message);
280+
messageDeadlines.put(message.messageId(), deadline);
281+
}
282+
}
283+
scheduleNextRenewal();
284+
}
285+
286+
/**
287+
* Removes a message from this {@code AckDeadlineRenewer}. The message is identified by the
288+
* subscription from which it was pulled and its acknowledge id. Once the message is removed from
289+
* this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop.
290+
*
291+
* @param subscription the subscription from which the message has been pulled
292+
* @param ackId the message's acknowledge id
293+
*/
294+
void remove(String subscription, String ackId) {
295+
synchronized (lock) {
296+
messageDeadlines.remove(new MessageId(subscription, ackId));
297+
}
298+
}
299+
300+
@Override
301+
public void close() throws Exception {
302+
if (closed) {
303+
return;
304+
}
305+
closed = true;
306+
synchronized (lock) {
307+
messageDeadlines.clear();
308+
messageQueue.clear();
309+
}
310+
synchronized (futureLock) {
311+
if (renewerFuture != null) {
312+
renewerFuture.cancel(true);
313+
}
314+
}
315+
executorFactory.release(executor);
316+
}
317+
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java

+5
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ protected PubSubOptions(Builder builder) {
8585
super(PubSubFactory.class, PubSubRpcFactory.class, builder);
8686
}
8787

88+
@Override
89+
protected ExecutorFactory executorFactory() {
90+
return super.executorFactory();
91+
}
92+
8893
@Override
8994
protected PubSubFactory defaultServiceFactory() {
9095
return DefaultPubSubFactory.INSTANCE;

0 commit comments

Comments
 (0)