From cdf49a55a7df68b97362d2823318d4296a9427c6 Mon Sep 17 00:00:00 2001
From: Mikayla Thompson
Date: Wed, 26 Jun 2024 21:17:30 -0600
Subject: [PATCH] Add detailed shard statuses (#780)

Signed-off-by: Mikayla Thompson
---
NOTE(review): this copy was restored from a whitespace-collapsed dump, so leading
indentation is reconstructed from Python syntax - confirm against the tree before
applying. It also folds in review fixes (docstrings, a separator before the log
disclaimer, guarded response parsing), so the blob hash on the index line no longer
matches the post-image.

 .../console_link/models/backfill_rfs.py | 94 ++++++++++++++++---
 1 file changed, 81 insertions(+), 13 deletions(-)

diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py
index 8ce27a224..41b4cddf8 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py
@@ -1,4 +1,6 @@
+from datetime import datetime
 from typing import Dict, Optional
+
 from console_link.models.backfill_base import Backfill, BackfillStatus
 from console_link.models.cluster import Cluster
 from console_link.models.schema_tools import contains_one_of
@@ -109,6 +111,7 @@ def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
                 shard_status = self._get_detailed_status()
             except Exception as e:
                 logger.error(f"Failed to get detailed status: {e}")
+                shard_status = None
             if shard_status:
                 status_string += f"\n{shard_status}"
 
@@ -119,18 +122,83 @@ def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
         return CommandResult(True, (BackfillStatus.STOPPED, status_string))
 
     def _get_detailed_status(self) -> Optional[str]:
-        status_query = {"query": {
-            "bool": {
-                "must": [{"exists": {"field": "expiration"}}],
-                "must_not": [{"exists": {"field": "completedAt"}}]
-            }
+        """Summarize shard progress from the working state index on the target cluster.
+
+        Queries five counts: total, completed (has completedAt), incomplete (no
+        completedAt), in progress (incomplete, expiration >= now) and unclaimed
+        (incomplete, expiration < now).
+
+        Returns one "Shards {label}: {count}" line for each query that produced a count.
+        """
+        current_epoch_seconds = int(datetime.now().timestamp())
+        incomplete_query = {"query": {
+            "bool": {"must_not": [{"exists": {"field": "completedAt"}}]}
+        }}
+        completed_query = {"query": {
+            "bool": {"must": [{"exists": {"field": "completedAt"}}]}
+        }}
+        total_query = {"query": {"match_all": {}}}
+        in_progress_query = {"query": {
+            "bool": {"must": [
+                {"range": {"expiration": {"gte": current_epoch_seconds}}},
+                {"bool": {"must_not": [{"exists": {"field": "completedAt"}}]}}
+            ]}
+        }}
+        unclaimed_query = {"query": {
+            "bool": {"must": [
+                {"range": {"expiration": {"lt": current_epoch_seconds}}},
+                {"bool": {"must_not": [{"exists": {"field": "completedAt"}}]}}
+            ]}
         }}
-        response = self.target_cluster.call_api("/.migrations_working_state/_search", json_body=status_query)
-        r_body = response.json()
-        logger.debug(f"Raw response: {r_body}")
-        if "hits" in r_body:
-            logger.info(f"Hits on detailed status query: {r_body['hits']}")
-            logger.info(f"Sample of remaining shards: {[hit['_id'] for hit in r_body['hits']['hits']]}")
-            return f"Remaining shards: {r_body['hits']['total']['value']}"
-        logger.warning("No hits on detailed status query, migration_working_state index may not exist or be populated")
+        queries = {
+            "total": total_query,
+            "completed": completed_query,
+            "incomplete": incomplete_query,
+            "in progress": in_progress_query,
+            "unclaimed": unclaimed_query
+        }
+        values = {key: parse_query_response(query, self.target_cluster, key) for key, query in queries.items()}
+        logger.info(f"Values: {values}")
+        if None in values.values():
+            logger.warning(f"Failed to get values for some queries: {values}")
+            return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])
+
+        # Leading space keeps the disclaimer from running into the sentence it is appended to.
+        disclaimer = " This may be transient because of timing of executing the queries or indicate an issue" +\
+            " with the queries or the working state index"
+        # Check the various sums to make sure things add up correctly.
+        if values["incomplete"] + values["completed"] != values["total"]:
+            logger.warning(f"Incomplete ({values['incomplete']}) and completed ({values['completed']}) shards do not "
+                           f"sum to the total ({values['total']}) shards." + disclaimer)
+        if values["unclaimed"] + values["in progress"] != values["incomplete"]:
+            logger.warning(f"Unclaimed ({values['unclaimed']}) and in progress ({values['in progress']}) shards do not"
+                           f" sum to the incomplete ({values['incomplete']}) shards." + disclaimer)
+
+        return "\n".join([f"Shards {key}: {value}" for key, value in values.items()])
+
+
+def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
+    """Run one search against the working state index and return its reported hit total.
+
+    `label` names the query in log messages. Returns None when the request fails, the
+    response cannot be parsed, or the response has no "hits" section.
+    """
+    try:
+        response = cluster.call_api("/.migrations_working_state/_search", json_body=query)
+    except Exception as e:
+        logger.error(f"Failed to execute query: {e}")
         return None
+    logger.debug(f"Query: {label}, {response.request.path_url}, {response.request.body}")
+    try:
+        body = response.json()
+        logger.debug(f"Raw response: {body}")
+        if "hits" in body:
+            logger.debug(f"Hits on {label} query: {body['hits']}")
+            logger.info(f"Sample of {label} shards: {[hit['_id'] for hit in body['hits']['hits']]}")
+            return int(body['hits']['total']['value'])
+    except (ValueError, KeyError, TypeError) as e:
+        # A malformed body should cost only this one count, not the whole status report.
+        logger.error(f"Failed to parse response to {label} query: {e}")
+        return None
+    logger.warning(f"No hits on {label} query, migration_working_state index may not exist or be populated")
+    return None