Skip to content

Commit 586bb2a

Browse files
authored
Publisher uses gax BundlingSettings (#1489)
1 parent 61687f0 commit 586bb2a

File tree

3 files changed

+126
-72
lines changed

3 files changed

+126
-72
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

+30-38
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.api.gax.grpc.BundlingSettings;
1920
import com.google.auth.Credentials;
2021
import com.google.auth.oauth2.GoogleCredentials;
2122
import com.google.common.base.Optional;
@@ -92,6 +93,13 @@ public interface Publisher {
9293
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
9394
Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
9495

96+
BundlingSettings DEFAULT_BUNDLING_SETTINGS =
97+
BundlingSettings.newBuilder()
98+
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
99+
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
100+
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
101+
.build();
102+
95103
/** Topic to which the publisher publishes to. */
96104
String getTopic();
97105

@@ -156,9 +164,7 @@ final class Builder {
156164
String topic;
157165

158166
// Bundling options
159-
int maxBundleMessages;
160-
int maxBundleBytes;
161-
Duration maxBundleDuration;
167+
BundlingSettings bundlingSettings;
162168

163169
// Client-side flow control options
164170
Optional<Integer> maxOutstandingMessages;
@@ -192,9 +198,7 @@ private void setDefaults() {
192198
channelBuilder = Optional.absent();
193199
maxOutstandingMessages = Optional.absent();
194200
maxOutstandingBytes = Optional.absent();
195-
maxBundleMessages = DEFAULT_MAX_BUNDLE_MESSAGES;
196-
maxBundleBytes = DEFAULT_MAX_BUNDLE_BYTES;
197-
maxBundleDuration = DEFAULT_MAX_BUNDLE_DURATION;
201+
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
198202
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
199203
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
200204
failOnFlowControlLimits = false;
@@ -225,38 +229,26 @@ public Builder setChannelBuilder(
225229
}
226230

227231
// Bundling options
228-
229-
/**
230-
* Maximum number of messages to send per publish call.
231-
*
232-
* <p>It also sets a target to when to trigger a publish.
233-
*/
234-
public Builder setMaxBundleMessages(int messages) {
235-
Preconditions.checkArgument(messages > 0);
236-
maxBundleMessages = messages;
237-
return this;
238-
}
239-
240-
/**
241-
* Maximum number of bytes to send per publish call.
242-
*
243-
* <p>It also sets a target to when to trigger a publish.
244-
*
245-
* <p>This will not be honored if a single message is published that exceeds this maximum.
246-
*/
247-
public Builder setMaxBundleBytes(int bytes) {
248-
Preconditions.checkArgument(bytes > 0);
249-
maxBundleBytes = bytes;
250-
return this;
251-
}
252-
253-
/**
254-
* Time to wait, since the first message is kept in memory for bundling, before triggering a
255-
* publish call.
256-
*/
257-
public Builder setMaxBundleDuration(Duration duration) {
258-
Preconditions.checkArgument(duration.getMillis() >= 0);
259-
maxBundleDuration = duration;
232+
public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
233+
Preconditions.checkNotNull(bundlingSettings);
234+
Preconditions.checkNotNull(bundlingSettings.getElementCountThreshold());
235+
Preconditions.checkArgument(bundlingSettings.getElementCountThreshold() > 0);
236+
Preconditions.checkNotNull(bundlingSettings.getRequestByteThreshold());
237+
Preconditions.checkArgument(bundlingSettings.getRequestByteThreshold() > 0);
238+
Preconditions.checkNotNull(bundlingSettings.getDelayThreshold());
239+
Preconditions.checkArgument(bundlingSettings.getDelayThreshold().getMillis() > 0);
240+
241+
Preconditions.checkArgument(
242+
bundlingSettings.getElementCountLimit() == null,
243+
"elementCountLimit option not honored by current implementation");
244+
Preconditions.checkArgument(
245+
bundlingSettings.getRequestByteLimit() == null,
246+
"requestByteLimit option not honored by current implementation");
247+
Preconditions.checkArgument(
248+
bundlingSettings.getBlockingCallCountThreshold() == null,
249+
"blockingCallCountThreshold option not honored by current implementation");
250+
251+
this.bundlingSettings = bundlingSettings;
260252
return this;
261253
}
262254

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ final class PublisherImpl implements Publisher {
6363

6464
private final String topic;
6565

66-
private final int maxBundleMessages;
67-
private final int maxBundleBytes;
66+
private final long maxBundleMessages;
67+
private final long maxBundleBytes;
6868
private final Duration maxBundleDuration;
6969
private final boolean hasBundlingBytes;
7070

@@ -93,9 +93,9 @@ final class PublisherImpl implements Publisher {
9393
PublisherImpl(Builder builder) throws IOException {
9494
topic = builder.topic;
9595

96-
maxBundleMessages = builder.maxBundleMessages;
97-
maxBundleBytes = builder.maxBundleBytes;
98-
maxBundleDuration = builder.maxBundleDuration;
96+
maxBundleMessages = builder.bundlingSettings.getElementCountThreshold();
97+
maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold();
98+
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
9999
hasBundlingBytes = maxBundleBytes > 0;
100100

101101
maxOutstandingMessages = builder.maxOutstandingMessages;

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java

+91-29
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.atLeast;
2424
import static org.mockito.Mockito.times;
2525

26+
import com.google.api.gax.grpc.BundlingSettings;
2627
import com.google.cloud.pubsub.Publisher.Builder;
2728
import com.google.common.base.Optional;
2829
import com.google.common.util.concurrent.ListenableFuture;
@@ -99,9 +100,13 @@ public static void tearDownClass() throws Exception {
99100
public void testPublishByDuration() throws Exception {
100101
Publisher publisher =
101102
getTestPublisherBuilder()
102-
.setMaxBundleDuration(Duration.standardSeconds(5))
103103
// To demonstrate that reaching duration will trigger publish
104-
.setMaxBundleMessages(10)
104+
.setBundlingSettings(
105+
Publisher.DEFAULT_BUNDLING_SETTINGS
106+
.toBuilder()
107+
.setDelayThreshold(Duration.standardSeconds(5))
108+
.setElementCountThreshold(10)
109+
.build())
105110
.build();
106111

107112
testPublisherServiceImpl.addPublishResponse(
@@ -127,8 +132,12 @@ public void testPublishByDuration() throws Exception {
127132
public void testPublishByNumBundledMessages() throws Exception {
128133
Publisher publisher =
129134
getTestPublisherBuilder()
130-
.setMaxBundleDuration(Duration.standardSeconds(100))
131-
.setMaxBundleMessages(2)
135+
.setBundlingSettings(
136+
Publisher.DEFAULT_BUNDLING_SETTINGS
137+
.toBuilder()
138+
.setElementCountThreshold(2)
139+
.setDelayThreshold(Duration.standardSeconds(100))
140+
.build())
132141
.build();
133142

134143
testPublisherServiceImpl
@@ -162,8 +171,12 @@ public void testPublishByNumBundledMessages() throws Exception {
162171
public void testSinglePublishByNumBytes() throws Exception {
163172
Publisher publisher =
164173
getTestPublisherBuilder()
165-
.setMaxBundleDuration(Duration.standardSeconds(100))
166-
.setMaxBundleMessages(2)
174+
.setBundlingSettings(
175+
Publisher.DEFAULT_BUNDLING_SETTINGS
176+
.toBuilder()
177+
.setElementCountThreshold(2)
178+
.setDelayThreshold(Duration.standardSeconds(100))
179+
.build())
167180
.build();
168181

169182
testPublisherServiceImpl
@@ -192,9 +205,13 @@ public void testSinglePublishByNumBytes() throws Exception {
192205
public void testPublishMixedSizeAndDuration() throws Exception {
193206
Publisher publisher =
194207
getTestPublisherBuilder()
195-
.setMaxBundleDuration(Duration.standardSeconds(5))
196208
// To demonstrate that reaching duration will trigger publish
197-
.setMaxBundleMessages(2)
209+
.setBundlingSettings(
210+
Publisher.DEFAULT_BUNDLING_SETTINGS
211+
.toBuilder()
212+
.setElementCountThreshold(2)
213+
.setDelayThreshold(Duration.standardSeconds(5))
214+
.build())
198215
.build();
199216

200217
testPublisherServiceImpl.addPublishResponse(
@@ -237,8 +254,12 @@ public void testPublishFailureRetries() throws Exception {
237254
Publisher publisher =
238255
getTestPublisherBuilder()
239256
.setExecutor(Executors.newSingleThreadScheduledExecutor())
240-
.setMaxBundleDuration(Duration.standardSeconds(5))
241-
.setMaxBundleMessages(1)
257+
.setBundlingSettings(
258+
Publisher.DEFAULT_BUNDLING_SETTINGS
259+
.toBuilder()
260+
.setElementCountThreshold(1)
261+
.setDelayThreshold(Duration.standardSeconds(5))
262+
.build())
242263
.build(); // To demonstrate that reaching duration will trigger publish
243264

244265
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
@@ -258,8 +279,12 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
258279
getTestPublisherBuilder()
259280
.setExecutor(Executors.newSingleThreadScheduledExecutor())
260281
.setSendBundleDeadline(Duration.standardSeconds(10))
261-
.setMaxBundleDuration(Duration.standardSeconds(5))
262-
.setMaxBundleMessages(1)
282+
.setBundlingSettings(
283+
Publisher.DEFAULT_BUNDLING_SETTINGS
284+
.toBuilder()
285+
.setElementCountThreshold(1)
286+
.setDelayThreshold(Duration.standardSeconds(5))
287+
.build())
263288
.build(); // To demonstrate that reaching duration will trigger publish
264289

265290
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
@@ -283,8 +308,12 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
283308
getTestPublisherBuilder()
284309
.setExecutor(Executors.newSingleThreadScheduledExecutor())
285310
.setSendBundleDeadline(Duration.standardSeconds(10))
286-
.setMaxBundleDuration(Duration.standardSeconds(5))
287-
.setMaxBundleMessages(1)
311+
.setBundlingSettings(
312+
Publisher.DEFAULT_BUNDLING_SETTINGS
313+
.toBuilder()
314+
.setElementCountThreshold(1)
315+
.setDelayThreshold(Duration.standardSeconds(5))
316+
.build())
288317
.build(); // To demonstrate that reaching duration will trigger publish
289318

290319
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
@@ -309,9 +338,12 @@ public void testPublisherGetters() throws Exception {
309338
builder.setCredentials(credentials);
310339
builder.setExecutor(executor);
311340
builder.setFailOnFlowControlLimits(true);
312-
builder.setMaxBundleBytes(10);
313-
builder.setMaxBundleDuration(new Duration(11));
314-
builder.setMaxBundleMessages(12);
341+
builder.setBundlingSettings(
342+
BundlingSettings.newBuilder()
343+
.setRequestByteThreshold(10)
344+
.setDelayThreshold(new Duration(11))
345+
.setElementCountThreshold(12)
346+
.build());
315347
builder.setMaxOutstandingBytes(13);
316348
builder.setMaxOutstandingMessages(14);
317349
builder.setRequestTimeout(new Duration(15));
@@ -334,9 +366,14 @@ public void testBuilderParametersAndDefaults() {
334366
assertEquals(Optional.absent(), builder.channelBuilder);
335367
assertEquals(Optional.absent(), builder.executor);
336368
assertFalse(builder.failOnFlowControlLimits);
337-
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_BYTES, builder.maxBundleBytes);
338-
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.maxBundleDuration);
339-
assertEquals(Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.maxBundleMessages);
369+
assertEquals(
370+
Publisher.DEFAULT_MAX_BUNDLE_BYTES,
371+
builder.bundlingSettings.getRequestByteThreshold().longValue());
372+
assertEquals(
373+
Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.bundlingSettings.getDelayThreshold());
374+
assertEquals(
375+
Publisher.DEFAULT_MAX_BUNDLE_MESSAGES,
376+
builder.bundlingSettings.getElementCountThreshold().longValue());
340377
assertEquals(Optional.absent(), builder.maxOutstandingBytes);
341378
assertEquals(Optional.absent(), builder.maxOutstandingMessages);
342379
assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout);
@@ -369,41 +406,66 @@ public void testBuilderInvalidArguments() {
369406
// Expected
370407
}
371408
try {
372-
builder.setMaxBundleBytes(0);
409+
builder.setBundlingSettings(
410+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(null).build());
411+
fail("Should have thrown an NullPointerException");
412+
} catch (NullPointerException expected) {
413+
// Expected
414+
}
415+
try {
416+
builder.setBundlingSettings(
417+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(0).build());
373418
fail("Should have thrown an IllegalArgumentException");
374419
} catch (IllegalArgumentException expected) {
375420
// Expected
376421
}
377422
try {
378-
builder.setMaxBundleBytes(-1);
423+
builder.setBundlingSettings(
424+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(-1).build());
379425
fail("Should have thrown an IllegalArgumentException");
380426
} catch (IllegalArgumentException expected) {
381427
// Expected
382428
}
383429

384-
builder.setMaxBundleDuration(new Duration(1));
430+
builder.setBundlingSettings(
431+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(new Duration(1)).build());
385432
try {
386-
builder.setMaxBundleDuration(null);
387-
fail("Should have thrown an IllegalArgumentException");
433+
builder.setBundlingSettings(
434+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build());
435+
fail("Should have thrown an NullPointerException");
388436
} catch (NullPointerException expected) {
389437
// Expected
390438
}
391439
try {
392-
builder.setMaxBundleDuration(new Duration(-1));
440+
builder.setBundlingSettings(
441+
Publisher.DEFAULT_BUNDLING_SETTINGS
442+
.toBuilder()
443+
.setDelayThreshold(new Duration(-1))
444+
.build());
393445
fail("Should have thrown an IllegalArgumentException");
394446
} catch (IllegalArgumentException expected) {
395447
// Expected
396448
}
397449

398-
builder.setMaxBundleMessages(1);
450+
builder.setBundlingSettings(
451+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build());
452+
try {
453+
builder.setBundlingSettings(
454+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(null).build());
455+
fail("Should have thrown an NullPointerException");
456+
} catch (NullPointerException expected) {
457+
// Expected
458+
}
399459
try {
400-
builder.setMaxBundleMessages(0);
460+
builder.setBundlingSettings(
461+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(0).build());
401462
fail("Should have thrown an IllegalArgumentException");
402463
} catch (IllegalArgumentException expected) {
403464
// Expected
404465
}
405466
try {
406-
builder.setMaxBundleMessages(-1);
467+
builder.setBundlingSettings(
468+
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(-1).build());
407469
fail("Should have thrown an IllegalArgumentException");
408470
} catch (IllegalArgumentException expected) {
409471
// Expected

0 commit comments

Comments
 (0)