Skip to content

Publisher uses gax BundlingSettings #1489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -92,6 +93,13 @@ public interface Publisher {
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds

BundlingSettings DEFAULT_BUNDLING_SETTINGS =
BundlingSettings.newBuilder()
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
.build();

/** Topic to which the publisher publishes to. */
String getTopic();

Expand Down Expand Up @@ -156,9 +164,7 @@ final class Builder {
String topic;

// Bundling options
int maxBundleMessages;
int maxBundleBytes;
Duration maxBundleDuration;
BundlingSettings bundlingSettings;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Expand Down Expand Up @@ -192,9 +198,7 @@ private void setDefaults() {
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
maxBundleMessages = DEFAULT_MAX_BUNDLE_MESSAGES;
maxBundleBytes = DEFAULT_MAX_BUNDLE_BYTES;
maxBundleDuration = DEFAULT_MAX_BUNDLE_DURATION;
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
Expand Down Expand Up @@ -225,38 +229,26 @@ public Builder setChannelBuilder(
}

// Bundling options

/**
* Maximum number of messages to send per publish call.
*
* <p>It also sets a target to when to trigger a publish.
*/
public Builder setMaxBundleMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxBundleMessages = messages;
return this;
}

/**
* Maximum number of bytes to send per publish call.
*
* <p>It also sets a target to when to trigger a publish.
*
* <p>This will not be honored if a single message is published that exceeds this maximum.
*/
public Builder setMaxBundleBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxBundleBytes = bytes;
return this;
}

/**
* Time to wait, since the first message is kept in memory for bundling, before triggering a
* publish call.
*/
public Builder setMaxBundleDuration(Duration duration) {
Preconditions.checkArgument(duration.getMillis() >= 0);
maxBundleDuration = duration;
public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
Preconditions.checkNotNull(bundlingSettings);
Preconditions.checkNotNull(bundlingSettings.getElementCountThreshold());
Preconditions.checkArgument(bundlingSettings.getElementCountThreshold() > 0);
Preconditions.checkNotNull(bundlingSettings.getRequestByteThreshold());
Preconditions.checkArgument(bundlingSettings.getRequestByteThreshold() > 0);
Preconditions.checkNotNull(bundlingSettings.getDelayThreshold());
Preconditions.checkArgument(bundlingSettings.getDelayThreshold().getMillis() > 0);

Preconditions.checkArgument(
bundlingSettings.getElementCountLimit() == null,
"elementCountLimit option not honored by current implementation");
Preconditions.checkArgument(
bundlingSettings.getRequestByteLimit() == null,
"requestByteLimit option not honored by current implementation");
Preconditions.checkArgument(
bundlingSettings.getBlockingCallCountThreshold() == null,
"blockingCallCountThreshold option not honored by current implementation");

this.bundlingSettings = bundlingSettings;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ final class PublisherImpl implements Publisher {

private final String topic;

private final int maxBundleMessages;
private final int maxBundleBytes;
private final long maxBundleMessages;
private final long maxBundleBytes;
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;

Expand Down Expand Up @@ -93,9 +93,9 @@ final class PublisherImpl implements Publisher {
PublisherImpl(Builder builder) throws IOException {
topic = builder.topic;

maxBundleMessages = builder.maxBundleMessages;
maxBundleBytes = builder.maxBundleBytes;
maxBundleDuration = builder.maxBundleDuration;
maxBundleMessages = builder.bundlingSettings.getElementCountThreshold();
maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold();
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
hasBundlingBytes = maxBundleBytes > 0;

maxOutstandingMessages = builder.maxOutstandingMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;

import com.google.api.gax.grpc.BundlingSettings;
import com.google.cloud.pubsub.Publisher.Builder;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -99,9 +100,13 @@ public static void tearDownClass() throws Exception {
public void testPublishByDuration() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setMaxBundleDuration(Duration.standardSeconds(5))
// To demonstrate that reaching duration will trigger publish
.setMaxBundleMessages(10)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.standardSeconds(5))
.setElementCountThreshold(10)
.build())
.build();

testPublisherServiceImpl.addPublishResponse(
Expand All @@ -127,8 +132,12 @@ public void testPublishByDuration() throws Exception {
public void testPublishByNumBundledMessages() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setMaxBundleDuration(Duration.standardSeconds(100))
.setMaxBundleMessages(2)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(2)
.setDelayThreshold(Duration.standardSeconds(100))
.build())
.build();

testPublisherServiceImpl
Expand Down Expand Up @@ -162,8 +171,12 @@ public void testPublishByNumBundledMessages() throws Exception {
public void testSinglePublishByNumBytes() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setMaxBundleDuration(Duration.standardSeconds(100))
.setMaxBundleMessages(2)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(2)
.setDelayThreshold(Duration.standardSeconds(100))
.build())
.build();

testPublisherServiceImpl
Expand Down Expand Up @@ -192,9 +205,13 @@ public void testSinglePublishByNumBytes() throws Exception {
public void testPublishMixedSizeAndDuration() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setMaxBundleDuration(Duration.standardSeconds(5))
// To demonstrate that reaching duration will trigger publish
.setMaxBundleMessages(2)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(2)
.setDelayThreshold(Duration.standardSeconds(5))
.build())
.build();

testPublisherServiceImpl.addPublishResponse(
Expand Down Expand Up @@ -237,8 +254,12 @@ public void testPublishFailureRetries() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setExecutor(Executors.newSingleThreadScheduledExecutor())
.setMaxBundleDuration(Duration.standardSeconds(5))
.setMaxBundleMessages(1)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(1)
.setDelayThreshold(Duration.standardSeconds(5))
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
Expand All @@ -258,8 +279,12 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
getTestPublisherBuilder()
.setExecutor(Executors.newSingleThreadScheduledExecutor())
.setSendBundleDeadline(Duration.standardSeconds(10))
.setMaxBundleDuration(Duration.standardSeconds(5))
.setMaxBundleMessages(1)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(1)
.setDelayThreshold(Duration.standardSeconds(5))
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
Expand All @@ -283,8 +308,12 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
getTestPublisherBuilder()
.setExecutor(Executors.newSingleThreadScheduledExecutor())
.setSendBundleDeadline(Duration.standardSeconds(10))
.setMaxBundleDuration(Duration.standardSeconds(5))
.setMaxBundleMessages(1)
.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold(1)
.setDelayThreshold(Duration.standardSeconds(5))
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
Expand All @@ -309,9 +338,12 @@ public void testPublisherGetters() throws Exception {
builder.setCredentials(credentials);
builder.setExecutor(executor);
builder.setFailOnFlowControlLimits(true);
builder.setMaxBundleBytes(10);
builder.setMaxBundleDuration(new Duration(11));
builder.setMaxBundleMessages(12);
builder.setBundlingSettings(
BundlingSettings.newBuilder()
.setRequestByteThreshold(10)
.setDelayThreshold(new Duration(11))
.setElementCountThreshold(12)
.build());
builder.setMaxOutstandingBytes(13);
builder.setMaxOutstandingMessages(14);
builder.setRequestTimeout(new Duration(15));
Expand All @@ -334,9 +366,14 @@ public void testBuilderParametersAndDefaults() {
assertEquals(Optional.absent(), builder.channelBuilder);
assertEquals(Optional.absent(), builder.executor);
assertFalse(builder.failOnFlowControlLimits);
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_BYTES, builder.maxBundleBytes);
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.maxBundleDuration);
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.maxBundleMessages);
assertEquals(
Publisher.DEFAULT_MAX_BUNDLE_BYTES,
builder.bundlingSettings.getRequestByteThreshold().longValue());
assertEquals(
Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.bundlingSettings.getDelayThreshold());
assertEquals(
Publisher.DEFAULT_MAX_BUNDLE_MESSAGES,
builder.bundlingSettings.getElementCountThreshold().longValue());
assertEquals(Optional.absent(), builder.maxOutstandingBytes);
assertEquals(Optional.absent(), builder.maxOutstandingMessages);
assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout);
Expand Down Expand Up @@ -369,41 +406,66 @@ public void testBuilderInvalidArguments() {
// Expected
}
try {
builder.setMaxBundleBytes(0);
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(null).build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
// Expected
}
try {
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(0).build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
}
try {
builder.setMaxBundleBytes(-1);
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(-1).build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
}

builder.setMaxBundleDuration(new Duration(1));
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(new Duration(1)).build());
try {
builder.setMaxBundleDuration(null);
fail("Should have thrown an IllegalArgumentException");
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
// Expected
}
try {
builder.setMaxBundleDuration(new Duration(-1));
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setDelayThreshold(new Duration(-1))
.build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
}

builder.setMaxBundleMessages(1);
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build());
try {
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(null).build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
// Expected
}
try {
builder.setMaxBundleMessages(0);
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(0).build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
}
try {
builder.setMaxBundleMessages(-1);
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(-1).build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
Expand Down