Skip to content

Commit 4e45df5

Browse files
authored
fix improper use of iteration (#1552)
* fix improper use of iteration The previous implementation modifies a HashMap while iterating through its key set, causeing unpredictable iteration behavior. This might be the reason our tests intermittently deadlock. The new implementation uses a PriorityQueue. The time complexity is O(M * log N), where M is the number of expirations before the cut over time and N is the total number of expirations. I am not sure how this compares to O(N) intended in the previous implementation. If required, O(N) is also possible using an ArrayList. Unfortunately, a new failure has emerged. Instead of deadlocking, testModifyAckDeadline intermittently fails. Maybe - I have fixed the old bug and created a new one, - I have fixed the old bug that was masking another one, - The deadlock wasn't caused by the iteration. Now the tests just fail before they could deadlock, or some combination thereof. The incorrect iteration should be fixed regardless. * bug fix don't process the same job twice record next expiration after the loop
1 parent 302a937 commit 4e45df5

File tree

2 files changed

+84
-73
lines changed

2 files changed

+84
-73
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

+84-71
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,11 @@
2828
import com.google.pubsub.v1.PubsubMessage;
2929
import com.google.pubsub.v1.ReceivedMessage;
3030
import java.util.ArrayList;
31-
import java.util.Collection;
32-
import java.util.HashMap;
31+
import java.util.Collections;
3332
import java.util.HashSet;
3433
import java.util.Iterator;
3534
import java.util.List;
36-
import java.util.Map;
35+
import java.util.PriorityQueue;
3736
import java.util.Set;
3837
import java.util.concurrent.ScheduledExecutorService;
3938
import java.util.concurrent.ScheduledFuture;
@@ -68,8 +67,7 @@ class MessageDispatcher {
6867
private final FlowController flowController;
6968
private final MessagesWaiter messagesWaiter;
7069

71-
// Map of outstanding messages (value) ordered by expiration time (key) in ascending order.
72-
private final Map<ExpirationInfo, List<AckHandler>> outstandingAckHandlers;
70+
private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
7371
private final Set<String> pendingAcks;
7472
private final Set<String> pendingNacks;
7573

@@ -82,40 +80,43 @@ class MessageDispatcher {
8280
// To keep track of number of seconds the receiver takes to process messages.
8381
private final Distribution ackLatencyDistribution;
8482

85-
private static class ExpirationInfo implements Comparable<ExpirationInfo> {
86-
private final Clock clock;
83+
// ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration.
84+
//
85+
// It is Comparable so that it may be put in a PriorityQueue.
86+
// For efficiency, it is also mutable, so great care should be taken to make sure
87+
// it is not modified while inside the queue.
88+
// The hashcode and equals methods are explicitly not implemented to discourage
89+
// the use of this class as keys in maps or similar containers.
90+
private static class ExtensionJob implements Comparable<ExtensionJob> {
8791
Instant expiration;
8892
int nextExtensionSeconds;
93+
ArrayList<AckHandler> ackHandlers;
8994

90-
ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) {
91-
this.clock = clock;
95+
ExtensionJob(
96+
Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
9297
this.expiration = expiration;
9398
nextExtensionSeconds = initialAckDeadlineExtension;
99+
this.ackHandlers = ackHandlers;
94100
}
95101

96-
void extendExpiration() {
97-
expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds));
102+
void extendExpiration(Instant now) {
103+
expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds));
98104
nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS);
99105
}
100106

101107
@Override
102-
public int hashCode() {
103-
return expiration.hashCode();
108+
public int compareTo(ExtensionJob other) {
109+
return expiration.compareTo(other.expiration);
104110
}
105111

106-
@Override
107-
public boolean equals(Object obj) {
108-
if (!(obj instanceof ExpirationInfo)) {
109-
return false;
112+
public String toString() {
113+
ArrayList<String> ackIds = new ArrayList<>();
114+
for (AckHandler ah : ackHandlers) {
115+
ackIds.add(ah.ackId);
110116
}
111-
112-
ExpirationInfo other = (ExpirationInfo) obj;
113-
return expiration.equals(other.expiration);
114-
}
115-
116-
@Override
117-
public int compareTo(ExpirationInfo other) {
118-
return expiration.compareTo(other.expiration);
117+
return String.format(
118+
"ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}",
119+
expiration, nextExtensionSeconds, ackIds);
119120
}
120121
}
121122

@@ -137,6 +138,12 @@ static class PendingModifyAckDeadline {
137138
public void addAckId(String ackId) {
138139
ackIds.add(ackId);
139140
}
141+
142+
public String toString() {
143+
return String.format(
144+
"PendingModifyAckDeadline{extension: %d sec, ackIds: %s}",
145+
deadlineExtensionSeconds, ackIds);
146+
}
140147
}
141148

142149
/**
@@ -217,7 +224,7 @@ void sendAckOperations(
217224
this.receiver = receiver;
218225
this.ackProcessor = ackProcessor;
219226
this.flowController = flowController;
220-
outstandingAckHandlers = new HashMap<>();
227+
outstandingAckHandlers = new PriorityQueue<>();
221228
pendingAcks = new HashSet<>();
222229
pendingNacks = new HashSet<>();
223230
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
@@ -257,18 +264,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
257264
}
258265
Instant now = new Instant(clock.millis());
259266
int totalByteCount = 0;
260-
final List<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
267+
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
261268
for (ReceivedMessage pubsubMessage : responseMessages) {
262269
int messageSize = pubsubMessage.getMessage().getSerializedSize();
263270
totalByteCount += messageSize;
264271
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
265272
}
266-
ExpirationInfo expiration =
267-
new ExpirationInfo(
268-
clock, now.plus(messageDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
269-
synchronized (outstandingAckHandlers) {
270-
addOutstadingAckHandlers(expiration, ackHandlers);
271-
}
273+
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
272274
logger.debug("Received {} messages at {}", responseMessages.size(), now);
273275
setupNextAckDeadlineExtensionAlarm(expiration);
274276

@@ -285,21 +287,24 @@ public void run() {
285287
}
286288
});
287289
}
290+
291+
// There is a race condition. setupNextAckDeadlineExtensionAlarm might set
292+
// an alarm that fires before this block can run.
293+
// The fix is to move setup below this block, but doing so aggravates another
294+
// race condition.
295+
// TODO(pongad): Fix both races.
296+
synchronized (outstandingAckHandlers) {
297+
outstandingAckHandlers.add(
298+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
299+
}
300+
288301
try {
289302
flowController.reserve(receivedMessagesCount, totalByteCount);
290303
} catch (FlowController.FlowControlException unexpectedException) {
291304
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
292305
}
293306
}
294307

295-
private void addOutstadingAckHandlers(
296-
ExpirationInfo expiration, final List<AckHandler> ackHandlers) {
297-
if (!outstandingAckHandlers.containsKey(expiration)) {
298-
outstandingAckHandlers.put(expiration, new ArrayList<AckHandler>(ackHandlers.size()));
299-
}
300-
outstandingAckHandlers.get(expiration).addAll(ackHandlers);
301-
}
302-
303308
private void setupPendingAcksAlarm() {
304309
alarmsLock.lock();
305310
try {
@@ -354,41 +359,49 @@ public void run() {
354359
now,
355360
cutOverTime,
356361
ackExpirationPadding);
357-
ExpirationInfo nextScheduleExpiration = null;
362+
Instant nextScheduleExpiration = null;
358363
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();
359364

365+
// Holding area for jobs we'll put back into the queue
366+
// so we don't process the same job twice.
367+
List<ExtensionJob> renewJobs = new ArrayList<>();
368+
360369
synchronized (outstandingAckHandlers) {
361-
for (ExpirationInfo messageExpiration : outstandingAckHandlers.keySet()) {
362-
if (messageExpiration.expiration.compareTo(cutOverTime) <= 0) {
363-
Collection<AckHandler> expiringAcks = outstandingAckHandlers.get(messageExpiration);
364-
outstandingAckHandlers.remove(messageExpiration);
365-
List<AckHandler> renewedAckHandlers = new ArrayList<>(expiringAcks.size());
366-
messageExpiration.extendExpiration();
367-
int extensionSeconds =
368-
Ints.saturatedCast(
369-
new Interval(now, messageExpiration.expiration)
370-
.toDuration()
371-
.getStandardSeconds());
372-
PendingModifyAckDeadline pendingModAckDeadline =
373-
new PendingModifyAckDeadline(extensionSeconds);
374-
for (AckHandler ackHandler : expiringAcks) {
375-
if (ackHandler.acked.get()) {
376-
continue;
377-
}
378-
pendingModAckDeadline.addAckId(ackHandler.ackId);
379-
renewedAckHandlers.add(ackHandler);
380-
}
381-
modifyAckDeadlinesToSend.add(pendingModAckDeadline);
382-
if (!renewedAckHandlers.isEmpty()) {
383-
addOutstadingAckHandlers(messageExpiration, renewedAckHandlers);
370+
while (!outstandingAckHandlers.isEmpty()
371+
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
372+
ExtensionJob job = outstandingAckHandlers.poll();
373+
374+
// If a message has already been acked, remove it, nothing to do.
375+
for (int i = 0; i < job.ackHandlers.size(); ) {
376+
if (job.ackHandlers.get(i).acked.get()) {
377+
Collections.swap(job.ackHandlers, i, job.ackHandlers.size() - 1);
378+
job.ackHandlers.remove(job.ackHandlers.size() - 1);
384379
} else {
385-
outstandingAckHandlers.remove(messageExpiration);
380+
i++;
386381
}
387382
}
388-
if (nextScheduleExpiration == null
389-
|| nextScheduleExpiration.expiration.isAfter(messageExpiration.expiration)) {
390-
nextScheduleExpiration = messageExpiration;
383+
384+
if (job.ackHandlers.isEmpty()) {
385+
continue;
386+
}
387+
388+
job.extendExpiration(now);
389+
int extensionSeconds =
390+
Ints.saturatedCast(
391+
new Interval(now, job.expiration).toDuration().getStandardSeconds());
392+
PendingModifyAckDeadline pendingModAckDeadline =
393+
new PendingModifyAckDeadline(extensionSeconds);
394+
for (AckHandler ackHandler : job.ackHandlers) {
395+
pendingModAckDeadline.addAckId(ackHandler.ackId);
391396
}
397+
modifyAckDeadlinesToSend.add(pendingModAckDeadline);
398+
renewJobs.add(job);
399+
}
400+
for (ExtensionJob job : renewJobs) {
401+
outstandingAckHandlers.add(job);
402+
}
403+
if (!outstandingAckHandlers.isEmpty()) {
404+
nextScheduleExpiration = outstandingAckHandlers.peek().expiration;
392405
}
393406
}
394407

@@ -404,8 +417,8 @@ public void run() {
404417
}
405418
}
406419

407-
private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration) {
408-
Instant possibleNextAlarmTime = messageExpiration.expiration.minus(ackExpirationPadding);
420+
private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
421+
Instant possibleNextAlarmTime = expiration.minus(ackExpirationPadding);
409422
alarmsLock.lock();
410423
try {
411424
if (nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java

-2
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ public void reset() throws IOException {
135135
*/
136136
@Override
137137
public void stop(Duration timeout) throws IOException, InterruptedException, TimeoutException {
138-
System.err.println("sending");
139138
sendPostRequest("/shutdown");
140-
System.err.println("sent");
141139
waitForProcess(timeout);
142140
}
143141
}

0 commit comments

Comments
 (0)