@@ -160,8 +160,10 @@ protected final void doStart(WorkerExecutor workerExecutor) {
160
160
}
161
161
162
162
protected final void doStop (long timerId , TriggerContext context ) {
163
- unregisterTimer (timerId );
164
- onComplete (context );
163
+ if (context .isStopped ()) {
164
+ unregisterTimer (timerId );
165
+ onComplete (context );
166
+ }
165
167
}
166
168
167
169
/**
@@ -174,18 +176,6 @@ protected final void doStop(long timerId, TriggerContext context) {
174
176
*/
175
177
protected abstract void unregisterTimer (long timerId );
176
178
177
- /**
178
- * Check a trigger context whether to be able to stop by configuration or force stop
179
- */
180
- protected final TriggerContext shouldStop (@ NotNull TriggerContext triggerContext , boolean isForceStop , long round ) {
181
- if (isForceStop ) {
182
- return TriggerContextFactory .stop (triggerContext , ReasonCode .STOP_BY_JOB );
183
- }
184
- return trigger ().shouldStop (round )
185
- ? TriggerContextFactory .stop (triggerContext , ReasonCode .STOP_BY_CONFIG )
186
- : triggerContext ;
187
- }
188
-
189
179
/**
190
180
* Register a timer id in internal state and increase tick time when the system timer fires
191
181
*
@@ -199,29 +189,27 @@ protected final long onFire(long timerId) {
199
189
/**
200
190
* Processing the trigger right away after the system timer fires
201
191
*/
202
- protected final void onProcess (WorkerExecutor workerExecutor , TriggerContext ctx ) {
203
- log (Objects .requireNonNull (ctx .firedAt ()), "On fire" );
204
- final Duration timeout = timeoutPolicy ().evaluationTimeout ();
205
- this .<TriggerContext >executeBlocking (workerExecutor , p -> this .wrapTimeout (timeout , p )
206
- .handle (evaluator .beforeRun (trigger , ctx ,
207
- jobData .externalId ())))
208
- .onSuccess (context -> onTrigger (workerExecutor , context ))
209
- .onFailure (t -> onMisfire (TriggerContextFactory .skip (ctx , t instanceof TimeoutException
210
- ? ReasonCode .EVALUATION_TIMEOUT
211
- : ReasonCode .UNEXPECTED_ERROR , t )));
192
+ protected final void onProcess (WorkerExecutor workerExecutor , TriggerContext triggerContext ) {
193
+ log (Objects .requireNonNull (triggerContext .firedAt ()), "On fire" );
194
+ this .onEvaluationBeforeTrigger (workerExecutor , triggerContext )
195
+ .onSuccess (ctx -> onTrigger (workerExecutor , ctx ))
196
+ .onFailure (t -> onMisfire (TriggerContextFactory .skip (triggerContext , t instanceof TimeoutException
197
+ ? ReasonCode .EVALUATION_TIMEOUT
198
+ : ReasonCode .UNEXPECTED_ERROR , t )));
212
199
}
213
200
214
201
protected final void onTrigger (WorkerExecutor workerExecutor , TriggerContext triggerContext ) {
215
202
if (!triggerContext .isReady ()) {
216
203
onMisfire (triggerContext );
217
204
return ;
218
205
}
219
- final ExecutionContextInternal < OUT > executionContext = new ExecutionContextImpl <>( vertx , triggerContext ,
220
- state . increaseRound () );
206
+ final long round = state . increaseRound ();
207
+ final ExecutionContextInternal < OUT > executionContext = new ExecutionContextImpl <>( vertx , triggerContext , round );
221
208
final Duration timeout = timeoutPolicy ().executionTimeout ();
222
- log (executionContext .triggeredAt (), "On trigger" , triggerContext .tick (), executionContext .round ());
223
- this .executeBlocking (workerExecutor , p -> executeJob (executionContext .setup (wrapTimeout (timeout , p ))))
224
- .onComplete (ar -> onResult (executionContext , ar .cause ()));
209
+ log (executionContext .triggeredAt (), "On trigger" , triggerContext .tick (), round );
210
+ Future .join (onEvaluationAfterTrigger (workerExecutor , triggerContext , round ),
211
+ executeBlocking (workerExecutor , p -> executeJob (executionContext .setup (wrapTimeout (timeout , p )))))
212
+ .onComplete (ar -> onResult (executionContext , ar .result ().cause (1 )));
225
213
}
226
214
227
215
protected final void onSchedule (long timerId ) {
@@ -260,6 +248,25 @@ protected final void onUnableSchedule(Throwable cause) {
260
248
.build ());
261
249
}
262
250
251
+ protected final Future <TriggerContext > onEvaluationBeforeTrigger (WorkerExecutor worker , TriggerContext ctx ) {
252
+ return executeBlocking (worker , p -> {
253
+ log (Instant .now (), "On before trigger" );
254
+ this .wrapTimeout (timeoutPolicy ().evaluationTimeout (), p )
255
+ .handle (evaluator .beforeTrigger (trigger , ctx , jobData .externalId ()));
256
+ });
257
+ }
258
+
259
+ protected final Future <TriggerContext > onEvaluationAfterTrigger (WorkerExecutor worker , TriggerContext ctx ,
260
+ long round ) {
261
+ return executeBlocking (worker , p -> {
262
+ log (Instant .now (), "On after trigger" );
263
+ wrapTimeout (timeoutPolicy ().evaluationTimeout (), p ).handle (
264
+ evaluator .afterTrigger (trigger (), ctx , jobData .externalId (), round )
265
+ .onSuccess (_ctx -> doStop (state .timerId (), _ctx ))
266
+ .onFailure (t -> LOGGER .error (genMsg (ctx .tick (), round , Instant .now (), "After evaluate" ), t )));
267
+ });
268
+ }
269
+
263
270
protected final void onMisfire (@ NotNull TriggerContext triggerCtx ) {
264
271
final Instant finishedAt = state .markFinished (triggerCtx .tick ());
265
272
final String reasonCode = triggerCtx .condition ().reasonCode ();
@@ -288,7 +295,7 @@ protected final void onResult(@NotNull ExecutionContext<OUT> executionContext, @
288
295
if (asyncCause instanceof TimeoutException ) {
289
296
LOGGER .warn (genMsg (triggerContext .tick (), ctx .round (), finishedAt , asyncCause .getMessage ()));
290
297
} else if (asyncCause != null ) {
291
- LOGGER .error (genMsg (triggerContext .tick (), ctx .round (), finishedAt , "System error" ), asyncCause );
298
+ LOGGER .error (genMsg (triggerContext .tick (), ctx .round (), finishedAt , "On result:: System error" ), asyncCause );
292
299
}
293
300
monitor .onEach (ExecutionResultImpl .<OUT >builder ()
294
301
.setExternalId (jobData .externalId ())
@@ -304,9 +311,8 @@ protected final void onResult(@NotNull ExecutionContext<OUT> executionContext, @
304
311
.setError (state .addError (ctx .round (),
305
312
Optional .ofNullable (ctx .error ()).orElse (asyncCause )))
306
313
.build ());
307
- final TriggerContext transitionCtx = shouldStop (triggerContext , ctx .isForceStop (), ctx .round ());
308
- if (transitionCtx .isStopped ()) {
309
- doStop (state .timerId (), transitionCtx );
314
+ if (ctx .isForceStop ()) {
315
+ doStop (state .timerId (), TriggerContextFactory .stop (triggerContext , ReasonCode .STOP_BY_JOB ));
310
316
}
311
317
}
312
318
@@ -358,15 +364,15 @@ private <R> Promise<R> wrapTimeout(Duration timeout, Promise<R> promise) {
358
364
}
359
365
360
366
@ SuppressWarnings ("rawtypes" )
361
- private static class InternalTriggerEvaluator extends AbstractTriggerEvaluator {
367
+ private static class InternalTriggerEvaluator extends DefaultTriggerEvaluator {
362
368
363
369
private final AbstractScheduler scheduler ;
364
370
365
371
private InternalTriggerEvaluator (AbstractScheduler scheduler ) { this .scheduler = scheduler ; }
366
372
367
373
@ Override
368
- protected Future <TriggerContext > internalCheck (@ NotNull Trigger trigger , @ NotNull TriggerContext ctx ,
369
- @ Nullable Object externalId ) {
374
+ protected Future <TriggerContext > internalBeforeTrigger (@ NotNull Trigger trigger , @ NotNull TriggerContext ctx ,
375
+ @ Nullable Object externalId ) {
370
376
if (!ctx .isKickoff ()) {
371
377
throw new IllegalStateException ("Trigger condition status must be " + TriggerStatus .KICKOFF );
372
378
}
@@ -375,7 +381,6 @@ protected Future<TriggerContext> internalCheck(@NotNull Trigger trigger, @NotNul
375
381
376
382
@ NotNull
377
383
private TriggerContext doCheck (TriggerContext ctx ) {
378
- scheduler .log (Instant .now (), "On evaluate" );
379
384
if (scheduler .state .pending ()) {
380
385
return TriggerContextFactory .skip (ctx , ReasonCode .NOT_YET_SCHEDULED );
381
386
}
0 commit comments