Skip to content

Commit daaf061

Browse files
authored
change from static time methods to clock (#1477)
1 parent 4dd7d79 commit daaf061

File tree

7 files changed

+59
-44
lines changed

7 files changed

+59
-44
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.auth.Credentials;
2020
import com.google.auth.oauth2.GoogleCredentials;
21+
import com.google.cloud.Clock;
2122
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2223
import com.google.common.base.Optional;
2324
import com.google.common.base.Preconditions;
@@ -152,6 +153,7 @@ final class Builder {
152153

153154
Optional<ScheduledExecutorService> executor;
154155
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
156+
Optional<Clock> clock;
155157

156158
/**
157159
* Constructs a new {@link Builder}.
@@ -180,6 +182,7 @@ private void setDefaults() {
180182
maxOutstandingBytes = Optional.absent();
181183
maxOutstandingMessages = Optional.absent();
182184
executor = Optional.absent();
185+
clock = Optional.absent();
183186
}
184187

185188
/**
@@ -238,7 +241,7 @@ public Builder setMaxOutstandingBytes(int maxOutstandingBytes) {
238241
/**
239242
* Set acknowledgement expiration padding.
240243
*
241-
* <p>This is the time accounted before a message expiration is to happen, so the
244+
* <p>This is the time accounted before a message expiration is to happen, so the
242245
* {@link Subscriber} is able to send an ack extension beforehand.
243246
*
244247
* <p>This padding duration is configurable so you can account for network latency. A reasonable
@@ -259,6 +262,12 @@ public Builder setExecutor(ScheduledExecutorService executor) {
259262
return this;
260263
}
261264

265+
/** Gives the ability to set a custom executor. */
266+
public Builder setClock(Clock clock) {
267+
this.clock = Optional.of(clock);
268+
return this;
269+
}
270+
262271
public Subscriber build() throws IOException {
263272
return new SubscriberImpl(this);
264273
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberConnection.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.pubsub;
1818

1919
import com.google.auth.Credentials;
20+
import com.google.cloud.Clock;
2021
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
2122
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
2223
import com.google.common.annotations.VisibleForTesting;
@@ -70,6 +71,7 @@ final class SubscriberConnection extends AbstractService {
7071

7172
private final Duration ackExpirationPadding;
7273
private final ScheduledExecutorService executor;
74+
private final Clock clock;
7375
private final MessageReceiver receiver;
7476
private final String subscription;
7577
private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
@@ -96,7 +98,7 @@ final class SubscriberConnection extends AbstractService {
9698
// To keep track of number of seconds the receiver takes to process messages.
9799
private final Distribution ackLatencyDistribution;
98100

99-
public SubscriberConnection(
101+
SubscriberConnection(
100102
String subscription,
101103
Credentials credentials,
102104
MessageReceiver receiver,
@@ -105,8 +107,10 @@ public SubscriberConnection(
105107
Distribution ackLatencyDistribution,
106108
Channel channel,
107109
FlowController flowController,
108-
ScheduledExecutorService executor) {
110+
ScheduledExecutorService executor,
111+
Clock clock) {
109112
this.executor = executor;
113+
this.clock = clock;
110114
this.credentials = credentials;
111115
this.ackExpirationPadding = ackExpirationPadding;
112116
this.streamAckDeadlineSeconds = streamAckDeadlineSeconds;
@@ -125,16 +129,18 @@ public SubscriberConnection(
125129
}
126130

127131
private static class ExpirationInfo implements Comparable<ExpirationInfo> {
132+
private final Clock clock;
128133
Instant expiration;
129134
int nextExtensionSeconds;
130135

131-
ExpirationInfo(Instant expiration, int initialAckDeadlineExtension) {
136+
ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) {
137+
this.clock = clock;
132138
this.expiration = expiration;
133139
nextExtensionSeconds = initialAckDeadlineExtension;
134140
}
135141

136142
void extendExpiration() {
137-
expiration = Instant.now().plus(Duration.standardSeconds(nextExtensionSeconds));
143+
expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds));
138144
nextExtensionSeconds = 2 * nextExtensionSeconds;
139145
}
140146

@@ -181,7 +187,7 @@ private class AckHandler implements FutureCallback<AckReply>, Comparable<AckHand
181187
this.ackId = ackId;
182188
this.outstandingBytes = outstandingBytes;
183189
acked = new AtomicBoolean(false);
184-
receivedTime = Instant.now();
190+
receivedTime = new Instant(clock.millis());
185191
}
186192

187193
@Override
@@ -216,7 +222,7 @@ public void onSuccess(AckReply reply) {
216222
// Record the latency rounded to the next closest integer.
217223
ackLatencyDistribution.record(
218224
Ints.saturatedCast(
219-
(long) Math.ceil(new Duration(receivedTime, Instant.now()).getMillis() / 1000D)));
225+
(long) Math.ceil((clock.millis() - receivedTime.getMillis()) / 1000D)));
220226
messagesWaiter.incrementPendingMessages(-1);
221227
return;
222228
case NACK:
@@ -361,7 +367,7 @@ private void processReceivedMessages(StreamingPullResponse response) {
361367
final List<com.google.pubsub.v1.ReceivedMessage> responseMessages =
362368
response.getReceivedMessagesList();
363369
try {
364-
Instant now = Instant.now();
370+
Instant now = new Instant(clock.millis());
365371
int receivedMessagesCount = response.getReceivedMessagesCount();
366372
int totalByteCount = 0;
367373
final List<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
@@ -372,7 +378,9 @@ private void processReceivedMessages(StreamingPullResponse response) {
372378
}
373379
ExpirationInfo expiration =
374380
new ExpirationInfo(
375-
now.plus(streamAckDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
381+
clock,
382+
now.plus(streamAckDeadlineSeconds * 1000),
383+
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
376384
synchronized (outstandingAckHandlers) {
377385
addOutstadingAckHandlers(expiration, ackHandlers);
378386
}
@@ -451,7 +459,7 @@ public void run() {
451459
alarmsLock.unlock();
452460
}
453461

454-
Instant now = Instant.now();
462+
Instant now = new Instant(clock.millis());
455463
// Rounded to the next second, so we only schedule future alarms at the second
456464
// resolution.
457465
Instant cutOverTime =
@@ -531,7 +539,7 @@ private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration
531539
ackDeadlineExtensionAlarm =
532540
executor.schedule(
533541
new AckDeadlineAlarm(),
534-
nextAckDeadlineExtensionAlarmTime.getMillis() - Instant.now().getMillis(),
542+
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millis(),
535543
TimeUnit.MILLISECONDS);
536544
}
537545

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.auth.Credentials;
2020
import com.google.auth.oauth2.GoogleCredentials;
21+
import com.google.cloud.Clock;
2122
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.Optional;
2324
import com.google.common.primitives.Ints;
@@ -60,6 +61,7 @@ public class SubscriberImpl extends AbstractService implements Subscriber {
6061
private final ScheduledExecutorService executor;
6162
private final Distribution ackLatencyDistribution =
6263
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
64+
private final Clock clock;
6365
private ScheduledFuture<?> ackDeadlineUpdater;
6466
private int streamAckDeadlineSeconds;
6567

@@ -72,6 +74,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
7274
Math.max(
7375
INITIAL_ACK_DEADLINE_SECONDS,
7476
Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
77+
clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock();
7578

7679
int numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
7780
executor =
@@ -113,7 +116,8 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
113116
ackLatencyDistribution,
114117
channelBuilder.build(),
115118
flowController,
116-
executor);
119+
executor,
120+
clock);
117121
}
118122
}
119123

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static org.junit.Assert.assertTrue;
2020

21-
import com.google.cloud.Clock;
2221
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
2322
import com.google.common.collect.ImmutableList;
2423
import java.util.concurrent.CountDownLatch;
@@ -30,7 +29,6 @@
3029
import java.util.concurrent.atomic.AtomicLong;
3130
import org.easymock.EasyMock;
3231
import org.easymock.IAnswer;
33-
import org.joda.time.DateTimeUtils;
3432
import org.junit.After;
3533
import org.junit.Before;
3634
import org.junit.Rule;
@@ -51,12 +49,6 @@ public class AckDeadlineRenewerTest {
5149
private PubSub pubsub;
5250
private FakeScheduledExecutorService executorService;
5351
private AckDeadlineRenewer ackDeadlineRenewer;
54-
private final Clock clock = new Clock() {
55-
@Override
56-
public long millis() {
57-
return DateTimeUtils.currentTimeMillis();
58-
}
59-
};
6052

6153
@Rule
6254
public Timeout globalTimeout = Timeout.seconds(60);
@@ -78,7 +70,7 @@ public void release(ExecutorService executor) {
7870
PubSubOptions options = PubSubOptions.newBuilder()
7971
.setProjectId("projectId")
8072
.setExecutorFactory(executorFactory)
81-
.setClock(clock)
73+
.setClock(executorService.getClock())
8274
.build();
8375
EasyMock.expect(pubsub.getOptions()).andReturn(options);
8476
EasyMock.replay(pubsub);
@@ -97,7 +89,7 @@ private IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
9789
@Override
9890
public Future<Void> answer() throws Throwable {
9991
latch.countDown();
100-
renewal.set(clock.millis());
92+
renewal.set(executorService.getClock().millis());
10193
return null;
10294
}
10395
};
@@ -117,7 +109,7 @@ public void testAddOneMessage() throws InterruptedException {
117109
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
118110
.andAnswer(createAnswer(secondLatch, secondRenewal));
119111
EasyMock.replay(pubsub);
120-
long addTime = clock.millis();
112+
long addTime = executorService.getClock().millis();
121113
ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
122114
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
123115
firstLatch.await();
@@ -149,7 +141,7 @@ public void testAddMessages() throws InterruptedException {
149141
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3)))
150142
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
151143
EasyMock.replay(pubsub);
152-
long addTime1 = clock.millis();
144+
long addTime1 = executorService.getClock().millis();
153145
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
154146
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
155147
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
@@ -185,7 +177,7 @@ public void testAddExistingMessage() throws InterruptedException {
185177
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
186178
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
187179
EasyMock.replay(pubsub);
188-
long addTime1 = clock.millis();
180+
long addTime1 = executorService.getClock().millis();
189181
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
190182
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
191183
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
@@ -221,7 +213,7 @@ public void testRemoveNonExistingMessage() throws InterruptedException {
221213
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
222214
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
223215
EasyMock.replay(pubsub);
224-
long addTime1 = clock.millis();
216+
long addTime1 = executorService.getClock().millis();
225217
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
226218
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
227219
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
@@ -257,7 +249,7 @@ public void testRemoveMessage() throws InterruptedException {
257249
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
258250
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
259251
EasyMock.replay(pubsub);
260-
long addTime1 = clock.millis();
252+
long addTime1 = executorService.getClock().millis();
261253
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
262254
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
263255
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FakeScheduledExecutorService.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.cloud.Clock;
20+
import com.google.cloud.pubsub.FakeClock;
1921
import com.google.common.primitives.Ints;
2022
import com.google.common.util.concurrent.SettableFuture;
2123
import java.util.ArrayList;
@@ -33,7 +35,6 @@
3335
import java.util.concurrent.TimeoutException;
3436
import java.util.concurrent.atomic.AtomicBoolean;
3537
import org.joda.time.DateTime;
36-
import org.joda.time.DateTimeUtils;
3738
import org.joda.time.Duration;
3839
import org.joda.time.MutableDateTime;
3940

@@ -46,11 +47,11 @@ public class FakeScheduledExecutorService extends AbstractExecutorService
4647

4748
private final AtomicBoolean shutdown = new AtomicBoolean(false);
4849
private final PriorityQueue<PendingCallable<?>> pendingCallables = new PriorityQueue<>();
49-
private final MutableDateTime currentTime = MutableDateTime.now();
5050
private final ExecutorService delegate = Executors.newSingleThreadExecutor();
51+
private final FakeClock clock = new FakeClock();
5152

52-
public FakeScheduledExecutorService() {
53-
DateTimeUtils.setCurrentMillisFixed(currentTime.getMillis());
53+
public Clock getClock() {
54+
return clock;
5455
}
5556

5657
@Override
@@ -92,11 +93,12 @@ public void tick(long time, TimeUnit unit) {
9293
* outstanding callable which execution time has passed.
9394
*/
9495
public void advanceTime(Duration toAdvance) {
95-
currentTime.add(toAdvance);
96-
DateTimeUtils.setCurrentMillisFixed(currentTime.getMillis());
96+
clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
97+
DateTime cmpTime = new DateTime(clock.millis());
98+
9799
synchronized (pendingCallables) {
98100
while (!pendingCallables.isEmpty()
99-
&& pendingCallables.peek().getScheduledTime().compareTo(currentTime) <= 0) {
101+
&& pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
100102
try {
101103
pendingCallables.poll().call();
102104
if (shutdown.get() && pendingCallables.isEmpty()) {
@@ -186,7 +188,7 @@ static enum PendingCallableType {
186188

187189
/** Class that saves the state of an scheduled pending callable. */
188190
class PendingCallable<T> implements Comparable<PendingCallable<T>> {
189-
DateTime creationTime = currentTime.toDateTime();
191+
DateTime creationTime = new DateTime(clock.millis());
190192
Duration delay;
191193
Callable<T> pendingCallable;
192194
SettableFuture<T> future = SettableFuture.create();
@@ -221,8 +223,7 @@ ScheduledFuture<T> getScheduledFuture() {
221223
return new ScheduledFuture<T>() {
222224
@Override
223225
public long getDelay(TimeUnit unit) {
224-
return unit.convert(
225-
new Duration(currentTime, getScheduledTime()).getMillis(), TimeUnit.MILLISECONDS);
226+
return unit.convert(getScheduledTime().getMillis() - clock.millis(), TimeUnit.MILLISECONDS);
226227
}
227228

228229
@Override
@@ -279,7 +280,7 @@ T call() {
279280
done.set(true);
280281
break;
281282
case FIXED_DELAY:
282-
this.creationTime = currentTime.toDateTime();
283+
this.creationTime = new DateTime(clock.millis());
283284
schedulePendingCallable(this);
284285
break;
285286
case FIXED_RATE:

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FakeSubscriberServiceImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,21 @@ public static enum CloseSide {
5555
public static final class ModifyAckDeadline {
5656
private final String ackId;
5757
private final long seconds;
58-
58+
5959
public ModifyAckDeadline(String ackId, long seconds) {
6060
Preconditions.checkNotNull(ackId);
6161
this.ackId = ackId;
6262
this.seconds = seconds;
6363
}
64-
64+
6565
public String getAckId() {
6666
return ackId;
6767
}
68-
68+
6969
public long getSeconds() {
7070
return seconds;
7171
}
72-
72+
7373
@Override
7474
public boolean equals(Object obj) {
7575
if (!(obj instanceof ModifyAckDeadline)) {
@@ -78,7 +78,7 @@ public boolean equals(Object obj) {
7878
ModifyAckDeadline other = (ModifyAckDeadline) obj;
7979
return other.ackId.equals(this.ackId) && other.seconds == this.seconds;
8080
}
81-
81+
8282
@Override
8383
public int hashCode() {
8484
return ackId.hashCode();

0 commit comments

Comments
 (0)