
40 add logging aggregation, LOGLEVEL env var, other logging tweaks #45

Merged · 7 commits · Jul 26, 2023
1 change: 1 addition & 0 deletions README.md
@@ -21,6 +21,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which
```
PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password
PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port
LOGLEVEL - an integer log level or case-insensitive string matching a Python log level like `INFO` (optional - defaults to `INFO`)
DEV_MODE=1 // disables host verification
```
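For illustration, any of the following requests debug logging under the scheme described above (`logging.DEBUG == 10` in Python, and name matching is case-insensitive):

```
LOGLEVEL=DEBUG
LOGLEVEL=debug
LOGLEVEL=10
```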

2 changes: 2 additions & 0 deletions docker/README.md
@@ -5,6 +5,8 @@ Requires a running deployment of registry
#### Env Variables
`PROV_ENDPOINT` - the URL of the registry OpenSearch http endpoint
`PROV_CREDENTIALS` - a JSON string of format `{"$username": "$password"}`
`LOGLEVEL` - (optional - defaults to `INFO`) an integer log level or case-insensitive string matching a Python log level like `INFO`
`DEV_MODE=1` - (optional) in dev mode, host cert verification is disabled
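For reference, a minimal sketch of how a driver script might read these variables; variable names come from the docs above, but the actual parsing in `docker/sweepers_driver.py` may differ:

```python
import json
import os

# Names are taken from the env var docs above; parsing details are assumptions.
endpoint = os.environ["PROV_ENDPOINT"]                    # registry OpenSearch endpoint
credentials = json.loads(os.environ["PROV_CREDENTIALS"])  # {"$username": "$password"}
username, password = next(iter(credentials.items()))
dev_mode = os.environ.get("DEV_MODE") == "1"              # disables host cert verification
log_level = os.environ.get("LOGLEVEL", "INFO")            # int or level name, parsed downstream
```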


### Development
5 changes: 3 additions & 2 deletions docker/sweepers_driver.py
@@ -62,7 +62,7 @@
from typing import Callable, Iterable

from pds.registrysweepers import provenance, ancestry
from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since
from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level
sonatype-lift bot: reportMissingImports: Import "pds.registrysweepers.utils" could not be resolved (9% of developers fix this issue).


configure_logging(filepath=None, log_level=logging.INFO)
log = logging.getLogger(__name__)
@@ -92,6 +92,7 @@
logging.error(err)
raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}')

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))
Member:

I remember that Sean recommended using a log configuration file to configure the Python logs; I don't know how that would integrate into an AWS deployment. We would need to mount the log configuration file into the Docker image from a configuration store (I don't know what the equivalent is in AWS). Then we would have the log level in that file.

I don't want to overcomplicate the changes right now, so let's say this is for comment/discussion, not for change.

Contributor (Author):

Can you serve config files or complex objects from the Parameter Store? Preferably, whatever mechanism we use should be trivially adjustable in AWS (as environment variables are), so that we can switch to debug logging easily if an issue is found.


def run_factory(sweeper_f: Callable) -> Callable:
sonatype-lift bot: E302: expected 2 blank lines, found 1 (7% of developers fix this issue).

return functools.partial(
@@ -100,7 +101,7 @@ def run_factory(sweeper_f: Callable) -> Callable:
username=username,
password=password,
log_filepath='provenance.log',
log_level=logging.INFO, # TODO: pull this from LOGLEVEL env var
log_level=log_level,
verify_host_certs=True if not dev_mode else False
)

70 changes: 50 additions & 20 deletions src/pds/registrysweepers/utils/__init__.py
@@ -77,7 +77,7 @@ def parse_log_level(input: str) -> int:
try:
result = int(input)
except ValueError:
result = getattr(logging, input)
result = getattr(logging, input.upper())
return result
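With the `.upper()` fix above, the helper accepts either an integer or a case-insensitive level name. An illustrative usage check (the assertions are mine, not part of the PR):

```python
import logging

# All three forms resolve via the hunk above; logging.DEBUG == 10.
assert parse_log_level("10") == logging.DEBUG
assert parse_log_level("debug") == logging.DEBUG  # works now thanks to .upper()
assert parse_log_level("INFO") == logging.INFO
```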


@@ -123,6 +123,9 @@ def query_registry_db(
path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_validity_duration_minutes}m"
served_hits = 0

last_info_log_at_percentage = 0
log.info("Query progress: 0%")

more_data_exists = True
while more_data_exists:
resp = retry_call(
@@ -143,14 +146,11 @@
total_hits = data["hits"]["total"]["value"]
log.debug(f" paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})")

last_info_log_at_percentage = 0
log.info("Query progress: 0%")

for hit in data["hits"]["hits"]:
Member:

There is a tqdm package that can help track progress in a loop. I would prefer to use that rather than adding bespoke progress indicators, which can make the code less readable with respect to its primary focus.
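For discussion, a hypothetical sketch of what that suggestion might look like (not part of this PR, and `tqdm` would need to be added as a dependency):

```python
from tqdm import tqdm

# Hypothetical: one bar tracks the whole scroll, replacing the manual
# percentage bookkeeping; total_hits is known after the first response.
progress_bar = tqdm(total=total_hits, desc="Query progress", unit="hit")
for hit in data["hits"]["hits"]:
    progress_bar.update(1)
    yield hit
```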

served_hits += 1

percentage_of_hits_served = int(served_hits / total_hits * 100)
if last_info_log_at_percentage is None or percentage_of_hits_served > (last_info_log_at_percentage + 5):
if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
last_info_log_at_percentage = percentage_of_hits_served
log.info(f"Query progress: {percentage_of_hits_served}%")

@@ -245,25 +245,55 @@ def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterable
headers=headers,
verify=host.verify,
)
response.raise_for_status()

# N.B. HTTP status 200 is insufficient as a success check for _bulk API.
# See: https://github.com/elastic/elasticsearch/issues/41434
response.raise_for_status()
response_content = response.json()
if response_content.get("errors"):
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
items_with_error = [item for item in response_content["items"] if "error" in item["update"]]
items_with_warnings = [item for item in items_with_error if item["update"]["error"]["type"] in warn_types]
items_with_errors = [item for item in items_with_error if item["update"]["error"]["type"] not in warn_types]

for item in items_with_warnings:
error_type = item["update"]["error"]["type"]
log.warning(f'Attempt to update document {item["update"]["_id"]} failed due to {error_type}')

for item in items_with_errors:
log.error(
f'Attempt to update document {item["update"]["_id"]} unexpectedly failed: {item["update"]["error"]}'
)

log.info("Successfully wrote bulk updates chunk")
items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]

if log.isEnabledFor(logging.WARNING):
items_with_warnings = [
item for item in items_with_problems if item["update"]["error"]["type"] in warn_types
]
warning_aggregates = aggregate_update_error_types(items_with_warnings)
for error_type, reason_aggregate in warning_aggregates.items():
for error_reason, ids in reason_aggregate.items():
log.warning(
f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}"
)

if log.isEnabledFor(logging.ERROR):
items_with_errors = [
item for item in items_with_problems if item["update"]["error"]["type"] not in warn_types
]
error_aggregates = aggregate_update_error_types(items_with_errors)
for error_type, reason_aggregate in error_aggregates.items():
for error_reason, ids in reason_aggregate.items():
log.error(
f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}"
)


def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:
"""Return a nested aggregation of ids, aggregated first by error type, then by reason"""
agg: Dict[str, Dict[str, List[str]]] = {}
for item in items:
id = item["update"]["_id"]
error = item["update"]["error"]
error_type = error["type"]
error_reason = error["reason"]
if error_type not in agg:
agg[error_type] = {}

if error_reason not in agg[error_type]:
agg[error_type][error_reason] = []

agg[error_type][error_reason].append(id)

return agg
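A usage sketch with fabricated-for-illustration items shaped like OpenSearch _bulk response entries (only the fields the function reads are included):

```python
# Hypothetical _bulk error items; the field shape mirrors what the function reads.
items = [
    {"update": {"_id": "doc-1", "error": {"type": "document_missing_exception", "reason": "not found"}}},
    {"update": {"_id": "doc-2", "error": {"type": "document_missing_exception", "reason": "not found"}}},
    {"update": {"_id": "doc-3", "error": {"type": "mapper_parsing_exception", "reason": "bad field"}}},
]

print(aggregate_update_error_types(items))
# {'document_missing_exception': {'not found': ['doc-1', 'doc-2']},
#  'mapper_parsing_exception': {'bad field': ['doc-3']}}
```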


def coerce_list_type(db_value: Any) -> List[Any]: