Skip to content

Commit 0da6ac4

Browse files
authored
Add javadoc and tests for Subscription (#1074)
1 parent 7de0d45 commit 0da6ac4

File tree

4 files changed

+494
-29
lines changed

4 files changed

+494
-29
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ interface MessageConsumer extends AutoCloseable {
173173

174174
/**
175175
* Sends a request for creating a topic. This method returns a {@code Future} object to consume
176-
* the result. {@link Future#get()} returns the created topic or {@code null} if not found.
176+
* the result. {@link Future#get()} returns the created topic.
177177
*/
178178
Future<Topic> createAsync(TopicInfo topic);
179179

@@ -311,8 +311,7 @@ interface MessageConsumer extends AutoCloseable {
311311

312312
/**
313313
* Sends a request for creating a subscription. This method returns a {@code Future} object to
314-
* consume the result. {@link Future#get()} returns the created subscription or {@code null} if
315-
* not found.
314+
* consume the result. {@link Future#get()} returns the created subscription.
316315
*/
317316
Future<Subscription> createAsync(SubscriptionInfo subscription);
318317

@@ -463,7 +462,7 @@ interface MessageConsumer extends AutoCloseable {
463462
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);
464463

465464
/**
466-
* Creates a message consumer that pulls messages for the provided subscription. You can stop
465+
* Creates a message consumer that pulls messages from the provided subscription. You can stop
467466
* pulling messages by calling {@link MessageConsumer#close()}. The returned message consumer
468467
* executes {@link MessageProcessor#process(Message)} on each pulled message. If
469468
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java

Lines changed: 147 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

21+
import com.google.cloud.GrpcServiceOptions;
2122
import com.google.cloud.pubsub.PubSub.MessageConsumer;
2223
import com.google.cloud.pubsub.PubSub.MessageProcessor;
2324
import com.google.cloud.pubsub.PubSub.PullOption;
@@ -30,7 +31,31 @@
3031
import java.util.concurrent.Future;
3132

3233
/**
33-
* PubSub subscription.
34+
* A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
35+
* single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions
36+
* support both push and pull message delivery.
37+
*
38+
* <p>In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a
39+
* preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an
40+
* implicit acknowledgement: a success response indicates that the message has been succesfully
41+
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
42+
* indicates that the Pub/Sub server should resend it (implicit "nack").
43+
*
44+
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
45+
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
46+
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
47+
* When messages are pulled with {@link PubSub#pull(String, int)} or
48+
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
49+
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
50+
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
51+
* {@link PubSub#ackAsync(String, String, String...)}.
52+
*
53+
* <p>{@code Subscription} adds a layer of service-related functionality over
54+
* {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
55+
* object with the most recent information use {@link #reload} or {@link #reloadAsync}.
56+
*
57+
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
58+
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
3459
*/
3560
public class Subscription extends SubscriptionInfo {
3661

@@ -39,6 +64,9 @@ public class Subscription extends SubscriptionInfo {
3964
private final PubSubOptions options;
4065
private transient PubSub pubsub;
4166

67+
/**
68+
* A builder for {@code Subscription} objects.
69+
*/
4270
public static final class Builder extends SubscriptionInfo.Builder {
4371

4472
private final PubSub pubsub;
@@ -103,62 +131,172 @@ public Builder toBuilder() {
103131
}
104132

105133
@Override
106-
public int hashCode() {
134+
public final int hashCode() {
107135
return Objects.hash(options, super.hashCode());
108136
}
109137

110138
@Override
111-
public boolean equals(Object obj) {
139+
public final boolean equals(Object obj) {
112140
if (this == obj) {
113141
return true;
114142
}
115-
if (obj == null || getClass() != obj.getClass()) {
143+
if (obj == null || !obj.getClass().equals(Subscription.class)) {
116144
return false;
117145
}
118146
Subscription other = (Subscription) obj;
119-
return Objects.equals(topic(), other.topic())
120-
&& Objects.equals(name(), other.name())
121-
&& Objects.equals(pushConfig(), other.pushConfig())
122-
&& ackDeadlineSeconds() == other.ackDeadlineSeconds()
123-
&& Objects.equals(options, other.options);
147+
return baseEquals(other) && Objects.equals(options, other.options);
124148
}
125149

150+
/**
151+
* Returns the subscription's {@code PubSub} object used to issue requests.
152+
*/
126153
public PubSub pubSub() {
127154
return pubsub;
128155
}
129156

157+
/**
158+
* Deletes this subscription.
159+
*
160+
* @return {@code true} if the subscription was deleted, {@code false} if it was not found
161+
* @throws PubSubException upon failure
162+
*/
130163
public boolean delete() {
131164
return pubsub.deleteSubscription(name());
132165
}
133166

167+
/**
168+
* Sends a request for deleting this subscription. This method returns a {@code Future} object to
169+
* consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
170+
* {@code false} if it was not found.
171+
*/
134172
public Future<Boolean> deleteAsync() {
135173
return pubsub.deleteSubscriptionAsync(name());
136174
}
137175

176+
/**
177+
* Fetches current subscription's latest information. Returns {@code null} if the subscription
178+
* does not exist.
179+
*
180+
* @return a {@code Subscription} object with latest information or {@code null} if not found
181+
* @throws PubSubException upon failure
182+
*/
138183
public Subscription reload() {
139184
return pubsub.getSubscription(name());
140185
}
141186

187+
/**
188+
* Sends a request for fetching current subscription's latest information. This method returns a
189+
* {@code Future} object to consume the result. {@link Future#get()} returns the requested
190+
* subscription or {@code null} if not found.
191+
*
192+
* @return a {@code Subscription} object with latest information or {@code null} if not found
193+
* @throws PubSubException upon failure
194+
*/
142195
public Future<Subscription> reloadAsync() {
143196
return pubsub.getSubscriptionAsync(name());
144197
}
145198

199+
/**
200+
* Sets the push configuration for this subscription. This may be used to change a push
201+
* subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
202+
* This methods can also be used to change the endpoint URL and other attributes of a push
203+
* subscription. Messages will accumulate for delivery regardless of changes to the push
204+
* configuration.
205+
*
206+
* @param pushConfig the new push configuration. Use {@code null} to unset it
207+
* @throws PubSubException upon failure, or if the subscription does not exist
208+
*/
146209
public void replacePushConfig(PushConfig pushConfig) {
147210
pubsub.replacePushConfig(name(), pushConfig);
148211
}
149212

213+
/**
214+
* Sends a request for updating the push configuration for a specified subscription. This may be
215+
* used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
216+
* parameter) or vice versa. This methods can also be used to change the endpoint URL and other
217+
* attributes of a push subscription. Messages will accumulate for delivery regardless of changes
218+
* to the push configuration. The method returns a {@code Future} object that can be used to wait
219+
* for the replace operation to be completed.
220+
*
221+
* @param pushConfig the new push configuration. Use {@code null} to unset it
222+
* @return a {@code Future} to wait for the replace operation to be completed.
223+
*/
150224
public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
151225
return pubsub.replacePushConfigAsync(name(), pushConfig);
152226
}
153227

228+
/**
229+
* Pulls messages from this subscription. This method possibly returns no messages if no message
230+
* was available at the time the request was processed by the Pub/Sub service (i.e. the system is
231+
* not allowed to wait until at least one message is available). Pulled messages have their
232+
* acknowledge deadline automatically renewed until they are explicitly consumed using
233+
* {@link Iterator#next()}.
234+
*
235+
* <p>Example usage of synchronous message pulling:
236+
* <pre> {@code
237+
* Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
238+
* while (messageIterator.hasNext()) {
239+
* ReceivedMessage message = messageIterator.next();
240+
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
241+
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
242+
* doSomething(message);
243+
* message.ack(); // or message.nack()
244+
* }}</pre>
245+
*
246+
* @param maxMessages the maximum number of messages pulled by this method. This method can
247+
* possibly return fewer messages.
248+
* @throws PubSubException upon failure
249+
*/
154250
public Iterator<ReceivedMessage> pull(int maxMessages) {
155251
return pubsub.pull(name(), maxMessages);
156252
}
157253

254+
/**
255+
* Sends a request for pulling messages from this subscription. This method returns a
256+
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
257+
* This method possibly returns no messages if no message was available at the time the request
258+
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
259+
* message is available).
260+
*
261+
* <p>Example usage of asynchronous message pulling:
262+
* <pre> {@code
263+
* Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
264+
* // do something while the request gets processed
265+
* Iterator<ReceivedMessage> messageIterator = future.get();
266+
* while (messageIterator.hasNext()) {
267+
* ReceivedMessage message = messageIterator.next();
268+
* // message's acknowledge deadline is no longer automatically renewed. If processing takes
269+
* // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
270+
* doSomething(message);
271+
* message.ack(); // or message.nack()
272+
* }}</pre>
273+
*
274+
* @param maxMessages the maximum number of messages pulled by this method. This method can
275+
* possibly return fewer messages.
276+
* @throws PubSubException upon failure
277+
*/
158278
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
159279
return pubsub.pullAsync(name(), maxMessages);
160280
}
161281

282+
/**
283+
* Creates a message consumer that pulls messages from this subscription. You can stop pulling
284+
* messages by calling {@link MessageConsumer#close()}. The returned message consumer executes
285+
* {@link MessageProcessor#process(Message)} on each pulled message. If
286+
* {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
287+
* {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For
288+
* all pulled messages, the ack deadline is automatically renewed until the message is either
289+
* acknowledged or "nacked".
290+
*
291+
* <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
292+
* number of queued messages (messages either being processed or waiting to be processed). The
293+
* {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
294+
* an executor to run message processor callbacks.
295+
*
296+
* @param callback the callback to be executed on each message
297+
* @param options pulling options
298+
* @return a message consumer for the provided subscription and options
299+
*/
162300
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
163301
return pubsub.pullAsync(name(), callback, options);
164302
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@
3737
* indicates that the Pub/Sub server should resend it (implicit "nack").
3838
*
3939
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
40-
* {@link PubSub#pull(String, PubSub.PullOption...)},
41-
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor)} or
42-
* {@link PubSub#pullAsync(String, PubSub.PullOption...)}. The subscribing application must then
43-
* explicitly acknowledge the messages using one of {@link PubSub#ack(String, Iterable)},
40+
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
41+
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
42+
* When messages are pulled with {@link PubSub#pull(String, int)} or
43+
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
44+
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
4445
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
4546
* {@link PubSub#ackAsync(String, String, String...)}.
4647
*
@@ -190,7 +191,7 @@ public TopicId topic() {
190191
}
191192

192193
/**
193-
* Sets the name of the subscription. The name must start with a letter, and contain only
194+
* Returns the name of the subscription. The name must start with a letter, and contain only
194195
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
195196
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
196197
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
@@ -223,19 +224,19 @@ public long ackDeadlineSeconds() {
223224
return ackDeadlineSeconds;
224225
}
225226

227+
final boolean baseEquals(SubscriptionInfo subscriptionInfo) {
228+
return Objects.equals(topic, subscriptionInfo.topic)
229+
&& Objects.equals(name, subscriptionInfo.name)
230+
&& Objects.equals(pushConfig, subscriptionInfo.pushConfig)
231+
&& ackDeadlineSeconds == subscriptionInfo.ackDeadlineSeconds;
232+
}
233+
226234
@Override
227235
public boolean equals(Object obj) {
228-
if (this == obj) {
229-
return true;
230-
}
231-
if (obj == null || !obj.getClass().equals(this.getClass())) {
232-
return false;
233-
}
234-
SubscriptionInfo other = (SubscriptionInfo) obj;
235-
return Objects.equals(topic, other.topic)
236-
&& Objects.equals(name, other.name)
237-
&& Objects.equals(pushConfig, other.pushConfig)
238-
&& ackDeadlineSeconds == other.ackDeadlineSeconds;
236+
return obj == this
237+
|| obj != null
238+
&& obj.getClass().equals(SubscriptionInfo.class)
239+
&& baseEquals((SubscriptionInfo) obj);
239240
}
240241

241242
@Override

0 commit comments

Comments
 (0)