Skip to content

Commit 0559a75

Browse files
committed
make pubsub work with new gax
update documentation links too
1 parent e575ba7 commit 0559a75

File tree

9 files changed

+73
-93
lines changed

9 files changed

+73
-93
lines changed

google-cloud-core/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
<dependency>
112112
<groupId>com.google.api</groupId>
113113
<artifactId>gax</artifactId>
114-
<version>0.0.27-SNAPSHOT</version>
114+
<version>0.0.27</version>
115115
<exclusions>
116116
<exclusion>
117117
<groupId>io.grpc</groupId>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java

-3
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,16 @@
1717
package com.google.cloud.pubsub;
1818

1919
import com.google.api.gax.bundling.FlowController;
20-
import com.google.auth.Credentials;
2120
import com.google.cloud.Clock;
2221
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2322
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2423
import com.google.common.annotations.VisibleForTesting;
2524
import com.google.common.collect.Lists;
2625
import com.google.common.primitives.Ints;
27-
import com.google.common.util.concurrent.AbstractService;
2826
import com.google.common.util.concurrent.FutureCallback;
2927
import com.google.common.util.concurrent.Futures;
3028
import com.google.pubsub.v1.PubsubMessage;
3129
import com.google.pubsub.v1.ReceivedMessage;
32-
import io.grpc.Status;
3330
import java.util.ArrayList;
3431
import java.util.Collection;
3532
import java.util.HashMap;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java

-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import io.grpc.Channel;
4141
import io.grpc.StatusRuntimeException;
4242
import io.grpc.auth.MoreCallCredentials;
43-
import java.util.Iterator;
4443
import java.util.List;
4544
import java.util.concurrent.ScheduledExecutorService;
4645
import java.util.concurrent.TimeUnit;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

-3
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616

1717
package com.google.cloud.pubsub;
1818

19-
import com.google.auto.value.AutoValue;
2019
import com.google.cloud.AsyncPage;
2120
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2221
import com.google.cloud.Page;
2322
import com.google.cloud.Policy;
2423
import com.google.cloud.Service;
25-
import com.google.common.base.Optional;
26-
import com.google.common.base.Preconditions;
2724
import java.io.IOException;
2825
import java.util.List;
2926
import java.util.Map;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,9 @@ public interface Publisher {
143143
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
144144
*
145145
* <p>If set to false, a publish call will fail with either {@link
146-
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException}, as
147-
* appropriate, when flow control limits are reached.
146+
* RequestByteMaxOutstandingReachedException} or {@link
147+
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
148+
* reached.
148149
*/
149150
boolean failOnFlowControlLimits();
150151

@@ -250,8 +251,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting
250251

251252
/**
252253
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
253-
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException} as
254-
* appropriate.
254+
* RequestByteMaxOutstandingReachedException} or {@link
255+
* ElementCountMaxOutstandingReachedException} as appropriate.
255256
*
256257
* <p>If set to false, then publish operations will block the current thread until the
257258
* outstanding requests go under the limits.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

+30-32
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
*
3838
* <p>A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver
3939
* receiver} to which messages are going to be delivered as soon as they are received by the
40-
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link
41-
* AckReply#NACK nacked} at will as they get processed by the receiver. Nacking a
42-
* messages implies a later redelivery of such message.
40+
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK
41+
* nacked} at will as they get processed by the receiver. Nacking a messages implies a later
42+
* redelivery of such message.
4343
*
4444
* <p>The subscriber handles the ack management, by automatically extending the ack deadline while
4545
* the message is being processed, to then issue the ack or nack of such message when the processing
@@ -54,38 +54,38 @@
5454
* in memory before the receiver either ack or nack them.
5555
* </ul>
5656
*
57-
* <p>If no credentials are provided, the {@link Publisher} will use application default
58-
* credentials through {@link GoogleCredentials#getApplicationDefault}.
57+
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
58+
* through {@link GoogleCredentials#getApplicationDefault}.
5959
*
6060
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
6161
*
62-
* <pre>
63-
* MessageReceiver receiver =
64-
* message -> {
65-
* // ... process message ...
66-
* return Futures.immediateFuture(AckReply.ACK);
67-
* });
62+
* <pre>{@code
63+
* MessageReceiver receiver = new MessageReceiver() {
64+
* @Override
65+
* public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
66+
* // ... process message ...
67+
* return Futures.immediateFuture(AckReply.ACK);
68+
* }
69+
* }
6870
*
69-
* Subscriber subscriber =
70-
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
71-
* .setMaxBundleAcks(100)
72-
* .build();
71+
* Subscriber subscriber =
72+
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
73+
* .setMaxBundleAcks(100)
74+
* .build();
7375
*
74-
* subscriber.startAsync();
76+
* subscriber.startAsync();
7577
*
76-
* ... recommended, listen for fatal errors that break the subscriber streaming ...
77-
* subscriber.addListener(
78-
new Listener() {
79-
@Override
80-
public void failed(State from, Throwable failure) {
81-
System.out.println("Subscriber faile with error: " + failure);
82-
}
83-
},
84-
Executors.newSingleThreadExecutor());
78+
* // ... recommended, listen for fatal errors that break the subscriber streaming ...
79+
* subscriber.addListener(new Listener() {
80+
* @Override
81+
* public void failed(State from, Throwable failure) {
82+
* System.out.println("Subscriber faile with error: " + failure);
83+
* }
84+
* }, Executors.newSingleThreadExecutor());
8585
*
86-
* ... and when done with the subscriber ...
87-
* subscriber.stopAsync();
88-
* </pre>
86+
* // ... and when done with the subscriber ...
87+
* subscriber.stopAsync();
88+
* }</pre>
8989
*/
9090
public interface Subscriber extends Service {
9191
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
@@ -129,13 +129,11 @@ public static enum AckReply {
129129
*
130130
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
131131
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
132-
* management, still some extra bytes could be kept at lower layers.
132+
* management, still some extra bytes could be kept at lower layers.</b>
133133
*/
134134
Optional<Integer> getMaxOutstandingElementCount();
135135

136-
/**
137-
* Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced.
138-
*/
136+
/** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */
139137
Optional<Integer> getMaxOutstandingRequestBytes();
140138

141139
/** Builder of {@link Subscriber Subscribers}. */

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java

+5-15
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

21-
import com.google.cloud.GrpcServiceOptions;
2221
import com.google.cloud.Policy;
23-
import com.google.cloud.pubsub.PubSub.PullOption;
2422
import com.google.common.base.Function;
25-
2623
import java.io.IOException;
2724
import java.io.ObjectInputStream;
28-
import java.util.Iterator;
2925
import java.util.List;
3026
import java.util.Objects;
3127
import java.util.concurrent.Future;
@@ -41,18 +37,12 @@
4137
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
4238
* indicates that the Pub/Sub server should resend it (implicit "nack").
4339
*
44-
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
45-
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
46-
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
47-
* When messages are pulled with {@link PubSub#pull(String, int)} or
48-
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
49-
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
50-
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
51-
* {@link PubSub#ackAsync(String, String, String...)}.
40+
* <p>In a pull subscription, the subscribing application must pull messages using {@link
41+
* PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}.
5242
*
53-
* <p>{@code Subscription} adds a layer of service-related functionality over
54-
* {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
55-
* object with the most recent information use {@link #reload} or {@link #reloadAsync}.
43+
* <p>{@code Subscription} adds a layer of service-related functionality over {@link
44+
* SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} object with
45+
* the most recent information use {@link #reload} or {@link #reloadAsync}.
5646
*
5747
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
5848
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java

+24-32
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020

2121
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
2222
import com.google.common.base.MoreObjects;
23-
2423
import java.io.Serializable;
2524
import java.util.Objects;
26-
import java.util.concurrent.TimeUnit;
2725

2826
/**
2927
* A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
@@ -36,14 +34,8 @@
3634
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
3735
* indicates that the Pub/Sub server should resend it (implicit "nack").
3836
*
39-
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
40-
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
41-
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
42-
* When messages are pulled with {@link PubSub#pull(String, int)} or
43-
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
44-
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
45-
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
46-
* {@link PubSub#ackAsync(String, String, String...)}.
37+
* <p>In a pull subscription, the subscribing application must pull messages using {@link
38+
* PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}.
4739
*
4840
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
4941
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
@@ -140,10 +132,10 @@ public abstract static class Builder {
140132
* acknowledge the message. After message delivery but before the ack deadline expires and
141133
* before the message is acknowledged, it is an outstanding message and will not be delivered
142134
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
143-
* as the initial value for the ack deadline. To override the ack deadline value for a given
144-
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
145-
* delivery, this value is used to set the request timeout for the call to the push endpoint.
146-
* This value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
135+
* as the initial value for the ack deadline, and {@link Subscriber} automatically renews
136+
* unprocessed messages. For push delivery, this value is used to set the request timeout for
137+
* the call to the push endpoint. This value must be between 10 and 600 seconds, if not
138+
* specified, 10 seconds is used.
147139
*/
148140
@Deprecated
149141
public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds);
@@ -153,10 +145,10 @@ public abstract static class Builder {
153145
* acknowledge the message. After message delivery but before the ack deadline expires and
154146
* before the message is acknowledged, it is an outstanding message and will not be delivered
155147
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
156-
* as the initial value for the ack deadline. To override the ack deadline value for a given
157-
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
158-
* delivery, this value is used to set the request timeout for the call to the push endpoint.
159-
* This value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
148+
* as the initial value for the ack deadline. , and {@link Subscriber} automatically renews
149+
* unprocessed messages. For push delivery, this value is used to set the request timeout for
150+
* the call to the push endpoint. This value must be between 10 and 600 seconds, if not
151+
* specified, 10 seconds is used.
160152
*/
161153
public abstract Builder setAckDeadLineSeconds(int ackDeadLineSeconds);
162154

@@ -333,13 +325,13 @@ public PushConfig getPushConfig() {
333325

334326
/**
335327
* Returns the maximum time after a subscriber receives a message before the subscriber should
336-
* acknowledge the message. After message delivery but before the ack deadline expires and
337-
* before the message is acknowledged, it is an outstanding message and will not be delivered
338-
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
339-
* as the initial value for the ack deadline. To override the ack deadline value for a given
340-
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
341-
* delivery, this value is used to set the request timeout for the call to the push endpoint. This
342-
* value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
328+
* acknowledge the message. After message delivery but before the ack deadline expires and before
329+
* the message is acknowledged, it is an outstanding message and will not be delivered again
330+
* during that time (on a best-effort basis). For pull subscriptions, this value is used as the
331+
* initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed
332+
* messages. For push delivery, this value is used to set the request timeout for the call to the
333+
* push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is
334+
* used.
343335
*/
344336
@Deprecated
345337
public long ackDeadlineSeconds() {
@@ -348,13 +340,13 @@ public long ackDeadlineSeconds() {
348340

349341
/**
350342
* Returns the maximum time after a subscriber receives a message before the subscriber should
351-
* acknowledge the message. After message delivery but before the ack deadline expires and
352-
* before the message is acknowledged, it is an outstanding message and will not be delivered
353-
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
354-
* as the initial value for the ack deadline. To override the ack deadline value for a given
355-
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
356-
* delivery, this value is used to set the request timeout for the call to the push endpoint. This
357-
* value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
343+
* acknowledge the message. After message delivery but before the ack deadline expires and before
344+
* the message is acknowledged, it is an outstanding message and will not be delivered again
345+
* during that time (on a best-effort basis). For pull subscriptions, this value is used as the
346+
* initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed
347+
* messages. For push delivery, this value is used to set the request timeout for the call to the
348+
* push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is
349+
* used.
358350
*/
359351
public long getAckDeadlineSeconds() {
360352
return ackDeadlineSeconds;

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,10 @@ public void testBuilderInvalidArguments() {
410410
}
411411
try {
412412
builder.setBundlingSettings(
413-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold((Long)null).build());
413+
Publisher.DEFAULT_BUNDLING_SETTINGS
414+
.toBuilder()
415+
.setRequestByteThreshold((Long) null)
416+
.build());
414417
fail("Should have thrown an NullPointerException");
415418
} catch (NullPointerException expected) {
416419
// Expected
@@ -454,7 +457,10 @@ public void testBuilderInvalidArguments() {
454457
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build());
455458
try {
456459
builder.setBundlingSettings(
457-
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold((Long)null).build());
460+
Publisher.DEFAULT_BUNDLING_SETTINGS
461+
.toBuilder()
462+
.setElementCountThreshold((Long) null)
463+
.build());
458464
fail("Should have thrown an NullPointerException");
459465
} catch (NullPointerException expected) {
460466
// Expected

0 commit comments

Comments
 (0)