diff --git a/build.gradle b/build.gradle index a7e9b17e5..c23a039b5 100644 --- a/build.gradle +++ b/build.gradle @@ -12,6 +12,7 @@ import java.util.concurrent.Callable import org.opensearch.gradle.test.RestIntegTestTask import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask +import org.opensearch.gradle.info.BuildParams buildscript { ext { @@ -67,8 +68,7 @@ plugins { id 'com.netflix.nebula.ospackage' version "11.0.0" id "com.diffplug.spotless" version "6.18.0" id 'java-library' - // Gradle 7.6 support was added in test-retry 1.4.0. - id 'org.gradle.test-retry' version '1.4.1' + id 'org.gradle.test-retry' version '1.5.4' } tasks.withType(JavaCompile) { @@ -286,14 +286,13 @@ def _numNodes = findProperty('numNodes') as Integer ?: 1 def opensearch_tmp_dir = rootProject.file('build/private/opensearch_tmp').absoluteFile opensearch_tmp_dir.mkdirs() -boolean isCiServer = System.getenv().containsKey("CI") test { retry { - if (isCiServer) { - failOnPassedAfterRetry = false + if (BuildParams.isCi()) { maxRetries = 6 maxFailures = 10 } + failOnPassedAfterRetry = false } include '**/*Tests.class' systemProperty 'tests.security.manager', 'false' @@ -314,11 +313,11 @@ tasks.named("check").configure { dependsOn(integTest) } integTest { retry { - if (isCiServer) { - failOnPassedAfterRetry = false + if (BuildParams.isCi()) { maxRetries = 6 maxFailures = 10 } + failOnPassedAfterRetry = false } dependsOn "bundlePlugin" systemProperty 'tests.security.manager', 'false' diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java index 0fb2fe7fb..582f3e39a 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java @@ -11,11 +11,11 @@ package org.opensearch.ad; -import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_PARSE_DETECTOR_MSG; import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_FIND_CONFIG_MSG; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_PARSE_CONFIG_MSG; import java.util.List; import java.util.Map; @@ -30,7 +30,6 @@ import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.model.ADTaskType; import org.opensearch.ad.model.AnomalyDetector; @@ -66,6 +65,7 @@ import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.common.exception.NotSerializedExceptionName; import org.opensearch.timeseries.common.exception.ResourceNotFoundException; +import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.Job; @@ -110,7 +110,7 @@ public AnomalyDetectorProfileRunner( public void profile(String detectorId, ActionListener listener, Set profilesToCollect) { if (profilesToCollect.isEmpty()) { - listener.onFailure(new IllegalArgumentException(ADCommonMessages.EMPTY_PROFILES_COLLECT)); + listener.onFailure(new IllegalArgumentException(CommonMessages.EMPTY_PROFILES_COLLECT)); return; } calculateTotalResponsesToWait(detectorId, profilesToCollect, listener); @@ -133,8 +133,8 @@ private void calculateTotalResponsesToWait( AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId); prepareProfile(detector, listener, profilesToCollect); } catch (Exception e) { - logger.error(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, e); - listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST)); + logger.error(FAIL_TO_PARSE_CONFIG_MSG + detectorId, e); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_CONFIG_MSG + detectorId, BAD_REQUEST)); } } else { listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, BAD_REQUEST)); @@ -208,7 +208,7 @@ private void prepareProfile( new MultiResponsesDelegateActionListener( listener, totalResponsesToWait, - ADCommonMessages.FAIL_FETCH_ERR_MSG + detectorId, + CommonMessages.FAIL_FETCH_ERR_MSG + detectorId, false ); if (profilesToCollect.contains(DetectorProfileName.ERROR)) { @@ -267,7 +267,7 @@ private void prepareProfile( } } catch (Exception e) { - logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG, e); + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG, e); listener.onFailure(e); } } else { @@ -278,7 +278,7 @@ private void prepareProfile( logger.info(exception.getMessage()); onGetDetectorForPrepare(detectorId, listener, profilesToCollect); } else { - logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG + detectorId); + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG + detectorId); listener.onFailure(exception); } })); @@ -305,7 +305,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener { - logger.warn(ADCommonMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getId()); + logger.warn(CommonMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getId()); listener.onFailure(searchException); }); // using the original context in listener as user roles have no permissions for internal operations like fetching a @@ -359,7 +359,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener { - logger.warn(ADCommonMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getId()); + logger.warn(CommonMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getId()); listener.onFailure(searchException); }); // using the original context in listener as user roles have no permissions for internal operations like fetching a diff --git a/src/main/java/org/opensearch/ad/EntityProfileRunner.java b/src/main/java/org/opensearch/ad/EntityProfileRunner.java index 3fc04fe96..ce14dbef2 100644 --- a/src/main/java/org/opensearch/ad/EntityProfileRunner.java +++ b/src/main/java/org/opensearch/ad/EntityProfileRunner.java @@ -24,7 +24,6 @@ import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; @@ -92,7 +91,7 @@ public void profile( ActionListener listener ) { if (profilesToCollect == null || profilesToCollect.size() == 0) { - listener.onFailure(new IllegalArgumentException(ADCommonMessages.EMPTY_PROFILES_COLLECT)); + listener.onFailure(new IllegalArgumentException(CommonMessages.EMPTY_PROFILES_COLLECT)); return; } GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId); @@ -247,7 +246,7 @@ private void getJob( new MultiResponsesDelegateActionListener( listener, totalResponsesToWait, - ADCommonMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId, + CommonMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId, false ); @@ -310,7 +309,7 @@ private void getJob( })); } } catch (Exception e) { - logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG, e); + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG, e); listener.onFailure(e); } } else { @@ -321,7 +320,7 @@ private void getJob( logger.info(exception.getMessage()); sendUnknownState(profilesToCollect, entityValue, true, listener); } else { - logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception); + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception); listener.onFailure(exception); } })); diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java index 0f8c6fca4..6590cead6 100644 --- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -11,7 +11,7 @@ package org.opensearch.ad; -import static org.opensearch.ad.constant.ADCommonMessages.CAN_NOT_FIND_LATEST_TASK; +import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_LATEST_TASK; import java.time.Instant; import java.util.ArrayList; diff --git a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java index 31adb1dac..1f186f647 100644 --- a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java +++ b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java @@ -21,12 +21,7 @@ public class ADCommonMessages { public static final String FEATURE_NOT_AVAILABLE_ERR_MSG = "No Feature in current detection window."; public static final String DISABLED_ERR_MSG = "AD functionality is disabled. To enable update plugins.anomaly_detection.enabled to true"; - public static String FAIL_TO_PARSE_DETECTOR_MSG = "Fail to parse detector with id: "; - public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector "; - public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector "; public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than "; - public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid"; - public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for "; public static String DETECTOR_IS_RUNNING = "Detector is already running"; public static String DETECTOR_MISSING = "Detector is missing"; public static String AD_TASK_ACTION_MISSING = "AD task action is missing"; @@ -41,13 +36,11 @@ public class ADCommonMessages { public static String EXCEED_HISTORICAL_ANALYSIS_LIMIT = "Exceed max historical analysis limit per node"; public static String NO_ELIGIBLE_NODE_TO_RUN_DETECTOR = "No eligible node to run detector "; public static String EMPTY_STALE_RUNNING_ENTITIES = "Empty stale running entities"; - public static String CAN_NOT_FIND_LATEST_TASK = "can't find latest task"; public static String NO_ENTITY_FOUND = "No entity found"; public static String HISTORICAL_ANALYSIS_CANCELLED = "Historical analysis cancelled by user"; public static String HC_DETECTOR_TASK_IS_UPDATING = "HC detector task is updating"; public static String INVALID_TIME_CONFIGURATION_UNITS = "Time unit %s is not supported"; public static String FAIL_TO_GET_DETECTOR = "Fail to get detector"; - public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info"; public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector"; public static String FAIL_TO_UPDATE_DETECTOR = "Fail to update detector"; public static String FAIL_TO_PREVIEW_DETECTOR = "Fail to preview detector"; @@ -55,27 +48,6 @@ public class ADCommonMessages { public static String FAIL_TO_STOP_DETECTOR = "Fail to stop detector"; public static String FAIL_TO_DELETE_DETECTOR = "Fail to delete detector"; public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result"; - public static String FAIL_TO_GET_STATS = "Fail to get stats"; - public static String FAIL_TO_SEARCH = "Fail to search"; - - public static String WINDOW_DELAY_REC = - "Latest seen data point is at least %d minutes ago, consider changing window delay to at least %d minutes."; - public static String TIME_FIELD_NOT_ENOUGH_HISTORICAL_DATA = - "There isn't enough historical data found with current timefield selected."; - public static String DETECTOR_INTERVAL_REC = - "The selected detector interval might collect sparse data. Consider changing interval length to: "; - public static String RAW_DATA_TOO_SPARSE = - "Source index data is potentially too sparse for model training. Consider changing interval length or ingesting more data"; - public static String MODEL_VALIDATION_FAILED_UNEXPECTEDLY = "Model validation experienced issues completing."; - public static String FILTER_QUERY_TOO_SPARSE = "Data is too sparse after data filter is applied. Consider changing the data filter"; - public static String CATEGORY_FIELD_TOO_SPARSE = - "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data "; - public static String CATEGORY_FIELD_NO_DATA = - "No entity was found with the given categorical fields. Consider revising category field/s or ingesting more data"; - public static String FEATURE_QUERY_TOO_SPARSE = - "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries."; - public static String TIMEOUT_ON_INTERVAL_REC = "Timed out getting interval recommendation"; - public static final String NO_MODEL_ERR_MSG = "No RCF models are available either because RCF" + " models are not ready or all nodes are unresponsive or the system might have bugs."; public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX; diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index fd59759fc..5a3b19f24 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -33,7 +33,6 @@ import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.task.ADTaskManager; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.AnomalyResultAction; import org.opensearch.ad.transport.AnomalyResultRequest; import org.opensearch.ad.transport.StopDetectorAction; @@ -53,6 +52,7 @@ import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.TaskState; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportService; @@ -121,10 +121,10 @@ public IndexAnomalyDetectorJobActionHandler( * @param detector anomaly detector * @param listener Listener to send responses */ - public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { + public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { // this start listener is created & injected throughout the job handler so that whenever the job response is received, // there's the extra step of trying to index results and update detector state with a 60s delay. - ActionListener startListener = ActionListener.wrap(r -> { + ActionListener startListener = ActionListener.wrap(r -> { try { Instant executionEndTime = Instant.now(); IntervalTimeConfiguration schedule = (IntervalTimeConfiguration) detector.getInterval(); @@ -182,7 +182,7 @@ public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { + private void createJob(AnomalyDetector detector, ActionListener listener) { try { IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getInterval(); Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); @@ -209,7 +209,7 @@ private void createJob(AnomalyDetector detector, ActionListener listener) { + private void getJobForWrite(AnomalyDetector detector, Job job, ActionListener listener) { GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId); client @@ -227,7 +227,7 @@ private void onGetAnomalyDetectorJobForWrite( GetResponse response, AnomalyDetector detector, Job job, - ActionListener listener + ActionListener listener ) throws IOException { if (response.isExists()) { try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { @@ -271,8 +271,7 @@ private void onGetAnomalyDetectorJobForWrite( } } - private void indexAnomalyDetectorJob(Job job, ExecutorFunction function, ActionListener listener) - throws IOException { + private void indexAnomalyDetectorJob(Job job, ExecutorFunction function, ActionListener listener) throws IOException { IndexRequest indexRequest = new IndexRequest(CommonName.JOB_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(job.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE)) @@ -294,7 +293,7 @@ private void indexAnomalyDetectorJob(Job job, ExecutorFunction function, ActionL private void onIndexAnomalyDetectorJobResponse( IndexResponse response, ExecutorFunction function, - ActionListener listener + ActionListener listener ) { if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { String errorMsg = getShardsFailure(response); @@ -304,13 +303,7 @@ private void onIndexAnomalyDetectorJobResponse( if (function != null) { function.execute(); } else { - AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse( - response.getId(), - response.getVersion(), - response.getSeqNo(), - response.getPrimaryTerm(), - RestStatus.OK - ); + JobResponse anomalyDetectorJobResponse = new JobResponse(response.getId()); listener.onResponse(anomalyDetectorJobResponse); } } @@ -323,7 +316,7 @@ private void onIndexAnomalyDetectorJobResponse( * @param detectorId detector identifier * @param listener Listener to send responses */ - public void stopAnomalyDetectorJob(String detectorId, ActionListener listener) { + public void stopAnomalyDetectorJob(String detectorId, ActionListener listener) { GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId); client.get(getRequest, ActionListener.wrap(response -> { @@ -368,10 +361,7 @@ public void stopAnomalyDetectorJob(String detectorId, ActionListener listener.onFailure(exception))); } - private ActionListener stopAdDetectorListener( - String detectorId, - ActionListener listener - ) { + private ActionListener stopAdDetectorListener(String detectorId, ActionListener listener) { return new ActionListener() { @Override public void onResponse(StopDetectorResponse stopDetectorResponse) { diff --git a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java index 1f26d6bbd..f37a10580 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java @@ -33,7 +33,6 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.transport.ValidateAnomalyDetectorResponse; @@ -278,7 +277,7 @@ private void getSampleRangesForValidationChecks( listener .onFailure( new ValidationException( - ADCommonMessages.TIME_FIELD_NOT_ENOUGH_HISTORICAL_DATA, + CommonMessages.TIME_FIELD_NOT_ENOUGH_HISTORICAL_DATA, ValidationIssueType.TIMEFIELD_FIELD, ValidationAspect.MODEL ) @@ -305,7 +304,7 @@ private void getBucketAggregates( listener .onFailure( new ValidationException( - ADCommonMessages.CATEGORY_FIELD_TOO_SPARSE, + CommonMessages.CATEGORY_FIELD_TOO_SPARSE, ValidationIssueType.CATEGORY, ValidationAspect.MODEL ) @@ -422,12 +421,12 @@ public void onResponse(SearchResponse response) { listener .onFailure( new ValidationException( - ADCommonMessages.TIMEOUT_ON_INTERVAL_REC, + CommonMessages.TIMEOUT_ON_INTERVAL_REC, ValidationIssueType.TIMEOUT, ValidationAspect.MODEL ) ); - logger.info(ADCommonMessages.TIMEOUT_ON_INTERVAL_REC); + logger.info(CommonMessages.TIMEOUT_ON_INTERVAL_REC); // keep trying higher intervals as new interval is below max, and we aren't decreasing yet } else if (newIntervalMinute < MAX_INTERVAL_REC_LENGTH_IN_MINUTES && !decreasingInterval) { searchWithDifferentInterval(newIntervalMinute); @@ -510,7 +509,7 @@ public void onFailure(Exception e) { listener .onFailure( new ValidationException( - ADCommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, + CommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, ValidationIssueType.AGGREGATION, ValidationAspect.MODEL ) @@ -540,7 +539,7 @@ private void processIntervalRecommendation(IntervalTimeConfiguration interval, l listener .onFailure( new ValidationException( - ADCommonMessages.DETECTOR_INTERVAL_REC + interval.getInterval(), + CommonMessages.INTERVAL_REC + interval.getInterval(), ValidationIssueType.DETECTION_INTERVAL, ValidationAspect.MODEL, interval @@ -591,7 +590,7 @@ private Histogram checkBucketResultErrors(SearchResponse response) { listener .onFailure( new ValidationException( - ADCommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, + CommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, ValidationIssueType.AGGREGATION, ValidationAspect.MODEL ) @@ -615,7 +614,7 @@ private void processRawDataResults(SearchResponse response, long latestTime) { if (fullBucketRate < INTERVAL_BUCKET_MINIMUM_SUCCESS_RATE) { listener .onFailure( - new ValidationException(ADCommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL) + new ValidationException(CommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL) ); } else { checkDataFilterSparsity(latestTime); @@ -652,7 +651,7 @@ private void processDataFilterResults(SearchResponse response, long latestTime) listener .onFailure( new ValidationException( - ADCommonMessages.FILTER_QUERY_TOO_SPARSE, + CommonMessages.FILTER_QUERY_TOO_SPARSE, ValidationIssueType.FILTER_QUERY, ValidationAspect.MODEL ) @@ -714,11 +713,7 @@ private void processTopEntityResults(SearchResponse response, long latestTime) { if (fullBucketRate < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { listener .onFailure( - new ValidationException( - ADCommonMessages.CATEGORY_FIELD_TOO_SPARSE, - ValidationIssueType.CATEGORY, - ValidationAspect.MODEL - ) + new ValidationException(CommonMessages.CATEGORY_FIELD_TOO_SPARSE, ValidationIssueType.CATEGORY, ValidationAspect.MODEL) ); } else { try { @@ -741,7 +736,7 @@ private void checkFeatureQueryDelegate(long latestTime) throws IOException { new MultiResponsesDelegateActionListener<>( validateFeatureQueriesListener, anomalyDetector.getFeatureAttributes().size(), - ADCommonMessages.FEATURE_QUERY_TOO_SPARSE, + CommonMessages.FEATURE_QUERY_TOO_SPARSE, false ); @@ -765,7 +760,7 @@ private void checkFeatureQueryDelegate(long latestTime) throws IOException { multiFeatureQueriesResponseListener .onFailure( new ValidationException( - ADCommonMessages.FEATURE_QUERY_TOO_SPARSE, + CommonMessages.FEATURE_QUERY_TOO_SPARSE, ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.MODEL ) @@ -777,7 +772,7 @@ private void checkFeatureQueryDelegate(long latestTime) throws IOException { }, e -> { logger.error(e); multiFeatureQueriesResponseListener - .onFailure(new OpenSearchStatusException(ADCommonMessages.FEATURE_QUERY_TOO_SPARSE, RestStatus.BAD_REQUEST, e)); + .onFailure(new OpenSearchStatusException(CommonMessages.FEATURE_QUERY_TOO_SPARSE, RestStatus.BAD_REQUEST, e)); }); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint @@ -798,7 +793,7 @@ private void sendWindowDelayRec(long latestTimeInMillis) { listener .onFailure( new ValidationException( - String.format(Locale.ROOT, ADCommonMessages.WINDOW_DELAY_REC, minutesSinceLastStamp, minutesSinceLastStamp), + String.format(Locale.ROOT, CommonMessages.WINDOW_DELAY_REC, minutesSinceLastStamp, minutesSinceLastStamp), ValidationIssueType.WINDOW_DELAY, ValidationAspect.MODEL, new IntervalTimeConfiguration(minutesSinceLastStamp, ChronoUnit.MINUTES) @@ -821,7 +816,7 @@ private void windowDelayRecommendation(long latestTime) { // a time was always above 0.25 meaning the best suggestion is to simply ingest more data or change interval since // we have no more insight regarding the root cause of the lower density. listener - .onFailure(new ValidationException(ADCommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL)); + .onFailure(new ValidationException(CommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL)); } private LongBounds getTimeRangeBounds(long endMillis, IntervalTimeConfiguration detectorIntervalInMinutes) { diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index 454e5cfc8..268bbc26a 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -12,7 +12,6 @@ package org.opensearch.ad.task; import static org.opensearch.action.DocWriteResponse.Result.CREATED; -import static org.opensearch.ad.constant.ADCommonMessages.CAN_NOT_FIND_LATEST_TASK; import static org.opensearch.ad.constant.ADCommonMessages.DETECTOR_IS_RUNNING; import static org.opensearch.ad.constant.ADCommonMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT; import static org.opensearch.ad.constant.ADCommonMessages.HC_DETECTOR_TASK_IS_UPDATING; @@ -47,6 +46,7 @@ import static org.opensearch.ad.stats.InternalStatNames.AD_USED_BATCH_TASK_SLOT_COUNT; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.TimeSeriesAnalyticsPlugin.AD_BATCH_TASK_THREAD_POOL_NAME; +import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_LATEST_TASK; import static org.opensearch.timeseries.constant.CommonMessages.CREATE_INDEX_NOT_ACKNOWLEDGED; import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_FIND_CONFIG_MSG; import static org.opensearch.timeseries.constant.CommonName.TASK_ID_FIELD; @@ -119,7 +119,6 @@ import org.opensearch.ad.transport.ADTaskProfileAction; import org.opensearch.ad.transport.ADTaskProfileNodeResponse; import org.opensearch.ad.transport.ADTaskProfileRequest; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.ForwardADTaskAction; import org.opensearch.ad.transport.ForwardADTaskRequest; import org.opensearch.client.Client; @@ -167,6 +166,7 @@ import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.TaskState; import org.opensearch.timeseries.task.RealtimeTaskCache; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportRequestOptions; @@ -281,7 +281,7 @@ public void startDetector( User user, TransportService transportService, ThreadContext.StoredContext context, - ActionListener listener + ActionListener listener ) { // upgrade index mapping of AD default indices detectionIndices.update(); @@ -319,7 +319,7 @@ private void startRealtimeOrHistoricalDetection( IndexAnomalyDetectorJobActionHandler handler, User user, TransportService transportService, - ActionListener listener, + ActionListener listener, Optional detector ) { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { @@ -355,7 +355,7 @@ protected void forwardApplyForTaskSlotsRequestToLeadNode( DateRange detectionDateRange, User user, TransportService transportService, - ActionListener listener + ActionListener listener ) { ForwardADTaskRequest forwardADTaskRequest = new ForwardADTaskRequest( detector, @@ -369,7 +369,7 @@ protected void forwardApplyForTaskSlotsRequestToLeadNode( public void forwardScaleTaskSlotRequestToLeadNode( ADTask adTask, TransportService transportService, - ActionListener listener + ActionListener listener ) { forwardRequestToLeadNode(new ForwardADTaskRequest(adTask, ADTaskAction.CHECK_AVAILABLE_TASK_SLOTS), transportService, listener); } @@ -377,7 +377,7 @@ public void forwardScaleTaskSlotRequestToLeadNode( public void forwardRequestToLeadNode( ForwardADTaskRequest forwardADTaskRequest, TransportService transportService, - ActionListener listener + ActionListener listener ) { hashRing.buildAndGetOwningNodeWithSameLocalAdVersion(AD_TASK_LEAD_NODE_MODEL_ID, node -> { if (!node.isPresent()) { @@ -390,7 +390,7 @@ public void forwardRequestToLeadNode( ForwardADTaskAction.NAME, forwardADTaskRequest, transportRequestOptions, - new ActionListenerResponseHandler<>(listener, AnomalyDetectorJobResponse::new) + new ActionListenerResponseHandler<>(listener, JobResponse::new) ); }, listener); } @@ -411,7 +411,7 @@ public void startHistoricalAnalysis( User user, int availableTaskSlots, TransportService transportService, - ActionListener listener + ActionListener listener ) { String detectorId = detector.getId(); hashRing.buildAndGetOwningNodeWithSameLocalAdVersion(detectorId, owningNode -> { @@ -465,7 +465,7 @@ protected void forwardDetectRequestToCoordinatingNode( ADTaskAction adTaskAction, TransportService transportService, DiscoveryNode node, - ActionListener listener + ActionListener listener ) { Version adVersion = hashRing.getAdVersion(node.getId()); transportService @@ -476,7 +476,7 @@ protected void forwardDetectRequestToCoordinatingNode( // node, check ADTaskManager#cleanDetectorCache. new ForwardADTaskRequest(detector, detectionDateRange, user, adTaskAction, availableTaskSlots, adVersion), transportRequestOptions, - new ActionListenerResponseHandler<>(listener, AnomalyDetectorJobResponse::new) + new ActionListenerResponseHandler<>(listener, JobResponse::new) ); } @@ -492,7 +492,7 @@ protected void forwardADTaskToCoordinatingNode( ADTask adTask, ADTaskAction adTaskAction, TransportService transportService, - ActionListener listener + ActionListener listener ) { logger.debug("Forward AD task to coordinating node, task id: {}, action: {}", adTask.getTaskId(), adTaskAction.name()); transportService @@ -501,7 +501,7 @@ protected void forwardADTaskToCoordinatingNode( ForwardADTaskAction.NAME, new ForwardADTaskRequest(adTask, adTaskAction), transportRequestOptions, - new ActionListenerResponseHandler<>(listener, AnomalyDetectorJobResponse::new) + new ActionListenerResponseHandler<>(listener, JobResponse::new) ); } @@ -519,7 +519,7 @@ protected void forwardStaleRunningEntitiesToCoordinatingNode( ADTaskAction adTaskAction, TransportService transportService, List staleRunningEntity, - ActionListener listener + ActionListener listener ) { transportService .sendRequest( @@ -527,7 +527,7 @@ protected void forwardStaleRunningEntitiesToCoordinatingNode( ForwardADTaskAction.NAME, new ForwardADTaskRequest(adTask, adTaskAction, staleRunningEntity), transportRequestOptions, - new ActionListenerResponseHandler<>(listener, AnomalyDetectorJobResponse::new) + new ActionListenerResponseHandler<>(listener, JobResponse::new) ); } @@ -551,7 +551,7 @@ public void checkTaskSlots( User user, ADTaskAction afterCheckAction, TransportService transportService, - ActionListener listener + ActionListener listener ) { String detectorId = detector.getId(); logger.debug("Start checking task slots for detector: {}, task action: {}", detectorId, afterCheckAction); @@ -566,7 +566,7 @@ public void checkTaskSlots( ); return; } - ActionListener wrappedActionListener = ActionListener.runAfter(listener, () -> { + ActionListener wrappedActionListener = ActionListener.runAfter(listener, () -> { checkingTaskSlot.release(1); logger.debug("Release checking task slot semaphore on lead node for detector {}", detectorId); }); @@ -648,7 +648,7 @@ private void forwardToCoordinatingNode( User user, ADTaskAction targetActionOfTaskSlotChecking, TransportService transportService, - ActionListener wrappedActionListener, + ActionListener wrappedActionListener, int approvedTaskSlots ) { switch (targetActionOfTaskSlotChecking) { @@ -675,7 +675,7 @@ protected void scaleTaskLaneOnCoordinatingNode( ADTask adTask, int approvedTaskSlot, TransportService transportService, - ActionListener listener + ActionListener listener ) { DiscoveryNode coordinatingNode = getCoordinatingNode(adTask); transportService @@ -684,7 +684,7 @@ protected void scaleTaskLaneOnCoordinatingNode( ForwardADTaskAction.NAME, new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS), transportRequestOptions, - new ActionListenerResponseHandler<>(listener, AnomalyDetectorJobResponse::new) + new ActionListenerResponseHandler<>(listener, JobResponse::new) ); } @@ -726,7 +726,7 @@ public void startDetector( DateRange detectionDateRange, User user, TransportService transportService, - ActionListener listener + ActionListener listener ) { try { if (detectionIndices.doesStateIndexExist()) { @@ -817,7 +817,7 @@ public void stopDetector( IndexAnomalyDetectorJobActionHandler handler, User user, TransportService transportService, - ActionListener listener + ActionListener listener ) { getDetector(detectorId, (detector) -> { if (!detector.isPresent()) { @@ -1212,12 +1212,7 @@ public boolean hcBatchTaskExpired(Long latestHCTaskRunTime) { return latestHCTaskRunTime + HC_BATCH_TASK_CACHE_TIMEOUT_IN_MILLIS < Instant.now().toEpochMilli(); } - private void stopHistoricalAnalysis( - String detectorId, - Optional adTask, - User user, - ActionListener listener - ) { + private void stopHistoricalAnalysis(String detectorId, Optional adTask, User user, ActionListener listener) { if (!adTask.isPresent()) { listener.onFailure(new ResourceNotFoundException(detectorId, "Detector not started")); return; @@ -1234,7 +1229,7 @@ private void stopHistoricalAnalysis( ADCancelTaskRequest cancelTaskRequest = new ADCancelTaskRequest(detectorId, taskId, userName, dataNodes); client.execute(ADCancelTaskAction.INSTANCE, cancelTaskRequest, ActionListener.wrap(response -> { - listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(taskId)); }, e -> { logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, e); listener.onFailure(e); @@ -1455,7 +1450,7 @@ private void updateLatestFlagOfOldTasksAndCreateNewTask( AnomalyDetector detector, DateRange detectionDateRange, User user, - ActionListener listener + ActionListener listener ) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); updateByQueryRequest.indices(DETECTION_STATE_INDEX); @@ -1494,7 +1489,7 @@ private void createNewADTask( DateRange detectionDateRange, User user, String coordinatingNode, - ActionListener listener + ActionListener listener ) { String userName = user == null ? null : user.getName(); Instant now = Instant.now(); @@ -1555,8 +1550,8 @@ public void createADTaskDirectly(ADTask adTask, Consumer func private void onIndexADTaskResponse( IndexResponse response, ADTask adTask, - BiConsumer> function, - ActionListener listener + BiConsumer> function, + ActionListener listener ) { if (response == null || response.getResult() != CREATED) { String errorMsg = getShardsFailure(response); @@ -1564,7 +1559,7 @@ private void onIndexADTaskResponse( return; } adTask.setTaskId(response.getId()); - ActionListener delegatedListener = ActionListener.wrap(r -> { listener.onResponse(r); }, e -> { + ActionListener delegatedListener = ActionListener.wrap(r -> { listener.onResponse(r); }, e -> { handleADTaskException(adTask, e); if (e instanceof DuplicateTaskException) { listener.onFailure(new OpenSearchStatusException(DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST)); @@ -1595,7 +1590,7 @@ private void onIndexADTaskResponse( } } - private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionListener delegatedListener) { + private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionListener delegatedListener) { BoolQueryBuilder query = new BoolQueryBuilder(); query.filter(new TermQueryBuilder(DETECTOR_ID_FIELD, adTask.getConfigId())); query.filter(new TermQueryBuilder(IS_LATEST_FIELD, false)); @@ -1626,13 +1621,7 @@ private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionLis runBatchResultAction(response, adTask, delegatedListener); } else { // return response directly for realtime detection - AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse( - response.getId(), - response.getVersion(), - response.getSeqNo(), - response.getPrimaryTerm(), - RestStatus.OK - ); + JobResponse anomalyDetectorJobResponse = new JobResponse(response.getId()); delegatedListener.onResponse(anomalyDetectorJobResponse); } }, delegatedListener); @@ -1720,7 +1709,7 @@ public void cleanChildTasksAndADResultsOfDeletedTask() { }, TimeValue.timeValueSeconds(DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), AD_BATCH_TASK_THREAD_POOL_NAME); } - private void runBatchResultAction(IndexResponse response, ADTask adTask, ActionListener listener) { + private void runBatchResultAction(IndexResponse response, ADTask adTask, ActionListener listener) { client.execute(ADBatchAnomalyResultAction.INSTANCE, new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> { String remoteOrLocal = r.isRunTaskRemotely() ? "remote" : "local"; logger @@ -1731,13 +1720,7 @@ private void runBatchResultAction(IndexResponse response, ADTask adTask, ActionL remoteOrLocal, r.getNodeId() ); - AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse( - response.getId(), - response.getVersion(), - response.getSeqNo(), - response.getPrimaryTerm(), - RestStatus.OK - ); + JobResponse anomalyDetectorJobResponse = new JobResponse(response.getId()); listener.onResponse(anomalyDetectorJobResponse); }, e -> listener.onFailure(e))); } @@ -1956,7 +1939,7 @@ public void stopLatestRealtimeTask( TaskState state, Exception error, TransportService transportService, - ActionListener listener + ActionListener listener ) { getAndExecuteOnLatestDetectorLevelTask(detectorId, REALTIME_TASK_TYPES, (adTask) -> { if (adTask.isPresent() && !adTask.get().isDone()) { @@ -1967,7 +1950,7 @@ public void stopLatestRealtimeTask( } ExecutorFunction function = () -> updateADTask(adTask.get().getTaskId(), updatedFields, ActionListener.wrap(r -> { if (error == null) { - listener.onResponse(new AnomalyDetectorJobResponse(detectorId, 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(detectorId)); } else { listener.onFailure(error); } @@ -2199,7 +2182,7 @@ private void entityTaskDone( ADTask adTask, Exception exception, TransportService transportService, - ActionListener listener + ActionListener listener ) { try { ADTaskAction action = getAdEntityTaskAction(adTask, exception); @@ -2261,7 +2244,7 @@ public boolean isRetryableError(String error) { * @param state AD task state * @param listener action listener */ - public void setHCDetectorTaskDone(ADTask adTask, TaskState state, ActionListener listener) { + public void setHCDetectorTaskDone(ADTask adTask, TaskState state, ActionListener listener) { String detectorId = adTask.getConfigId(); String taskId = adTask.isEntityTask() ? adTask.getParentTaskId() : adTask.getTaskId(); String detectorTaskId = adTask.getConfigLevelTaskId(); @@ -2349,7 +2332,7 @@ public void setHCDetectorTaskDone(ADTask adTask, TaskState state, ActionListener } - listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(taskId)); } /** @@ -2463,11 +2446,7 @@ private void updateADHCDetectorTask( * @param transportService transport service * @param listener action listener */ - public void runNextEntityForHCADHistorical( - ADTask adTask, - TransportService transportService, - ActionListener listener - ) { + public void runNextEntityForHCADHistorical(ADTask adTask, TransportService transportService, ActionListener listener) { String detectorId = adTask.getConfigId(); int scaleDelta = scaleTaskSlots(adTask, transportService, ActionListener.wrap(r -> { logger.debug("Scale up task slots done for detector {}, task {}", detectorId, adTask.getTaskId()); @@ -2480,7 +2459,7 @@ public void runNextEntityForHCADHistorical( adTask.getTaskId(), adTaskCacheManager.getDetectorTaskSlots(detectorId) ); - listener.onResponse(new AnomalyDetectorJobResponse(detectorId, 0, 0, 0, RestStatus.ACCEPTED)); + listener.onResponse(new JobResponse(detectorId)); return; } client.execute(ADBatchAnomalyResultAction.INSTANCE, new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> { @@ -2493,7 +2472,7 @@ public void runNextEntityForHCADHistorical( remoteOrLocal, r.getNodeId() ); - AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(detectorId, 0, 0, 0, RestStatus.OK); + JobResponse anomalyDetectorJobResponse = new JobResponse(detectorId); listener.onResponse(anomalyDetectorJobResponse); }, e -> { listener.onFailure(e); })); } @@ -2508,11 +2487,7 @@ public void runNextEntityForHCADHistorical( * @param scaleUpListener action listener * @return task slots scale delta */ - protected int scaleTaskSlots( - ADTask adTask, - TransportService transportService, - ActionListener scaleUpListener - ) { + protected int scaleTaskSlots(ADTask adTask, TransportService transportService, ActionListener scaleUpListener) { String detectorId = adTask.getConfigId(); if (!scaleEntityTaskLane.tryAcquire()) { logger.debug("Can't get scaleEntityTaskLane semaphore"); @@ -2752,7 +2727,7 @@ public synchronized void removeStaleRunningEntity( ADTask adTask, String entity, TransportService transportService, - ActionListener listener + ActionListener listener ) { String detectorId = adTask.getConfigId(); boolean removed = adTaskCacheManager.removeRunningEntity(detectorId, entity); diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobAction.java index b11283181..83ea58960 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobAction.java @@ -13,14 +13,15 @@ import org.opensearch.action.ActionType; import org.opensearch.ad.constant.CommonValue; +import org.opensearch.timeseries.transport.JobResponse; -public class AnomalyDetectorJobAction extends ActionType { +public class AnomalyDetectorJobAction extends ActionType { // External Action which used for public facing RestAPIs. public static final String NAME = CommonValue.EXTERNAL_ACTION_PREFIX + "detector/jobmanagement"; public static final AnomalyDetectorJobAction INSTANCE = new AnomalyDetectorJobAction(); private AnomalyDetectorJobAction() { - super(NAME, AnomalyDetectorJobResponse::new); + super(NAME, JobResponse::new); } } diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobResponse.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobResponse.java deleted file mode 100644 index f65c5a06b..000000000 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobResponse.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.ad.transport; - -import java.io.IOException; - -import org.opensearch.core.action.ActionResponse; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.timeseries.util.RestHandlerUtils; - -public class AnomalyDetectorJobResponse extends ActionResponse implements ToXContentObject { - private final String id; - private final long version; - private final long seqNo; - private final long primaryTerm; - private final RestStatus restStatus; - - public AnomalyDetectorJobResponse(StreamInput in) throws IOException { - super(in); - id = in.readString(); - version = in.readLong(); - seqNo = in.readLong(); - primaryTerm = in.readLong(); - restStatus = in.readEnum(RestStatus.class); - } - - public AnomalyDetectorJobResponse(String id, long version, long seqNo, long primaryTerm, RestStatus restStatus) { - this.id = id; - this.version = version; - this.seqNo = seqNo; - this.primaryTerm = primaryTerm; - this.restStatus = restStatus; - } - - public String getId() { - return id; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - out.writeLong(version); - out.writeLong(seqNo); - out.writeLong(primaryTerm); - out.writeEnum(restStatus); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder - .startObject() - .field(RestHandlerUtils._ID, id) - .field(RestHandlerUtils._VERSION, version) - .field(RestHandlerUtils._SEQ_NO, seqNo) - .field(RestHandlerUtils._PRIMARY_TERM, primaryTerm) - .endObject(); - } -} diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java index 5a81c43ae..2ffb8b85a 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java @@ -39,10 +39,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.tasks.Task; import org.opensearch.timeseries.model.DateRange; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportService; -public class AnomalyDetectorJobTransportAction extends HandledTransportAction { +public class AnomalyDetectorJobTransportAction extends HandledTransportAction { private final Logger logger = LogManager.getLogger(AnomalyDetectorJobTransportAction.class); private final Client client; @@ -81,7 +82,7 @@ public AnomalyDetectorJobTransportAction( } @Override - protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener actionListener) { + protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener actionListener) { String detectorId = request.getDetectorID(); DateRange detectionDateRange = request.getDetectionDateRange(); boolean historical = request.isHistorical(); @@ -90,7 +91,7 @@ protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionLis String rawPath = request.getRawPath(); TimeValue requestTimeout = AD_REQUEST_TIMEOUT.get(settings); String errorMessage = rawPath.endsWith(RestHandlerUtils.START_JOB) ? FAIL_TO_START_DETECTOR : FAIL_TO_STOP_DETECTOR; - ActionListener listener = wrapRestActionListener(actionListener, errorMessage); + ActionListener listener = wrapRestActionListener(actionListener, errorMessage); // By the time request reaches here, the user permissions are validated by Security plugin. User user = getUserContext(client); @@ -124,7 +125,7 @@ protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionLis } private void executeDetector( - ActionListener listener, + ActionListener listener, String detectorId, DateRange detectionDateRange, boolean historical, diff --git a/src/main/java/org/opensearch/ad/transport/EntityProfileRequest.java b/src/main/java/org/opensearch/ad/transport/EntityProfileRequest.java index 7e4054a8a..2aba165a7 100644 --- a/src/main/java/org/opensearch/ad/transport/EntityProfileRequest.java +++ b/src/main/java/org/opensearch/ad/transport/EntityProfileRequest.java @@ -27,6 +27,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.model.Entity; public class EntityProfileRequest extends ActionRequest implements ToXContentObject { @@ -92,7 +93,7 @@ public ActionRequestValidationException validate() { validationException = addValidationError("Entity value is missing", validationException); } if (profilesToCollect == null || profilesToCollect.isEmpty()) { - validationException = addValidationError(ADCommonMessages.EMPTY_PROFILES_COLLECT, validationException); + validationException = addValidationError(CommonMessages.EMPTY_PROFILES_COLLECT, validationException); } return validationException; } diff --git a/src/main/java/org/opensearch/ad/transport/ForwardADTaskAction.java b/src/main/java/org/opensearch/ad/transport/ForwardADTaskAction.java index 309714cc8..f63a188cd 100644 --- a/src/main/java/org/opensearch/ad/transport/ForwardADTaskAction.java +++ b/src/main/java/org/opensearch/ad/transport/ForwardADTaskAction.java @@ -15,13 +15,14 @@ import org.opensearch.action.ActionType; import org.opensearch.ad.constant.CommonValue; +import org.opensearch.timeseries.transport.JobResponse; -public class ForwardADTaskAction extends ActionType { +public class ForwardADTaskAction extends ActionType { // Internal Action which is not used for public facing RestAPIs. public static final String NAME = CommonValue.INTERNAL_ACTION_PREFIX + "detector/" + AD_TASK + "/forward"; public static final ForwardADTaskAction INSTANCE = new ForwardADTaskAction(); private ForwardADTaskAction() { - super(NAME, AnomalyDetectorJobResponse::new); + super(NAME, JobResponse::new); } } diff --git a/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java b/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java index cab9a4f65..d2c571fa8 100644 --- a/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java @@ -37,11 +37,12 @@ import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.model.DateRange; import org.opensearch.timeseries.model.TaskState; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.transport.TransportService; import com.google.common.collect.ImmutableMap; -public class ForwardADTaskTransportAction extends HandledTransportAction { +public class ForwardADTaskTransportAction extends HandledTransportAction { private final Logger logger = LogManager.getLogger(ForwardADTaskTransportAction.class); private final TransportService transportService; private final ADTaskManager adTaskManager; @@ -75,7 +76,7 @@ public ForwardADTaskTransportAction( } @Override - protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener listener) { + protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener listener) { ADTaskAction adTaskAction = request.getAdTaskAction(); AnomalyDetector detector = request.getDetector(); DateRange detectionDateRange = request.getDetectionDateRange(); @@ -120,7 +121,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener if (!adTaskCacheManager.hasEntity(detectorId)) { adTaskCacheManager.setDetectorTaskSlots(detectorId, 0); logger.info("Historical HC detector done, will remove from cache, detector id:{}", detectorId); - listener.onResponse(new AnomalyDetectorJobResponse(detectorId, 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(detectorId)); TaskState state = !adTask.isEntityTask() && adTask.getError() != null ? TaskState.FAILED : TaskState.FINISHED; adTaskManager.setHCDetectorTaskDone(adTask, state, listener); } else { @@ -176,7 +177,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener logger.debug("After scale down, only 1 task slot reserved for detector {}, run next entity", detectorId); adTaskManager.runNextEntityForHCADHistorical(adTask, transportService, listener); } - listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.ACCEPTED)); + listener.onResponse(new JobResponse(adTask.getTaskId())); } } else { logger.warn("Can only push back entity task"); @@ -193,7 +194,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener adTaskCacheManager.scaleUpDetectorTaskSlots(detectorId, newSlots); } } - listener.onResponse(new AnomalyDetectorJobResponse(detector.getId(), 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(detector.getId())); break; case CANCEL: logger.debug("Received CANCEL action for detector {}", detectorId); @@ -206,7 +207,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener if (!adTaskCacheManager.hasEntity(detectorId) || !adTask.isEntityTask()) { adTaskManager.setHCDetectorTaskDone(adTask, TaskState.STOPPED, listener); } - listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(adTask.getTaskId())); } else { listener.onFailure(new IllegalArgumentException("Only support cancel HC now")); } @@ -227,7 +228,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener for (String entity : staleRunningEntities) { adTaskManager.removeStaleRunningEntity(adTask, entity, transportService, listener); } - listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(adTask.getTaskId())); break; case CLEAN_CACHE: boolean historicalTask = adTask.isHistoricalTask(); @@ -249,7 +250,7 @@ protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener stateManager.clear(detectorId); featureManager.clear(detectorId); } - listener.onResponse(new AnomalyDetectorJobResponse(detector.getId(), 0, 0, 0, RestStatus.OK)); + listener.onResponse(new JobResponse(detector.getId())); break; default: listener.onFailure(new OpenSearchStatusException("Unsupported AD task action " + adTaskAction, RestStatus.BAD_REQUEST)); diff --git a/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java index 7db017d3d..b932ae601 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java @@ -11,7 +11,7 @@ package org.opensearch.ad.transport; -import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_GET_DETECTOR_INFO; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_GET_CONFIG_INFO; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; import org.apache.logging.log4j.LogManager; @@ -60,7 +60,7 @@ protected void doExecute( ) { String name = request.getName(); String rawPath = request.getRawPath(); - ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_DETECTOR_INFO); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_CONFIG_INFO); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { SearchRequest searchRequest = new SearchRequest().indices(CommonName.CONFIG_INDEX); if (rawPath.endsWith(RestHandlerUtils.COUNT)) { diff --git a/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java index 5ce2c2319..ebf4016cf 100644 --- a/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java @@ -11,7 +11,7 @@ package org.opensearch.ad.transport; -import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_GET_STATS; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_GET_STATS; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; import java.util.HashMap; diff --git a/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java b/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java index 8e23243d8..05c69196d 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java @@ -11,8 +11,8 @@ package org.opensearch.ad.transport.handler; -import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_SEARCH; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_FILTER_BY_BACKEND_ROLES; +import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_SEARCH; import static org.opensearch.timeseries.util.ParseUtils.addUserBackendRolesFilter; import static org.opensearch.timeseries.util.ParseUtils.getUserContext; import static org.opensearch.timeseries.util.ParseUtils.isAdmin; diff --git a/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java b/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java index 46de0c762..deb31cad7 100644 --- a/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java +++ b/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java @@ -35,6 +35,7 @@ public class ForecastCommonMessages { public static String FAIL_TO_FIND_FORECASTER_MSG = "Can not find forecaster with id: "; public static final String FORECASTER_ID_MISSING_MSG = "Forecaster ID is missing"; public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid"; + public static String FAIL_TO_GET_FORECASTER = "Fail to get forecaster"; // ====================================== // Security @@ -45,10 +46,17 @@ public class ForecastCommonMessages { // ====================================== // Used for custom forecast result index // ====================================== + public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index "; public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX; // ====================================== // Task // ====================================== public static String FORECASTER_IS_RUNNING = "Forecaster is already running"; + + // ====================================== + // Job + // ====================================== + public static String FAIL_TO_START_FORECASTER = "Fail to start forecaster"; + public static String FAIL_TO_STOP_FORECASTER = "Fail to stop forecaster"; } diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index ae2064add..0576f9693 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -49,6 +49,22 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static final String UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG = "Feature has an unknown exception caught while executing the feature query: "; public static String DUPLICATE_FEATURE_AGGREGATION_NAMES = "Config has duplicate feature aggregation query names: "; + public static String TIME_FIELD_NOT_ENOUGH_HISTORICAL_DATA = + "There isn't enough historical data found with current timefield selected."; + public static String CATEGORY_FIELD_TOO_SPARSE = + "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data "; + public static String WINDOW_DELAY_REC = + "Latest seen data point is at least %d minutes ago, consider changing window delay to at least %d minutes."; + public static String INTERVAL_REC = "The selected interval might collect sparse data. Consider changing interval length to: "; + public static String RAW_DATA_TOO_SPARSE = + "Source index data is potentially too sparse for model training. Consider changing interval length or ingesting more data"; + public static String MODEL_VALIDATION_FAILED_UNEXPECTEDLY = "Model validation experienced issues completing."; + public static String FILTER_QUERY_TOO_SPARSE = "Data is too sparse after data filter is applied. Consider changing the data filter"; + public static String CATEGORY_FIELD_NO_DATA = + "No entity was found with the given categorical fields. Consider revising category field/s or ingesting more data"; + public static String FEATURE_QUERY_TOO_SPARSE = + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries."; + public static String TIMEOUT_ON_INTERVAL_REC = "Timed out getting interval recommendation"; // ====================================== // Index message @@ -67,6 +83,8 @@ public static String getTooManyCategoricalFieldErr(int limit) { // Transport // ====================================== public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid"; + public static String FAIL_TO_DELETE_CONFIG = "Fail to delete config"; + public static String FAIL_TO_GET_CONFIG_INFO = "Fail to get config info"; // ====================================== // transport/restful client @@ -91,4 +109,34 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static String NO_PERMISSION_TO_ACCESS_CONFIG = "User does not have permissions to access config: "; public static String FAIL_TO_GET_USER_INFO = "Unable to get user information from config "; + // ====================================== + // transport + // ====================================== + public static final String CONFIG_ID_MISSING_MSG = "config ID is missing"; + public static final String MODEL_ID_MISSING_MSG = "model ID is missing"; + + // ====================================== + // task + // ====================================== + public static String CAN_NOT_FIND_LATEST_TASK = "can't find latest task"; + + // ====================================== + // Job + // ====================================== + public static String CONFIG_IS_RUNNING = "Config is already running"; + public static String FAIL_TO_SEARCH = "Fail to search"; + + // ====================================== + // Profile API + // ====================================== + public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid"; + public static String FAIL_TO_PARSE_CONFIG_MSG = "Fail to parse config with id: "; + public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for "; + public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for config "; + public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for config "; + + // ====================================== + // Stats API + // ====================================== + public static String FAIL_TO_GET_STATS = "Fail to get stats"; } diff --git a/src/main/java/org/opensearch/timeseries/transport/JobResponse.java b/src/main/java/org/opensearch/timeseries/transport/JobResponse.java new file mode 100644 index 000000000..faa7df2c8 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/transport/JobResponse.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.timeseries.transport; + +import java.io.IOException; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.timeseries.util.RestHandlerUtils; + +public class JobResponse extends ActionResponse implements ToXContentObject { + private final String id; + + public JobResponse(StreamInput in) throws IOException { + super(in); + id = in.readString(); + } + + public JobResponse(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(RestHandlerUtils._ID, id).endObject(); + } +} diff --git a/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java b/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java index af598c82d..d94226baa 100644 --- a/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java +++ b/src/test/java/org/opensearch/ad/EntityProfileRunnerTests.java @@ -31,7 +31,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchResponseSections; import org.opensearch.action.search.ShardSearchFailure; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.EntityProfile; @@ -58,6 +57,7 @@ import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.IntervalTimeConfiguration; @@ -308,7 +308,7 @@ public void testEmptyProfile() throws InterruptedException { assertTrue("Should not reach here", false); inProgressLatch.countDown(); }, exception -> { - assertTrue(exception.getMessage().contains(ADCommonMessages.EMPTY_PROFILES_COLLECT)); + assertTrue(exception.getMessage().contains(CommonMessages.EMPTY_PROFILES_COLLECT)); inProgressLatch.countDown(); })); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); diff --git a/src/test/java/org/opensearch/ad/HistoricalAnalysisIntegTestCase.java b/src/test/java/org/opensearch/ad/HistoricalAnalysisIntegTestCase.java index d40fa84f8..9b8356081 100644 --- a/src/test/java/org/opensearch/ad/HistoricalAnalysisIntegTestCase.java +++ b/src/test/java/org/opensearch/ad/HistoricalAnalysisIntegTestCase.java @@ -41,7 +41,6 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.transport.AnomalyDetectorJobAction; import org.opensearch.ad.transport.AnomalyDetectorJobRequest; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.TermQueryBuilder; @@ -57,6 +56,7 @@ import org.opensearch.timeseries.model.Feature; import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.TaskState; +import org.opensearch.timeseries.transport.JobResponse; import com.google.common.collect.ImmutableList; @@ -232,7 +232,7 @@ public ADTask startHistoricalAnalysis(Instant startTime, Instant endTime) throws UNASSIGNED_PRIMARY_TERM, START_JOB ); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); + JobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); return getADTask(response.getId()); } @@ -246,7 +246,7 @@ public ADTask startHistoricalAnalysis(String detectorId, Instant startTime, Inst UNASSIGNED_PRIMARY_TERM, START_JOB ); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); + JobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); return getADTask(response.getId()); } } diff --git a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java index 8edab0d15..e856cd1cd 100644 --- a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java +++ b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java @@ -27,12 +27,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; import org.opensearch.common.xcontent.support.XContentMapValues; import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.constant.CommonMessages; import com.google.common.collect.ImmutableMap; import com.google.gson.JsonElement; @@ -117,10 +117,7 @@ public void testValidationIntervalRecommendation() throws Exception { @SuppressWarnings("unchecked") Map> messageMap = (Map>) XContentMapValues .extractValue("model", responseMap); - assertEquals( - ADCommonMessages.DETECTOR_INTERVAL_REC + recDetectorIntervalMinutes, - messageMap.get("detection_interval").get("message") - ); + assertEquals(CommonMessages.INTERVAL_REC + recDetectorIntervalMinutes, messageMap.get("detection_interval").get("message")); } public void testValidationWindowDelayRecommendation() throws Exception { @@ -158,7 +155,7 @@ public void testValidationWindowDelayRecommendation() throws Exception { Map> messageMap = (Map>) XContentMapValues .extractValue("model", responseMap); assertEquals( - String.format(Locale.ROOT, ADCommonMessages.WINDOW_DELAY_REC, expectedWindowDelayMinutes, expectedWindowDelayMinutes), + String.format(Locale.ROOT, CommonMessages.WINDOW_DELAY_REC, expectedWindowDelayMinutes, expectedWindowDelayMinutes), messageMap.get("window_delay").get("message") ); } diff --git a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobAction.java b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobAction.java index 327e3bf51..a861ec9de 100644 --- a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobAction.java +++ b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobAction.java @@ -13,15 +13,15 @@ import org.opensearch.action.ActionType; import org.opensearch.ad.constant.CommonValue; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; +import org.opensearch.timeseries.transport.JobResponse; -public class MockAnomalyDetectorJobAction extends ActionType { +public class MockAnomalyDetectorJobAction extends ActionType { // External Action which used for public facing RestAPIs. public static final String NAME = CommonValue.EXTERNAL_ACTION_PREFIX + "detector/mockjobmanagement"; public static final MockAnomalyDetectorJobAction INSTANCE = new MockAnomalyDetectorJobAction(); private MockAnomalyDetectorJobAction() { - super(NAME, AnomalyDetectorJobResponse::new); + super(NAME, JobResponse::new); } } diff --git a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java index bf339161b..48425a747 100644 --- a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java +++ b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java @@ -25,7 +25,6 @@ import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyDetectorJobRequest; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.AnomalyDetectorJobTransportAction; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -38,11 +37,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.tasks.Task; import org.opensearch.timeseries.model.DateRange; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportService; -public class MockAnomalyDetectorJobTransportActionWithUser extends - HandledTransportAction { +public class MockAnomalyDetectorJobTransportActionWithUser extends HandledTransportAction { private final Logger logger = LogManager.getLogger(AnomalyDetectorJobTransportAction.class); private final Client client; @@ -85,7 +84,7 @@ public MockAnomalyDetectorJobTransportActionWithUser( } @Override - protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener listener) { + protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener listener) { String detectorId = request.getDetectorID(); DateRange detectionDateRange = request.getDetectionDateRange(); boolean historical = request.isHistorical(); @@ -125,7 +124,7 @@ protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionLis } private void executeDetector( - ActionListener listener, + ActionListener listener, String detectorId, long seqNo, long primaryTerm, diff --git a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java index 59853ce62..3bb0f1fbb 100644 --- a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java +++ b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.action.DocWriteResponse.Result.CREATED; -import static org.opensearch.ad.constant.ADCommonMessages.CAN_NOT_FIND_LATEST_TASK; +import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_LATEST_TASK; import java.io.IOException; import java.util.Arrays; @@ -42,7 +42,6 @@ import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.task.ADTaskCacheManager; import org.opensearch.ad.task.ADTaskManager; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.AnomalyResultAction; import org.opensearch.ad.transport.AnomalyResultResponse; import org.opensearch.ad.transport.ProfileAction; @@ -60,6 +59,7 @@ import org.opensearch.timeseries.common.exception.InternalFailure; import org.opensearch.timeseries.common.exception.ResourceNotFoundException; import org.opensearch.timeseries.model.Feature; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; import org.opensearch.transport.TransportService; @@ -146,9 +146,9 @@ public void setUp() throws Exception { adTaskManager = mock(ADTaskManager.class); doAnswer(invocation -> { Object[] args = invocation.getArguments(); - ActionListener listener = (ActionListener) args[4]; + ActionListener listener = (ActionListener) args[4]; - AnomalyDetectorJobResponse response = mock(AnomalyDetectorJobResponse.class); + JobResponse response = mock(JobResponse.class); listener.onResponse(response); return null; @@ -193,7 +193,7 @@ public void setUp() throws Exception { public void testDelayHCProfile() { when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(false); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); handler.startAnomalyDetectorJob(detector, listener); @@ -220,7 +220,7 @@ public void testNoDelayHCProfile() { when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); handler.startAnomalyDetectorJob(detector, listener); @@ -246,7 +246,7 @@ public void testHCProfileException() { when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); handler.startAnomalyDetectorJob(detector, listener); @@ -283,7 +283,7 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingNodeResourceNotFoundExcept return null; }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); handler.startAnomalyDetectorJob(detector, listener); @@ -321,7 +321,7 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingException() { return null; }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); handler.startAnomalyDetectorJob(detector, listener); @@ -347,7 +347,7 @@ public void testIndexException() throws IOException { return null; }).when(client).execute(any(AnomalyResultAction.class), any(), any()); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); AggregationBuilder aggregationBuilder = TestHelpers .parseAggregation("{\"test\":{\"max\":{\"field\":\"" + MockSimpleLog.VALUE_FIELD + "\"}}}"); Feature feature = new Feature(randomAlphaOfLength(5), randomAlphaOfLength(10), true, aggregationBuilder); diff --git a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java index 9d09f7fee..f9df58903 100644 --- a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java +++ b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java @@ -94,7 +94,6 @@ import org.opensearch.ad.transport.ADStatsNodesResponse; import org.opensearch.ad.transport.ADTaskProfileNodeResponse; import org.opensearch.ad.transport.ADTaskProfileResponse; -import org.opensearch.ad.transport.AnomalyDetectorJobResponse; import org.opensearch.ad.transport.ForwardADTaskRequest; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterName; @@ -130,6 +129,7 @@ import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.TaskState; import org.opensearch.timeseries.task.RealtimeTaskCache; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; @@ -157,7 +157,7 @@ public class ADTaskManagerTests extends ADUnitTestCase { private IndexAnomalyDetectorJobActionHandler indexAnomalyDetectorJobActionHandler; private DateRange detectionDateRange; - private ActionListener listener; + private ActionListener listener; private DiscoveryNode node1; private DiscoveryNode node2; @@ -200,7 +200,7 @@ public class ADTaskManagerTests extends ADUnitTestCase { + ",\"parent_task_id\":\"a1civ3sBwF58XZxvKrko\",\"worker_node\":\"DL5uOJV3TjOOAyh5hJXrCA\",\"current_piece\"" + ":1630999260000,\"execution_end_time\":1630999442814}}"; @Captor - ArgumentCaptor> remoteResponseHandler; + ArgumentCaptor> remoteResponseHandler; @Override public void setUp() throws Exception { @@ -255,9 +255,9 @@ public void setUp() throws Exception { ) ); - listener = spy(new ActionListener() { + listener = spy(new ActionListener() { @Override - public void onResponse(AnomalyDetectorJobResponse bulkItemResponses) {} + public void onResponse(JobResponse bulkItemResponses) {} @Override public void onFailure(Exception e) {} @@ -775,7 +775,7 @@ public void testGetLocalADTaskProfilesByDetectorId() { @SuppressWarnings("unchecked") public void testRemoveStaleRunningEntity() throws IOException { - ActionListener actionListener = mock(ActionListener.class); + ActionListener actionListener = mock(ActionListener.class); ADTask adTask = randomAdTask(); String entity = randomAlphaOfLength(5); ExecutorService executeService = mock(ExecutorService.class); @@ -1006,7 +1006,7 @@ public void testStartHistoricalAnalysisWithNoOwningNode() throws IOException { DateRange detectionDateRange = TestHelpers.randomDetectionDateRange(); User user = null; int availableTaskSlots = randomIntBetween(1, 10); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer> function = invocation.getArgument(1); function.accept(Optional.empty()); @@ -1187,7 +1187,7 @@ private void setupGetAndExecuteOnLatestADTasks(ADTaskProfile adTaskProfile) { }).when(client).search(any(), any()); String detectorId = randomAlphaOfLength(5); Consumer> function = mock(Consumer.class); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer getNodeFunction = invocation.getArgument(0); @@ -1413,7 +1413,7 @@ public void testDeleteADTasksWithException() { @SuppressWarnings("unchecked") public void testScaleUpTaskSlots() throws IOException { ADTask adTask = randomAdTask(ADTaskType.HISTORICAL_HC_ENTITY); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); when(adTaskCacheManager.getAvailableNewEntityTaskLanes(anyString())).thenReturn(0); doReturn(2).when(adTaskManager).detectorTaskSlotScaleDelta(anyString()); when(adTaskCacheManager.getLastScaleEntityTaskLaneTime(anyString())).thenReturn(null); @@ -1433,7 +1433,7 @@ public void testScaleUpTaskSlots() throws IOException { public void testForwardRequestToLeadNodeWithNotExistingNode() throws IOException { ADTask adTask = randomAdTask(ADTaskType.HISTORICAL_HC_ENTITY); ForwardADTaskRequest forwardADTaskRequest = new ForwardADTaskRequest(adTask, ADTaskAction.APPLY_FOR_TASK_SLOTS); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer> function = invocation.getArgument(1); function.accept(Optional.empty()); @@ -1449,7 +1449,7 @@ public void testScaleTaskLaneOnCoordinatingNode() { ADTask adTask = mock(ADTask.class); when(adTask.getCoordinatingNode()).thenReturn(node1.getId()); when(nodeFilter.getEligibleDataNodes()).thenReturn(new DiscoveryNode[] { node1, node2 }); - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, transportService, listener); } @@ -1458,7 +1458,7 @@ public void testStartDetectorWithException() throws IOException { AnomalyDetector detector = randomAnomalyDetector(ImmutableList.of(randomFeature(true))); DateRange detectionDateRange = randomDetectionDateRange(); User user = null; - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); when(detectionIndices.doesStateIndexExist()).thenReturn(false); doThrow(new RuntimeException("test")).when(detectionIndices).initStateIndex(any()); adTaskManager.startDetector(detector, detectionDateRange, user, transportService, listener); @@ -1469,7 +1469,7 @@ public void testStartDetectorWithException() throws IOException { public void testStopDetectorWithNonExistingDetector() { String detectorId = randomAlphaOfLength(5); boolean historical = true; - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer> function = invocation.getArgument(1); function.accept(Optional.empty()); @@ -1483,7 +1483,7 @@ public void testStopDetectorWithNonExistingDetector() { public void testStopDetectorWithNonExistingTask() { String detectorId = randomAlphaOfLength(5); boolean historical = true; - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer> function = invocation.getArgument(1); AnomalyDetector detector = randomAnomalyDetector(ImmutableList.of(randomFeature(true))); @@ -1505,7 +1505,7 @@ public void testStopDetectorWithNonExistingTask() { public void testStopDetectorWithTaskDone() { String detectorId = randomAlphaOfLength(5); boolean historical = true; - ActionListener listener = mock(ActionListener.class); + ActionListener listener = mock(ActionListener.class); doAnswer(invocation -> { Consumer> function = invocation.getArgument(1); AnomalyDetector detector = randomAnomalyDetector(ImmutableList.of(randomFeature(true))); diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java index 42c09d44f..79bc66527 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java @@ -37,18 +37,18 @@ import org.opensearch.commons.ConfigConstants; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.rest.RestStatus; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.model.DateRange; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.transport.TransportService; public class AnomalyDetectorJobActionTests extends OpenSearchIntegTestCase { private AnomalyDetectorJobTransportAction action; private Task task; private AnomalyDetectorJobRequest request; - private ActionListener response; + private ActionListener response; @Override @Before @@ -82,9 +82,9 @@ public void setUp() throws Exception { ); task = mock(Task.class); request = new AnomalyDetectorJobRequest("1234", 4567, 7890, "_start"); - response = new ActionListener() { + response = new ActionListener() { @Override - public void onResponse(AnomalyDetectorJobResponse adResponse) { + public void onResponse(JobResponse adResponse) { // Will not be called as there is no detector Assert.assertTrue(false); } @@ -138,10 +138,10 @@ public void testAdJobRequest_NullDetectionDateRange() throws IOException { @Test public void testAdJobResponse() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); - AnomalyDetectorJobResponse response = new AnomalyDetectorJobResponse("1234", 45, 67, 890, RestStatus.OK); + JobResponse response = new JobResponse("1234"); response.writeTo(out); StreamInput input = out.bytes().streamInput(); - AnomalyDetectorJobResponse newResponse = new AnomalyDetectorJobResponse(input); + JobResponse newResponse = new JobResponse(input); Assert.assertEquals(response.getId(), newResponse.getId()); } } diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 50765deb8..6f7629039 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -59,6 +59,7 @@ import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.TaskState; import org.opensearch.timeseries.stats.StatNames; +import org.opensearch.timeseries.transport.JobResponse; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -136,7 +137,7 @@ public void testStartHistoricalAnalysisWithUser() throws IOException { ); Client nodeClient = getDataNodeClient(); if (nodeClient != null) { - AnomalyDetectorJobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000); + JobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000); ADTask adTask = getADTask(response.getId()); assertNotNull(adTask.getStartedBy()); assertNotNull(adTask.getUser()); @@ -166,7 +167,7 @@ public void testStartHistoricalAnalysisForSingleCategoryHCWithUser() throws IOEx Client nodeClient = getDataNodeClient(); if (nodeClient != null) { - AnomalyDetectorJobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000); + JobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000); waitUntil(() -> { try { ADTask task = getADTask(response.getId()); @@ -218,7 +219,7 @@ public void testStartHistoricalAnalysisForMultiCategoryHCWithUser() throws IOExc Client nodeClient = getDataNodeClient(); if (nodeClient != null) { - AnomalyDetectorJobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100_000); + JobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100_000); String taskId = response.getId(); waitUntil(() -> { @@ -253,7 +254,7 @@ public void testRunMultipleTasksForHistoricalAnalysis() throws IOException, Inte .randomDetector(ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); String detectorId = createDetector(detector); AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId, dateRange); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); + JobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); assertNotNull(response.getId()); OpenSearchStatusException exception = null; // Add retry to solve the flaky test @@ -326,7 +327,7 @@ public void testCleanOldTaskDocs() throws InterruptedException, IOException { START_JOB ); - AtomicReference response = new AtomicReference<>(); + AtomicReference response = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); Thread.sleep(2000); client().execute(AnomalyDetectorJobAction.INSTANCE, request, ActionListener.wrap(r -> { @@ -369,7 +370,7 @@ private List startRealtimeDetector() throws IOException { .randomDetector(ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); String detectorId = createDetector(detector); AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId, null); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); + JobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); String jobId = response.getId(); assertEquals(detectorId, jobId); return ImmutableList.of(detectorId, jobId); diff --git a/src/test/java/org/opensearch/ad/transport/ForwardADTaskTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ForwardADTaskTransportActionTests.java index a45fb6779..ac83b5c8e 100644 --- a/src/test/java/org/opensearch/ad/transport/ForwardADTaskTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ForwardADTaskTransportActionTests.java @@ -40,6 +40,7 @@ import org.opensearch.tasks.Task; import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.transport.JobResponse; import org.opensearch.transport.TransportService; import com.google.common.collect.ImmutableList; @@ -53,7 +54,7 @@ public class ForwardADTaskTransportActionTests extends ADUnitTestCase { private NodeStateManager stateManager; private ForwardADTaskTransportAction forwardADTaskTransportAction; private Task task; - private ActionListener listener; + private ActionListener listener; @SuppressWarnings("unchecked") @Override diff --git a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java index 1181b685f..5b14020d6 100644 --- a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java +++ b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java @@ -38,7 +38,6 @@ import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.ADTaskType; import org.opensearch.ad.settings.AnomalyDetectorSettings; @@ -143,7 +142,7 @@ public void testInvalidRequest() throws IOException { future = new PlainActionFuture<>(); action.doExecute(null, request, future); - assertException(future, OpenSearchStatusException.class, ADCommonMessages.EMPTY_PROFILES_COLLECT); + assertException(future, OpenSearchStatusException.class, CommonMessages.EMPTY_PROFILES_COLLECT); } @SuppressWarnings("unchecked") diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorResponseTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorResponseTests.java index 6c52634d0..510ed2683 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorResponseTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorResponseTests.java @@ -16,12 +16,12 @@ import java.util.Map; import org.junit.Test; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.model.DetectorValidationIssue; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.timeseries.AbstractTimeSeriesTest; import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.constant.CommonMessages; public class ValidateAnomalyDetectorResponseTests extends AbstractTimeSeriesTest { @@ -84,7 +84,7 @@ public void testResponseToXContentWithIntervalRec() throws IOException { String validationResponse = TestHelpers.xContentBuilderToString(response.toXContent(TestHelpers.builder())); assertEquals( "{\"model\":{\"detection_interval\":{\"message\":\"" - + ADCommonMessages.DETECTOR_INTERVAL_REC + + CommonMessages.INTERVAL_REC + intervalRec + "\",\"suggested_value\":{\"period\":{\"interval\":5,\"unit\":\"Minutes\"}}}}}", validationResponse diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java index 23a5150cc..685f3a07e 100644 --- a/src/test/java/org/opensearch/timeseries/TestHelpers.java +++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java @@ -58,7 +58,6 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.ShardSearchFailure; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.constant.CommonValue; import org.opensearch.ad.feature.Features; @@ -133,6 +132,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.rest.OpenSearchRestTestCase; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.dataprocessor.ImputationMethod; import org.opensearch.timeseries.dataprocessor.ImputationOption; @@ -1541,7 +1541,7 @@ public static DetectorValidationIssue randomDetectorValidationIssueWithDetectorI DetectorValidationIssue issue = new DetectorValidationIssue( ValidationAspect.MODEL, ValidationIssueType.DETECTION_INTERVAL, - ADCommonMessages.DETECTOR_INTERVAL_REC + intervalRec, + CommonMessages.INTERVAL_REC + intervalRec, null, new IntervalTimeConfiguration(intervalRec, ChronoUnit.MINUTES) );