Skip to content

Commit

Permalink
New interface SchedulerListener. Adapt old StatsRegistry. Add executi…
Browse files Browse the repository at this point in the history
…on-events.
  • Loading branch information
kagkarlsson committed Jun 20, 2024
1 parent 1b52ac3 commit 1b03709
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import static com.github.kagkarlsson.scheduler.ExceptionUtils.describe;

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
Expand All @@ -36,7 +38,7 @@ class ExecutePicked implements Runnable {
private final TaskRepository taskRepository;
private SchedulerClientEventListener earlyExecutionListener;
private final SchedulerClient schedulerClient;
private final StatsRegistry statsRegistry;
private final SchedulerListener schedulerListener;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
Expand All @@ -49,7 +51,7 @@ public ExecutePicked(
TaskRepository taskRepository,
SchedulerClientEventListener earlyExecutionListener,
SchedulerClient schedulerClient,
StatsRegistry statsRegistry,
SchedulerListener schedulerListener,
TaskResolver taskResolver,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
Expand All @@ -60,7 +62,7 @@ public ExecutePicked(
this.taskRepository = taskRepository;
this.earlyExecutionListener = earlyExecutionListener;
this.schedulerClient = schedulerClient;
this.statsRegistry = statsRegistry;
this.schedulerListener = schedulerListener;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
Expand All @@ -77,7 +79,8 @@ public void run() {
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);

try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
schedulerListener.onCandidateEvent(CandidateEventType.EXECUTED);
schedulerListener.onExecutionStart(currentlyExecuting);
executePickedExecution(pickedExecution, currentlyExecuting);
} finally {
executor.removeCurrentlyProcessing(executionId);
Expand All @@ -90,7 +93,7 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
LOG.error(
"Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.",
execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
return;
}

Expand All @@ -106,15 +109,12 @@ private void executePickedExecution(Execution execution, CurrentlyExecuting curr
LOG.debug("Execution done: " + execution);

complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);

} catch (RuntimeException unhandledException) {
failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);

} catch (Throwable unhandledError) {
failure(task.get(), execution, unhandledError, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
}

Expand All @@ -126,16 +126,17 @@ private void complete(
completion.complete(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.COMPLETIONHANDLER_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
describe(e),
e);
} finally {
schedulerListener.onExecutionComplete(completeEvent);
}
}

Expand All @@ -155,16 +156,17 @@ private void failure(
.onFailure(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.FAILUREHANDLER_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}, because {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
describe(cause),
e);
} finally {
schedulerListener.onExecutionComplete(completeEvent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.CandidateEventType;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
import com.github.kagkarlsson.scheduler.task.Execution;
import java.time.Instant;
import java.util.List;
Expand All @@ -30,7 +32,7 @@ public class FetchCandidates implements PollStrategy {
private final TaskRepository taskRepository;
private final SchedulerClient schedulerClient;
private SchedulerClientEventListener earlyExecutionListener;
private final StatsRegistry statsRegistry;
private final SchedulerListener schedulerListener;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
private final TaskResolver taskResolver;
Expand All @@ -48,7 +50,7 @@ public FetchCandidates(
SchedulerClient schedulerClient,
SchedulerClientEventListener earlyExecutionListener,
int threadpoolSize,
StatsRegistry statsRegistry,
SchedulerListener schedulerListener,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
TaskResolver taskResolver,
Expand All @@ -60,7 +62,7 @@ public FetchCandidates(
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
this.earlyExecutionListener = earlyExecutionListener;
this.statsRegistry = statsRegistry;
this.schedulerListener = schedulerListener;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
this.taskResolver = taskResolver;
Expand Down Expand Up @@ -104,7 +106,7 @@ public void run() {
taskRepository,
earlyExecutionListener,
schedulerClient,
statsRegistry,
schedulerListener,
taskResolver,
schedulerState,
failureLogger,
Expand All @@ -117,7 +119,7 @@ public void run() {
newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run);
});
}
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE);
}

private class PickDue implements Callable<Optional<Execution>> {
Expand All @@ -141,7 +143,7 @@ public Optional<Execution> call() {
if (addedDueExecutionsBatch.isOlderGenerationThan(currentGenerationNumber.get())) {
// skipping execution due to it being stale
addedDueExecutionsBatch.markBatchAsStale();
statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE);
schedulerListener.onCandidateEvent(CandidateEventType.STALE);
LOG.trace(
"Skipping queued execution (current generationNumber: {}, execution generationNumber: {})",
currentGenerationNumber,
Expand All @@ -154,7 +156,7 @@ public Optional<Execution> call() {
if (!pickedExecution.isPresent()) {
// someone else picked id
LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED);
schedulerListener.onCandidateEvent(CandidateEventType.ALREADY_PICKED);
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
import com.github.kagkarlsson.scheduler.task.Execution;
import java.time.Instant;
import java.util.List;
Expand All @@ -28,7 +29,7 @@ public class LockAndFetchCandidates implements PollStrategy {
private final TaskRepository taskRepository;
private final SchedulerClient schedulerClient;
private SchedulerClientEventListener earlyExecutionListener;
private final StatsRegistry statsRegistry;
private final SchedulerListener schedulerListener;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
Expand All @@ -46,7 +47,7 @@ public LockAndFetchCandidates(
SchedulerClient schedulerClient,
SchedulerClientEventListener earlyExecutionListener,
int threadpoolSize,
StatsRegistry statsRegistry,
SchedulerListener schedulerListener,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
TaskResolver taskResolver,
Expand All @@ -58,7 +59,7 @@ public LockAndFetchCandidates(
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
this.earlyExecutionListener = earlyExecutionListener;
this.statsRegistry = statsRegistry;
this.schedulerListener = schedulerListener;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void run() {
taskRepository,
earlyExecutionListener,
schedulerClient,
statsRegistry,
schedulerListener,
taskResolver,
schedulerState,
failureLogger,
Expand All @@ -118,6 +119,6 @@ public void run() {
}
});
}
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
schedulerListener.onSchedulerEvent(SchedulerEventType.RAN_EXECUTE_DUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
*/
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RunAndLogErrors implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(RunAndLogErrors.class);
private final Runnable toRun;
private final StatsRegistry statsRegistry;
private final SchedulerListener schedulerListener;

public RunAndLogErrors(Runnable toRun, StatsRegistry statsRegistry) {
public RunAndLogErrors(Runnable toRun, SchedulerListener schedulerListener) {
this.toRun = toRun;
this.statsRegistry = statsRegistry;
this.schedulerListener = schedulerListener;
}

@Override
Expand All @@ -33,7 +34,7 @@ public void run() {
toRun.run();
} catch (Throwable e) {
LOG.error("Unhandled exception. Will keep running.", e);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
*/
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener;
import com.github.kagkarlsson.scheduler.stats.SchedulerListener.SchedulerEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,17 +23,17 @@ class RunUntilShutdown implements Runnable {
private final Runnable toRun;
private final Waiter waitBetweenRuns;
private final SchedulerState schedulerState;
private final StatsRegistry statsRegistry;
private final SchedulerListener schedulerListener;

public RunUntilShutdown(
Runnable toRun,
Waiter waitBetweenRuns,
SchedulerState schedulerState,
StatsRegistry statsRegistry) {
SchedulerListener schedulerListener) {
this.toRun = toRun;
this.waitBetweenRuns = waitBetweenRuns;
this.schedulerState = schedulerState;
this.statsRegistry = statsRegistry;
this.schedulerListener = schedulerListener;
}

@Override
Expand All @@ -43,7 +44,7 @@ public void run() {
toRun.run();
} catch (Throwable e) {
LOG.error("Unhandled exception. Will keep running.", e);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
}
}

Expand All @@ -54,7 +55,7 @@ public void run() {
LOG.debug("Thread '{}' interrupted due to shutdown.", Thread.currentThread().getName());
} else {
LOG.error("Unexpected interruption of thread. Will keep running.", interruptedException);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
schedulerListener.onSchedulerEvent(SchedulerEventType.UNEXPECTED_ERROR);
}
}
}
Expand Down
Loading

0 comments on commit 1b03709

Please sign in to comment.