Skip to content

Commit

Permalink
issue #528: delete datasource purge jobs by quartz
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-vi committed Jul 26, 2023
1 parent 91ce8ee commit 73b5a30
Showing 1 changed file with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public record TriggerDatasource(
String tenantName, long datasourceId, Boolean startFromFirst) implements Command {}
public record TriggerDatasourcePurge(String tenantName, long datasourceId) implements Command {}
private record ScheduleDatasourceInternal(String tenantName, long datasourceId, boolean schedulable, String cron) implements Command {}
private record UnScheduleJobInternal(String jobName) implements Command {}
private record TriggerDatasourceInternal(String tenantName, Datasource datasource, boolean startFromFirst) implements Command {}
private record InvokePluginDriverInternal(
String tenantName, io.openk9.datasource.model.Scheduler scheduler,
Expand Down Expand Up @@ -136,10 +137,11 @@ private static Behavior<Command> initial(
return Behaviors.receive(Command.class)
.onMessage(InitialSubscribeResponse.class, isr -> onInitialSubscribeResponse(ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, jobNames, isr))
.onMessage(ScheduleDatasource.class, ad -> onAddDatasource(ad, ctx))
.onMessage(UnScheduleDatasource.class, rd -> onRemoveDatasource(rd, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames))
.onMessage(UnScheduleDatasource.class, rd -> onRemoveDatasource(rd, ctx))
.onMessage(TriggerDatasource.class, jm -> onTriggerDatasource(jm, ctx, transactionInvoker))
.onMessage(TriggerDatasourcePurge.class, tdp -> onTriggerDatasourcePurge(tdp, ctx, restHighLevelClient, transactionInvoker))
.onMessage(ScheduleDatasourceInternal.class, sdi -> onScheduleDatasourceInternal(sdi, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames))
.onMessage(UnScheduleJobInternal.class, rd -> onUnscheduleJobInternal(rd, ctx, quartzSchedulerTypedExtension, httpPluginDriverClient, transactionInvoker, restHighLevelClient, messageGateway, jobNames))
.onMessage(TriggerDatasourceInternal.class, tdi -> onTriggerDatasourceInternal(tdi, ctx, transactionInvoker, messageGateway))
.onMessage(InvokePluginDriverInternal.class, ipdi -> onInvokePluginDriverInternal(ctx, httpPluginDriverClient, ipdi.tenantName, ipdi.scheduler, ipdi.startFromFirst))
.onMessage(StartSchedulerInternal.class, ssi -> onStartScheduler(ctx, transactionInvoker, messageGateway, ssi))
Expand All @@ -150,6 +152,33 @@ private static Behavior<Command> initial(

}

private static Behavior<Command> onUnscheduleJobInternal(
UnScheduleJobInternal msg, ActorContext<Command> ctx,
QuartzSchedulerTypedExtension quartzSchedulerTypedExtension,
HttpPluginDriverClient httpPluginDriverClient, TransactionInvoker transactionInvoker,
RestHighLevelClient restHighLevelClient, ActorRef<MessageGateway.Command> messageGateway,
List<String> jobNames) {

String jobName = msg.jobName;

if (jobNames.contains(jobName)) {
quartzSchedulerTypedExtension.deleteJobSchedule(jobName);
quartzSchedulerTypedExtension.deleteJobSchedule(jobName + "-purge");

List<String> newJobNames = new ArrayList<>(jobNames);
newJobNames.remove(jobName);
log.info("Job removed: {}", jobName);

return initial(
ctx, quartzSchedulerTypedExtension, httpPluginDriverClient,
transactionInvoker, restHighLevelClient, messageGateway, newJobNames);
}

log.info("Job not found: {}", jobName);

return Behaviors.same();
}

private static Behavior<Command> onStartSubscribeResponse(
ActorContext<Command> ctx, StartSubscribeResponse cmsr) {

Expand Down Expand Up @@ -409,29 +438,14 @@ private static Behavior<Command> onTriggerDatasourcePurge(
}

private static Behavior<Command> onRemoveDatasource(
UnScheduleDatasource removeDatasource, ActorContext<Command> ctx,
QuartzSchedulerTypedExtension quartzSchedulerTypedExtension,
HttpPluginDriverClient httpPluginDriverClient,
TransactionInvoker transactionInvoker,
RestHighLevelClient restHighLevelClient,
ActorRef<MessageGateway.Command> messageGatewayService, List<String> jobNames) {
UnScheduleDatasource removeDatasource, ActorContext<Command> ctx) {

long datasourceId = removeDatasource.datasourceId;
String tenantName = removeDatasource.tenantName;

String jobName = tenantName + "-" + datasourceId;

if (jobNames.contains(jobName)) {
quartzSchedulerTypedExtension.deleteJobSchedule(jobName);
List<String> newJobNames = new ArrayList<>(jobNames);
newJobNames.remove(jobName);
log.info("Job removed: {}", jobName);
return initial(
ctx, quartzSchedulerTypedExtension, httpPluginDriverClient,
transactionInvoker, restHighLevelClient, messageGatewayService, newJobNames);
}

log.info("Job not found: {}", jobName);
ctx.getSelf().tell(new UnScheduleJobInternal(jobName));

return Behaviors.same();

Expand Down

0 comments on commit 73b5a30

Please sign in to comment.