Skip to content

Commit

Permalink
Merge pull request #320 from kagkarlsson/dead_executions_handler_exec…
Browse files Browse the repository at this point in the history
…utioncomplete

Change signature of DeadExecutionHandler, send an ExecutionComplete event
  • Loading branch information
kagkarlsson authored Sep 13, 2022
2 parents 6987fca + 82e58a9 commit 8ffa88f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import com.github.kagkarlsson.scheduler.task.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -266,7 +260,9 @@ protected void detectDeadExecutions() {
Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (task.isPresent()) {
statsRegistry.register(SchedulerStatsEvent.DEAD_EXECUTION);
task.get().getDeadExecutionHandler().deadExecution(execution, new ExecutionOperations(schedulerTaskRepository, earlyExecutionListener, execution));
task.get().getDeadExecutionHandler().deadExecution(
ExecutionComplete.failure(execution, now, now, null),
new ExecutionOperations(schedulerTaskRepository, earlyExecutionListener, execution));
} else {
LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", execution.taskInstance.getTaskName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@
import java.time.Instant;

public interface DeadExecutionHandler<T> {
void deadExecution(Execution execution, ExecutionOperations<T> executionOperations);
void deadExecution(ExecutionComplete executionComplete, ExecutionOperations<T> executionOperations);

class ReviveDeadExecution<T> implements DeadExecutionHandler<T> {
private static final Logger LOG = LoggerFactory.getLogger(ReviveDeadExecution.class);

@Override
public void deadExecution(Execution execution, ExecutionOperations<T> executionOperations) {
public void deadExecution(ExecutionComplete executionComplete, ExecutionOperations<T> executionOperations) {
final Instant now = Instant.now();
LOG.info("Reviving dead execution: " + execution + " to " + now);
executionOperations.reschedule(new ExecutionComplete(execution, now, now, ExecutionComplete.Result.FAILED, null), now);
LOG.info("Reviving dead execution: " + executionComplete.getExecution() + " to " + now);
executionOperations.reschedule(executionComplete, now);
}
}

class CancelDeadExecution<T> implements DeadExecutionHandler<T> {
private static final Logger LOG = LoggerFactory.getLogger(ReviveDeadExecution.class);

@Override
public void deadExecution(Execution execution, ExecutionOperations<T> executionOperations) {
LOG.warn("Cancelling dead execution: " + execution);
public void deadExecution(ExecutionComplete executionComplete, ExecutionOperations<T> executionOperations) {
LOG.warn("Cancelling dead execution: " + executionComplete.getExecution());
executionOperations.stop();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository;
import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.DeadExecutionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.VoidExecutionHandler;
import com.github.kagkarlsson.scheduler.task.*;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.testhelper.SettableClock;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -150,9 +144,9 @@ public static class ReviveDead<T> extends DeadExecutionHandler.ReviveDeadExecuti
public int timesCalled = 0;

@Override
public void deadExecution(Execution execution, ExecutionOperations<T> executionOperations) {
public void deadExecution(ExecutionComplete executionComplete, ExecutionOperations<T> executionOperations) {
timesCalled++;
super.deadExecution(execution, executionOperations);
super.deadExecution(executionComplete, executionOperations);
}
}

Expand Down

0 comments on commit 8ffa88f

Please sign in to comment.