Skip to content

Commit

Permalink
Delete checkpoint when AD job is stopped due to EndRunException
Browse files (browse the repository at this point in the history)
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Aug 23, 2023
1 parent b9674ac commit d2b189e
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -462,16 +463,16 @@ private void stopAdJobForEndRunException(
LockModel lock,
Instant detectionStartTime,
Instant executionStartTime,
EndRunException exception,
EndRunException endRunException,
ExecuteADResultResponseRecorder recorder,
AnomalyDetector detector
) {
String detectorId = jobParameter.getName();
detectorEndRunExceptionCount.remove(detectorId);
String errorPrefix = exception.isEndNow()
String errorPrefix = endRunException.isEndNow()
? "Stopped detector: "
: "Stopped detector as job failed consecutively for more than " + this.maxRetryForEndRunException + " times: ";
String error = errorPrefix + exception.getMessage();
String error = errorPrefix + endRunException.getMessage();
stopAdJob(
detectorId,
() -> indexAnomalyResultException(
Expand All @@ -487,6 +488,22 @@ private void stopAdJobForEndRunException(
detector
)
);

// delete checkpoint
String adID = detector.getId();
DeleteModelRequest modelDeleteRequest = new DeleteModelRequest(adID);
client.execute(DeleteModelAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
log.warn("Cannot delete all models of detector {}", adID);
for (FailedNodeException failedNodeException : response.failures()) {
log.warn("Deleting models of node has exception", failedNodeException);
}
} else {
log.info("models of detector {} get deleted", adID);
}
}, exception -> {
log.error(new ParameterizedMessage("Deletion of detector [{}] has exception.", adID), exception);
}));
}

private void stopAdJob(String detectorId, ExecutorFunction function) {
Expand Down

0 comments on commit d2b189e

Please sign in to comment.