Skip to content

Commit c3ded9d

Browse files
authored
clean up doc (#1524)
* fix doc annotations * clean up Publisher and Subscriber surface - PUBSUB_API_ADDRESS and PUBSUB_API_SCOPE are removed. We now use {Publisher,Subscriber}Settings instead. - Default settings are moved into the Builder class, so that they have package-private visibility. We can make them public later if we want to. - Subscriber doc is fixed so that it is not chopped off mid-sentence. - Subscriber doc mistakenly referred to the Publisher. This is now fixed.
1 parent 5f1417c commit c3ded9d

File tree

5 files changed

+89
-73
lines changed

5 files changed

+89
-73
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

+21-25
Original file line numberDiff line numberDiff line change
@@ -50,57 +50,39 @@
5050
* <p>For example, a {@link Publisher} can be constructed and used to publish a list of messages as
5151
* follows:
5252
*
53-
* <pre>
53+
* <pre><code>
5454
* Publisher publisher =
5555
* Publisher.Builder.newBuilder(MY_TOPIC)
5656
* .setMaxBundleDuration(new Duration(10 * 1000))
5757
* .build();
58-
* List<ListenableFuture<String>> results = new ArrayList<>();
58+
* List&lt;ListenableFuture&lt;String&gt;&gt; results = new ArrayList&lt;&gt;();
5959
*
6060
* for (PubsubMessage messages : messagesToPublish) {
6161
* results.add(publisher.publish(message));
6262
* }
6363
*
6464
* Futures.addCallback(
6565
* Futures.allAsList(results),
66-
* new FutureCallback<List<String>>() {
67-
* @Override
68-
* public void onSuccess(List<String> messageIds) {
66+
* new FutureCallback&lt;List&lt;String&gt;&gt;() {
67+
* &#64;Override
68+
* public void onSuccess(List&lt;String&gt; messageIds) {
6969
* // ... process the acknowledgement of publish ...
7070
* }
71-
* @Override
71+
* &#64;Override
7272
* public void onFailure(Throwable t) {
7373
* // .. handle the failure ...
7474
* }
7575
* });
7676
*
7777
* // Ensure all the outstanding messages have been published before shutting down your process.
7878
* publisher.shutdown();
79-
* </pre>
79+
* </code></pre>
8080
*/
8181
public interface Publisher {
82-
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
83-
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
84-
8582
// API limits.
8683
int MAX_BUNDLE_MESSAGES = 1000;
8784
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
8885

89-
// Meaningful defaults.
90-
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
91-
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
92-
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
93-
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
94-
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
95-
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
96-
97-
BundlingSettings DEFAULT_BUNDLING_SETTINGS =
98-
BundlingSettings.newBuilder()
99-
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
100-
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
101-
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
102-
.build();
103-
10486
/** Topic to which the publisher publishes to. */
10587
String getTopic();
10688

@@ -163,6 +145,20 @@ public interface Publisher {
163145

164146
/** A builder of {@link Publisher}s. */
165147
public final class Builder {
148+
// Meaningful defaults.
149+
static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
150+
static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
151+
static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
152+
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
153+
static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
154+
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
155+
static final BundlingSettings DEFAULT_BUNDLING_SETTINGS =
156+
BundlingSettings.newBuilder()
157+
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
158+
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
159+
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
160+
.build();
161+
166162
String topic;
167163

168164
// Bundling options

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.auth.oauth2.GoogleCredentials;
21+
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
2122
import com.google.common.base.Optional;
2223
import com.google.common.collect.ImmutableList;
2324
import com.google.common.primitives.Ints;
@@ -38,7 +39,6 @@
3839
import io.grpc.netty.NegotiationType;
3940
import io.grpc.netty.NettyChannelBuilder;
4041
import java.io.IOException;
41-
import java.util.Collections;
4242
import java.util.Iterator;
4343
import java.util.LinkedList;
4444
import java.util.List;
@@ -125,7 +125,9 @@ final class PublisherImpl implements Publisher {
125125
channels[i] =
126126
builder.channelBuilder.isPresent()
127127
? builder.channelBuilder.get().build()
128-
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
128+
: NettyChannelBuilder.forAddress(
129+
PublisherSettings.getDefaultServiceAddress(),
130+
PublisherSettings.getDefaultServicePort())
129131
.negotiationType(NegotiationType.TLS)
130132
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
131133
.executor(executor)
@@ -136,7 +138,7 @@ final class PublisherImpl implements Publisher {
136138
builder.userCredentials.isPresent()
137139
? builder.userCredentials.get()
138140
: GoogleCredentials.getApplicationDefault()
139-
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE)));
141+
.createScoped(PublisherSettings.getDefaultServiceScopes()));
140142
shutdown = new AtomicBoolean(false);
141143
messagesWaiter = new MessagesWaiter();
142144
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

+13-16
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@
5454
* in memory before the receiver either ack or nack them.
5555
* </ul>
5656
*
57-
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
58-
* through {@link GoogleCredentials#getApplicationDefault}.
57+
* <p>If no credentials are provided, the {@link Subscriber} will use application default
58+
* credentials through {@link GoogleCredentials#getApplicationDefault}.
5959
*
6060
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
6161
*
62-
* <pre>{@code
62+
* <pre><code>
6363
* MessageReceiver receiver = new MessageReceiver() {
64-
* @Override
65-
* public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
64+
* &#64;Override
65+
* public ListenableFuture&lt;AckReply&gt; receiveMessage(PubsubMessage message) {
6666
* // ... process message ...
6767
* return Futures.immediateFuture(AckReply.ACK);
6868
* }
@@ -77,20 +77,17 @@
7777
*
7878
* // ... recommended, listen for fatal errors that break the subscriber streaming ...
7979
* subscriber.addListener(new Listener() {
80-
* @Override
80+
* &#64;Override
8181
* public void failed(State from, Throwable failure) {
82-
* System.out.println("Subscriber faile with error: " + failure);
82+
* System.out.println("Subscriber failed with error: " + failure);
8383
* }
8484
* }, Executors.newSingleThreadExecutor());
8585
*
8686
* // ... and when done with the subscriber ...
8787
* subscriber.stopAsync();
88-
* }</pre>
88+
* </code></pre>
8989
*/
9090
public interface Subscriber extends Service {
91-
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
92-
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";
93-
9491
/** Retrieves a snapshot of the current subscriber statistics. */
9592
SubscriberStats getStats();
9693

@@ -125,15 +122,15 @@ public static enum AckReply {
125122
Duration getAckExpirationPadding();
126123

127124
/**
128-
* Maximum number of outstanding (i.e. pending to process) messages before limits are enforced.
125+
* Maximum number of outstanding messages before limits are enforced.
129126
*
130127
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
131128
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
132129
* management, still some extra bytes could be kept at lower layers.</b>
133130
*/
134131
Optional<Integer> getMaxOutstandingElementCount();
135132

136-
/** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */
133+
/** Maximum number of outstanding bytes before limits are enforced. */
137134
Optional<Integer> getMaxOutstandingRequestBytes();
138135

139136
/** Builder of {@link Subscriber Subscribers}. */
@@ -158,7 +155,7 @@ public final class Builder {
158155
* Constructs a new {@link Builder}.
159156
*
160157
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
161-
* Publisher}.
158+
* Subscriber}.
162159
*
163160
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
164161
* @param receiver an implementation of {@link MessageReceiver} used to process the received
@@ -226,8 +223,8 @@ public Builder setExecutor(ScheduledExecutorService executor) {
226223
return this;
227224
}
228225

229-
/** Gives the ability to set a custom executor. */
230-
public Builder setClock(Clock clock) {
226+
/** Gives the ability to set a custom clock. */
227+
Builder setClock(Clock clock) {
231228
this.clock = Optional.of(clock);
232229
return this;
233230
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.auth.Credentials;
2222
import com.google.auth.oauth2.GoogleCredentials;
2323
import com.google.cloud.Clock;
24+
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
2425
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Optional;
2627
import com.google.common.primitives.Ints;
@@ -35,7 +36,6 @@
3536
import io.grpc.netty.NettyChannelBuilder;
3637
import java.io.IOException;
3738
import java.util.ArrayList;
38-
import java.util.Collections;
3939
import java.util.List;
4040
import java.util.concurrent.CountDownLatch;
4141
import java.util.concurrent.Executors;
@@ -104,7 +104,9 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
104104
channelBuilder =
105105
builder.channelBuilder.isPresent()
106106
? builder.channelBuilder.get()
107-
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
107+
: NettyChannelBuilder.forAddress(
108+
SubscriberSettings.getDefaultServiceAddress(),
109+
SubscriberSettings.getDefaultServicePort())
108110
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
109111
.flowControlWindow(5000000) // 2.5 MB
110112
.negotiationType(NegotiationType.TLS)
@@ -115,7 +117,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
115117
builder.credentials.isPresent()
116118
? builder.credentials.get()
117119
: GoogleCredentials.getApplicationDefault()
118-
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE));
120+
.createScoped(SubscriberSettings.getDefaultServiceScopes());
119121

120122
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
121123
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);

0 commit comments

Comments
 (0)