diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java index 0d56e65f4..da69b79a8 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java @@ -61,6 +61,7 @@ public record TriggerDatasource( String tenantName, long datasourceId, Boolean startFromFirst) implements Command {} public record TriggerDatasourcePurge(String tenantName, long datasourceId) implements Command {} private record ScheduleDatasourceInternal(String tenantName, long datasourceId, boolean schedulable, String cron) implements Command {} + private record UnScheduleJobInternal(String jobName) implements Command {} private record TriggerDatasourceInternal(String tenantName, Datasource datasource, boolean startFromFirst) implements Command {} private record InvokePluginDriverInternal( String tenantName, io.openk9.datasource.model.Scheduler scheduler, @@ -136,10 +137,11 @@ private static Behavior initial( return Behaviors.receive(Command.class) .onMessage(InitialSubscribeResponse.class, isr -> onInitialSubscribeResponse(ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, jobNames, isr)) .onMessage(ScheduleDatasource.class, ad -> onAddDatasource(ad, ctx)) - .onMessage(UnScheduleDatasource.class, rd -> onRemoveDatasource(rd, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames)) + .onMessage(UnScheduleDatasource.class, rd -> onRemoveDatasource(rd, ctx)) .onMessage(TriggerDatasource.class, jm -> onTriggerDatasource(jm, ctx, transactionInvoker)) .onMessage(TriggerDatasourcePurge.class, tdp -> onTriggerDatasourcePurge(tdp, ctx, restHighLevelClient, transactionInvoker)) .onMessage(ScheduleDatasourceInternal.class, sdi -> onScheduleDatasourceInternal(sdi, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames)) + .onMessage(UnScheduleJobInternal.class, rd -> onUnscheduleJobInternal(rd, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames)) .onMessage(TriggerDatasourceInternal.class, tdi -> onTriggerDatasourceInternal(tdi, ctx, transactionInvoker, messageGateway)) .onMessage(InvokePluginDriverInternal.class, ipdi -> onInvokePluginDriverInternal(ctx, httpPluginDriverClient, ipdi.tenantName, ipdi.scheduler, ipdi.startFromFirst)) .onMessage(StartSchedulerInternal.class, ssi -> onStartScheduler(ctx, transactionInvoker, messageGateway, ssi)) @@ -150,6 +152,33 @@ private static Behavior initial( } + private static Behavior onUnscheduleJobInternal( + UnScheduleJobInternal msg, ActorContext ctx, + QuartzSchedulerTypedExtension quartzSchedulerTypedExtension, + HttpPluginDriverClient httpPluginDriverClient, TransactionInvoker transactionInvoker, + RestHighLevelClient restHighLevelClient, ActorRef messageGateway, + List jobNames) { + + String jobName = msg.jobName; + + if (jobNames.contains(jobName)) { + quartzSchedulerTypedExtension.deleteJobSchedule(jobName); + quartzSchedulerTypedExtension.deleteJobSchedule(jobName + "-purge"); + + List newJobNames = new ArrayList<>(jobNames); + newJobNames.remove(jobName); + log.info("Job removed: {}", jobName); + + return initial( + ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, + transactionInvoker, restHighLevelClient, messageGateway, newJobNames); + } + + log.info("Job not found: {}", jobName); + + return Behaviors.same(); + } + private static Behavior onStartSubscribeResponse( ActorContext ctx, StartSubscribeResponse cmsr) { @@ -409,29 +438,14 @@ private static Behavior onTriggerDatasourcePurge( } private static Behavior onRemoveDatasource( - UnScheduleDatasource removeDatasource, ActorContext ctx, - QuartzSchedulerTypedExtension quartzSchedulerTypedExtension, - HttpPluginDriverClient httpPluginDriverClient, - TransactionInvoker transactionInvoker, - RestHighLevelClient restHighLevelClient, - ActorRef messageGatewayService, List jobNames) { + UnScheduleDatasource removeDatasource, ActorContext ctx) { long datasourceId = removeDatasource.datasourceId; String tenantName = removeDatasource.tenantName; String jobName = tenantName + "-" + datasourceId; - if (jobNames.contains(jobName)) { - quartzSchedulerTypedExtension.deleteJobSchedule(jobName); - List newJobNames = new ArrayList<>(jobNames); - newJobNames.remove(jobName); - log.info("Job removed: {}", jobName); - return initial( - ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, - transactionInvoker, restHighLevelClient, messageGatewayService, newJobNames); - } - - log.info("Job not found: {}", jobName); + ctx.getSelf().tell(new UnScheduleJobInternal(jobName)); return Behaviors.same();