4
4
import java .util .concurrent .locks .Lock ;
5
5
import java .util .concurrent .locks .ReentrantLock ;
6
6
7
+ import org .jetbrains .annotations .ApiStatus .Internal ;
7
8
import org .jetbrains .annotations .NotNull ;
8
9
import org .jetbrains .annotations .Nullable ;
9
10
15
16
import io .github .zero88 .schedulerx .TaskResult ;
16
17
import io .github .zero88 .schedulerx .TriggerTaskExecutor ;
17
18
import io .github .zero88 .schedulerx .trigger .Trigger ;
19
+ import io .github .zero88 .schedulerx .trigger .TriggerContext ;
18
20
import io .vertx .core .AsyncResult ;
19
21
import io .vertx .core .Future ;
20
22
import io .vertx .core .Promise ;
30
32
* @param <OUT> Type of output data
31
33
* @param <T> Type of trigger
32
34
*/
35
+ @ Internal
33
36
public abstract class AbstractTaskExecutor <IN , OUT , T extends Trigger > implements TriggerTaskExecutor <IN , OUT , T > {
34
37
35
38
@ SuppressWarnings ("java:S3416" )
@@ -112,7 +115,7 @@ public final void start(WorkerExecutor workerExecutor) {
112
115
@ Override
113
116
public final void executeTask (@ NotNull TaskExecutionContext <OUT > executionContext ) {
114
117
try {
115
- trace (state . tick (), state . round (), executionContext .executedAt (), "Start to execute the task" );
118
+ trace (executionContext .executedAt (), "Start to execute the task" );
116
119
task .execute (jobData (), executionContext );
117
120
if (!task .isAsync ()) {
118
121
((TaskExecutionContextInternal <OUT >) executionContext ).internalComplete ();
@@ -125,59 +128,66 @@ public final void executeTask(@NotNull TaskExecutionContext<OUT> executionContex
125
128
@ Override
126
129
public final void cancel () {
127
130
if (!state .completed ()) {
128
- trace (state . tick (), state . round (), Instant .now (), "Canceling the task" );
131
+ trace (Instant .now (), "Canceling the task" );
129
132
doStop (state .timerId ());
130
- onCompleted ();
131
133
}
132
134
}
133
135
134
- protected abstract @ NotNull Future <Long > addTimer (@ NotNull Promise <Long > promise , WorkerExecutor workerExecutor );
136
+ protected final void doStart (WorkerExecutor workerExecutor ) {
137
+ this .registerTimer (Promise .promise (), workerExecutor )
138
+ .onSuccess (this ::onSchedule )
139
+ .onFailure (this ::onUnableSchedule );
140
+ }
141
+
142
+ protected final void doStop (long timerId ) {
143
+ unregisterTimer (timerId );
144
+ onCompleted ();
145
+ }
146
+
147
+ protected abstract @ NotNull Future <Long > registerTimer (@ NotNull Promise <Long > promise ,
148
+ @ Nullable WorkerExecutor workerExecutor );
135
149
136
- protected final boolean shouldRun (@ NotNull Instant triggerAt ) {
137
- final long tick = state .increaseTick ();
150
+ protected void unregisterTimer (long timerId ) { vertx .cancelTimer (timerId ); }
151
+
152
+ protected InternalTriggerContext shouldRun (@ NotNull Instant triggerAt , @ NotNull TriggerContext triggerContext ) {
153
+ state .increaseTick ();
138
154
if (state .completed ()) {
139
- trace (tick , state . round (), triggerAt , "The task execution is already completed" );
155
+ trace (triggerAt , "The task execution is already completed" );
140
156
}
141
157
if (state .executing ()) {
142
- trace (tick , state .round (), triggerAt , "Skip the execution due to the task is still running" );
143
- monitor .onMisfire (TaskResultImpl .<OUT >builder ()
144
- .setExternalId (jobData .externalId ())
145
- .setAvailableAt (state .availableAt ())
146
- .setTick (state .tick ())
147
- .setTriggeredAt (triggerAt )
148
- .build ());
158
+ trace (triggerAt , "Skip the execution due to the task is still running" );
159
+ onMisfire (triggerAt );
149
160
}
150
- return state .idle () && trigger ().shouldExecute (triggerAt );
161
+ return InternalTriggerContext . create ( state .idle () && trigger ().shouldExecute (triggerAt ), triggerContext );
151
162
}
152
163
153
164
protected final boolean shouldStop (@ Nullable TaskExecutionContext <OUT > executionContext , long round ) {
154
165
return (executionContext != null && executionContext .isForceStop ()) || trigger ().shouldStop (round );
155
166
}
156
167
157
- protected final void doStart (WorkerExecutor workerExecutor ) {
158
- this .addTimer (Promise .promise (), workerExecutor )
159
- .onSuccess (this ::onReceiveTimer )
160
- .onFailure (t -> monitor .onUnableSchedule (TaskResultImpl .<OUT >builder ()
161
- .setExternalId (jobData .externalId ())
162
- .setTick (state .tick ())
163
- .setRound (state .round ())
164
- .setAvailableAt (state .availableAt ())
165
- .setUnscheduledAt (Instant .now ())
166
- .setError (t )
167
- .build ()));
168
- }
169
-
170
- protected void doStop (long timerId ) {
171
- vertx .cancelTimer (timerId );
168
+ protected final void run (WorkerExecutor workerExecutor , TriggerContext triggerContext ) {
169
+ final Instant triggerAt = Instant .now ();
170
+ final InternalTriggerContext internalContext = shouldRun (triggerAt , triggerContext );
171
+ if (internalContext .shouldRun ()) {
172
+ final TriggerContext triggerCtx = TriggerContext .create (internalContext .type (), internalContext .info ());
173
+ final TaskExecutionContextInternal <OUT > ctx = new TaskExecutionContextImpl <>(vertx , state .increaseRound (),
174
+ triggerAt , triggerCtx );
175
+ trace (triggerAt , "Trigger the task execution" );
176
+ if (workerExecutor != null ) {
177
+ workerExecutor .executeBlocking (promise -> executeTask (onExecute (promise , ctx )), this ::onResult );
178
+ } else {
179
+ vertx .executeBlocking (promise -> executeTask (onExecute (promise , ctx )), this ::onResult );
180
+ }
181
+ }
172
182
}
173
183
174
- protected final void trace (long tick , long round , @ NotNull Instant at , @ NotNull String event ) {
184
+ protected final void trace (@ NotNull Instant at , @ NotNull String event ) {
175
185
if (LOGGER .isTraceEnabled ()) {
176
- LOGGER .trace (genMsg (tick , round , at , event ));
186
+ LOGGER .trace (genMsg (state . tick (), state . round () , at , event ));
177
187
}
178
188
}
179
189
180
- protected final void onReceiveTimer (long timerId ) {
190
+ protected final void onSchedule (long timerId ) {
181
191
TaskResult <OUT > result ;
182
192
if (state .pending ()) {
183
193
result = TaskResultImpl .<OUT >builder ()
@@ -196,37 +206,44 @@ protected final void onReceiveTimer(long timerId) {
196
206
monitor .onSchedule (result );
197
207
}
198
208
199
- protected final void run (WorkerExecutor workerExecutor ) {
200
- final Instant triggerAt = Instant .now ();
201
- if (shouldRun (triggerAt )) {
202
- TaskExecutionContextInternal <OUT > ctx = new TaskExecutionContextImpl <>(vertx , state .increaseRound (),
203
- triggerAt );
204
- trace (state .tick (), ctx .round (), triggerAt , "Trigger the task execution" );
205
- if (workerExecutor != null ) {
206
- workerExecutor .executeBlocking (promise -> executeTask (onExecute (promise , ctx )), this ::onResult );
207
- } else {
208
- vertx .executeBlocking (promise -> executeTask (onExecute (promise , ctx )), this ::onResult );
209
- }
210
- }
209
+ protected final void onUnableSchedule (Throwable t ) {
210
+ monitor .onUnableSchedule (TaskResultImpl .<OUT >builder ()
211
+ .setExternalId (jobData .externalId ())
212
+ .setTick (state .tick ())
213
+ .setRound (state .round ())
214
+ .setUnscheduledAt (Instant .now ())
215
+ .setError (t )
216
+ .build ());
211
217
}
212
218
213
- private TaskExecutionContextInternal <OUT > onExecute (@ NotNull Promise <Object > promise ,
214
- @ NotNull TaskExecutionContextInternal <OUT > executionContext ) {
215
- state .markExecuting ();
216
- return executionContext .setup (promise , Instant .now ());
219
+ protected final void onMisfire (@ NotNull Instant triggerAt ) {
220
+ monitor .onMisfire (TaskResultImpl .<OUT >builder ()
221
+ .setExternalId (jobData .externalId ())
222
+ .setTick (state .tick ())
223
+ .setAvailableAt (state .availableAt ())
224
+ .setTriggeredAt (triggerAt )
225
+ .build ());
217
226
}
218
227
219
228
@ SuppressWarnings ("unchecked" )
220
229
protected final void onResult (@ NotNull AsyncResult <Object > asyncResult ) {
221
230
state .markIdle ();
222
231
final Instant finishedAt = Instant .now ();
223
232
if (asyncResult .failed ()) {
224
- LOGGER .warn (genMsg (state .tick (), state .round (), finishedAt , "Internal execution error" ),
225
- asyncResult .cause ());
233
+ final Throwable cause = asyncResult .cause ();
234
+ LOGGER .warn (genMsg (state .tick (), state .round (), finishedAt , "Internal execution error" ), cause );
235
+ monitor .onEach (TaskResultImpl .<OUT >builder ()
236
+ .setExternalId (jobData .externalId ())
237
+ .setAvailableAt (state .availableAt ())
238
+ .setTick (state .tick ())
239
+ .setRound (state .round ())
240
+ .setFinishedAt (finishedAt )
241
+ .setError (state .addError (state .round (), cause ))
242
+ .build ());
226
243
}
227
244
TaskExecutionContextInternal <OUT > executionContext = (TaskExecutionContextInternal <OUT >) asyncResult .result ();
228
245
if (asyncResult .succeeded ()) {
229
- trace (state . tick (), executionContext . round (), finishedAt , "Received the task result" );
246
+ trace (finishedAt , "Received the task result" );
230
247
monitor .onEach (TaskResultImpl .<OUT >builder ()
231
248
.setExternalId (jobData .externalId ())
232
249
.setAvailableAt (state .availableAt ())
@@ -242,14 +259,13 @@ protected final void onResult(@NotNull AsyncResult<Object> asyncResult) {
242
259
}
243
260
if (shouldStop (executionContext , state .round ())) {
244
261
doStop (state .timerId ());
245
- onCompleted ();
246
262
}
247
263
}
248
264
249
265
protected final void onCompleted () {
250
266
state .markCompleted ();
251
267
final Instant completedAt = Instant .now ();
252
- trace (state . tick (), state . round (), completedAt , "The task execution is completed" );
268
+ trace (completedAt , "The task execution is completed" );
253
269
monitor .onCompleted (TaskResultImpl .<OUT >builder ()
254
270
.setExternalId (jobData .externalId ())
255
271
.setAvailableAt (state .availableAt ())
@@ -262,6 +278,12 @@ protected final void onCompleted() {
262
278
.build ());
263
279
}
264
280
281
+ private TaskExecutionContextInternal <OUT > onExecute (@ NotNull Promise <Object > promise ,
282
+ @ NotNull TaskExecutionContextInternal <OUT > executionContext ) {
283
+ state .markExecuting ();
284
+ return executionContext .setup (promise , Instant .now ());
285
+ }
286
+
265
287
private String genMsg (long tick , long round , @ NotNull Instant at , @ NotNull String event ) {
266
288
return "TaskExecutor[" + tick + "][" + round + "][" + at + "]::[" + jobData .externalId () + "] - " + event ;
267
289
}
0 commit comments