Skip to content

Commit ecd8fb5

Browse files
authored
Merge pull request #101 from zero88/feature/wrap-Instant-to-interface
feature(#100): TimeClock interface
2 parents 9cd0420 + b57714f commit ecd8fb5

File tree

9 files changed

+87
-30
lines changed

9 files changed

+87
-30
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.zero88.schedulerx;
2+
3+
import java.time.Instant;
4+
5+
import org.jetbrains.annotations.ApiStatus.Internal;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
/**
9+
* Represents for time clock
10+
*
11+
* @since 2.0.0
12+
*/
13+
@Internal
14+
public interface TimeClock {
15+
16+
/**
17+
* Obtains the current instant from the system clock.
18+
*
19+
* @return the current instant using the system clock, not null
20+
*/
21+
@NotNull Instant now();
22+
23+
}

core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.github.zero88.schedulerx.JobExecutor;
2424
import io.github.zero88.schedulerx.Scheduler;
2525
import io.github.zero88.schedulerx.SchedulingMonitor;
26+
import io.github.zero88.schedulerx.TimeClock;
2627
import io.github.zero88.schedulerx.TimeoutBlock;
2728
import io.github.zero88.schedulerx.TimeoutPolicy;
2829
import io.github.zero88.schedulerx.WorkerExecutorFactory;
@@ -59,6 +60,7 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
5960
private final @NotNull T trigger;
6061
private final @NotNull TriggerEvaluator evaluator;
6162
private final @NotNull TimeoutPolicy timeoutPolicy;
63+
private final @NotNull TimeClock clock;
6264
private final Lock lock = new ReentrantLock();
6365
private boolean didStart = false;
6466
private boolean didTriggerValidation = false;
@@ -74,12 +76,15 @@ protected AbstractScheduler(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobD
7476
this.trigger = trigger;
7577
this.monitor = monitor;
7678
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
77-
this.state = new SchedulerStateImpl<>();
79+
this.clock = new TimeClockImpl();
80+
this.state = new SchedulerStateImpl<>(clock);
7881
}
7982

8083
@Override
8184
public final @NotNull Vertx vertx() { return this.vertx; }
8285

86+
public final @NotNull TimeClock clock() { return this.clock; }
87+
8388
@Override
8489
public final @NotNull SchedulingMonitor<OUT> monitor() { return this.monitor; }
8590

@@ -149,7 +154,7 @@ public final void executeJob(@NotNull ExecutionContext<OUT> executionContext) {
149154
@Override
150155
public final void cancel() {
151156
if (!state.completed()) {
152-
log(Instant.now(), "On cancel");
157+
log(clock.now(), "On cancel");
153158
doStop(state.timerId(), TriggerContextFactory.cancel(trigger().type(), state.tick()));
154159
}
155160
}
@@ -203,8 +208,9 @@ protected final void onTrigger(WorkerExecutor workerExecutor, TriggerContext tri
203208
return;
204209
}
205210
final long round = state.increaseRound();
206-
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext, round);
207211
final Duration timeout = timeoutPolicy().executionTimeout();
212+
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, clock, triggerContext,
213+
round);
208214
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), round);
209215
Future.join(onEvaluationAfterTrigger(workerExecutor, triggerContext, round),
210216
executeBlocking(workerExecutor, p -> executeJob(executionContext.setup(wrapTimeout(timeout, p)))))
@@ -228,7 +234,7 @@ protected final void onSchedule(long timerId) {
228234
.setExternalId(jobData.externalId())
229235
.setAvailableAt(state.availableAt())
230236
.setTriggerContext(context)
231-
.setRescheduledAt(Instant.now())
237+
.setRescheduledAt(clock.now())
232238
.setTick(context.tick())
233239
.setRound(state.round())
234240
.build();
@@ -241,15 +247,15 @@ protected final void onUnableSchedule(Throwable cause) {
241247
monitor.onUnableSchedule(ExecutionResultImpl.<OUT>builder()
242248
.setExternalId(jobData.externalId())
243249
.setTriggerContext(ctx)
244-
.setUnscheduledAt(Instant.now())
250+
.setUnscheduledAt(clock.now())
245251
.setTick(ctx.tick())
246252
.setRound(ctx.tick())
247253
.build());
248254
}
249255

250256
protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor worker, TriggerContext ctx) {
251257
return executeBlocking(worker, p -> {
252-
log(Instant.now(), "On before trigger");
258+
log(clock.now(), "On before trigger");
253259
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
254260
evaluator.beforeTrigger(trigger, ctx, jobData.externalId()));
255261
});
@@ -258,11 +264,11 @@ protected final Future<TriggerContext> onEvaluationBeforeTrigger(WorkerExecutor
258264
protected final Future<TriggerContext> onEvaluationAfterTrigger(WorkerExecutor worker, TriggerContext ctx,
259265
long round) {
260266
return executeBlocking(worker, p -> {
261-
log(Instant.now(), "On after trigger");
267+
log(clock.now(), "On after trigger");
262268
wrapTimeout(timeoutPolicy().evaluationTimeout(), p).handle(
263269
evaluator.afterTrigger(trigger(), ctx, jobData.externalId(), round)
264270
.onSuccess(c -> doStop(state.timerId(), c))
265-
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, Instant.now(), "After evaluate"), t)));
271+
.onFailure(t -> LOGGER.error(genMsg(ctx.tick(), round, clock.now(), "After evaluate"), t)));
266272
});
267273
}
268274

core/src/main/java/io/github/zero88/schedulerx/impl/ExecutionContextImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import org.jetbrains.annotations.NotNull;
77

8+
import io.github.zero88.schedulerx.TimeClock;
89
import io.github.zero88.schedulerx.trigger.TriggerContext;
910
import io.vertx.core.Promise;
1011
import io.vertx.core.Vertx;
@@ -14,18 +15,20 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
1415
private final Vertx vertx;
1516
private final long round;
1617
private final TriggerContext triggerContext;
18+
private final TimeClock clock;
1719
private final Instant triggeredAt;
1820
private Instant executedAt;
1921
private Promise<Object> promise;
2022
private OUTPUT data;
2123
private Throwable error;
2224
private boolean forceStop = false;
2325

24-
ExecutionContextImpl(Vertx vertx, TriggerContext triggerContext, long round) {
26+
ExecutionContextImpl(Vertx vertx, TimeClock clock, TriggerContext triggerContext, long round) {
2527
this.vertx = vertx;
2628
this.round = round;
2729
this.triggerContext = triggerContext;
28-
this.triggeredAt = Instant.now();
30+
this.clock = clock;
31+
this.triggeredAt = this.clock.now();
2932
}
3033

3134
@Override
@@ -34,7 +37,7 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
3437
throw new IllegalStateException("ExecutionContext is already setup");
3538
}
3639
this.promise = promise;
37-
this.executedAt = Instant.now();
40+
this.executedAt = this.clock.now();
3841
return this;
3942
}
4043

core/src/main/java/io/github/zero88/schedulerx/impl/SchedulerStateImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import org.jetbrains.annotations.NotNull;
1616

17+
import io.github.zero88.schedulerx.TimeClock;
18+
1719
final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT> {
1820

1921
private final AtomicReference<Instant> availableAt = new AtomicReference<>();
@@ -24,8 +26,11 @@ final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT>
2426
private final AtomicBoolean pending = new AtomicBoolean(true);
2527
private final AtomicReference<Entry<Long, OUTPUT>> data = new AtomicReference<>(new SimpleEntry<>(0L, null));
2628
private final AtomicReference<Entry<Long, Throwable>> error = new AtomicReference<>(new SimpleEntry<>(0L, null));
29+
private final TimeClock clock;
2730
private long timerId;
2831

32+
SchedulerStateImpl(TimeClock clock) { this.clock = clock; }
33+
2934
@Override
3035
public Instant availableAt() { return availableAt.get(); }
3136

@@ -73,20 +78,20 @@ public long increaseTick() {
7378
@Override
7479
public @NotNull Instant markAvailable() {
7580
pending.set(false);
76-
availableAt.set(Instant.now());
81+
availableAt.set(clock.now());
7782
return availableAt();
7883
}
7984

8085
@Override
8186
public @NotNull Instant markFinished(long tick) {
8287
inProgress.remove(tick);
83-
return Instant.now();
88+
return clock.now();
8489
}
8590

8691
@Override
8792
public @NotNull Instant markCompleted() {
8893
completed.set(true);
89-
return Instant.now();
94+
return clock.now();
9095
}
9196

9297
@Override
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.github.zero88.schedulerx.impl;
2+
3+
import java.time.Instant;
4+
5+
import org.jetbrains.annotations.ApiStatus.Internal;
6+
import org.jetbrains.annotations.NotNull;
7+
8+
import io.github.zero88.schedulerx.TimeClock;
9+
10+
@Internal
11+
class TimeClockImpl implements TimeClock {
12+
13+
@Override
14+
public @NotNull Instant now() {
15+
return Instant.now();
16+
}
17+
18+
}

core/src/main/java/io/github/zero88/schedulerx/impl/TriggerContextFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,21 +66,23 @@ public static TriggerContext error(String triggerType, String reason, @Nullable
6666
* Create trigger context in {@link TriggerStatus#KICKOFF} state
6767
*
6868
* @param triggerType the trigger type
69+
* @param firedAt the fired at
6970
* @param tick the tick
7071
*/
71-
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, long tick) {
72-
return kickoff(triggerType, tick, null);
72+
public static @NotNull TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick) {
73+
return kickoff(triggerType, firedAt, tick, null);
7374
}
7475

7576
/**
7677
* Create trigger context in {@link TriggerStatus#KICKOFF} state
7778
*
7879
* @param triggerType the trigger type
80+
* @param firedAt the fired at
7981
* @param tick the tick
8082
* @param info the trigger context info
8183
*/
82-
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, long tick, @Nullable T info) {
83-
final Instant firedAt = Instant.now();
84+
public static @NotNull <T> TriggerContext kickoff(@NotNull String triggerType, @NotNull Instant firedAt, long tick,
85+
@Nullable T info) {
8486
final TriggerCondition condition = createCondition(TriggerStatus.KICKOFF, null, null);
8587
return new TriggerContext() {
8688
@Override

core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
3232
@Override
3333
protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
3434
try {
35-
final Instant now = Instant.now();
35+
final Instant now = clock().now();
3636
final long nextTriggerAfter = trigger().nextTriggerAfter(now);
3737
final Instant nextTriggerTime = now.plus(nextTriggerAfter, ChronoUnit.MILLIS);
3838
nextTimerId = vertx().setTimer(nextTriggerAfter, tId -> {
39-
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), onFire(tId)));
39+
onProcess(workerExecutor, TriggerContextFactory.kickoff(trigger().type(), clock().now(), onFire(tId)));
4040
doStart(workerExecutor);
4141
});
4242
log(now, "Next schedule at" + brackets(nextTriggerTime) + " by timerId" + brackets(nextTimerId));
@@ -49,7 +49,7 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
4949
@Override
5050
protected void unregisterTimer(long timerId) {
5151
boolean result = vertx().cancelTimer(nextTimerId);
52-
log(Instant.now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
52+
log(clock().now(), "Unregistered timerId" + brackets(nextTimerId) + brackets(result));
5353
}
5454

5555
static final class CronSchedulerBuilderImpl<IN, OUT>

core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static io.github.zero88.schedulerx.impl.Utils.brackets;
44

5-
import java.time.Instant;
65
import java.util.Objects;
76

87
import org.jetbrains.annotations.NotNull;
@@ -61,7 +60,7 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
6160
protected void unregisterTimer(long timerId) {
6261
if (Objects.nonNull(consumer)) {
6362
consumer.unregister()
64-
.onComplete(r -> log(Instant.now(),
63+
.onComplete(r -> log(clock().now(),
6564
"Unregistered EventBus subscriber on address" + brackets(consumer.address()) +
6665
brackets(r.succeeded()) + brackets(r.cause())));
6766
}
@@ -70,9 +69,9 @@ protected void unregisterTimer(long timerId) {
7069
private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
7170
try {
7271
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
73-
return TriggerContextFactory.kickoff(trigger().type(), tick, eventMsg);
72+
return TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, eventMsg);
7473
} catch (Exception ex) {
75-
return handleException(TriggerContextFactory.kickoff(trigger().type(), tick, msg), ex);
74+
return handleException(TriggerContextFactory.kickoff(trigger().type(), clock().now(), tick, msg), ex);
7675
}
7776
}
7877

core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static io.github.zero88.schedulerx.trigger.IntervalTrigger.REPEAT_INDEFINITELY;
55

66
import java.time.Duration;
7-
import java.time.Instant;
87

98
import org.jetbrains.annotations.NotNull;
109

@@ -37,7 +36,7 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
3736
}
3837
final Promise<Long> promise = Promise.promise();
3938
final long delay = trigger().initialDelay().toMillis();
40-
log(Instant.now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
39+
log(clock().now(), "Delay " + brackets(delay + "ms") + " then register the trigger in the scheduler");
4140
vertx().setTimer(delay, ignore -> promise.complete(createPeriodicTimer(workerExecutor)));
4241
return promise.future();
4342
} catch (Exception e) {
@@ -48,13 +47,15 @@ final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, In
4847
@Override
4948
protected void unregisterTimer(long timerId) {
5049
boolean result = vertx().cancelTimer(timerId);
51-
log(Instant.now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
50+
log(clock().now(), "Unregistered timerId" + brackets(timerId) + brackets(result));
5251
}
5352

5453
private long createPeriodicTimer(WorkerExecutor executor) {
54+
final long millis = trigger().interval().toMillis();
5555
return this.vertx()
56-
.setPeriodic(trigger().interval().toMillis(),
57-
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
56+
.setPeriodic(millis, id -> onProcess(executor,
57+
TriggerContextFactory.kickoff(trigger().type(), clock().now(),
58+
onFire(id))));
5859
}
5960

6061
// @formatter:off

0 commit comments

Comments
 (0)