Skip to content

Commit 92189fa

Browse files
committed
Merge internal SchedulerClientEventListener into SchedulerListener
1 parent 7780ac6 commit 92189fa

File tree

8 files changed

+32
-94
lines changed

8 files changed

+32
-94
lines changed

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ class ExecutePicked implements Runnable {
3737
private static final Logger LOG = LoggerFactory.getLogger(ExecutePicked.class);
3838
private final Executor executor;
3939
private final TaskRepository taskRepository;
40-
private SchedulerClientEventListener earlyExecutionListener;
4140
private final SchedulerClient schedulerClient;
4241
private final SchedulerListeners schedulerListeners;
4342
private final TaskResolver taskResolver;
@@ -50,7 +49,6 @@ class ExecutePicked implements Runnable {
5049
public ExecutePicked(
5150
Executor executor,
5251
TaskRepository taskRepository,
53-
SchedulerClientEventListener earlyExecutionListener,
5452
SchedulerClient schedulerClient,
5553
SchedulerListeners schedulerListeners,
5654
TaskResolver taskResolver,
@@ -61,7 +59,6 @@ public ExecutePicked(
6159
Execution pickedExecution) {
6260
this.executor = executor;
6361
this.taskRepository = taskRepository;
64-
this.earlyExecutionListener = earlyExecutionListener;
6562
this.schedulerClient = schedulerClient;
6663
this.schedulerListeners = schedulerListeners;
6764
this.taskResolver = taskResolver;
@@ -126,7 +123,7 @@ private void complete(
126123
try {
127124
completion.complete(
128125
completeEvent,
129-
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
126+
new ExecutionOperations(taskRepository, schedulerListeners, execution));
130127
} catch (Throwable e) {
131128
schedulerListeners.onSchedulerEvent(SchedulerEventType.COMPLETIONHANDLER_ERROR);
132129
schedulerListeners.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
@@ -156,7 +153,7 @@ private void failure(
156153
task.getFailureHandler()
157154
.onFailure(
158155
completeEvent,
159-
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
156+
new ExecutionOperations(taskRepository, schedulerListeners, execution));
160157
} catch (Throwable e) {
161158
schedulerListeners.onSchedulerEvent(SchedulerEventType.FAILUREHANDLER_ERROR);
162159
schedulerListeners.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public class FetchCandidates implements PollStrategy {
3232
private final Executor executor;
3333
private final TaskRepository taskRepository;
3434
private final SchedulerClient schedulerClient;
35-
private SchedulerClientEventListener earlyExecutionListener;
3635
private final SchedulerListeners schedulerListeners;
3736
private final SchedulerState schedulerState;
3837
private final ConfigurableLogger failureLogger;
@@ -49,7 +48,6 @@ public FetchCandidates(
4948
Executor executor,
5049
TaskRepository taskRepository,
5150
SchedulerClient schedulerClient,
52-
SchedulerClientEventListener earlyExecutionListener,
5351
int threadpoolSize,
5452
SchedulerListeners schedulerListeners,
5553
SchedulerState schedulerState,
@@ -62,7 +60,6 @@ public FetchCandidates(
6260
this.executor = executor;
6361
this.taskRepository = taskRepository;
6462
this.schedulerClient = schedulerClient;
65-
this.earlyExecutionListener = earlyExecutionListener;
6663
this.schedulerListeners = schedulerListeners;
6764
this.schedulerState = schedulerState;
6865
this.failureLogger = failureLogger;
@@ -105,7 +102,6 @@ public void run() {
105102
new ExecutePicked(
106103
executor,
107104
taskRepository,
108-
earlyExecutionListener,
109105
schedulerClient,
110106
schedulerListeners,
111107
taskResolver,

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public class LockAndFetchCandidates implements PollStrategy {
2929
private final Executor executor;
3030
private final TaskRepository taskRepository;
3131
private final SchedulerClient schedulerClient;
32-
private SchedulerClientEventListener earlyExecutionListener;
3332
private final SchedulerListeners schedulerListeners;
3433
private final TaskResolver taskResolver;
3534
private final SchedulerState schedulerState;
@@ -46,7 +45,6 @@ public LockAndFetchCandidates(
4645
Executor executor,
4746
TaskRepository taskRepository,
4847
SchedulerClient schedulerClient,
49-
SchedulerClientEventListener earlyExecutionListener,
5048
int threadpoolSize,
5149
SchedulerListeners schedulerListeners,
5250
SchedulerState schedulerState,
@@ -59,7 +57,6 @@ public LockAndFetchCandidates(
5957
this.executor = executor;
6058
this.taskRepository = taskRepository;
6159
this.schedulerClient = schedulerClient;
62-
this.earlyExecutionListener = earlyExecutionListener;
6360
this.schedulerListeners = schedulerListeners;
6461
this.taskResolver = taskResolver;
6562
this.schedulerState = schedulerState;
@@ -104,7 +101,6 @@ public void run() {
104101
new ExecutePicked(
105102
executor,
106103
taskRepository,
107-
earlyExecutionListener,
108104
schedulerClient,
109105
schedulerListeners,
110106
taskResolver,

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public class Scheduler implements SchedulerClient {
6868
private final Waiter heartbeatWaiter;
6969
final SettableSchedulerState schedulerState = new SettableSchedulerState();
7070
final ConfigurableLogger failureLogger;
71-
private SchedulerClientEventListener earlyExecutionListener;
7271

7372
protected Scheduler(
7473
Clock clock,
@@ -110,11 +109,10 @@ protected Scheduler(
110109
this.schedulerListeners = new SchedulerListeners(schedulerListeners);
111110
this.dueExecutor = dueExecutor;
112111
this.housekeeperExecutor = housekeeperExecutor;
113-
earlyExecutionListener =
114-
(enableImmediateExecution
115-
? new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter)
116-
: SchedulerClientEventListener.NOOP);
117-
delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener, clock);
112+
if (enableImmediateExecution) {
113+
this.schedulerListeners.add(new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter));
114+
}
115+
delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
118116
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
119117

120118
if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
@@ -124,7 +122,6 @@ protected Scheduler(
124122
executor,
125123
schedulerTaskRepository,
126124
this,
127-
earlyExecutionListener,
128125
threadpoolSize,
129126
this.schedulerListeners,
130127
schedulerState,
@@ -140,7 +137,6 @@ protected Scheduler(
140137
executor,
141138
schedulerTaskRepository,
142139
this,
143-
earlyExecutionListener,
144140
threadpoolSize,
145141
this.schedulerListeners,
146142
schedulerState,
@@ -368,7 +364,7 @@ protected void detectDeadExecutions() {
368364
.deadExecution(
369365
ExecutionComplete.failure(execution, now, now, null),
370366
new ExecutionOperations(
371-
schedulerTaskRepository, earlyExecutionListener, execution));
367+
schedulerTaskRepository, schedulerListeners, execution));
372368
} else {
373369
LOG.error(
374370
"Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.",

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static java.util.Optional.ofNullable;
1717

18+
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
1819
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceCurrentlyExecutingException;
1920
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceNotFoundException;
2021
import com.github.kagkarlsson.scheduler.jdbc.AutodetectJdbcCustomization;
@@ -239,18 +240,18 @@ class StandardSchedulerClient implements SchedulerClient {
239240
private static final Logger LOG = LoggerFactory.getLogger(StandardSchedulerClient.class);
240241
protected final TaskRepository taskRepository;
241242
private final Clock clock;
242-
private SchedulerClientEventListener schedulerClientEventListener;
243+
private final SchedulerListeners schedulerListeners;
243244

244245
StandardSchedulerClient(TaskRepository taskRepository, Clock clock) {
245-
this(taskRepository, SchedulerClientEventListener.NOOP, clock);
246+
this(taskRepository, SchedulerListeners.NOOP, clock);
246247
}
247248

248249
StandardSchedulerClient(
249250
TaskRepository taskRepository,
250-
SchedulerClientEventListener schedulerClientEventListener,
251+
SchedulerListeners schedulerListeners,
251252
Clock clock) {
252253
this.taskRepository = taskRepository;
253-
this.schedulerClientEventListener = schedulerClientEventListener;
254+
this.schedulerListeners = schedulerListeners;
254255
this.clock = clock;
255256
}
256257

@@ -259,7 +260,7 @@ public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
259260
boolean success =
260261
taskRepository.createIfNotExists(SchedulableInstance.of(taskInstance, executionTime));
261262
if (success) {
262-
notifyListeners(ClientEvent.EventType.SCHEDULE, taskInstance, executionTime);
263+
schedulerListeners.onExecutionScheduled(taskInstance, executionTime);
263264
}
264265
}
265266

@@ -302,7 +303,7 @@ public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTi
302303
}
303304

304305
if (success) {
305-
notifyListeners(ClientEvent.EventType.RESCHEDULE, taskInstanceId, newExecutionTime);
306+
schedulerListeners.onExecutionScheduled(taskInstanceId, newExecutionTime);
306307
}
307308
} else {
308309
throw new TaskInstanceNotFoundException(taskName, instanceId);
@@ -320,8 +321,6 @@ public void cancel(TaskInstanceId taskInstanceId) {
320321
}
321322

322323
taskRepository.remove(execution.get());
323-
notifyListeners(
324-
ClientEvent.EventType.CANCEL, taskInstanceId, execution.get().executionTime);
325324
} else {
326325
throw new TaskInstanceNotFoundException(taskName, instanceId);
327326
}
@@ -366,16 +365,6 @@ public Optional<ScheduledExecution<Object>> getScheduledExecution(
366365
return e.map(oe -> new ScheduledExecution<>(Object.class, oe));
367366
}
368367

369-
private void notifyListeners(
370-
ClientEvent.EventType eventType, TaskInstanceId taskInstanceId, Instant executionTime) {
371-
try {
372-
schedulerClientEventListener.newEvent(
373-
new ClientEvent(
374-
new ClientEvent.ClientEventContext(eventType, taskInstanceId, executionTime)));
375-
} catch (Exception e) {
376-
LOG.error("Error when notifying SchedulerClientEventListener.", e);
377-
}
378-
}
379368
}
380369

381370
class SchedulerClientName implements SchedulerName {

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClientEventListener.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TriggerCheckForDueExecutions.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler;
1515

16+
import com.github.kagkarlsson.scheduler.event.AbstractSchedulerListener;
17+
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
1618
import java.time.Instant;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
1921

20-
class TriggerCheckForDueExecutions implements SchedulerClientEventListener {
22+
class TriggerCheckForDueExecutions extends AbstractSchedulerListener {
2123
private static final Logger LOG = LoggerFactory.getLogger(TriggerCheckForDueExecutions.class);
2224
private SchedulerState schedulerState;
2325
private Clock clock;
@@ -31,29 +33,22 @@ public TriggerCheckForDueExecutions(
3133
}
3234

3335
@Override
34-
public void newEvent(ClientEvent event) {
35-
ClientEvent.ClientEventContext ctx = event.getContext();
36-
ClientEvent.EventType eventType = ctx.getEventType();
37-
36+
public void onExecutionScheduled(
37+
TaskInstanceId taskInstanceId, Instant scheduledToExecutionTime) {
3838
if (!schedulerState.isStarted() || schedulerState.isShuttingDown()) {
3939
LOG.debug(
4040
"Will not act on scheduling event for execution (task: '{}', id: '{}') as scheduler is starting or shutting down.",
41-
ctx.getTaskInstanceId().getTaskName(),
42-
ctx.getTaskInstanceId().getId());
41+
taskInstanceId.getTaskName(),
42+
taskInstanceId.getId());
4343
return;
4444
}
4545

46-
if (eventType == ClientEvent.EventType.SCHEDULE
47-
|| eventType == ClientEvent.EventType.RESCHEDULE) {
48-
49-
Instant scheduledToExecutionTime = ctx.getExecutionTime();
50-
if (scheduledToExecutionTime.toEpochMilli() <= clock.now().toEpochMilli()) {
51-
LOG.debug(
52-
"Task-instance scheduled to run directly, triggering check for due executions (unless it is already running). Task: {}, instance: {}",
53-
ctx.getTaskInstanceId().getTaskName(),
54-
ctx.getTaskInstanceId().getId());
55-
executeDueWaiter.wakeOrSkipNextWait();
56-
}
46+
if (scheduledToExecutionTime.toEpochMilli() <= clock.now().toEpochMilli()) {
47+
LOG.debug(
48+
"Task-instance scheduled to run directly, triggering check for due executions (unless it is already running). Task: {}, instance: {}",
49+
taskInstanceId.getTaskName(),
50+
taskInstanceId.getId());
51+
executeDueWaiter.wakeOrSkipNextWait();
5752
}
5853
}
5954
}

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionOperations.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,22 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler.task;
1515

16-
import com.github.kagkarlsson.scheduler.ClientEvent;
17-
import com.github.kagkarlsson.scheduler.ClientEvent.ClientEventContext;
18-
import com.github.kagkarlsson.scheduler.ClientEvent.EventType;
19-
import com.github.kagkarlsson.scheduler.SchedulerClientEventListener;
2016
import com.github.kagkarlsson.scheduler.TaskRepository;
17+
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
2118
import java.time.Instant;
2219

2320
public class ExecutionOperations<T> {
2421

2522
private final TaskRepository taskRepository;
26-
private final SchedulerClientEventListener earlyExecutionListener;
23+
private final SchedulerListeners schedulerListeners;
2724
private final Execution execution;
2825

2926
public ExecutionOperations(
3027
TaskRepository taskRepository,
31-
SchedulerClientEventListener earlyExecutionListener,
28+
SchedulerListeners schedulerListeners,
3229
Execution execution) {
3330
this.taskRepository = taskRepository;
34-
this.earlyExecutionListener = earlyExecutionListener;
31+
this.schedulerListeners = schedulerListeners;
3532
this.execution = execution;
3633
}
3734

@@ -81,8 +78,6 @@ public void reschedule(ExecutionComplete completed, Instant nextExecutionTime, T
8178

8279
private void hintExecutionScheduled(TaskInstanceId taskInstanceId, Instant nextExecutionTime) {
8380
// Hint that a new execution was scheduled in-case we want to go check for it immediately
84-
earlyExecutionListener.newEvent(
85-
new ClientEvent(
86-
new ClientEventContext(EventType.RESCHEDULE, taskInstanceId, nextExecutionTime)));
81+
schedulerListeners.onExecutionScheduled(taskInstanceId, nextExecutionTime);
8782
}
8883
}

0 commit comments

Comments
 (0)