Skip to content

Commit 681108a

Browse files
committed
Add AckDeadlineRenewer class for automatic ack deadline renewal
1 parent b872399 commit 681108a

File tree

3 files changed

+512
-0
lines changed

3 files changed

+512
-0
lines changed
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
20+
import com.google.cloud.ServiceOptions.Clock;
21+
import com.google.common.base.MoreObjects;
22+
import com.google.common.collect.LinkedListMultimap;
23+
import com.google.common.collect.ListMultimap;
24+
import com.google.common.collect.Multimaps;
25+
26+
import java.util.HashMap;
27+
import java.util.LinkedList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Objects;
31+
import java.util.Queue;
32+
import java.util.concurrent.ScheduledExecutorService;
33+
import java.util.concurrent.ScheduledFuture;
34+
import java.util.concurrent.TimeUnit;
35+
36+
/**
37+
* Class for an automatic ack deadline renewer. An ack deadline renewer automatically renews the
38+
* acknowledge deadline of messages added to it (via {@link #add(String, String)} or
39+
* {@link #add(String, Iterable)}. The acknowledge deadlines of added messages are renewed until the
40+
* messages are explicitly removed using either {@link #remove(String, String)} or
41+
* {@link #remove(String, Iterable)}.
42+
*/
43+
class AckDeadlineRenewer implements AutoCloseable {
44+
45+
private static final int MIN_DEADLINE_MILLISECONDS = 10_000;
46+
private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000;
47+
48+
private final PubSub pubsub;
49+
private final ScheduledExecutorService executor;
50+
private final ExecutorFactory executorFactory;
51+
private final Clock clock;
52+
private final Queue<Message> messageQueue;
53+
private final Map<MessageId, Long> messageDeadlines;
54+
private final ScheduledFuture<?> renewerFuture;
55+
private final Object lock = new Object();
56+
57+
/**
58+
* This class holds the identity of a message to renew: subscription and acknowledge id.
59+
*/
60+
private static class MessageId {
61+
62+
private final String subscription;
63+
private final String ackId;
64+
65+
MessageId(String subscription, String ackId) {
66+
this.subscription = subscription;
67+
this.ackId = ackId;
68+
}
69+
70+
String subscription() {
71+
return subscription;
72+
}
73+
74+
String ackId() {
75+
return ackId;
76+
}
77+
78+
@Override
79+
public boolean equals(Object obj) {
80+
if (obj == this) {
81+
return true;
82+
}
83+
if (!(obj instanceof MessageId)) {
84+
return false;
85+
}
86+
MessageId other = (MessageId) obj;
87+
return Objects.equals(other.subscription, this.subscription)
88+
&& Objects.equals(other.ackId, this.ackId);
89+
}
90+
91+
@Override
92+
public int hashCode() {
93+
return Objects.hash(subscription, ackId);
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return MoreObjects.toStringHelper(this)
99+
.add("subscription", subscription)
100+
.add("ackId", ackId)
101+
.toString();
102+
}
103+
}
104+
105+
/**
106+
* This class holds the identity of a message to renew and its expected ack deadline.
107+
*/
108+
private static final class Message {
109+
110+
private final MessageId messageId;
111+
private final Long deadline;
112+
113+
Message(MessageId messageId, Long deadline) {
114+
this.messageId = messageId;
115+
this.deadline = deadline;
116+
}
117+
118+
MessageId messageId() {
119+
return messageId;
120+
}
121+
122+
Long expectedDeadline() {
123+
return deadline;
124+
}
125+
126+
@Override
127+
public boolean equals(Object obj) {
128+
if (obj == this) {
129+
return true;
130+
}
131+
if (!(obj instanceof Message)) {
132+
return false;
133+
}
134+
Message other = (Message) obj;
135+
return Objects.equals(other.messageId, this.messageId)
136+
&& Objects.equals(other.deadline, this.deadline);
137+
}
138+
139+
@Override
140+
public int hashCode() {
141+
return Objects.hash(messageId, deadline);
142+
}
143+
144+
@Override
145+
public String toString() {
146+
return MoreObjects.toStringHelper(this)
147+
.add("messageId", messageId)
148+
.add("expectedDeadline", deadline)
149+
.toString();
150+
}
151+
}
152+
153+
AckDeadlineRenewer(PubSub pubsub) {
154+
PubSubOptions options = pubsub.options();
155+
this.pubsub = pubsub;
156+
this.executorFactory = options.executorFactory();
157+
this.executor = executorFactory.get();
158+
this.clock = options.clock();
159+
this.messageQueue = new LinkedList<>();
160+
this.messageDeadlines = new HashMap<>();
161+
this.renewerFuture = this.executor.scheduleWithFixedDelay(new Runnable() {
162+
@Override
163+
public void run() {
164+
renewAckDeadlines();
165+
}
166+
}, 0, 1, TimeUnit.SECONDS);
167+
}
168+
169+
private void renewAckDeadlines() {
170+
ListMultimap<String, String> messagesToRenewNext = LinkedListMultimap.create();
171+
// At every activation we renew all ack deadlines that will expier in the following
172+
// RENEW_THRESHOLD_MILLISECONDS
173+
long threshold = clock.millis() + RENEW_THRESHOLD_MILLISECONDS;
174+
Message message;
175+
while ((message = nextMessageToRenew(threshold)) != null) {
176+
// If the expected deadline is null the message was removed and we must ignore it, otherwise
177+
// we renew its ack deadline
178+
if (message.expectedDeadline() != null) {
179+
messagesToRenewNext.put(message.messageId().subscription(), message.messageId().ackId());
180+
}
181+
}
182+
for (Map.Entry<String, List<String>> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) {
183+
// We send all ack deadline renewals for a subscription
184+
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLISECONDS,
185+
TimeUnit.MILLISECONDS, entry.getValue());
186+
}
187+
}
188+
189+
private Message nextMessageToRenew(long threshold) {
190+
synchronized (lock) {
191+
Message message = messageQueue.peek();
192+
// if the queue is empty or the next expected deadline is after threshold we stop
193+
if (message == null || message.expectedDeadline() > threshold) {
194+
return null;
195+
}
196+
MessageId messageId = messageQueue.poll().messageId();
197+
// Check if the next expected deadline changed. This can happen if the message was removed
198+
// from the ack deadline renewer or if it was nacked and then pulled again
199+
Long deadline = messageDeadlines.get(messageId);
200+
if (deadline == null || deadline > threshold) {
201+
// the message was removed (deadline == null) or removed and then added back
202+
// (deadline > threshold), we should not renew its deadline (yet)
203+
return new Message(messageId, null);
204+
} else {
205+
// Message deadline must be renewed, we must submit it again to the renewer
206+
add(messageId.subscription(), messageId.ackId());
207+
return new Message(messageId, deadline);
208+
}
209+
}
210+
}
211+
212+
/**
213+
* Adds a new message for which the acknowledge deadline should be automatically renewed. The
214+
* message is identified by the subscription from which it was pulled and its acknowledge id.
215+
* Auto-renewal will take place until the message is removed (see {@link #remove(String, String)}
216+
* or {@link #remove(String, Iterable)}).
217+
*
218+
* @param subscription the subscription from which the message has been pulled
219+
* @param ackId the message's acknowledge id
220+
*/
221+
void add(String subscription, String ackId) {
222+
synchronized (lock) {
223+
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
224+
Message message = new Message(new MessageId(subscription, ackId), deadline);
225+
messageQueue.add(message);
226+
messageDeadlines.put(message.messageId(), deadline);
227+
}
228+
}
229+
230+
/**
231+
* Adds new messages for which the acknowledge deadlined should be automatically renewed. The
232+
* messages are identified by the subscription from which they were pulled and their
233+
* acknowledge id. Auto-renewal will take place until the messages are removed (see
234+
* {@link #remove(String, String)} or {@link #remove(String, Iterable)}).
235+
*
236+
* @param subscription the subscription from which the messages have been pulled
237+
* @param ackIds the acknowledge ids of the messages
238+
*/
239+
void add(String subscription, Iterable<String> ackIds) {
240+
synchronized (lock) {
241+
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
242+
for (String ackId : ackIds) {
243+
Message message = new Message(new MessageId(subscription, ackId), deadline);
244+
messageQueue.add(message);
245+
messageDeadlines.put(message.messageId(), deadline);
246+
}
247+
}
248+
}
249+
250+
/**
251+
* Removes a message from this {@code AckDeadlineRenewer}. The message is identified by the
252+
* subscription from which it was pulled and its acknowledge id. Once the message is removed from
253+
* this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop.
254+
*
255+
* @param subscription the subscription from which the message has been pulled
256+
* @param ackId the message's acknowledge id
257+
*/
258+
void remove(String subscription, String ackId) {
259+
synchronized (lock) {
260+
messageDeadlines.remove(new MessageId(subscription, ackId));
261+
}
262+
}
263+
264+
@Override
265+
public void close() throws Exception {
266+
renewerFuture.cancel(false);
267+
executorFactory.release(executor);
268+
}
269+
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ protected PubSubOptions(Builder builder) {
8585
super(PubSubFactory.class, PubSubRpcFactory.class, builder);
8686
}
8787

88+
@Override
89+
protected ExecutorFactory executorFactory() {
90+
return super.executorFactory();
91+
}
92+
8893
@Override
8994
protected PubSubFactory defaultServiceFactory() {
9095
return DefaultPubSubFactory.INSTANCE;

0 commit comments

Comments
 (0)