Skip to content

Commit 8661957

Browse files
committed
PR comments
1 parent 0707971 commit 8661957

File tree

3 files changed

+37
-8
lines changed

3 files changed

+37
-8
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
2424
import com.google.auth.Credentials;
2525
import com.google.auth.oauth2.GoogleCredentials;
26+
import com.google.common.annotations.VisibleForTesting;
2627
import com.google.common.base.Optional;
2728
import com.google.common.base.Preconditions;
2829
import com.google.common.collect.ImmutableList;
@@ -125,6 +126,7 @@ public static long getApiMaxRequestBytes() {
125126

126127
private final BundlingSettings bundlingSettings;
127128
private final RetrySettings retrySettings;
129+
private final LongRandom longRandom;
128130

129131
private final FlowController.Settings flowControlSettings;
130132
private final boolean failOnFlowControlLimits;
@@ -151,6 +153,7 @@ private Publisher(Builder builder) throws IOException {
151153

152154
this.bundlingSettings = builder.bundlingSettings;
153155
this.retrySettings = builder.retrySettings;
156+
this.longRandom = builder.longRandom;
154157

155158
flowControlSettings = builder.flowControlSettings;
156159
failOnFlowControlLimits = builder.failOnFlowControlLimits;
@@ -380,7 +383,8 @@ public void onSuccess(PublishResponse result) {
380383

381384
@Override
382385
public void onFailure(Throwable t) {
383-
long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle, retrySettings);
386+
long nextBackoffDelay =
387+
computeNextBackoffDelayMs(outstandingBundle, retrySettings, longRandom);
384388

385389
if (!isRetryable(t)
386390
|| System.currentTimeMillis() + nextBackoffDelay
@@ -494,14 +498,14 @@ private boolean hasBundlingBytes() {
494498
}
495499

496500
private static long computeNextBackoffDelayMs(
497-
OutstandingBundle outstandingBundle, RetrySettings retrySettings) {
501+
OutstandingBundle outstandingBundle, RetrySettings retrySettings, LongRandom longRandom) {
498502
long delayMillis =
499503
Math.round(
500504
retrySettings.getInitialRetryDelay().getMillis()
501505
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1));
502506
delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis);
503507
outstandingBundle.attempt++;
504-
return ThreadLocalRandom.current().nextLong(delayMillis / 2, delayMillis);
508+
return longRandom.nextLong(0, delayMillis);
505509
}
506510

507511
private boolean isRetryable(Throwable t) {
@@ -520,6 +524,10 @@ private boolean isRetryable(Throwable t) {
520524
}
521525
}
522526

527+
interface LongRandom {
528+
long nextLong(long least, long bound);
529+
}
530+
523531
/** A builder of {@link Publisher}s. */
524532
public static final class Builder {
525533
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
@@ -547,6 +555,13 @@ public static final class Builder {
547555
.setRpcTimeoutMultiplier(2)
548556
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
549557
.build();
558+
static final LongRandom DEFAULT_LONG_RANDOM =
559+
new LongRandom() {
560+
@Override
561+
public long nextLong(long least, long bound) {
562+
return ThreadLocalRandom.current().nextLong(least, bound);
563+
}
564+
};
550565

551566
private static final int THREADS_PER_CPU = 5;
552567
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
@@ -564,6 +579,7 @@ public static final class Builder {
564579
boolean failOnFlowControlLimits = false;
565580

566581
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
582+
LongRandom longRandom = DEFAULT_LONG_RANDOM;
567583

568584
// Channels and credentials
569585
Optional<Credentials> userCredentials = Optional.absent();
@@ -659,6 +675,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
659675
return this;
660676
}
661677

678+
@VisibleForTesting
679+
Builder setLongRandom(LongRandom longRandom) {
680+
this.longRandom = Preconditions.checkNotNull(longRandom);
681+
return this;
682+
}
683+
662684
/** Gives the ability to set a custom executor to be used by the library. */
663685
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
664686
this.executorProvider = Preconditions.checkNotNull(executorProvider);

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting
458458
/**
459459
* Set acknowledgement expiration padding.
460460
*
461-
* <p>This is the time accounted before a message expiration is to happen, so the
462-
* {@link Subscriber} is able to send an ack extension beforehand.
461+
* <p>This is the time accounted before a message expiration is to happen, so the {@link
462+
* Subscriber} is able to send an ack extension beforehand.
463463
*
464464
* <p>This padding duration is configurable so you can account for network latency. A reasonable
465465
* number must be provided so messages don't expire because of network latency between when the

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
299299
.build())
300300
.build(); // To demonstrate that reaching duration will trigger publish
301301

302-
// We use exponential backoff with randomness. 30 should be more than enough.
303-
for (int i = 0; i < 30; ++i) {
302+
// With exponential backoff, starting at 5ms we should have no more than 11 retries
303+
for (int i = 0; i < 11; ++i) {
304304
testPublisherServiceImpl.addPublishError(new FakeException());
305305
}
306306
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
@@ -604,6 +604,13 @@ private Builder getTestPublisherBuilder() {
604604
return Publisher.Builder.newBuilder(TEST_TOPIC)
605605
.setCredentials(testCredentials)
606606
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
607-
.setChannelBuilder(testChannelBuilder);
607+
.setChannelBuilder(testChannelBuilder)
608+
.setLongRandom(
609+
new Publisher.LongRandom() {
610+
@Override
611+
public long nextLong(long least, long bound) {
612+
return bound - 1;
613+
}
614+
});
608615
}
609616
}

0 commit comments

Comments
 (0)