Skip to content

Commit

Permalink
Move listener for immediate-execution to builder
Browse files Browse the repository at this point in the history
  • Loading branch information
kagkarlsson committed Jul 2, 2024
1 parent 92189fa commit 087425a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TriggerCheckForDueExecutions extends AbstractSchedulerListener {
private static final Logger LOG = LoggerFactory.getLogger(TriggerCheckForDueExecutions.class);
private SchedulerState schedulerState;
class ImmediateCheckForDueExecutions extends AbstractSchedulerListener {
private static final Logger LOG = LoggerFactory.getLogger(ImmediateCheckForDueExecutions.class);
private final Scheduler scheduler;
private Clock clock;
private Waiter executeDueWaiter;

public TriggerCheckForDueExecutions(
SchedulerState schedulerState, Clock clock, Waiter executeDueWaiter) {
this.schedulerState = schedulerState;
public ImmediateCheckForDueExecutions(
Scheduler scheduler, Clock clock) {
this.scheduler = scheduler;
this.clock = clock;
this.executeDueWaiter = executeDueWaiter;
}

@Override
public void onExecutionScheduled(
TaskInstanceId taskInstanceId, Instant scheduledToExecutionTime) {
SchedulerState schedulerState = scheduler.getSchedulerState();
if (!schedulerState.isStarted() || schedulerState.isShuttingDown()) {
LOG.debug(
"Will not act on scheduling event for execution (task: '{}', id: '{}') as scheduler is starting or shutting down.",
Expand All @@ -48,7 +47,7 @@ public void onExecutionScheduled(
"Task-instance scheduled to run directly, triggering check for due executions (unless it is already running). Task: {}, instance: {}",
taskInstanceId.getTaskName(),
taskInstanceId.getId());
executeDueWaiter.wakeOrSkipNextWait();
scheduler.triggerCheckForDueExecutions();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ protected Scheduler(
Waiter executeDueWaiter,
Duration heartbeatInterval,
int numberOfMissedHeartbeatsBeforeDead,
boolean enableImmediateExecution,
List<SchedulerListener> schedulerListeners,
List<SchedulerListener> schedulerListeners,
PollingStrategyConfig pollingStrategyConfig,
Duration deleteUnresolvedAfter,
Duration shutdownMaxWait,
Expand Down Expand Up @@ -109,9 +108,6 @@ protected Scheduler(
this.schedulerListeners = new SchedulerListeners(schedulerListeners);
this.dueExecutor = dueExecutor;
this.housekeeperExecutor = housekeeperExecutor;
if (enableImmediateExecution) {
this.schedulerListeners.add(new TriggerCheckForDueExecutions(schedulerState, clock, executeDueWaiter));
}
delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ public Scheduler build() {
waiter,
heartbeatInterval,
numberOfMissedHeartbeatsBeforeDead,
enableImmediateExecution,
schedulerListeners,
schedulerListeners,
pollingStrategyConfig,
deleteUnresolvedAfter,
shutdownMaxWait,
Expand All @@ -313,6 +312,10 @@ public Scheduler build() {
candidateDueExecutor,
candidateHousekeeperExecutor);

if (enableImmediateExecution) {
scheduler.registerSchedulerListener(new ImmediateCheckForDueExecutions(scheduler, clock));
}

if (registerShutdownHook) {
Runtime.getRuntime()
.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ public class ManualScheduler extends Scheduler {
waiter,
heartbeatInterval,
SchedulerBuilder.DEFAULT_MISSED_HEARTBEATS_LIMIT,
executeImmediately,
schedulerListeners,
schedulerListeners,
pollingStrategyConfig,
deleteUnresolvedAfter,
Duration.ZERO,
Expand Down

0 comments on commit 087425a

Please sign in to comment.