Commit 2c858ee

Fix race between executor insert and advanceTime (#1554)
This is a follow-up to #1552. The linked PR has a race condition: if the ExtensionJobs are added to the queue after the alarm is set up, the alarm can fire before the jobs are added. Fixing this is easy: just set up the alarm afterward. However, doing so consistently deadlocks the tests. Why?

The tests use a fake executor. In tests, time does not flow naturally but is forced to advance, usually by the executor's `advanceTime` method. There is a race between the test thread advancing the time and the mock server thread inserting more tasks into the fake executor. If the tasks get inserted first, all is well. Otherwise, `advanceTime` doesn't see the tasks, and they never get executed. The fix is to check the "current time" every time a task is inserted. If the task is inserted "in the past", we run it immediately.

Doing this still deadlocks the tests. Why? The fake executor needs to lock the task queue when registering a task. If the task is inserted in the past, the executor also needs to run it. Running the task might involve sending a request to the mock server. A gRPC thread on the mock server might handle the request by adding more tasks to the executor. But the executor's queue is still locked by the first thread, resulting in a deadlock. The fix is to hold the queue lock just long enough to retrieve a task, then execute the task without the lock.
1 parent 4e45df5 commit 2c858ee

2 files changed, +23 -18 lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

+1 -6

@@ -272,7 +272,6 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
     }
     Instant expiration = now.plus(messageDeadlineSeconds * 1000);
     logger.debug("Received {} messages at {}", responseMessages.size(), now);
-    setupNextAckDeadlineExtensionAlarm(expiration);
 
     messagesWaiter.incrementPendingMessages(responseMessages.size());
     Iterator<AckHandler> acksIterator = ackHandlers.iterator();
@@ -288,15 +287,11 @@ public void run() {
           });
     }
 
-    // There is a race condition. setupNextAckDeadlineExtensionAlarm might set
-    // an alarm that fires before this block can run.
-    // The fix is to move setup below this block, but doing so aggravates another
-    // race condition.
-    // TODO(pongad): Fix both races.
     synchronized (outstandingAckHandlers) {
       outstandingAckHandlers.add(
           new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
     }
+    setupNextAckDeadlineExtensionAlarm(expiration);
 
     try {
       flowController.reserve(receivedMessagesCount, totalByteCount);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeScheduledExecutorService.java

+22 -12

@@ -79,32 +79,41 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
         Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
   }
 
-  public void tick(long time, TimeUnit unit) {
-    advanceTime(Duration.millis(unit.toMillis(time)));
-  }
-
   /**
    * This will advance the reference time of the executor and execute (in the same thread) any
    * outstanding callable which execution time has passed.
    */
   public void advanceTime(Duration toAdvance) {
     clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
+    work();
+  }
+
+  private void work() {
     DateTime cmpTime = new DateTime(clock.millis());
 
-    synchronized (pendingCallables) {
-      while (!pendingCallables.isEmpty()
-          && pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
-        try {
-          pendingCallables.poll().call();
-          if (shutdown.get() && pendingCallables.isEmpty()) {
-            pendingCallables.notifyAll();
-          }
+    for (;;) {
+      PendingCallable<?> callable = null;
+      synchronized (pendingCallables) {
+        if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
+          break;
+        }
+        callable = pendingCallables.poll();
+      }
+      if (callable != null) {
+        try{
+          callable.call();
         } catch (Exception e) {
           // We ignore any callable exception, which should be set to the future but not relevant to
           // advanceTime.
         }
       }
     }
+
+    synchronized (pendingCallables) {
+      if (shutdown.get() && pendingCallables.isEmpty()) {
+        pendingCallables.notifyAll();
+      }
+    }
   }
 
   @Override
@@ -172,6 +181,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
     synchronized (pendingCallables) {
       pendingCallables.add(callable);
     }
+    work();
     return callable.getScheduledFuture();
   }
 
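The locking pattern described in the commit message can be illustrated in isolation. The following is a minimal sketch, not the real FakeScheduledExecutorService: the class TinyFakeExecutor, its Task type, and the millisecond counter standing in for the fake clock are all hypothetical names made up for this example. It shows the two ideas together: every insert re-checks the clock so a task that is already due runs immediately, and the queue lock is held only while a task is polled, never while it runs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified stand-in for a fake executor; not the class changed in this commit.
class TinyFakeExecutor {
  private static final class Task {
    final long runAtMillis;
    final Runnable body;

    Task(long runAtMillis, Runnable body) {
      this.runAtMillis = runAtMillis;
      this.body = body;
    }
  }

  // The queue object doubles as the lock guarding both the queue and nowMillis.
  private final PriorityQueue<Task> pending =
      new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.runAtMillis));
  private long nowMillis = 0;

  void schedule(long delayMillis, Runnable body) {
    synchronized (pending) {
      pending.add(new Task(nowMillis + delayMillis, body));
    }
    // Re-check the clock on every insert so a task that is already due is not stranded.
    work();
  }

  void advanceTime(long millis) {
    synchronized (pending) {
      nowMillis += millis;
    }
    work();
  }

  private void work() {
    for (;;) {
      Task next;
      synchronized (pending) {
        if (pending.isEmpty() || pending.peek().runAtMillis > nowMillis) {
          return;
        }
        next = pending.poll();
      }
      // Run outside the lock: the task, or another thread it waits on, may call schedule().
      next.body.run();
    }
  }

  public static void main(String[] args) {
    TinyFakeExecutor executor = new TinyFakeExecutor();
    List<String> log = new ArrayList<>();
    executor.schedule(10, () -> log.add("first"));
    executor.advanceTime(10);
    // Inserted when already due, and it schedules a further task while running.
    executor.schedule(0, () -> {
      log.add("second");
      executor.schedule(0, () -> log.add("third"));
    });
    System.out.println(log); // prints [first, second, third]
  }
}
```

Because `synchronized` is reentrant, a single thread would not deadlock even if tasks ran under the lock. The deadlock in the commit message needs a second thread (the mock server's gRPC thread) that blocks on the queue lock while the first thread waits for it, which is why releasing the lock before running the task matters.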
