Skip to content

Commit daa24e5

Browse files
committed
wip
1 parent 5c094fa commit daa24e5

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
268268
}
269269
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
270270
logger.debug("Received {} messages at {}", responseMessages.size(), now);
271-
setupNextAckDeadlineExtensionAlarm(expiration);
272271

273272
messagesWaiter.incrementPendingMessages(responseMessages.size());
274273
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
@@ -288,6 +287,7 @@ public void run() {
288287
outstandingAckHandlers.add(
289288
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
290289
}
290+
setupNextAckDeadlineExtensionAlarm(expiration);
291291

292292
try {
293293
flowController.reserve(receivedMessagesCount, totalByteCount);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,29 @@ public void advanceTime(Duration toAdvance) {
9191
clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
9292
DateTime cmpTime = new DateTime(clock.millis());
9393

94-
synchronized (pendingCallables) {
95-
while (!pendingCallables.isEmpty()
96-
&& pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
97-
try {
98-
pendingCallables.poll().call();
99-
if (shutdown.get() && pendingCallables.isEmpty()) {
100-
pendingCallables.notifyAll();
101-
}
94+
for (;;) {
95+
PendingCallable<?> callable = null;
96+
synchronized (pendingCallables) {
97+
if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
98+
break;
99+
}
100+
callable = pendingCallables.poll();
101+
}
102+
if (callable != null) {
103+
try{
104+
callable.call();
102105
} catch (Exception e) {
103106
// We ignore any callable exception, which should be set to the future but not relevant to
104107
// advanceTime.
105108
}
106109
}
107110
}
111+
112+
synchronized (pendingCallables) {
113+
if (shutdown.get() && pendingCallables.isEmpty()) {
114+
pendingCallables.notifyAll();
115+
}
116+
}
108117
}
109118

110119
@Override
@@ -172,6 +181,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
172181
synchronized (pendingCallables) {
173182
pendingCallables.add(callable);
174183
}
184+
tick(0, TimeUnit.MILLISECONDS);
175185
return callable.getScheduledFuture();
176186
}
177187

0 commit comments

Comments
 (0)