diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/graphql/SchedulerGraphqlResource.java b/core/app/datasource/src/main/java/io/openk9/datasource/graphql/SchedulerGraphqlResource.java index 0e2510d97..a7786ee78 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/graphql/SchedulerGraphqlResource.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/graphql/SchedulerGraphqlResource.java @@ -64,6 +64,11 @@ public Uni newDataIndex(@Source Scheduler scheduler) { return schedulerService.getNewDataIndex(scheduler); } + @Mutation + public Uni cancelSchedulation(@Id long id) { + return schedulerService.cancelSchedulation(id); + } + @Inject SchedulerService schedulerService; diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/service/SchedulerService.java b/core/app/datasource/src/main/java/io/openk9/datasource/service/SchedulerService.java index 354403e19..30bfc74b2 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/service/SchedulerService.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/service/SchedulerService.java @@ -17,12 +17,19 @@ package io.openk9.datasource.service; +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; +import io.openk9.auth.tenant.TenantResolver; +import io.openk9.datasource.actor.ActorSystemProvider; import io.openk9.datasource.model.DataIndex; import io.openk9.datasource.model.Datasource; import io.openk9.datasource.model.Scheduler; import io.openk9.datasource.model.Scheduler_; import io.openk9.datasource.model.dto.SchedulerDTO; import io.openk9.datasource.model.util.Mutiny2; +import io.openk9.datasource.pipeline.SchedulationKeyUtils; +import io.openk9.datasource.pipeline.actor.Schedulation; import io.openk9.datasource.service.util.BaseK9EntityService; import io.openk9.datasource.util.UniActionListener; import io.smallrye.mutiny.Uni; @@ -97,6 +104,27 @@ public Uni> getDeletedContentIds(long id) { .flatMap(this::indexesDiff); } + public Uni cancelSchedulation(long schedulerId) { + return findById(schedulerId) + .chain(scheduler -> { + if (scheduler.getStatus() == Scheduler.SchedulerStatus.STARTED) { + + ActorSystem actorSystem = actorSystemProvider.getActorSystem(); + + ClusterSharding clusterSharding = ClusterSharding.get(actorSystem); + + EntityRef schedulationRef = clusterSharding.entityRefFor( + Schedulation.ENTITY_TYPE_KEY, + SchedulationKeyUtils.getValue( + tenantResolver.getTenantName(), scheduler.getScheduleId())); + + schedulationRef.tell(Schedulation.Cancel.INSTANCE); + } + + return Uni.createFrom().nothing(); + }); + } + private Uni> indexesDiff(Scheduler scheduler) { if (scheduler == null) { return Uni.createFrom().item(List.of()); @@ -152,4 +180,8 @@ private List mapToList(SearchResponse searchResponse) { @Inject RestHighLevelClient client; + @Inject + ActorSystemProvider actorSystemProvider; + @Inject + TenantResolver tenantResolver; }