Skip to content

Commit d487307

Browse files
authored
Fix PubSub Iterator pullAsync: add callback to PubSubRpc.pull (#1048)
1 parent b137cad commit d487307

File tree

5 files changed

+160
-19
lines changed

5 files changed

+160
-19
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package com.google.cloud.pubsub;
1818

19-
import static com.google.api.client.util.Preconditions.checkArgument;
2019
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
2120
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
21+
import static com.google.common.base.Preconditions.checkArgument;
2222
import static com.google.common.util.concurrent.Futures.lazyTransform;
2323

2424
import com.google.cloud.AsyncPage;
@@ -27,6 +27,7 @@
2727
import com.google.cloud.Page;
2828
import com.google.cloud.PageImpl;
2929
import com.google.cloud.pubsub.spi.PubSubRpc;
30+
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
3031
import com.google.cloud.pubsub.spi.v1.PublisherApi;
3132
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
3233
import com.google.common.annotations.VisibleForTesting;
@@ -476,15 +477,24 @@ public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, in
476477
.setMaxMessages(maxMessages)
477478
.setReturnImmediately(true)
478479
.build();
479-
Future<PullResponse> response = rpc.pull(request);
480-
return lazyTransform(response, new Function<PullResponse, Iterator<ReceivedMessage>>() {
480+
PullFuture future = rpc.pull(request);
481+
future.addCallback(new PubSubRpc.PullCallback() {
481482
@Override
482-
public Iterator<ReceivedMessage> apply(PullResponse pullResponse) {
483-
// Add all received messages to the automatic ack deadline renewer
484-
List<String> ackIds = Lists.transform(pullResponse.getReceivedMessagesList(),
483+
public void success(PullResponse response) {
484+
List<String> ackIds = Lists.transform(response.getReceivedMessagesList(),
485485
MESSAGE_TO_ACK_ID_FUNCTION);
486486
ackDeadlineRenewer.add(subscription, ackIds);
487-
return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(),
487+
}
488+
489+
@Override
490+
public void failure(Throwable error) {
491+
// ignore
492+
}
493+
});
494+
return lazyTransform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
495+
@Override
496+
public Iterator<ReceivedMessage> apply(PullResponse response) {
497+
return Iterators.transform(response.getReceivedMessagesList().iterator(),
488498
new Function<com.google.pubsub.v1.ReceivedMessage, ReceivedMessage>() {
489499
@Override
490500
public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) {

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
3232
import com.google.common.base.Function;
3333
import com.google.common.collect.Sets;
34+
import com.google.common.util.concurrent.ForwardingListenableFuture;
35+
import com.google.common.util.concurrent.FutureCallback;
3436
import com.google.common.util.concurrent.Futures;
3537
import com.google.common.util.concurrent.ListenableFuture;
3638
import com.google.protobuf.Empty;
@@ -89,6 +91,30 @@ protected ExecutorFactory executorFactory() {
8991
}
9092
}
9193

94+
private static final class PullFutureImpl
95+
extends ForwardingListenableFuture.SimpleForwardingListenableFuture<PullResponse>
96+
implements PullFuture {
97+
98+
PullFutureImpl(ListenableFuture<PullResponse> delegate) {
99+
super(delegate);
100+
}
101+
102+
@Override
103+
public void addCallback(final PullCallback callback) {
104+
Futures.addCallback(delegate(), new FutureCallback<PullResponse>() {
105+
@Override
106+
public void onSuccess(PullResponse result) {
107+
callback.success(result);
108+
}
109+
110+
@Override
111+
public void onFailure(Throwable error) {
112+
callback.failure(error);
113+
}
114+
});
115+
}
116+
}
117+
92118
public DefaultPubSubRpc(PubSubOptions options) throws IOException {
93119
executorFactory = new InternalPubSubOptions(options).executorFactory();
94120
executor = executorFactory.get();
@@ -136,13 +162,13 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
136162
return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder);
137163
}
138164

139-
private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
140-
int... returnNullOn) {
165+
private static <V> ListenableFuture<V> translate(ListenableFuture<V> from,
166+
final boolean idempotent, int... returnNullOn) {
141167
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
142168
for (int value : returnNullOn) {
143169
returnNullOnSet.add(value);
144170
}
145-
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
171+
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
146172
@Override
147173
public V apply(ApiException exception) {
148174
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
@@ -224,8 +250,8 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
224250
}
225251

226252
@Override
227-
public Future<PullResponse> pull(PullRequest request) {
228-
return translate(subscriberApi.pullCallable().futureCall(request), false);
253+
public PullFuture pull(PullRequest request) {
254+
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
229255
}
230256

231257
@Override

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,44 @@
4141

4242
public interface PubSubRpc extends AutoCloseable {
4343

44+
/**
45+
* A callback that can be registered to {@link PullFuture} objects. Objects of this class allow
46+
* to asynchronously react to the success or failure of a pull RPC.
47+
*/
48+
interface PullCallback {
49+
50+
/**
51+
* This method is invoked with the result of a {@link PullFuture} when it was successful.
52+
*
53+
* @param response the pull response
54+
*/
55+
void success(PullResponse response);
56+
57+
/**
58+
* This method is invoked when the {@link PullFuture} failed or was cancelled.
59+
*
60+
* @param error the execption that caused the {@link PullFuture} to fail
61+
*/
62+
void failure(Throwable error);
63+
}
64+
65+
/**
66+
* A {@link Future} implementation for pull RPCs. This class also allows users to register
67+
* callbacks via {@link #addCallback(PullCallback)}.
68+
*/
69+
interface PullFuture extends Future<PullResponse> {
70+
71+
/**
72+
* Registers a callback to be run on the given executor. The listener will run when the pull
73+
* future completed its computation or, if the computation is already complete, immediately.
74+
* There is no guaranteed ordering of execution of callbacks.
75+
*
76+
* <p>Registered callbacks are run using the same thread that run the RPC call. Only lightweight
77+
* callbacks should be registered via this method.
78+
*/
79+
void addCallback(final PullCallback callback);
80+
}
81+
4482
// in all cases root cause of ExecutionException is PubSubException
4583
Future<Topic> create(Topic topic);
4684

@@ -66,7 +104,7 @@ public interface PubSubRpc extends AutoCloseable {
66104

67105
Future<Empty> acknowledge(AcknowledgeRequest request);
68106

69-
Future<PullResponse> pull(PullRequest request);
107+
PullFuture pull(PullRequest request);
70108

71109
Future<Empty> modify(ModifyPushConfigRequest request);
72110
}

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import static org.junit.Assert.assertNull;
2323
import static org.junit.Assert.assertTrue;
2424

25-
import com.google.api.client.util.Lists;
2625
import com.google.cloud.AsyncPage;
2726
import com.google.cloud.Page;
2827
import com.google.common.collect.ImmutableList;
28+
import com.google.common.collect.Lists;
2929
import com.google.common.collect.Sets;
3030

3131
import org.junit.Ignore;

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import static org.junit.Assert.assertNull;
2424
import static org.junit.Assert.assertSame;
2525
import static org.junit.Assert.assertTrue;
26+
import static org.junit.Assert.fail;
2627

2728
import com.google.cloud.AsyncPage;
2829
import com.google.cloud.Page;
2930
import com.google.cloud.RetryParams;
3031
import com.google.cloud.pubsub.PubSub.ListOption;
3132
import com.google.cloud.pubsub.spi.PubSubRpc;
33+
import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback;
34+
import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture;
3235
import com.google.cloud.pubsub.spi.PubSubRpcFactory;
3336
import com.google.common.base.Function;
3437
import com.google.common.collect.ImmutableList;
@@ -55,13 +58,15 @@
5558
import com.google.pubsub.v1.PullRequest;
5659
import com.google.pubsub.v1.PullResponse;
5760

61+
import org.easymock.Capture;
5862
import org.easymock.EasyMock;
5963
import org.junit.After;
6064
import org.junit.Before;
6165
import org.junit.Rule;
6266
import org.junit.Test;
6367
import org.junit.rules.ExpectedException;
6468

69+
import java.io.IOException;
6570
import java.util.Iterator;
6671
import java.util.List;
6772
import java.util.concurrent.ExecutionException;
@@ -1229,7 +1234,7 @@ public void testListTopicSubscriptionsAsyncWithOptions()
12291234
}
12301235

12311236
@Test
1232-
public void testPullMessages() {
1237+
public void testPullMessages() throws ExecutionException, InterruptedException {
12331238
pubsub = new PubSubImpl(options, renewerMock);
12341239
PullRequest request = PullRequest.newBuilder()
12351240
.setSubscription(SUBSCRIPTION_NAME_PB)
@@ -1243,10 +1248,16 @@ public void testPullMessages() {
12431248
.addReceivedMessages(MESSAGE_PB1)
12441249
.addReceivedMessages(MESSAGE_PB2)
12451250
.build();
1246-
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
1251+
Capture<PullCallback> callback = Capture.newInstance();
1252+
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
1253+
futureMock.addCallback(EasyMock.capture(callback));
1254+
EasyMock.expectLastCall();
1255+
EasyMock.expect(futureMock.get()).andReturn(response);
1256+
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
12471257
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
1248-
EasyMock.replay(pubsubRpcMock, renewerMock);
1258+
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
12491259
Iterator<ReceivedMessage> messageIterator = pubsub.pull(SUBSCRIPTION, 42);
1260+
callback.getValue().success(response);
12501261
EasyMock.reset(renewerMock);
12511262
for (ReceivedMessage message : messageList) {
12521263
renewerMock.remove(SUBSCRIPTION, message.ackId());
@@ -1256,6 +1267,7 @@ public void testPullMessages() {
12561267
while (messageIterator.hasNext()) {
12571268
messageIterator.next();
12581269
}
1270+
EasyMock.verify(futureMock);
12591271
}
12601272

12611273
@Test
@@ -1273,10 +1285,16 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
12731285
.addReceivedMessages(MESSAGE_PB1)
12741286
.addReceivedMessages(MESSAGE_PB2)
12751287
.build();
1276-
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response));
1288+
Capture<PullCallback> callback = Capture.newInstance();
1289+
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
1290+
futureMock.addCallback(EasyMock.capture(callback));
1291+
EasyMock.expectLastCall();
1292+
EasyMock.expect(futureMock.get()).andReturn(response);
1293+
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
12771294
renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2"));
1278-
EasyMock.replay(pubsubRpcMock, renewerMock);
1295+
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
12791296
Iterator<ReceivedMessage> messageIterator = pubsub.pullAsync(SUBSCRIPTION, 42).get();
1297+
callback.getValue().success(response);
12801298
EasyMock.reset(renewerMock);
12811299
for (ReceivedMessage message : messageList) {
12821300
renewerMock.remove(SUBSCRIPTION, message.ackId());
@@ -1286,6 +1304,55 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
12861304
while (messageIterator.hasNext()) {
12871305
messageIterator.next();
12881306
}
1307+
EasyMock.verify(futureMock);
1308+
}
1309+
1310+
@Test
1311+
public void testPullMessagesError() throws ExecutionException, InterruptedException {
1312+
pubsub = new PubSubImpl(options, renewerMock);
1313+
PullRequest request = PullRequest.newBuilder()
1314+
.setSubscription(SUBSCRIPTION_NAME_PB)
1315+
.setMaxMessages(42)
1316+
.setReturnImmediately(true)
1317+
.build();
1318+
PubSubException exception = new PubSubException(new IOException(), false);
1319+
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
1320+
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
1321+
EasyMock.expectLastCall();
1322+
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
1323+
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
1324+
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
1325+
try {
1326+
pubsub.pull(SUBSCRIPTION, 42);
1327+
fail("Expected PubSubException");
1328+
} catch (PubSubException ex) {
1329+
assertSame(exception, ex);
1330+
}
1331+
EasyMock.verify(futureMock);
1332+
}
1333+
1334+
@Test
1335+
public void testPullMessagesAsyncError() throws ExecutionException, InterruptedException {
1336+
pubsub = new PubSubImpl(options, renewerMock);
1337+
PullRequest request = PullRequest.newBuilder()
1338+
.setSubscription(SUBSCRIPTION_NAME_PB)
1339+
.setMaxMessages(42)
1340+
.setReturnImmediately(true)
1341+
.build();
1342+
PubSubException exception = new PubSubException(new IOException(), false);
1343+
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
1344+
futureMock.addCallback(EasyMock.anyObject(PullCallback.class));
1345+
EasyMock.expectLastCall();
1346+
EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception));
1347+
EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock);
1348+
EasyMock.replay(pubsubRpcMock, renewerMock, futureMock);
1349+
try {
1350+
pubsub.pullAsync(SUBSCRIPTION, 42).get();
1351+
fail("Expected ExecutionException");
1352+
} catch (ExecutionException ex) {
1353+
assertSame(exception, ex.getCause());
1354+
}
1355+
EasyMock.verify(futureMock);
12891356
}
12901357

12911358
@Test

0 commit comments

Comments
 (0)