Skip to content

Commit

Permalink
#519: adds cancelSchedulation mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-vi committed Jul 26, 2023
1 parent 3a1ba56 commit ba5f70a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public Uni<DataIndex> newDataIndex(@Source Scheduler scheduler) {
return schedulerService.getNewDataIndex(scheduler);
}

@Mutation
public Uni<Void> cancelSchedulation(@Id long id) {
return schedulerService.cancelSchedulation(id);
}

@Inject
SchedulerService schedulerService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@

package io.openk9.datasource.service;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import io.openk9.auth.tenant.TenantResolver;
import io.openk9.datasource.actor.ActorSystemProvider;
import io.openk9.datasource.model.DataIndex;
import io.openk9.datasource.model.Datasource;
import io.openk9.datasource.model.Scheduler;
import io.openk9.datasource.model.Scheduler_;
import io.openk9.datasource.model.dto.SchedulerDTO;
import io.openk9.datasource.model.util.Mutiny2;
import io.openk9.datasource.pipeline.SchedulationKeyUtils;
import io.openk9.datasource.pipeline.actor.Schedulation;
import io.openk9.datasource.service.util.BaseK9EntityService;
import io.openk9.datasource.util.UniActionListener;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -97,6 +104,27 @@ public Uni<List<String>> getDeletedContentIds(long id) {
.flatMap(this::indexesDiff);
}

public Uni<Void> cancelSchedulation(long schedulerId) {
return findById(schedulerId)
.chain(scheduler -> {
if (scheduler.getStatus() == Scheduler.SchedulerStatus.STARTED) {

ActorSystem<?> actorSystem = actorSystemProvider.getActorSystem();

ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);

EntityRef<Schedulation.Command> schedulationRef = clusterSharding.entityRefFor(
Schedulation.ENTITY_TYPE_KEY,
SchedulationKeyUtils.getValue(
tenantResolver.getTenantName(), scheduler.getScheduleId()));

schedulationRef.tell(Schedulation.Cancel.INSTANCE);
}

return Uni.createFrom().nothing();
});
}

private Uni<List<String>> indexesDiff(Scheduler scheduler) {
if (scheduler == null) {
return Uni.createFrom().item(List.of());
Expand Down Expand Up @@ -152,4 +180,8 @@ private List<String> mapToList(SearchResponse searchResponse) {

@Inject
RestHighLevelClient client;
@Inject
ActorSystemProvider actorSystemProvider;
@Inject
TenantResolver tenantResolver;
}

0 comments on commit ba5f70a

Please sign in to comment.