Skip to content

Commit

Permalink
feat: Adding the ability to prioritize tasks (#519)
Browse files Browse the repository at this point in the history
This change introduces the possibility of using a task prioritization
mechanism. This mechanism is disabled by default, and can be enabled
using the `enablePrioritization()` method.

## Reminders
- [x] Added/ran automated tests
- [x] Update README and/or examples
- [x] Ran `mvn spotless:apply`

---
Co-authored-by: Gustav Karlsson <[email protected]>
  • Loading branch information
simeq authored Nov 16, 2024
1 parent 292380a commit c3eace6
Show file tree
Hide file tree
Showing 74 changed files with 930 additions and 242 deletions.
56 changes: 49 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,32 @@ An instance of a _one-time_ task has a single execution-time some time in the fu
Define a _one-time_ task and start the scheduler:

```java
OneTimeTask<MyTaskData> myAdhocTask = Tasks.oneTime("my-typed-adhoc-task", MyTaskData.class)
TaskDescriptor<MyTaskData> MY_TASK =
TaskDescriptor.of("my-onetime-task", MyTaskData.class);

OneTimeTask<MyTaskData> myTaskImplementation =
Tasks.oneTime(MY_TASK)
.execute((inst, ctx) -> {
System.out.println("Executed! Custom data, Id: " + inst.getData().id);
System.out.println("Executed! Custom data, Id: " + inst.getData().id);
});

final Scheduler scheduler = Scheduler
.create(dataSource, myAdhocTask)
.registerShutdownHook()
.build();
.create(dataSource, myTaskImplementation)
.registerShutdownHook()
.build();

scheduler.start();

```

... and then at some point (at runtime), an execution is scheduled using the `SchedulerClient`:

```java
// Schedule the task for execution a certain time in the future and optionally provide custom data for the execution
scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.now().plusSeconds(5));
scheduler.schedule(
MY_TASK
.instanceWithId("1045")
.data(new MyTaskData(1001L))
.scheduledTo(Instant.now().plusSeconds(5)));
```

### More examples
Expand Down Expand Up @@ -225,6 +232,32 @@ How long the scheduler will wait before interrupting executor-service threads. I
consider if it is possible to instead regularly check `executionContext.getSchedulerState().isShuttingDown()`
in the ExecutionHandler and abort long-running task. Default `30min`.

:gear: `.enablePriority()`<br/>
It is possible to define a priority for executions which determines the order in which due executions
are fetched from the database. An execution with a higher value for priority will run before an
execution with a lower value (technically, the ordering will be `order by priority desc, execution_time asc`).
Consider using priorities in the range 0-32000 as the field is defined as a `SMALLINT`. If you need a larger value,
modify the schema. For now, this feature is **opt-in**, and column `priority` is only needed by users who choose to
enable priority via this config setting.

Set the priority per instance using the `TaskInstance.Builder`:

```java
scheduler.schedule(
MY_TASK
.instance("1")
.priority(100)
.scheduledTo(Instant.now()));
```

**Note:**
* When enabling this feature, make sure you have the new necessary indexes defined. If you
regularly have a state with large amounts of executions both due and future, it might be beneficial
to add an index on `(execution_time asc, priority desc)` (replacing the old `execution_time asc`).
* This feature is not recommended for users of **MySQL** and **MariaDB** below version 8.x,
as they do not support descending indexes.
* Value `null` for priority may be interpreted differently depending on database (low or high).

#### Polling strategy

If you are running >1000 executions/s you might want to use the `lock-and-fetch` polling-strategy for lower overhead
Expand Down Expand Up @@ -408,6 +441,7 @@ db-scheduler.table-name=scheduled_tasks
db-scheduler.immediate-execution-enabled=false
db-scheduler.scheduler-name=
db-scheduler.threads=10
db-scheduler.priority-enabled=false

# Ignored if a custom DbSchedulerStarter bean is defined
db-scheduler.delay-startup-until-context-ready=false
Expand Down Expand Up @@ -579,6 +613,14 @@ There are a number of users that are using db-scheduler for high throughput use-

See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release-notes.

**Upgrading to 15.x**
* Priority is a new opt-in feature. To be able to use it, column `priority` and index `priority_execution_time_idx`
must be added to the database schema. See table definitions for
[postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql),
[oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or
[mysql](./db-scheduler/src/test/resources/mysql_tables.sql).
At some point, this column will be made mandatory. This will be made clear in future release/upgrade-notes.

**Upgrading to 8.x**
* Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis
builder.enableImmediateExecution();
}

if (config.isPriorityEnabled()) {
builder.enablePriority();
}

// Use custom executor service if provided
customizer.executorService().ifPresent(builder::executorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public class DbSchedulerProperties {
/** Whether or not to log the {@link Throwable} that caused a task to fail. */
private boolean failureLoggerLogStackTrace = SchedulerBuilder.LOG_STACK_TRACE_ON_FAILURE;

/** Whether or executions are ordered by priority */
private boolean priorityEnabled = false;

public boolean isEnabled() {
return enabled;
}
Expand Down Expand Up @@ -243,4 +246,12 @@ public boolean isAlwaysPersistTimestampInUtc() {
public void setAlwaysPersistTimestampInUtc(boolean alwaysPersistTimestampInUTC) {
this.alwaysPersistTimestampInUtc = alwaysPersistTimestampInUTC;
}

public boolean isPriorityEnabled() {
return priorityEnabled;
}

public void setPriorityEnabled(boolean priorityEnabled) {
this.priorityEnabled = priorityEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ public void it_should_start_when_the_context_is_ready() {
});
}

@Test
public void it_should_enable_priority() {
ctxRunner
.withPropertyValues("db-scheduler.priority-enabled=true")
.run(
(AssertableApplicationContext ctx) -> {
assertThat(ctx).hasSingleBean(DataSource.class);
assertThat(ctx).hasSingleBean(Scheduler.class);

DbSchedulerProperties props = ctx.getBean(DbSchedulerProperties.class);
assertThat(props.isPriorityEnabled()).isTrue();
});
}

@Test
public void it_should_support_custom_starting_strategies() {
ctxRunner
Expand Down
1 change: 1 addition & 0 deletions db-scheduler-boot-starter/src/test/resources/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table if not exists scheduled_tasks (
consecutive_failures INT,
last_heartbeat TIMESTAMP WITH TIME ZONE,
version BIGINT,
priority INT,
PRIMARY KEY (task_name, task_instance)
);
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SchedulerBuilder {
protected PollingStrategyConfig pollingStrategyConfig = DEFAULT_POLLING_STRATEGY;
protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL;
protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE;
protected boolean enablePriority = false;
private boolean registerShutdownHook = false;
private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT;
private boolean alwaysPersistTimestampInUTC = false;
Expand Down Expand Up @@ -230,6 +231,11 @@ public SchedulerBuilder registerShutdownHook() {
return this;
}

public SchedulerBuilder enablePriority() {
this.enablePriority = true;
return this;
}

public Scheduler build() {
if (schedulerName == null) {
schedulerName = new SchedulerName.Hostname();
Expand All @@ -249,6 +255,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
clock);
final JdbcTaskRepository clientTaskRepository =
new JdbcTaskRepository(
Expand All @@ -259,6 +266,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
clock);

ExecutorService candidateExecutorService = executorService;
Expand Down Expand Up @@ -287,11 +295,12 @@ public Scheduler build() {
}

LOG.info(
"Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s enable-immediate-execution={}, table-name={}, name={}",
"Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s, enable-immediate-execution={}, enable-priority={}, table-name={}, name={}",
executorThreads,
waiter.getWaitDuration().getSeconds(),
heartbeatInterval.getSeconds(),
enableImmediateExecution,
enablePriority,
tableName,
schedulerName.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class Builder {
private Serializer serializer = Serializer.DEFAULT_JAVA_SERIALIZER;
private String tableName = JdbcTaskRepository.DEFAULT_TABLE_NAME;
private JdbcCustomization jdbcCustomization;
private boolean priority = false;

private Builder(DataSource dataSource, List<Task<?>> knownTasks) {
this.dataSource = dataSource;
Expand All @@ -237,6 +238,12 @@ public Builder tableName(String tableName) {
return this;
}

/** Will cause getScheduledExecutions(..) to return executions in priority order. */
public Builder enablePriority() {
this.priority = true;
return this;
}

public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) {
this.jdbcCustomization = jdbcCustomization;
return this;
Expand All @@ -259,6 +266,7 @@ public SchedulerClient build() {
taskResolver,
new SchedulerClientName(),
serializer,
priority,
clock);

return new StandardSchedulerClient(taskRepository, clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public boolean supportsSingleStatementLockAndFetch() {

@Override
public List<Execution> lockAndFetchSingleStatement(
JdbcTaskRepositoryContext ctx, Instant now, int limit) {
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit);
JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) {
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit, orderByPriority);
}

@Override
Expand All @@ -143,14 +143,15 @@ public boolean supportsGenericLockAndFetch() {

@Override
public String createGenericSelectForUpdateQuery(
String tableName, int limit, String requiredAndCondition) {
String tableName, int limit, String requiredAndCondition, boolean orderByPriority) {
return jdbcCustomization.createGenericSelectForUpdateQuery(
tableName, limit, requiredAndCondition);
tableName, limit, requiredAndCondition, orderByPriority);
}

@Override
public String createSelectDueQuery(String tableName, int limit, String andCondition) {
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition);
public String createSelectDueQuery(
String tableName, int limit, String andCondition, boolean orderByPriority) {
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition, orderByPriority);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.TimeZone;

public class DefaultJdbcCustomization implements JdbcCustomization {

public static final Calendar UTC = GregorianCalendar.getInstance(TimeZone.getTimeZone("UTC"));
private final boolean persistTimestampInUTC;

Expand Down Expand Up @@ -88,7 +89,7 @@ public boolean supportsSingleStatementLockAndFetch() {

@Override
public List<Execution> lockAndFetchSingleStatement(
JdbcTaskRepositoryContext ctx, Instant now, int limit) {
JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) {
throw new UnsupportedOperationException(
"lockAndFetch not supported for " + this.getClass().getName());
}
Expand All @@ -100,19 +101,20 @@ public boolean supportsGenericLockAndFetch() {

@Override
public String createGenericSelectForUpdateQuery(
String tableName, int limit, String requiredAndCondition) {
String tableName, int limit, String requiredAndCondition, boolean orderByPriority) {
throw new UnsupportedOperationException(
"method must be implemented when supporting generic lock-and-fetch");
}

@Override
public String createSelectDueQuery(String tableName, int limit, String andCondition) {
public String createSelectDueQuery(
String tableName, int limit, String andCondition, boolean orderByPriority) {
final String explicitLimit = supportsExplicitQueryLimitPart() ? getQueryLimitPart(limit) : "";
return "select * from "
+ tableName
+ " where picked = ? and execution_time <= ? "
+ andCondition
+ " order by execution_time asc "
+ Queries.ansiSqlOrderPart(orderByPriority)
+ explicitLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public interface JdbcCustomization {
boolean supportsSingleStatementLockAndFetch();

List<Execution> lockAndFetchSingleStatement(
JdbcTaskRepositoryContext ctx, Instant now, int limit);
JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority);

boolean supportsGenericLockAndFetch();

String createGenericSelectForUpdateQuery(
String tableName, int limit, String requiredAndCondition);
String tableName, int limit, String requiredAndCondition, boolean orderByPriority);

String createSelectDueQuery(String tableName, int limit, String andCondition);
String createSelectDueQuery(
String tableName, int limit, String andCondition, boolean orderByPriority);
}
Loading

0 comments on commit c3eace6

Please sign in to comment.