diff --git a/README.md b/README.md index 1a7693f1..1f196fb8 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Feel free to open a PR to add your organization to the list. See also [runnable examples](https://github.com/kagkarlsson/db-scheduler/tree/master/examples/features/src/main/java/com/github/kagkarlsson/examples). -### Recurring task +### Recurring task (_static_) Define a _recurring_ task and schedule the task's first execution on start-up using the `startTasks` builder-method. Upon completion, the task will be re-scheduled according to the defined schedule (see [pre-defined schedule-types](#schedules)). @@ -111,8 +111,9 @@ final Scheduler scheduler = Scheduler scheduler.start(); ``` +For recurring tasks with multiple instances and schedules, see example [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java). -### One-time tasks +### One-time task An instance of a _one-time_ task has a single execution-time some time in the future (i.e. non-recurring). The instance-id must be unique within this task, and may be used to encode some metadata (e.g. an id). For more complex state, custom serializable java objects are supported (as used in the example). @@ -149,7 +150,8 @@ scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant. * [TrackingProgressRecurringTaskMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java) * [SpawningOtherTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java) * [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java) -* [PersistentDynamicScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/PersistentDynamicScheduleMain.java) +* [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java) +* [StatefulRecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java) ## Configuration @@ -339,7 +341,7 @@ db-scheduler.shutdown-max-wait=30m ## Interacting with scheduled executions using the SchedulerClient It is possible to use the `Scheduler` to interact with the persisted future executions. For situations where a full -`Scheduler`-instance is not needed, a simpler [SchedulerClient](./src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java) +`Scheduler`-instance is not needed, a simpler [SchedulerClient](./db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java) can be created using its builder: ```java @@ -358,36 +360,58 @@ It will allow for operations such as: A single database table is used to track future task-executions. When a task-execution is due, db-scheduler picks it and executes it. When the execution is done, the `Task` is consulted to see what should be done. For example, a `RecurringTask` is typically rescheduled in the future based on its `Schedule`. -Optimistic locking is used to guarantee that a one and only one scheduler-instance gets to pick a task-execution. +The scheduler uses optimistic locking or select-for-update (depending on polling strategy) to guarantee that one and only one scheduler-instance gets to pick and run a task-execution. ### Recurring tasks -The term _recurring task_ is used for tasks that should be run regularly, according to some schedule (see ``Tasks.recurring(..)``). +The term _recurring task_ is used for tasks that should be run regularly, according to some schedule. When the execution of a recurring task has finished, a `Schedule` is consulted to determine what the next time for execution should be, and a future task-execution is created for that time (i.e. it is _rescheduled_). The time chosen will be the nearest time according to the `Schedule`, but still in the future. -To create the initial execution for a `RecurringTask`, the scheduler has a method `startTasks(...)` that takes a list of tasks +There are two types of recurring tasks, the regular _static_ recurring task, where the `Schedule` is defined statically in the code, and +the _dynamic_ recurring tasks, where the `Schedule` is defined at runtime and persisted in the database (still requiring only a single table). + +#### Static recurring task + +The _static_ recurring task is the most common one and suitable for regular background jobs since the scheduler automatically schedules +an instance of the task if it is not present and also updates the next execution-time if the `Schedule` is updated. + +To create the initial execution for a static recurring task, the scheduler has a method `startTasks(...)` that takes a list of tasks that should be "started" if they do not already have an existing execution. The initial execution-time is determined by the `Schedule`. If the task already has a future execution (i.e. has been started at least once before), but an updated `Schedule` now indicates another execution-time, the existing execution will be rescheduled to the new execution-time (with the exception of _non-deterministic_ schedules such as `FixedDelay` where new execution-time is further into the future). +Create using `Tasks.recurring(..)`. + +#### Dynamic recurring task + +The _dynamic_ recurring task is a later addition to db-scheduler and was added to support use-cases where there is need for multiple instances +of the same type of task (i.e. same implementation) with different schedules. The `Schedule` is persisted in the `task_data` alongside any regular data. +Unlike the _static_ recurring task, the dynamic one will not automatically schedule instances of the task. It is up to the user to create instances and +update the schedule for existing ones if necessary (using the `SchedulerClient` interface). +See the example [RecurringTaskWithPersistentScheduleMain.java]((./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java)) for more details. + +Create using `Tasks.recurringWithPersistentSchedule(..)`. + ### One-time tasks -The term _one-time task_ is used for tasks that have a single execution-time (see `Tasks.oneTime(..)`). +The term _one-time task_ is used for tasks that have a single execution-time. In addition to encode data into the `instanceId`of a task-execution, it is possible to store arbitrary binary data in a separate field for use at execution-time. By default, Java serialization is used to marshal/unmarshal the data. +Create using `Tasks.oneTime(..)`. + ### Custom tasks For tasks not fitting the above categories, it is possible to fully customize the behavior of the tasks using `Tasks.custom(..)`. Use-cases might be: -* Recurring tasks that needs to update its data * Tasks that should be either rescheduled or removed based on output from the actual execution +* .. ### Dead executions @@ -455,7 +479,7 @@ Currently, polling strategy `lock-and-fetch` is implemented only for Postgres. C * There are no guarantees that all instants in a schedule for a `RecurringTask` will be executed. The `Schedule` is consulted after the previous task-execution finishes, and the closest time in the future will be selected for next execution-time. A new type of task may be added in the future to provide such functionality. -* The methods on `SchedulerClient` (`schedule`, `cancel`, `reschedule`) will run using a new `Connection`from the `DataSource`provided. To have the action be a part of a transaction, it must be taken care of by the `DataSource`provided, for example using something like Spring's `TransactionAwareDataSourceProxy`. +* The methods on `SchedulerClient` (`schedule`, `cancel`, `reschedule`) will run using a new `Connection`from the `DataSource` provided. To have the action be a part of a transaction, it must be taken care of by the `DataSource` provided, for example using something like Spring's `TransactionAwareDataSourceProxy`. * Currently, the precision of db-scheduler is depending on the `pollingInterval` (default 10s) which specifies how often to look in the table for due executions. If you know what you are doing, the scheduler may be instructed at runtime to "look early" via `scheduler.triggerCheckForDueExecutions()`. (See also `enableImmediateExecution()` on the `Builder`) @@ -468,14 +492,14 @@ See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release * Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not. **Upgrading to 4.x** -* Add column `consecutive_failures` to the database schema. See table definitions for [postgresql](db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](https://github.com/kagkarlsson/db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](https://github.com/kagkarlsson/db-scheduler/src/test/resources/mysql_tables.sql). `null` is handled as 0, so no need to update existing records. +* Add column `consecutive_failures` to the database schema. See table definitions for [postgresql](./db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql). `null` is handled as 0, so no need to update existing records. **Upgrading to 3.x** * No schema changes * Task creation are preferrably done through builders in `Tasks` class **Upgrading to 2.x** -* Add column `task_data` to the database schema. See table definitions for [postgresql](db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](db-scheduler/src/test/resources/mysql_tables.sql). +* Add column `task_data` to the database schema. See table definitions for [postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql). ## FAQ diff --git a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerMetricsAutoConfiguration.java b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerMetricsAutoConfiguration.java index 88028dd8..a2699e76 100644 --- a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerMetricsAutoConfiguration.java +++ b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerMetricsAutoConfiguration.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.scheduler.boot.autoconfigure; import com.github.kagkarlsson.scheduler.stats.MicrometerStatsRegistry; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 5b6af0a3..bd56c8d8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -23,6 +23,7 @@ import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.ExecutionOperations; import com.github.kagkarlsson.scheduler.task.OnStartup; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; @@ -88,7 +89,7 @@ protected Scheduler(Clock clock, TaskRepository schedulerTaskRepository, TaskRep this.dueExecutor = Executors.newSingleThreadExecutor(defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-execute-due-")); this.housekeeperExecutor = Executors.newScheduledThreadPool(3, defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-housekeeper-")); SchedulerClientEventListener earlyExecutionListener = (enableImmediateExecution ? new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter) : SchedulerClientEventListener.NOOP); - delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener); + delegate = new StandardSchedulerClient(clientTaskRepository, earlyExecutionListener, clock); this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace); if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) { @@ -125,7 +126,7 @@ protected void executeDue() { protected void executeOnStartup() { // Client used for OnStartup always commits - final StandardSchedulerClient onStartupClient = new StandardSchedulerClient(schedulerTaskRepository); + final StandardSchedulerClient onStartupClient = new StandardSchedulerClient(schedulerTaskRepository, clock); onStartup.forEach(os -> { try { os.onStartup(onStartupClient, this.clock); @@ -173,6 +174,11 @@ public SchedulerState getSchedulerState() { return schedulerState; } + @Override + public void schedule(SchedulableInstance schedulableInstance) { + this.delegate.schedule(schedulableInstance); + } + @Override public void schedule(TaskInstance taskInstance, Instant executionTime) { this.delegate.schedule(taskInstance, executionTime); @@ -183,6 +189,11 @@ public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) this.delegate.reschedule(taskInstanceId, newExecutionTime); } + @Override + public void reschedule(SchedulableInstance schedulableInstance) { + this.delegate.reschedule(schedulableInstance); + } + @Override public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) { this.delegate.reschedule(taskInstanceId, newExecutionTime, newData); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index 22656ded..5e999f94 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -190,8 +190,8 @@ public Scheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); final JdbcCustomization jdbcCustomization = ofNullable(this.jdbcCustomization).orElseGet(() -> new AutodetectJdbcCustomization(dataSource)); - final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, jdbcCustomization, tableName, taskResolver, schedulerName, serializer); - final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerName, serializer); + final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, jdbcCustomization, tableName, taskResolver, schedulerName, serializer, clock); + final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerName, serializer, clock); ExecutorService candidateExecutorService = executorService; if (candidateExecutorService == null) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java index 7068aa2a..6c6d9ae2 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java @@ -22,6 +22,7 @@ import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstanceId; @@ -51,13 +52,14 @@ public interface SchedulerClient { */ void schedule(TaskInstance taskInstance, Instant executionTime); + void schedule(SchedulableInstance schedulableInstance); + /** * Update an existing execution to a new execution-time. If the execution does not exist or if it is currently * running, an exception is thrown. * * @param taskInstanceId * @param newExecutionTime the new execution-time - * @return void * @see java.time.Instant * @see com.github.kagkarlsson.scheduler.task.TaskInstanceId */ @@ -70,12 +72,19 @@ public interface SchedulerClient { * @param taskInstanceId * @param newExecutionTime the new execution-time * @param newData the new task-data - * @return void * @see java.time.Instant * @see com.github.kagkarlsson.scheduler.task.TaskInstanceId */ void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData); + /** + * Update an existing execution with a new execution-time and new task-data. If the execution does not exist or if + * it is currently running, an exception is thrown. + * + * @param schedulableInstance the updated instance + */ + void reschedule(SchedulableInstance schedulableInstance); + /** * Removes/Cancels an execution. * @@ -191,6 +200,7 @@ public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) { public SchedulerClient build() { TaskResolver taskResolver = new TaskResolver(StatsRegistry.NOOP, knownTasks); + final SystemClock clock = new SystemClock(); TaskRepository taskRepository = new JdbcTaskRepository( dataSource, @@ -199,9 +209,10 @@ public SchedulerClient build() { tableName, taskResolver, new SchedulerClientName(), - serializer); + serializer, + clock); - return new StandardSchedulerClient(taskRepository); + return new StandardSchedulerClient(taskRepository, clock); } } @@ -209,30 +220,41 @@ class StandardSchedulerClient implements SchedulerClient { private static final Logger LOG = LoggerFactory.getLogger(StandardSchedulerClient.class); protected final TaskRepository taskRepository; + private final Clock clock; private SchedulerClientEventListener schedulerClientEventListener; - StandardSchedulerClient(TaskRepository taskRepository) { - this(taskRepository, SchedulerClientEventListener.NOOP); + StandardSchedulerClient(TaskRepository taskRepository, Clock clock) { + this(taskRepository, SchedulerClientEventListener.NOOP, clock); } - StandardSchedulerClient(TaskRepository taskRepository, SchedulerClientEventListener schedulerClientEventListener) { + StandardSchedulerClient(TaskRepository taskRepository, SchedulerClientEventListener schedulerClientEventListener, Clock clock) { this.taskRepository = taskRepository; this.schedulerClientEventListener = schedulerClientEventListener; + this.clock = clock; } @Override public void schedule(TaskInstance taskInstance, Instant executionTime) { - boolean success = taskRepository.createIfNotExists(new Execution(executionTime, taskInstance)); + boolean success = taskRepository.createIfNotExists(SchedulableInstance.of(taskInstance, executionTime)); if (success) { notifyListeners(ClientEvent.EventType.SCHEDULE, taskInstance, executionTime); } } + @Override + public void schedule(SchedulableInstance schedulableInstance) { + schedule(schedulableInstance.getTaskInstance(), schedulableInstance.getNextExecutionTime(clock.now())); + } @Override public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) { reschedule(taskInstanceId, newExecutionTime, null); } + @Override + public void reschedule(SchedulableInstance schedulableInstance) { + reschedule(schedulableInstance, schedulableInstance.getNextExecutionTime(clock.now()), schedulableInstance.getTaskInstance().getData()); + } + @Override public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) { String taskName = taskInstanceId.getTaskName(); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java index cff87174..a5560c32 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java @@ -16,6 +16,7 @@ package com.github.kagkarlsson.scheduler; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import java.time.Duration; import java.time.Instant; @@ -25,7 +26,7 @@ public interface TaskRepository { - boolean createIfNotExists(Execution execution); + boolean createIfNotExists(SchedulableInstance execution); List getDue(Instant now, int limit); void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer consumer); void getScheduledExecutions(ScheduledExecutionsFilter filter, String taskName, Consumer consumer); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index b3291458..b3d55a3b 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -18,6 +18,7 @@ import com.github.kagkarlsson.jdbc.JdbcRunner; import com.github.kagkarlsson.jdbc.ResultSetMapper; import com.github.kagkarlsson.jdbc.SQLRuntimeException; +import com.github.kagkarlsson.scheduler.Clock; import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter; import com.github.kagkarlsson.scheduler.SchedulerName; import com.github.kagkarlsson.scheduler.Serializer; @@ -27,6 +28,7 @@ import com.github.kagkarlsson.scheduler.exceptions.ExecutionException; import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import org.slf4j.Logger; @@ -61,33 +63,36 @@ public class JdbcTaskRepository implements TaskRepository { private final Serializer serializer; private final String tableName; private final JdbcCustomization jdbcCustomization; + private final Clock clock; - public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName) { - this(dataSource, commitWhenAutocommitDisabled, new AutodetectJdbcCustomization(dataSource), tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER); + public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Clock clock) { + this(dataSource, commitWhenAutocommitDisabled, new AutodetectJdbcCustomization(dataSource), tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, clock); } - public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName) { - this(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER); + public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Clock clock) { + this(dataSource, commitWhenAutocommitDisabled, jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, clock); } - public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer) { - this(jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, serializer, new JdbcRunner(dataSource, commitWhenAutocommitDisabled)); + public JdbcTaskRepository(DataSource dataSource, boolean commitWhenAutocommitDisabled, JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, Clock clock) { + this(jdbcCustomization, tableName, taskResolver, schedulerSchedulerName, serializer, new JdbcRunner(dataSource, commitWhenAutocommitDisabled), clock); } - protected JdbcTaskRepository(JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, JdbcRunner jdbcRunner) { + protected JdbcTaskRepository(JdbcCustomization jdbcCustomization, String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, JdbcRunner jdbcRunner, Clock clock) { this.tableName = tableName; this.taskResolver = taskResolver; this.schedulerSchedulerName = schedulerSchedulerName; this.jdbcRunner = jdbcRunner; this.serializer = serializer; this.jdbcCustomization = jdbcCustomization; + this.clock = clock; } @Override @SuppressWarnings({"unchecked"}) - public boolean createIfNotExists(Execution execution) { + public boolean createIfNotExists(SchedulableInstance instance) { + final TaskInstance taskInstance = instance.getTaskInstance(); try { - Optional existingExecution = getExecution(execution.taskInstance); + Optional existingExecution = getExecution(taskInstance); if (existingExecution.isPresent()) { LOG.debug("Execution not created, it already exists. Due: {}", existingExecution.get().executionTime); return false; @@ -96,10 +101,10 @@ public boolean createIfNotExists(Execution execution) { jdbcRunner.execute( "insert into " + tableName + "(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)", (PreparedStatement p) -> { - p.setString(1, execution.taskInstance.getTaskName()); - p.setString(2, execution.taskInstance.getId()); - p.setObject(3, serializer.serialize(execution.taskInstance.getData())); - jdbcCustomization.setInstant(p, 4, execution.executionTime); + p.setString(1, taskInstance.getTaskName()); + p.setString(2, taskInstance.getId()); + p.setObject(3, serializer.serialize(taskInstance.getData())); + jdbcCustomization.setInstant(p, 4, instance.getNextExecutionTime(clock.now())); p.setBoolean(5, false); p.setLong(6, 1L); }); @@ -107,9 +112,9 @@ public boolean createIfNotExists(Execution execution) { } catch (SQLRuntimeException e) { LOG.debug("Exception when inserting execution. Assuming it to be a constraint violation.", e); - Optional existingExecution = getExecution(execution.taskInstance); + Optional existingExecution = getExecution(taskInstance); if (!existingExecution.isPresent()) { - throw new ExecutionException("Failed to add new execution.", execution, e); + throw new TaskInstanceException("Failed to add new execution.", instance.getTaskName(), instance.getId(), e); } LOG.debug("Execution not created, another thread created it."); return false; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/CompletionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/CompletionHandler.java index 511616bb..a92bba59 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/CompletionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/CompletionHandler.java @@ -15,6 +15,7 @@ */ package com.github.kagkarlsson.scheduler.task; +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; import com.github.kagkarlsson.scheduler.task.schedule.Schedule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +36,6 @@ public void complete(ExecutionComplete executionComplete, ExecutionOperations } class OnCompleteReschedule implements CompletionHandler { - private static final Logger LOG = LoggerFactory.getLogger(OnCompleteReschedule.class); private final Schedule schedule; private final boolean setNewData; @@ -70,6 +70,4 @@ public String toString() { } - - } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/FailureHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/FailureHandler.java index fc544fca..0d49ca60 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/FailureHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/FailureHandler.java @@ -15,6 +15,7 @@ */ package com.github.kagkarlsson.scheduler.task; +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; import com.github.kagkarlsson.scheduler.task.schedule.Schedule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +79,6 @@ public void onFailure(final ExecutionComplete executionComplete, final Execution } class OnFailureRetryLater implements FailureHandler { - private static final Logger LOG = LoggerFactory.getLogger(CompletionHandler.OnCompleteReschedule.class); private final Duration sleepDuration; @@ -95,7 +95,6 @@ public void onFailure(ExecutionComplete executionComplete, ExecutionOperations implements FailureHandler { - private static final Logger LOG = LoggerFactory.getLogger(CompletionHandler.OnCompleteReschedule.class); private final Schedule schedule; @@ -110,4 +109,17 @@ public void onFailure(ExecutionComplete executionComplete, ExecutionOperations implements FailureHandler { + private static final Logger LOG = LoggerFactory.getLogger(CompletionHandler.OnCompleteReschedule.class); + + @Override + public void onFailure(ExecutionComplete executionComplete, ExecutionOperations executionOperations) { + final T data = (T)executionComplete.getExecution().taskInstance.getData(); + final Instant nextExecutionTime = data.getSchedule().getNextExecutionTime(executionComplete); + LOG.debug("Execution failed. Rescheduling task {} to {}", executionComplete.getExecution().taskInstance, nextExecutionTime); + executionOperations.reschedule(executionComplete, nextExecutionTime); + } + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/NextExecutionTime.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/NextExecutionTime.java new file mode 100644 index 00000000..831c9cba --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/NextExecutionTime.java @@ -0,0 +1,28 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import java.time.Instant; +import java.util.function.Function; + +@FunctionalInterface +public interface NextExecutionTime { + Instant getNextExecutionTime(Instant currentTime); + + static NextExecutionTime from(Function defaultExecutionTime) { + return defaultExecutionTime::apply; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java new file mode 100644 index 00000000..bd27d26d --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import java.time.Instant; + +public interface SchedulableInstance extends TaskInstanceId { + + TaskInstance getTaskInstance(); + Instant getNextExecutionTime(Instant currentTime); + + default String getTaskName() { + return getTaskInstance().getTaskName(); + } + + default String getId() { + return getTaskInstance().getId(); + } + + static SchedulableInstance of(TaskInstance taskInstance, Instant executionTime) { + return new SchedulableTaskInstance(taskInstance, executionTime); + } + + static SchedulableInstance of(TaskInstance taskInstance, NextExecutionTime executionTime) { + return new SchedulableTaskInstance(taskInstance, executionTime); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableTaskInstance.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableTaskInstance.java new file mode 100644 index 00000000..d402340e --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableTaskInstance.java @@ -0,0 +1,44 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +import java.time.Instant; + +public class SchedulableTaskInstance implements SchedulableInstance { + private final TaskInstance taskInstance; + NextExecutionTime executionTime; + + public SchedulableTaskInstance(TaskInstance taskInstance, NextExecutionTime executionTime) { + this.taskInstance = taskInstance; + this.executionTime = executionTime; + } + + public SchedulableTaskInstance(TaskInstance taskInstance, Instant executionTime) { + this.taskInstance = taskInstance; + this.executionTime = (_ignored) -> executionTime; + } + + @Override + public TaskInstance getTaskInstance() { + return taskInstance; + } + + @Override + public Instant getNextExecutionTime(Instant currentTime) { + return executionTime.getNextExecutionTime(currentTime); + } + +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java index f05b5192..acbe9182 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java @@ -15,15 +15,21 @@ */ package com.github.kagkarlsson.scheduler.task; +import java.time.Instant; + public interface Task extends ExecutionHandler { String getName(); Class getDataClass(); TaskInstance instance(String id); - TaskInstance instance(String id, T data); + default TaskInstanceId instanceId(String id) { return TaskInstanceId.of(getName(), id); }; + + SchedulableInstance schedulableInstance(String id); + SchedulableInstance schedulableInstance(String id, T data); + FailureHandler getFailureHandler(); DeadExecutionHandler getDeadExecutionHandler(); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java index 3c5455ab..36a82870 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java @@ -20,14 +20,34 @@ import com.github.kagkarlsson.scheduler.task.AbstractTask; import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler; import com.github.kagkarlsson.scheduler.task.FailureHandler; +import com.github.kagkarlsson.scheduler.task.NextExecutionTime; import com.github.kagkarlsson.scheduler.task.OnStartup; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; +import com.github.kagkarlsson.scheduler.task.TaskInstance; + +import java.time.Instant; +import java.util.function.Function; public abstract class CustomTask extends AbstractTask implements OnStartup { private ScheduleOnStartup scheduleOnStartup; + private final NextExecutionTime defaultExecutionTime; - public CustomTask(String name, Class dataClass, ScheduleOnStartup scheduleOnStartup, FailureHandler failureHandler, DeadExecutionHandler deadExecutionHandler) { + public CustomTask(String name, Class dataClass, ScheduleOnStartup scheduleOnStartup, Function defaultExecutionTime, + FailureHandler failureHandler, DeadExecutionHandler deadExecutionHandler) { super(name, dataClass, failureHandler, deadExecutionHandler); this.scheduleOnStartup = scheduleOnStartup; + this.defaultExecutionTime = NextExecutionTime.from(defaultExecutionTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), defaultExecutionTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), defaultExecutionTime); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java index 72327249..4b58c040 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java @@ -21,6 +21,7 @@ import com.github.kagkarlsson.scheduler.task.FailureHandler.OnFailureRetryLater; import java.time.Duration; +import java.time.Instant; public abstract class OneTimeTask extends AbstractTask { @@ -32,6 +33,16 @@ public OneTimeTask(String name, Class dataClass, FailureHandler failureHan super(name, dataClass, failureHandler, deadExecutionHandler); } + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + } + @Override public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { executeOnce(taskInstance, executionContext); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/PlainScheduleAndData.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/PlainScheduleAndData.java new file mode 100644 index 00000000..f82b9aa8 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/PlainScheduleAndData.java @@ -0,0 +1,67 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task.helper; + +import com.github.kagkarlsson.scheduler.task.schedule.Schedule; + +import java.util.Objects; + +public class PlainScheduleAndData implements ScheduleAndData { + private final Schedule schedule; + private final Object data; + + public PlainScheduleAndData(Schedule schedule) { + this.schedule = schedule; + this.data = null; + } + + public PlainScheduleAndData(Schedule schedule, Object data) { + this.schedule = schedule; + this.data = data; + } + + @Override + public Schedule getSchedule() { + return schedule; + } + + @Override + public Object getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + com.github.kagkarlsson.scheduler.task.helper.PlainScheduleAndData that = (com.github.kagkarlsson.scheduler.task.helper.PlainScheduleAndData) o; + return Objects.equals(schedule, that.schedule) && + Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(schedule, data); + } + + @Override + public String toString() { + return this.getClass().getName() + "{" + + "schedule=" + schedule + + ", data=" + data + + '}'; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java index 42faf483..2570a165 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java @@ -22,10 +22,14 @@ import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler.ReviveDeadExecution; import com.github.kagkarlsson.scheduler.task.schedule.Schedule; +import java.time.Instant; +import java.util.Optional; + public abstract class RecurringTask extends AbstractTask implements OnStartup { public static final String INSTANCE = "recurring"; private final OnCompleteReschedule onComplete; + private final Schedule schedule; private ScheduleOnStartup scheduleOnStartup; public RecurringTask(String name, Schedule schedule, Class dataClass) { @@ -39,9 +43,20 @@ public RecurringTask(String name, Schedule schedule, Class dataClass, T initi public RecurringTask(String name, Schedule schedule, Class dataClass, ScheduleRecurringOnStartup scheduleOnStartup, FailureHandler failureHandler, DeadExecutionHandler deadExecutionHandler) { super(name, dataClass, failureHandler, deadExecutionHandler); onComplete = new OnCompleteReschedule<>(schedule); + this.schedule = schedule; this.scheduleOnStartup = scheduleOnStartup; } + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), schedule::getInitialExecutionTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), schedule::getInitialExecutionTime); + } + @Override public void onStartup(SchedulerClient scheduler, Clock clock) { if (scheduleOnStartup != null) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java new file mode 100644 index 00000000..9e2305c2 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java @@ -0,0 +1,56 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task.helper; + +import com.github.kagkarlsson.scheduler.task.AbstractTask; +import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler; +import com.github.kagkarlsson.scheduler.task.FailureHandler; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; +import com.github.kagkarlsson.scheduler.task.TaskInstance; + +import java.time.Instant; + +public abstract class RecurringTaskWithPersistentSchedule extends AbstractTask { + + public RecurringTaskWithPersistentSchedule(String name, Class dataClass) { + super( + name, + dataClass, + new FailureHandler.OnFailureRescheduleUsingTaskDataSchedule<>(), + new DeadExecutionHandler.ReviveDeadExecution<>()); + } + + @Override + public TaskInstance instance(String id) { + throw new UnsupportedOperationException("Cannot instatiate a RecurringTaskWithPersistentSchedule without 'data' since that holds the schedule."); + } + + @Override + public SchedulableInstance schedulableInstance(String id) { + throw new UnsupportedOperationException("Cannot instatiate a RecurringTaskWithPersistentSchedule without 'data' since that holds the schedule."); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), data.getSchedule()::getInitialExecutionTime); + } + + @Override + public String toString() { + return RecurringTaskWithPersistentSchedule.class.getName() + " name=" + getName(); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ScheduleAndData.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ScheduleAndData.java new file mode 100644 index 00000000..d32e56bf --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ScheduleAndData.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task.helper; + +import com.github.kagkarlsson.scheduler.task.schedule.Schedule; + +import java.io.Serializable; +import java.util.Objects; + +public interface ScheduleAndData extends Serializable { + Schedule getSchedule(); + Object getData(); + + default PlainScheduleAndData of(Schedule schedule, Object data) { + return new PlainScheduleAndData(schedule, data); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java index a58cf05b..e8a0ba4d 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java @@ -33,6 +33,10 @@ public static RecurringTaskBuilder recurring(String name, Schedule schedu return new RecurringTaskBuilder<>(name, schedule, dataClass); } + public static RecurringTaskWithPersistentScheduleBuilder recurringWithPersistentSchedule(String name, Class dataClass) { + return new RecurringTaskWithPersistentScheduleBuilder(name, dataClass); + } + public static OneTimeTaskBuilder oneTime(String name) { return new OneTimeTaskBuilder<>(name, Void.class); } @@ -115,6 +119,52 @@ public void executeRecurringly(TaskInstance taskInstance, ExecutionContext ex } } + public static class RecurringTaskWithPersistentScheduleBuilder { + private final String name; + private final Class dataClass; + + public RecurringTaskWithPersistentScheduleBuilder(String name, Class dataClass) { + this.name = name; + this.dataClass = dataClass; + } + + public RecurringTaskWithPersistentSchedule execute(VoidExecutionHandler executionHandler) { + return new RecurringTaskWithPersistentSchedule(name, dataClass) { + @Override + public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + executionHandler.execute(taskInstance, executionContext); + + return (executionComplete, executionOperations) -> { + executionOperations.reschedule( + executionComplete, + taskInstance.getData().getSchedule().getNextExecutionTime(executionComplete) + ); + }; + + } + }; + } + + public RecurringTaskWithPersistentSchedule executeStateful(StateReturningExecutionHandler executionHandler) { + return new RecurringTaskWithPersistentSchedule(name, dataClass) { + + @Override + public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + final T nextData = executionHandler.execute(taskInstance, executionContext); + + return (executionComplete, executionOperations) -> { + executionOperations.reschedule( + executionComplete, + nextData.getSchedule().getNextExecutionTime(executionComplete), + nextData + ); + }; + } + }; + } + } + + public static class OneTimeTaskBuilder { private final String name; private Class dataClass; @@ -164,6 +214,7 @@ public static class TaskBuilder { private FailureHandler onFailure; private DeadExecutionHandler onDeadExecution; private ScheduleOnStartup onStartup; + private Function defaultExecutionTime = Function.identity(); public TaskBuilder(String name, Class dataClass) { this.name = name; @@ -205,8 +256,13 @@ public TaskBuilder scheduleOnStartup(String instance, T initialData, Schedule return this; } + public TaskBuilder defaultExecutionTime(Function defaultExecutionTime) { + this.defaultExecutionTime = defaultExecutionTime; + return this; + } + public CustomTask execute(ExecutionHandler executionHandler) { - return new CustomTask(name, dataClass, onStartup, onFailure, onDeadExecution) { + return new CustomTask(name, dataClass, onStartup, defaultExecutionTime, onFailure, onDeadExecution) { @Override public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { return executionHandler.execute(taskInstance, executionContext); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Daily.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Daily.java index 495c34a1..eb3f1da8 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Daily.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Daily.java @@ -17,6 +17,7 @@ import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import java.io.Serializable; import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; @@ -27,7 +28,7 @@ import java.util.Objects; import java.util.stream.Collectors; -public class Daily implements Schedule { +public class Daily implements Schedule, Serializable { private final List times; private final ZoneId zone; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/FixedDelay.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/FixedDelay.java index 54401ff5..e4c97f1e 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/FixedDelay.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/FixedDelay.java @@ -17,11 +17,12 @@ import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import java.io.Serializable; import java.time.Duration; import java.time.Instant; import java.util.Objects; -public class FixedDelay implements Schedule { +public class FixedDelay implements Schedule, Serializable { private final Duration duration; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/PersistentCronSchedule.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/PersistentCronSchedule.java new file mode 100644 index 00000000..5415c27e --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/PersistentCronSchedule.java @@ -0,0 +1,59 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task.schedule; + +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; + +import java.time.ZoneId; + +public class PersistentCronSchedule implements ScheduleAndData { + private final String cronPattern; + private final String zoneId; + private final Object data; + + public PersistentCronSchedule(String cronPattern) { + this(cronPattern, ZoneId.systemDefault(), null); + } + + public PersistentCronSchedule(String cronPattern, Object data) { + this(cronPattern, ZoneId.systemDefault(), data); + } + + public PersistentCronSchedule(String cronPattern, ZoneId zoneId) { + this(cronPattern, zoneId, null); + } + + public PersistentCronSchedule(String cronPattern, ZoneId zoneId, Object data) { + this.cronPattern = cronPattern; + this.zoneId = zoneId.getId(); + this.data = data; + } + + @Override + public String toString() { + return "PersistentCronSchedule pattern=" + cronPattern; + } + + @Override + public Schedule getSchedule() { + return new CronSchedule(cronPattern, ZoneId.of(zoneId)); + } + + @Override + public Object getData() { + return data; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Schedules.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Schedules.java index 92ecca6e..81d12d75 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Schedules.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/schedule/Schedules.java @@ -23,23 +23,23 @@ public class Schedules { private static final Parser SCHEDULE_PARSER = CompositeParser.of(new FixedDelayParser(), new DailyParser()); - public static Schedule daily(LocalTime... times) { + public static Daily daily(LocalTime... times) { return new Daily(times); } - public static Schedule daily(ZoneId zone, LocalTime... times) { + public static Daily daily(ZoneId zone, LocalTime... times) { return new Daily(zone, times); } - public static Schedule fixedDelay(Duration delay) { + public static FixedDelay fixedDelay(Duration delay) { return FixedDelay.of(delay); } - public static Schedule cron(String cronPattern) { + public static CronSchedule cron(String cronPattern) { return new CronSchedule(cronPattern); } - public static Schedule cron(String cronPattern, ZoneId zoneId) { + public static CronSchedule cron(String cronPattern, ZoneId zoneId) { return new CronSchedule(cronPattern, zoneId); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 6bfb9404..1abad063 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -73,8 +73,8 @@ public ManualSchedulerBuilder pollingStrategy(PollingStrategyConfig pollingStrat public ManualScheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); - final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer); - final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer); + final JdbcTaskRepository schedulerTaskRepository = new JdbcTaskRepository(dataSource, true, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); + final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled, new DefaultJdbcCustomization(), tableName, taskResolver, new SchedulerName.Fixed("manual"), serializer, clock); return new ManualScheduler(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java index 02cb24a8..ebb1e51a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java @@ -55,7 +55,6 @@ public class ClusterTest { // ); - @Test @RepeatedTest(10) public void test_concurrency_optimistic_locking() throws InterruptedException { DEBUG_LOG.info("Starting test_concurrency_optimistic_locking"); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java index e5baf5fa..789a67c5 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java @@ -4,7 +4,7 @@ import com.github.kagkarlsson.jdbc.RowMapper; import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; -import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; @@ -36,7 +36,7 @@ public void setUp() { oneTimeTask = TestTasks.oneTime("OneTime", Void.class, TestTasks.DO_NOTHING); List> knownTasks = new ArrayList<>(); knownTasks.add(oneTimeTask); - taskRepository = new JdbcTaskRepository(DB.getDataSource(), false, CUSTOM_TABLENAME, new TaskResolver(StatsRegistry.NOOP, knownTasks), new SchedulerName.Fixed(SCHEDULER_NAME)); + taskRepository = new JdbcTaskRepository(DB.getDataSource(), false, CUSTOM_TABLENAME, new TaskResolver(StatsRegistry.NOOP, knownTasks), new SchedulerName.Fixed(SCHEDULER_NAME), new SystemClock()); DbUtils.runSqlResource("postgresql_custom_tablename.sql").accept(DB.getDataSource()); } @@ -46,7 +46,7 @@ public void can_customize_table_name() { Instant now = Instant.now(); TaskInstance instance1 = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance1)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance1, now)); JdbcRunner jdbcRunner = new JdbcRunner(DB.getDataSource()); jdbcRunner.query("SELECT count(1) AS number_of_tasks FROM " + CUSTOM_TABLENAME, NOOP, (RowMapper) rs -> rs.getInt("number_of_tasks")); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index 448e69ca..4d43695f 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -1,35 +1,35 @@ package com.github.kagkarlsson.scheduler; -import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.jupiter.api.Assertions.assertTrue; - import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; -import com.github.kagkarlsson.scheduler.task.AbstractTask; -import com.github.kagkarlsson.scheduler.task.CompletionHandler; import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.ExecutionContext; import com.github.kagkarlsson.scheduler.task.ExecutionOperations; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.VoidExecutionHandler; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.google.common.util.concurrent.MoreExecutors; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import org.hamcrest.Matchers; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; + +import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; public class DeadExecutionsTest { @@ -57,7 +57,7 @@ public void setUp() { TaskResolver taskResolver = new TaskResolver(StatsRegistry.NOOP, oneTimeTask, nonCompleting); - jdbcTaskRepository = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); + jdbcTaskRepository = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), settableClock); scheduler = new Scheduler(settableClock, jdbcTaskRepository, @@ -84,7 +84,7 @@ public void scheduler_should_handle_dead_executions() { final Instant now = settableClock.now(); final TaskInstance taskInstance = oneTimeTask.instance("id1"); - final Execution execution1 = new Execution(now.minus(Duration.ofDays(1)), taskInstance); + final SchedulableTaskInstance execution1 = new SchedulableTaskInstance<>(taskInstance, now.minus(Duration.ofDays(1))); jdbcTaskRepository.createIfNotExists(execution1); final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT); @@ -110,7 +110,7 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat( final Instant oneHourAgo = settableClock.now(); final TaskInstance taskInstance = nonCompleting.instance("id1"); - final Execution execution1 = new Execution(oneHourAgo, taskInstance); + final SchedulableTaskInstance execution1 = new SchedulableTaskInstance<>(taskInstance, oneHourAgo); jdbcTaskRepository.createIfNotExists(execution1); scheduler.executeDue(); @@ -130,16 +130,17 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat( assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2)); } - public static class NonCompletingTask extends AbstractTask { + public static class NonCompletingTask extends OneTimeTask { private final VoidExecutionHandler handler; public NonCompletingTask(String name, Class dataClass, VoidExecutionHandler handler, DeadExecutionHandler deadExecutionHandler) { - super(name, dataClass, (executionComplete, executionOperations) -> {}, deadExecutionHandler); + super(name, dataClass, (executionComplete, executionOperations) -> { + }, deadExecutionHandler); this.handler = handler; } @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + public void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { handler.execute(taskInstance, executionContext); throw new RuntimeException("simulated unexpected exception"); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java index b793f08e..1547d15c 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java @@ -6,6 +6,7 @@ import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; @@ -63,7 +64,7 @@ public void setUp() { knownTasks.add(alternativeOneTimeTask); testableRegistry = new TestableRegistry(true, Collections.emptyList()); taskResolver = new TaskResolver(testableRegistry, knownTasks); - taskRepository = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed(SCHEDULER_NAME)); + taskRepository = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed(SCHEDULER_NAME), new SystemClock()); } @Test @@ -73,17 +74,17 @@ public void test_createIfNotExists() { TaskInstance instance1 = oneTimeTask.instance("id1"); TaskInstance instance2 = oneTimeTask.instance("id2"); - assertTrue(taskRepository.createIfNotExists(new Execution(now, instance1))); - assertFalse(taskRepository.createIfNotExists(new Execution(now, instance1))); + assertTrue(taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance1, now))); + assertFalse(taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance1, now))); - assertTrue(taskRepository.createIfNotExists(new Execution(now, instance2))); + assertTrue(taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance2, now))); } @Test public void get_due_should_only_include_due_executions() { Instant now = TimeHelper.truncatedInstantNow(); - taskRepository.createIfNotExists(new Execution(now, oneTimeTask.instance("id1"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(1)); assertThat(taskRepository.getDue(now.minusSeconds(1), POLLING_LIMIT), hasSize(0)); } @@ -92,8 +93,8 @@ public void get_due_should_only_include_due_executions() { public void get_due_should_honor_max_results_limit() { Instant now = TimeHelper.truncatedInstantNow(); - taskRepository.createIfNotExists(new Execution(now, oneTimeTask.instance("id1"))); - taskRepository.createIfNotExists(new Execution(now, oneTimeTask.instance("id2"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id2"), now)); assertThat(taskRepository.getDue(now, 1), hasSize(1)); assertThat(taskRepository.getDue(now, 2), hasSize(2)); } @@ -102,7 +103,7 @@ public void get_due_should_honor_max_results_limit() { public void get_due_should_be_sorted() { Instant now = TimeHelper.truncatedInstantNow(); IntStream.range(0, 100).forEach(i -> - taskRepository.createIfNotExists(new Execution(now.minusSeconds(new Random().nextInt(10000)), oneTimeTask.instance("id" + i))) + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id" + i), now.minusSeconds(new Random().nextInt(10000)))) ); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(100)); @@ -122,7 +123,7 @@ public void get_due_should_not_include_previously_unresolved() { assertThat(taskResolver.getUnresolved(), hasSize(0)); // 1 - taskRepository.createIfNotExists(new Execution(now, unresolved1.instance("id"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); assertEquals(1, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); @@ -133,13 +134,13 @@ public void get_due_should_not_include_previously_unresolved() { "Execution should not have have been in the ResultSet"); // 1, 2 - taskRepository.createIfNotExists(new Execution(now, unresolved2.instance("id"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(unresolved2.instance("id"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(2)); assertEquals(2, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); // 1, 2, 3 - taskRepository.createIfNotExists(new Execution(now, unresolved3.instance("id"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(unresolved3.instance("id"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(3)); assertEquals(3, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); @@ -148,7 +149,7 @@ public void get_due_should_not_include_previously_unresolved() { @Test public void picked_executions_should_not_be_returned_as_due() { Instant now = TimeHelper.truncatedInstantNow(); - taskRepository.createIfNotExists(new Execution(now, oneTimeTask.instance("id1"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); @@ -160,7 +161,7 @@ public void picked_executions_should_not_be_returned_as_due() { public void picked_execution_should_have_information_about_which_scheduler_processes_it() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); taskRepository.pick(due.get(0), now); @@ -177,7 +178,7 @@ public void picked_execution_should_have_information_about_which_scheduler_proce public void should_not_be_able_to_pick_execution_that_has_been_rescheduled() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); @@ -193,7 +194,7 @@ public void should_not_be_able_to_pick_execution_that_has_been_rescheduled() { public void reschedule_should_move_execution_in_time() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); @@ -216,7 +217,7 @@ public void reschedule_should_move_execution_in_time() { public void reschedule_should_persist_consecutive_failures() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); @@ -234,7 +235,7 @@ public void reschedule_should_persist_consecutive_failures() { public void reschedule_should_update_data_if_specified() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTaskWithData.instance("id1", 1); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); Execution created = taskRepository.getExecution(instance).get(); assertEquals(created.taskInstance.getData(), 1); @@ -250,7 +251,7 @@ public void reschedule_should_update_data_if_specified() { public void test_get_failing_executions() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); - taskRepository.createIfNotExists(new Execution(now, instance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); @@ -275,7 +276,7 @@ public void test_get_failing_executions() { public void get_scheduled_executions() { Instant now = TimeHelper.truncatedInstantNow(); IntStream.range(0, 100).forEach(i -> - taskRepository.createIfNotExists(new Execution(now.plus(new Random().nextInt(10), ChronoUnit.HOURS), oneTimeTask.instance("id" + i))) + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id" + i), now.plus(new Random().nextInt(10), ChronoUnit.HOURS))) ); final List beforePick = getScheduledExecutions(all().withPicked(false)); assertThat(beforePick, hasSize(100)); @@ -295,12 +296,12 @@ private List getScheduledExecutions(ScheduledExecutionsFilter filter) @Test public void get_scheduled_by_task_name() { Instant now = TimeHelper.truncatedInstantNow(); - final Execution execution1 = new Execution(now.plus(new Random().nextInt(10), ChronoUnit.HOURS), oneTimeTask.instance("id" + 1)); + final SchedulableTaskInstance execution1 = new SchedulableTaskInstance<>(oneTimeTask.instance("id" + 1), now.plus(new Random().nextInt(10), ChronoUnit.HOURS)); taskRepository.createIfNotExists(execution1); - taskRepository.createIfNotExists(new Execution(now.plus(new Random().nextInt(10), ChronoUnit.HOURS), oneTimeTask.instance("id" + 2))); - taskRepository.createIfNotExists(new Execution(now.plus(new Random().nextInt(10), ChronoUnit.HOURS), alternativeOneTimeTask.instance("id" + 3))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id" + 2), now.plus(new Random().nextInt(10), ChronoUnit.HOURS))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(alternativeOneTimeTask.instance("id" + 3), now.plus(new Random().nextInt(10), ChronoUnit.HOURS))); - taskRepository.pick(execution1, Instant.now()); + taskRepository.pick(taskRepository.getExecution(execution1.getTaskInstance()).get(), Instant.now()); assertThat(getScheduledExecutions(all().withPicked(true), oneTimeTask.getName()), hasSize(1)); assertThat(getScheduledExecutions(all().withPicked(false), oneTimeTask.getName()), hasSize(1)); assertThat(getScheduledExecutions(all(), oneTimeTask.getName()), hasSize(2)); @@ -324,7 +325,7 @@ public void get_dead_executions_should_not_include_previously_unresolved() { createDeadExecution(oneTimeTask.instance("id1"), timeDied); TaskResolver taskResolverMissingTask = new TaskResolver(testableRegistry); - JdbcTaskRepository repoMissingTask = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolverMissingTask, new SchedulerName.Fixed(SCHEDULER_NAME)); + JdbcTaskRepository repoMissingTask = new JdbcTaskRepository(DB.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolverMissingTask, new SchedulerName.Fixed(SCHEDULER_NAME), new SystemClock()); assertThat(taskResolverMissingTask.getUnresolved(), hasSize(0)); assertEquals(0, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); @@ -342,7 +343,7 @@ public void get_dead_executions_should_not_include_previously_unresolved() { public void get_scheduled_executions_should_work_with_unresolved() { Instant now = TimeHelper.truncatedInstantNow(); final OneTimeTask unresolved1 = TestTasks.oneTime("unresolved1", Void.class, TestTasks.DO_NOTHING); - taskRepository.createIfNotExists(new Execution(now, unresolved1.instance("id"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); @@ -353,8 +354,8 @@ public void get_scheduled_executions_should_work_with_unresolved() { @Test public void lockAndGetDue_should_pick_due() { Instant now = Instant.now(); - taskRepository.createIfNotExists(new Execution(now.plusSeconds(10), oneTimeTask.instance("future1"))); - taskRepository.createIfNotExists(new Execution(now, oneTimeTask.instance("id1"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("future1"), now.plusSeconds(10))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); List picked = taskRepository.lockAndGetDue(now, POLLING_LIMIT); assertThat(picked, hasSize(1)); @@ -370,7 +371,7 @@ public void lockAndGetDue_should_not_include_previously_unresolved() { assertThat(taskResolver.getUnresolved(), hasSize(0)); - taskRepository.createIfNotExists(new Execution(now, unresolved1.instance("id"))); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); @@ -379,7 +380,7 @@ public void lockAndGetDue_should_not_include_previously_unresolved() { private void createDeadExecution(TaskInstance taskInstance, Instant timeDied) { - taskRepository.createIfNotExists(new Execution(timeDied, taskInstance)); + taskRepository.createIfNotExists(new SchedulableTaskInstance<>(taskInstance, timeDied)); final Execution due = getSingleExecution(); final Optional picked = taskRepository.pick(due, timeDied); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java index bb69bd6b..7c17ba24 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java @@ -14,12 +14,8 @@ import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; import com.github.kagkarlsson.scheduler.testhelper.SettableClock; import com.google.common.util.concurrent.MoreExecutors; - import org.apache.commons.lang3.RandomUtils; -import org.apache.commons.lang3.time.DateUtils; -import org.apache.commons.lang3.time.DurationFormatUtils; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -28,12 +24,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; -import static java.time.Duration.*; +import static java.time.Duration.ZERO; +import static java.time.Duration.between; +import static java.time.Duration.ofDays; +import static java.time.Duration.ofHours; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofMinutes; +import static java.time.Duration.ofSeconds; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -60,7 +61,7 @@ private Scheduler schedulerFor(Task... tasks) { private Scheduler schedulerFor(ExecutorService executor, Task ... tasks) { final StatsRegistry statsRegistry = StatsRegistry.NOOP; TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, Arrays.asList(tasks)); - JdbcTaskRepository taskRepository = new JdbcTaskRepository(postgres.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); + JdbcTaskRepository taskRepository = new JdbcTaskRepository(postgres.getDataSource(), false, DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), clock); return new Scheduler(clock, taskRepository, taskRepository, taskResolver, 1, executor, new SchedulerName.Fixed("name"), new Waiter(ZERO), ofSeconds(1), false, statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, ofDays(14), ZERO, LogLevel.DEBUG, true, new ArrayList<>()); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java index d3baff3f..47ce64a2 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java @@ -2,6 +2,7 @@ import co.unruly.matchers.OptionalMatchers; import com.github.kagkarlsson.scheduler.DbUtils; +import com.github.kagkarlsson.scheduler.SystemClock; import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.SchedulerName; @@ -14,6 +15,8 @@ import com.github.kagkarlsson.scheduler.helper.TimeHelper; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; @@ -169,12 +172,12 @@ public void test_jdbc_repository_select_for_update_compatibility() { taskResolver.addTask(oneTime); DataSource dataSource = getDataSource(); - final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); + final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), new SystemClock()); final Instant now = TimeHelper.truncatedInstantNow(); - jdbcTaskRepository.createIfNotExists(new Execution(now.plusSeconds(10), oneTime.instance("future1"))); - jdbcTaskRepository.createIfNotExists(new Execution(now, oneTime.instance("id1"))); + jdbcTaskRepository.createIfNotExists(SchedulableInstance.of(oneTime.instance("future1"), now.plusSeconds(10))); + jdbcTaskRepository.createIfNotExists(new SchedulableTaskInstance<>(oneTime.instance("id1"), now)); List picked = jdbcTaskRepository.lockAndGetDue(now, POLLING_LIMIT); assertThat(picked, IsCollectionWithSize.hasSize(1)); @@ -186,12 +189,12 @@ private void doJDBCRepositoryCompatibilityTestUsingData(String data) { taskResolver.addTask(oneTime); DataSource dataSource = getDataSource(); - final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); + final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), new SystemClock()); final Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance taskInstance = oneTime.instance("id1", data); - final Execution newExecution = new Execution(now, taskInstance); + final SchedulableTaskInstance newExecution = new SchedulableTaskInstance<>(taskInstance, now); jdbcTaskRepository.createIfNotExists(newExecution); Execution storedExecution = (jdbcTaskRepository.getExecution(taskInstance)).get(); assertThat(storedExecution.getExecutionTime(), is(now)); @@ -225,12 +228,12 @@ public void test_jdbc_repository_compatibility_set_data() { taskResolver.addTask(recurringWithData); DataSource dataSource = getDataSource(); - final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1")); + final JdbcTaskRepository jdbcTaskRepository = new JdbcTaskRepository(dataSource, commitWhenAutocommitDisabled(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), new SystemClock()); final Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance taskInstance = recurringWithData.instance("id1", 1); - final Execution newExecution = new Execution(now, taskInstance); + final SchedulableTaskInstance newExecution = new SchedulableTaskInstance<>(taskInstance, now); jdbcTaskRepository.createIfNotExists(newExecution); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java new file mode 100644 index 00000000..89b7818a --- /dev/null +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java @@ -0,0 +1,136 @@ +package com.github.kagkarlsson.scheduler.functional; + +import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension; +import com.github.kagkarlsson.scheduler.ScheduledExecution; +import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskInstanceId; +import com.github.kagkarlsson.scheduler.task.helper.PlainScheduleAndData; +import com.github.kagkarlsson.scheduler.task.helper.RecurringTaskWithPersistentSchedule; +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; +import com.github.kagkarlsson.scheduler.task.schedule.Daily; +import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; +import com.github.kagkarlsson.scheduler.task.schedule.Schedule; +import com.github.kagkarlsson.scheduler.task.schedule.Schedules; +import com.github.kagkarlsson.scheduler.testhelper.ManualScheduler; +import com.github.kagkarlsson.scheduler.testhelper.SettableClock; +import com.github.kagkarlsson.scheduler.testhelper.TestHelper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static co.unruly.matchers.OptionalMatchers.contains; +import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class DynamicRecurringTaskTest { + + public static final ZoneId ZONE = ZoneId.systemDefault(); + private static final LocalDate DATE = LocalDate.of(2018, 3, 1); + private static final LocalTime TIME = LocalTime.of(8, 0); + private SettableClock clock; + + @RegisterExtension + public EmbeddedPostgresqlExtension postgres = new EmbeddedPostgresqlExtension(); + + @BeforeEach + public void setUp() { + clock = new SettableClock(); + clock.set(ZonedDateTime.of(DATE, TIME, ZONE).toInstant()); + } + + @Test + public void should_schedule_multiple_instances_with_different_schedules() { + + final String taskName = "dynamic-recurring"; + final RecurringTaskWithPersistentSchedule task = + Tasks.recurringWithPersistentSchedule(taskName, PlainScheduleAndData.class) + .execute((taskInstance, executionContext) -> { + }); + + ManualScheduler scheduler = manualSchedulerFor(singletonList(task)); + scheduler.start(); + + final PlainScheduleAndData schedule1 = new PlainScheduleAndData(new Daily(LocalTime.of(23, 51))); + final PlainScheduleAndData schedule2 = new PlainScheduleAndData(new Daily(LocalTime.of(23, 50))); + final PlainScheduleAndData schedule3 = new PlainScheduleAndData(new Daily(LocalTime.of(23, 55))); + + scheduler.schedule(task.schedulableInstance("id1", schedule1)); + scheduler.schedule(task.schedulableInstance("id2", schedule2)); + + assertScheduled(scheduler, task.instanceId("id1"), LocalTime.of(23, 51), schedule1); + assertScheduled(scheduler, task.instanceId("id2"), LocalTime.of(23, 50), schedule2); + + scheduler.reschedule(task.schedulableInstance("id1", schedule3)); + assertScheduled(scheduler, task.instanceId("id1"), LocalTime.of(23, 55), schedule3); + } + + @Test + public void should_support_statechanging_tasks() { + final PersistentFixedDelaySchedule scheduleAndData1 = new PersistentFixedDelaySchedule(Schedules.fixedDelay(Duration.ofSeconds(10)), 1); + + final String taskName = "dynamic-recurring"; + final RecurringTaskWithPersistentSchedule task = + Tasks.recurringWithPersistentSchedule(taskName, PersistentFixedDelaySchedule.class) + .executeStateful((taskInstance, executionContext) -> { + final PersistentFixedDelaySchedule persistentFixedDelaySchedule = taskInstance.getData().returnIncremented(); + System.out.println(persistentFixedDelaySchedule); + return persistentFixedDelaySchedule; + }); + + ManualScheduler scheduler = manualSchedulerFor(singletonList(task)); + scheduler.start(); + + + scheduler.schedule(task.schedulableInstance("id1", scheduleAndData1)); + + assertScheduled(scheduler, task.instanceId("id1"), clock.now(), scheduleAndData1); // FixedDelay has initial execution-time now() + scheduler.runAnyDueExecutions(); + + assertScheduled(scheduler, task.instanceId("id1"), clock.now().plus(Duration.ofSeconds(10)), scheduleAndData1.returnIncremented()); + } + + private void assertScheduled(ManualScheduler scheduler, TaskInstanceId instanceId, LocalTime expectedExecutionTime, Object taskData) { + assertScheduled(scheduler, instanceId, ZonedDateTime.of(DATE, expectedExecutionTime, ZONE).toInstant(), taskData); + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + private void assertScheduled(ManualScheduler scheduler, TaskInstanceId instanceId, Instant expectedExecutionTime, Object taskData) { + Optional> firstExecution = scheduler.getScheduledExecution(instanceId); + assertThat(firstExecution.map(ScheduledExecution::getExecutionTime), + contains(expectedExecutionTime)); + if (taskData != null) { + assertEquals(taskData, firstExecution.get().getData()); + } + } + + private ManualScheduler manualSchedulerFor(List> recurringTasks) { + return TestHelper.createManualScheduler(postgres.getDataSource(), recurringTasks) + .clock(clock) + .build(); + } + + + public static class PersistentFixedDelaySchedule extends PlainScheduleAndData { + public PersistentFixedDelaySchedule(Schedule schedule, Integer data) { + super(schedule, data); + } + + public PersistentFixedDelaySchedule returnIncremented() { + return new PersistentFixedDelaySchedule(super.getSchedule(), ((Integer)super.getData()) + 1); + } + } + + +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java index b1ebaeeb..04fe1109 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java @@ -1,17 +1,15 @@ package com.github.kagkarlsson.scheduler.jdbc; -import java.lang.reflect.Field; import java.time.Instant; -import java.util.HashMap; import java.util.List; -import com.github.kagkarlsson.scheduler.jdbc.JdbcCustomization; +import com.github.kagkarlsson.scheduler.SystemClock; +import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.junit.platform.commons.util.ReflectionUtils; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -22,7 +20,6 @@ import com.github.kagkarlsson.jdbc.SQLRuntimeException; import com.github.kagkarlsson.scheduler.exceptions.ExecutionException; import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException; -import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.google.common.collect.Lists; @@ -44,9 +41,9 @@ public class JdbcTaskRepositoryExceptionsTest { private String expectedTableName; @BeforeEach - public void setup() throws NoSuchFieldException, IllegalAccessException { + public void setup() { expectedTableName = randomAlphanumeric(5); - jdbcTaskRepository = new JdbcTaskRepository( null, expectedTableName, null, null, null, mockJdbcRunner); + jdbcTaskRepository = new JdbcTaskRepository(null, expectedTableName, null, null, null, mockJdbcRunner, new SystemClock()); } @Test @@ -57,21 +54,20 @@ public void createIfNotExistsFailsToAddNewTask() { when(mockJdbcRunner.execute(ArgumentMatchers.eq("insert into " + expectedTableName + "(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)"), any(PreparedStatementSetter.class))) .thenThrow(rootCause); - TaskInstance taskInstance = new TaskInstance(randomAlphanumeric(10), randomAlphanumeric(10)); - Execution execution = new Execution(Instant.now(), taskInstance); - ExecutionException actualException = assertThrows(ExecutionException.class, () -> { + TaskInstance taskInstance = new TaskInstance<>(randomAlphanumeric(10), randomAlphanumeric(10)); + SchedulableTaskInstance execution = new SchedulableTaskInstance<>(taskInstance, Instant.now()); + TaskInstanceException actualException = assertThrows(TaskInstanceException.class, () -> { jdbcTaskRepository.createIfNotExists(execution); }); assertEquals("Failed to add new execution. (task name: " + taskInstance.getTaskName() + ", instance id: " + taskInstance.getId() + ")", actualException.getMessage()); assertEquals(rootCause, actualException.getCause()); - assertEquals(execution.version, actualException.getVersion()); - assertEquals(execution.taskInstance.getTaskName(), actualException.getTaskName()); - assertEquals(execution.taskInstance.getId(), actualException.getInstanceId()); + assertEquals(execution.getTaskInstance().getTaskName(), actualException.getTaskName()); + assertEquals(execution.getTaskInstance().getId(), actualException.getInstanceId()); } @Test public void getExecutionIsMoreThanOne() { - TaskInstance expectedTaskInstance = new TaskInstance(randomAlphanumeric(10), randomAlphanumeric(10)); + TaskInstance expectedTaskInstance = new TaskInstance<>(randomAlphanumeric(10), randomAlphanumeric(10)); when(mockJdbcRunner.query(ArgumentMatchers.eq("select * from " + expectedTableName + " where task_name = ? and task_instance = ?"), any(PreparedStatementSetter.class), (ResultSetMapper>) any(ResultSetMapper.class))) .thenReturn(Lists.newArrayList(new Execution(Instant.now(), expectedTaskInstance), new Execution(Instant.now(), expectedTaskInstance))); @@ -89,7 +85,7 @@ public void removesUnexpectedNumberOfRows(int removalCount) { when(mockJdbcRunner.execute(ArgumentMatchers.eq("delete from " + expectedTableName + " where task_name = ? and task_instance = ? and version = ?"), any(PreparedStatementSetter.class))) .thenReturn(removalCount); - TaskInstance taskInstance = new TaskInstance(randomAlphanumeric(10), randomAlphanumeric(10)); + TaskInstance taskInstance = new TaskInstance<>(randomAlphanumeric(10), randomAlphanumeric(10)); Execution execution = new Execution(Instant.now(), taskInstance); ExecutionException actualException = assertThrows(ExecutionException.class, () -> { jdbcTaskRepository.remove(execution); @@ -120,7 +116,7 @@ public void rescheduleUpdatesUnexpectedNumberOfRowsWithoutNewData(int updateCoun any(PreparedStatementSetter.class))) .thenReturn(updateCount); - TaskInstance taskInstance = new TaskInstance(randomAlphanumeric(10), randomAlphanumeric(10)); + TaskInstance taskInstance = new TaskInstance<>(randomAlphanumeric(10), randomAlphanumeric(10)); Execution execution = new Execution(Instant.now(), taskInstance); ExecutionException actualException = assertThrows(ExecutionException.class, () -> { jdbcTaskRepository.reschedule(execution, @@ -157,12 +153,12 @@ public void rescheduleUpdatesUnexpectedNumberOfRowsWithNewData(int updateCount) any(PreparedStatementSetter.class))) .thenReturn(updateCount); - TaskInstance taskInstance = new TaskInstance(randomAlphanumeric(10), randomAlphanumeric(10)); + TaskInstance taskInstance = new TaskInstance<>(randomAlphanumeric(10), randomAlphanumeric(10)); Execution execution = new Execution(Instant.now(), taskInstance); ExecutionException actualException = assertThrows(ExecutionException.class, () -> { jdbcTaskRepository.reschedule(execution, Instant.now(), - new HashMap(), + "", null, null, 0 diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java similarity index 72% rename from db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java rename to db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java index 3103be0b..bef9c146 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java @@ -19,19 +19,11 @@ import com.github.kagkarlsson.scheduler.task.schedule.Schedule; import java.time.Duration; +import java.time.Instant; @Deprecated public class ComposableTask { - public static RecurringTask recurringTask(String name, Schedule schedule, VoidExecutionHandler executionHandler) { - return new RecurringTask(name, schedule, Void.class) { - @Override - public void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); - } - }; - } - public static OneTimeTask onetimeTask(String name, Class dataClass, VoidExecutionHandler executionHandler) { return new OneTimeTask(name, dataClass) { @Override @@ -43,6 +35,16 @@ public void executeOnce(TaskInstance taskInstance, ExecutionContext execution public static Task customTask(String name, Class dataClass, CompletionHandler completionHandler, VoidExecutionHandler executionHandler) { return new AbstractTask(name, dataClass, new FailureHandler.OnFailureRetryLater<>(Duration.ofMinutes(5)), new DeadExecutionHandler.ReviveDeadExecution<>()) { + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + } + @Override public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { executionHandler.execute(taskInstance, executionContext); @@ -53,6 +55,16 @@ public CompletionHandler execute(TaskInstance taskInstance, ExecutionConte public static Task customTask(String name, Class dataClass, CompletionHandler completionHandler, FailureHandler failureHandler, VoidExecutionHandler executionHandler) { return new AbstractTask(name, dataClass, failureHandler, new DeadExecutionHandler.ReviveDeadExecution<>()) { + @Override + public SchedulableInstance schedulableInstance(String id) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + } + + @Override + public SchedulableInstance schedulableInstance(String id, T data) { + return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + } + @Override public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { executionHandler.execute(taskInstance, executionContext); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java index 8b6d499b..429ffd44 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/CronMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/CronMain.java index 56c747e3..813fbd19 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/CronMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/CronMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java index 422e97b8..f87b1859 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java index 9b64e14f..07f6f20d 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; @@ -28,7 +43,7 @@ public void run(DataSource dataSource) { final Scheduler scheduler = Scheduler .create(dataSource, onetimeTask) - .pollingInterval(Duration.ofSeconds(5)) + .pollingInterval(Duration.ofSeconds(20)) .enableImmediateExecution() .build(); @@ -36,7 +51,7 @@ public void run(DataSource dataSource) { scheduler.start(); - sleep(2); + sleep(2000); System.out.println("Scheduling task to executed immediately."); scheduler.schedule(onetimeTask.instance("1"), Instant.now()); // scheduler.triggerCheckForDueExecutions(); // another option for triggering execution directly diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java index 50b5c707..19934914 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import java.time.Duration; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java index 15823964..824e0a12 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import java.time.Instant; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/FailureLoggingMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/FailureLoggingMain.java index 49ed4d44..c3081111 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/FailureLoggingMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/FailureLoggingMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java index 8486ecaf..51eccdc9 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java index c38ae1dd..4347b179 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/PersistentDynamicScheduleMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/PersistentDynamicScheduleMain.java deleted file mode 100644 index d965102e..00000000 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/PersistentDynamicScheduleMain.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.github.kagkarlsson.examples; - -import com.github.kagkarlsson.examples.helpers.Example; -import com.github.kagkarlsson.scheduler.ScheduledExecution; -import com.github.kagkarlsson.scheduler.Scheduler; -import com.github.kagkarlsson.scheduler.task.ExecutionComplete; -import com.github.kagkarlsson.scheduler.task.ExecutionOperations; -import com.github.kagkarlsson.scheduler.task.FailureHandler; -import com.github.kagkarlsson.scheduler.task.TaskInstanceId; -import com.github.kagkarlsson.scheduler.task.helper.CustomTask; -import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; -import com.github.kagkarlsson.scheduler.task.helper.Tasks; -import com.github.kagkarlsson.scheduler.task.schedule.CronSchedule; -import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; -import com.github.kagkarlsson.scheduler.task.schedule.Schedule; -import com.github.kagkarlsson.scheduler.task.schedule.Schedules; - -import javax.sql.DataSource; -import java.io.Serializable; -import java.time.Duration; -import java.time.Instant; -import java.util.Optional; - -import static java.util.function.Function.identity; - -public class PersistentDynamicScheduleMain extends Example { - - public static void main(String[] args) { - new PersistentDynamicScheduleMain().runWithDatasource(); - } - - @Override - public void run(DataSource dataSource) { - - final SerializableCronSchedule initialSchedule = new SerializableCronSchedule("*/10 * * * * ?"); - final CustomTask task = Tasks.custom("dynamic-recurring-task", SerializableCronSchedule.class) - .scheduleOnStartup(RecurringTask.INSTANCE, initialSchedule, initialSchedule) - .onFailure((executionComplete, executionOperations) -> { - final SerializableCronSchedule persistedSchedule = (SerializableCronSchedule) (executionComplete.getExecution().taskInstance.getData()); - executionOperations.reschedule(executionComplete, persistedSchedule.getNextExecutionTime(executionComplete)); - }) - .execute((taskInstance, executionContext) -> { - final SerializableCronSchedule persistentSchedule = taskInstance.getData(); - System.out.println("Ran using persistent schedule: " + persistentSchedule.getCronPattern()); - - return (executionComplete, executionOperations) -> { - executionOperations.reschedule( - executionComplete, - persistentSchedule.getNextExecutionTime(executionComplete) - ); - }; - }); - - final Scheduler scheduler = Scheduler - .create(dataSource) - .pollingInterval(Duration.ofSeconds(5)) - .startTasks(task) - .registerShutdownHook() - .build(); - - scheduler.start(); - - sleep(7_000); - - final SerializableCronSchedule newSchedule = new SerializableCronSchedule("*/15 * * * * ?"); - final TaskInstanceId scheduledExecution = TaskInstanceId.of("dynamic-recurring-task", RecurringTask.INSTANCE); - final Instant newNextExecutionTime = newSchedule.getNextExecutionTime(ExecutionComplete.simulatedSuccess(Instant.now())); - - // reschedule updating both next execution time and the persistent schedule - System.out.println("Simulating dynamic reschedule of recurring task"); - scheduler.reschedule(scheduledExecution, newNextExecutionTime, newSchedule); - } - - private void sleep(int millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private static class SerializableCronSchedule implements Serializable, Schedule { - private final String cronPattern; - SerializableCronSchedule(String cronPattern) { - this.cronPattern = cronPattern; - } - - @Override - public Instant getNextExecutionTime(ExecutionComplete executionComplete) { - return new CronSchedule(cronPattern).getNextExecutionTime(executionComplete); - } - - @Override - public boolean isDeterministic() { - return true; - } - - @Override - public String toString() { - return "SerializableCronSchedule pattern=" + cronPattern; - } - - public String getCronPattern() { - return cronPattern; - } - } -} diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskMain.java index c542de0a..4761c93e 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; @@ -5,10 +20,14 @@ import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.sql.DataSource; +import java.time.Duration; public class RecurringTaskMain extends Example { + private static final Logger LOG = LoggerFactory.getLogger(RecurringTaskMain.class); public static void main(String[] args) { new RecurringTaskMain().runWithDatasource(); @@ -16,19 +35,20 @@ public static void main(String[] args) { @Override public void run(DataSource dataSource) { - RecurringTask hourlyTask = Tasks.recurring("my-hourly-task", FixedDelay.ofHours(1)) + + RecurringTask myTask = Tasks.recurring("my-task", FixedDelay.ofSeconds(5)) .execute((inst, ctx) -> { - System.out.println("Executed!"); + LOG.info("Executed!"); }); final Scheduler scheduler = Scheduler .create(dataSource) - .startTasks(hourlyTask) + .startTasks(myTask) + .pollingInterval(Duration.ofSeconds(1)) .registerShutdownHook() - .threads(5) .build(); - // hourlyTask is automatically scheduled on startup if not already started (i.e. exists in the db) + // myTask is automatically scheduled on startup if not already started (i.e. exists in the db) scheduler.start(); } } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java new file mode 100644 index 00000000..7b51d408 --- /dev/null +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java @@ -0,0 +1,61 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.examples; + +import com.github.kagkarlsson.examples.helpers.Example; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.helper.PlainScheduleAndData; +import com.github.kagkarlsson.scheduler.task.helper.RecurringTaskWithPersistentSchedule; +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; +import com.github.kagkarlsson.scheduler.task.schedule.CronSchedule; +import com.github.kagkarlsson.scheduler.task.schedule.PersistentCronSchedule; +import com.github.kagkarlsson.scheduler.task.schedule.Schedule; +import com.github.kagkarlsson.scheduler.task.schedule.Schedules; + +import javax.sql.DataSource; +import java.time.Duration; +import java.time.ZoneId; + +public class RecurringTaskWithPersistentScheduleMain extends Example { + + public static void main(String[] args) { + new RecurringTaskWithPersistentScheduleMain().runWithDatasource(); + } + + @Override + public void run(DataSource dataSource) { + + final RecurringTaskWithPersistentSchedule task = + Tasks.recurringWithPersistentSchedule("dynamic-recurring-task", PlainScheduleAndData.class) + .execute((taskInstance, executionContext) -> { + System.out.println("Instance: '" + taskInstance.getId() + "' ran using persistent schedule: " + taskInstance.getData().getSchedule()); + }); + + final Scheduler scheduler = Scheduler + .create(dataSource, task) + .pollingInterval(Duration.ofSeconds(1)) + .registerShutdownHook() + .build(); + + scheduler.start(); + sleep(2_000); + + scheduler.schedule(task.schedulableInstance("id1", new PlainScheduleAndData(Schedules.fixedDelay(Duration.ofSeconds(1))))); + scheduler.schedule(task.schedulableInstance("id2", new PlainScheduleAndData(Schedules.fixedDelay(Duration.ofSeconds(6))))); + } + +} diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java index 5bfe6c4f..d2a59eea 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerMain.java index 6a94b385..52f3c4db 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java index 3244756a..df8df39f 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java new file mode 100644 index 00000000..a5db7935 --- /dev/null +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) Gustav Karlsson + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.examples; + +import com.github.kagkarlsson.examples.helpers.Example; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.helper.PlainScheduleAndData; +import com.github.kagkarlsson.scheduler.task.helper.RecurringTaskWithPersistentSchedule; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; +import com.github.kagkarlsson.scheduler.task.schedule.Schedule; +import com.github.kagkarlsson.scheduler.task.schedule.Schedules; + +import javax.sql.DataSource; +import java.time.Duration; + +public class StatefulRecurringTaskWithPersistentScheduleMain extends Example { + + public static void main(String[] args) { + new StatefulRecurringTaskWithPersistentScheduleMain().runWithDatasource(); + } + + @Override + public void run(DataSource dataSource) { + + final RecurringTaskWithPersistentSchedule task = + Tasks.recurringWithPersistentSchedule("dynamic-recurring-task", ScheduleAndInteger.class) + .executeStateful((taskInstance, executionContext) -> { + System.out.printf("Instance: '%s' ran using persistent schedule '%s' and data '%s'\n", taskInstance.getId(), taskInstance.getData().getSchedule(), taskInstance.getData().getData()); + return taskInstance.getData().returnIncremented(); + }); + + final Scheduler scheduler = Scheduler + .create(dataSource, task) + .pollingInterval(Duration.ofSeconds(1)) + .registerShutdownHook() + .build(); + + scheduler.start(); + sleep(2_000); + + scheduler.schedule(task.schedulableInstance("id1", + new ScheduleAndInteger( + Schedules.fixedDelay(Duration.ofSeconds(3)), + 1))); + } + + public static class ScheduleAndInteger extends PlainScheduleAndData { + public ScheduleAndInteger(Schedule schedule, Integer data) { + super(schedule, data); + } + + public ScheduleAndInteger returnIncremented() { + return new ScheduleAndInteger(super.getSchedule(), ((Integer) super.getData()) + 1); + } + } + +} diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java index 3bd5fe77..a08fda2d 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples; import com.github.kagkarlsson.examples.helpers.Example; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/Example.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/Example.java index 705bed41..c8d17e1e 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/Example.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/Example.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples.helpers; import org.slf4j.Logger; @@ -14,4 +29,12 @@ public abstract class Example { protected void runWithDatasource() { run(HsqlDatasource.initDatabase()); } + + protected void sleep(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/ExampleHelpers.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/ExampleHelpers.java index c028c02d..ab3b36e9 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/ExampleHelpers.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/ExampleHelpers.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples.helpers; import com.github.kagkarlsson.scheduler.Scheduler; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/HsqlDatasource.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/HsqlDatasource.java index ef708b85..f4d0d3e9 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/HsqlDatasource.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/helpers/HsqlDatasource.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) Gustav Karlsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.kagkarlsson.examples.helpers; import com.zaxxer.hikari.HikariConfig; diff --git a/examples/features/src/main/resources/logback.xml b/examples/features/src/main/resources/logback.xml index e33f2d37..92320971 100644 --- a/examples/features/src/main/resources/logback.xml +++ b/examples/features/src/main/resources/logback.xml @@ -1,3 +1,20 @@ +