From 1c502eb7672a2899c3df1c6c2d7e96e62b86a53a Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Thu, 23 Jun 2016 14:21:58 +0200 Subject: [PATCH] Add javadoc and tests for Subscription --- .../java/com/google/cloud/pubsub/PubSub.java | 7 +- .../com/google/cloud/pubsub/Subscription.java | 156 ++++++++- .../google/cloud/pubsub/SubscriptionInfo.java | 33 +- .../google/cloud/pubsub/SubscriptionTest.java | 327 ++++++++++++++++++ 4 files changed, 494 insertions(+), 29 deletions(-) create mode 100644 gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 042f64c317bf..a548b3850d15 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -173,7 +173,7 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request for creating a topic. This method returns a {@code Future} object to consume - * the result. {@link Future#get()} returns the created topic or {@code null} if not found. + * the result. {@link Future#get()} returns the created topic. */ Future createAsync(TopicInfo topic); @@ -311,8 +311,7 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request for creating a subscription. This method returns a {@code Future} object to - * consume the result. {@link Future#get()} returns the created subscription or {@code null} if - * not found. + * consume the result. {@link Future#get()} returns the created subscription. */ Future createAsync(SubscriptionInfo subscription); @@ -463,7 +462,7 @@ interface MessageConsumer extends AutoCloseable { Future> pullAsync(String subscription, int maxMessages); /** - * Creates a message consumer that pulls messages for the provided subscription. You can stop + * Creates a message consumer that pulls messages from the provided subscription. You can stop * pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer * executes {@link MessageProcessor#process(Message)} on each pulled message. If * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index 9e1ed2f53579..d96d51015fd3 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.cloud.GrpcServiceOptions; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.cloud.pubsub.PubSub.PullOption; @@ -30,7 +31,31 @@ import java.util.concurrent.Future; /** - * PubSub subscription. + * A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a + * single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions + * support both push and pull message delivery. + * + *

In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a + * preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an + * implicit acknowledgement: a success response indicates that the message has been succesfully + * processed and the Pub/Sub system can delete it from the subscription; a non-success response + * indicates that the Pub/Sub server should resend it (implicit "nack"). + * + *

In a pull subscription, the subscribing application must explicitly pull messages using one of + * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or + * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. + * When messages are pulled with {@link PubSub#pull(String, int)} or + * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly + * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, + * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or + * {@link PubSub#ackAsync(String, String, String...)}. + * + *

{@code Subscription} adds a layer of service-related functionality over + * {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} + * object with the most recent information use {@link #reload} or {@link #reloadAsync}. + * + * @see Pub/Sub Data Model + * @see Subscriber Guide */ public class Subscription extends SubscriptionInfo { @@ -39,6 +64,9 @@ public class Subscription extends SubscriptionInfo { private final PubSubOptions options; private transient PubSub pubsub; + /** + * A builder for {@code Subscription} objects. + */ public static final class Builder extends SubscriptionInfo.Builder { private final PubSub pubsub; @@ -103,62 +131,172 @@ public Builder toBuilder() { } @Override - public int hashCode() { + public final int hashCode() { return Objects.hash(options, super.hashCode()); } @Override - public boolean equals(Object obj) { + public final boolean equals(Object obj) { if (this == obj) { return true; } - if (obj == null || getClass() != obj.getClass()) { + if (obj == null || !obj.getClass().equals(Subscription.class)) { return false; } Subscription other = (Subscription) obj; - return Objects.equals(topic(), other.topic()) - && Objects.equals(name(), other.name()) - && Objects.equals(pushConfig(), other.pushConfig()) - && ackDeadlineSeconds() == other.ackDeadlineSeconds() - && Objects.equals(options, other.options); + return baseEquals(other) && Objects.equals(options, other.options); } + /** + * Returns the subscription's {@code PubSub} object used to issue requests. + */ public PubSub pubSub() { return pubsub; } + /** + * Deletes this subscription. + * + * @return {@code true} if the subscription was deleted, {@code false} if it was not found + * @throws PubSubException upon failure + */ public boolean delete() { return pubsub.deleteSubscription(name()); } + /** + * Sends a request for deleting this subscription. This method returns a {@code Future} object to + * consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted, + * {@code false} if it was not found. + */ public Future deleteAsync() { return pubsub.deleteSubscriptionAsync(name()); } + /** + * Fetches current subscription's latest information. Returns {@code null} if the subscription + * does not exist. + * + * @return a {@code Subscription} object with latest information or {@code null} if not found + * @throws PubSubException upon failure + */ public Subscription reload() { return pubsub.getSubscription(name()); } + /** + * Sends a request for fetching current subscription's latest information. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns the requested + * subscription or {@code null} if not found. + * + * @return a {@code Subscription} object with latest information or {@code null} if not found + * @throws PubSubException upon failure + */ public Future reloadAsync() { return pubsub.getSubscriptionAsync(name()); } + /** + * Sets the push configuration for this subscription. This may be used to change a push + * subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa. + * This methods can also be used to change the endpoint URL and other attributes of a push + * subscription. Messages will accumulate for delivery regardless of changes to the push + * configuration. + * + * @param pushConfig the new push configuration. Use {@code null} to unset it + * @throws PubSubException upon failure, or if the subscription does not exist + */ public void replacePushConfig(PushConfig pushConfig) { pubsub.replacePushConfig(name(), pushConfig); } + /** + * Sends a request for updating the push configuration for a specified subscription. This may be + * used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig} + * parameter) or vice versa. This methods can also be used to change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for delivery regardless of changes + * to the push configuration. The method returns a {@code Future} object that can be used to wait + * for the replace operation to be completed. + * + * @param pushConfig the new push configuration. Use {@code null} to unset it + * @return a {@code Future} to wait for the replace operation to be completed. + */ public Future replacePushConfigAsync(PushConfig pushConfig) { return pubsub.replacePushConfigAsync(name(), pushConfig); } + /** + * Pulls messages from this subscription. This method possibly returns no messages if no message + * was available at the time the request was processed by the Pub/Sub service (i.e. the system is + * not allowed to wait until at least one message is available). Pulled messages have their + * acknowledge deadline automatically renewed until they are explicitly consumed using + * {@link Iterator#next()}. + * + *

Example usage of synchronous message pulling: + *

 {@code
+   * Iterator messageIterator = pubsub.pull("subscription", 100);
+   * while (messageIterator.hasNext()) {
+   *   ReceivedMessage message = messageIterator.next();
+   *   // message's acknowledge deadline is no longer automatically renewed. If processing takes
+   *   // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
+   *   doSomething(message);
+   *   message.ack(); // or message.nack()
+   * }}
+ * + * @param maxMessages the maximum number of messages pulled by this method. This method can + * possibly return fewer messages. + * @throws PubSubException upon failure + */ public Iterator pull(int maxMessages) { return pubsub.pull(name(), maxMessages); } + /** + * Sends a request for pulling messages from this subscription. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator. + * This method possibly returns no messages if no message was available at the time the request + * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one + * message is available). + * + *

Example usage of asynchronous message pulling: + *

 {@code
+   * Future> future = pubsub.pull("subscription", 100);
+   * // do something while the request gets processed
+   * Iterator messageIterator = future.get();
+   * while (messageIterator.hasNext()) {
+   *   ReceivedMessage message = messageIterator.next();
+   *   // message's acknowledge deadline is no longer automatically renewed. If processing takes
+   *   // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
+   *   doSomething(message);
+   *   message.ack(); // or message.nack()
+   * }}
+ * + * @param maxMessages the maximum number of messages pulled by this method. This method can + * possibly return fewer messages. + * @throws PubSubException upon failure + */ public Future> pullAsync(int maxMessages) { return pubsub.pullAsync(name(), maxMessages); } + /** + * Creates a message consumer that pulls messages from this subscription. You can stop pulling + * messages by calling {@link MessageConsumer#close()}. The returned message consumer executes + * {@link MessageProcessor#process(Message)} on each pulled message. If + * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If + * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For + * all pulled messages, the ack deadline is automatically renewed until the message is either + * acknowledged or "nacked". + * + *

The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum + * number of queued messages (messages either being processed or waiting to be processed). The + * {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide + * an executor to run message processor callbacks. + * + * @param callback the callback to be executed on each message + * @param options pulling options + * @return a message consumer for the provided subscription and options + */ public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) { return pubsub.pullAsync(name(), callback, options); } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java index 8e619b5bf633..e85a20b53334 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -37,10 +37,11 @@ * indicates that the Pub/Sub server should resend it (implicit "nack"). * *

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, PubSub.PullOption...)}, - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor)} or - * {@link PubSub#pullAsync(String, PubSub.PullOption...)}. The subscribing application must then - * explicitly acknowledge the messages using one of {@link PubSub#ack(String, Iterable)}, + * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or + * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. + * When messages are pulled with {@link PubSub#pull(String, int)} or + * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly + * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or * {@link PubSub#ackAsync(String, String, String...)}. * @@ -190,7 +191,7 @@ public TopicId topic() { } /** - * Sets the name of the subscription. The name must start with a letter, and contain only + * Returns the name of the subscription. The name must start with a letter, and contain only * letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores * ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs * ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the @@ -223,19 +224,19 @@ public long ackDeadlineSeconds() { return ackDeadlineSeconds; } + final boolean baseEquals(SubscriptionInfo subscriptionInfo) { + return Objects.equals(topic, subscriptionInfo.topic) + && Objects.equals(name, subscriptionInfo.name) + && Objects.equals(pushConfig, subscriptionInfo.pushConfig) + && ackDeadlineSeconds == subscriptionInfo.ackDeadlineSeconds; + } + @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || !obj.getClass().equals(this.getClass())) { - return false; - } - SubscriptionInfo other = (SubscriptionInfo) obj; - return Objects.equals(topic, other.topic) - && Objects.equals(name, other.name) - && Objects.equals(pushConfig, other.pushConfig) - && ackDeadlineSeconds == other.ackDeadlineSeconds; + return obj == this + || obj != null + && obj.getClass().equals(SubscriptionInfo.class) + && baseEquals((SubscriptionInfo) obj); } @Override diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java new file mode 100644 index 000000000000..b04fd800759f --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java @@ -0,0 +1,327 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.easymock.EasyMock.createStrictMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.pubsub.PubSub.MessageConsumer; +import com.google.cloud.pubsub.PubSub.MessageProcessor; +import com.google.cloud.pubsub.PubSub.PullOption; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; + +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class SubscriptionTest { + + private static final TopicId TOPIC_ID = TopicId.of("project", "topic"); + private static final String NAME = "subscription"; + private static final String ENDPOINT = "https://example.com/push"; + private static final PushConfig PUSH_CONFIG = PushConfig.of(ENDPOINT); + private static final int ACK_DEADLINE = 42; + private static final SubscriptionInfo SUBSCRIPTION_INFO =SubscriptionInfo.builder(TOPIC_ID, NAME) + .pushConfig(PUSH_CONFIG) + .ackDeadLineSeconds(ACK_DEADLINE) + .build(); + private static final Message MESSAGE1 = Message.of("payload1"); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE_PB1 = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setMessage(MESSAGE1.toPb()) + .setAckId("ackId1") + .build(); + private static final Message MESSAGE2 = Message.of("payload2"); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE_PB2 = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setMessage(MESSAGE2.toPb()) + .setAckId("ackId2") + .build(); + + private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class); + private final PubSubOptions mockOptions = createStrictMock(PubSubOptions.class); + private PubSub pubsub; + private Subscription expectedSubscription; + private Subscription subscription; + + private void initializeExpectedSubscription(int optionsCalls) { + expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls); + replay(serviceMockReturnsOptions); + pubsub = createStrictMock(PubSub.class); + expectedSubscription = new Subscription(serviceMockReturnsOptions, + new Subscription.BuilderImpl(SUBSCRIPTION_INFO)); + } + + private void initializeSubscription() { + subscription = new Subscription(pubsub, new Subscription.BuilderImpl(SUBSCRIPTION_INFO)); + } + + @After + public void tearDown() throws Exception { + verify(pubsub, serviceMockReturnsOptions); + } + + @Test + public void testBuilder() { + initializeExpectedSubscription(2); + replay(pubsub); + assertEquals(TOPIC_ID, expectedSubscription.topic()); + assertEquals(NAME, expectedSubscription.name()); + assertEquals(PUSH_CONFIG, expectedSubscription.pushConfig()); + assertEquals(ACK_DEADLINE, expectedSubscription.ackDeadlineSeconds()); + Subscription builtSubscription = expectedSubscription.toBuilder() + .name("newSubscription") + .topic("newProject", "newTopic") + .pushConfig(null) + .ackDeadLineSeconds(10) + .build(); + assertEquals(TopicId.of("newProject", "newTopic"), builtSubscription.topic()); + assertEquals("newSubscription", builtSubscription.name()); + assertEquals(null, builtSubscription.pushConfig()); + assertEquals(10, builtSubscription.ackDeadlineSeconds()); + } + + @Test + public void testToBuilder() { + initializeExpectedSubscription(2); + replay(pubsub); + compareSubscription(expectedSubscription, expectedSubscription.toBuilder().build()); + } + + @Test + public void testReload() { + initializeExpectedSubscription(2); + SubscriptionInfo updatedInfo = SUBSCRIPTION_INFO.toBuilder().name("newSubscription").build(); + Subscription expectedSubscription = + new Subscription(serviceMockReturnsOptions, new SubscriptionInfo.BuilderImpl(updatedInfo)); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscription(NAME)).andReturn(expectedSubscription); + replay(pubsub); + initializeSubscription(); + Subscription updatedSubscription = subscription.reload(); + compareSubscription(expectedSubscription, updatedSubscription); + } + + @Test + public void testReloadNull() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscription(NAME)).andReturn(null); + replay(pubsub); + initializeSubscription(); + assertNull(subscription.reload()); + } + + @Test + public void testReloadAsync() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(2); + SubscriptionInfo updatedInfo = SUBSCRIPTION_INFO.toBuilder().name("newSubscription").build(); + Subscription expectedSubscription = + new Subscription(serviceMockReturnsOptions, new SubscriptionInfo.BuilderImpl(updatedInfo)); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscriptionAsync(NAME)) + .andReturn(Futures.immediateFuture(expectedSubscription)); + replay(pubsub); + initializeSubscription(); + Subscription updatedSubscription = subscription.reloadAsync().get(); + compareSubscription(expectedSubscription, updatedSubscription); + } + + @Test + public void testReloadAsyncNull() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscriptionAsync(NAME)) + .andReturn(Futures.immediateFuture(null)); + replay(pubsub); + initializeSubscription(); + assertNull(subscription.reloadAsync().get()); + } + + @Test + public void testDeleteTrue() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteSubscription(NAME)).andReturn(true); + replay(pubsub); + initializeSubscription(); + assertTrue(subscription.delete()); + } + + @Test + public void testDeleteFalse() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteSubscription(NAME)).andReturn(false); + replay(pubsub); + initializeSubscription(); + assertFalse(subscription.delete()); + } + + @Test + public void testDeleteAsyncTrue() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteSubscriptionAsync(NAME)) + .andReturn(Futures.immediateFuture(true)); + replay(pubsub); + initializeSubscription(); + assertTrue(subscription.deleteAsync().get()); + } + + @Test + public void testDeleteAsyncFalse() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.deleteSubscriptionAsync(NAME)) + .andReturn(Futures.immediateFuture(false)); + replay(pubsub); + initializeSubscription(); + assertFalse(subscription.deleteAsync().get()); + } + + @Test + public void testReplacePushConfig() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + PushConfig pushConfig = PushConfig.of("https://example.com/newPush"); + pubsub.replacePushConfig(NAME, pushConfig); + EasyMock.expectLastCall(); + replay(pubsub); + initializeSubscription(); + subscription.replacePushConfig(pushConfig); + } + + @Test + public void testReplacePushConfig_Null() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + pubsub.replacePushConfig(NAME, null); + EasyMock.expectLastCall(); + replay(pubsub); + initializeSubscription(); + subscription.replacePushConfig(null); + } + + @Test + public void testReplacePushConfig_Async() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + PushConfig pushConfig = PushConfig.of("https://example.com/newPush"); + expect(pubsub.replacePushConfigAsync(NAME, pushConfig)) + .andReturn(Futures.immediateFuture(null)); + EasyMock.expectLastCall(); + replay(pubsub); + initializeSubscription(); + assertNull(subscription.replacePushConfigAsync(pushConfig).get()); + } + + @Test + public void testReplacePushConfigAsync_Null() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.replacePushConfigAsync(NAME, null)) + .andReturn(Futures.immediateFuture(null)); + replay(pubsub); + initializeSubscription(); + assertNull(subscription.replacePushConfigAsync(null).get()); + } + + @Test + public void testPull() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions).times(2); + replay(pubsub); + ReceivedMessage message1 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB1); + ReceivedMessage message2 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB2); + reset(pubsub); + expect(pubsub.options()).andReturn(mockOptions); + List messages = ImmutableList.of(message1, message2); + expect(pubsub.pull(NAME, 42)).andReturn(messages.iterator()); + replay(pubsub); + initializeSubscription(); + assertEquals(messages, Lists.newArrayList(subscription.pull(42))); + } + + @Test + public void testPullAsync() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions).times(2); + replay(pubsub); + ReceivedMessage message1 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB1); + ReceivedMessage message2 = ReceivedMessage.fromPb(pubsub, NAME, MESSAGE_PB2); + reset(pubsub); + expect(pubsub.options()).andReturn(mockOptions); + List messages = ImmutableList.of(message1, message2); + expect(pubsub.pullAsync(NAME, 42)).andReturn(Futures.immediateFuture(messages.iterator())); + replay(pubsub); + initializeSubscription(); + assertEquals(messages, Lists.newArrayList(subscription.pullAsync(42).get())); + } + + @Test + public void testMessageConsumer() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + MessageConsumer messageConsumer = createStrictMock(MessageConsumer.class); + MessageProcessor messageProcessor = createStrictMock(MessageProcessor.class); + replay(messageConsumer, messageProcessor); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.pullAsync(NAME, messageProcessor)).andReturn(messageConsumer); + replay(pubsub); + initializeSubscription(); + assertSame(messageConsumer, subscription.pullAsync(messageProcessor)); + verify(messageConsumer, messageProcessor); + } + + @Test + public void testMessageConsumerWithOptions() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + MessageConsumer messageConsumer = createStrictMock(MessageConsumer.class); + MessageProcessor messageProcessor = createStrictMock(MessageProcessor.class); + replay(messageConsumer, messageProcessor); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.pullAsync(NAME, messageProcessor, PullOption.maxQueuedCallbacks(2))) + .andReturn(messageConsumer); + replay(pubsub); + initializeSubscription(); + assertSame(messageConsumer, + subscription.pullAsync(messageProcessor, PullOption.maxQueuedCallbacks(2))); + verify(messageConsumer, messageProcessor); + } + + private void compareSubscription(Subscription expected, Subscription value) { + assertEquals(expected, value); + assertEquals(expected.topic(), value.topic()); + assertEquals(expected.name(), value.name()); + assertEquals(expected.pushConfig(), value.pushConfig()); + assertEquals(expected.ackDeadlineSeconds(), value.ackDeadlineSeconds()); + assertEquals(expected.hashCode(), value.hashCode()); + } +}