Skip to content

Commit 7e5b51e

Browse files
committed
fix deadlock in testStreamAckDeadlineUpdate
Fixes #1577. The detailed description of the bug is available in the issue. This commit fixes the bug by ensuring that the Future received by MessageReceiver cannot be completed before a callback is added to it.
1 parent 16957b0 commit 7e5b51e

File tree

5 files changed

+18
-20
lines changed

5 files changed

+18
-20
lines changed

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
2020
import com.google.cloud.pubsub.spi.v1.Subscriber;
2121
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
22-
import com.google.common.util.concurrent.Futures;
23-
import com.google.common.util.concurrent.ListenableFuture;
2422
import com.google.common.util.concurrent.MoreExecutors;
23+
import com.google.common.util.concurrent.SettableFuture;
2524
import com.google.pubsub.v1.PubsubMessage;
2625
import com.google.pubsub.v1.PushConfig;
2726
import com.google.pubsub.v1.SubscriptionName;
@@ -44,9 +43,10 @@ public static void main(String... args) throws Exception {
4443
MessageReceiver receiver =
4544
new MessageReceiver() {
4645
@Override
47-
public ListenableFuture<MessageReceiver.AckReply> receiveMessage(PubsubMessage message) {
46+
public void receiveMessage(
47+
PubsubMessage message, SettableFuture<MessageReceiver.AckReply> response) {
4848
System.out.println("got message: " + message.getData().toStringUtf8());
49-
return Futures.immediateFuture(MessageReceiver.AckReply.ACK);
49+
response.set(MessageReceiver.AckReply.ACK);
5050
}
5151
};
5252
Subscriber subscriber = null;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.primitives.Ints;
2626
import com.google.common.util.concurrent.FutureCallback;
2727
import com.google.common.util.concurrent.Futures;
28+
import com.google.common.util.concurrent.SettableFuture;
2829
import com.google.pubsub.v1.PubsubMessage;
2930
import com.google.pubsub.v1.ReceivedMessage;
3031
import java.util.ArrayList;
@@ -278,11 +279,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
278279
for (ReceivedMessage userMessage : responseMessages) {
279280
final PubsubMessage message = userMessage.getMessage();
280281
final AckHandler ackHandler = acksIterator.next();
282+
final SettableFuture<AckReply> response = SettableFuture.create();
283+
Futures.addCallback(response, ackHandler);
281284
executor.submit(
282285
new Runnable() {
283286
@Override
284287
public void run() {
285-
Futures.addCallback(receiver.receiveMessage(message), ackHandler);
288+
receiver.receiveMessage(message, response);
286289
}
287290
});
288291
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.common.util.concurrent.ListenableFuture;
19+
import com.google.common.util.concurrent.SettableFuture;
2020
import com.google.pubsub.v1.PubsubMessage;
2121

2222
/** This interface can be implemented by users of {@link Subscriber} to receive messages. */
@@ -34,11 +34,10 @@ enum AckReply {
3434
*/
3535
NACK
3636
}
37-
37+
3838
/**
39-
* Called when a message is received by the subscriber.
40-
*
41-
* @return A future that signals when a message has been processed.
39+
* Called when a message is received by the subscriber. The implementation must arrange for {@code
40+
* reponse} to be set after processing the {@code message}.
4241
*/
43-
ListenableFuture<AckReply> receiveMessage(PubsubMessage message);
42+
void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response);
4443
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@
8181
* <pre><code>
8282
* MessageReceiver receiver = new MessageReceiver() {
8383
* &#64;Override
84-
* public ListenableFuture&lt;AckReply&gt; receiveMessage(PubsubMessage message) {
84+
* public void receiveMessage(PubsubMessage message, SettableFuture&lt;AckReply&gt; response) {
8585
* // ... process message ...
86-
* return Futures.immediateFuture(AckReply.ACK);
86+
* return response.set(AckReply.ACK);
8787
* }
8888
* }
8989
*

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.common.base.Optional;
2929
import com.google.common.base.Preconditions;
3030
import com.google.common.collect.ImmutableList;
31-
import com.google.common.util.concurrent.ListenableFuture;
3231
import com.google.common.util.concurrent.SettableFuture;
3332
import com.google.pubsub.v1.PubsubMessage;
3433
import com.google.pubsub.v1.PullResponse;
@@ -115,23 +114,20 @@ void waitForExpectedMessages() throws InterruptedException {
115114
}
116115

117116
@Override
118-
public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
117+
public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response) {
119118
if (messageCountLatch.isPresent()) {
120119
messageCountLatch.get().countDown();
121120
}
122-
SettableFuture<AckReply> reply = SettableFuture.create();
123121

124122
if (explicitAckReplies) {
125123
try {
126-
outstandingMessageReplies.put(reply);
124+
outstandingMessageReplies.put(response);
127125
} catch (InterruptedException e) {
128126
throw new IllegalStateException(e);
129127
}
130128
} else {
131-
replyTo(reply);
129+
replyTo(response);
132130
}
133-
134-
return reply;
135131
}
136132

137133
public void replyNextOutstandingMessage() {

0 commit comments

Comments
 (0)