Skip to content

Commit fe4c90c

Browse files
committed
Implement Pub/Sub management methods, add javadoc and tests (#1015)
1 parent 2656eb9 commit fe4c90c

File tree

10 files changed

+2185
-41
lines changed

10 files changed

+2185
-41
lines changed

gcloud-java-pubsub/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@
4848
<version>4.12</version>
4949
<scope>test</scope>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.easymock</groupId>
53+
<artifactId>easymock</artifactId>
54+
<version>3.4</version>
55+
<scope>test</scope>
56+
</dependency>
5157
</dependencies>
5258
<profiles>
5359
<profile>

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 209 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private PullOption(Option.OptionType option, Object value) {
101101
}
102102

103103
/**
104-
* Returns an option to specify the maximum number of messages that can be executed
104+
* Returns an option to specify the maximum number of messages that can be processed
105105
* concurrently at any time.
106106
*/
107107
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
@@ -110,74 +110,275 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
110110
}
111111

112112
/**
113-
* A callback to process pulled messages.
114-
* The message will be ack'ed upon successful return or nack'ed if exception is thrown.
113+
* A callback to process pulled messages. The received message will be ack'ed upon successful
114+
* return or nack'ed if exception is thrown.
115115
*/
116116
interface MessageProcessor {
117+
/**
118+
* Processes the received {@code message}. If this method returns correctly the message is
119+
* ack'ed. If this method throws an exception the message is nack'ed.
120+
*/
117121
void process(Message message) throws Exception;
118122
}
119123

120124
/**
121-
* An interface to control message consumer settings.
125+
* An interface to control a message consumer.
122126
*/
123127
interface MessageConsumer extends AutoCloseable {
124128

129+
/**
130+
* Stops pulling messages from the subscription associated with this {@code MessageConsumer} and
131+
* frees all resources. Messages that have already been pulled are processed before closing.
132+
*/
133+
@Override
134+
void close() throws Exception;
125135
}
126136

137+
/**
138+
* Creates a new topic.
139+
*
140+
* @return the created topic
141+
* @throws PubSubException upon failure
142+
*/
127143
Topic create(TopicInfo topic);
128144

145+
/**
146+
* Sends a request for creating a topic. This method returns a {@code Future} object to consume
147+
* the result. {@link Future#get()} returns the created topic or {@code null} if not found.
148+
*/
129149
Future<Topic> createAsync(TopicInfo topic);
130150

131-
// null if not found
151+
/**
152+
* Returns the requested topic or {@code null} if not found.
153+
*
154+
* @throws PubSubException upon failure
155+
*/
132156
Topic getTopic(String topic);
133157

158+
/**
159+
* Sends a request for getting a topic. This method returns a {@code Future} object to consume the
160+
* result. {@link Future#get()} returns the requested topic or {@code null} if not found.
161+
*
162+
* @throws PubSubException upon failure
163+
*/
134164
Future<Topic> getTopicAsync(String topic);
135165

136-
// false if not found
166+
/**
167+
* Deletes the requested topic.
168+
*
169+
* @return {@code true} if the topic was deleted, {@code false} if it was not found
170+
*/
137171
boolean deleteTopic(String topic);
138172

173+
/**
174+
* Sends a request for deleting a topic. This method returns a {@code Future} object to consume
175+
* the result. {@link Future#get()} returns {@code true} if the topic was deleted, {@code false}
176+
* if it was not found.
177+
*/
139178
Future<Boolean> deleteTopicAsync(String topic);
140179

180+
/**
181+
* Lists the topics. This method returns a {@link Page} object that can be used to consume
182+
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
183+
* to start listing topics.
184+
*
185+
* @throws PubSubException upon failure
186+
*/
141187
Page<Topic> listTopics(ListOption... options);
142188

189+
/**
190+
* Sends a request for listing topics. This method returns a {@code Future} object to consume
191+
* the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used to
192+
* asynchronously handle paginated results. Use {@link ListOption} to specify the page size or the
193+
* page token from which to start listing topics.
194+
*/
143195
Future<AsyncPage<Topic>> listTopicsAsync(ListOption... options);
144196

197+
/**
198+
* Publishes a message to the provided topic. This method returns a service-generated id for the
199+
* published message. Service-generated ids are guaranteed to be unique within the topic.
200+
*
201+
* @param topic the topic where the message is published
202+
* @param message the message to publish
203+
* @return a unique service-generated id for the message
204+
* @throws PubSubException upon failure, if the topic does not exist or if the message has empty
205+
* payload and no attributes
206+
*/
145207
String publish(String topic, Message message);
146208

209+
/**
210+
* Sends a request for publishing a message to the provided topic. This method returns a
211+
* {@code Future} object to consume the result. {@link Future#get()} returns a service-generated
212+
* id for the published message. Service-generated ids are guaranteed to be unique within the
213+
* topic.
214+
*
215+
* @param topic the topic where the message is published
216+
* @param message the message to publish
217+
* @return a {@code Future} for the unique service-generated id for the message
218+
*/
147219
Future<String> publishAsync(String topic, Message message);
148220

221+
/**
222+
* Publishes a number of messages to the provided topic. This method returns a list of
223+
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
224+
* unique within the topic.
225+
*
226+
* @param topic the topic where the message is published
227+
* @param message the first message to publish
228+
* @param messages other messages to publish
229+
* @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
230+
* @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
231+
* empty payload and no attributes
232+
*/
149233
List<String> publish(String topic, Message message, Message... messages);
150234

235+
/**
236+
* Sends a request to publish a number of messages to the provided topic. This method returns a
237+
* {@code Future} object to consume the result. {@link Future#get()} returns a list of
238+
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
239+
* unique within the topic.
240+
*
241+
* @param topic the topic where the message is published
242+
* @param message the first message to publish
243+
* @param messages other messages to publish
244+
* @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
245+
* the messages.
246+
*/
151247
Future<List<String>> publishAsync(String topic, Message message, Message... messages);
152248

249+
/**
250+
* Publishes a number of messages to the provided topic. This method returns a list of
251+
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
252+
* unique within the topic.
253+
*
254+
* @param topic the topic where the message is published
255+
* @param messages the messages to publish
256+
* @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
257+
* @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
258+
* empty payload and no attributes
259+
*/
153260
List<String> publish(String topic, Iterable<Message> messages);
154261

262+
/**
263+
* Sends a request to publish a number of messages to the provided topic. This method returns a
264+
* {@code Future} object to consume the result. {@link Future#get()} returns a list of
265+
* service-generated ids for the published messages. Service-generated ids are guaranteed to be
266+
* unique within the topic.
267+
*
268+
* @param topic the topic where the message is published
269+
* @param messages the messages to publish
270+
* @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
271+
* the messages
272+
*/
155273
Future<List<String>> publishAsync(String topic, Iterable<Message> messages);
156274

275+
/**
276+
* Creates a new subscription.
277+
*
278+
* @return the created subscription
279+
* @throws PubSubException upon failure
280+
*/
157281
Subscription create(SubscriptionInfo subscription);
158282

283+
/**
284+
* Sends a request for creating a subscription. This method returns a {@code Future} object to
285+
* consume the result. {@link Future#get()} returns the created subscription or {@code null} if
286+
* not found.
287+
*/
159288
Future<Subscription> createAsync(SubscriptionInfo subscription);
160289

161-
// null if not found
290+
/**
291+
* Returns the requested subscription or {@code null} if not found.
292+
*/
162293
Subscription getSubscription(String subscription);
163294

295+
/**
296+
* Sends a request for getting a subscription. This method returns a {@code Future} object to
297+
* consume the result. {@link Future#get()} returns the requested subscription or {@code null} if
298+
* not found.
299+
*/
164300
Future<Subscription> getSubscriptionAsync(String subscription);
165301

302+
/**
303+
* Sets the push configuration for a specified subscription. This may be used to change a push
304+
* subscription to a pull one (passing a {@code null} {@code pushConfig} parameter) or vice versa.
305+
* This methods can also be used to change the endpoint URL and other attributes of a push
306+
* subscription. Messages will accumulate for delivery regardless of changes to the push
307+
* configuration.
308+
*
309+
* @param subscription the subscription for which to replace push configuration
310+
* @param pushConfig the new push configuration. Use {@code null} to unset it
311+
* @throws PubSubException upon failure, or if the subscription does not exist
312+
*/
166313
void replacePushConfig(String subscription, PushConfig pushConfig);
167314

315+
/**
316+
* Sends a request for updating the push configuration for a specified subscription. This may be
317+
* used to change a push subscription to a pull one (passing a {@code null} {@code pushConfig}
318+
* parameter) or vice versa. This methods can also be used to change the endpoint URL and other
319+
* attributes of a push subscription. Messages will accumulate for delivery regardless of changes
320+
* to the push configuration. The method returns a {@code Future} object that can be used to wait
321+
* for the replace operation to be completed.
322+
*
323+
* @param subscription the subscription for which to replace push configuration
324+
* @param pushConfig the new push configuration. Use {@code null} to unset it
325+
* @return a {@code Future} to wait for the replace operation to be completed.
326+
*/
168327
Future<Void> replacePushConfigAsync(String subscription, PushConfig pushConfig);
169328

170-
// false if not found
329+
/**
330+
* Deletes the requested subscription.
331+
*
332+
* @return {@code true} if the subscription was deleted, {@code false} if it was not found
333+
* @throws PubSubException upon failure
334+
*/
171335
boolean deleteSubscription(String subscription);
172336

337+
/**
338+
* Sends a request for deleting a subscription. This method returns a {@code Future} object to
339+
* consume the result. {@link Future#get()} returns {@code true} if the subscription was deleted,
340+
* {@code false} if it was not found.
341+
*/
173342
Future<Boolean> deleteSubscriptionAsync(String subscription);
174343

344+
/**
345+
* Lists the subscriptions. This method returns a {@link Page} object that can be used to consume
346+
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
347+
* to start listing subscriptions.
348+
*
349+
* @throws PubSubException upon failure
350+
*/
175351
Page<Subscription> listSubscriptions(ListOption... options);
176352

353+
/**
354+
* Sends a request for listing subscriptions. This method returns a {@code Future} object to
355+
* consume the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used
356+
* to asynchronously handle paginated results. Use {@link ListOption} to specify the page size or
357+
* the page token from which to start listing subscriptions.
358+
*
359+
* @throws PubSubException upon failure
360+
*/
177361
Future<AsyncPage<Subscription>> listSubscriptionsAsync(ListOption... options);
178362

363+
/**
364+
* Lists the identities of the subscriptions for the provided topic. This method returns a
365+
* {@link Page} object that can be used to consume paginated results. Use {@link ListOption} to
366+
* specify the page size or the page token from which to start listing subscriptions.
367+
*
368+
* @param topic the topic for which to list subscriptions
369+
* @throws PubSubException upon failure
370+
*/
179371
Page<SubscriptionId> listSubscriptions(String topic, ListOption... options);
180372

373+
/**
374+
* Sends a request for listing the identities of subscriptions for the provided topic. This method
375+
* returns a {@code Future} object to consume the result. {@link Future#get()} returns an
376+
* {@link AsyncPage} object that can be used to asynchronously handle paginated results. Use
377+
* {@link ListOption} to specify the page size or the page token from which to start listing
378+
* subscriptions.
379+
*
380+
* @param topic the topic for which to list subscriptions
381+
*/
181382
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);
182383

183384
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

0 commit comments

Comments
 (0)