diff --git a/README.md b/README.md index edbf8135..161eb6b4 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,9 @@ this, but testing has shown this is prone to deadlocks and thus not recommended :gear: `.heartbeatInterval(Duration)`
How often to update the heartbeat timestamp for running executions. Default `5m`. +:gear: `.missedHeartbeatsLimit(int)`
+How many heartbeats may be missed before the execution is considered dead. Default `1`. + :gear: `.schedulerName(SchedulerName)`
Name of this scheduler-instance. The name is stored in the database when an execution is picked by a scheduler. Default ``. @@ -479,9 +482,13 @@ Use-cases might be: ### Dead executions -During execution, the scheduler regularly updates a heartbeat-time for the task-execution. If an execution is marked as executing, but is not receiving updates to the heartbeat-time, it will be considered a _dead execution_ after time X. That may for example happen if the JVM running the scheduler suddenly exits. +During execution, the scheduler regularly updates a heartbeat-time for the task-execution. +If an execution is marked as executing, but is not receiving updates to the heartbeat-time, +it will be considered a _dead execution_ after time X. That may for example happen if the +JVM running the scheduler suddenly exits. -When a dead execution is found, the `Task`is consulted to see what should be done. A dead `RecurringTask` is typically rescheduled to `now()`. +When a dead execution is found, the `Task`is consulted to see what should be done. A dead +`RecurringTask` is typically rescheduled to `now()`. ## Performance diff --git a/db-scheduler/pom.xml b/db-scheduler/pom.xml index b6d00742..19f2b8a7 100644 --- a/db-scheduler/pom.xml +++ b/db-scheduler/pom.xml @@ -22,7 +22,6 @@ 1.6 3.14.2 1.14.5 - 0.6.0 2.0.4 42.5.1 1.7.36 diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/CurrentlyExecuting.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/CurrentlyExecuting.java index 0a8b0bff..f6467487 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/CurrentlyExecuting.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/CurrentlyExecuting.java @@ -24,11 +24,13 @@ public class CurrentlyExecuting { private final Execution execution; private final Clock clock; private final Instant startTime; + private final HeartbeatState heartbeatState; - public CurrentlyExecuting(Execution execution, Clock clock) { + public CurrentlyExecuting(Execution execution, Clock clock, HeartbeatConfig heartbeatConfig) { this.execution = execution; this.clock = clock; this.startTime = clock.now(); + this.heartbeatState = new HeartbeatState(clock, startTime, heartbeatConfig); } public Duration getDuration() { @@ -42,4 +44,12 @@ public TaskInstance getTaskInstance() { public Execution getExecution() { return execution; } + + public HeartbeatState getHeartbeatState() { + return heartbeatState; + } + + public void heartbeat(boolean successful, Instant now) { + heartbeatState.heartbeat(successful, now); + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java index 987cb01f..c5564d15 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java @@ -39,6 +39,7 @@ class ExecutePicked implements Runnable { private final SchedulerState schedulerState; private final ConfigurableLogger failureLogger; private final Clock clock; + private HeartbeatConfig heartbeatConfig; private final Execution pickedExecution; public ExecutePicked( @@ -51,6 +52,7 @@ public ExecutePicked( SchedulerState schedulerState, ConfigurableLogger failureLogger, Clock clock, + HeartbeatConfig heartbeatConfig, Execution pickedExecution) { this.executor = executor; this.taskRepository = taskRepository; @@ -61,23 +63,26 @@ public ExecutePicked( this.schedulerState = schedulerState; this.failureLogger = failureLogger; this.clock = clock; + this.heartbeatConfig = heartbeatConfig; this.pickedExecution = pickedExecution; } @Override public void run() { // FIXLATER: need to cleanup all the references back to scheduler fields - final UUID executionId = - executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock)); + CurrentlyExecuting currentlyExecuting = + new CurrentlyExecuting(pickedExecution, clock, heartbeatConfig); + final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting); + try { statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); - executePickedExecution(pickedExecution); + executePickedExecution(pickedExecution, currentlyExecuting); } finally { executor.removeCurrentlyProcessing(executionId); } } - private void executePickedExecution(Execution execution) { + private void executePickedExecution(Execution execution, CurrentlyExecuting currentlyExecuting) { final Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); if (!task.isPresent()) { LOG.error( @@ -94,7 +99,8 @@ private void executePickedExecution(Execution execution) { task.get() .execute( execution.taskInstance, - new ExecutionContext(schedulerState, execution, schedulerClient)); + new ExecutionContext( + schedulerState, execution, schedulerClient, currentlyExecuting)); LOG.debug("Execution done: " + execution); complete(completion, execution, executionStarted); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index 2371bdec..e601e40c 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -37,6 +37,7 @@ public class FetchCandidates implements PollStrategy { private final Clock clock; private final PollingStrategyConfig pollingStrategyConfig; private final Runnable triggerCheckForNewExecutions; + private HeartbeatConfig heartbeatConfig; AtomicInteger currentGenerationNumber = new AtomicInteger(0); private final int lowerLimit; private final int upperLimit; @@ -53,7 +54,8 @@ public FetchCandidates( TaskResolver taskResolver, Clock clock, PollingStrategyConfig pollingStrategyConfig, - Runnable triggerCheckForNewExecutions) { + Runnable triggerCheckForNewExecutions, + HeartbeatConfig heartbeatConfig) { this.executor = executor; this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; @@ -65,6 +67,7 @@ public FetchCandidates( this.clock = clock; this.pollingStrategyConfig = pollingStrategyConfig; this.triggerCheckForNewExecutions = triggerCheckForNewExecutions; + this.heartbeatConfig = heartbeatConfig; lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize); // FIXLATER: this is not "upper limit", but rather nr of executions to get. those already in // queue will become stale @@ -106,6 +109,7 @@ public void run() { schedulerState, failureLogger, clock, + heartbeatConfig, picked) .run()); }, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatConfig.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatConfig.java new file mode 100644 index 00000000..5445ebea --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler; + +import java.time.Duration; + +public class HeartbeatConfig { + + public final Duration heartbeatInterval; + public final int missedHeartbeatsLimit; + public final Duration maxAgeBeforeConsideredDead; + + public HeartbeatConfig( + Duration heartbeatInterval, int missedHeartbeatsLimit, Duration maxAgeBeforeConsideredDead) { + this.heartbeatInterval = heartbeatInterval; + this.missedHeartbeatsLimit = missedHeartbeatsLimit; + this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatState.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatState.java new file mode 100644 index 00000000..01574811 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/HeartbeatState.java @@ -0,0 +1,81 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler; + +import java.time.Duration; +import java.time.Instant; + +public class HeartbeatState { + private Clock clock; + private final Instant startTime; + private final HeartbeatConfig heartbeatConfig; + private int heartbeatSuccessesSinceLastFailure = 0; + private int heartbeatFailuresSinceLastSuccess = 0; + private Instant heartbeatLastSuccess; + private Instant heartbeatLastFailure; + + public HeartbeatState(Clock clock, Instant startTime, HeartbeatConfig heartbeatConfig) { + this.clock = clock; + this.startTime = startTime; + this.heartbeatConfig = heartbeatConfig; + heartbeatLastSuccess = startTime; + } + + public boolean hasStaleHeartbeat() { + Duration sinceLastSuccess = Duration.between(heartbeatLastSuccess, clock.now()); + + long heartbeatMillis = heartbeatConfig.heartbeatInterval.toMillis(); + long millisUntilConsideredStale = + heartbeatMillis + Math.min(10_000, (int) (heartbeatMillis * 0.25)); + return heartbeatFailuresSinceLastSuccess > 0 + || sinceLastSuccess.toMillis() > millisUntilConsideredStale; + } + + public double getFractionDead() { + Duration sinceLastSuccess = Duration.between(heartbeatLastSuccess, clock.now()); + return (double) sinceLastSuccess.toMillis() + / heartbeatConfig.maxAgeBeforeConsideredDead.toMillis(); + } + + public int getFailedHeartbeats() { + return heartbeatFailuresSinceLastSuccess; + } + + public void heartbeat(boolean successful, Instant lastHeartbeatAttempt) { + if (successful) { + heartbeatLastSuccess = lastHeartbeatAttempt; + heartbeatSuccessesSinceLastFailure++; + heartbeatFailuresSinceLastSuccess = 0; + } else { + heartbeatLastFailure = lastHeartbeatAttempt; + heartbeatSuccessesSinceLastFailure = 0; + heartbeatFailuresSinceLastSuccess++; + } + } + + public String describe() { + return "HeartbeatState{" + + "successesSinceLastFailure=" + + heartbeatSuccessesSinceLastFailure + + ", failuresSinceLastSuccess=" + + heartbeatFailuresSinceLastSuccess + + ", lastSuccess=" + + heartbeatLastSuccess + + ", lastFailure=" + + heartbeatLastFailure + + ", missedHeartbeatsLimit=" + + heartbeatConfig.missedHeartbeatsLimit + + '}'; + } +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 54d3121b..c648b6ad 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -35,6 +35,7 @@ public class LockAndFetchCandidates implements PollStrategy { private final Clock clock; private final PollingStrategyConfig pollingStrategyConfig; private final Runnable triggerCheckForNewExecutions; + private HeartbeatConfig maxAgeBeforeConsideredDead; private final int lowerLimit; private final int upperLimit; private AtomicBoolean moreExecutionsInDatabase = new AtomicBoolean(false); @@ -51,7 +52,8 @@ public LockAndFetchCandidates( TaskResolver taskResolver, Clock clock, PollingStrategyConfig pollingStrategyConfig, - Runnable triggerCheckForNewExecutions) { + Runnable triggerCheckForNewExecutions, + HeartbeatConfig maxAgeBeforeConsideredDead) { this.executor = executor; this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; @@ -63,6 +65,7 @@ public LockAndFetchCandidates( this.clock = clock; this.pollingStrategyConfig = pollingStrategyConfig; this.triggerCheckForNewExecutions = triggerCheckForNewExecutions; + this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead; lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize); upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize); } @@ -106,6 +109,7 @@ public void run() { schedulerState, failureLogger, clock, + maxAgeBeforeConsideredDead, picked), () -> { if (moreExecutionsInDatabase.get() diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 1215070f..60e0c9a2 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -14,13 +14,21 @@ package com.github.kagkarlsson.scheduler; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toList; import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState; import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger; import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; -import com.github.kagkarlsson.scheduler.task.*; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import com.github.kagkarlsson.scheduler.task.ExecutionOperations; +import com.github.kagkarlsson.scheduler.task.OnStartup; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskInstance; +import com.github.kagkarlsson.scheduler.task.TaskInstanceId; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -45,6 +53,8 @@ public class Scheduler implements SchedulerClient { protected final PollStrategy executeDueStrategy; protected final Executor executor; private final ScheduledExecutorService housekeeperExecutor; + private final HeartbeatConfig heartbeatConfig; + private final int numberOfMissedHeartbeatsBeforeDead; int threadpoolSize; private final Waiter executeDueWaiter; private final Duration deleteUnresolvedAfter; @@ -69,6 +79,7 @@ protected Scheduler( SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, + int numberOfMissedHeartbeatsBeforeDead, boolean enableImmediateExecution, StatsRegistry statsRegistry, PollingStrategyConfig pollingStrategyConfig, @@ -90,7 +101,11 @@ protected Scheduler( this.onStartup = onStartup; this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2), clock); this.heartbeatInterval = heartbeatInterval; + this.numberOfMissedHeartbeatsBeforeDead = numberOfMissedHeartbeatsBeforeDead; this.heartbeatWaiter = new Waiter(heartbeatInterval, clock); + this.heartbeatConfig = + new HeartbeatConfig( + heartbeatInterval, numberOfMissedHeartbeatsBeforeDead, getMaxAgeBeforeConsideredDead()); this.statsRegistry = statsRegistry; this.dueExecutor = dueExecutor; this.housekeeperExecutor = housekeeperExecutor; @@ -116,7 +131,8 @@ protected Scheduler( taskResolver, clock, pollingStrategyConfig, - this::triggerCheckForDueExecutions); + this::triggerCheckForDueExecutions, + heartbeatConfig); } else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) { executeDueStrategy = new FetchCandidates( @@ -131,7 +147,8 @@ protected Scheduler( taskResolver, clock, pollingStrategyConfig, - this::triggerCheckForDueExecutions); + this::triggerCheckForDueExecutions, + heartbeatConfig); } else { throw new IllegalArgumentException( "Unknown polling-strategy type: " + pollingStrategyConfig.type); @@ -303,6 +320,12 @@ public List getCurrentlyExecuting() { return executor.getCurrentlyExecuting(); } + public List getCurrentlyExecutingWithStaleHeartbeat() { + return executor.getCurrentlyExecuting().stream() + .filter(c -> c.getHeartbeatState().hasStaleHeartbeat()) + .collect(toList()); + } + @SuppressWarnings({"rawtypes", "unchecked"}) protected void detectDeadExecutions() { LOG.debug("Deleting executions with unresolved tasks."); @@ -368,26 +391,44 @@ void updateHeartbeats() { LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size()); Instant now = clock.now(); - currentlyProcessing.stream() - .map(CurrentlyExecuting::getExecution) - .forEach( - execution -> { - LOG.trace("Updating heartbeat for execution: " + execution); - try { - schedulerTaskRepository.updateHeartbeat(execution, now); - } catch (Throwable e) { - LOG.error( - "Failed while updating heartbeat for execution {}. Will try again later.", - execution, - e); - statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR); - } - }); + currentlyProcessing.forEach(execution -> updateHeartbeatForExecution(now, execution)); statsRegistry.register(SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS); } + protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting currentlyExecuting) { + // There is a race-condition: the execution may have been deleted or updated, causing + // this update to fail (or update 0 rows). This may happen once, but not multiple times. + + Execution e = currentlyExecuting.getExecution(); + LOG.trace("Updating heartbeat for execution: " + e); + + try { + boolean successfulHeartbeat = schedulerTaskRepository.updateHeartbeatWithRetry(e, now, 3); + currentlyExecuting.heartbeat(successfulHeartbeat, now); + + if (!successfulHeartbeat) { + statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT); + } + + HeartbeatState heartbeatState = currentlyExecuting.getHeartbeatState(); + if (heartbeatState.getFailedHeartbeats() > 1) { + LOG.warn( + "Execution has more than 1 failed heartbeats. Should not happen. Risk of being" + + " considered dead. See heartbeat-state. Heartbeat-state={}, Execution={}", + heartbeatState.describe(), + e); + statsRegistry.register(SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS); + } + + } catch (Throwable ex) { // just-in-case to avoid any "poison-pills" + LOG.error("Unexpteced failure while while updating heartbeat for execution {}.", e, ex); + statsRegistry.register(SchedulerStatsEvent.FAILED_HEARTBEAT); + statsRegistry.register(SchedulerStatsEvent.UNEXPECTED_ERROR); + } + } + Duration getMaxAgeBeforeConsideredDead() { - return heartbeatInterval.multipliedBy(4); + return heartbeatInterval.multipliedBy(numberOfMissedHeartbeatsBeforeDead); } public static SchedulerBuilder create(DataSource dataSource, Task... knownTasks) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index b4c71bd8..2802aeef 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -42,6 +42,7 @@ public class SchedulerBuilder { public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(10); public static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMinutes(5); + public static final int DEFAULT_MISSED_HEARTBEATS_LIMIT = 6; public static final Duration DEFAULT_DELETION_OF_UNRESOLVED_TASKS_DURATION = Duration.ofDays(14); public static final Duration SHUTDOWN_MAX_WAIT = Duration.ofMinutes(30); public static final PollingStrategyConfig DEFAULT_POLLING_STRATEGY = @@ -74,6 +75,7 @@ public class SchedulerBuilder { protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL; protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE; private boolean registerShutdownHook = false; + private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT; private boolean alwaysPersistTimestampInUTC = false; public SchedulerBuilder(DataSource dataSource, List> knownTasks) { @@ -102,6 +104,14 @@ public SchedulerBuilder heartbeatInterval(Duration duration) { return this; } + public SchedulerBuilder missedHeartbeatsLimit(int numberOfMissedHeartbeatsBeforeDead) { + if (numberOfMissedHeartbeatsBeforeDead < 4) { + throw new IllegalArgumentException("Heartbeat-limit must be at least 4"); + } + this.numberOfMissedHeartbeatsBeforeDead = numberOfMissedHeartbeatsBeforeDead; + return this; + } + public SchedulerBuilder threads(int numberOfThreads) { this.executorThreads = numberOfThreads; return this; @@ -278,6 +288,7 @@ public Scheduler build() { schedulerName, waiter, heartbeatInterval, + numberOfMissedHeartbeatsBeforeDead, enableImmediateExecution, statsRegistry, pollingStrategyConfig, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java index f54b9172..ea22a803 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.TaskInstanceId; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -59,12 +60,19 @@ boolean reschedule( List getDeadExecutions(Instant olderThan); - void updateHeartbeat(Execution execution, Instant heartbeatTime); + boolean updateHeartbeatWithRetry(Execution execution, Instant newHeartbeat, int tries); + + boolean updateHeartbeat(Execution execution, Instant heartbeatTime); List getExecutionsFailingLongerThan(Duration interval); Optional getExecution(String taskName, String taskInstanceId); + default Optional getExecution(TaskInstanceId taskInstance) { + return getExecution(taskInstance.getTaskName(), taskInstance.getId()); + } + ; + int removeExecutions(String taskName); void verifySupportsLockAndFetch(); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java index 5538f527..e66f6832 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java @@ -68,17 +68,6 @@ public AutodetectJdbcCustomization(DataSource dataSource, boolean persistTimesta LOG.info("Using MySQL jdbc-overrides for version older than 8. (v {})", dbVersion); detectedCustomization = new MySQLJdbcCustomization(true); } - } else { - if (persistTimestampInUTC) { - LOG.info( - "No database-specific jdbc-overrides applied. Behavior overridden to always store " - + "timestamps in zone UTC"); - } else { - LOG.warn( - "No database-specific jdbc-overrides applied. Assuming time-related columns " - + "to be of type compatibe with 'TIMESTAMP WITH TIME ZONE', i.e. zone is persisted." - + " If not, consider overriding to always UTC via '.alwaysPersistTimestampInUTC()'."); - } } else { if (persistTimestampInUTC) { LOG.info( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index 98bac05e..041ea349 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -18,11 +18,9 @@ import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; -import com.github.kagkarlsson.jdbc.*; import com.github.kagkarlsson.jdbc.JdbcRunner; import com.github.kagkarlsson.jdbc.ResultSetMapper; import com.github.kagkarlsson.jdbc.SQLRuntimeException; -import com.github.kagkarlsson.scheduler.*; import com.github.kagkarlsson.scheduler.Clock; import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter; import com.github.kagkarlsson.scheduler.SchedulerName; @@ -32,7 +30,10 @@ import com.github.kagkarlsson.scheduler.exceptions.ExecutionException; import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException; import com.github.kagkarlsson.scheduler.serializer.Serializer; -import com.github.kagkarlsson.scheduler.task.*; +import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; +import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskInstance; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -560,7 +561,23 @@ public List getDeadExecutions(Instant olderThan) { } @Override - public void updateHeartbeat(Execution e, Instant newHeartbeat) { + public boolean updateHeartbeatWithRetry(Execution execution, Instant newHeartbeat, int tries) { + + try { + return updateHeartbeat(execution, newHeartbeat); + } catch (RuntimeException e) { + if (tries <= 1) { + LOG.warn("Failed to update heartbeat. No more retries.", e); + return false; + } else { + LOG.info("Failed to update heartbeat. Remaining retries={}.", tries - 1, e); + return updateHeartbeatWithRetry(execution, newHeartbeat, tries - 1); + } + } + } + + @Override + public boolean updateHeartbeat(Execution e, Instant newHeartbeat) { final int updated = jdbcRunner.execute( @@ -578,14 +595,25 @@ public void updateHeartbeat(Execution e, Instant newHeartbeat) { }); if (updated == 0) { - LOG.trace("Did not update heartbeat. Execution must have been removed or rescheduled.", e); + // There is a race-condition: Executions are not removed from currently-executing until after + // the execution has been updated in the database, so this might happen. + LOG.warn( + "Did not update heartbeat. Execution must have been removed or rescheduled" + + "(i.e. CompletionHandler ran and finished just before heartbeat-update). " + + "This is a race-condition that may occur, but is very unlikely. " + + "task-instance={}", + e.taskInstance); + return false; } else { if (updated > 1) { - throw new IllegalStateException( - "Updated multiple rows updating heartbeat for execution. Should never happen since name and id is primary key. Execution: " + LOG.error( + "Updated multiple rows updating heartbeat for execution. Should never happen since " + + "name and id is primary key. Execution: " + e); + return true; } LOG.debug("Updated heartbeat for execution: " + e); + return true; } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistry.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistry.java index cf05b02e..ff4905a9 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistry.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/stats/StatsRegistry.java @@ -19,12 +19,14 @@ public interface StatsRegistry { enum SchedulerStatsEvent { UNEXPECTED_ERROR, + FAILED_HEARTBEAT, COMPLETIONHANDLER_ERROR, FAILUREHANDLER_ERROR, DEAD_EXECUTION, RAN_UPDATE_HEARTBEATS, RAN_DETECT_DEAD, RAN_EXECUTE_DUE, + FAILED_MULTIPLE_HEARTBEATS, UNRESOLVED_TASK } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Execution.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Execution.java index da535766..bdbbe389 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Execution.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Execution.java @@ -17,7 +17,7 @@ import java.util.Objects; @SuppressWarnings("rawtypes") -public final class Execution { +public final class Execution implements TaskInstanceId { public final TaskInstance taskInstance; public final Instant executionTime; public final boolean picked; @@ -107,4 +107,14 @@ public String toString() { + ", version=" + version; } + + @Override + public String getTaskName() { + return taskInstance.getTaskName(); + } + + @Override + public String getId() { + return taskInstance.getId(); + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionContext.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionContext.java index b67fb6be..3e8784af 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionContext.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionContext.java @@ -13,6 +13,7 @@ */ package com.github.kagkarlsson.scheduler.task; +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; import com.github.kagkarlsson.scheduler.SchedulerClient; import com.github.kagkarlsson.scheduler.SchedulerState; @@ -21,12 +22,17 @@ public class ExecutionContext { private final SchedulerState schedulerState; private final Execution execution; private final SchedulerClient schedulerClient; + private CurrentlyExecuting currentlyExecuting; public ExecutionContext( - SchedulerState schedulerState, Execution execution, SchedulerClient schedulerClient) { + SchedulerState schedulerState, + Execution execution, + SchedulerClient schedulerClient, + CurrentlyExecuting currentlyExecuting) { this.schedulerState = schedulerState; this.execution = execution; this.schedulerClient = schedulerClient; + this.currentlyExecuting = currentlyExecuting; } public SchedulerState getSchedulerState() { @@ -44,4 +50,8 @@ public SchedulerClient getSchedulerClient() { public Execution getExecution() { return execution; } + + public CurrentlyExecuting getCurrentlyExecuting() { + return currentlyExecuting; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index a98e9e95..b512e478 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -58,6 +58,7 @@ public class ManualScheduler extends Scheduler { schedulerName, waiter, heartbeatInterval, + SchedulerBuilder.DEFAULT_MISSED_HEARTBEATS_LIMIT, executeImmediately, statsRegistry, pollingStrategyConfig, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/SettableClock.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/SettableClock.java index c843b1c8..8d4973c9 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/SettableClock.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/SettableClock.java @@ -14,6 +14,7 @@ package com.github.kagkarlsson.scheduler.testhelper; import com.github.kagkarlsson.scheduler.Clock; +import java.time.Duration; import java.time.Instant; public class SettableClock implements Clock { @@ -28,4 +29,8 @@ public Instant now() { public void set(Instant newNow) { this.now = newNow; } + + public void tick(Duration toAdd) { + now = now.plus(toAdd); + } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/HeartbeatStateTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/HeartbeatStateTest.java new file mode 100644 index 00000000..7cadbc01 --- /dev/null +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/HeartbeatStateTest.java @@ -0,0 +1,98 @@ +package com.github.kagkarlsson.scheduler; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.*; + +import com.github.kagkarlsson.scheduler.testhelper.SettableClock; +import java.time.Duration; +import org.hamcrest.Matchers; +import org.hamcrest.number.IsCloseTo; +import org.junit.jupiter.api.Test; + +class HeartbeatStateTest { + private SettableClock clock = new SettableClock(); + + @Test + void happy() { + HeartbeatState state = + new HeartbeatState( + clock, + clock.now(), + new HeartbeatConfig(Duration.ofMinutes(1), 4, Duration.ofMinutes(4))); + + assertOk(state); + + clock.tick(Duration.ofSeconds(30)); + state.heartbeat(true, clock.now()); + + assertOk(state); + + clock.tick(Duration.ofSeconds(60)); + state.heartbeat(false, clock.now()); + assertFailing(state, 1, 0.25); + } + + @Test + void success_resets_failing() { + HeartbeatState state = + new HeartbeatState( + clock, + clock.now(), + new HeartbeatConfig(Duration.ofMinutes(1), 4, Duration.ofMinutes(4))); + + assertOk(state); + + clock.tick(Duration.ofSeconds(30)); + state.heartbeat(false, clock.now()); + + assertFailing(state, 1, 0.125); + + clock.tick(Duration.ofSeconds(60)); + state.heartbeat(false, clock.now()); + + assertFailing(state, 2, 0.375); + + clock.tick(Duration.ofSeconds(60)); + state.heartbeat(true, clock.now()); + + assertOk(state); + } + + @Test + void not_stale_until_tolerance_passed() { + HeartbeatState state = + new HeartbeatState( + clock, + clock.now(), + new HeartbeatConfig(Duration.ofSeconds(60), 4, Duration.ofMinutes(4))); + + assertOk(state); + + clock.tick(Duration.ofSeconds(60)); + assertThat(state.hasStaleHeartbeat(), is(false)); + + clock.tick(Duration.ofSeconds(5)); + assertThat(state.hasStaleHeartbeat(), is(false)); + + clock.tick(Duration.ofSeconds(25)); + assertThat(state.hasStaleHeartbeat(), is(true)); + + state.heartbeat(true, clock.now()); + assertThat(state.hasStaleHeartbeat(), is(false)); + + assertOk(state); + } + + private void assertFailing(HeartbeatState state, int timesFailed, double fractionDead) { + assertTrue(state.hasStaleHeartbeat()); + assertThat(state.getFailedHeartbeats(), is(timesFailed)); + assertThat(state.getFractionDead(), IsCloseTo.closeTo(fractionDead, 0.01)); + } + + private void assertOk(HeartbeatState state) { + assertFalse(state.hasStaleHeartbeat()); + assertThat(state.getFailedHeartbeats(), is(0)); + assertThat(state.getFractionDead(), Matchers.lessThan(0.01)); + } +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java index e69c0680..eb43c5b4 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java @@ -158,9 +158,12 @@ public PausingHandler() { public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { try { waitForExecute.countDown(); + LoggerFactory.getLogger(PausingHandler.class).trace("Awaiting waitInExecuteUntil."); waitInExecuteUntil.await(); + LoggerFactory.getLogger(PausingHandler.class) + .trace("Received countdown for waitInExecuteUntil."); } catch (InterruptedException e) { - LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); + LoggerFactory.getLogger(PausingHandler.class).info("Interrupted."); } } } @@ -202,11 +205,20 @@ public void execute(TaskInstance taskInstance, ExecutionContext executionCont public static class SimpleStatsRegistry extends StatsRegistry.DefaultStatsRegistry { public final AtomicInteger unexpectedErrors = new AtomicInteger(0); + public final AtomicInteger failedHeartbeats = new AtomicInteger(0); + public final AtomicInteger failedMultipleHeartbeats = new AtomicInteger(0); + public final AtomicInteger deadExecutions = new AtomicInteger(0); @Override public void register(SchedulerStatsEvent e) { if (e == SchedulerStatsEvent.UNEXPECTED_ERROR) { unexpectedErrors.incrementAndGet(); + } else if (e == SchedulerStatsEvent.FAILED_HEARTBEAT) { + failedHeartbeats.incrementAndGet(); + } else if (e == SchedulerStatsEvent.FAILED_MULTIPLE_HEARTBEATS) { + failedMultipleHeartbeats.incrementAndGet(); + } else if (e == SchedulerStatsEvent.DEAD_EXECUTION) { + deadExecutions.incrementAndGet(); } super.register(e); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/concurrent/ClusterTests.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/concurrent/ClusterTests.java index 7be7da61..5e200a78 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/concurrent/ClusterTests.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/concurrent/ClusterTests.java @@ -79,7 +79,9 @@ static void testConcurrencyForPollingStrategy( assertThat(completed.failed.size(), is(0)); assertThat(completed.ok.size(), is(ids.size())); assertThat("Should contain no duplicates", new HashSet<>(completed.ok).size(), is(ids.size())); - assertThat(stats.unexpectedErrors.get(), is(0)); + assertThat("No unexpected errors", stats.unexpectedErrors.get(), is(0)); + assertThat("No dead executions", stats.deadExecutions.get(), is(0)); + assertThat("No multiple-heartbeat-failures", stats.failedMultipleHeartbeats.get(), is(0)); assertThat(scheduler1.getCurrentlyExecuting(), hasSize(0)); assertThat(scheduler2.getCurrentlyExecuting(), hasSize(0)); } @@ -114,7 +116,9 @@ static void testRecurring(StopSchedulerExtension stopScheduler, DataSource datas scheduler1.stop(); scheduler2.stop(); - assertThat(stats.unexpectedErrors.get(), is(0)); + assertThat("No unexpected errors", stats.unexpectedErrors.get(), is(0)); + assertThat("No dead executions", stats.deadExecutions.get(), is(0)); + assertThat("No multiple-heartbeat-failures", stats.failedMultipleHeartbeats.get(), is(0)); assertThat(scheduler1.getCurrentlyExecuting(), hasSize(0)); assertThat(scheduler2.getCurrentlyExecuting(), hasSize(0)); } @@ -130,7 +134,7 @@ private static Scheduler createScheduler( .schedulerName(new Fixed(name)) .threads(NUMBER_OF_THREADS) .pollingInterval(Duration.ofMillis(50)) // also runs fine with 5s - .heartbeatInterval(Duration.ofMillis(2_000)) + .heartbeatInterval(Duration.ofMillis(500)) .statsRegistry(stats); schedulerCustomization.accept(builder); return builder.build(); @@ -143,7 +147,7 @@ private static Scheduler createSchedulerRecurring( .schedulerName(new Fixed(name)) .threads(NUMBER_OF_THREADS) .pollingInterval(Duration.ofMillis(50)) - .heartbeatInterval(Duration.ofMillis(2_000)) + .heartbeatInterval(Duration.ofMillis(500)) .statsRegistry(stats) .build(); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DetectStaleHeartbeatTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DetectStaleHeartbeatTest.java new file mode 100644 index 00000000..96832a62 --- /dev/null +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DetectStaleHeartbeatTest.java @@ -0,0 +1,79 @@ +package com.github.kagkarlsson.scheduler.functional; + +import static com.github.kagkarlsson.jdbc.PreparedStatementSetter.NOOP; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.kagkarlsson.jdbc.JdbcRunner; +import com.github.kagkarlsson.scheduler.CurrentlyExecuting; +import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension; +import com.github.kagkarlsson.scheduler.HeartbeatState; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.SchedulerName; +import com.github.kagkarlsson.scheduler.StopSchedulerExtension; +import com.github.kagkarlsson.scheduler.TestTasks.PausingHandler; +import com.github.kagkarlsson.scheduler.helper.TestableRegistry; +import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler.CancelDeadExecution; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.hamcrest.collection.IsCollectionWithSize; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DetectStaleHeartbeatTest { + private static final Logger LOG = LoggerFactory.getLogger(DetectStaleHeartbeatTest.class); + + @RegisterExtension + public EmbeddedPostgresqlExtension postgres = new EmbeddedPostgresqlExtension(); + + @RegisterExtension public StopSchedulerExtension stopScheduler = new StopSchedulerExtension(); + + @Test + public void test_dead_execution() throws InterruptedException { + PausingHandler handler = new PausingHandler<>(); + + OneTimeTask customTask = + Tasks.oneTime("custom-a", Void.class) + .onDeadExecution(new CancelDeadExecution<>()) + .execute(handler); + + TestableRegistry.Condition ranUpdateHeartbeats = + TestableRegistry.Conditions.ranUpdateHeartbeats(2); + TestableRegistry.Condition ranExecuteDue = TestableRegistry.Conditions.ranExecuteDue(1); + + TestableRegistry registry = + TestableRegistry.create().waitConditions(ranUpdateHeartbeats, ranExecuteDue).build(); + + Scheduler scheduler = + Scheduler.create(postgres.getDataSource(), customTask) + .pollingInterval(Duration.ofMillis(30)) + .heartbeatInterval(Duration.ofMillis(30)) + .schedulerName(new SchedulerName.Fixed("test")) + .statsRegistry(registry) + .build(); + stopScheduler.register(scheduler); + + scheduler.schedule(customTask.instance("1"), Instant.now()); + scheduler.start(); + handler.waitForExecute.await(); + + // fake other update that will cause heartbeat update to fail + new JdbcRunner(postgres.getDataSource()) + .execute("update scheduled_tasks set version = version + 1", NOOP); + ranUpdateHeartbeats.waitFor(); + + List failing = scheduler.getCurrentlyExecutingWithStaleHeartbeat(); + handler.waitInExecuteUntil.countDown(); + + assertThat(failing, IsCollectionWithSize.hasSize(1)); + HeartbeatState state = failing.get(0).getHeartbeatState(); + assertTrue(state.hasStaleHeartbeat()); + assertThat(state.getFailedHeartbeats(), is(2)); + } +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/RanUpdateHeartbeatsCondition.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/RanUpdateHeartbeatsCondition.java new file mode 100644 index 00000000..6cab2357 --- /dev/null +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/RanUpdateHeartbeatsCondition.java @@ -0,0 +1,44 @@ +package com.github.kagkarlsson.scheduler.helper; + +import com.github.kagkarlsson.scheduler.stats.StatsRegistry; +import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; +import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RanUpdateHeartbeatsCondition implements TestableRegistry.Condition { + private static final Logger LOG = LoggerFactory.getLogger(RanUpdateHeartbeatsCondition.class); + + private final CountDownLatch count; + private final int waitForCount; + + public RanUpdateHeartbeatsCondition(int waitForCount) { + count = new CountDownLatch(waitForCount); + this.waitForCount = waitForCount; + } + + @Override + public void waitFor() { + try { + LOG.debug("Starting await for " + waitForCount + " UpdateHeartbeats"); + count.await(); + LOG.debug("Finished wait for " + waitForCount + " UpdateHeartbeats"); + } catch (InterruptedException e) { + LOG.debug("Interrupted"); + } + } + + @Override + public void apply(StatsRegistry.SchedulerStatsEvent e) { + if (e == SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS) { + LOG.info("Received event update-heartbeats, counting down"); + count.countDown(); + } + } + + @Override + public void apply(StatsRegistry.CandidateStatsEvent e) {} + + @Override + public void apply(StatsRegistry.ExecutionStatsEvent e) {} +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/TestableRegistry.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/TestableRegistry.java index 5cf1f7d1..f7f90b29 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/TestableRegistry.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/helper/TestableRegistry.java @@ -173,6 +173,10 @@ public static Condition completed(int numberCompleted) { return new ExecutionCompletedCondition(numberCompleted); } + public static Condition ranUpdateHeartbeats(int count) { + return new RanUpdateHeartbeatsCondition(count); + } + public static Condition ranExecuteDue(int count) { return new RanExecuteDueCondition(count); } diff --git a/examples/features/pom.xml b/examples/features/pom.xml index 31abb893..5173e1a5 100644 --- a/examples/features/pom.xml +++ b/examples/features/pom.xml @@ -24,6 +24,11 @@ db-scheduler ${project.version} + + com.github.kagkarlsson + micro-jdbc + ${micro-jdbc.version} + org.slf4j slf4j-api diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java new file mode 100644 index 00000000..f5a0771f --- /dev/null +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.examples; + +import static com.github.kagkarlsson.jdbc.PreparedStatementSetter.NOOP; + +import com.github.kagkarlsson.examples.helpers.Example; +import com.github.kagkarlsson.jdbc.JdbcRunner; +import com.github.kagkarlsson.scheduler.HeartbeatState; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; +import java.time.Duration; +import java.time.Instant; +import javax.sql.DataSource; + +public class HeartbeatMonitoringMain extends Example { + + public static void main(String[] args) { + new HeartbeatMonitoringMain().runWithDatasource(); + } + + @Override + public void run(DataSource dataSource) { + + OneTimeTask waitForStaleHeartbeatTask = + Tasks.oneTime("wait-for-stale-heartbeat-task", Void.class) + .execute( + (inst, ctx) -> { + System.out.println("Running!"); + while (ctx.getCurrentlyExecuting().getHeartbeatState().getFractionDead() < 0.7) { + sleep(100); + printHeartbeat(ctx.getCurrentlyExecuting().getHeartbeatState()); + } + printHeartbeat(ctx.getCurrentlyExecuting().getHeartbeatState()); + System.out.println("Done!"); + }); + + final Scheduler scheduler = + Scheduler.create(dataSource, waitForStaleHeartbeatTask) + .threads(5) + .heartbeatInterval(Duration.ofSeconds(1)) + .missedHeartbeatsLimit(8) + .pollingInterval(Duration.ofSeconds(1)) + .build(); + + scheduler.start(); + + scheduler.schedule(waitForStaleHeartbeatTask.instance("1045"), Instant.now()); + + sleep(4000); + JdbcRunner jdbcRunner = new JdbcRunner(dataSource); + + // simulate something that will cause heartbeating to fail + System.out.println("Fake update on execution to cause heartbeat-update to fail."); + jdbcRunner.execute("update scheduled_tasks set version = version + 1", NOOP); + } + + private void printHeartbeat(HeartbeatState heartbeatState) { + System.out.printf( + "Will keep running until heartbeat-failure detected. Current state: failed-heartbeats=%s, fraction-dead=%s, stale=%s\n", + heartbeatState.getFailedHeartbeats(), + heartbeatState.getFractionDead(), + heartbeatState.hasStaleHeartbeat()); + } +} diff --git a/pom.xml b/pom.xml index 4c99b0f3..03021f10 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,7 @@ true + 0.6.0 1.7.36 1.2.12 2.7.11