From 2b8154d23b62f7a0aa3126dd0da6276c9be28b82 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 23 Jan 2017 12:20:52 +1100 Subject: [PATCH 1/2] fix race in TestReceiver If TestReceiver is set so that messages must be explicitly acked, test code acks a message by calling replyNextOutstandingMessage. There is a race between calling the function and the message being delivered. Previously, if the function gets called before the message is delivered, the test fails since polling an empty deque returns a null pointer. This commit makes us wait until a message becomes available instead. --- .../pubsub/spi/v1/SubscriberImplTest.java | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 3b029327daca..09d7dca804f2 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -43,11 +43,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Deque; import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import org.joda.time.Duration; import org.junit.After; import org.junit.Before; @@ -86,8 +85,8 @@ public static Collection data() { private TestReceiver testReceiver; static class TestReceiver implements MessageReceiver { - private final Deque> outstandingMessageReplies = - new ConcurrentLinkedDeque<>(); + private final LinkedBlockingQueue> outstandingMessageReplies = + new LinkedBlockingQueue<>(); private AckReply ackReply = AckReply.ACK; private Optional messageCountLatch = Optional.absent(); private Optional error = Optional.absent(); @@ -123,13 +122,13 @@ public ListenableFuture receiveMessage(PubsubMessage message) { SettableFuture reply = SettableFuture.create(); if (explicitAckReplies) { - outstandingMessageReplies.add(reply); - } else { - if (error.isPresent()) { - reply.setException(error.get()); - } else { - reply.set(ackReply); + try { + outstandingMessageReplies.put(reply); + } catch (InterruptedException e) { + throw new IllegalStateException(e); } + } else { + replyTo(reply); } return reply; @@ -137,20 +136,29 @@ public ListenableFuture receiveMessage(PubsubMessage message) { public void replyNextOutstandingMessage() { Preconditions.checkState(explicitAckReplies); - - SettableFuture reply = outstandingMessageReplies.poll(); - if (error.isPresent()) { - reply.setException(error.get()); - } else { - reply.set(ackReply); + try { + replyTo(outstandingMessageReplies.take()); + } catch (InterruptedException e) { + throw new IllegalStateException(e); } } public void replyAllOutstandingMessage() { Preconditions.checkState(explicitAckReplies); + for (; ; ) { + SettableFuture reply = outstandingMessageReplies.poll(); + if (reply == null) { + return; + } + replyTo(reply); + } + } - while (!outstandingMessageReplies.isEmpty()) { - replyNextOutstandingMessage(); + private void replyTo(SettableFuture reply) { + if (error.isPresent()) { + reply.setException(error.get()); + } else { + reply.set(ackReply); } } } From 861a3186da44cdc872a2a6b9fbc6fd6e5f65284c Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 23 Jan 2017 16:36:48 +1100 Subject: [PATCH 2/2] PR comment --- .../com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 09d7dca804f2..990f004c2461 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -145,11 +145,8 @@ public void replyNextOutstandingMessage() { public void replyAllOutstandingMessage() { Preconditions.checkState(explicitAckReplies); - for (; ; ) { - SettableFuture reply = outstandingMessageReplies.poll(); - if (reply == null) { - return; - } + SettableFuture reply; + while ((reply = outstandingMessageReplies.poll()) != null) { replyTo(reply); } }