Skip to content

Commit 1b03709

Browse files
committed
New interface SchedulerListener. Adapt old StatsRegistry. Add execution-events.
1 parent 1b52ac3 commit 1b03709

File tree

14 files changed

+375
-73
lines changed

14 files changed

+375
-73
lines changed

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import static com.github.kagkarlsson.scheduler.ExceptionUtils.describe;
1717

1818
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
19-
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
19+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
20+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType;
21+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
2022
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
2123
import com.github.kagkarlsson.scheduler.task.Execution;
2224
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
@@ -36,7 +38,7 @@ class ExecutePicked implements Runnable {
3638
private final TaskRepository taskRepository;
3739
private SchedulerClientEventListener earlyExecutionListener;
3840
private final SchedulerClient schedulerClient;
39-
private final StatsRegistry statsRegistry;
41+
private final SchedulerListener schedulerListener;
4042
private final TaskResolver taskResolver;
4143
private final SchedulerState schedulerState;
4244
private final ConfigurableLogger failureLogger;
@@ -49,7 +51,7 @@ public ExecutePicked(
4951
TaskRepository taskRepository,
5052
SchedulerClientEventListener earlyExecutionListener,
5153
SchedulerClient schedulerClient,
52-
StatsRegistry statsRegistry,
54+
SchedulerListener schedulerListener,
5355
TaskResolver taskResolver,
5456
SchedulerState schedulerState,
5557
ConfigurableLogger failureLogger,
@@ -60,7 +62,7 @@ public ExecutePicked(
6062
this.taskRepository = taskRepository;
6163
this.earlyExecutionListener = earlyExecutionListener;
6264
this.schedulerClient = schedulerClient;
63-
this.statsRegistry = statsRegistry;
65+
this.schedulerListener = schedulerListener;
6466
this.taskResolver = taskResolver;
6567
this.schedulerState = schedulerState;
6668
this.failureLogger = failureLogger;
@@ -77,7 +79,8 @@ public void run() {
7779
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);
7880

7981
try {
80-
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
82+
schedulerListener.onCandidateEvent(CandidateEventType.EXECUTED);
83+
schedulerListener.onExecutionStart(currentlyExecuting);
8184
executePickedExecution(pickedExecution, currentlyExecuting);
8285
} finally {
8386
executor.removeCurrentlyProcessing(executionId);
@@ -90,7 +93,7 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
9093
LOG.error(
9194
"Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.",
9295
execution.taskInstance.getTaskName());
93-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
96+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
9497
return;
9598
}
9699

@@ -106,15 +109,12 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
106109
LOG.debug("Execution done: " + execution);
107110

108111
complete(completion, execution, executionStarted);
109-
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);
110112

111113
} catch (RuntimeException unhandledException) {
112114
failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
113-
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
114115

115116
} catch (Throwable unhandledError) {
116117
failure(task.get(), execution, unhandledError, executionStarted, "Error");
117-
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
118118
}
119119
}
120120

@@ -126,16 +126,17 @@ private void complete(
126126
completion.complete(
127127
completeEvent,
128128
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
129-
statsRegistry.registerSingleCompletedExecution(completeEvent);
130129
} catch (Throwable e) {
131-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
132-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
130+
schedulerListener.onSchedulerEvent(SchedulerEventType.COMPLETIONHANDLER_ERROR);
131+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
133132
LOG.error(
134133
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
135134
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
136135
execution,
137136
describe(e),
138137
e);
138+
} finally {
139+
schedulerListener.onExecutionComplete(completeEvent);
139140
}
140141
}
141142

@@ -155,16 +156,17 @@ private void failure(
155156
.onFailure(
156157
completeEvent,
157158
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
158-
statsRegistry.registerSingleCompletedExecution(completeEvent);
159159
} catch (Throwable e) {
160-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
161-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
160+
schedulerListener.onSchedulerEvent(SchedulerEventType.FAILUREHANDLER_ERROR);
161+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
162162
LOG.error(
163163
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
164164
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
165165
execution,
166166
describe(cause),
167167
e);
168+
} finally {
169+
schedulerListener.onExecutionComplete(completeEvent);
168170
}
169171
}
170172
}

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
package com.github.kagkarlsson.scheduler;
1515

1616
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
17-
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
17+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
18+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType;
19+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
1820
import com.github.kagkarlsson.scheduler.task.Execution;
1921
import java.time.Instant;
2022
import java.util.List;
@@ -30,7 +32,7 @@ public class FetchCandidates implements PollStrategy {
3032
private final TaskRepository taskRepository;
3133
private final SchedulerClient schedulerClient;
3234
private SchedulerClientEventListener earlyExecutionListener;
33-
private final StatsRegistry statsRegistry;
35+
private final SchedulerListener schedulerListener;
3436
private final SchedulerState schedulerState;
3537
private final ConfigurableLogger failureLogger;
3638
private final TaskResolver taskResolver;
@@ -48,7 +50,7 @@ public FetchCandidates(
4850
SchedulerClient schedulerClient,
4951
SchedulerClientEventListener earlyExecutionListener,
5052
int threadpoolSize,
51-
StatsRegistry statsRegistry,
53+
SchedulerListener schedulerListener,
5254
SchedulerState schedulerState,
5355
ConfigurableLogger failureLogger,
5456
TaskResolver taskResolver,
@@ -60,7 +62,7 @@ public FetchCandidates(
6062
this.taskRepository = taskRepository;
6163
this.schedulerClient = schedulerClient;
6264
this.earlyExecutionListener = earlyExecutionListener;
63-
this.statsRegistry = statsRegistry;
65+
this.schedulerListener = schedulerListener;
6466
this.schedulerState = schedulerState;
6567
this.failureLogger = failureLogger;
6668
this.taskResolver = taskResolver;
@@ -104,7 +106,7 @@ public void run() {
104106
taskRepository,
105107
earlyExecutionListener,
106108
schedulerClient,
107-
statsRegistry,
109+
schedulerListener,
108110
taskResolver,
109111
schedulerState,
110112
failureLogger,
@@ -117,7 +119,7 @@ public void run() {
117119
newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run);
118120
});
119121
}
120-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
122+
schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE);
121123
}
122124

123125
private class PickDue implements Callable<Optional<Execution>> {
@@ -141,7 +143,7 @@ public Optional<Execution> call() {
141143
if (addedDueExecutionsBatch.isOlderGenerationThan(currentGenerationNumber.get())) {
142144
// skipping execution due to it being stale
143145
addedDueExecutionsBatch.markBatchAsStale();
144-
statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE);
146+
schedulerListener.onCandidateEvent(CandidateEventType.STALE);
145147
LOG.trace(
146148
"Skipping queued execution (current generationNumber: {}, execution generationNumber: {})",
147149
currentGenerationNumber,
@@ -154,7 +156,7 @@ public Optional<Execution> call() {
154156
if (!pickedExecution.isPresent()) {
155157
// someone else picked id
156158
LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
157-
statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED);
159+
schedulerListener.onCandidateEvent(CandidateEventType.ALREADY_PICKED);
158160
return Optional.empty();
159161
}
160162

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
package com.github.kagkarlsson.scheduler;
1515

1616
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
17-
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
17+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
18+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
1819
import com.github.kagkarlsson.scheduler.task.Execution;
1920
import java.time.Instant;
2021
import java.util.List;
@@ -28,7 +29,7 @@ public class LockAndFetchCandidates implements PollStrategy {
2829
private final TaskRepository taskRepository;
2930
private final SchedulerClient schedulerClient;
3031
private SchedulerClientEventListener earlyExecutionListener;
31-
private final StatsRegistry statsRegistry;
32+
private final SchedulerListener schedulerListener;
3233
private final TaskResolver taskResolver;
3334
private final SchedulerState schedulerState;
3435
private final ConfigurableLogger failureLogger;
@@ -46,7 +47,7 @@ public LockAndFetchCandidates(
4647
SchedulerClient schedulerClient,
4748
SchedulerClientEventListener earlyExecutionListener,
4849
int threadpoolSize,
49-
StatsRegistry statsRegistry,
50+
SchedulerListener schedulerListener,
5051
SchedulerState schedulerState,
5152
ConfigurableLogger failureLogger,
5253
TaskResolver taskResolver,
@@ -58,7 +59,7 @@ public LockAndFetchCandidates(
5859
this.taskRepository = taskRepository;
5960
this.schedulerClient = schedulerClient;
6061
this.earlyExecutionListener = earlyExecutionListener;
61-
this.statsRegistry = statsRegistry;
62+
this.schedulerListener = schedulerListener;
6263
this.taskResolver = taskResolver;
6364
this.schedulerState = schedulerState;
6465
this.failureLogger = failureLogger;
@@ -104,7 +105,7 @@ public void run() {
104105
taskRepository,
105106
earlyExecutionListener,
106107
schedulerClient,
107-
statsRegistry,
108+
schedulerListener,
108109
taskResolver,
109110
schedulerState,
110111
failureLogger,
@@ -118,6 +119,6 @@ public void run() {
118119
}
119120
});
120121
}
121-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
122+
schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE);
122123
}
123124
}

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunAndLogErrors.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,19 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler;
1515

16-
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
16+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
17+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
1920

2021
class RunAndLogErrors implements Runnable {
2122
private static final Logger LOG = LoggerFactory.getLogger(RunAndLogErrors.class);
2223
private final Runnable toRun;
23-
private final StatsRegistry statsRegistry;
24+
private final SchedulerListener schedulerListener;
2425

25-
public RunAndLogErrors(Runnable toRun, StatsRegistry statsRegistry) {
26+
public RunAndLogErrors(Runnable toRun, SchedulerListener schedulerListener) {
2627
this.toRun = toRun;
27-
this.statsRegistry = statsRegistry;
28+
this.schedulerListener = schedulerListener;
2829
}
2930

3031
@Override
@@ -33,7 +34,7 @@ public void run() {
3334
toRun.run();
3435
} catch (Throwable e) {
3536
LOG.error("Unhandled exception. Will keep running.", e);
36-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
37+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
3738
}
3839
}
3940
}

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunUntilShutdown.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler;
1515

16-
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
16+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
17+
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
1920

@@ -22,17 +23,17 @@ class RunUntilShutdown implements Runnable {
2223
private final Runnable toRun;
2324
private final Waiter waitBetweenRuns;
2425
private final SchedulerState schedulerState;
25-
private final StatsRegistry statsRegistry;
26+
private final SchedulerListener schedulerListener;
2627

2728
public RunUntilShutdown(
2829
Runnable toRun,
2930
Waiter waitBetweenRuns,
3031
SchedulerState schedulerState,
31-
StatsRegistry statsRegistry) {
32+
SchedulerListener schedulerListener) {
3233
this.toRun = toRun;
3334
this.waitBetweenRuns = waitBetweenRuns;
3435
this.schedulerState = schedulerState;
35-
this.statsRegistry = statsRegistry;
36+
this.schedulerListener = schedulerListener;
3637
}
3738

3839
@Override
@@ -43,7 +44,7 @@ public void run() {
4344
toRun.run();
4445
} catch (Throwable e) {
4546
LOG.error("Unhandled exception. Will keep running.", e);
46-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
47+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
4748
}
4849
}
4950

@@ -54,7 +55,7 @@ public void run() {
5455
LOG.debug("Thread '{}' interrupted due to shutdown.", Thread.currentThread().getName());
5556
} else {
5657
LOG.error("Unexpected interruption of thread. Will keep running.", interruptedException);
57-
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
58+
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
5859
}
5960
}
6061
}

0 commit comments

Comments
 (0)