Skip to content

Fix race between executor insert and advanceTime #1554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 24, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -68,8 +67,7 @@ class MessageDispatcher {
private final FlowController flowController;
private final MessagesWaiter messagesWaiter;

// Map of outstanding messages (value) ordered by expiration time (key) in ascending order.
private final Map<ExpirationInfo, List<AckHandler>> outstandingAckHandlers;
private final PriorityQueue<ExtensionJob> outstandingAckHandlers;
private final Set<String> pendingAcks;
private final Set<String> pendingNacks;

Expand All @@ -82,40 +80,43 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private static class ExpirationInfo implements Comparable<ExpirationInfo> {
private final Clock clock;
// ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration.
//
// It is Comparable so that it may be put in a PriorityQueue.
// For efficiency, it is also mutable, so great care should be taken to make sure
// it is not modified while inside the queue.
// The hashcode and equals methods are explicitly not implemented to discourage
// the use of this class as keys in maps or similar containers.
private static class ExtensionJob implements Comparable<ExtensionJob> {
Instant expiration;
int nextExtensionSeconds;
ArrayList<AckHandler> ackHandlers;

ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) {
this.clock = clock;
ExtensionJob(
Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
this.expiration = expiration;
nextExtensionSeconds = initialAckDeadlineExtension;
this.ackHandlers = ackHandlers;
}

void extendExpiration() {
expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds));
void extendExpiration(Instant now) {
expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds));
nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS);
}

@Override
public int hashCode() {
return expiration.hashCode();
public int compareTo(ExtensionJob other) {
return expiration.compareTo(other.expiration);
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ExpirationInfo)) {
return false;
public String toString() {
ArrayList<String> ackIds = new ArrayList<>();
for (AckHandler ah : ackHandlers) {
ackIds.add(ah.ackId);
}

ExpirationInfo other = (ExpirationInfo) obj;
return expiration.equals(other.expiration);
}

@Override
public int compareTo(ExpirationInfo other) {
return expiration.compareTo(other.expiration);
return String.format(
"ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}",
expiration, nextExtensionSeconds, ackIds);
}
}

Expand All @@ -137,6 +138,12 @@ static class PendingModifyAckDeadline {
public void addAckId(String ackId) {
ackIds.add(ackId);
}

public String toString() {
return String.format(
"PendingModifyAckDeadline{extension: %d sec, ackIds: %s}",
deadlineExtensionSeconds, ackIds);
}
}

/**
Expand Down Expand Up @@ -217,7 +224,7 @@ void sendAckOperations(
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
outstandingAckHandlers = new HashMap<>();
outstandingAckHandlers = new PriorityQueue<>();
pendingAcks = new HashSet<>();
pendingNacks = new HashSet<>();
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
Expand Down Expand Up @@ -257,20 +264,14 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
}
Instant now = new Instant(clock.millis());
int totalByteCount = 0;
final List<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
for (ReceivedMessage pubsubMessage : responseMessages) {
int messageSize = pubsubMessage.getMessage().getSerializedSize();
totalByteCount += messageSize;
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
}
ExpirationInfo expiration =
new ExpirationInfo(
clock, now.plus(messageDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
synchronized (outstandingAckHandlers) {
addOutstadingAckHandlers(expiration, ackHandlers);
}
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
logger.debug("Received {} messages at {}", responseMessages.size(), now);
setupNextAckDeadlineExtensionAlarm(expiration);

messagesWaiter.incrementPendingMessages(responseMessages.size());
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
Expand All @@ -285,21 +286,20 @@ public void run() {
}
});
}

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
}

private void addOutstadingAckHandlers(
ExpirationInfo expiration, final List<AckHandler> ackHandlers) {
if (!outstandingAckHandlers.containsKey(expiration)) {
outstandingAckHandlers.put(expiration, new ArrayList<AckHandler>(ackHandlers.size()));
}
outstandingAckHandlers.get(expiration).addAll(ackHandlers);
}

private void setupPendingAcksAlarm() {
alarmsLock.lock();
try {
Expand Down Expand Up @@ -354,41 +354,49 @@ public void run() {
now,
cutOverTime,
ackExpirationPadding);
ExpirationInfo nextScheduleExpiration = null;
Instant nextScheduleExpiration = null;
List<PendingModifyAckDeadline> modifyAckDeadlinesToSend = new ArrayList<>();

// Holding area for jobs we'll put back into the queue
// so we don't process the same job twice.
List<ExtensionJob> renewJobs = new ArrayList<>();

synchronized (outstandingAckHandlers) {
for (ExpirationInfo messageExpiration : outstandingAckHandlers.keySet()) {
if (messageExpiration.expiration.compareTo(cutOverTime) <= 0) {
Collection<AckHandler> expiringAcks = outstandingAckHandlers.get(messageExpiration);
outstandingAckHandlers.remove(messageExpiration);
List<AckHandler> renewedAckHandlers = new ArrayList<>(expiringAcks.size());
messageExpiration.extendExpiration();
int extensionSeconds =
Ints.saturatedCast(
new Interval(now, messageExpiration.expiration)
.toDuration()
.getStandardSeconds());
PendingModifyAckDeadline pendingModAckDeadline =
new PendingModifyAckDeadline(extensionSeconds);
for (AckHandler ackHandler : expiringAcks) {
if (ackHandler.acked.get()) {
continue;
}
pendingModAckDeadline.addAckId(ackHandler.ackId);
renewedAckHandlers.add(ackHandler);
}
modifyAckDeadlinesToSend.add(pendingModAckDeadline);
if (!renewedAckHandlers.isEmpty()) {
addOutstadingAckHandlers(messageExpiration, renewedAckHandlers);
while (!outstandingAckHandlers.isEmpty()
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
ExtensionJob job = outstandingAckHandlers.poll();

// If a message has already been acked, remove it, nothing to do.
for (int i = 0; i < job.ackHandlers.size(); ) {
if (job.ackHandlers.get(i).acked.get()) {
Collections.swap(job.ackHandlers, i, job.ackHandlers.size() - 1);
job.ackHandlers.remove(job.ackHandlers.size() - 1);
} else {
outstandingAckHandlers.remove(messageExpiration);
i++;
}
}
if (nextScheduleExpiration == null
|| nextScheduleExpiration.expiration.isAfter(messageExpiration.expiration)) {
nextScheduleExpiration = messageExpiration;

if (job.ackHandlers.isEmpty()) {
continue;
}

job.extendExpiration(now);
int extensionSeconds =
Ints.saturatedCast(
new Interval(now, job.expiration).toDuration().getStandardSeconds());
PendingModifyAckDeadline pendingModAckDeadline =
new PendingModifyAckDeadline(extensionSeconds);
for (AckHandler ackHandler : job.ackHandlers) {
pendingModAckDeadline.addAckId(ackHandler.ackId);
}
modifyAckDeadlinesToSend.add(pendingModAckDeadline);
renewJobs.add(job);
}
for (ExtensionJob job : renewJobs) {
outstandingAckHandlers.add(job);
}
if (!outstandingAckHandlers.isEmpty()) {
nextScheduleExpiration = outstandingAckHandlers.peek().expiration;
}
}

Expand All @@ -404,8 +412,8 @@ public void run() {
}
}

private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration) {
Instant possibleNextAlarmTime = messageExpiration.expiration.minus(ackExpirationPadding);
private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
Instant possibleNextAlarmTime = expiration.minus(ackExpirationPadding);
alarmsLock.lock();
try {
if (nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void reset() throws IOException {
*/
@Override
public void stop(Duration timeout) throws IOException, InterruptedException, TimeoutException {
System.err.println("sending");
sendPostRequest("/shutdown");
System.err.println("sent");
waitForProcess(timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,41 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
}

public void tick(long time, TimeUnit unit) {
advanceTime(Duration.millis(unit.toMillis(time)));
}

/**
* This will advance the reference time of the executor and execute (in the same thread) any
* outstanding callable which execution time has passed.
*/
public void advanceTime(Duration toAdvance) {
clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
work();
}

private void work() {
DateTime cmpTime = new DateTime(clock.millis());

synchronized (pendingCallables) {
while (!pendingCallables.isEmpty()
&& pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
try {
pendingCallables.poll().call();
if (shutdown.get() && pendingCallables.isEmpty()) {
pendingCallables.notifyAll();
}
for (;;) {
PendingCallable<?> callable = null;
synchronized (pendingCallables) {
if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
break;
}
callable = pendingCallables.poll();
}
if (callable != null) {
try{
callable.call();
} catch (Exception e) {
// We ignore any callable exception, which should be set to the future but not relevant to
// advanceTime.
}
}
}

synchronized (pendingCallables) {
if (shutdown.get() && pendingCallables.isEmpty()) {
pendingCallables.notifyAll();
}
}
}

@Override
Expand Down Expand Up @@ -172,6 +181,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
synchronized (pendingCallables) {
pendingCallables.add(callable);
}
work();
return callable.getScheduledFuture();
}

Expand Down