Skip to content

Commit d389e53

Browse files
committed
Adding the ability to prioritize tasks
1 parent d09ecbd commit d389e53

File tree

44 files changed

+403
-102
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+403
-102
lines changed

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,25 @@ in queue (applicable if `upperLimitFractionOfThreads > 1.0`). If they stay there
252252
unlocked again (determined by `DeadExecutionHandler`). Currently supported by **postgres**. **sql-server** also supports
253253
this, but testing has shown this is prone to deadlocks and thus not recommended until understood/resolved.
254254

255+
### Prioritization
256+
257+
If you have a mix of tasks with different priorities, you might want to enable prioritization.
258+
259+
:gear: `.enablePrioritization()`<br/>
260+
An executor will always run the tasks with the highest priority first even if the `execution_time` is greater than other tasks. <br />
261+
You can set the priority of a task using the `setPriority(int)` method on the task instance, by default the priority of task is set to `0`.
262+
263+
```java
264+
scheduler.schedule(
265+
onetimeTask.instanceBuilder("1").setPriority(100),
266+
Instant.now()
267+
);
268+
269+
scheduler.schedule(
270+
onetimeTask.instanceBuilder("2").setPriority(200),
271+
Instant.now()
272+
);
273+
```
255274

256275
#### Less commonly tuned
257276

@@ -406,6 +425,7 @@ db-scheduler.table-name=scheduled_tasks
406425
db-scheduler.immediate-execution-enabled=false
407426
db-scheduler.scheduler-name=
408427
db-scheduler.threads=10
428+
db-scheduler.prioritization-enabled=false
409429

410430
# Ignored if a custom DbSchedulerStarter bean is defined
411431
db-scheduler.delay-startup-until-context-ready=false
@@ -577,6 +597,9 @@ There are a number of users that are using db-scheduler for high throughput use-
577597

578598
See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release-notes.
579599

600+
**Upgrading to 15.x**
601+
* Add column `priority` and `priority_execution_time_idx` index to the database schema. See table definitions for [postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql). Note that when `enablePrioritization()` is used, the `null` value in order of prioritization is handled differently depending on the database used.
602+
580603
**Upgrading to 8.x**
581604
* Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not.
582605

db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis
164164
builder.enableImmediateExecution();
165165
}
166166

167+
if (config.isPrioritizationEnabled()) {
168+
builder.enablePrioritization();
169+
}
170+
167171
// Use custom executor service if provided
168172
customizer.executorService().ifPresent(builder::executorService);
169173

db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/config/DbSchedulerProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public class DbSchedulerProperties {
114114
/** Whether or not to log the {@link Throwable} that caused a task to fail. */
115115
private boolean failureLoggerLogStackTrace = SchedulerBuilder.LOG_STACK_TRACE_ON_FAILURE;
116116

117+
/** Whether or not to prioritization of tasks is enabled. */
118+
private boolean prioritizationEnabled = false;
119+
117120
public boolean isEnabled() {
118121
return enabled;
119122
}
@@ -243,4 +246,12 @@ public boolean isAlwaysPersistTimestampInUtc() {
243246
public void setAlwaysPersistTimestampInUtc(boolean alwaysPersistTimestampInUTC) {
244247
this.alwaysPersistTimestampInUtc = alwaysPersistTimestampInUTC;
245248
}
249+
250+
public boolean isPrioritizationEnabled() {
251+
return prioritizationEnabled;
252+
}
253+
254+
public void setPrioritizationEnabled(boolean prioritizationEnabled) {
255+
this.prioritizationEnabled = prioritizationEnabled;
256+
}
246257
}

db-scheduler-boot-starter/src/test/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfigurationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,20 @@ public void it_should_start_when_the_context_is_ready() {
182182
});
183183
}
184184

185+
@Test
186+
public void it_should_enable_prioritization() {
187+
ctxRunner
188+
.withPropertyValues("db-scheduler.prioritization-enabled=true")
189+
.run(
190+
(AssertableApplicationContext ctx) -> {
191+
assertThat(ctx).hasSingleBean(DataSource.class);
192+
assertThat(ctx).hasSingleBean(Scheduler.class);
193+
194+
DbSchedulerProperties props = ctx.getBean(DbSchedulerProperties.class);
195+
assertThat(props.isPrioritizationEnabled()).isTrue();
196+
});
197+
}
198+
185199
@Test
186200
public void it_should_support_custom_starting_strategies() {
187201
ctxRunner

db-scheduler-boot-starter/src/test/resources/schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ create table if not exists scheduled_tasks (
1010
consecutive_failures INT,
1111
last_heartbeat TIMESTAMP WITH TIME ZONE,
1212
version BIGINT,
13+
priority INT,
1314
PRIMARY KEY (task_name, task_instance)
1415
);

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class FetchCandidates implements PollStrategy {
4444
AtomicInteger currentGenerationNumber = new AtomicInteger(0);
4545
private final int lowerLimit;
4646
private final int upperLimit;
47+
private final boolean prioritization;
4748

4849
public FetchCandidates(
4950
Executor executor,
@@ -58,7 +59,8 @@ public FetchCandidates(
5859
Clock clock,
5960
PollingStrategyConfig pollingStrategyConfig,
6061
Runnable triggerCheckForNewExecutions,
61-
HeartbeatConfig heartbeatConfig) {
62+
HeartbeatConfig heartbeatConfig,
63+
boolean prioritization) {
6264
this.executor = executor;
6365
this.taskRepository = taskRepository;
6466
this.schedulerClient = schedulerClient;
@@ -71,6 +73,7 @@ public FetchCandidates(
7173
this.pollingStrategyConfig = pollingStrategyConfig;
7274
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
7375
this.heartbeatConfig = heartbeatConfig;
76+
this.prioritization = prioritization;
7477
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
7578
// FIXLATER: this is not "upper limit", but rather nr of executions to get. those already in
7679
// queue will become stale
@@ -84,7 +87,8 @@ public void run() {
8487
// Fetch new candidates for execution. Old ones still in ExecutorService will become stale and
8588
// be discarded
8689
final int executionsToFetch = upperLimit;
87-
List<Execution> fetchedDueExecutions = taskRepository.getDue(now, executionsToFetch);
90+
List<Execution> fetchedDueExecutions =
91+
taskRepository.getDue(now, executionsToFetch, prioritization);
8892
LOG.trace(
8993
"Fetched {} task instances due for execution at {}", fetchedDueExecutions.size(), now);
9094

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class LockAndFetchCandidates implements PollStrategy {
4141
private final int lowerLimit;
4242
private final int upperLimit;
4343
private AtomicBoolean moreExecutionsInDatabase = new AtomicBoolean(false);
44+
private final boolean prioritization;
4445

4546
public LockAndFetchCandidates(
4647
Executor executor,
@@ -55,7 +56,8 @@ public LockAndFetchCandidates(
5556
Clock clock,
5657
PollingStrategyConfig pollingStrategyConfig,
5758
Runnable triggerCheckForNewExecutions,
58-
HeartbeatConfig maxAgeBeforeConsideredDead) {
59+
HeartbeatConfig maxAgeBeforeConsideredDead,
60+
boolean prioritization) {
5961
this.executor = executor;
6062
this.taskRepository = taskRepository;
6163
this.schedulerClient = schedulerClient;
@@ -68,6 +70,7 @@ public LockAndFetchCandidates(
6870
this.pollingStrategyConfig = pollingStrategyConfig;
6971
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
7072
this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead;
73+
this.prioritization = prioritization;
7174
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
7275
upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
7376
}
@@ -85,7 +88,8 @@ public void run() {
8588
}
8689

8790
// FIXLATER: should it fetch here if not under lowerLimit? probably
88-
List<Execution> pickedExecutions = taskRepository.lockAndGetDue(now, executionsToFetch);
91+
List<Execution> pickedExecutions =
92+
taskRepository.lockAndGetDue(now, executionsToFetch, prioritization);
8993
LOG.trace("Picked {} taskinstances due for execution", pickedExecutions.size());
9094

9195
// Shared indicator for if there are more due executions in the database.

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class Scheduler implements SchedulerClient {
6969
private final Waiter heartbeatWaiter;
7070
final SettableSchedulerState schedulerState = new SettableSchedulerState();
7171
final ConfigurableLogger failureLogger;
72+
final boolean prioritization;
7273

7374
protected Scheduler(
7475
Clock clock,
@@ -90,7 +91,8 @@ protected Scheduler(
9091
boolean logStackTrace,
9192
List<OnStartup> onStartup,
9293
ExecutorService dueExecutor,
93-
ScheduledExecutorService housekeeperExecutor) {
94+
ScheduledExecutorService housekeeperExecutor,
95+
boolean prioritization) {
9496
this.clock = clock;
9597
this.schedulerTaskRepository = schedulerTaskRepository;
9698
this.taskResolver = taskResolver;
@@ -112,6 +114,7 @@ protected Scheduler(
112114
this.housekeeperExecutor = housekeeperExecutor;
113115
delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
114116
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
117+
this.prioritization = prioritization;
115118

116119
if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
117120
schedulerTaskRepository.verifySupportsLockAndFetch();
@@ -129,7 +132,8 @@ protected Scheduler(
129132
clock,
130133
pollingStrategyConfig,
131134
this::triggerCheckForDueExecutions,
132-
heartbeatConfig);
135+
heartbeatConfig,
136+
prioritization);
133137
} else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) {
134138
executeDueStrategy =
135139
new FetchCandidates(
@@ -145,7 +149,8 @@ protected Scheduler(
145149
clock,
146150
pollingStrategyConfig,
147151
this::triggerCheckForDueExecutions,
148-
heartbeatConfig);
152+
heartbeatConfig,
153+
prioritization);
149154
} else {
150155
throw new IllegalArgumentException(
151156
"Unknown polling-strategy type: " + pollingStrategyConfig.type);

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class SchedulerBuilder {
7474
protected PollingStrategyConfig pollingStrategyConfig = DEFAULT_POLLING_STRATEGY;
7575
protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL;
7676
protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE;
77+
protected boolean prioritization = false;
7778
private boolean registerShutdownHook = false;
7879
private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT;
7980
private boolean alwaysPersistTimestampInUTC = false;
@@ -230,6 +231,11 @@ public SchedulerBuilder registerShutdownHook() {
230231
return this;
231232
}
232233

234+
public SchedulerBuilder enablePrioritization() {
235+
this.prioritization = true;
236+
return this;
237+
}
238+
233239
public Scheduler build() {
234240
if (schedulerName == null) {
235241
schedulerName = new SchedulerName.Hostname();
@@ -249,6 +255,7 @@ public Scheduler build() {
249255
taskResolver,
250256
schedulerName,
251257
serializer,
258+
prioritization,
252259
clock);
253260
final JdbcTaskRepository clientTaskRepository =
254261
new JdbcTaskRepository(
@@ -259,6 +266,7 @@ public Scheduler build() {
259266
taskResolver,
260267
schedulerName,
261268
serializer,
269+
prioritization,
262270
clock);
263271

264272
ExecutorService candidateExecutorService = executorService;
@@ -316,7 +324,8 @@ public Scheduler build() {
316324
logStackTrace,
317325
startTasks,
318326
candidateDueExecutor,
319-
candidateHousekeeperExecutor);
327+
candidateHousekeeperExecutor,
328+
prioritization);
320329

321330
if (enableImmediateExecution) {
322331
scheduler.registerSchedulerListener(new ImmediateCheckForDueExecutions(scheduler, clock));

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ class Builder {
213213
private Serializer serializer = Serializer.DEFAULT_JAVA_SERIALIZER;
214214
private String tableName = JdbcTaskRepository.DEFAULT_TABLE_NAME;
215215
private JdbcCustomization jdbcCustomization;
216+
private boolean prioritization = false;
216217

217218
private Builder(DataSource dataSource, List<Task<?>> knownTasks) {
218219
this.dataSource = dataSource;
@@ -242,6 +243,11 @@ public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) {
242243
return this;
243244
}
244245

246+
public Builder enablePrioritization() {
247+
this.prioritization = true;
248+
return this;
249+
}
250+
245251
public SchedulerClient build() {
246252
TaskResolver taskResolver = new TaskResolver(StatsRegistry.NOOP, knownTasks);
247253
final SystemClock clock = new SystemClock();
@@ -259,6 +265,7 @@ public SchedulerClient build() {
259265
taskResolver,
260266
new SchedulerClientName(),
261267
serializer,
268+
prioritization,
262269
clock);
263270

264271
return new StandardSchedulerClient(taskRepository, clock);

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public interface TaskRepository {
2626

2727
boolean createIfNotExists(SchedulableInstance execution);
2828

29-
List<Execution> getDue(Instant now, int limit);
29+
List<Execution> getDue(Instant now, int limit, boolean prioritization);
3030

3131
Instant replace(Execution toBeReplaced, SchedulableInstance newInstance);
3232

@@ -37,7 +37,7 @@ void getScheduledExecutions(
3737

3838
List<Execution> lockAndFetchGeneric(Instant now, int limit);
3939

40-
List<Execution> lockAndGetDue(Instant now, int limit);
40+
List<Execution> lockAndGetDue(Instant now, int limit, boolean prioritization);
4141

4242
void remove(Execution execution);
4343

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,20 @@ public String getQueryLimitPart(int limit) {
125125
return jdbcCustomization.getQueryLimitPart(limit);
126126
}
127127

128+
@Override
129+
public String getQueryOrderPart(boolean prioritization) {
130+
return jdbcCustomization.getQueryOrderPart(prioritization);
131+
}
132+
128133
@Override
129134
public boolean supportsSingleStatementLockAndFetch() {
130135
return jdbcCustomization.supportsSingleStatementLockAndFetch();
131136
}
132137

133138
@Override
134139
public List<Execution> lockAndFetchSingleStatement(
135-
JdbcTaskRepositoryContext ctx, Instant now, int limit) {
136-
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit);
140+
JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean prioritization) {
141+
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit, prioritization);
137142
}
138143

139144
@Override
@@ -143,14 +148,15 @@ public boolean supportsGenericLockAndFetch() {
143148

144149
@Override
145150
public String createGenericSelectForUpdateQuery(
146-
String tableName, int limit, String requiredAndCondition) {
151+
String tableName, int limit, String requiredAndCondition, boolean prioritization) {
147152
return jdbcCustomization.createGenericSelectForUpdateQuery(
148-
tableName, limit, requiredAndCondition);
153+
tableName, limit, requiredAndCondition, prioritization);
149154
}
150155

151156
@Override
152-
public String createSelectDueQuery(String tableName, int limit, String andCondition) {
153-
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition);
157+
public String createSelectDueQuery(
158+
String tableName, int limit, String andCondition, boolean prioritization) {
159+
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition, prioritization);
154160
}
155161

156162
@Override

0 commit comments

Comments
 (0)