Skip to content

Add javadoc and tests for Subscription #1074

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ interface MessageConsumer extends AutoCloseable {

/**
* Sends a request for creating a topic. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns the created topic or {@code null} if not found.
* the result. {@link Future#get()} returns the created topic.
*/
Future<Topic> createAsync(TopicInfo topic);

Expand Down Expand Up @@ -311,8 +311,7 @@ interface MessageConsumer extends AutoCloseable {

/**
* Sends a request for creating a subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns the created subscription or {@code null} if
* not found.
* consume the result. {@link Future#get()} returns the created subscription.
*/
Future<Subscription> createAsync(SubscriptionInfo subscription);

Expand Down Expand Up @@ -463,7 +462,7 @@ interface MessageConsumer extends AutoCloseable {
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

/**
* Creates a message consumer that pulls messages for the provided subscription. You can stop
* Creates a message consumer that pulls messages from the provided subscription. You can stop
* pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer
* executes {@link MessageProcessor#process(Message)} on each pulled message. If
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.GrpcServiceOptions;

This comment was marked as spam.

import com.google.cloud.pubsub.PubSub.MessageConsumer;
import com.google.cloud.pubsub.PubSub.MessageProcessor;
import com.google.cloud.pubsub.PubSub.PullOption;
Expand All @@ -30,7 +31,31 @@
import java.util.concurrent.Future;

/**
* PubSub subscription.
* A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
* single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions
* support both push and pull message delivery.
*
* <p>In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a
* preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an
* implicit acknowledgement: a success response indicates that the message has been succesfully
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
*
* <p>{@code Subscription} adds a layer of service-related functionality over
* {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
* object with the most recent information use {@link #reload} or {@link #reloadAsync}.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
*/
public class Subscription extends SubscriptionInfo {

Expand All @@ -39,6 +64,9 @@ public class Subscription extends SubscriptionInfo {
private final PubSubOptions options;
private transient PubSub pubsub;

/**
* A builder for {@code Subscription} objects.
*/
public static final class Builder extends SubscriptionInfo.Builder {

private final PubSub pubsub;
Expand Down Expand Up @@ -103,62 +131,172 @@ public Builder toBuilder() {
}

@Override
public int hashCode() {
public final int hashCode() {
return Objects.hash(options, super.hashCode());
}

@Override
public boolean equals(Object obj) {
public final boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
if (obj == null || !obj.getClass().equals(Subscription.class)) {
return false;
}
Subscription other = (Subscription) obj;
return Objects.equals(topic(), other.topic())
&& Objects.equals(name(), other.name())
&& Objects.equals(pushConfig(), other.pushConfig())
&& ackDeadlineSeconds() == other.ackDeadlineSeconds()
&& Objects.equals(options, other.options);
return baseEquals(other) && Objects.equals(options, other.options);
}

/**
* Returns the subscription's {@code PubSub} object used to issue requests.
*/
public PubSub pubSub() {
return pubsub;
}

/**
* Deletes this subscription.
*
* @return {@code true} if the subscription was deleted, {@code false} if it was not found
* @throws PubSubException upon failure
*/
public boolean delete() {
return pubsub.deleteSubscription(name());
}

/**
* Sends a request for deleting this subscription. This method returns a {@code Future} object to
* consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
* {@code false} if it was not found.
*/
public Future<Boolean> deleteAsync() {
return pubsub.deleteSubscriptionAsync(name());
}

/**
* Fetches current subscription's latest information. Returns {@code null} if the subscription
* does not exist.
*
* @return a {@code Subscription} object with latest information or {@code null} if not found
* @throws PubSubException upon failure
*/
public Subscription reload() {
return pubsub.getSubscription(name());
}

/**
* Sends a request for fetching current subscription's latest information. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns the requested
* subscription or {@code null} if not found.
*
* @return a {@code Subscription} object with latest information or {@code null} if not found
* @throws PubSubException upon failure
*/
public Future<Subscription> reloadAsync() {
return pubsub.getSubscriptionAsync(name());
}

/**
* Sets the push configuration for this subscription. This may be used to change a push
* subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
* This methods can also be used to change the endpoint URL and other attributes of a push
* subscription. Messages will accumulate for delivery regardless of changes to the push
* configuration.
*
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @throws PubSubException upon failure, or if the subscription does not exist
*/
public void replacePushConfig(PushConfig pushConfig) {
pubsub.replacePushConfig(name(), pushConfig);
}

/**
* Sends a request for updating the push configuration for a specified subscription. This may be
* used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
* parameter) or vice versa. This methods can also be used to change the endpoint URL and other
* attributes of a push subscription. Messages will accumulate for delivery regardless of changes
* to the push configuration. The method returns a {@code Future} object that can be used to wait
* for the replace operation to be completed.
*
* @param pushConfig the new push configuration. Use {@code null} to unset it
* @return a {@code Future} to wait for the replace operation to be completed.
*/
public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {

This comment was marked as spam.

This comment was marked as spam.

return pubsub.replacePushConfigAsync(name(), pushConfig);
}

/**
* Pulls messages from this subscription. This method possibly returns no messages if no message
* was available at the time the request was processed by the Pub/Sub service (i.e. the system is
* not allowed to wait until at least one message is available). Pulled messages have their
* acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
*
* <p>Example usage of synchronous message pulling:
* <pre> {@code
* Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
public Iterator<ReceivedMessage> pull(int maxMessages) {
return pubsub.pull(name(), maxMessages);
}

/**
* Sends a request for pulling messages from this subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
*
* <p>Example usage of asynchronous message pulling:
* <pre> {@code
* Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
* // do something while the request gets processed
* Iterator<ReceivedMessage> messageIterator = future.get();
* while (messageIterator.hasNext()) {
* ReceivedMessage message = messageIterator.next();
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
* doSomething(message);
* message.ack(); // or message.nack()
* }}</pre>
*
* @param maxMessages the maximum number of messages pulled by this method. This method can
* possibly return fewer messages.
* @throws PubSubException upon failure
*/
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
}

/**
* Creates a message consumer that pulls messages from this subscription. You can stop pulling
* messages by calling {@link MessageConsumer#close()}. The returned message consumer executes
* {@link MessageProcessor#process(Message)} on each pulled message. If
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
* {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For
* all pulled messages, the ack deadline is automatically renewed until the message is either
* acknowledged or "nacked".
*
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
* number of queued messages (messages either being processed or waiting to be processed). The
* {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
* an executor to run message processor callbacks.
*
* @param callback the callback to be executed on each message
* @param options pulling options
* @return a message consumer for the provided subscription and options
*/
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
return pubsub.pullAsync(name(), callback, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, PubSub.PullOption...)},
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor)} or
* {@link PubSub#pullAsync(String, PubSub.PullOption...)}. The subscribing application must then
* explicitly acknowledge the messages using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
*
Expand Down Expand Up @@ -190,7 +191,7 @@ public TopicId topic() {
}

/**
* Sets the name of the subscription. The name must start with a letter, and contain only
* Returns the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
Expand Down Expand Up @@ -223,19 +224,19 @@ public long ackDeadlineSeconds() {
return ackDeadlineSeconds;
}

final boolean baseEquals(SubscriptionInfo subscriptionInfo) {
return Objects.equals(topic, subscriptionInfo.topic)
&& Objects.equals(name, subscriptionInfo.name)
&& Objects.equals(pushConfig, subscriptionInfo.pushConfig)
&& ackDeadlineSeconds == subscriptionInfo.ackDeadlineSeconds;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || !obj.getClass().equals(this.getClass())) {
return false;
}
SubscriptionInfo other = (SubscriptionInfo) obj;
return Objects.equals(topic, other.topic)
&& Objects.equals(name, other.name)
&& Objects.equals(pushConfig, other.pushConfig)
&& ackDeadlineSeconds == other.ackDeadlineSeconds;
return obj == this
|| obj != null
&& obj.getClass().equals(SubscriptionInfo.class)
&& baseEquals((SubscriptionInfo) obj);
}

@Override
Expand Down
Loading