Skip to content

Commit d4f296e

Browse files
committed
Add AckDeadlineRenewer class for automatic ack deadline renewal
1 parent b872399 commit d4f296e

File tree

3 files changed

+511
-0
lines changed

3 files changed

+511
-0
lines changed
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub;
18+
19+
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
20+
import com.google.cloud.ServiceOptions.Clock;
21+
import com.google.common.base.MoreObjects;
22+
import com.google.common.collect.LinkedListMultimap;
23+
import com.google.common.collect.ListMultimap;
24+
import com.google.common.collect.Multimaps;
25+
26+
import java.util.HashMap;
27+
import java.util.LinkedList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Objects;
31+
import java.util.Queue;
32+
import java.util.concurrent.ScheduledExecutorService;
33+
import java.util.concurrent.ScheduledFuture;
34+
import java.util.concurrent.TimeUnit;
35+
36+
/**
37+
* Class for an automatic ack deadline renewer. An ack deadline renewer automatically renews the
38+
* acknowledge deadline of messages added to it (via {@link #add(String, String)} or
39+
* {@link #add(String, Iterable)}. The acknowledge deadlines of added messages are renewed until the
40+
* messages are explicitly removed using {@link #remove(String, String)}.
41+
*/
42+
class AckDeadlineRenewer implements AutoCloseable {
43+
44+
private static final int MIN_DEADLINE_MILLISECONDS = 10_000;
45+
private static final int RENEW_THRESHOLD_MILLISECONDS = 2_000;
46+
47+
private final PubSub pubsub;
48+
private final ScheduledExecutorService executor;
49+
private final ExecutorFactory executorFactory;
50+
private final Clock clock;
51+
private final Queue<Message> messageQueue;
52+
private final Map<MessageId, Long> messageDeadlines;
53+
private final ScheduledFuture<?> renewerFuture;
54+
private final Object lock = new Object();
55+
56+
/**
57+
* This class holds the identity of a message to renew: subscription and acknowledge id.
58+
*/
59+
private static class MessageId {
60+
61+
private final String subscription;
62+
private final String ackId;
63+
64+
MessageId(String subscription, String ackId) {
65+
this.subscription = subscription;
66+
this.ackId = ackId;
67+
}
68+
69+
String subscription() {
70+
return subscription;
71+
}
72+
73+
String ackId() {
74+
return ackId;
75+
}
76+
77+
@Override
78+
public boolean equals(Object obj) {
79+
if (obj == this) {
80+
return true;
81+
}
82+
if (!(obj instanceof MessageId)) {
83+
return false;
84+
}
85+
MessageId other = (MessageId) obj;
86+
return Objects.equals(other.subscription, this.subscription)
87+
&& Objects.equals(other.ackId, this.ackId);
88+
}
89+
90+
@Override
91+
public int hashCode() {
92+
return Objects.hash(subscription, ackId);
93+
}
94+
95+
@Override
96+
public String toString() {
97+
return MoreObjects.toStringHelper(this)
98+
.add("subscription", subscription)
99+
.add("ackId", ackId)
100+
.toString();
101+
}
102+
}
103+
104+
/**
105+
* This class holds the identity of a message to renew and its expected ack deadline.
106+
*/
107+
private static final class Message {
108+
109+
private final MessageId messageId;
110+
private final Long deadline;
111+
112+
Message(MessageId messageId, Long deadline) {
113+
this.messageId = messageId;
114+
this.deadline = deadline;
115+
}
116+
117+
MessageId messageId() {
118+
return messageId;
119+
}
120+
121+
Long expectedDeadline() {
122+
return deadline;
123+
}
124+
125+
@Override
126+
public boolean equals(Object obj) {
127+
if (obj == this) {
128+
return true;
129+
}
130+
if (!(obj instanceof Message)) {
131+
return false;
132+
}
133+
Message other = (Message) obj;
134+
return Objects.equals(other.messageId, this.messageId)
135+
&& Objects.equals(other.deadline, this.deadline);
136+
}
137+
138+
@Override
139+
public int hashCode() {
140+
return Objects.hash(messageId, deadline);
141+
}
142+
143+
@Override
144+
public String toString() {
145+
return MoreObjects.toStringHelper(this)
146+
.add("messageId", messageId)
147+
.add("expectedDeadline", deadline)
148+
.toString();
149+
}
150+
}
151+
152+
AckDeadlineRenewer(PubSub pubsub) {
153+
PubSubOptions options = pubsub.options();
154+
this.pubsub = pubsub;
155+
this.executorFactory = options.executorFactory();
156+
this.executor = executorFactory.get();
157+
this.clock = options.clock();
158+
this.messageQueue = new LinkedList<>();
159+
this.messageDeadlines = new HashMap<>();
160+
this.renewerFuture = this.executor.scheduleWithFixedDelay(new Runnable() {
161+
@Override
162+
public void run() {
163+
renewAckDeadlines();
164+
}
165+
}, 0, 1, TimeUnit.SECONDS);
166+
}
167+
168+
private void renewAckDeadlines() {
169+
ListMultimap<String, String> messagesToRenewNext = LinkedListMultimap.create();
170+
// At every activation we renew all ack deadlines that will expier in the following
171+
// RENEW_THRESHOLD_MILLISECONDS
172+
long threshold = clock.millis() + RENEW_THRESHOLD_MILLISECONDS;
173+
Message message;
174+
while ((message = nextMessageToRenew(threshold)) != null) {
175+
// If the expected deadline is null the message was removed and we must ignore it, otherwise
176+
// we renew its ack deadline
177+
if (message.expectedDeadline() != null) {
178+
messagesToRenewNext.put(message.messageId().subscription(), message.messageId().ackId());
179+
}
180+
}
181+
for (Map.Entry<String, List<String>> entry : Multimaps.asMap(messagesToRenewNext).entrySet()) {
182+
// We send all ack deadline renewals for a subscription
183+
pubsub.modifyAckDeadlineAsync(entry.getKey(), MIN_DEADLINE_MILLISECONDS,
184+
TimeUnit.MILLISECONDS, entry.getValue());
185+
}
186+
}
187+
188+
private Message nextMessageToRenew(long threshold) {
189+
synchronized (lock) {
190+
Message message = messageQueue.peek();
191+
// if the queue is empty or the next expected deadline is after threshold we stop
192+
if (message == null || message.expectedDeadline() > threshold) {
193+
return null;
194+
}
195+
MessageId messageId = messageQueue.poll().messageId();
196+
// Check if the next expected deadline changed. This can happen if the message was removed
197+
// from the ack deadline renewer or if it was nacked and then pulled again
198+
Long deadline = messageDeadlines.get(messageId);
199+
if (deadline == null || deadline > threshold) {
200+
// the message was removed (deadline == null) or removed and then added back
201+
// (deadline > threshold), we should not renew its deadline (yet)
202+
return new Message(messageId, null);
203+
} else {
204+
// Message deadline must be renewed, we must submit it again to the renewer
205+
add(messageId.subscription(), messageId.ackId());
206+
return new Message(messageId, deadline);
207+
}
208+
}
209+
}
210+
211+
/**
212+
* Adds a new message for which the acknowledge deadline should be automatically renewed. The
213+
* message is identified by the subscription from which it was pulled and its acknowledge id.
214+
* Auto-renewal will take place until the message is removed (see {@link #remove(String, String)}
215+
* or {@link #remove(String, Iterable)}).
216+
*
217+
* @param subscription the subscription from which the message has been pulled
218+
* @param ackId the message's acknowledge id
219+
*/
220+
void add(String subscription, String ackId) {
221+
synchronized (lock) {
222+
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
223+
Message message = new Message(new MessageId(subscription, ackId), deadline);
224+
messageQueue.add(message);
225+
messageDeadlines.put(message.messageId(), deadline);
226+
}
227+
}
228+
229+
/**
230+
* Adds new messages for which the acknowledge deadlined should be automatically renewed. The
231+
* messages are identified by the subscription from which they were pulled and their
232+
* acknowledge id. Auto-renewal will take place until the messages are removed (see
233+
* {@link #remove(String, String)} or {@link #remove(String, Iterable)}).
234+
*
235+
* @param subscription the subscription from which the messages have been pulled
236+
* @param ackIds the acknowledge ids of the messages
237+
*/
238+
void add(String subscription, Iterable<String> ackIds) {
239+
synchronized (lock) {
240+
long deadline = clock.millis() + MIN_DEADLINE_MILLISECONDS;
241+
for (String ackId : ackIds) {
242+
Message message = new Message(new MessageId(subscription, ackId), deadline);
243+
messageQueue.add(message);
244+
messageDeadlines.put(message.messageId(), deadline);
245+
}
246+
}
247+
}
248+
249+
/**
250+
* Removes a message from this {@code AckDeadlineRenewer}. The message is identified by the
251+
* subscription from which it was pulled and its acknowledge id. Once the message is removed from
252+
* this {@code AckDeadlineRenewer}, automated ack deadline renewals will stop.
253+
*
254+
* @param subscription the subscription from which the message has been pulled
255+
* @param ackId the message's acknowledge id
256+
*/
257+
void remove(String subscription, String ackId) {
258+
synchronized (lock) {
259+
messageDeadlines.remove(new MessageId(subscription, ackId));
260+
}
261+
}
262+
263+
@Override
264+
public void close() throws Exception {
265+
renewerFuture.cancel(false);
266+
executorFactory.release(executor);
267+
}
268+
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ protected PubSubOptions(Builder builder) {
8585
super(PubSubFactory.class, PubSubRpcFactory.class, builder);
8686
}
8787

88+
@Override
89+
protected ExecutorFactory executorFactory() {
90+
return super.executorFactory();
91+
}
92+
8893
@Override
8994
protected PubSubFactory defaultServiceFactory() {
9095
return DefaultPubSubFactory.INSTANCE;

0 commit comments

Comments
 (0)