Skip to content

Commit

Permalink
#518 fix stop flow
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-vi committed Jul 26, 2023
1 parent ba5f70a commit f8b569a
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private record SetScheduler(Scheduler scheduler) implements Command {}
private record EnrichPipelineResponseWrapper(EnrichPipeline.Response response) implements Command {}
private record NotificationSenderResponseWrapper(NotificationSender.Response response) implements Command {}
private enum Start implements Command {INSTANCE}
private enum Stop implements Command {INSTANCE}
private enum Tick implements Command {INSTANCE}

public sealed interface Response extends CborSerializable {}
Expand Down Expand Up @@ -157,6 +158,7 @@ private Receive<Command> finish() {
.onMessageEquals(PersistDataIndex.INSTANCE, this::onPersistDataIndex)
.onMessageEquals(PersistStatusFinished.INSTANCE, this::onPersistStatusFinished)
.onMessage(NotificationSenderResponseWrapper.class, this::onNotificationResponse)
.onMessageEquals(Stop.INSTANCE, this::onStop)
.build();
}

Expand Down Expand Up @@ -256,12 +258,11 @@ private Behavior<Command> onCancel() {
return s.persist(scheduler);
})
.invoke(this::destroyQueue)
.invoke(() -> getContext().getSelf().tell(Stop.INSTANCE))
)
);

logBehavior(STOPPED_BEHAVIOR);

return Behaviors.stopped();
return finish();
}

private Behavior<Command> enqueue(Command command) {
Expand Down Expand Up @@ -330,6 +331,7 @@ private Behavior<Command> onPersistStatusFinished() {
return s.persist(entity);
})
.invoke(this::destroyQueue)
.invoke(() -> getContext().getSelf().tell(Stop.INSTANCE))
)
);

Expand All @@ -341,18 +343,16 @@ private Behavior<Command> onPersistStatusFinished() {
return Behaviors.same();
}

logBehavior(STOPPED_BEHAVIOR);

return Behaviors.stopped();
return Behaviors.same();

}

private Behavior<Command> onNotificationResponse(
NotificationSenderResponseWrapper notificationResponseWrapper) {

logBehavior(STOPPED_BEHAVIOR);
getContext().getSelf().tell(Stop.INSTANCE);

return Behaviors.stopped();
return Behaviors.same();
}

private Behavior<Command> onTick() {
Expand Down Expand Up @@ -414,6 +414,11 @@ private String getIndexName() {
return indexName;
}

private Behavior<Command> onStop() {
logBehavior(STOPPED_BEHAVIOR);
return Behaviors.stopped();
}

private void logBehavior(String behavior) {
log.info("Schedulation with key {} behavior is {}", key, behavior);
}
Expand Down

0 comments on commit f8b569a

Please sign in to comment.