Skip to content

Commit

Permalink
Add detailed shard statuses (#780)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <[email protected]>
  • Loading branch information
mikaylathompson authored Jun 27, 2024
1 parent 99a2575 commit cdf49a5
Showing 1 changed file with 62 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from datetime import datetime
from typing import Dict, Optional

from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.cluster import Cluster
from console_link.models.schema_tools import contains_one_of
Expand Down Expand Up @@ -109,6 +111,7 @@ def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
shard_status = self._get_detailed_status()
except Exception as e:
logger.error(f"Failed to get detailed status: {e}")
shard_status = None
if shard_status:
status_string += f"\n{shard_status}"

Expand All @@ -119,18 +122,64 @@ def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
return CommandResult(True, (BackfillStatus.STOPPED, status_string))

def _get_detailed_status(self) -> Optional[str]:
# Build a multi-line summary of shard progress. Each count comes from running one
# search query through parse_query_response against self.target_cluster; the result
# is one "Shards <label>: <count>" line per query.
#
# NOTE(review): this view appears to interleave removed and added diff lines. The
# statements that build `status_query` and read `r_body` look like the pre-change
# implementation — confirm against the repository before relying on them.
status_query = {"query": {
"bool": {
"must": [{"exists": {"field": "expiration"}}],
"must_not": [{"exists": {"field": "completedAt"}}]
}
# Current time as whole epoch seconds; used as the boundary in the range filters below.
current_epoch_seconds = int(datetime.now().timestamp())
# Documents that have no `completedAt` field.
incomplete_query = {"query": {
"bool": {"must_not": [{"exists": {"field": "completedAt"}}]}
}}
# Documents that have a `completedAt` field.
completed_query = {"query": {
"bool": {"must": [{"exists": {"field": "completedAt"}}]}
}}
# Every document in the index.
total_query = {"query": {"match_all": {}}}
# No `completedAt` and `expiration` >= now.
# presumably `expiration` is a lease expiry stored as epoch seconds — TODO confirm
in_progress_query = {"query": {
"bool": {"must": [
{"range": {"expiration": {"gte": current_epoch_seconds}}},
{"bool": {"must_not": [{"exists": {"field": "completedAt"}}]}}
]}
}}
# No `completedAt` and `expiration` < now.
unclaimed_query = {"query": {
"bool": {"must": [
{"range": {"expiration": {"lt": current_epoch_seconds}}},
{"bool": {"must_not": [{"exists": {"field": "completedAt"}}]}}
]}
}}
response = self.target_cluster.call_api("/.migrations_working_state/_search", json_body=status_query)
r_body = response.json()
logger.debug(f"Raw response: {r_body}")
if "hits" in r_body:
logger.info(f"Hits on detailed status query: {r_body['hits']}")
logger.info(f"Sample of remaining shards: {[hit['_id'] for hit in r_body['hits']['hits']]}")
return f"Remaining shards: {r_body['hits']['total']['value']}"
logger.warning("No hits on detailed status query, migration_working_state index may not exist or be populated")
# Label -> query body. The labels are reused verbatim in the returned status lines
# and as keys when cross-checking the counts below.
queries = {
"total": total_query,
"completed": completed_query,
"incomplete": incomplete_query,
"in progress": in_progress_query,
"unclaimed": unclaimed_query
}
# Run each query; a value is None when parse_query_response could not produce a count.
values = {key: parse_query_response(queries[key], self.target_cluster, key) for key in queries.keys()}
logger.info(f"Values: {values}")
# If any count is missing, skip the consistency checks (they would fail on None)
# and report only the counts that were obtained.
if None in values.values():
logger.warning(f"Failed to get values for some queries: {values}")
return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])

# NOTE(review): `disclaimer` is concatenated directly after "shards." in the warnings
# below with no separating space, so the logged text reads "shards.This may be ...".
disclaimer = "This may be transient because of timing of executing the queries or indicate an issue" +\
" with the queries or the working state index"
# Check the various sums to make sure things add up correctly.
# A mismatch only produces a warning; the returned summary is unaffected.
if values["incomplete"] + values["completed"] != values["total"]:
logger.warning(f"Incomplete ({values['incomplete']}) and completed ({values['completed']}) shards do not "
f"sum to the total ({values['total']}) shards." + disclaimer)
if values["unclaimed"] + values["in progress"] != values["incomplete"]:
logger.warning(f"Unclaimed ({values['unclaimed']}) and in progress ({values['in progress']}) shards do not"
f" sum to the incomplete ({values['incomplete']}) shards." + disclaimer)

return "\n".join([f"Shards {key}: {value}" for key, value in values.items()])


def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
    """Run ``query`` against the migrations working-state index and return its hit count.

    Args:
        query: Search request body to send.
        cluster: Cluster whose ``call_api`` method executes the search.
        label: Human-readable name of the query; used only in log messages.

    Returns:
        The total number of matching documents, or ``None`` when the request
        fails, the response is not valid JSON, or the response does not carry
        a usable ``hits`` section.
    """
    # By default the search API stops counting at 10,000 matches and reports a
    # lower bound in hits.total.value. Request an exact total so the counts can
    # be summed and compared reliably. A setting already present in ``query``
    # takes precedence, and the caller's dict is never mutated.
    search_body = {"track_total_hits": True, **query}
    try:
        response = cluster.call_api("/.migrations_working_state/_search", json_body=search_body)
    except Exception as e:
        logger.error(f"Failed to execute query: {e}")
        return None
    logger.debug(f"Query: {label}, {response.request.path_url}, {response.request.body}")

    try:
        body = response.json()
    except ValueError as e:
        # A non-JSON body (e.g. an error page from a proxy) is reported as "no count"
        # rather than propagating, matching how request failures are handled above.
        logger.error(f"Failed to parse response for {label} query: {e}")
        return None
    logger.debug(f"Raw response: {body}")

    if not isinstance(body, dict) or "hits" not in body:
        logger.warning(f"No hits on {label} query, migration_working_state index may not exist or be populated")
        return None

    hits = body["hits"]
    logger.debug(f"Hits on {label} query: {hits}")
    try:
        sample_ids = [hit['_id'] for hit in hits['hits']]
        total = int(hits['total']['value'])
    except (KeyError, TypeError, ValueError) as e:
        # The hits section exists but is not shaped as expected.
        logger.warning(f"Unexpected hits format on {label} query: {e}")
        return None
    logger.info(f"Sample of {label} shards: {sample_ids}")
    return total

0 comments on commit cdf49a5

Please sign in to comment.