18
18
19
19
import static com .google .common .base .Preconditions .checkNotNull ;
20
20
21
+ import com .google .cloud .GrpcServiceOptions ;
21
22
import com .google .cloud .pubsub .PubSub .MessageConsumer ;
22
23
import com .google .cloud .pubsub .PubSub .MessageProcessor ;
23
24
import com .google .cloud .pubsub .PubSub .PullOption ;
30
31
import java .util .concurrent .Future ;
31
32
32
33
/**
33
- * PubSub subscription.
34
+ * A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
35
+ * single, specific topic, to be delivered to the subscribing application. Pub/Sub subscriptions
36
+ * support both push and pull message delivery.
37
+ *
38
+ * <p>In a push subscription, the Pub/Sub server sends a request to the subscriber application, at a
39
+ * preconfigured endpoint (see {@link PushConfig}). The subscriber's HTTP response serves as an
40
+ * implicit acknowledgement: a success response indicates that the message has been succesfully
41
+ * processed and the Pub/Sub system can delete it from the subscription; a non-success response
42
+ * indicates that the Pub/Sub server should resend it (implicit "nack").
43
+ *
44
+ * <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
45
+ * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
46
+ * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
47
+ * When messages are pulled with {@link PubSub#pull(String, int)} or
48
+ * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
49
+ * acknowledge them using one of {@link PubSub#ack(String, Iterable)},
50
+ * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
51
+ * {@link PubSub#ackAsync(String, String, String...)}.
52
+ *
53
+ * <p>{@code Subscription} adds a layer of service-related functionality over
54
+ * {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
55
+ * object with the most recent information use {@link #reload} or {@link #reloadAsync}.
56
+ *
57
+ * @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
58
+ * @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
34
59
*/
35
60
public class Subscription extends SubscriptionInfo {
36
61
@@ -39,6 +64,9 @@ public class Subscription extends SubscriptionInfo {
39
64
private final PubSubOptions options ;
40
65
private transient PubSub pubsub ;
41
66
67
+ /**
68
+ * A builder for {@code Subscription} objects.
69
+ */
42
70
public static final class Builder extends SubscriptionInfo .Builder {
43
71
44
72
private final PubSub pubsub ;
@@ -103,62 +131,172 @@ public Builder toBuilder() {
103
131
}
104
132
105
133
@ Override
106
- public int hashCode () {
134
+ public final int hashCode () {
107
135
return Objects .hash (options , super .hashCode ());
108
136
}
109
137
110
138
@ Override
111
- public boolean equals (Object obj ) {
139
+ public final boolean equals (Object obj ) {
112
140
if (this == obj ) {
113
141
return true ;
114
142
}
115
- if (obj == null || getClass () != obj .getClass ()) {
143
+ if (obj == null || ! obj .getClass (). equals ( Subscription . class )) {
116
144
return false ;
117
145
}
118
146
Subscription other = (Subscription ) obj ;
119
- return Objects .equals (topic (), other .topic ())
120
- && Objects .equals (name (), other .name ())
121
- && Objects .equals (pushConfig (), other .pushConfig ())
122
- && ackDeadlineSeconds () == other .ackDeadlineSeconds ()
123
- && Objects .equals (options , other .options );
147
+ return baseEquals (other ) && Objects .equals (options , other .options );
124
148
}
125
149
150
+ /**
151
+ * Returns the subscription's {@code PubSub} object used to issue requests.
152
+ */
126
153
public PubSub pubSub () {
127
154
return pubsub ;
128
155
}
129
156
157
+ /**
158
+ * Deletes this subscription.
159
+ *
160
+ * @return {@code true} if the subscription was deleted, {@code false} if it was not found
161
+ * @throws PubSubException upon failure
162
+ */
130
163
public boolean delete () {
131
164
return pubsub .deleteSubscription (name ());
132
165
}
133
166
167
+ /**
168
+ * Sends a request for deleting this subscription. This method returns a {@code Future} object to
169
+ * consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
170
+ * {@code false} if it was not found.
171
+ */
134
172
public Future <Boolean > deleteAsync () {
135
173
return pubsub .deleteSubscriptionAsync (name ());
136
174
}
137
175
176
+ /**
177
+ * Fetches current subscription's latest information. Returns {@code null} if the subscription
178
+ * does not exist.
179
+ *
180
+ * @return a {@code Subscription} object with latest information or {@code null} if not found
181
+ * @throws PubSubException upon failure
182
+ */
138
183
public Subscription reload () {
139
184
return pubsub .getSubscription (name ());
140
185
}
141
186
187
+ /**
188
+ * Sends a request for fetching current subscription's latest information. This method returns a
189
+ * {@code Future} object to consume the result. {@link Future#get()} returns the requested
190
+ * subscription or {@code null} if not found.
191
+ *
192
+ * @return a {@code Subscription} object with latest information or {@code null} if not found
193
+ * @throws PubSubException upon failure
194
+ */
142
195
public Future <Subscription > reloadAsync () {
143
196
return pubsub .getSubscriptionAsync (name ());
144
197
}
145
198
199
+ /**
200
+ * Sets the push configuration for this subscription. This may be used to change a push
201
+ * subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
202
+ * This methods can also be used to change the endpoint URL and other attributes of a push
203
+ * subscription. Messages will accumulate for delivery regardless of changes to the push
204
+ * configuration.
205
+ *
206
+ * @param pushConfig the new push configuration. Use {@code null} to unset it
207
+ * @throws PubSubException upon failure, or if the subscription does not exist
208
+ */
146
209
public void replacePushConfig (PushConfig pushConfig ) {
147
210
pubsub .replacePushConfig (name (), pushConfig );
148
211
}
149
212
213
+ /**
214
+ * Sends a request for updating the push configuration for a specified subscription. This may be
215
+ * used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
216
+ * parameter) or vice versa. This methods can also be used to change the endpoint URL and other
217
+ * attributes of a push subscription. Messages will accumulate for delivery regardless of changes
218
+ * to the push configuration. The method returns a {@code Future} object that can be used to wait
219
+ * for the replace operation to be completed.
220
+ *
221
+ * @param pushConfig the new push configuration. Use {@code null} to unset it
222
+ * @return a {@code Future} to wait for the replace operation to be completed.
223
+ */
150
224
public Future <Void > replacePushConfigAsync (PushConfig pushConfig ) {
151
225
return pubsub .replacePushConfigAsync (name (), pushConfig );
152
226
}
153
227
228
+ /**
229
+ * Pulls messages from this subscription. This method possibly returns no messages if no message
230
+ * was available at the time the request was processed by the Pub/Sub service (i.e. the system is
231
+ * not allowed to wait until at least one message is available). Pulled messages have their
232
+ * acknowledge deadline automatically renewed until they are explicitly consumed using
233
+ * {@link Iterator#next()}.
234
+ *
235
+ * <p>Example usage of synchronous message pulling:
236
+ * <pre> {@code
237
+ * Iterator<ReceivedMessage> messageIterator = pubsub.pull("subscription", 100);
238
+ * while (messageIterator.hasNext()) {
239
+ * ReceivedMessage message = messageIterator.next();
240
+ * // message's acknowledge deadline is no longer automatically renewed. If processing takes
241
+ * // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
242
+ * doSomething(message);
243
+ * message.ack(); // or message.nack()
244
+ * }}</pre>
245
+ *
246
+ * @param maxMessages the maximum number of messages pulled by this method. This method can
247
+ * possibly return fewer messages.
248
+ * @throws PubSubException upon failure
249
+ */
154
250
public Iterator <ReceivedMessage > pull (int maxMessages ) {
155
251
return pubsub .pull (name (), maxMessages );
156
252
}
157
253
254
+ /**
255
+ * Sends a request for pulling messages from this subscription. This method returns a
256
+ * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
257
+ * This method possibly returns no messages if no message was available at the time the request
258
+ * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
259
+ * message is available).
260
+ *
261
+ * <p>Example usage of asynchronous message pulling:
262
+ * <pre> {@code
263
+ * Future<Iterator<ReceivedMessage>> future = pubsub.pull("subscription", 100);
264
+ * // do something while the request gets processed
265
+ * Iterator<ReceivedMessage> messageIterator = future.get();
266
+ * while (messageIterator.hasNext()) {
267
+ * ReceivedMessage message = messageIterator.next();
268
+ * // message's acknowledge deadline is no longer automatically renewed. If processing takes
269
+ * // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
270
+ * doSomething(message);
271
+ * message.ack(); // or message.nack()
272
+ * }}</pre>
273
+ *
274
+ * @param maxMessages the maximum number of messages pulled by this method. This method can
275
+ * possibly return fewer messages.
276
+ * @throws PubSubException upon failure
277
+ */
158
278
public Future <Iterator <ReceivedMessage >> pullAsync (int maxMessages ) {
159
279
return pubsub .pullAsync (name (), maxMessages );
160
280
}
161
281
282
+ /**
283
+ * Creates a message consumer that pulls messages from this subscription. You can stop pulling
284
+ * messages by calling {@link MessageConsumer#close()}. The returned message consumer executes
285
+ * {@link MessageProcessor#process(Message)} on each pulled message. If
286
+ * {@link MessageProcessor#process(Message)} executes correctly, the message is acknowledged. If
287
+ * {@link MessageProcessor#process(Message)} throws an exception, the message is "nacked". For
288
+ * all pulled messages, the ack deadline is automatically renewed until the message is either
289
+ * acknowledged or "nacked".
290
+ *
291
+ * <p>The {@link PullOption#maxQueuedCallbacks(int)} option can be used to control the maximum
292
+ * number of queued messages (messages either being processed or waiting to be processed). The
293
+ * {@link PullOption#executorFactory(GrpcServiceOptions.ExecutorFactory)} can be used to provide
294
+ * an executor to run message processor callbacks.
295
+ *
296
+ * @param callback the callback to be executed on each message
297
+ * @param options pulling options
298
+ * @return a message consumer for the provided subscription and options
299
+ */
162
300
public MessageConsumer pullAsync (MessageProcessor callback , PullOption ... options ) {
163
301
return pubsub .pullAsync (name (), callback , options );
164
302
}
0 commit comments