Skip to content

Commit

Permalink
feat: New interface SchedulerListener (#504)
Browse files Browse the repository at this point in the history
Replaces the old `StatsRegistry` and adds more detailed events:

```java
  void onExecutionScheduled(TaskInstanceId taskInstanceId, Instant executionTime);
  void onExecutionStart(CurrentlyExecuting currentlyExecuting);
  void onExecutionComplete(ExecutionComplete executionComplete);
  void onExecutionDead(Execution execution);
  void onExecutionFailedHeartbeat(CurrentlyExecuting currentlyExecuting);
```

Additionally adds `ExecutionInterceptor` as a way of injecting
wrapping-logic for all executions.

```java
  CompletionHandler<?> execute(
      TaskInstance<?> taskInstance, ExecutionContext executionContext, ExecutionChain chain);
```

## Fixes
* #451
  • Loading branch information
kagkarlsson authored Jul 23, 2024
1 parent 468b87d commit d09ecbd
Show file tree
Hide file tree
Showing 30 changed files with 865 additions and 223 deletions.
36 changes: 24 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,23 @@ scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.

#### Plain Java

* [EnableImmediateExecutionMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java)
* [MaxRetriesMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java)
* [ExponentialBackoffMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java)
* [ExponentialBackoffWithMaxRetriesMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java)
* [TrackingProgressRecurringTaskMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java)
* [SpawningOtherTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java)
* [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java)
* [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java)
* [StatefulRecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java)
* [JsonSerializerMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java)
* [JobChainingUsingTaskDataMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java)
* [JobChainingUsingSeparateTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java)
| Example | Description |
|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [EnableImmediateExecutionMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java) | When scheduling executions to run `now()` or earlier, the local `Scheduler` will be hinted about this, and "wake up" to go check for new executions earlier than it normally would (as configured by `pollingInterval`. |
| [MaxRetriesMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java) | How to set a limit on the number of retries an execution can have. |
| [ExponentialBackoffMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java) | How to use exponential backoff as retry strategy instead of fixed delay as is default. |
| [ExponentialBackoffWithMaxRetriesMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java) | How to use exponential backoff as retry strategy **and** a hard limit on the maximum number of retries. |
| [TrackingProgressRecurringTaskMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java) | Recurring jobs may store `task_data` as a way of persisting state across executions. This example shows how. |
| [SpawningOtherTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java) | Demonstrates on task scheduling instances of another by using the `executionContext.getSchedulerClient()`. |
| [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java) | Demonstates some of the `SchedulerClient`'s capabilities. Scheduling, fetching scheduled executions etc. |
| [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java) | Multi-instance recurring jobs where the `Schedule` is stored as part of the `task_data`. For example suitable for multi-tenant applications where each tenent should have a recurring task. |
| [StatefulRecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java) | |
| [JsonSerializerMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java) | Overrides serialization of `task_data` from Java-serialization (default) to JSON. |
| [JobChainingUsingTaskDataMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java) | Job chaining, i.e. "when this instance is done executing, schedule another task. |
| [JobChainingUsingSeparateTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java) | Job chaining, as above. |
| [InterceptorMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/InterceptorMain.java) | Using `ExecutionInterceptor` to inject logic before and after execution for all `ExecutionHandler`. |



#### Spring Boot

Expand Down Expand Up @@ -256,6 +261,12 @@ How often to update the heartbeat timestamp for running executions. Default `5m`
:gear: `.missedHeartbeatsLimit(int)`<br/>
How many heartbeats may be missed before the execution is considered dead. Default `6`.

:gear: `.addExecutionInterceptor(ExecutionInterceptor)`<br/>
Adds an `ExecutionInterceptor` which may inject logic around executions. For Spring Boot, simply register a Bean of type `ExecutionInterceptor`.

:gear: `.addSchedulerListener(SchedulerListener)`<br/>
Adds an `SchedulerListener` which will receive Scheduler- and Execution-related events. For Spring Boot, simply register a Bean of type `SchedulerListener`.

:gear: `.schedulerName(SchedulerName)`<br/>
Name of this scheduler-instance. The name is stored in the database when an execution is picked by a scheduler.
Default `<hostname>`.
Expand Down Expand Up @@ -612,6 +623,7 @@ Some users have experienced intermittent test failures when running on a single-

The goal of `db-scheduler` is to be non-invasive and simple to use, but still solve the persistence problem, and the cluster-coordination problem.
It was originally targeted at applications with modest database schemas, to which adding 11 tables would feel a bit overkill..
**Update:** Also, as of now (2024), Quartz does not seem to be actively maintained either.

#### Why use a RDBMS for persistence and coordination?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.github.kagkarlsson.scheduler.boot.config.DbSchedulerStarter;
import com.github.kagkarlsson.scheduler.boot.config.startup.ContextReadyStart;
import com.github.kagkarlsson.scheduler.boot.config.startup.ImmediateStart;
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
import com.github.kagkarlsson.scheduler.exceptions.SerializationException;
import com.github.kagkarlsson.scheduler.serializer.Serializer;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
Expand Down Expand Up @@ -68,18 +70,24 @@ public class DbSchedulerAutoConfiguration {
private final DbSchedulerProperties config;
private final DataSource existingDataSource;
private final List<Task<?>> configuredTasks;
private final List<SchedulerListener> schedulerListeners;
private final List<ExecutionInterceptor> executionInterceptors;

public DbSchedulerAutoConfiguration(
DbSchedulerProperties dbSchedulerProperties,
DataSource dataSource,
List<Task<?>> configuredTasks) {
List<Task<?>> configuredTasks,
List<SchedulerListener> schedulerListeners,
List<ExecutionInterceptor> executionInterceptors) {
this.config =
Objects.requireNonNull(
dbSchedulerProperties, "Can't configure db-scheduler without required configuration");
this.existingDataSource =
Objects.requireNonNull(dataSource, "An existing javax.sql.DataSource is required");
this.configuredTasks =
Objects.requireNonNull(configuredTasks, "At least one Task must be configured");
this.schedulerListeners = schedulerListeners;
this.executionInterceptors = executionInterceptors;
}

/** Provide an empty customizer if not present in the context. */
Expand Down Expand Up @@ -179,6 +187,12 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis
// Shutdown max wait
builder.shutdownMaxWait(config.getShutdownMaxWait());

// Register listeners
schedulerListeners.forEach(builder::addSchedulerListener);

// Register interceptors
executionInterceptors.forEach(builder::addExecutionInterceptor);

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,35 @@

import static com.github.kagkarlsson.scheduler.ExceptionUtils.describe;

import com.github.kagkarlsson.scheduler.event.ExecutionChain;
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
import com.github.kagkarlsson.scheduler.event.SchedulerListener.CandidateEventType;
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
import com.github.kagkarlsson.scheduler.event.SchedulerListeners;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionHandler;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.Task;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "unchecked"})
class ExecutePicked implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ExecutePicked.class);
private final Executor executor;
private final TaskRepository taskRepository;
private SchedulerClientEventListener earlyExecutionListener;
private final SchedulerClient schedulerClient;
private final StatsRegistry statsRegistry;
private final SchedulerListeners schedulerListeners;
private final List<ExecutionInterceptor> executionInterceptors;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
Expand All @@ -47,9 +54,9 @@ class ExecutePicked implements Runnable {
public ExecutePicked(
Executor executor,
TaskRepository taskRepository,
SchedulerClientEventListener earlyExecutionListener,
SchedulerClient schedulerClient,
StatsRegistry statsRegistry,
SchedulerListeners schedulerListeners,
List<ExecutionInterceptor> executionInterceptors,
TaskResolver taskResolver,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
Expand All @@ -58,9 +65,9 @@ public ExecutePicked(
Execution pickedExecution) {
this.executor = executor;
this.taskRepository = taskRepository;
this.earlyExecutionListener = earlyExecutionListener;
this.schedulerClient = schedulerClient;
this.statsRegistry = statsRegistry;
this.schedulerListeners = schedulerListeners;
this.executionInterceptors = executionInterceptors;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
Expand All @@ -77,7 +84,8 @@ public void run() {
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);

try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
schedulerListeners.onCandidateEvent(CandidateEventType.EXECUTED);
schedulerListeners.onExecutionStart(currentlyExecuting);
executePickedExecution(pickedExecution, currentlyExecuting);
} finally {
executor.removeCurrentlyProcessing(executionId);
Expand All @@ -90,31 +98,28 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
LOG.error(
"Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.",
execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListeners.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
return;
}

Instant executionStarted = clock.now();
try {
LOG.debug("Executing: " + execution);
CompletionHandler completion =
task.get()
.execute(
execution.taskInstance,
new ExecutionContext(
schedulerState, execution, schedulerClient, currentlyExecuting));
ExecutionHandler handler = task.get();
ExecutionContext executionContext =
new ExecutionContext(schedulerState, execution, schedulerClient, currentlyExecuting);
ExecutionChain chain = new ExecutionChain(new ArrayList<>(executionInterceptors), handler);

CompletionHandler completion = chain.proceed(execution.taskInstance, executionContext);
LOG.debug("Execution done: " + execution);

complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);

} catch (RuntimeException unhandledException) {
failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);

} catch (Throwable unhandledError) {
failure(task.get(), execution, unhandledError, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
}

Expand All @@ -124,18 +129,18 @@ private void complete(
ExecutionComplete.success(execution, executionStarted, clock.now());
try {
completion.complete(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
completeEvent, new ExecutionOperations(taskRepository, schedulerListeners, execution));
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListeners.onSchedulerEvent(SchedulerEventType.COMPLETIONHANDLER_ERROR);
schedulerListeners.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
describe(e),
e);
} finally {
schedulerListeners.onExecutionComplete(completeEvent);
}
}

Expand All @@ -154,17 +159,18 @@ private void failure(
task.getFailureHandler()
.onFailure(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
new ExecutionOperations(taskRepository, schedulerListeners, execution));
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListeners.onSchedulerEvent(SchedulerEventType.FAILUREHANDLER_ERROR);
schedulerListeners.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
describe(cause),
e);
} finally {
schedulerListeners.onExecutionComplete(completeEvent);
}
}
}
Loading

0 comments on commit d09ecbd

Please sign in to comment.