Skip to content

clean up doc #1524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,57 +50,39 @@
* <p>For example, a {@link Publisher} can be constructed and used to publish a list of messages as
* follows:
*
* <pre>
* <pre><code>
* Publisher publisher =
* Publisher.Builder.newBuilder(MY_TOPIC)
* .setMaxBundleDuration(new Duration(10 * 1000))
* .build();
* List<ListenableFuture<String>> results = new ArrayList<>();
* List&lt;ListenableFuture&lt;String&gt;&gt; results = new ArrayList&lt;&gt;();
*
* for (PubsubMessage messages : messagesToPublish) {
* results.add(publisher.publish(message));
* }
*
* Futures.addCallback(
* Futures.allAsList(results),
* new FutureCallback<List<String>>() {
* @Override
* public void onSuccess(List<String> messageIds) {
* new FutureCallback&lt;List&lt;String&gt;&gt;() {
* &#64;Override
* public void onSuccess(List&lt;String&gt; messageIds) {
* // ... process the acknowledgement of publish ...
* }
* @Override
* &#64;Override
* public void onFailure(Throwable t) {
* // .. handle the failure ...
* }
* });
*
* // Ensure all the outstanding messages have been published before shutting down your process.
* publisher.shutdown();
* </pre>
* </code></pre>
*/
public interface Publisher {
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";

// API limits.
int MAX_BUNDLE_MESSAGES = 1000;
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds

BundlingSettings DEFAULT_BUNDLING_SETTINGS =
BundlingSettings.newBuilder()
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
.build();

/** Topic to which the publisher publishes to. */
String getTopic();

Expand Down Expand Up @@ -163,6 +145,20 @@ public interface Publisher {

/** A builder of {@link Publisher}s. */
public final class Builder {
// Meaningful defaults.
static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
static final BundlingSettings DEFAULT_BUNDLING_SETTINGS =
BundlingSettings.newBuilder()
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
.build();

String topic;

// Bundling options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.gax.bundling.FlowController;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
Expand All @@ -38,7 +39,6 @@
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -125,7 +125,9 @@ final class PublisherImpl implements Publisher {
channels[i] =
builder.channelBuilder.isPresent()
? builder.channelBuilder.get().build()
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
: NettyChannelBuilder.forAddress(
PublisherSettings.getDefaultServiceAddress(),
PublisherSettings.getDefaultServicePort())
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.executor(executor)
Expand All @@ -136,7 +138,7 @@ final class PublisherImpl implements Publisher {
builder.userCredentials.isPresent()
? builder.userCredentials.get()
: GoogleCredentials.getApplicationDefault()
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE)));
.createScoped(PublisherSettings.getDefaultServiceScopes()));
shutdown = new AtomicBoolean(false);
messagesWaiter = new MessagesWaiter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@
* in memory before the receiver either ack or nack them.
* </ul>
*
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
* through {@link GoogleCredentials#getApplicationDefault}.
* <p>If no credentials are provided, the {@link Subscriber} will use application default
* credentials through {@link GoogleCredentials#getApplicationDefault}.
*
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
*
* <pre>{@code
* <pre><code>
* MessageReceiver receiver = new MessageReceiver() {
* @Override
* public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
* &#64;Override
* public ListenableFuture&lt;AckReply&gt; receiveMessage(PubsubMessage message) {
* // ... process message ...
* return Futures.immediateFuture(AckReply.ACK);
* }
Expand All @@ -77,20 +77,17 @@
*
* // ... recommended, listen for fatal errors that break the subscriber streaming ...
* subscriber.addListener(new Listener() {
* @Override
* &#64;Override
* public void failed(State from, Throwable failure) {
* System.out.println("Subscriber faile with error: " + failure);
* System.out.println("Subscriber failed with error: " + failure);
* }
* }, Executors.newSingleThreadExecutor());
*
* // ... and when done with the subscriber ...
* subscriber.stopAsync();
* }</pre>
* </code></pre>
*/
public interface Subscriber extends Service {
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub";

/** Retrieves a snapshot of the current subscriber statistics. */
SubscriberStats getStats();

Expand Down Expand Up @@ -125,15 +122,15 @@ public static enum AckReply {
Duration getAckExpirationPadding();

/**
* Maximum number of outstanding (i.e. pending to process) messages before limits are enforced.
* Maximum number of outstanding messages before limits are enforced.
*
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
* management, still some extra bytes could be kept at lower layers.</b>
*/
Optional<Integer> getMaxOutstandingElementCount();

/** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */
/** Maximum number of outstanding bytes before limits are enforced. */
Optional<Integer> getMaxOutstandingRequestBytes();

/** Builder of {@link Subscriber Subscribers}. */
Expand All @@ -158,7 +155,7 @@ public final class Builder {
* Constructs a new {@link Builder}.
*
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
* Publisher}.
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
Expand Down Expand Up @@ -226,8 +223,8 @@ public Builder setExecutor(ScheduledExecutorService executor) {
return this;
}

/** Gives the ability to set a custom executor. */
public Builder setClock(Clock clock) {
/** Gives the ability to set a custom clock. */
Builder setClock(Clock clock) {
this.clock = Optional.of(clock);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
Expand All @@ -35,7 +36,6 @@
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -104,7 +104,9 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
channelBuilder =
builder.channelBuilder.isPresent()
? builder.channelBuilder.get()
: NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443)
: NettyChannelBuilder.forAddress(
SubscriberSettings.getDefaultServiceAddress(),
SubscriberSettings.getDefaultServicePort())
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.flowControlWindow(5000000) // 2.5 MB
.negotiationType(NegotiationType.TLS)
Expand All @@ -115,7 +117,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
builder.credentials.isPresent()
? builder.credentials.get()
: GoogleCredentials.getApplicationDefault()
.createScoped(Collections.singletonList(PUBSUB_API_SCOPE));
.createScoped(SubscriberSettings.getDefaultServiceScopes());

streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
Expand Down
Loading