Skip to content

Commit 79a8ccf

Browse files
committed
feat(#93): First step for TriggerEvaluator
1 parent 5253452 commit 79a8ccf

File tree

9 files changed

+186
-74
lines changed

9 files changed

+186
-74
lines changed

core/src/main/java/io/github/zero88/schedulerx/SchedulerBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.jetbrains.annotations.NotNull;
44

55
import io.github.zero88.schedulerx.trigger.Trigger;
6+
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
67
import io.vertx.core.Vertx;
78

89
/**
@@ -21,10 +22,14 @@ public interface SchedulerBuilder<IN, OUT, TRIGGER extends Trigger, SCHEDULER ex
2122
SELF extends SchedulerBuilder<IN, OUT, TRIGGER, SCHEDULER, SELF>>
2223
extends JobExecutorContext<IN, OUT>, SchedulerContext<TRIGGER, OUT> {
2324

25+
@NotNull TriggerEvaluator triggerEvaluator();
26+
2427
@NotNull SELF setVertx(@NotNull Vertx vertx);
2528

2629
@NotNull SELF setTrigger(@NotNull TRIGGER trigger);
2730

31+
@NotNull SELF setTriggerEvaluator(@NotNull TriggerEvaluator evaluator);
32+
2833
@NotNull SELF setMonitor(@NotNull SchedulingMonitor<OUT> monitor);
2934

3035
@NotNull SELF setJob(@NotNull Job<IN, OUT> job);

core/src/main/java/io/github/zero88/schedulerx/impl/AbstractScheduler.java

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
3131
import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus;
3232
import io.github.zero88.schedulerx.trigger.TriggerContext;
33+
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
3334
import io.vertx.core.Future;
3435
import io.vertx.core.Promise;
3536
import io.vertx.core.Vertx;
@@ -56,21 +57,23 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
5657
private final @NotNull JobData<IN> jobData;
5758
private final @NotNull Job<IN, OUT> job;
5859
private final @NotNull T trigger;
60+
private final @NotNull TriggerEvaluator evaluator;
5961
private final @NotNull TimeoutPolicy timeoutPolicy;
6062
private final Lock lock = new ReentrantLock();
6163
private boolean didStart = false;
6264
private boolean didTriggerValidation = false;
6365
private IllegalArgumentException invalidTrigger;
6466

65-
protected AbstractScheduler(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor,
66-
@NotNull JobData<IN> jobData, @NotNull Job<IN, OUT> job, @NotNull T trigger,
67-
@NotNull TimeoutPolicy timeoutPolicy) {
67+
protected AbstractScheduler(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData,
68+
@NotNull TimeoutPolicy timeoutPolicy, @NotNull SchedulingMonitor<OUT> monitor,
69+
@NotNull T trigger, @NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
6870
this.job = job;
6971
this.jobData = jobData;
7072
this.timeoutPolicy = timeoutPolicy;
7173
this.vertx = vertx;
7274
this.trigger = trigger;
7375
this.monitor = monitor;
76+
this.evaluator = new InternalTriggerEvaluator(this).andThen(evaluator);
7477
this.state = new SchedulerStateImpl<>();
7578
}
7679

@@ -170,23 +173,6 @@ protected final void doStop(long timerId, TriggerContext context) {
170173
*/
171174
protected abstract void unregisterTimer(long timerId);
172175

173-
/**
174-
* Check a trigger kickoff context whether to be able to run new execution or not
175-
*/
176-
protected final TriggerContext shouldRun(@NotNull TriggerContext kickOffContext) {
177-
log(Instant.now(), "On evaluate");
178-
if (state.pending()) {
179-
return TriggerContextFactory.skip(kickOffContext, ReasonCode.NOT_YET_SCHEDULED);
180-
}
181-
if (state.completed()) {
182-
return TriggerContextFactory.skip(kickOffContext, ReasonCode.ALREADY_STOPPED);
183-
}
184-
if (state.executing()) {
185-
return TriggerContextFactory.skip(kickOffContext, ReasonCode.JOB_IS_RUNNING);
186-
}
187-
return evaluateTriggerRule(kickOffContext);
188-
}
189-
190176
/**
191177
* Check a trigger context whether to be able to stop by configuration or force stop
192178
*/
@@ -199,23 +185,6 @@ protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext
199185
: triggerContext;
200186
}
201187

202-
/**
203-
* Evaluate a trigger kickoff context on trigger rule
204-
*/
205-
protected TriggerContext evaluateTriggerRule(@NotNull TriggerContext triggerContext) {
206-
if (!triggerContext.isKickoff()) {
207-
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
208-
}
209-
final Instant firedAt = Objects.requireNonNull(triggerContext.firedAt(),
210-
"Kickoff context is missing a fired at time");
211-
if (trigger().rule().isExceeded(firedAt)) {
212-
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG);
213-
}
214-
return trigger().shouldExecute(firedAt)
215-
? TriggerContextFactory.ready(triggerContext)
216-
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
217-
}
218-
219188
/**
220189
* Register a timer id in internal state and increase tick time when the system timer fires
221190
*
@@ -232,9 +201,7 @@ protected final long onFire(long timerId) {
232201
protected final void onProcess(WorkerExecutor workerExecutor, TriggerContext ctx) {
233202
log(Objects.requireNonNull(ctx.firedAt()), "On fire");
234203
final Duration timeout = timeoutPolicy().evaluationTimeout();
235-
this.<TriggerContext>executeBlocking(workerExecutor,
236-
p -> wrapTimeout(timeoutPolicy().evaluationTimeout(), p).complete(
237-
shouldRun(ctx)))
204+
this.<TriggerContext>executeBlocking(workerExecutor, p -> wrapTimeout(timeout, p).handle(evaluator.check(ctx)))
238205
.onSuccess(context -> onTrigger(workerExecutor, context))
239206
.onFailure(t -> onMisfire(TriggerContextFactory.skip(ctx, t instanceof TimeoutException
240207
? ReasonCode.EVALUATION_TIMEOUT
@@ -387,4 +354,42 @@ private <R> Promise<R> wrapTimeout(Duration timeout, Promise<R> promise) {
387354
return new TimeoutBlock(vertx, timeout).wrap(promise);
388355
}
389356

357+
@SuppressWarnings("rawtypes")
358+
private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator {
359+
360+
private final AbstractScheduler scheduler;
361+
362+
private InternalTriggerEvaluator(AbstractScheduler scheduler) { this.scheduler = scheduler; }
363+
364+
@Override
365+
protected Future<TriggerContext> internalCheck(@NotNull TriggerContext ctx) {
366+
if (!ctx.isKickoff()) {
367+
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
368+
}
369+
return Future.succeededFuture(doCheck(ctx));
370+
}
371+
372+
@NotNull
373+
private TriggerContext doCheck(TriggerContext ctx) {
374+
scheduler.log(Instant.now(), "On evaluate");
375+
if (scheduler.state.pending()) {
376+
return TriggerContextFactory.skip(ctx, ReasonCode.NOT_YET_SCHEDULED);
377+
}
378+
if (scheduler.state.completed()) {
379+
return TriggerContextFactory.skip(ctx, ReasonCode.ALREADY_STOPPED);
380+
}
381+
if (scheduler.state.executing()) {
382+
return TriggerContextFactory.skip(ctx, ReasonCode.JOB_IS_RUNNING);
383+
}
384+
final Instant firedAt = Objects.requireNonNull(ctx.firedAt());
385+
if (scheduler.trigger().rule().isExceeded(firedAt)) {
386+
return TriggerContextFactory.stop(ctx, ReasonCode.STOP_BY_CONFIG);
387+
}
388+
return scheduler.trigger().shouldExecute(firedAt)
389+
? TriggerContextFactory.ready(ctx)
390+
: TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED);
391+
}
392+
393+
}
394+
390395
}

core/src/main/java/io/github/zero88/schedulerx/impl/AbstractSchedulerBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.github.zero88.schedulerx.SchedulingMonitor;
1515
import io.github.zero88.schedulerx.TimeoutPolicy;
1616
import io.github.zero88.schedulerx.trigger.Trigger;
17+
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
1718
import io.vertx.core.Vertx;
1819

1920
/**
@@ -30,6 +31,7 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
3031
private JobData<IN> jobData;
3132
private Job<IN, OUT> job;
3233
private T trigger;
34+
private TriggerEvaluator evaluator;
3335
private TimeoutPolicy timeoutPolicy;
3436

3537
@Override
@@ -45,6 +47,11 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
4547
@Override
4648
public @NotNull T trigger() { return Objects.requireNonNull(trigger, "Trigger is required"); }
4749

50+
@Override
51+
public @NotNull TriggerEvaluator triggerEvaluator() {
52+
return Optional.ofNullable(evaluator).orElseGet(AbstractTriggerEvaluator::noop);
53+
}
54+
4855
@Override
4956
public @NotNull Job<IN, OUT> job() { return Objects.requireNonNull(job, "Job is required"); }
5057

@@ -71,6 +78,12 @@ public abstract class AbstractSchedulerBuilder<IN, OUT, T extends Trigger, S ext
7178
return (B) this;
7279
}
7380

81+
@Override
82+
public @NotNull B setTriggerEvaluator(@NotNull TriggerEvaluator evaluator) {
83+
this.evaluator = evaluator;
84+
return (B) this;
85+
}
86+
7487
public @NotNull B setMonitor(@NotNull SchedulingMonitor<OUT> monitor) {
7588
this.monitor = monitor;
7689
return (B) this;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.github.zero88.schedulerx.impl;
2+
3+
import org.jetbrains.annotations.ApiStatus.Internal;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.jetbrains.annotations.Nullable;
6+
7+
import io.github.zero88.schedulerx.trigger.TriggerContext;
8+
import io.github.zero88.schedulerx.trigger.TriggerEvaluator;
9+
import io.vertx.core.Future;
10+
11+
@Internal
12+
public abstract class AbstractTriggerEvaluator implements TriggerEvaluator {
13+
14+
static TriggerEvaluator noop() {
15+
return new AbstractTriggerEvaluator() {
16+
@Override
17+
protected Future<TriggerContext> internalCheck(@NotNull TriggerContext triggerContext) {
18+
return Future.succeededFuture(triggerContext);
19+
}
20+
};
21+
}
22+
23+
private TriggerEvaluator next;
24+
25+
@Override
26+
public @NotNull Future<TriggerContext> check(@NotNull TriggerContext triggerContext) {
27+
return this.internalCheck(triggerContext)
28+
.flatMap(ctx -> next == null ? Future.succeededFuture(ctx) : next.check(ctx));
29+
}
30+
31+
@Override
32+
public @NotNull TriggerEvaluator andThen(@Nullable TriggerEvaluator another) {
33+
this.next = another;
34+
return this;
35+
}
36+
37+
protected abstract Future<TriggerContext> internalCheck(@NotNull TriggerContext triggerContext);
38+
39+
}

core/src/main/java/io/github/zero88/schedulerx/trigger/CronSchedulerImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ final class CronSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, CronTr
2323

2424
private long nextTimerId;
2525

26-
CronSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
27-
@NotNull Job<IN, OUT> job, @NotNull CronTrigger trigger, @NotNull TimeoutPolicy timeoutPolicy) {
28-
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
26+
CronSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
27+
@NotNull SchedulingMonitor<OUT> monitor, @NotNull CronTrigger trigger,
28+
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
29+
super(job, jobData, timeoutPolicy, monitor, trigger, evaluator, vertx);
2930
}
3031

3132
@Override
@@ -56,7 +57,8 @@ static final class CronSchedulerBuilderImpl<IN, OUT>
5657
implements CronSchedulerBuilder<IN, OUT> {
5758

5859
public @NotNull CronScheduler<IN, OUT> build() {
59-
return new CronSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
60+
return new CronSchedulerImpl<>(job(), jobData(), timeoutPolicy(), monitor(), trigger(), triggerEvaluator(),
61+
vertx());
6062
}
6163

6264
}

core/src/main/java/io/github/zero88/schedulerx/trigger/EventSchedulerImpl.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ final class EventSchedulerImpl<IN, OUT, T> extends AbstractScheduler<IN, OUT, Ev
2828

2929
private MessageConsumer<Object> consumer;
3030

31-
EventSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
32-
@NotNull Job<IN, OUT> job, @NotNull EventTrigger<T> trigger,
33-
@NotNull TimeoutPolicy timeoutPolicy) {
34-
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
31+
EventSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
32+
@NotNull SchedulingMonitor<OUT> monitor, @NotNull EventTrigger<T> trigger,
33+
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
34+
super(job, jobData, timeoutPolicy, monitor, trigger, new EventTriggerEvaluator<>(trigger).andThen(evaluator),
35+
vertx);
3536
}
3637

3738
@Override
@@ -65,21 +66,6 @@ protected void unregisterTimer(long timerId) {
6566
}
6667
}
6768

68-
@Override
69-
@SuppressWarnings("unchecked")
70-
protected TriggerContext evaluateTriggerRule(@NotNull TriggerContext triggerContext) {
71-
final TriggerContext ctx = super.evaluateTriggerRule(triggerContext);
72-
try {
73-
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
74-
!trigger().getPredicate().test((T) triggerContext.info())) {
75-
return TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED);
76-
}
77-
} catch (Exception ex) {
78-
return handleException(ctx, ex);
79-
}
80-
return ctx;
81-
}
82-
8369
private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
8470
try {
8571
T eventMsg = trigger().getPredicate().convert(msg.headers(), msg.body());
@@ -89,7 +75,7 @@ private TriggerContext createKickoffContext(Message<Object> msg, long tick) {
8975
}
9076
}
9177

92-
private TriggerContext handleException(TriggerContext context, Exception cause) {
78+
static TriggerContext handleException(TriggerContext context, Exception cause) {
9379
String reason = cause instanceof ClassCastException || cause instanceof EventTriggerPredicateException
9480
? ReasonCode.CONDITION_IS_NOT_MATCHED
9581
: ReasonCode.UNEXPECTED_ERROR;
@@ -103,7 +89,8 @@ static final class EventSchedulerBuilderImpl<IN, OUT, T>
10389
// @formatter:on
10490

10591
public @NotNull EventScheduler<IN, OUT, T> build() {
106-
return new EventSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
92+
return new EventSchedulerImpl<>(job(), jobData(), timeoutPolicy(), monitor(), trigger(), triggerEvaluator(),
93+
vertx());
10794
}
10895

10996
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.github.zero88.schedulerx.trigger;
2+
3+
import org.jetbrains.annotations.NotNull;
4+
5+
import io.github.zero88.schedulerx.impl.AbstractTriggerEvaluator;
6+
import io.github.zero88.schedulerx.impl.TriggerContextFactory;
7+
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
8+
import io.vertx.core.Future;
9+
10+
final class EventTriggerEvaluator<T> extends AbstractTriggerEvaluator {
11+
12+
private final EventTrigger<T> trigger;
13+
14+
EventTriggerEvaluator(EventTrigger<T> trigger) { this.trigger = trigger; }
15+
16+
@Override
17+
@SuppressWarnings("unchecked")
18+
protected Future<TriggerContext> internalCheck(@NotNull TriggerContext ctx) {
19+
try {
20+
if (ctx.condition().status() == TriggerCondition.TriggerStatus.READY &&
21+
!trigger.getPredicate().test((T) ctx.info())) {
22+
return Future.succeededFuture(TriggerContextFactory.skip(ctx, ReasonCode.CONDITION_IS_NOT_MATCHED));
23+
}
24+
} catch (Exception ex) {
25+
return Future.succeededFuture(EventSchedulerImpl.handleException(ctx, ex));
26+
}
27+
return Future.succeededFuture(ctx);
28+
}
29+
30+
}

core/src/main/java/io/github/zero88/schedulerx/trigger/IntervalSchedulerImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
final class IntervalSchedulerImpl<IN, OUT> extends AbstractScheduler<IN, OUT, IntervalTrigger>
2222
implements IntervalScheduler<IN, OUT> {
2323

24-
IntervalSchedulerImpl(@NotNull Vertx vertx, @NotNull SchedulingMonitor<OUT> monitor, @NotNull JobData<IN> jobData,
25-
@NotNull Job<IN, OUT> job, @NotNull IntervalTrigger trigger,
26-
@NotNull TimeoutPolicy timeoutPolicy) {
27-
super(vertx, monitor, jobData, job, trigger, timeoutPolicy);
24+
IntervalSchedulerImpl(@NotNull Job<IN, OUT> job, @NotNull JobData<IN> jobData, @NotNull TimeoutPolicy timeoutPolicy,
25+
@NotNull SchedulingMonitor<OUT> monitor, @NotNull IntervalTrigger trigger,
26+
@NotNull TriggerEvaluator evaluator, @NotNull Vertx vertx) {
27+
super(job, jobData, timeoutPolicy, monitor, trigger, evaluator, vertx);
2828
}
2929

3030
protected @NotNull Future<Long> registerTimer(WorkerExecutor workerExecutor) {
@@ -49,10 +49,9 @@ protected void unregisterTimer(long timerId) {
4949
}
5050

5151
private long createPeriodicTimer(WorkerExecutor executor) {
52-
return vertx().setPeriodic(trigger().intervalInMilliseconds(), id -> onProcess(executor,
53-
TriggerContextFactory.kickoff(
54-
trigger().type(),
55-
onFire(id))));
52+
return this.vertx()
53+
.setPeriodic(trigger().intervalInMilliseconds(),
54+
id -> onProcess(executor, TriggerContextFactory.kickoff(trigger().type(), onFire(id))));
5655
}
5756

5857
// @formatter:off
@@ -62,7 +61,8 @@ static final class IntervalSchedulerBuilderImpl<IN, OUT>
6261
// @formatter:on
6362

6463
public @NotNull IntervalScheduler<IN, OUT> build() {
65-
return new IntervalSchedulerImpl<>(vertx(), monitor(), jobData(), job(), trigger(), timeoutPolicy());
64+
return new IntervalSchedulerImpl<>(job(), jobData(), timeoutPolicy(), monitor(), trigger(),
65+
triggerEvaluator(), vertx());
6666
}
6767

6868
}

0 commit comments

Comments
 (0)