Skip to content

Commit

Permalink
Merge pull request #257 from kagkarlsson/dynamic_recurring_task
Browse files Browse the repository at this point in the history
Better api for recurring tasks with persistent schedule (having multiple instances)
  • Loading branch information
kagkarlsson authored Feb 23, 2022
2 parents fc9d473 + 865c5f3 commit e7aa9b9
Show file tree
Hide file tree
Showing 55 changed files with 1,244 additions and 264 deletions.
48 changes: 36 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Feel free to open a PR to add your organization to the list.

See also [runnable examples](https://github.com/kagkarlsson/db-scheduler/tree/master/examples/features/src/main/java/com/github/kagkarlsson/examples).

### Recurring task
### Recurring task (_static_)

Define a _recurring_ task and schedule the task's first execution on start-up using the `startTasks` builder-method. Upon completion, the task will be re-scheduled according to the defined schedule (see [pre-defined schedule-types](#schedules)).

Expand All @@ -111,8 +111,9 @@ final Scheduler scheduler = Scheduler
scheduler.start();
```

For recurring tasks with multiple instances and schedules, see example [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java).

### One-time tasks
### One-time task

An instance of a _one-time_ task has a single execution-time some time in the future (i.e. non-recurring). The instance-id must be unique within this task, and may be used to encode some metadata (e.g. an id). For more complex state, custom serializable java objects are supported (as used in the example).

Expand Down Expand Up @@ -149,7 +150,8 @@ scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.
* [TrackingProgressRecurringTaskMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java)
* [SpawningOtherTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java)
* [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java)
* [PersistentDynamicScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/PersistentDynamicScheduleMain.java)
* [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java)
* [StatefulRecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java)


## Configuration
Expand Down Expand Up @@ -339,7 +341,7 @@ db-scheduler.shutdown-max-wait=30m
## Interacting with scheduled executions using the SchedulerClient
It is possible to use the `Scheduler` to interact with the persisted future executions. For situations where a full
`Scheduler`-instance is not needed, a simpler [SchedulerClient](./src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java)
`Scheduler`-instance is not needed, a simpler [SchedulerClient](./db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java)
can be created using its builder:
```java
Expand All @@ -358,36 +360,58 @@ It will allow for operations such as:

A single database table is used to track future task-executions. When a task-execution is due, db-scheduler picks it and executes it. When the execution is done, the `Task` is consulted to see what should be done. For example, a `RecurringTask` is typically rescheduled in the future based on its `Schedule`.

Optimistic locking is used to guarantee that a one and only one scheduler-instance gets to pick a task-execution.
The scheduler uses optimistic locking or select-for-update (depending on polling strategy) to guarantee that one and only one scheduler-instance gets to pick and run a task-execution.


### Recurring tasks

The term _recurring task_ is used for tasks that should be run regularly, according to some schedule (see ``Tasks.recurring(..)``).
The term _recurring task_ is used for tasks that should be run regularly, according to some schedule.

When the execution of a recurring task has finished, a `Schedule` is consulted to determine what the next time for
execution should be, and a future task-execution is created for that time (i.e. it is _rescheduled_).
The time chosen will be the nearest time according to the `Schedule`, but still in the future.

To create the initial execution for a `RecurringTask`, the scheduler has a method `startTasks(...)` that takes a list of tasks
There are two types of recurring tasks, the regular _static_ recurring task, where the `Schedule` is defined statically in the code, and
the _dynamic_ recurring tasks, where the `Schedule` is defined at runtime and persisted in the database (still requiring only a single table).

#### Static recurring task

The _static_ recurring task is the most common one and suitable for regular background jobs since the scheduler automatically schedules
an instance of the task if it is not present and also updates the next execution-time if the `Schedule` is updated.

To create the initial execution for a static recurring task, the scheduler has a method `startTasks(...)` that takes a list of tasks
that should be "started" if they do not already have an existing execution. The initial execution-time is determined by the `Schedule`.
If the task already has a future execution (i.e. has been started at least once before), but an updated `Schedule` now indicates another execution-time,
the existing execution will be rescheduled to the new execution-time (with the exception of _non-deterministic_ schedules
such as `FixedDelay` where new execution-time is further into the future).

Create using `Tasks.recurring(..)`.

#### Dynamic recurring task

The _dynamic_ recurring task is a later addition to db-scheduler and was added to support use-cases where there is need for multiple instances
of the same type of task (i.e. same implementation) with different schedules. The `Schedule` is persisted in the `task_data` alongside any regular data.
Unlike the _static_ recurring task, the dynamic one will not automatically schedule instances of the task. It is up to the user to create instances and
update the schedule for existing ones if necessary (using the `SchedulerClient` interface).
See the example [RecurringTaskWithPersistentScheduleMain.java]((./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java)) for more details.

Create using `Tasks.recurringWithPersistentSchedule(..)`.

### One-time tasks

The term _one-time task_ is used for tasks that have a single execution-time (see `Tasks.oneTime(..)`).
The term _one-time task_ is used for tasks that have a single execution-time.
In addition to encode data into the `instanceId`of a task-execution, it is possible to store arbitrary binary data in a separate field for use at execution-time. By default, Java serialization is used to marshal/unmarshal the data.

Create using `Tasks.oneTime(..)`.

### Custom tasks

For tasks not fitting the above categories, it is possible to fully customize the behavior of the tasks using `Tasks.custom(..)`.

Use-cases might be:

* Recurring tasks that needs to update its data
* Tasks that should be either rescheduled or removed based on output from the actual execution
* ..


### Dead executions
Expand Down Expand Up @@ -455,7 +479,7 @@ Currently, polling strategy `lock-and-fetch` is implemented only for Postgres. C

* There are no guarantees that all instants in a schedule for a `RecurringTask` will be executed. The `Schedule` is consulted after the previous task-execution finishes, and the closest time in the future will be selected for next execution-time. A new type of task may be added in the future to provide such functionality.

* The methods on `SchedulerClient` (`schedule`, `cancel`, `reschedule`) will run using a new `Connection`from the `DataSource`provided. To have the action be a part of a transaction, it must be taken care of by the `DataSource`provided, for example using something like Spring's `TransactionAwareDataSourceProxy`.
* The methods on `SchedulerClient` (`schedule`, `cancel`, `reschedule`) will run using a new `Connection`from the `DataSource` provided. To have the action be a part of a transaction, it must be taken care of by the `DataSource` provided, for example using something like Spring's `TransactionAwareDataSourceProxy`.

* Currently, the precision of db-scheduler is depending on the `pollingInterval` (default 10s) which specifies how often to look in the table for due executions. If you know what you are doing, the scheduler may be instructed at runtime to "look early" via `scheduler.triggerCheckForDueExecutions()`. (See also `enableImmediateExecution()` on the `Builder`)

Expand All @@ -468,14 +492,14 @@ See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release
* Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not.

**Upgrading to 4.x**
* Add column `consecutive_failures` to the database schema. See table definitions for [postgresql](db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](https://github.com/kagkarlsson/db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](https://github.com/kagkarlsson/db-scheduler/src/test/resources/mysql_tables.sql). `null` is handled as 0, so no need to update existing records.
* Add column `consecutive_failures` to the database schema. See table definitions for [postgresql](./db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql). `null` is handled as 0, so no need to update existing records.

**Upgrading to 3.x**
* No schema changes
* Task creation are preferrably done through builders in `Tasks` class

**Upgrading to 2.x**
* Add column `task_data` to the database schema. See table definitions for [postgresql](db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](db-scheduler/src/test/resources/mysql_tables.sql).
* Add column `task_data` to the database schema. See table definitions for [postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql).

## FAQ

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.boot.autoconfigure;

import com.github.kagkarlsson.scheduler.stats.MicrometerStatsRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
Expand Down Expand Up @@ -88,7 +89,7 @@ protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRep
this.dueExecutor = Executors.newSingleThreadExecutor(defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-execute-due-"));
this.housekeeperExecutor = Executors.newScheduledThreadPool(3, defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-housekeeper-"));
SchedulerClientEventListener earlyExecutionListener = (enableImmediateExecution ? new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter) : SchedulerClientEventListener.NOOP);
delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener);
delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener, clock);
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);

if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
Expand Down Expand Up @@ -125,7 +126,7 @@ protected void executeDue() {

protected void executeOnStartup() {
// Client used for OnStartup always commits
final StandardSchedulerClient onStartupClient = new StandardSchedulerClient(schedulerTaskRepository);
final StandardSchedulerClient onStartupClient = new StandardSchedulerClient(schedulerTaskRepository, clock);
onStartup.forEach(os -> {
try {
os.onStartup(onStartupClient, this.clock);
Expand Down Expand Up @@ -173,6 +174,11 @@ public SchedulerState getSchedulerState() {
return schedulerState;
}

@Override
public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
this.delegate.schedule(schedulableInstance);
}

@Override
public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
this.delegate.schedule(taskInstance, executionTime);
Expand All @@ -183,6 +189,11 @@ public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime)
this.delegate.reschedule(taskInstanceId, newExecutionTime);
}

@Override
public <T> void reschedule(SchedulableInstance<T> schedulableInstance) {
this.delegate.reschedule(schedulableInstance);
}

@Override
public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
this.delegate.reschedule(taskInstanceId, newExecutionTime, newData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public Scheduler build() {

final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks);
final JdbcCustomization jdbcCustomization = ofNullable(this.jdbcCustomization).orElseGet(() -> new AutodetectJdbcCustomization(dataSource));
final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, jdbcCustomization, tableName, taskResolver, schedulerName, serializer);
final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerName, serializer);
final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, jdbcCustomization, tableName, taskResolver, schedulerName, serializer, clock);
final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerName, serializer, clock);

ExecutorService candidateExecutorService = executorService;
if (candidateExecutorService == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
Expand Down Expand Up @@ -51,13 +52,14 @@ public interface SchedulerClient {
*/
<T> void schedule(TaskInstance<T> taskInstance, Instant executionTime);

<T> void schedule(SchedulableInstance<T> schedulableInstance);

/**
* Update an existing execution to a new execution-time. If the execution does not exist or if it is currently
* running, an exception is thrown.
*
* @param taskInstanceId
* @param newExecutionTime the new execution-time
* @return void
* @see java.time.Instant
* @see com.github.kagkarlsson.scheduler.task.TaskInstanceId
*/
Expand All @@ -70,12 +72,19 @@ public interface SchedulerClient {
* @param taskInstanceId
* @param newExecutionTime the new execution-time
* @param newData the new task-data
* @return void
* @see java.time.Instant
* @see com.github.kagkarlsson.scheduler.task.TaskInstanceId
*/
<T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData);

/**
* Update an existing execution with a new execution-time and new task-data. If the execution does not exist or if
* it is currently running, an exception is thrown.
*
* @param schedulableInstance the updated instance
*/
<T> void reschedule(SchedulableInstance<T> schedulableInstance);

/**
* Removes/Cancels an execution.
*
Expand Down Expand Up @@ -191,6 +200,7 @@ public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) {

public SchedulerClient build() {
TaskResolver taskResolver = new TaskResolver(StatsRegistry.NOOP, knownTasks);
final SystemClock clock = new SystemClock();

TaskRepository taskRepository = new JdbcTaskRepository(
dataSource,
Expand All @@ -199,40 +209,52 @@ public SchedulerClient build() {
tableName,
taskResolver,
new SchedulerClientName(),
serializer);
serializer,
clock);

return new StandardSchedulerClient(taskRepository);
return new StandardSchedulerClient(taskRepository, clock);
}
}

class StandardSchedulerClient implements SchedulerClient {

private static final Logger LOG = LoggerFactory.getLogger(StandardSchedulerClient.class);
protected final TaskRepository taskRepository;
private final Clock clock;
private SchedulerClientEventListener schedulerClientEventListener;

StandardSchedulerClient(TaskRepository taskRepository) {
this(taskRepository, SchedulerClientEventListener.NOOP);
StandardSchedulerClient(TaskRepository taskRepository, Clock clock) {
this(taskRepository, SchedulerClientEventListener.NOOP, clock);
}

StandardSchedulerClient(TaskRepository taskRepository, SchedulerClientEventListener schedulerClientEventListener) {
StandardSchedulerClient(TaskRepository taskRepository, SchedulerClientEventListener schedulerClientEventListener, Clock clock) {
this.taskRepository = taskRepository;
this.schedulerClientEventListener = schedulerClientEventListener;
this.clock = clock;
}

@Override
public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
boolean success = taskRepository.createIfNotExists(new Execution(executionTime, taskInstance));
boolean success = taskRepository.createIfNotExists(SchedulableInstance.of(taskInstance, executionTime));
if (success) {
notifyListeners(ClientEvent.EventType.SCHEDULE, taskInstance, executionTime);
}
}
@Override
public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
schedule(schedulableInstance.getTaskInstance(), schedulableInstance.getNextExecutionTime(clock.now()));
}

@Override
public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) {
reschedule(taskInstanceId, newExecutionTime, null);
}

@Override
public <T> void reschedule(SchedulableInstance<T> schedulableInstance) {
reschedule(schedulableInstance, schedulableInstance.getNextExecutionTime(clock.now()), schedulableInstance.getTaskInstance().getData());
}

@Override
public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
String taskName = taskInstanceId.getTaskName();
Expand Down
Loading

0 comments on commit e7aa9b9

Please sign in to comment.