Skip to content

Commit 93a78c7

Browse files
committed
Adding ExecutionInterceptor in addition to SchedulerListener
1 parent af6fcbf commit 93a78c7

File tree

10 files changed

+236
-7
lines changed

10 files changed

+236
-7
lines changed

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import static com.github.kagkarlsson.scheduler.ExceptionUtils.describe;
1717

18+
import com.github.kagkarlsson.scheduler.event.ExecutionChain;
19+
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
1820
import com.github.kagkarlsson.scheduler.event.SchedulerListener.CandidateEventType;
1921
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
2022
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
@@ -23,21 +25,25 @@
2325
import com.github.kagkarlsson.scheduler.task.Execution;
2426
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
2527
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
28+
import com.github.kagkarlsson.scheduler.task.ExecutionHandler;
2629
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
2730
import com.github.kagkarlsson.scheduler.task.Task;
2831
import java.time.Instant;
32+
import java.util.ArrayList;
33+
import java.util.List;
2934
import java.util.Optional;
3035
import java.util.UUID;
3136
import org.slf4j.Logger;
3237
import org.slf4j.LoggerFactory;
3338

34-
@SuppressWarnings("rawtypes")
39+
@SuppressWarnings({"rawtypes", "unchecked"})
3540
class ExecutePicked implements Runnable {
3641
private static final Logger LOG = LoggerFactory.getLogger(ExecutePicked.class);
3742
private final Executor executor;
3843
private final TaskRepository taskRepository;
3944
private final SchedulerClient schedulerClient;
4045
private final SchedulerListeners schedulerListeners;
46+
private final List<ExecutionInterceptor> executionInterceptors;
4147
private final TaskResolver taskResolver;
4248
private final SchedulerState schedulerState;
4349
private final ConfigurableLogger failureLogger;
@@ -50,6 +56,7 @@ public ExecutePicked(
5056
TaskRepository taskRepository,
5157
SchedulerClient schedulerClient,
5258
SchedulerListeners schedulerListeners,
59+
List<ExecutionInterceptor> executionInterceptors,
5360
TaskResolver taskResolver,
5461
SchedulerState schedulerState,
5562
ConfigurableLogger failureLogger,
@@ -60,6 +67,7 @@ public ExecutePicked(
6067
this.taskRepository = taskRepository;
6168
this.schedulerClient = schedulerClient;
6269
this.schedulerListeners = schedulerListeners;
70+
this.executionInterceptors = executionInterceptors;
6371
this.taskResolver = taskResolver;
6472
this.schedulerState = schedulerState;
6573
this.failureLogger = failureLogger;
@@ -97,12 +105,12 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
97105
Instant executionStarted = clock.now();
98106
try {
99107
LOG.debug("Executing: " + execution);
100-
CompletionHandler completion =
101-
task.get()
102-
.execute(
103-
execution.taskInstance,
104-
new ExecutionContext(
105-
schedulerState, execution, schedulerClient, currentlyExecuting));
108+
ExecutionHandler handler = task.get();
109+
ExecutionContext executionContext =
110+
new ExecutionContext(schedulerState, execution, schedulerClient, currentlyExecuting);
111+
ExecutionChain chain = new ExecutionChain(new ArrayList<>(executionInterceptors), handler);
112+
113+
CompletionHandler completion = chain.proceed(execution.taskInstance, executionContext);
106114
LOG.debug("Execution done: " + execution);
107115

108116
complete(completion, execution, executionStarted);

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler;
1515

16+
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
1617
import com.github.kagkarlsson.scheduler.event.SchedulerListener.CandidateEventType;
1718
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
1819
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
@@ -32,6 +33,7 @@ public class FetchCandidates implements PollStrategy {
3233
private final TaskRepository taskRepository;
3334
private final SchedulerClient schedulerClient;
3435
private final SchedulerListeners schedulerListeners;
36+
private final List<ExecutionInterceptor> executionInterceptors;
3537
private final SchedulerState schedulerState;
3638
private final ConfigurableLogger failureLogger;
3739
private final TaskResolver taskResolver;
@@ -49,6 +51,7 @@ public FetchCandidates(
4951
SchedulerClient schedulerClient,
5052
int threadpoolSize,
5153
SchedulerListeners schedulerListeners,
54+
List<ExecutionInterceptor> executionInterceptors,
5255
SchedulerState schedulerState,
5356
ConfigurableLogger failureLogger,
5457
TaskResolver taskResolver,
@@ -60,6 +63,7 @@ public FetchCandidates(
6063
this.taskRepository = taskRepository;
6164
this.schedulerClient = schedulerClient;
6265
this.schedulerListeners = schedulerListeners;
66+
this.executionInterceptors = executionInterceptors;
6367
this.schedulerState = schedulerState;
6468
this.failureLogger = failureLogger;
6569
this.taskResolver = taskResolver;
@@ -103,6 +107,7 @@ public void run() {
103107
taskRepository,
104108
schedulerClient,
105109
schedulerListeners,
110+
executionInterceptors,
106111
taskResolver,
107112
schedulerState,
108113
failureLogger,

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.github.kagkarlsson.scheduler;
1515

16+
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
1617
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
1718
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
1819
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
@@ -29,6 +30,7 @@ public class LockAndFetchCandidates implements PollStrategy {
2930
private final TaskRepository taskRepository;
3031
private final SchedulerClient schedulerClient;
3132
private final SchedulerListeners schedulerListeners;
33+
private final List<ExecutionInterceptor> executionInterceptors;
3234
private final TaskResolver taskResolver;
3335
private final SchedulerState schedulerState;
3436
private final ConfigurableLogger failureLogger;
@@ -46,6 +48,7 @@ public LockAndFetchCandidates(
4648
SchedulerClient schedulerClient,
4749
int threadpoolSize,
4850
SchedulerListeners schedulerListeners,
51+
List<ExecutionInterceptor> executionInterceptors,
4952
SchedulerState schedulerState,
5053
ConfigurableLogger failureLogger,
5154
TaskResolver taskResolver,
@@ -57,6 +60,7 @@ public LockAndFetchCandidates(
5760
this.taskRepository = taskRepository;
5861
this.schedulerClient = schedulerClient;
5962
this.schedulerListeners = schedulerListeners;
63+
this.executionInterceptors = executionInterceptors;
6064
this.taskResolver = taskResolver;
6165
this.schedulerState = schedulerState;
6266
this.failureLogger = failureLogger;
@@ -102,6 +106,7 @@ public void run() {
102106
taskRepository,
103107
schedulerClient,
104108
schedulerListeners,
109+
executionInterceptors,
105110
taskResolver,
106111
schedulerState,
107112
failureLogger,

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static java.util.stream.Collectors.toList;
1818

1919
import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState;
20+
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
2021
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
2122
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
2223
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
@@ -81,6 +82,7 @@ protected Scheduler(
8182
Duration heartbeatInterval,
8283
int numberOfMissedHeartbeatsBeforeDead,
8384
List<SchedulerListener> schedulerListeners,
85+
List<ExecutionInterceptor> executionInterceptors,
8486
PollingStrategyConfig pollingStrategyConfig,
8587
Duration deleteUnresolvedAfter,
8688
Duration shutdownMaxWait,
@@ -120,6 +122,7 @@ protected Scheduler(
120122
this,
121123
threadpoolSize,
122124
this.schedulerListeners,
125+
executionInterceptors,
123126
schedulerState,
124127
failureLogger,
125128
taskResolver,
@@ -135,6 +138,7 @@ protected Scheduler(
135138
this,
136139
threadpoolSize,
137140
this.schedulerListeners,
141+
executionInterceptors,
138142
schedulerState,
139143
failureLogger,
140144
taskResolver,

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.github.kagkarlsson.scheduler.Scheduler.THREAD_PREFIX;
1818
import static java.util.Optional.ofNullable;
1919

20+
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
2021
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
2122
import com.github.kagkarlsson.scheduler.jdbc.AutodetectJdbcCustomization;
2223
import com.github.kagkarlsson.scheduler.jdbc.JdbcCustomization;
@@ -77,6 +78,7 @@ public class SchedulerBuilder {
7778
private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT;
7879
private boolean alwaysPersistTimestampInUTC = false;
7980
private List<SchedulerListener> schedulerListeners = new ArrayList<>();
81+
private List<ExecutionInterceptor> executionInterceptors = new ArrayList<>();
8082

8183
public SchedulerBuilder(DataSource dataSource, List<Task<?>> knownTasks) {
8284
this.dataSource = dataSource;
@@ -144,6 +146,11 @@ public SchedulerBuilder addSchedulerListener(SchedulerListener schedulerListener
144146
return this;
145147
}
146148

149+
public SchedulerBuilder addExecutionInterceptor(ExecutionInterceptor interceptor) {
150+
this.executionInterceptors.add(interceptor);
151+
return this;
152+
}
153+
147154
public SchedulerBuilder schedulerName(SchedulerName schedulerName) {
148155
this.schedulerName = schedulerName;
149156
return this;
@@ -301,6 +308,7 @@ public Scheduler build() {
301308
heartbeatInterval,
302309
numberOfMissedHeartbeatsBeforeDead,
303310
schedulerListeners,
311+
executionInterceptors,
304312
pollingStrategyConfig,
305313
deleteUnresolvedAfter,
306314
shutdownMaxWait,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) Gustav Karlsson
3+
*
4+
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
5+
* except in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* <p>http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
10+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
* express or implied. See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.github.kagkarlsson.scheduler.event;
15+
16+
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
17+
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
18+
import com.github.kagkarlsson.scheduler.task.ExecutionHandler;
19+
import com.github.kagkarlsson.scheduler.task.TaskInstance;
20+
import java.util.List;
21+
22+
@SuppressWarnings({"rawtypes", "unchecked"})
23+
public class ExecutionChain {
24+
25+
private final List<ExecutionInterceptor> interceptors;
26+
private final ExecutionHandler<?> executionHandler;
27+
28+
public ExecutionChain(
29+
List<ExecutionInterceptor> interceptors, ExecutionHandler<?> executionHandler) {
30+
this.interceptors = interceptors;
31+
this.executionHandler = executionHandler;
32+
}
33+
34+
public CompletionHandler<?> proceed(
35+
TaskInstance taskInstance, ExecutionContext executionContext) {
36+
if (interceptors.isEmpty()) {
37+
return executionHandler.execute(taskInstance, executionContext);
38+
} else {
39+
ExecutionInterceptor nextInterceptor = interceptors.get(0);
40+
List<ExecutionInterceptor> rest = interceptors.subList(1, interceptors.size());
41+
return nextInterceptor.execute(
42+
taskInstance, executionContext, new ExecutionChain(rest, executionHandler));
43+
}
44+
}
45+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (C) Gustav Karlsson
3+
*
4+
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
5+
* except in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* <p>http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
10+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
* express or implied. See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.github.kagkarlsson.scheduler.event;
15+
16+
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
17+
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
18+
import com.github.kagkarlsson.scheduler.task.TaskInstance;
19+
20+
public interface ExecutionInterceptor {
21+
CompletionHandler<?> execute(
22+
TaskInstance<?> taskInstance, ExecutionContext executionContext, ExecutionChain chain);
23+
}

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.github.kagkarlsson.scheduler.task.OnStartup;
2020
import java.time.Duration;
2121
import java.time.Instant;
22+
import java.util.ArrayList;
2223
import java.util.List;
2324
import java.util.concurrent.ExecutorService;
2425
import java.util.concurrent.ScheduledExecutorService;
@@ -60,6 +61,7 @@ public class ManualScheduler extends Scheduler {
6061
heartbeatInterval,
6162
SchedulerBuilder.DEFAULT_MISSED_HEARTBEATS_LIMIT,
6263
schedulerListeners,
64+
new ArrayList<>(),
6365
pollingStrategyConfig,
6466
deleteUnresolvedAfter,
6567
Duration.ZERO,
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.github.kagkarlsson.scheduler.event;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
6+
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
7+
import com.github.kagkarlsson.scheduler.task.ExecutionHandler;
8+
import com.github.kagkarlsson.scheduler.task.TaskInstance;
9+
import java.util.List;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import org.junit.jupiter.api.Test;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
@SuppressWarnings("rawtypes")
16+
class ExecutionChainTest {
17+
18+
@Test
19+
public void happy() {
20+
AtomicInteger counter = new AtomicInteger(0);
21+
CountingExecutionHandler executionHandler = new CountingExecutionHandler(counter);
22+
ExecutionChain chain =
23+
new ExecutionChain(
24+
List.of(new CountingInterceptor(counter), new CountingInterceptor(counter)),
25+
executionHandler);
26+
27+
chain.proceed(new TaskInstance("task1", "id1"), dummyContext());
28+
29+
assertEquals(3, counter.get());
30+
}
31+
32+
private static ExecutionContext dummyContext() {
33+
return new ExecutionContext(null, null, null, null);
34+
}
35+
36+
private static class CountingInterceptor implements ExecutionInterceptor {
37+
private static final Logger LOG = LoggerFactory.getLogger(CountingInterceptor.class);
38+
private final AtomicInteger counter;
39+
40+
public CountingInterceptor(AtomicInteger counter) {
41+
this.counter = counter;
42+
}
43+
44+
@Override
45+
public CompletionHandler<?> execute(
46+
TaskInstance<?> taskInstance, ExecutionContext executionContext, ExecutionChain chain) {
47+
48+
counter.incrementAndGet();
49+
return chain.proceed(taskInstance, executionContext);
50+
}
51+
}
52+
53+
private static class CountingExecutionHandler implements ExecutionHandler<Void> {
54+
55+
private final AtomicInteger counter;
56+
57+
public CountingExecutionHandler(AtomicInteger counter) {
58+
this.counter = counter;
59+
}
60+
61+
@Override
62+
public CompletionHandler<Void> execute(
63+
TaskInstance<Void> taskInstance, ExecutionContext executionContext) {
64+
counter.incrementAndGet();
65+
return null;
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)