From 5aed6d0ca63e23e921996b52c71b5f2852eb922f Mon Sep 17 00:00:00 2001 From: Byron Himes Date: Wed, 7 Feb 2024 14:04:41 +0100 Subject: [PATCH] Use configured logging (GSI-614) (#19) * Add logging and update docs * Remove extra 'debug-statements' pre-commit hooks * Add log statements to repository and event sub --------- Co-authored-by: TheByronHimes --- .pre-commit-config.yaml | 3 - README.md | 47 +++++++++--- config_schema.json | 59 +++++++++++---- example_config.yaml | 2 + src/wps/adapters/inbound/event_sub.py | 7 ++ src/wps/config.py | 8 +- src/wps/core/repository.py | 103 +++++++++++++++++++++----- src/wps/main.py | 3 + 8 files changed, 183 insertions(+), 49 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0fc1a81..49ab325 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -47,9 +47,6 @@ repos: args: [--fix=lf] - id: no-commit-to-branch args: [--branch, dev, --branch, int, --branch, main] - - id: debug-statements - - id: debug-statements - - id: debug-statements - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.2.1 hooks: diff --git a/README.md b/README.md index 4d6c982..93e9cfd 100644 --- a/README.md +++ b/README.md @@ -83,46 +83,69 @@ wps --help ### Parameters The service requires the following configuration parameters: -- **`datasets_collection`** *(string)*: The name of the database collection for datasets. Default: `"datasets"`. - -- **`work_packages_collection`** *(string)*: The name of the database collection for work packages. Default: `"workPackages"`. +- **`log_level`** *(string)*: The minimum log level to capture. Must be one of: `["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "TRACE"]`. Default: `"INFO"`. -- **`work_package_valid_days`** *(integer)*: How many days a work package (and its access token) stays valid. Default: `30`. +- **`service_name`** *(string)*: Default: `"wps"`. -- **`work_package_signing_key`** *(string, format: password)*: The private key for signing work order tokens. +- **`service_instance_id`** *(string)*: A string that uniquely identifies this instance across all instances of this service. A globally unique Kafka client ID will be created by concatenating the service_name and the service_instance_id. Examples: ```json - "{\"crv\": \"P-256\", \"kty\": \"EC\", \"x\": \"...\", \"y\": \"...\"}" + "germany-bw-instance-001" ``` -- **`db_connection_str`** *(string, format: password)*: MongoDB connection string. Might include credentials. For more information see: https://naiveskill.com/mongodb-connection-string/. +- **`log_format`**: If set, will replace JSON formatting with the specified string format. If not set, has no effect. In addition to the standard attributes, the following can also be specified: timestamp, service, instance, level, correlation_id, and details. Default: `null`. + + - **Any of** + + - *string* + + - *null* Examples: ```json - "mongodb://localhost:27017" + "%(timestamp)s - %(service)s - %(level)s - %(message)s" ``` -- **`db_name`** *(string)*: Default: `"work-packages"`. + ```json + "%(asctime)s - Severity: %(levelno)s - %(msg)s" + ``` -- **`service_name`** *(string)*: Default: `"wps"`. -- **`service_instance_id`** *(string)*: A string that uniquely identifies this instance across all instances of this service. A globally unique Kafka client ID will be created by concatenating the service_name and the service_instance_id. +- **`datasets_collection`** *(string)*: The name of the database collection for datasets. Default: `"datasets"`. + +- **`work_packages_collection`** *(string)*: The name of the database collection for work packages. Default: `"workPackages"`. + +- **`work_package_valid_days`** *(integer)*: How many days a work package (and its access token) stays valid. Default: `30`. + +- **`work_package_signing_key`** *(string, format: password)*: The private key for signing work order tokens. Examples: ```json - "germany-bw-instance-001" + "{\"crv\": \"P-256\", \"kty\": \"EC\", \"x\": \"...\", \"y\": \"...\"}" ``` +- **`db_connection_str`** *(string, format: password)*: MongoDB connection string. Might include credentials. For more information see: https://naiveskill.com/mongodb-connection-string/. + + + Examples: + + ```json + "mongodb://localhost:27017" + ``` + + +- **`db_name`** *(string)*: Default: `"work-packages"`. + - **`kafka_servers`** *(array)*: A list of connection strings to connect to Kafka bootstrap servers. - **Items** *(string)* diff --git a/config_schema.json b/config_schema.json index c76b84d..7a61321 100644 --- a/config_schema.json +++ b/config_schema.json @@ -2,6 +2,50 @@ "additionalProperties": false, "description": "Modifies the orginal Settings class provided by the user", "properties": { + "log_level": { + "default": "INFO", + "description": "The minimum log level to capture.", + "enum": [ + "CRITICAL", + "ERROR", + "WARNING", + "INFO", + "DEBUG", + "TRACE" + ], + "title": "Log Level", + "type": "string" + }, + "service_name": { + "default": "wps", + "title": "Service Name", + "type": "string" + }, + "service_instance_id": { + "description": "A string that uniquely identifies this instance across all instances of this service. A globally unique Kafka client ID will be created by concatenating the service_name and the service_instance_id.", + "examples": [ + "germany-bw-instance-001" + ], + "title": "Service Instance Id", + "type": "string" + }, + "log_format": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "If set, will replace JSON formatting with the specified string format. If not set, has no effect. In addition to the standard attributes, the following can also be specified: timestamp, service, instance, level, correlation_id, and details", + "examples": [ + "%(timestamp)s - %(service)s - %(level)s - %(message)s", + "%(asctime)s - Severity: %(levelno)s - %(msg)s" + ], + "title": "Log Format" + }, "datasets_collection": { "default": "datasets", "description": "The name of the database collection for datasets", @@ -45,19 +89,6 @@ "title": "Db Name", "type": "string" }, - "service_name": { - "default": "wps", - "title": "Service Name", - "type": "string" - }, - "service_instance_id": { - "description": "A string that uniquely identifies this instance across all instances of this service. A globally unique Kafka client ID will be created by concatenating the service_name and the service_instance_id.", - "examples": [ - "germany-bw-instance-001" - ], - "title": "Service Instance Id", - "type": "string" - }, "kafka_servers": { "description": "A list of connection strings to connect to Kafka bootstrap servers.", "examples": [ @@ -311,9 +342,9 @@ } }, "required": [ + "service_instance_id", "work_package_signing_key", "db_connection_str", - "service_instance_id", "kafka_servers", "dataset_change_event_topic", "dataset_upsertion_event_type", diff --git a/example_config.yaml b/example_config.yaml index d1cdec7..6103b8a 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -30,6 +30,8 @@ kafka_ssl_cafile: '' kafka_ssl_certfile: '' kafka_ssl_keyfile: '' kafka_ssl_password: '' +log_format: null +log_level: INFO openapi_url: /openapi.json port: 8080 service_instance_id: wps_1 diff --git a/src/wps/adapters/inbound/event_sub.py b/src/wps/adapters/inbound/event_sub.py index a9d1282..9b8ab82 100644 --- a/src/wps/adapters/inbound/event_sub.py +++ b/src/wps/adapters/inbound/event_sub.py @@ -16,6 +16,7 @@ """KafkaEventSubscriber receiving events that announce datasets""" +import logging from contextlib import suppress from ghga_event_schemas import pydantic_ as event_schemas @@ -30,6 +31,8 @@ __all__ = ["EventSubTranslatorConfig", "EventSubTranslator"] +log = logging.getLogger(__name__) + class EventSubTranslatorConfig(BaseSettings): """Config for dataset creation related events.""" @@ -83,6 +86,10 @@ async def _handle_upsertion(self, payload: JsonObject): stage = WorkType[validated_payload.stage.name] except KeyError: # stage does not correspond to a work type, ignore event + log.info( + "Ignoring dataset event with unknown stage %s", + validated_payload.stage.name, + ) return files = [ diff --git a/src/wps/config.py b/src/wps/config.py index 83281bc..03006c4 100644 --- a/src/wps/config.py +++ b/src/wps/config.py @@ -18,6 +18,7 @@ from ghga_service_commons.api import ApiConfigBase from ghga_service_commons.auth.ghga import AuthConfig from hexkit.config import config_from_yaml +from hexkit.log import LoggingConfig from hexkit.providers.akafka import KafkaConfig from hexkit.providers.mongodb import MongoDbConfig @@ -25,8 +26,10 @@ from wps.adapters.outbound.http import AccessCheckConfig from wps.core.repository import WorkPackageConfig +SERVICE_NAME = "wps" -@config_from_yaml(prefix="wps") + +@config_from_yaml(prefix=SERVICE_NAME) class Config( ApiConfigBase, AuthConfig, @@ -35,8 +38,9 @@ class Config( KafkaConfig, MongoDbConfig, WorkPackageConfig, + LoggingConfig, ): """Config parameters and their defaults.""" - service_name: str = "wps" + service_name: str = SERVICE_NAME db_name: str = "work-packages" diff --git a/src/wps/core/repository.py b/src/wps/core/repository.py index bed04db..1a51e81 100644 --- a/src/wps/core/repository.py +++ b/src/wps/core/repository.py @@ -16,6 +16,7 @@ """A repository for work packages.""" +import logging from datetime import timedelta from typing import Optional @@ -48,6 +49,8 @@ WorkPackageDaoPort, ) +log = logging.getLogger(__name__) + class WorkPackageConfig(BaseSettings): """Config parameters needed for the WorkPackageRepository.""" @@ -88,7 +91,9 @@ def __init__( config.work_package_signing_key.get_secret_value() ) if not self._signing_key.has_private: - raise KeyError("No private work order signing key found.") + key_error = KeyError("No private work order signing key found.") + log.error(key_error) + raise key_error self._access = access_check self._dataset_dao = dataset_dao self._dao = work_package_dao @@ -107,22 +112,37 @@ async def create( """ user_id = auth_context.id if user_id is None: - raise self.WorkPackageAccessError("No internal user specified") + access_error = self.WorkPackageAccessError("No internal user specified") + log.error(access_error) + raise access_error + dataset_id = creation_data.dataset_id work_type = creation_data.type + extra = { # only used for logging + "user_id": user_id, + "dataset_id": dataset_id, + "work_type": work_type, + } + if work_type == WorkType.DOWNLOAD: if not await self._access.check_download_access(user_id, dataset_id): - raise self.WorkPackageAccessError("Missing dataset access permission") + access_error = self.WorkPackageAccessError( + "Missing dataset access permission" + ) + log.error(access_error, extra=extra) + raise access_error else: - raise self.WorkPackageAccessError("Unsupported work type") + access_error = self.WorkPackageAccessError("Unsupported work type") + log.error(access_error, extra=extra) + raise access_error try: dataset = await self.get_dataset(dataset_id) except self.DatasetNotFoundError as error: - raise self.WorkPackageAccessError( - "Cannot determine dataset files" - ) from error + access_error = self.WorkPackageAccessError("Cannot determine dataset files") + log.error(access_error, extra=extra) + raise access_error from error file_ids = [file.id for file in dataset.files] if creation_data.file_ids is not None: @@ -130,7 +150,11 @@ async def create( file_id_set = set(creation_data.file_ids) file_ids = [file_id for file_id in file_ids if file_id in file_id_set] if not file_ids: - raise self.WorkPackageAccessError("No existing files have been specified") + access_error = self.WorkPackageAccessError( + "No existing files have been specified" + ) + log.error(access_error, extra=extra) + raise access_error file_id_set = set(file_ids) files = { @@ -178,24 +202,44 @@ async def get( - if a work_package_access_token is specified and it does not match the token hash that is stored in the work package """ + extra = {"work_package_id": work_package_id} # only used for logging + try: work_package = await self._dao.get_by_id(work_package_id) except ResourceNotFoundError as error: - raise self.WorkPackageAccessError("Work package not found") from error + access_error = self.WorkPackageAccessError("Work package not found") + log.error(access_error, extra=extra) + raise access_error from error + if work_package_access_token and work_package.token_hash != hash_token( work_package_access_token ): - raise self.WorkPackageAccessError("Invalid work package access token") + access_error = self.WorkPackageAccessError( + "Invalid work package access token" + ) + log.error(access_error, extra=extra) + raise access_error + if check_valid: if not work_package.created <= now_as_utc() <= work_package.expires: - raise self.WorkPackageAccessError("Work package has expired") + access_error = self.WorkPackageAccessError("Work package has expired") + log.error(access_error, extra=extra) + raise access_error + if work_package.type == WorkType.DOWNLOAD: if not await self._access.check_download_access( work_package.user_id, work_package.dataset_id ): - raise self.WorkPackageAccessError("Access has been revoked") + access_error = self.WorkPackageAccessError( + "Access has been revoked" + ) + log.error(access_error, extra=extra) + raise access_error else: - raise self.WorkPackageAccessError("Unsupported work type") + access_error = self.WorkPackageAccessError("Unsupported work type") + log.error(access_error, extra=extra) + raise access_error + return work_package async def work_order_token( @@ -215,13 +259,25 @@ async def work_order_token( - if a work_package_access_token is specified and it does not match the token hash that is stored in the work package """ + extra = { # only used for logging + "work_package_id": work_package_id, + "file_id": file_id, + "check_valid": check_valid, + } + work_package = await self.get( work_package_id, check_valid=check_valid, work_package_access_token=work_package_access_token, ) + if file_id not in work_package.files: - raise self.WorkPackageAccessError("File is not contained in work package") + access_error = self.WorkPackageAccessError( + "File is not contained in work package" + ) + log.error(access_error, extra=extra) + raise access_error + user_public_crypt4gh_key = work_package.user_public_crypt4gh_key wot = WorkOrderToken( type=work_package.type, @@ -246,7 +302,9 @@ async def delete_dataset(self, dataset_id: str) -> None: try: await self._dataset_dao.delete(id_=dataset_id) except ResourceNotFoundError as error: - raise self.DatasetNotFoundError("Dataset not found") from error + dataset_not_found_error = self.DatasetNotFoundError("Dataset not found") + log.error(dataset_not_found_error, extra={"dataset_id": dataset_id}) + raise dataset_not_found_error from error async def get_dataset(self, dataset_id: str) -> Dataset: """Get a registered dataset using the given ID. @@ -256,7 +314,9 @@ async def get_dataset(self, dataset_id: str) -> Dataset: try: return await self._dataset_dao.get_by_id(dataset_id) except ResourceNotFoundError as error: - raise self.DatasetNotFoundError("Dataset not found") from error + dataset_not_found_error = self.DatasetNotFoundError("Dataset not found") + log.error(dataset_not_found_error, extra={"dataset_id": dataset_id}) + raise dataset_not_found_error from error async def get_datasets( self, *, auth_context: AuthContext, work_type: Optional[WorkType] = None @@ -270,15 +330,22 @@ async def get_datasets( """ user_id = auth_context.id if user_id is None: - raise self.WorkPackageAccessError("No internal user specified") + access_error = self.WorkPackageAccessError("No internal user specified") + log.error(access_error) + raise access_error + if work_type is not None and work_type != WorkType.DOWNLOAD: - raise self.WorkPackageAccessError("Unsupported work type") + access_error = self.WorkPackageAccessError("Unsupported work type") + log.error(access_error, extra={"work_type": work_type}) + raise access_error + dataset_ids = await self._access.get_datasets_with_download_access(user_id) datasets: list[Dataset] = [] for dataset_id in dataset_ids: try: dataset = await self.get_dataset(dataset_id) except self.DatasetNotFoundError: + log.debug("Dataset '%s' not found, continuing...", dataset_id) continue datasets.append(dataset) return datasets diff --git a/src/wps/main.py b/src/wps/main.py index a1a2b96..dc2a253 100644 --- a/src/wps/main.py +++ b/src/wps/main.py @@ -16,6 +16,7 @@ """In this module object construction and dependency injection is carried out.""" from ghga_service_commons.api import run_server +from hexkit.log import configure_logging from wps.config import Config from wps.inject import prepare_consumer, prepare_rest_app @@ -24,6 +25,7 @@ async def run_rest_app() -> None: """Run the HTTP REST API.""" config = Config() # type: ignore + configure_logging(config=config) async with prepare_rest_app(config=config) as app: await run_server(app=app, config=config) @@ -32,6 +34,7 @@ async def run_rest_app() -> None: async def consume_events(run_forever: bool = True) -> None: """Run an event consumer listening to the configured topic.""" config = Config() # type: ignore + configure_logging(config=config) async with prepare_consumer(config=config) as consumer: await consumer.event_subscriber.run(forever=run_forever)