From b86e2ac7e5fe269b5c839809a270a257936597ff Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 11 Apr 2018 16:38:25 -0700 Subject: [PATCH 1/2] pubsub: change Subscriber defaults --- CHANGES.md | 17 ++++++++ .../pubsub/snippets/SubscriberSnippets.java | 7 ++-- .../google/cloud/pubsub/v1/Subscriber.java | 40 +++++++++++++------ 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b705712e5877..4d267857f653 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - `TableResult.getTotalRows()` can be called to obtain the total number of rows across pages. - Various `Job` statistics are no longer available at `QueryResponse`. - Use `BigQuery.getJob` then `Job.getStatistics` instead. + # v0.36.0 ## Pub/Sub - `TopicName` is renamed to `ProjectTopicName`, and now inherits from a new base class `TopicName` @@ -34,3 +35,19 @@ - `subscription.getTopicAsTopicNameOneof()`: use `TopicNames.parse(subscription.getTopic())` - `subscription.getNameAsSubscriptionName()`: use `ProjectSubscriptionName.parse(subscription.getName())` - `snapshot.getNameAsSnapshotName()`: use `ProjectSnapshotName.parse(snapshot.getName())` + +# v0.44.0 +## Pub/Sub +The default flow control settings for `Subscriber` is changed. + +- Previously it keeps combined size of outstanding messages below 20% of available memory. + Now it keeps the number of outstanding messages less than or equal to 1000. +- Previously it opens one stream per available CPU. + Now it opens one regardless of number of CPUs. + +Slow message consumers will likely see better load-balancing across machines. +Because each machine pulls messages less eagerly, messages not yet pulled can be pulled by another machine. + +Fast message consumers might see reduced performance. +If desired, these settings can be adjusted back by `Subscriber.Builder#setFlowControlSettings` and +`Subscriber.Builder#setParallelPullCount`. diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index 5f26d3ed6a3a..44cde5558ada 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -161,10 +161,11 @@ private Subscriber createSingleThreadedSubscriber() throws Exception { private Subscriber createSubscriberWithCustomFlowSettings() throws Exception { // [START pubsub_subscriber_flow_settings] - long maxMessageCount = 10L; - // Configure max number of messages to be pulled FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxMessageCount).build(); + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(10_000L) + .setMaxOutstandingRequestBytes(1_000_000_000L) + .build(); Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver) .setFlowControlSettings(flowControlSettings) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 74fdb6211abe..8e5d9d7582f8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -39,12 +39,9 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.SubscriberGrpc; -import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub; import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; -import com.google.pubsub.v1.Subscription; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.auth.MoreCallCredentials; @@ -56,7 +53,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -96,7 +92,6 @@ */ public class Subscriber extends AbstractApiService { private static final int THREADS_PER_CHANNEL = 5; - @InternalApi static final int CHANNELS_PER_CORE = 1; private static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size. @InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600; @@ -414,13 +409,11 @@ public static final class Builder { private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.ofMillis(100); private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.ofSeconds(5); private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); - private static final long DEFAULT_MEMORY_PERCENTAGE = 20; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() .setExecutorThreadCount( THREADS_PER_CHANNEL - * CHANNELS_PER_CORE * Runtime.getRuntime().availableProcessors()) .build(); @@ -431,10 +424,7 @@ public static final class Builder { Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder() - .setMaxOutstandingRequestBytes( - Runtime.getRuntime().maxMemory() * DEFAULT_MEMORY_PERCENTAGE / 100L) - .build(); + FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; ExecutorProvider systemExecutorProvider = FixedExecutorProvider.create(SHARED_SYSTEM_EXECUTOR); @@ -449,7 +439,7 @@ public static final class Builder { CredentialsProvider credentialsProvider = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build(); Optional clock = Optional.absent(); - int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE; + int parallelPullCount = 1; Builder(String subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; @@ -500,7 +490,31 @@ Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) { return this; } - /** Sets the flow control settings. */ + /** + * Sets the flow control settings. + * + *

In the example below, the {@Subscriber} will make sure that + * + *

+ * + * "Outstanding messages" here means the messages that have already been given to {@link + * MessageReceiver} but not yet {@code acked()} or {@code nacked()}. + * + *
{@code
+     *    FlowControlSettings flowControlSettings =
+     *     FlowControlSettings.newBuilder()
+     *         .setMaxOutstandingElementCount(10_000L)
+     *         .setMaxOutstandingRequestBytes(1_000_000_000L)
+     *         .build();
+     * Subscriber subscriber =
+     *     Subscriber.newBuilder(subscriptionName, receiver)
+     *         .setFlowControlSettings(flowControlSettings)
+     *         .build();
+     * }
+ */ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; From d2b0276ac9ca1f39d657580713abe393e93f5c28 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 12 Apr 2018 12:34:41 -0700 Subject: [PATCH 2/2] pr comment --- .../java/com/google/cloud/pubsub/v1/Subscriber.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 8e5d9d7582f8..2b976aafb89e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -504,11 +504,11 @@ Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) { * MessageReceiver} but not yet {@code acked()} or {@code nacked()}. * *
{@code
-     *    FlowControlSettings flowControlSettings =
-     *     FlowControlSettings.newBuilder()
-     *         .setMaxOutstandingElementCount(10_000L)
-     *         .setMaxOutstandingRequestBytes(1_000_000_000L)
-     *         .build();
+     * FlowControlSettings flowControlSettings =
+     *  FlowControlSettings.newBuilder()
+     *      .setMaxOutstandingElementCount(10_000L)
+     *      .setMaxOutstandingRequestBytes(1_000_000_000L)
+     *      .build();
      * Subscriber subscriber =
      *     Subscriber.newBuilder(subscriptionName, receiver)
      *         .setFlowControlSettings(flowControlSettings)