diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java index fc2d3f3f..001d6a03 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java @@ -16,7 +16,9 @@ import static com.github.kagkarlsson.scheduler.ExceptionUtils.describe; import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import com.github.kagkarlsson.scheduler.task.CompletionHandler; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.ExecutionComplete; @@ -36,7 +38,7 @@ class ExecutePicked implements Runnable { private final TaskRepository taskRepository; private SchedulerClientEventListener earlyExecutionListener; private final SchedulerClient schedulerClient; - private final StatsRegistry statsRegistry; + private final SchedulerListener schedulerListener; private final TaskResolver taskResolver; private final SchedulerState schedulerState; private final ConfigurableLogger failureLogger; @@ -49,7 +51,7 @@ public ExecutePicked( TaskRepository taskRepository, SchedulerClientEventListener earlyExecutionListener, SchedulerClient schedulerClient, - StatsRegistry statsRegistry, + SchedulerListener schedulerListener, TaskResolver taskResolver, SchedulerState schedulerState, ConfigurableLogger failureLogger, @@ -60,7 +62,7 @@ public ExecutePicked( this.taskRepository = taskRepository; this.earlyExecutionListener = earlyExecutionListener; this.schedulerClient = schedulerClient; - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; this.taskResolver = taskResolver; this.schedulerState = schedulerState; this.failureLogger = failureLogger; @@ -77,7 +79,8 @@ public void run() { final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting); try { - statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); + schedulerListener.onCandidateEvent(CandidateEventType.EXECUTED); + schedulerListener.onExecutionStart(currentlyExecuting); executePickedExecution(pickedExecution, currentlyExecuting); } finally { executor.removeCurrentlyProcessing(executionId); @@ -90,7 +93,7 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr LOG.error( "Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName()); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); return; } @@ -106,15 +109,12 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr LOG.debug("Execution done: " + execution); complete(completion, execution, executionStarted); - statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED); } catch (RuntimeException unhandledException) { failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception"); - statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); } catch (Throwable unhandledError) { failure(task.get(), execution, unhandledError, executionStarted, "Error"); - statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); } } @@ -126,16 +126,17 @@ private void complete( completion.complete( completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); - statsRegistry.registerSingleCompletedExecution(completeEvent); } catch (Throwable e) { - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.COMPLETIONHANDLER_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); LOG.error( "Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. " + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, describe(e), e); + } finally { + schedulerListener.onExecutionComplete(completeEvent); } } @@ -155,16 +156,17 @@ private void failure( .onFailure( completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution)); - statsRegistry.registerSingleCompletedExecution(completeEvent); } catch (Throwable e) { - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.FAILUREHANDLER_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); LOG.error( "Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. " + "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, describe(cause), e); + } finally { + schedulerListener.onExecutionComplete(completeEvent); } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index e601e40c..49f7eaf1 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -14,7 +14,9 @@ package com.github.kagkarlsson.scheduler; import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import com.github.kagkarlsson.scheduler.task.Execution; import java.time.Instant; import java.util.List; @@ -30,7 +32,7 @@ public class FetchCandidates implements PollStrategy { private final TaskRepository taskRepository; private final SchedulerClient schedulerClient; private SchedulerClientEventListener earlyExecutionListener; - private final StatsRegistry statsRegistry; + private final SchedulerListener schedulerListener; private final SchedulerState schedulerState; private final ConfigurableLogger failureLogger; private final TaskResolver taskResolver; @@ -48,7 +50,7 @@ public FetchCandidates( SchedulerClient schedulerClient, SchedulerClientEventListener earlyExecutionListener, int threadpoolSize, - StatsRegistry statsRegistry, + SchedulerListener schedulerListener, SchedulerState schedulerState, ConfigurableLogger failureLogger, TaskResolver taskResolver, @@ -60,7 +62,7 @@ public FetchCandidates( this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; this.earlyExecutionListener = earlyExecutionListener; - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; this.schedulerState = schedulerState; this.failureLogger = failureLogger; this.taskResolver = taskResolver; @@ -104,7 +106,7 @@ public void run() { taskRepository, earlyExecutionListener, schedulerClient, - statsRegistry, + schedulerListener, taskResolver, schedulerState, failureLogger, @@ -117,7 +119,7 @@ public void run() { newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run); }); } - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); + schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE); } private class PickDue implements Callable> { @@ -141,7 +143,7 @@ public Optional call() { if (addedDueExecutionsBatch.isOlderGenerationThan(currentGenerationNumber.get())) { // skipping execution due to it being stale addedDueExecutionsBatch.markBatchAsStale(); - statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE); + schedulerListener.onCandidateEvent(CandidateEventType.STALE); LOG.trace( "Skipping queued execution (current generationNumber: {}, execution generationNumber: {})", currentGenerationNumber, @@ -154,7 +156,7 @@ public Optional call() { if (!pickedExecution.isPresent()) { // someone else picked id LOG.debug("Execution picked by another scheduler. Continuing to next due execution."); - statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED); + schedulerListener.onCandidateEvent(CandidateEventType.ALREADY_PICKED); return Optional.empty(); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index c648b6ad..6df79656 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -14,7 +14,8 @@ package com.github.kagkarlsson.scheduler; import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import com.github.kagkarlsson.scheduler.task.Execution; import java.time.Instant; import java.util.List; @@ -28,7 +29,7 @@ public class LockAndFetchCandidates implements PollStrategy { private final TaskRepository taskRepository; private final SchedulerClient schedulerClient; private SchedulerClientEventListener earlyExecutionListener; - private final StatsRegistry statsRegistry; + private final SchedulerListener schedulerListener; private final TaskResolver taskResolver; private final SchedulerState schedulerState; private final ConfigurableLogger failureLogger; @@ -46,7 +47,7 @@ public LockAndFetchCandidates( SchedulerClient schedulerClient, SchedulerClientEventListener earlyExecutionListener, int threadpoolSize, - StatsRegistry statsRegistry, + SchedulerListener schedulerListener, SchedulerState schedulerState, ConfigurableLogger failureLogger, TaskResolver taskResolver, @@ -58,7 +59,7 @@ public LockAndFetchCandidates( this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; this.earlyExecutionListener = earlyExecutionListener; - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; this.taskResolver = taskResolver; this.schedulerState = schedulerState; this.failureLogger = failureLogger; @@ -104,7 +105,7 @@ public void run() { taskRepository, earlyExecutionListener, schedulerClient, - statsRegistry, + schedulerListener, taskResolver, schedulerState, failureLogger, @@ -118,6 +119,6 @@ public void run() { } }); } - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE); + schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE); } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunAndLogErrors.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunAndLogErrors.java index 358955d4..a4a83545 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunAndLogErrors.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunAndLogErrors.java @@ -13,18 +13,19 @@ */ package com.github.kagkarlsson.scheduler; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class RunAndLogErrors implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(RunAndLogErrors.class); private final Runnable toRun; - private final StatsRegistry statsRegistry; + private final SchedulerListener schedulerListener; - public RunAndLogErrors(Runnable toRun, StatsRegistry statsRegistry) { + public RunAndLogErrors(Runnable toRun, SchedulerListener schedulerListener) { this.toRun = toRun; - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; } @Override @@ -33,7 +34,7 @@ public void run() { toRun.run(); } catch (Throwable e) { LOG.error("Unhandled exception. Will keep running.", e); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunUntilShutdown.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunUntilShutdown.java index 40d4b86d..6434b073 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunUntilShutdown.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/RunUntilShutdown.java @@ -13,7 +13,8 @@ */ package com.github.kagkarlsson.scheduler; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,17 +23,17 @@ class RunUntilShutdown implements Runnable { private final Runnable toRun; private final Waiter waitBetweenRuns; private final SchedulerState schedulerState; - private final StatsRegistry statsRegistry; + private final SchedulerListener schedulerListener; public RunUntilShutdown( Runnable toRun, Waiter waitBetweenRuns, SchedulerState schedulerState, - StatsRegistry statsRegistry) { + SchedulerListener schedulerListener) { this.toRun = toRun; this.waitBetweenRuns = waitBetweenRuns; this.schedulerState = schedulerState; - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; } @Override @@ -43,7 +44,7 @@ public void run() { toRun.run(); } catch (Throwable e) { LOG.error("Unhandled exception. Will keep running.", e); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); } } @@ -54,7 +55,7 @@ public void run() { LOG.debug("Thread '{}' interrupted due to shutdown.", Thread.currentThread().getName()); } else { LOG.error("Unexpected interruption of thread. Will keep running.", interruptedException); - statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 60e0c9a2..991c78f5 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -19,8 +19,8 @@ import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState; import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.logging.LogLevel; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType; import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.ExecutionComplete; import com.github.kagkarlsson.scheduler.task.ExecutionOperations; @@ -62,7 +62,7 @@ public class Scheduler implements SchedulerClient { protected final List onStartup; private final Waiter detectDeadWaiter; private final Duration heartbeatInterval; - final StatsRegistry statsRegistry; + final SchedulerListener schedulerListener; private final ExecutorService dueExecutor; private final Waiter heartbeatWaiter; final SettableSchedulerState schedulerState = new SettableSchedulerState(); @@ -81,7 +81,7 @@ protected Scheduler( Duration heartbeatInterval, int numberOfMissedHeartbeatsBeforeDead, boolean enableImmediateExecution, - StatsRegistry statsRegistry, + SchedulerListener schedulerListener, PollingStrategyConfig pollingStrategyConfig, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, @@ -106,7 +106,7 @@ protected Scheduler( this.heartbeatConfig = new HeartbeatConfig( heartbeatInterval, numberOfMissedHeartbeatsBeforeDead, getMaxAgeBeforeConsideredDead()); - this.statsRegistry = statsRegistry; + this.schedulerListener = schedulerListener; this.dueExecutor = dueExecutor; this.housekeeperExecutor = housekeeperExecutor; earlyExecutionListener = @@ -125,7 +125,7 @@ protected Scheduler( this, earlyExecutionListener, threadpoolSize, - statsRegistry, + schedulerListener, schedulerState, failureLogger, taskResolver, @@ -141,7 +141,7 @@ protected Scheduler( this, earlyExecutionListener, threadpoolSize, - statsRegistry, + schedulerListener, schedulerState, failureLogger, taskResolver, @@ -162,15 +162,16 @@ public void start() { executeOnStartup(); dueExecutor.submit( - new RunUntilShutdown(executeDueStrategy, executeDueWaiter, schedulerState, statsRegistry)); + new RunUntilShutdown( + executeDueStrategy, executeDueWaiter, schedulerState, schedulerListener)); housekeeperExecutor.scheduleWithFixedDelay( - new RunAndLogErrors(this::detectDeadExecutions, statsRegistry), + new RunAndLogErrors(this::detectDeadExecutions, schedulerListener), 0, detectDeadWaiter.getWaitDuration().toMillis(), MILLISECONDS); housekeeperExecutor.scheduleWithFixedDelay( - new RunAndLogErrors(this::updateHeartbeats, statsRegistry), + new RunAndLogErrors(this::updateHeartbeats, schedulerListener), 0, heartbeatWaiter.getWaitDuration().toMillis(), MILLISECONDS); @@ -192,7 +193,7 @@ protected void executeOnStartup() { os.onStartup(onStartupClient, this.clock); } catch (Exception e) { LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", e); - statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); } }); } @@ -355,7 +356,8 @@ protected void detectDeadExecutions() { Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); if (task.isPresent()) { - statsRegistry.register(SchedulerStatsEvent.DEAD_EXECUTION); + schedulerListener.onSchedulerEvent(SchedulerEventType.DEAD_EXECUTION); + schedulerListener.onExecutionDead(execution); task.get() .getDeadExecutionHandler() .deadExecution( @@ -373,13 +375,13 @@ protected void detectDeadExecutions() { "Failed while handling dead execution {}. Will be tried again later.", execution, e); - statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); } }); } else { LOG.trace("No dead executions found."); } - statsRegistry.register(SchedulerStatsEvent.RAN_DETECT_DEAD); + schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_DETECT_DEAD); } void updateHeartbeats() { @@ -392,7 +394,7 @@ void updateHeartbeats() { LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size()); Instant now = clock.now(); currentlyProcessing.forEach(execution -> updateHeartbeatForExecution(now, execution)); - statsRegistry.register(SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS); + schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_UPDATE_HEARTBEATS); } protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting currentlyExecuting) { @@ -407,7 +409,8 @@ protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting curre currentlyExecuting.heartbeat(successfulHeartbeat, now); if (!successfulHeartbeat) { - statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT); + schedulerListener.onSchedulerEvent(SchedulerEventType.FAILED_HEARTBEAT); + schedulerListener.onExecutionFailedHeartbeat(currentlyExecuting); } HeartbeatState heartbeatState = currentlyExecuting.getHeartbeatState(); @@ -417,13 +420,14 @@ protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting curre + " considered dead. See heartbeat-state. Heartbeat-state={}, Execution={}", heartbeatState.describe(), e); - statsRegistry.register(SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS); + schedulerListener.onSchedulerEvent(SchedulerEventType.FAILED_MULTIPLE_HEARTBEATS); } } catch (Throwable ex) { // just-in-case to avoid any "poison-pills" LOG.error("Unexpteced failure while while updating heartbeat for execution {}.", e, ex); - statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT); - statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR); + schedulerListener.onSchedulerEvent(SchedulerEventType.FAILED_HEARTBEAT); + schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR); + schedulerListener.onExecutionFailedHeartbeat(currentlyExecuting); } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index 2802aeef..dae0be51 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -22,7 +22,10 @@ import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.serializer.Serializer; +import com.github.kagkarlsson.scheduler.stats.CompositeSchedulerListener; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.StatsRegistryAdapter; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; import java.time.Duration; @@ -37,9 +40,7 @@ import org.slf4j.LoggerFactory; public class SchedulerBuilder { - private static final Logger LOG = LoggerFactory.getLogger(SchedulerBuilder.class); public static final double UPPER_LIMIT_FRACTION_OF_THREADS_FOR_FETCH = 3.0; - public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(10); public static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMinutes(5); public static final int DEFAULT_MISSED_HEARTBEATS_LIMIT = 6; @@ -50,14 +51,13 @@ public class SchedulerBuilder { PollingStrategyConfig.Type.FETCH, 0.5, UPPER_LIMIT_FRACTION_OF_THREADS_FOR_FETCH); public static final LogLevel DEFAULT_FAILURE_LOG_LEVEL = LogLevel.WARN; public static final boolean LOG_STACK_TRACE_ON_FAILURE = true; - - protected Clock clock = new SystemClock(); // if this is set, waiter-clocks must be updated - + private static final Logger LOG = LoggerFactory.getLogger(SchedulerBuilder.class); protected final DataSource dataSource; - protected SchedulerName schedulerName; - protected int executorThreads = 10; protected final List> knownTasks = new ArrayList<>(); protected final List startTasks = new ArrayList<>(); + protected Clock clock = new SystemClock(); // if this is set, waiter-clocks must be updated + protected SchedulerName schedulerName; + protected int executorThreads = 10; protected Waiter waiter = new Waiter(DEFAULT_POLLING_INTERVAL, clock); protected StatsRegistry statsRegistry = StatsRegistry.NOOP; protected Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL; @@ -77,6 +77,7 @@ public class SchedulerBuilder { private boolean registerShutdownHook = false; private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT; private boolean alwaysPersistTimestampInUTC = false; + private List schedulerListeners = new ArrayList<>(); public SchedulerBuilder(DataSource dataSource, List> knownTasks) { this.dataSource = dataSource; @@ -137,6 +138,11 @@ public SchedulerBuilder statsRegistry(StatsRegistry statsRegistry) { return this; } + public SchedulerBuilder addSchedulerListener(SchedulerListener schedulerListener) { + this.schedulerListeners.add(schedulerListener); + return this; + } + public SchedulerBuilder schedulerName(SchedulerName schedulerName) { this.schedulerName = schedulerName; return this; @@ -268,6 +274,10 @@ public Scheduler build() { 3, defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-housekeeper-")); } + if (statsRegistry != null) { + addSchedulerListener(new StatsRegistryAdapter(statsRegistry)); + } + LOG.info( "Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s enable-immediate-execution={}, table-name={}, name={}", executorThreads, @@ -290,7 +300,7 @@ public Scheduler build() { heartbeatInterval, numberOfMissedHeartbeatsBeforeDead, enableImmediateExecution, - statsRegistry, + new CompositeSchedulerListener(schedulerListeners), pollingStrategyConfig, deleteUnresolvedAfter, shutdownMaxWait, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/AbstractSchedulerListener.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/AbstractSchedulerListener.java new file mode 100644 index 00000000..e2b87d62 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/AbstractSchedulerListener.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.stats; + +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; + +public abstract class AbstractSchedulerListener implements SchedulerListener { + + @Override + public void onExecutionStart(CurrentlyExecuting currentlyExecuting) {} + + @Override + public void onExecutionComplete(ExecutionComplete executionComplete) {} + + @Override + public void onExecutionDead(Execution execution) {} + + @Override + public void onExecutionFailedHeartbeat(CurrentlyExecuting currentlyExecuting) {} + + @Override + public void onSchedulerEvent(SchedulerEventType type) {} + + @Override + public void onCandidateEvent(CandidateEventType type) {} +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/CompositeSchedulerListener.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/CompositeSchedulerListener.java new file mode 100644 index 00000000..b48d9f8e --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/CompositeSchedulerListener.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.stats; + +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompositeSchedulerListener implements SchedulerListener { + private static final Logger LOG = LoggerFactory.getLogger(CompositeSchedulerListener.class); + + private final List schedulerListeners; + + public CompositeSchedulerListener(List schedulerListeners) { + this.schedulerListeners = schedulerListeners; + } + + @Override + public void onExecutionStart(CurrentlyExecuting currentlyExecuting) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors( + listener, "onExecutionStart", () -> listener.onExecutionStart(currentlyExecuting)); + }); + } + + @Override + public void onExecutionComplete(ExecutionComplete executionComplete) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors( + listener, + "onExecutionComplete", + () -> listener.onExecutionComplete(executionComplete)); + }); + } + + @Override + public void onExecutionDead(Execution execution) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors(listener, "onExecutionDead", () -> listener.onExecutionDead(execution)); + }); + } + + @Override + public void onExecutionFailedHeartbeat(CurrentlyExecuting currentlyExecuting) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors( + listener, + "onExecutionFailedHeartbeat", + () -> listener.onExecutionFailedHeartbeat(currentlyExecuting)); + }); + } + + @Override + public void onSchedulerEvent(SchedulerEventType type) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors(listener, "onSchedulerEvent", () -> listener.onSchedulerEvent(type)); + }); + } + + @Override + public void onCandidateEvent(CandidateEventType type) { + schedulerListeners.forEach( + listener -> { + fireAndLogErrors(listener, "onCandidateEvent", () -> listener.onCandidateEvent(type)); + }); + } + + public void fireAndLogErrors(SchedulerListener listener, String method, Runnable r) { + try { + r.run(); + } catch (RuntimeException e) { + LOG.warn( + "Listener '{}' method '{}' threw an unexpected Exception", + listener.getClass().getName(), + method); + } + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/SchedulerListener.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/SchedulerListener.java new file mode 100644 index 00000000..3d2554ab --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/SchedulerListener.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.stats; + +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry.CandidateStatsEvent; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; + +public interface SchedulerListener { + + void onExecutionStart(CurrentlyExecuting currentlyExecuting); + + void onExecutionComplete(ExecutionComplete executionComplete); + + void onExecutionDead(Execution execution); + + void onExecutionFailedHeartbeat(CurrentlyExecuting currentlyExecuting); + + void onSchedulerEvent(SchedulerEventType type); + + void onCandidateEvent(CandidateEventType type); + + enum SchedulerEventType { + UNEXPECTED_ERROR(SchedulerStatsEvent.UNEXPECTED_ERROR), + FAILED_HEARTBEAT(SchedulerStatsEvent.FAILED_HEARTBEAT), + COMPLETIONHANDLER_ERROR(SchedulerStatsEvent.COMPLETIONHANDLER_ERROR), + FAILUREHANDLER_ERROR(SchedulerStatsEvent.FAILUREHANDLER_ERROR), + DEAD_EXECUTION(SchedulerStatsEvent.DEAD_EXECUTION), + RAN_UPDATE_HEARTBEATS(SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS), + RAN_DETECT_DEAD(SchedulerStatsEvent.RAN_DETECT_DEAD), + RAN_EXECUTE_DUE(SchedulerStatsEvent.RAN_EXECUTE_DUE), + FAILED_MULTIPLE_HEARTBEATS(SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS), + UNRESOLVED_TASK(SchedulerStatsEvent.UNRESOLVED_TASK); + + private final SchedulerStatsEvent statsRegistryEvent; + + SchedulerEventType(SchedulerStatsEvent statsRegistryEvent) { + this.statsRegistryEvent = statsRegistryEvent; + } + + public SchedulerStatsEvent toStatsRegistryEvent() { + return statsRegistryEvent; + } + } + + enum CandidateEventType { + STALE(CandidateStatsEvent.STALE), + ALREADY_PICKED(CandidateStatsEvent.ALREADY_PICKED), + EXECUTED(CandidateStatsEvent.EXECUTED); + + private final CandidateStatsEvent statsRegistryEvent; + + CandidateEventType(CandidateStatsEvent statsRegistryEvent) { + this.statsRegistryEvent = statsRegistryEvent; + } + + public CandidateStatsEvent toStatsRegistryEvent() { + return statsRegistryEvent; + } + } + + SchedulerListener NOOP = new AbstractSchedulerListener() {}; +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistryAdapter.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistryAdapter.java new file mode 100644 index 00000000..ad1e1ed6 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistryAdapter.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.stats; + +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry.ExecutionStatsEvent; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete.Result; + +public class StatsRegistryAdapter implements SchedulerListener { + + private final StatsRegistry statsRegistry; + + public StatsRegistryAdapter(StatsRegistry statsRegistry) { + this.statsRegistry = statsRegistry; + } + + @Override + public void onExecutionStart(CurrentlyExecuting currentlyExecuting) {} + + @Override + public void onExecutionComplete(ExecutionComplete executionComplete) { + if (statsRegistry == null) { + return; + } + + if (executionComplete.getResult() == Result.OK) { + statsRegistry.register(ExecutionStatsEvent.COMPLETED); + } else { + statsRegistry.register(ExecutionStatsEvent.FAILED); + } + statsRegistry.registerSingleCompletedExecution(executionComplete); + } + + @Override + public void onExecutionDead(Execution execution) {} + + @Override + public void onExecutionFailedHeartbeat(CurrentlyExecuting currentlyExecuting) {} + + @Override + public void onSchedulerEvent(SchedulerEventType type) { + if (statsRegistry == null) { + return; + } + statsRegistry.register(type.toStatsRegistryEvent()); + } + + @Override + public void onCandidateEvent(CandidateEventType type) { + if (statsRegistry == null) { + return; + } + statsRegistry.register(type.toStatsRegistryEvent()); + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index b512e478..3cdfd2be 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -15,7 +15,7 @@ import com.github.kagkarlsson.scheduler.*; import com.github.kagkarlsson.scheduler.logging.LogLevel; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; import com.github.kagkarlsson.scheduler.task.OnStartup; import java.time.Duration; import java.time.Instant; @@ -40,7 +40,7 @@ public class ManualScheduler extends Scheduler { Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, - StatsRegistry statsRegistry, + SchedulerListener schedulerListener, PollingStrategyConfig pollingStrategyConfig, Duration deleteUnresolvedAfter, LogLevel logLevel, @@ -60,7 +60,7 @@ public class ManualScheduler extends Scheduler { heartbeatInterval, SchedulerBuilder.DEFAULT_MISSED_HEARTBEATS_LIMIT, executeImmediately, - statsRegistry, + schedulerListener, pollingStrategyConfig, deleteUnresolvedAfter, Duration.ZERO, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index afc3af4c..fb927bfa 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -21,6 +21,7 @@ import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.StatsRegistryAdapter; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; import java.util.ArrayList; @@ -103,7 +104,7 @@ public ManualScheduler build() { waiter, heartbeatInterval, enableImmediateExecution, - statsRegistry, + new StatsRegistryAdapter(statsRegistry), Optional.ofNullable(pollingStrategyConfig).orElse(PollingStrategyConfig.DEFAULT_FETCH), deleteUnresolvedAfter, LogLevel.DEBUG, diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/RunUntilShutdownTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/RunUntilShutdownTest.java index 4c6c76a8..95e81a88 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/RunUntilShutdownTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/RunUntilShutdownTest.java @@ -3,7 +3,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; -import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.SchedulerListener; import java.time.Duration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -22,7 +22,7 @@ public void setUp() { runnable = new TimeLimitedRunnable(2, schedulerState); countingWaiter = new CountingWaiter(); runUntilShutdown = - new RunUntilShutdown(runnable, countingWaiter, schedulerState, StatsRegistry.NOOP); + new RunUntilShutdown(runnable, countingWaiter, schedulerState, SchedulerListener.NOOP); } @Test