@@ -65,12 +65,12 @@ public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements S
65
65
protected AbstractScheduler (@ NotNull Vertx vertx , @ NotNull SchedulingMonitor <OUT > monitor ,
66
66
@ NotNull JobData <IN > jobData , @ NotNull Job <IN , OUT > job , @ NotNull T trigger ,
67
67
@ NotNull TimeoutPolicy timeoutPolicy ) {
68
+ this .job = job ;
69
+ this .jobData = jobData ;
70
+ this .timeoutPolicy = timeoutPolicy ;
68
71
this .vertx = vertx ;
69
72
this .trigger = trigger ;
70
73
this .monitor = monitor ;
71
- this .jobData = jobData ;
72
- this .job = job ;
73
- this .timeoutPolicy = timeoutPolicy ;
74
74
this .state = new SchedulerStateImpl <>();
75
75
}
76
76
@@ -147,7 +147,7 @@ public final void executeJob(@NotNull ExecutionContext<OUT> executionContext) {
147
147
public final void cancel () {
148
148
if (!state .completed ()) {
149
149
log (Instant .now (), "On cancel" );
150
- doStop (state .timerId (), TriggerContextFactory .stop (trigger ().type (), ReasonCode . STOP_BY_MANUAL ));
150
+ doStop (state .timerId (), TriggerContextFactory .cancel (trigger ().type (), state . tick () ));
151
151
}
152
152
}
153
153
@@ -173,7 +173,7 @@ protected final void doStop(long timerId, TriggerContext context) {
173
173
/**
174
174
* Check a trigger kickoff context whether to be able to run new execution or not
175
175
*/
176
- protected final TriggerTransitionContext shouldRun (@ NotNull TriggerTransitionContext kickOffContext ) {
176
+ protected final TriggerContext shouldRun (@ NotNull TriggerContext kickOffContext ) {
177
177
log (Instant .now (), "On evaluate" );
178
178
if (state .pending ()) {
179
179
return TriggerContextFactory .skip (kickOffContext , ReasonCode .NOT_YET_SCHEDULED );
@@ -190,8 +190,7 @@ protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionCon
190
190
/**
191
191
* Check a trigger context whether to be able to stop by configuration or force stop
192
192
*/
193
- protected final TriggerTransitionContext shouldStop (@ NotNull TriggerTransitionContext triggerContext ,
194
- boolean isForceStop , long round ) {
193
+ protected final TriggerContext shouldStop (@ NotNull TriggerContext triggerContext , boolean isForceStop , long round ) {
195
194
if (isForceStop ) {
196
195
return TriggerContextFactory .stop (triggerContext , ReasonCode .STOP_BY_JOB );
197
196
}
@@ -203,7 +202,7 @@ protected final TriggerTransitionContext shouldStop(@NotNull TriggerTransitionCo
203
202
/**
204
203
* Evaluate a trigger kickoff context on trigger rule
205
204
*/
206
- protected TriggerTransitionContext evaluateTriggerRule (@ NotNull TriggerTransitionContext triggerContext ) {
205
+ protected TriggerContext evaluateTriggerRule (@ NotNull TriggerContext triggerContext ) {
207
206
if (!triggerContext .isKickoff ()) {
208
207
throw new IllegalStateException ("Trigger condition status must be " + TriggerStatus .KICKOFF );
209
208
}
@@ -228,48 +227,52 @@ protected final long onFire(long timerId) {
228
227
}
229
228
230
229
/**
231
- * Processing the trigger when the system timer fires
230
+ * Processing the trigger right away after the system timer fires
232
231
*/
233
- protected final void onProcess (WorkerExecutor workerExecutor , TriggerTransitionContext kickoffContext ) {
234
- log (Objects .requireNonNull (kickoffContext .firedAt ()), "On fire" );
235
- this .<TriggerTransitionContext >executeBlocking (workerExecutor ,
236
- p -> wrapTimeout (timeoutPolicy ().evaluationTimeout (),
237
- p ).complete (shouldRun (kickoffContext )))
232
+ protected final void onProcess (WorkerExecutor workerExecutor , TriggerContext ctx ) {
233
+ log (Objects .requireNonNull (ctx .firedAt ()), "On fire" );
234
+ final Duration timeout = timeoutPolicy ().evaluationTimeout ();
235
+ this .<TriggerContext >executeBlocking (workerExecutor ,
236
+ p -> wrapTimeout (timeoutPolicy ().evaluationTimeout (), p ).complete (
237
+ shouldRun (ctx )))
238
238
.onSuccess (context -> onTrigger (workerExecutor , context ))
239
- .onFailure (t -> onMisfire (TriggerContextFactory .skip (kickoffContext , t instanceof TimeoutException
240
- ? ReasonCode .EVALUATION_TIMEOUT
241
- : ReasonCode .UNEXPECTED_ERROR , t )));
239
+ .onFailure (t -> onMisfire (TriggerContextFactory .skip (ctx , t instanceof TimeoutException
240
+ ? ReasonCode .EVALUATION_TIMEOUT
241
+ : ReasonCode .UNEXPECTED_ERROR , t )));
242
242
}
243
243
244
- protected final void onTrigger (WorkerExecutor workerExecutor , TriggerTransitionContext triggerContext ) {
244
+ protected final void onTrigger (WorkerExecutor workerExecutor , TriggerContext triggerContext ) {
245
245
if (!triggerContext .isReady ()) {
246
246
onMisfire (triggerContext );
247
247
return ;
248
248
}
249
249
final ExecutionContextInternal <OUT > executionContext = new ExecutionContextImpl <>(vertx , triggerContext ,
250
250
state .increaseRound ());
251
+ final Duration timeout = timeoutPolicy ().executionTimeout ();
251
252
log (executionContext .triggeredAt (), "On trigger" , triggerContext .tick (), executionContext .round ());
252
- this .executeBlocking (workerExecutor , p -> executeJob (
253
- executionContext .setup (wrapTimeout (timeoutPolicy ().executionTimeout (), p ))))
253
+ this .executeBlocking (workerExecutor , p -> executeJob (executionContext .setup (wrapTimeout (timeout , p ))))
254
254
.onComplete (ar -> onResult (executionContext , ar .cause ()));
255
255
}
256
256
257
257
protected final void onSchedule (long timerId ) {
258
- final TriggerContext context = TriggerContextFactory .scheduled (trigger ().type ());
259
258
ExecutionResult <OUT > result ;
260
259
if (state .pending ()) {
260
+ final TriggerContext context = TriggerContextFactory .scheduled (trigger ().type ());
261
261
result = ExecutionResultImpl .<OUT >builder ()
262
262
.setExternalId (jobData .externalId ())
263
263
.setAvailableAt (state .markAvailable ())
264
264
.setTriggerContext (context )
265
+ .setTick (context .tick ())
266
+ .setRound (context .tick ())
265
267
.build ();
266
268
} else {
269
+ final TriggerContext context = TriggerContextFactory .rescheduled (trigger ().type (), state .tick ());
267
270
result = ExecutionResultImpl .<OUT >builder ()
268
271
.setExternalId (jobData .externalId ())
269
272
.setAvailableAt (state .availableAt ())
270
273
.setTriggerContext (context )
271
274
.setRescheduledAt (Instant .now ())
272
- .setTick (state .tick ())
275
+ .setTick (context .tick ())
273
276
.setRound (state .round ())
274
277
.build ();
275
278
}
@@ -282,32 +285,38 @@ protected final void onUnableSchedule(Throwable cause) {
282
285
.setExternalId (jobData .externalId ())
283
286
.setTriggerContext (ctx )
284
287
.setUnscheduledAt (Instant .now ())
285
- .setTick (- 1 )
286
- .setRound (- 1 )
288
+ .setTick (ctx . tick () )
289
+ .setRound (ctx . tick () )
287
290
.build ());
288
291
}
289
292
290
- protected final void onMisfire (@ NotNull TriggerTransitionContext triggerContext ) {
291
- final Instant finishedAt = state .markFinished (triggerContext .tick ());
292
- log (finishedAt , "On misfire::" + triggerContext .condition ().reasonCode (), triggerContext .tick ());
293
+ protected final void onMisfire (@ NotNull TriggerContext triggerCtx ) {
294
+ final Instant finishedAt = state .markFinished (triggerCtx .tick ());
295
+ final String reasonCode = triggerCtx .condition ().reasonCode ();
296
+ final String event = "On misfire::" + reasonCode ;
297
+ if (ReasonCode .UNEXPECTED_ERROR .equals (reasonCode )) {
298
+ LOGGER .error (genMsg (triggerCtx .tick (), state .round (), finishedAt , event ), triggerCtx .condition ().cause ());
299
+ } else {
300
+ log (finishedAt , event , triggerCtx .tick ());
301
+ }
293
302
monitor .onMisfire (ExecutionResultImpl .<OUT >builder ()
294
303
.setExternalId (jobData .externalId ())
295
304
.setAvailableAt (state .availableAt ())
296
- .setTriggerContext (triggerContext )
297
- .setTick (triggerContext .tick ())
298
- .setFiredAt (triggerContext .firedAt ())
305
+ .setTriggerContext (triggerCtx )
306
+ .setTick (triggerCtx .tick ())
307
+ .setFiredAt (triggerCtx .firedAt ())
299
308
.setRound (state .round ())
300
309
.setFinishedAt (finishedAt )
301
310
.build ());
302
311
}
303
312
304
313
protected final void onResult (@ NotNull ExecutionContext <OUT > executionContext , @ Nullable Throwable asyncCause ) {
305
314
final ExecutionContextInternal <OUT > ctx = (ExecutionContextInternal <OUT >) executionContext ;
306
- final TriggerTransitionContext triggerContext = TriggerContextFactory .executed (ctx .triggerContext ());
315
+ final TriggerContext triggerContext = TriggerContextFactory .executed (ctx .triggerContext ());
307
316
final Instant finishedAt = state .markFinished (triggerContext .tick ());
308
317
log (finishedAt , "On result" , triggerContext .tick (), ctx .round ());
309
318
if (asyncCause instanceof TimeoutException ) {
310
- LOGGER .warn (genMsg (state .tick (), state .round (), finishedAt , asyncCause .getMessage ()));
319
+ LOGGER .warn (genMsg (triggerContext .tick (), ctx .round (), finishedAt , asyncCause .getMessage ()));
311
320
} else if (asyncCause != null ) {
312
321
LOGGER .error (genMsg (triggerContext .tick (), ctx .round (), finishedAt , "System error" ), asyncCause );
313
322
}
@@ -325,7 +334,7 @@ protected final void onResult(@NotNull ExecutionContext<OUT> executionContext, @
325
334
.setError (state .addError (ctx .round (),
326
335
Optional .ofNullable (ctx .error ()).orElse (asyncCause )))
327
336
.build ());
328
- final TriggerTransitionContext transitionCtx = shouldStop (triggerContext , ctx .isForceStop (), ctx .round ());
337
+ final TriggerContext transitionCtx = shouldStop (triggerContext , ctx .isForceStop (), ctx .round ());
329
338
if (transitionCtx .isStopped ()) {
330
339
doStop (state .timerId (), transitionCtx );
331
340
}
0 commit comments