From d40480a82e74961018a63994c41e85b18f748fd1 Mon Sep 17 00:00:00 2001 From: SergioSim Date: Mon, 13 Nov 2023 09:07:27 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F(backends)=20use=20common=20u?= =?UTF-8?q?tilities=20among=20backends?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During data backend unification work some shared utilities were developed to factorize duplication: `parse_dict_to_bytes`, `read_raw` and `iter_by_batch`. However, some backends still used their own implementation of these utilities leading to some minor behavioral differences among backends. Thus, for consistency, we update backends to use common utilities. --- CHANGELOG.md | 1 - pyproject.toml | 1 - src/ralph/backends/data/async_es.py | 38 ++++--- src/ralph/backends/data/async_mongo.py | 68 ++++++------ src/ralph/backends/data/base.py | 38 +++---- src/ralph/backends/data/clickhouse.py | 108 ++++++------------ src/ralph/backends/data/es.py | 39 ++++--- src/ralph/backends/data/fs.py | 141 +++++++++++------------- src/ralph/backends/data/ldp.py | 12 +- src/ralph/backends/data/mongo.py | 68 ++++++------ src/ralph/backends/data/s3.py | 98 +++++----------- src/ralph/backends/data/swift.py | 110 ++++++++---------- src/ralph/backends/http/async_lrs.py | 8 +- src/ralph/cli.py | 2 +- src/ralph/utils.py | 92 +++++++++++++--- tests/backends/data/test_async_es.py | 34 ++++-- tests/backends/data/test_async_mongo.py | 41 ++++--- tests/backends/data/test_clickhouse.py | 36 +++--- tests/backends/data/test_es.py | 32 +++++- tests/backends/data/test_fs.py | 49 ++++++-- tests/backends/data/test_ldp.py | 2 +- tests/backends/data/test_mongo.py | 39 ++++--- tests/backends/data/test_s3.py | 36 +++--- tests/backends/data/test_swift.py | 15 ++- tests/backends/http/test_async_lrs.py | 7 +- 25 files changed, 590 insertions(+), 525 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42eb07b99..c08b0882b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,6 @@ methods under the unified `lrs` backend interface [BC] have an authority field matching that of the user - CLI: change `push` to `write` and `fetch` to `read` [BC] - Upgrade `fastapi` to `0.104.1` -- Upgrade `more-itertools` to `10.1.0` - Upgrade `sentry_sdk` to `1.34.0` - Upgrade `uvicorn` to `0.24.0.post1` - API: Invalid parameters now return 400 status code diff --git a/pyproject.toml b/pyproject.toml index e6c6421e4..1def2d29c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,6 @@ backend-ldp = [ ] backend-lrs = [ "httpx<0.25.0", # pin as Python 3.7 is no longer supported from release 0.25.0 - "more-itertools==10.1.0", ] backend-mongo = [ "motor[srv]>=3.3.0", diff --git a/src/ralph/backends/data/async_es.py b/src/ralph/backends/data/async_es.py index d156aa4e3..240ac71ca 100644 --- a/src/ralph/backends/data/async_es.py +++ b/src/ralph/backends/data/async_es.py @@ -18,7 +18,7 @@ ) from ralph.backends.data.es import ESDataBackend, ESDataBackendSettings, ESQuery from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict, read_raw +from ralph.utils import async_parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) Settings = TypeVar("Settings", bound=ESDataBackendSettings) @@ -131,7 +131,8 @@ async def read( # noqa: PLR0912, PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`. 
raw_output (bool): Controls whether to yield dictionaries or bytes. - ignore_errors (bool): Ignored. + ignore_errors (bool): No impact as encoding errors are not expected in + Elasticsearch results. Yield: bytes: The next raw document if `raw_output` is True. @@ -140,10 +141,23 @@ async def read( # noqa: PLR0912, PLR0913 Raise: BackendException: If a failure occurs during Elasticsearch connection. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + async for document in async_parse_dict_to_bytes( + documents, self.settings.LOCALE_ENCODING, ignore_errors, logger + ): + yield document + + return + target = target if target else self.settings.DEFAULT_INDEX chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE - if ignore_errors: - logger.warning("The `ignore_errors` argument is ignored") if not query.pit.keep_alive: query.pit.keep_alive = self.settings.POINT_IN_TIME_KEEP_ALIVE @@ -183,10 +197,6 @@ async def read( # noqa: PLR0912, PLR0913 if count: query.search_after = [str(part) for part in documents[-1]["sort"]] kwargs["search_after"] = query.search_after - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) for document in documents: yield document @@ -206,9 +216,9 @@ async def write( # noqa: PLR0913 If target is `None`, the `DEFAULT_INDEX` is used instead. chunk_size (int or None): The number of documents to write in one batch. If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`. - ignore_errors (bool): If `True`, errors during the write operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, errors during decoding, encoding and + sending batches of documents are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -217,8 +227,8 @@ async def write( # noqa: PLR0913 int: The number of documents written. Raise: - BackendException: If a failure occurs while writing to Elasticsearch or - during document decoding and `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If the `operation_type` is `APPEND` as it is not supported. 
""" @@ -240,7 +250,7 @@ async def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_iterable_to_dict(data, ignore_errors, logger) logger.debug( "Start writing to the %s index (chunk size: %d)", target, chunk_size diff --git a/src/ralph/backends/data/async_mongo.py b/src/ralph/backends/data/async_mongo.py index 02272b336..6c2fc2d9a 100644 --- a/src/ralph/backends/data/async_mongo.py +++ b/src/ralph/backends/data/async_mongo.py @@ -1,10 +1,9 @@ """Async MongoDB data backend for Ralph.""" -import json import logging from io import IOBase from itertools import chain -from typing import Any, Dict, Iterable, Iterator, Optional, TypeVar, Union +from typing import Iterable, Iterator, Optional, TypeVar, Union from bson.errors import BSONError from motor.motor_asyncio import AsyncIOMotorClient @@ -18,7 +17,7 @@ MongoQuery, ) from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict +from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_iterable_to_dict from ..data.base import ( AsyncListable, @@ -140,16 +139,34 @@ async def read( # noqa: PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead. raw_output (bool): Whether to yield dictionaries or bytes. - ignore_errors (bool): Whether to ignore errors when reading documents. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. Yield: bytes: The next raw document if `raw_output` is True. dict: The next JSON parsed document if `raw_output` is False. Raise: - BackendException: If a failure occurs during MongoDB connection. - BackendParameterException: If a failure occurs with MongoDB collection. + BackendException: If a failure occurs during MongoDB connection or + during encoding documents and `ignore_errors` is set to `False`. + BackendParameterException: If the `target` is not a valid collection name. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + async for document in async_parse_dict_to_bytes( + documents, self.settings.LOCALE_ENCODING, ignore_errors, logger + ): + yield document + + return + if not chunk_size: chunk_size = self.settings.DEFAULT_CHUNK_SIZE @@ -163,20 +180,10 @@ async def read( # noqa: PLR0913 logger.error(msg, target, error) raise BackendParameterException(msg % (target, error)) from error - reader = self._read_raw if raw_output else lambda _: _ try: async for document in collection.find(batch_size=chunk_size, **query): document.update({"_id": str(document.get("_id"))}) - try: - yield reader(document) - except (TypeError, ValueError) as error: - msg = "Failed to encode MongoDB document with ID %s: %s" - document_id = document.get("_id") - logger.error(msg, document_id, error) - if ignore_errors: - logger.warning(msg, document_id, error) - continue - raise BackendException(msg % (document_id, error)) from error + yield document except (PyMongoError, IndexError, TypeError, ValueError) as error: msg = "Failed to execute MongoDB query: %s" logger.error(msg, error) @@ -197,7 +204,9 @@ async def write( # noqa: PLR0913 target (str or None): The target MongoDB collection name. 
chunk_size (int or None): The number of documents to write in one batch. If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead. - ignore_errors (bool): Whether to ignore errors or not. + ignore_errors (bool): If `True`, errors during decoding, encoding and + sending batches of documents are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -206,8 +215,8 @@ async def write( # noqa: PLR0913 int: The number of documents written. Raise: - BackendException: If a failure occurs while writing to MongoDB or - during document decoding and `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If the `operation_type` is `APPEND` as it is not supported. """ @@ -235,29 +244,26 @@ async def write( # noqa: PLR0913 try: first_record = next(data) except StopIteration: - logger.warning("Data Iterator is empty; skipping write to target.") + logger.info("Data Iterator is empty; skipping write to target.") return count data = chain([first_record], data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_iterable_to_dict(data, ignore_errors, logger) if operation_type == BaseOperationType.UPDATE: - for batch in MongoDataBackend.iter_by_batch( - MongoDataBackend.to_replace_one(data), chunk_size - ): + data = MongoDataBackend.to_replace_one(data) + for batch in iter_by_batch(data, chunk_size): count += await self._bulk_update(batch, ignore_errors, collection) logger.info("Updated %d documents with success", count) elif operation_type == BaseOperationType.DELETE: - for batch in MongoDataBackend.iter_by_batch( - MongoDataBackend.to_ids(data), chunk_size - ): + for batch in iter_by_batch(MongoDataBackend.to_ids(data), chunk_size): count += await self._bulk_delete(batch, ignore_errors, collection) logger.info("Deleted %d documents with success", count) else: data = MongoDataBackend.to_documents( data, ignore_errors, operation_type, logger ) - for batch in MongoDataBackend.iter_by_batch(data, chunk_size): + for batch in iter_by_batch(data, chunk_size): count += await self._bulk_import(batch, ignore_errors, collection) logger.info("Inserted %d documents with success", count) @@ -326,7 +332,3 @@ async def _bulk_update(batch: list, ignore_errors: bool, collection: Collection) modified_count = updated_documents.modified_count logger.debug("Updated %d documents chunk with success", modified_count) return modified_count - - def _read_raw(self, document: Dict[str, Any]) -> bytes: - """Read the `document` dictionary and return bytes.""" - return json.dumps(document).encode(self.settings.LOCALE_ENCODING) diff --git a/src/ralph/backends/data/base.py b/src/ralph/backends/data/base.py index fd60a0d6e..39731d099 100644 --- a/src/ralph/backends/data/base.py +++ b/src/ralph/backends/data/base.py @@ -102,9 +102,8 @@ def write( # noqa: PLR0913 chunk_size (int or None): The number of records or bytes to write in one batch, depending on whether `data` contains dictionaries or bytes. If `chunk_size` is `None`, a default value is used instead. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. 
If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, escapable errors are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -113,8 +112,8 @@ def write( # noqa: PLR0913 int: The number of written records. Raise: - BackendException: If a failure during the write operation occurs and - `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If a backend argument value is not valid. """ @@ -262,17 +261,17 @@ def read( # noqa: PLR0913 are encoded as JSON. If the records are bytes and `raw_output` is set to `False`, they are decoded as JSON by line. - ignore_errors (bool): If `True`, errors during the read operation - are be ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. Yield: dict: If `raw_output` is False. bytes: If `raw_output` is True. Raise: - BackendException: If a failure during the read operation occurs and - `ignore_errors` is set to `False`. + BackendException: If a failure during the read operation occurs or + during encoding records and `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ @@ -322,9 +321,8 @@ async def write( # noqa: PLR0913 chunk_size (int or None): The number of records or bytes to write in one batch, depending on whether `data` contains dictionaries or bytes. If `chunk_size` is `None`, a default value is used instead. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, escapable errors are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -333,8 +331,8 @@ async def write( # noqa: PLR0913 int: The number of written records. Raise: - BackendException: If a failure during the write operation occurs and - `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If a backend argument value is not valid. """ @@ -447,17 +445,17 @@ async def read( # noqa: PLR0913 are encoded as JSON. If the records are bytes and `raw_output` is set to `False`, they are decoded as JSON by line. - ignore_errors (bool): If `True`, errors during the read operation - are be ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. Yield: dict: If `raw_output` is False. bytes: If `raw_output` is True. 
Raise: - BackendException: If a failure during the read operation occurs and - `ignore_errors` is set to `False`. + BackendException: If a failure during the read operation occurs or + during encoding records and `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ diff --git a/src/ralph/backends/data/clickhouse.py b/src/ralph/backends/data/clickhouse.py index 5373da0db..a4da48c85 100755 --- a/src/ralph/backends/data/clickhouse.py +++ b/src/ralph/backends/data/clickhouse.py @@ -35,6 +35,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions from ralph.exceptions import BackendException, BackendParameterException +from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -206,7 +207,7 @@ def list( yield str(table.get("name")) @enforce_query_checks - def read( # noqa: PLR0912, PLR0913 + def read( # noqa: PLR0913 self, *, query: Optional[Union[str, ClickHouseQuery]] = None, @@ -224,17 +225,30 @@ def read( # noqa: PLR0912, PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. If chunk_size is `None` it defaults to `default_chunk_size`. raw_output (bool): Controls whether to yield dictionaries or bytes. - ignore_errors (bool): If `True`, errors during the encoding operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. Yield: bytes: The next raw document if `raw_output` is True. dict: The next JSON parsed document if `raw_output` is False. Raise: - BackendException: If a failure occurs during ClickHouse connection. + BackendException: If a failure occurs during ClickHouse connection or + during encoding documents and `ignore_errors` is set to `False`. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return + if target is None: target = self.event_table_name @@ -269,8 +283,6 @@ def read( # noqa: PLR0912, PLR0913 if query.limit: sql += f"\nLIMIT {query.limit}" - reader = self._read_raw if raw_output else self._read_json - logger.debug( "Start reading the %s table of the %s database (chunk size: %d)", target, @@ -284,16 +296,9 @@ def read( # noqa: PLR0912, PLR0913 settings={"buffer_size": chunk_size}, column_oriented=query.column_oriented, ).named_results() - for statement in result: - try: - yield reader(statement) - except (TypeError, ValueError) as error: - msg = "Failed to encode document %s: %s" - if ignore_errors: - logger.warning(msg, statement, error) - continue - logger.error(msg, statement, error) - raise BackendException(msg % (statement, error)) from error + yield from parse_iterable_to_dict( + result, ignore_errors, logger, self._parse_event_json + ) except (ClickHouseError, IndexError, TypeError, ValueError) as error: msg = "Failed to read documents: %s" logger.error(msg, error) @@ -315,9 +320,9 @@ def write( # noqa: PLR0913 If target is `None`, the `event_table_name` is used instead. chunk_size (int or None): The number of documents to write in one batch. If `chunk_size` is `None` it defaults to `default_chunk_size`. 
- ignore_errors (bool): If `True`, errors during the write operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, errors during decoding, encoding and + sending batches of documents are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -326,8 +331,8 @@ def write( # noqa: PLR0913 int: The number of documents written. Raise: - BackendException: If a failure occurs while writing to ClickHouse or - during document decoding and `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If the `operation_type` is `APPEND`, `UPDATE` or `DELETE` as it is not supported. """ @@ -352,7 +357,7 @@ def write( # noqa: PLR0913 data = chain([first_record], data) if isinstance(first_record, bytes): - data = self._parse_bytes_to_dict(data, ignore_errors) + data = parse_iterable_to_dict(data, ignore_errors, logger) if operation_type not in [BaseOperationType.CREATE, BaseOperationType.INDEX]: msg = "%s operation_type is not allowed." @@ -361,30 +366,9 @@ def write( # noqa: PLR0913 # operation_type is either CREATE or INDEX count = 0 - batch = [] - - for insert_tuple in self._to_insert_tuples( - data, - ignore_errors=ignore_errors, - ): - batch.append(insert_tuple) - if len(batch) < chunk_size: - continue - - count += self._bulk_import( - batch, - ignore_errors=ignore_errors, - event_table_name=target, - ) - batch = [] - - # Edge case: if the total number of documents is lower than the chunk size - if len(batch) > 0: - count += self._bulk_import( - batch, - ignore_errors=ignore_errors, - event_table_name=target, - ) + insert_tuples = self._to_insert_tuples(data, ignore_errors) + for batch in iter_by_batch(insert_tuples, chunk_size): + count += self._bulk_import(batch, ignore_errors, target) logger.info("Inserted a total of %d documents with success", count) @@ -473,35 +457,9 @@ def _bulk_import( return inserted_count @staticmethod - def _parse_bytes_to_dict( - raw_documents: Iterable[bytes], ignore_errors: bool - ) -> Iterator[dict]: - """Read the `raw_documents` Iterable and yield dictionaries.""" - for raw_document in raw_documents: - try: - yield json.loads(raw_document) - except (TypeError, json.JSONDecodeError) as error: - if ignore_errors: - logger.warning( - "Raised error: %s, for document %s", error, raw_document - ) - continue - logger.error("Raised error: %s, for document %s", error, raw_document) - raise error - - @staticmethod - def _read_json(document: Dict[str, Any]) -> Dict[str, Any]: - """Read the `documents` row and yield for the event JSON.""" + def _parse_event_json(document: Dict[str, Any]) -> Dict[str, Any]: + """Return the `document` with a JSON parsed `event` field.""" if "event" in document: document["event"] = json.loads(document["event"]) return document - - def _read_raw(self, document: Dict[str, Any]) -> bytes: - """Read the `documents` Iterable and yield bytes.""" - # We want to return a JSON structure of the whole row, so if the event string - # is in there we first need to serialize it so that we can deserialize the - # whole thing. 
- document = self._read_json(document) - - return json.dumps(document).encode(self.locale_encoding) diff --git a/src/ralph/backends/data/es.py b/src/ralph/backends/data/es.py index 49dcb4253..be1369959 100644 --- a/src/ralph/backends/data/es.py +++ b/src/ralph/backends/data/es.py @@ -22,7 +22,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions, CommaSeparatedTuple from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict, read_raw +from ralph.utils import parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -199,7 +199,7 @@ def list( yield index @enforce_query_checks - def read( # noqa: PLR0912, PLR0913 + def read( # noqa: PLR0913 self, *, query: Optional[Union[str, ESQuery]] = None, @@ -219,7 +219,8 @@ def read( # noqa: PLR0912, PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`. raw_output (bool): Controls whether to yield dictionaries or bytes. - ignore_errors (bool): Ignored. + ignore_errors (bool): No impact as encoding errors are not expected in + Elasticsearch results. Yield: bytes: The next raw document if `raw_output` is True. @@ -228,10 +229,19 @@ def read( # noqa: PLR0912, PLR0913 Raise: BackendException: If a failure occurs during Elasticsearch connection. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return target = target if target else self.settings.DEFAULT_INDEX chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE - if ignore_errors: - logger.warning("The `ignore_errors` argument is ignored") if not query.pit.keep_alive: query.pit.keep_alive = self.settings.POINT_IN_TIME_KEEP_ALIVE @@ -269,12 +279,7 @@ def read( # noqa: PLR0912, PLR0913 if count: query.search_after = [str(part) for part in documents[-1]["sort"]] kwargs["search_after"] = query.search_after - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) - for document in documents: - yield document + yield from documents def write( # noqa: PLR0913 self, @@ -292,9 +297,9 @@ def write( # noqa: PLR0913 If target is `None`, the `DEFAULT_INDEX` is used instead. chunk_size (int or None): The number of documents to write in one batch. If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`. - ignore_errors (bool): If `True`, errors during the write operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, errors during decoding, encoding and + sending batches of documents are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. @@ -303,8 +308,8 @@ def write( # noqa: PLR0913 int: The number of documents written. Raise: - BackendException: If a failure occurs while writing to Elasticsearch or - during document decoding and `ignore_errors` is set to `False`. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. 
BackendParameterException: If the `operation_type` is `APPEND` as it is not supported. """ @@ -326,7 +331,7 @@ def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_iterable_to_dict(data, ignore_errors, logger) logger.debug( "Start writing to the %s index (chunk size: %d)", target, chunk_size diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index e3d160ee9..5bf826334 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -1,13 +1,12 @@ """FileSystem data backend for Ralph.""" -import json import logging import os from datetime import datetime, timezone -from io import IOBase +from io import BufferedReader, IOBase from itertools import chain from pathlib import Path -from typing import IO, Iterable, Iterator, Optional, TypeVar, Union +from typing import Iterable, Iterator, Optional, Tuple, TypeVar, Union from uuid import uuid4 from ralph.backends.data.base import ( @@ -23,7 +22,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -113,11 +112,11 @@ def list( details (bool): Get detailed file information instead of just file paths. new (bool): Given the history, list only not already read files. - Yields: + Yield: str: The next file path. (If details is False). dict: The next file details. (If details is True). - Raises: + Raise: BackendParameterException: If the `target` argument is not a directory path. """ target: Path = Path(target) if target else self.default_directory @@ -172,42 +171,27 @@ def read( # noqa: PLR0913 chunk_size (int or None): The chunk size when reading documents by batches. Ignored if `raw_output` is set to False. raw_output (bool): Controls whether to yield bytes or dictionaries. - ignore_errors (bool): If `True`, errors during the read operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. - Yields: + Yield: bytes: The next chunk of the read files if `raw_output` is True. dict: The next JSON parsed line of the read files if `raw_output` is False. - Raises: - BackendException: If a failure during the read operation occurs and - `ignore_errors` is set to `False`. + Raise: + BackendException: If a failure during the read operation occurs or + during JSON encoding lines and `ignore_errors` is set to `False`. 
""" - if not query.query_string: - query.query_string = self.default_query_string - if not chunk_size: chunk_size = self.default_chunk_size - target = Path(target) if target else self.default_directory - if not target.is_absolute() and target != self.default_directory: - target = self.default_directory / target - paths = list( - filter(lambda path: path.is_file(), target.glob(query.query_string)) - ) - - if not paths: - logger.info("No file found for query: %s", target / query.query_string) - return - - logger.debug("Reading matching files: %s", paths) - - for path in paths: - with path.open("rb") as file: - reader = self._read_raw if raw_output else self._read_dict - for chunk in reader(file, chunk_size, ignore_errors): + for file, path in self._iter_files_matching_query(target, query): + if raw_output: + while chunk := file.read(chunk_size): yield chunk + else: + yield from parse_iterable_to_dict(file, ignore_errors, logger) # The file has been read, add a new entry to the history. self.append_to_history( @@ -224,12 +208,34 @@ def read( # noqa: PLR0913 } ) + def _iter_files_matching_query( + self, target: Union[str, None], query: BaseQuery + ) -> Iterator[Tuple[BufferedReader, Path]]: + """Return file/path tuples for files matching the query in the target folder.""" + if not query.query_string: + query.query_string = self.default_query_string + + path = Path(target) if target else self.default_directory + if not path.is_absolute() and path != self.default_directory: + path = self.default_directory / path + + paths = list(filter(lambda x: x.is_file(), path.glob(query.query_string))) + if not paths: + msg = "No file found for query: %s" + logger.info(msg, path / Path(str(query.query_string))) + return + + logger.debug("Reading matching files: %s", paths) + for path in paths: + with path.open("rb") as file: + yield file, path + def write( # noqa: PLR0913 self, data: Union[IOBase, Iterable[bytes], Iterable[dict]], target: Optional[str] = None, chunk_size: Optional[int] = None, # noqa: ARG002 - ignore_errors: bool = False, # noqa: ARG002 + ignore_errors: bool = False, operation_type: Optional[BaseOperationType] = None, ) -> int: """Write data records to the target file and return their count. @@ -242,7 +248,9 @@ def write( # noqa: PLR0913 If target is `None`, a random (uuid4) file is created in the `default_directory_path` and used as the target instead. chunk_size (int or None): Ignored. - ignore_errors (bool): Ignored. + ignore_errors (bool): If `True`, errors during decoding and encoding of + records are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If operation_type is `CREATE` or `INDEX`, the target file is expected to be absent. If the target file exists a `FileExistsError` is raised. @@ -250,12 +258,14 @@ def write( # noqa: PLR0913 If operation_type is `APPEND`, the data is appended to the end of the target file. - Returns: + Return: int: The number of written files. - Raises: - BackendException: If the `operation_type` is `CREATE` or `INDEX` and the - target file already exists. + Raise: + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. + E.g.: the `operation_type` is `CREATE` or `INDEX` and the target file + already exists. BackendParameterException: If the `operation_type` is `DELETE` as it is not supported. 
""" @@ -265,6 +275,13 @@ def write( # noqa: PLR0913 except StopIteration: logger.info("Data Iterator is empty; skipping write to target.") return 0 + + data = chain((first_record,), data) + if isinstance(first_record, dict): + data = parse_dict_to_bytes( + data, self.locale_encoding, ignore_errors, logger + ) + if not operation_type: operation_type = self.default_operation_type @@ -296,11 +313,14 @@ def write( # noqa: PLR0913 mode = "ab" logger.debug("Appending to file: %s", path) - with path.open(mode) as file: - is_dict = isinstance(first_record, dict) - writer = self._write_dict if is_dict else self._write_raw - for chunk in chain((first_record,), data): - writer(file, chunk) + try: + with path.open(mode) as file: + for chunk in data: + file.write(chunk) + except OSError as error: + msg = "Failed to write to %s: %s" + logger.error(msg, path, error) + raise BackendException(msg % (path, error)) from error # The file has been created, add a new entry to the history. self.append_to_history( @@ -319,34 +339,5 @@ def write( # noqa: PLR0913 return 1 def close(self) -> None: - """FS backend has nothing to close, this method is not implemented.""" - msg = "FS data backend does not support `close` method" - logger.error(msg) - raise NotImplementedError(msg) - - @staticmethod - def _read_raw(file: IO, chunk_size: int, _ignore_errors: bool) -> Iterator[bytes]: - """Read the `file` in chunks of size `chunk_size` and yield them.""" - while chunk := file.read(chunk_size): - yield chunk - - @staticmethod - def _read_dict(file: IO, _chunk_size: int, ignore_errors: bool) -> Iterator[dict]: - """Read the `file` by line and yield JSON parsed dictionaries.""" - for i, line in enumerate(file): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s, in file %s at line %s" - logger.error(msg, err, file, i) - if not ignore_errors: - raise BackendException(msg % (err, file, i)) from err - - @staticmethod - def _write_raw(file: IO, chunk: bytes) -> None: - """Write the `chunk` bytes to the file.""" - file.write(chunk) - - def _write_dict(self, file: IO, chunk: dict) -> None: - """Write the `chunk` dictionary to the file.""" - file.write(bytes(f"{json.dumps(chunk)}\n", encoding=self.locale_encoding)) + """FS backend has no open connections to close. No action.""" + logger.info("No open connections to close; skipping") diff --git a/src/ralph/backends/data/ldp.py b/src/ralph/backends/data/ldp.py index adf194fe0..63a544e06 100644 --- a/src/ralph/backends/data/ldp.py +++ b/src/ralph/backends/data/ldp.py @@ -114,11 +114,11 @@ def list( details (bool): Get detailed archive information in addition to archive IDs. new (bool): Given the history, list only not already read archives. - Yields: + Yield: str: If `details` is False. dict: If `details` is True. - Raises: + Raise: BackendParameterException: If the `target` is `None` and no `DEFAULT_STREAM_ID` is given. BackendException: If a failure during retrieval of archives list occurs. @@ -167,13 +167,13 @@ def read( # noqa: PLR0913 If target is `None`, the `DEFAULT_STREAM_ID` is used instead. chunk_size (int or None): The chunk size when reading archives by batch. raw_output (bool): Ignored. Always set to `True`. - ignore_errors (bool): Ignored. + ignore_errors (bool): No impact as no encoding operation is performed. - Yields: + Yield: bytes: The content of the archive matching the query. - Raises: - BackendException: If a failure during the read operation occurs. 
+ Raise: + BackendException: If a failure occurs during LDP connection. BackendParameterException: If the `query` argument is not an archive name. """ if query.query_string is None: diff --git a/src/ralph/backends/data/mongo.py b/src/ralph/backends/data/mongo.py index 8f38aa64b..2f4d4e3f5 100644 --- a/src/ralph/backends/data/mongo.py +++ b/src/ralph/backends/data/mongo.py @@ -36,7 +36,7 @@ ) from ralph.conf import BaseSettingsConfig, ClientOptions from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import parse_bytes_to_dict, read_raw +from ralph.utils import iter_by_batch, parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -115,7 +115,7 @@ def __init__(self, settings: Optional[Settings] = None): def status(self) -> DataBackendStatus: """Check the MongoDB connection status. - Returns: + Return: DataBackendStatus: The status of the data backend. """ # Check MongoDB connection. @@ -150,7 +150,7 @@ def list( details (bool): Get detailed collection information instead of just IDs. new (bool): Ignored. - Raises: + Raise: BackendException: If a failure during the list operation occurs. BackendParameterException: If the `target` is not a valid database name. """ @@ -194,16 +194,31 @@ def read( # noqa: PLR0913 chunk_size (int or None): The chunk size when reading archives by batch. If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead. raw_output (bool): Whether to yield dictionaries or bytes. - ignore_errors (bool): Whether to ignore errors when reading documents. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. - Yields: + Yield: dict: If `raw_output` is False. bytes: If `raw_output` is True. - Raises: - BackendException: If a failure during the read operation occurs. + Raise: + BackendException: If a failure occurs during MongoDB connection or + during encoding documents and `ignore_errors` is set to `False`. BackendParameterException: If the `target` is not a valid collection name. """ + if raw_output: + documents = self.read( + query=query, + target=target, + chunk_size=chunk_size, + raw_output=False, + ignore_errors=ignore_errors, + ) + locale = self.settings.LOCALE_ENCODING + yield from parse_dict_to_bytes(documents, locale, ignore_errors, logger) + return + if not chunk_size: chunk_size = self.settings.DEFAULT_CHUNK_SIZE @@ -221,12 +236,7 @@ def read( # noqa: PLR0913 try: documents = collection.find(batch_size=chunk_size, **query) documents = (d.update({"_id": str(d.get("_id"))}) or d for d in documents) - if raw_output: - documents = read_raw( - documents, self.settings.LOCALE_ENCODING, ignore_errors, logger - ) - for document in documents: - yield document + yield from documents except (PyMongoError, IndexError, TypeError, ValueError) as error: msg = "Failed to execute MongoDB query: %s" logger.error(msg, error) @@ -247,17 +257,19 @@ def write( # noqa: PLR0913 target (str or None): The target MongoDB collection name. chunk_size (int or None): The number of documents to write in one batch. If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead. - ignore_errors (bool): Whether to ignore errors or not. + ignore_errors (bool): If `True`, errors during decoding, encoding and + sending batches of documents are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. 
If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. - Returns: + Return: int: The number of documents written. - Raises: - BackendException: If a failure occurs while writing to MongoDB or - during document decoding and `ignore_errors` is set to `False`. + Raise: + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If the `operation_type` is `APPEND` as it is not supported. """ @@ -289,19 +301,19 @@ def write( # noqa: PLR0913 return count data = chain([first_record], data) if isinstance(first_record, bytes): - data = parse_bytes_to_dict(data, ignore_errors, logger) + data = parse_iterable_to_dict(data, ignore_errors, logger) if operation_type == BaseOperationType.UPDATE: - for batch in self.iter_by_batch(self.to_replace_one(data), chunk_size): + for batch in iter_by_batch(self.to_replace_one(data), chunk_size): count += self._bulk_update(batch, ignore_errors, collection) logger.info("Updated %d documents with success", count) elif operation_type == BaseOperationType.DELETE: - for batch in self.iter_by_batch(self.to_ids(data), chunk_size): + for batch in iter_by_batch(self.to_ids(data), chunk_size): count += self._bulk_delete(batch, ignore_errors, collection) logger.info("Deleted %d documents with success", count) else: data = self.to_documents(data, ignore_errors, operation_type, logger) - for batch in self.iter_by_batch(data, chunk_size): + for batch in iter_by_batch(data, chunk_size): count += self._bulk_import(batch, ignore_errors, collection) logger.info("Inserted %d documents with success", count) @@ -320,18 +332,6 @@ def close(self) -> None: logger.error(msg, error) raise BackendException(msg % error) from error - @staticmethod - def iter_by_batch(data: Iterable[dict], chunk_size: int): - """Iterate over `data` Iterable and yield batches of size `chunk_size`.""" - batch = [] - for document in data: - batch.append(document) - if len(batch) >= chunk_size: - yield batch - batch = [] - if batch: - yield batch - @staticmethod def to_ids(data: Iterable[dict]) -> Iterable[str]: """Convert `data` statements to ids.""" diff --git a/src/ralph/backends/data/s3.py b/src/ralph/backends/data/s3.py index c04ca5b73..d2eb12893 100644 --- a/src/ralph/backends/data/s3.py +++ b/src/ralph/backends/data/s3.py @@ -1,6 +1,5 @@ """S3 data backend for Ralph.""" -import json import logging from io import IOBase from itertools import chain @@ -16,7 +15,6 @@ ReadTimeoutError, ResponseStreamingError, ) -from botocore.response import StreamingBody from requests_toolbelt import StreamingIterator from ralph.backends.data.base import ( @@ -32,7 +30,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -121,11 +119,11 @@ def list( details (bool): Get detailed object information instead of just object name. new (bool): Given the history, list only unread files. - Yields: + Yield: str: The next object name. (If details is False). dict: The next object details. (If details is True). - Raises: + Raise: BackendException: If a failure occurs. """ if target is None: @@ -169,23 +167,22 @@ def read( # noqa: PLR0913 Args: query: (str or BaseQuery): The ID of the object to read. 
- target (str or None): The target bucket containing the objects. + target (str or None): The target bucket containing the object. If target is `None`, the `default_bucket` is used instead. chunk_size (int or None): The chunk size when reading objects by batch. raw_output (bool): Controls whether to yield bytes or dictionaries. - ignore_errors (bool): If `True`, errors during the read operation - will be ignored and logged. If `False` (default), a `BackendException` - will be raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. - Yields: + Yield: dict: If `raw_output` is False. bytes: If `raw_output` is True. - Raises: - BackendException: If a failure during the read operation occurs and - `ignore_errors` is set to `False`. - BackendParameterException: If a backend argument value is not valid and - `ignore_errors` is set to `False`. + Raise: + BackendException: If a connection failure occurs while reading from S3 or + during object encoding and `ignore_errors` is set to `False`. + BackendParameterException: If a backend argument value is not valid. """ if query.query_string is None: msg = "Invalid query. The query should be a valid object name." @@ -204,18 +201,18 @@ def read( # noqa: PLR0913 error_msg = err.response["Error"]["Message"] msg = "Failed to download %s: %s" logger.error(msg, query.query_string, error_msg) - if not ignore_errors: - raise BackendException(msg % (query.query_string, error_msg)) from err + raise BackendException(msg % (query.query_string, error_msg)) from err - reader = self._read_raw if raw_output else self._read_dict try: - for chunk in reader(response["Body"], chunk_size, ignore_errors): - yield chunk + if raw_output: + yield from response["Body"].iter_chunks(chunk_size) + else: + lines = response["Body"].iter_lines(chunk_size) + yield from parse_iterable_to_dict(lines, ignore_errors, logger) except (ReadTimeoutError, ResponseStreamingError) as err: msg = "Failed to read chunk from object %s" logger.error(msg, query.query_string) - if not ignore_errors: - raise BackendException(msg % (query.query_string)) from err + raise BackendException(msg % (query.query_string)) from err # Archive fetched, add a new entry to the history. self.append_to_history( @@ -247,9 +244,9 @@ def write( # noqa: PLR0913 If target does not contain a `/`, it is assumed to be the target object and the default bucket is used. chunk_size (int or None): Ignored. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, errors during decoding and encoding of + records are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If operation_type is `CREATE` or `INDEX`, the target object is @@ -260,7 +257,8 @@ def write( # noqa: PLR0913 int: The number of written objects. Raise: - BackendException: If a failure during the write operation occurs. + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If a backend argument value is not valid. 
""" data = iter(data) @@ -307,7 +305,9 @@ def write( # noqa: PLR0913 data = chain((first_record,), data) if isinstance(first_record, dict): - data = self._parse_dict_to_bytes(data, ignore_errors) + data = parse_dict_to_bytes( + data, self.settings.LOCALE_ENCODING, ignore_errors, logger + ) counter = {"count": 0} data = self._count(data, counter) @@ -361,48 +361,8 @@ def close(self) -> None: raise BackendException(msg % error) from error @staticmethod - def _read_raw( - obj: StreamingBody, chunk_size: int, _ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `object` in chunks of size `chunk_size` and yield them.""" - for chunk in obj.iter_chunks(chunk_size): - yield chunk - - @staticmethod - def _read_dict( - obj: StreamingBody, chunk_size: int, ignore_errors: bool - ) -> Iterator[dict]: - """Read the `object` by line and yield JSON parsed dictionaries.""" - for line in obj.iter_lines(chunk_size): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s" - logger.error(msg, err) - if not ignore_errors: - raise BackendException(msg % err) from err - - @staticmethod - def _parse_dict_to_bytes( - statements: Iterable[dict], ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `statements` Iterable and yield bytes.""" - for statement in statements: - try: - yield bytes(f"{json.dumps(statement)}\n", encoding="utf-8") - except TypeError as error: - msg = "Failed to encode JSON: %s, for document %s" - logger.error(msg, error, statement) - if ignore_errors: - continue - raise BackendException(msg % (error, statement)) from error - - @staticmethod - def _count( - statements: Union[Iterable[bytes], Iterable[dict]], - counter: dict, - ) -> Iterator: + def _count(statements: Iterable, counter: dict) -> Iterator: """Count the elements in the `statements` Iterable and yield element.""" for statement in statements: - counter["count"] += 1 yield statement + counter["count"] += 1 diff --git a/src/ralph/backends/data/swift.py b/src/ralph/backends/data/swift.py index 77cc52817..58a2e8434 100644 --- a/src/ralph/backends/data/swift.py +++ b/src/ralph/backends/data/swift.py @@ -1,9 +1,9 @@ """Base data backend for Ralph.""" -import json import logging from functools import cached_property from io import IOBase +from itertools import chain from typing import Iterable, Iterator, Optional, Union from uuid import uuid4 @@ -22,7 +22,7 @@ from ralph.backends.mixins import HistoryMixin from ralph.conf import BaseSettingsConfig from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import now +from ralph.utils import now, parse_dict_to_bytes, parse_iterable_to_dict logger = logging.getLogger(__name__) @@ -110,7 +110,7 @@ def connection(self): def status(self) -> DataBackendStatus: """Implement data backend checks (e.g. connection, cluster status). - Returns: + Return: DataBackendStatus: The status of the data backend. """ try: @@ -133,11 +133,11 @@ def list( details (bool): Get detailed object information instead of just names. new (bool): Given the history, list only not already read objects. - Yields: + Yield: str: The next object path. (If details is False) dict: The next object details. (If `details` is True.) - Raises: + Raise: BackendException: If a failure occurs. """ if target is None: @@ -184,24 +184,23 @@ def read( # noqa: PLR0913 are encoded as JSON. If the objects are bytes and `raw_output` is set to `False`, they are decoded as JSON by line. 
- ignore_errors (bool): If `True`, errors during the read operation - are be ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, encoding errors during the read operation + will be ignored and logged. + If `False` (default), a `BackendException` is raised on any error. - Yields: + Yield: dict: If `raw_output` is False. bytes: If `raw_output` is True. - Raises: - BackendException: If a failure during the read operation occurs and - `ignore_errors` is set to `False`. + Raise: + BackendException: If a failure during the read operation occurs or + during encoding records and `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ if query.query_string is None: msg = "Invalid query. The query should be a valid archive name." logger.error(msg) - if not ignore_errors: - raise BackendParameterException(msg) + raise BackendParameterException(msg) target = target if target else self.default_container @@ -221,13 +220,13 @@ def read( # noqa: PLR0913 msg = "Failed to read %s: %s" error = err.msg logger.error(msg, query.query_string, error) - if not ignore_errors: - raise BackendException(msg % (query.query_string, error)) from err - - reader = self._read_raw if raw_output else self._read_dict + raise BackendException(msg % (query.query_string, error)) from err - for chunk in reader(content, chunk_size, ignore_errors): - yield chunk + if raw_output: + while chunk := content.read(chunk_size): + yield chunk + else: + yield from parse_iterable_to_dict(content, ignore_errors, logger) # Archive read, add a new entry to the history self.append_to_history( @@ -240,7 +239,7 @@ def read( # noqa: PLR0913 } ) - def write( # noqa: PLR0912, PLR0913 + def write( # noqa: PLR0913 self, data: Union[IOBase, Iterable[bytes], Iterable[dict]], target: Optional[str] = None, @@ -255,26 +254,37 @@ def write( # noqa: PLR0912, PLR0913 target (str or None): The target container name. If `target` is `None`, a default value is used instead. chunk_size (int or None): Ignored. - ignore_errors (bool): If `True`, errors during the write operation - are ignored and logged. If `False` (default), a `BackendException` - is raised if an error occurs. + ignore_errors (bool): If `True`, errors during decoding and encoding of + records are ignored and logged. + If `False` (default), a `BackendException` is raised on any error. operation_type (BaseOperationType or None): The mode of the write operation. If `operation_type` is `None`, the `default_operation_type` is used instead. See `BaseOperationType`. - Returns: + Return: int: The number of written records. - Raises: - BackendException: If a failure during the write operation occurs and - `ignore_errors` is set to `False`. + Raise: + BackendException: If any failure occurs during the write operation or + if an inescapable failure occurs and `ignore_errors` is set to `True`. BackendParameterException: If a backend argument value is not valid. 
""" + data = iter(data) try: - first_record = next(iter(data)) + first_record = next(data) except StopIteration: logger.info("Data Iterator is empty; skipping write to target.") return 0 + + data = chain((first_record,), data) + if isinstance(first_record, dict): + data = parse_dict_to_bytes( + data, self.settings.LOCALE_ENCODING, ignore_errors, logger + ) + + counter = {"count": 0} + data = self._count(data, counter) + if not operation_type: operation_type = self.default_operation_type @@ -303,21 +313,13 @@ def write( # noqa: PLR0912, PLR0913 ]: msg = "%s operation_type is not allowed." logger.error(msg, operation_type.name) - if not ignore_errors: - raise BackendParameterException(msg % operation_type.name) + raise BackendParameterException(msg % operation_type.name) if operation_type in [BaseOperationType.CREATE, BaseOperationType.INDEX]: if target_object in list(self.list(target=target_container)): msg = "%s already exists and overwrite is not allowed for operation %s" logger.error(msg, target_object, operation_type) - if not ignore_errors: - raise BackendException(msg % (target_object, operation_type)) - - if isinstance(first_record, dict): - data = [ - json.dumps(statement).encode(self.locale_encoding) - for statement in data - ] + raise BackendException(msg % (target_object, operation_type)) try: self.connection.put_object( @@ -330,10 +332,9 @@ def write( # noqa: PLR0912, PLR0913 msg = "Failed to write to object %s: %s" error = err.msg logger.error(msg, target_object, error) - if not ignore_errors: - raise BackendException(msg % (target_object, error)) from err + raise BackendException(msg % (target_object, error)) from err - count = sum(1 for _ in data) + count = counter["count"] logging.info("Successfully written %s statements to %s", count, target) # Archive written, add a new entry to the history @@ -382,23 +383,8 @@ def _details(self, container: str, name: str): } @staticmethod - def _read_dict( - obj: Iterable, _chunk_size: int, ignore_errors: bool - ) -> Iterator[dict]: - """Read the `object` by line and yield JSON parsed dictionaries.""" - for i, line in enumerate(obj): - try: - yield json.loads(line) - except (TypeError, json.JSONDecodeError) as err: - msg = "Raised error: %s, at line %s" - logger.error(msg, err, i) - if not ignore_errors: - raise BackendException(msg % (err, i)) from err - - @staticmethod - def _read_raw( - obj: Iterable, chunk_size: int, _ignore_errors: bool - ) -> Iterator[bytes]: - """Read the `object` by line and yield bytes.""" - while chunk := obj.read(chunk_size): - yield chunk + def _count(statements: Iterable, counter: dict) -> Iterator: + """Count the elements in the `statements` Iterable and yield element.""" + for statement in statements: + yield statement + counter["count"] += 1 diff --git a/src/ralph/backends/http/async_lrs.py b/src/ralph/backends/http/async_lrs.py index b97a2462a..a92d40fc1 100644 --- a/src/ralph/backends/http/async_lrs.py +++ b/src/ralph/backends/http/async_lrs.py @@ -8,14 +8,13 @@ from urllib.parse import ParseResult, parse_qs, urljoin, urlparse from httpx import AsyncClient, HTTPError, HTTPStatusError, RequestError -from more_itertools import chunked from pydantic import AnyHttpUrl, BaseModel, Field, parse_obj_as from pydantic.types import PositiveInt from ralph.backends.lrs.base import LRSStatementsQuery from ralph.conf import BaseSettingsConfig, HeadersParameters from ralph.exceptions import BackendException, BackendParameterException -from ralph.utils import gather_with_limited_concurrency +from ralph.utils import 
gather_with_limited_concurrency, iter_by_batch from .base import ( BaseHTTPBackend, @@ -257,6 +256,9 @@ async def write( # noqa: PLR0913 if not target: target = self.settings.STATEMENTS_ENDPOINT + if not chunk_size: + chunk_size = 500 + target = ParseResult( scheme=urlparse(self.base_url).scheme, netloc=urlparse(self.base_url).netloc, @@ -288,7 +290,7 @@ async def write( # noqa: PLR0913 # Create tasks tasks = set() - for chunk in chunked(data, chunk_size): + for chunk in iter_by_batch(data, chunk_size): tasks.add(self._post_and_raise_for_status(target, chunk, ignore_errors)) # Run POST tasks diff --git a/src/ralph/cli.py b/src/ralph/cli.py index 7e9f5a17d..9367bbe3e 100644 --- a/src/ralph/cli.py +++ b/src/ralph/cli.py @@ -675,7 +675,7 @@ def read( # noqa: PLR0913 iter_over_async(statements) if isasyncgen(statements) else statements ) for statement in statements: - click.echo(statement) + click.echo(statement, nl=False) elif isinstance(backend, BaseStreamBackend): backend.stream(sys.stdout.buffer) elif isinstance(backend, BaseHTTPBackend): diff --git a/src/ralph/utils.py b/src/ralph/utils.py index e1d08b834..c942391ba 100644 --- a/src/ralph/utils.py +++ b/src/ralph/utils.py @@ -8,8 +8,24 @@ from functools import reduce from importlib import import_module from inspect import getmembers, isclass, iscoroutine +from itertools import islice from logging import Logger, getLogger -from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Type, Union +from typing import ( + Any, + AsyncIterable, + AsyncIterator, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, +) from ralph.exceptions import BackendException, UnsupportedBackendException @@ -171,39 +187,81 @@ def statements_are_equivalent(statement_1: dict, statement_2: dict) -> bool: return True -def parse_bytes_to_dict( - raw_documents: Iterable[bytes], ignore_errors: bool, logger_class: logging.Logger +T = TypeVar("T") + + +def parse_iterable_to_dict( + raw_documents: Iterable[T], + ignore_errors: bool, + logger_class: logging.Logger, + parser: Callable[[T], Dict[str, Any]] = json.loads, + exceptions: Tuple[Type[Exception], ...] 
= (TypeError, json.JSONDecodeError), ) -> Iterator[dict]: """Read the `raw_documents` Iterable and yield dictionaries.""" - for raw_document in raw_documents: + for i, raw_document in enumerate(raw_documents): try: - yield json.loads(raw_document) - except (TypeError, json.JSONDecodeError) as error: - msg = "Failed to decode JSON: %s, for document: %s" + yield parser(raw_document) + except exceptions as error: + msg = "Failed to decode JSON: %s, for document: %s, at line %s" if ignore_errors: - logger_class.warning(msg, error, raw_document) + logger_class.warning(msg, error, raw_document, i) continue - logger_class.error(msg, error, raw_document) - raise BackendException(msg % (error, raw_document)) from error + logger_class.error(msg, error, raw_document, i) + raise BackendException(msg % (error, raw_document, i)) from error -def read_raw( +def parse_dict_to_bytes( documents: Iterable[Dict[str, Any]], encoding: str, ignore_errors: bool, logger_class: logging.Logger, ) -> Iterator[bytes]: """Read the `documents` Iterable with the `encoding` and yield bytes.""" - for document in documents: + for i, document in enumerate(documents): + try: + yield f"{json.dumps(document)}\n".encode(encoding) + except (TypeError, ValueError) as error: + msg = "Failed to encode JSON: %s, for document: %s, at line %s" + if ignore_errors: + logger_class.warning(msg, error, document, i) + continue + logger_class.error(msg, error, document, i) + raise BackendException(msg % (error, document, i)) from error + + +async def async_parse_dict_to_bytes( + documents: AsyncIterable[Dict[str, Any]], + encoding: str, + ignore_errors: bool, + logger_class: logging.Logger, +) -> AsyncIterator[bytes]: + """Read the `documents` Iterable with the `encoding` and yield bytes.""" + i = 0 + async for document in documents: try: - yield json.dumps(document).encode(encoding) + yield f"{json.dumps(document)}\n".encode(encoding) except (TypeError, ValueError) as error: - msg = "Failed to convert document to bytes: %s" + msg = "Failed to encode JSON: %s, for document: %s, at line %s" if ignore_errors: - logger_class.warning(msg, error) + logger_class.warning(msg, error, document, i) continue - logger_class.error(msg, error) - raise BackendException(msg % error) from error + logger_class.error(msg, error, document, i) + raise BackendException(msg % (error, document, i)) from error + + i += 1 + + +def iter_by_batch(iterable: Iterable, n: int) -> Iterable[list]: + """Batch `iterable` into lists of length `n`. The last batch may be less than `n`. + + Taken from itertools.batched introduced in python 3.12 + """ + if n < 1: + raise ValueError("n must be at least one") + + it = iter(iterable) + while batch := list(islice(it, n)): + yield batch def iter_over_async(agenerator) -> Iterable: diff --git a/tests/backends/data/test_async_es.py b/tests/backends/data/test_async_es.py index ff589ad30..cbb7d198b 100644 --- a/tests/backends/data/test_async_es.py +++ b/tests/backends/data/test_async_es.py @@ -355,23 +355,39 @@ def mock_async_es_search_open_pit(**kwargs): async def test_backends_data_async_es_read_with_ignore_errors( es, async_es_backend, monkeypatch, caplog ): - """Test the `AsyncESDataBackend.read` method, given `ignore_errors` set to `True`, - should log a warning message. - """ + """Test the `AsyncESDataBackend.read` method, given `ignore_errors` set to `False`, + should raise a BackendException if a JSON parsing error occurs. 
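A sketch of the `async_parse_dict_to_bytes` helper defined above, which the asynchronous Elasticsearch backend now uses for raw output (the async generator and its documents are illustrative):

    import asyncio
    import logging

    from ralph.utils import async_parse_dict_to_bytes

    logger = logging.getLogger(__name__)


    async def documents():
        for document in [{"foo": "bar"}, {"foo": "baz"}]:
            yield document


    async def read_raw():
        # Each document is serialized to JSON, newline-terminated and encoded.
        return [
            chunk
            async for chunk in async_parse_dict_to_bytes(
                documents(), "utf8", False, logger
            )
        ]


    assert asyncio.run(read_raw()) == [b'{"foo": "bar"}\n', b'{"foo": "baz"}\n']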
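And a quick sketch of `iter_by_batch`, which replaces `more_itertools.chunked` in the async LRS backend above (the sample values are illustrative):

    from ralph.utils import iter_by_batch

    # Batches of length `n`; the final batch may be shorter.
    assert list(iter_by_batch(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]

    # As with `itertools.batched` (Python 3.12), `n` must be at least one.
    try:
        list(iter_by_batch([], 0))
    except ValueError:
        pass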
- backend = async_es_backend() + Given `ignore_errors` set to `False`, the `read` method should log a warning + message instead. + """ async def mock_async_es_search(**kwargs): - return {"hits": {"hits": []}} + return {"hits": {"hits": [{"foo": 1j, "sort": []}]}} + backend = async_es_backend() monkeypatch.setattr(backend.client, "search", mock_async_es_search) + error = ( + "Failed to encode JSON: Object of type complex is not JSON serializable, " + "for document: {'foo': 1j, 'sort': []}, at line 0" + ) + with pytest.raises(BackendException, match=re.escape(error)): + with caplog.at_level(logging.ERROR): + _ = [x async for x in backend.read(ignore_errors=False, raw_output=True)] + + assert ( + "ralph.backends.data.async_es", + logging.ERROR, + error, + ) in caplog.record_tuples + with caplog.at_level(logging.WARNING): - _ = [statement async for statement in backend.read(ignore_errors=True)] + _ = [x async for x in backend.read(ignore_errors=True, raw_output=True)] assert ( "ralph.backends.data.async_es", logging.WARNING, - "The `ignore_errors` argument is ignored", + error, ) in caplog.record_tuples await backend.close() @@ -718,7 +734,7 @@ async def test_backends_data_async_es_write_without_ignore_errors( # By default, we should raise an error and stop the importation. msg = ( r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " - r"for document: b'This is invalid JSON'" + r"for document: b'This is invalid JSON', at line 1" ) with pytest.raises(BackendException, match=msg): with caplog.at_level(logging.ERROR): @@ -747,7 +763,7 @@ async def test_backends_data_async_es_write_with_ignore_errors( msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'This is invalid JSON'" + "for document: b'This is invalid JSON', at line 1" ) records = [{"id": idx, "count": random.randint(0, 100)} for idx in range(10)] # Patch a record with a non-expected type for the count field (should be diff --git a/tests/backends/data/test_async_mongo.py b/tests/backends/data/test_async_mongo.py index 44fdd5937..d6d2ce3d0 100644 --- a/tests/backends/data/test_async_mongo.py +++ b/tests/backends/data/test_async_mongo.py @@ -2,6 +2,7 @@ import json import logging +import re import pytest from bson.objectid import ObjectId @@ -323,9 +324,9 @@ async def test_backends_data_async_mongo_read_with_raw_output( {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756da", "id": "bar"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756da", "id": "bar"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) @@ -451,14 +452,15 @@ async def test_backends_data_async_mongo_read_with_ignore_errors( """ backend = async_mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": 
"baz"}\n', ] await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) @@ -478,8 +480,9 @@ async def test_backends_data_async_mongo_read_with_ignore_errors( assert ( "ralph.backends.data.async_mongo", logging.WARNING, - "Failed to encode MongoDB document with ID 64945e530468d817b1f756da: " - "Object of type ObjectId is not JSON serializable", + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1", ) in caplog.record_tuples @@ -493,9 +496,10 @@ async def test_backends_data_async_mongo_read_without_ignore_errors( """ backend = async_mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}' @@ -503,27 +507,28 @@ async def test_backends_data_async_mongo_read_without_ignore_errors( await backend.database.foobar.insert_many(documents[:2]) kwargs = {"raw_output": True, "ignore_errors": False} msg = ( - "Failed to encode MongoDB document with ID 64945e530468d817b1f756da: " - "Object of type ObjectId is not JSON serializable" + "Failed to encode JSON: Object of type ObjectId is not JSON serializable, " + "for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1" ) with caplog.at_level(logging.ERROR): - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [statement async for statement in backend.read(**kwargs)] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, target="foobar") ] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, chunk_size=2) ] assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = [ statement async for statement in backend.read(**kwargs, chunk_size=1000) ] @@ -936,7 +941,7 @@ async def test_backends_data_async_mongo_write_with_unparsable_documents( backend = async_mongo_backend() msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'not valid JSON!'" + "for document: b'not valid JSON!', at line 0" ) msg_regex = msg.replace("(", r"\(").replace(")", r"\)") with pytest.raises(BackendException, match=msg_regex): @@ -962,13 +967,13 @@ async def test_backends_data_async_mongo_write_with_no_data( 0. """ backend = async_mongo_backend() - with caplog.at_level(logging.WARNING): + with caplog.at_level(logging.INFO): assert await backend.write(data=[]) == 0 msg = "Data Iterator is empty; skipping write to target." 
assert ( "ralph.backends.data.async_mongo", - logging.WARNING, + logging.INFO, msg, ) in caplog.record_tuples diff --git a/tests/backends/data/test_clickhouse.py b/tests/backends/data/test_clickhouse.py index 0b84a28fd..efd7e0401 100644 --- a/tests/backends/data/test_clickhouse.py +++ b/tests/backends/data/test_clickhouse.py @@ -2,7 +2,9 @@ import json import logging +import re import uuid +from collections import namedtuple from datetime import datetime, timedelta import pytest @@ -222,26 +224,23 @@ def test_backends_data_clickhouse_read_with_failures( ): """Test the `ClickHouseDataBackend.read` method with failures.""" backend = clickhouse_backend() - - statement = {"id": str(uuid.uuid4()), "timestamp": str(datetime.utcnow())} - document = {"event": json.dumps(statement)} - backend.write([statement]) + document = {"event": "Invalid JSON!"} # JSON encoding error - def mock_read_json(*args, **kwargs): - """Mock the `ClickHouseDataBackend._read_json` method.""" - raise TypeError("Error") + def mock_clickhouse_client_query(*args, **kwargs): + """Mock the `clickhouse.Client.query` returning an unparsable document.""" + return namedtuple("_", "named_results")(lambda: [document]) - monkeypatch.setattr(backend, "_read_json", mock_read_json) + monkeypatch.setattr(backend.client, "query", mock_clickhouse_client_query) - msg = f"Failed to encode document {document}: Error" + msg = ( + "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " + "for document: {'event': 'Invalid JSON!'}, at line 0" + ) # Not ignoring errors with caplog.at_level(logging.ERROR): - with pytest.raises( - BackendException, - match=msg, - ): + with pytest.raises(BackendException, match=re.escape(msg)): list(backend.read(raw_output=False, ignore_errors=False)) assert ( @@ -277,10 +276,7 @@ def mock_query(*_, **__): msg = "Failed to read documents: Something is wrong" with caplog.at_level(logging.ERROR): - with pytest.raises( - BackendException, - match=msg, - ): + with pytest.raises(BackendException, match=re.escape(msg)): list(backend.read(ignore_errors=True)) assert ( @@ -549,7 +545,11 @@ def test_backends_data_clickhouse_write_bytes_failed(clickhouse, clickhouse_back byte_data.append(json_str.encode("utf-8")) count = 0 - with pytest.raises(json.JSONDecodeError): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'failed_json_str', at line 0" + ) + with pytest.raises(BackendException, match=msg): count = backend.write(byte_data) assert count == 0 diff --git a/tests/backends/data/test_es.py b/tests/backends/data/test_es.py index e002e8da8..e9a3e7620 100644 --- a/tests/backends/data/test_es.py +++ b/tests/backends/data/test_es.py @@ -316,19 +316,39 @@ def mock_es_search_open_pit(**kwargs): def test_backends_data_es_read_with_ignore_errors(es, es_backend, monkeypatch, caplog): - """Test the `ESDataBackend.read` method, given `ignore_errors` set to `True`, - should log a warning message. + """Test the `ESDataBackend.read` method, given `ignore_errors` set to `False`, + should raise a BackendException if a JSON parsing error occurs. + + Given `ignore_errors` set to `False`, the `read` method should log a warning + message instead. 
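For context, a sketch of the `parse_iterable_to_dict` helper behind the line-numbered decode errors asserted in these tests (the raw documents are illustrative):

    import logging

    from ralph.exceptions import BackendException
    from ralph.utils import parse_iterable_to_dict

    logger = logging.getLogger(__name__)
    raw = [b'{"id": "foo"}', b"not valid JSON!", b'{"id": "bar"}']

    # ignore_errors=True: the invalid document is skipped with a warning.
    documents = list(parse_iterable_to_dict(raw, True, logger))
    assert documents == [{"id": "foo"}, {"id": "bar"}]

    # ignore_errors=False: the same document raises a BackendException whose
    # message now includes the position of the failing document.
    try:
        list(parse_iterable_to_dict(raw, False, logger))
    except BackendException as error:
        assert "at line 1" in str(error)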
""" + def mock_es_search(**kwargs): + return {"hits": {"hits": [{"foo": 1j, "sort": []}]}} + backend = es_backend() - monkeypatch.setattr(backend.client, "search", lambda **_: {"hits": {"hits": []}}) + monkeypatch.setattr(backend.client, "search", mock_es_search) + error = ( + "Failed to encode JSON: Object of type complex is not JSON serializable, " + "for document: {'foo': 1j, 'sort': []}, at line 0" + ) + with pytest.raises(BackendException, match=re.escape(error)): + with caplog.at_level(logging.ERROR): + list(backend.read(ignore_errors=False, raw_output=True)) + + assert ( + "ralph.backends.data.es", + logging.ERROR, + error, + ) in caplog.record_tuples + with caplog.at_level(logging.WARNING): - list(backend.read(ignore_errors=True)) + list(backend.read(ignore_errors=True, raw_output=True)) assert ( "ralph.backends.data.es", logging.WARNING, - "The `ignore_errors` argument is ignored", + error, ) in caplog.record_tuples backend.close() @@ -642,7 +662,7 @@ def test_backends_data_es_write_without_ignore_errors(es, es_backend, caplog): # By default, we should raise an error and stop the importation. msg = ( r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " - r"for document: b'This is invalid JSON'" + r"for document: b'This is invalid JSON', at line 1" ) with pytest.raises(BackendException, match=msg): with caplog.at_level(logging.ERROR): diff --git a/tests/backends/data/test_fs.py b/tests/backends/data/test_fs.py index 7ecee5bb0..ddc32a595 100644 --- a/tests/backends/data/test_fs.py +++ b/tests/backends/data/test_fs.py @@ -2,6 +2,7 @@ import json import logging import os +import re from collections.abc import Iterable from operator import itemgetter from uuid import uuid4 @@ -623,7 +624,11 @@ def test_backends_data_fs_read_without_ignore_errors(fs_backend, fs, monkeypatch result = backend.read(ignore_errors=False) assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 1" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no entry should be @@ -638,7 +643,11 @@ def test_backends_data_fs_read_without_ignore_errors(fs_backend, fs, monkeypatch result = backend.read(ignore_errors=False, target=absolute_path) assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method succeeds to read one file entirely, and fails to read @@ -661,7 +670,11 @@ def test_backends_data_fs_read_without_ignore_errors(fs_backend, fs, monkeypatch # line, the `read` method should raise a `BackendException`. 
result = backend.read(ignore_errors=False, target="bar") assert isinstance(result, Iterable) - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no new entry should be @@ -745,6 +758,22 @@ def test_backends_data_fs_write_with_file_exists_error(operation_type, fs_backen assert not sorted(backend.history, key=itemgetter("id")) +def test_backends_data_fs_write_with_file_not_found_error(fs_backend, fs, caplog): + """Test the `FSDataBackend.write` method, given a target not matching an existing + directory location, should raise a `BackendException`. + """ + backend = fs_backend() + msg = ( + "Failed to write to /unreachable/foo.txt: " + "[Errno 2] No such file or directory in the fake filesystem: '/unreachable'" + ) + with pytest.raises(BackendException, match=re.escape(msg)): + with caplog.at_level(logging.ERROR): + backend.write(target="/unreachable/foo.txt", data=[b"foo"]) + + assert ("ralph.backends.data.fs", logging.ERROR, msg) in caplog.record_tuples + + def test_backends_data_fs_write_with_delete_operation( fs_backend, ): @@ -972,11 +1001,15 @@ def test_backends_data_fs_write_without_target(fs_backend, monkeypatch): ] -def test_backends_data_fs_close(fs_backend): - """Test that the `FSDataBackend.close` method raise an error.""" +def test_backends_data_fs_close(fs_backend, caplog): + """Test that the `FSDataBackend.close` method produces an info level log.""" backend = fs_backend() - - error = "FS data backend does not support `close` method" - with pytest.raises(NotImplementedError, match=error): + with caplog.at_level(logging.INFO): backend.close() + + assert ( + "ralph.backends.data.fs", + logging.INFO, + "No open connections to close; skipping", + ) in caplog.record_tuples diff --git a/tests/backends/data/test_ldp.py b/tests/backends/data/test_ldp.py index 2aaa3a08f..e4c12a74d 100644 --- a/tests/backends/data/test_ldp.py +++ b/tests/backends/data/test_ldp.py @@ -679,7 +679,7 @@ def mock_post(url): def test_backends_data_ldp_close(ldp_backend, caplog): - """Test that the `LDPDataBackend.close` method raise an error.""" + """Test that the `LDPDataBackend.close` method produces an info level log.""" backend = ldp_backend() diff --git a/tests/backends/data/test_mongo.py b/tests/backends/data/test_mongo.py index 8f4d65090..cbbd78ce4 100644 --- a/tests/backends/data/test_mongo.py +++ b/tests/backends/data/test_mongo.py @@ -2,6 +2,7 @@ import json import logging +import re import pytest from bson.objectid import ObjectId @@ -256,9 +257,9 @@ def test_backends_data_mongo_read_with_raw_output(mongo, mongo_backend): {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756da", "id": "bar"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756da", "id": "bar"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) @@ -348,14 +349,15 @@ def test_backends_data_mongo_read_with_ignore_errors(mongo, mongo_backend, caplo """ backend = mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), 
"id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] expected = [ - b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}', - b'{"_id": "64945e530468d817b1f756db", "id": "baz"}', + b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n', + b'{"_id": "64945e530468d817b1f756db", "id": "baz"}\n', ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) @@ -369,8 +371,9 @@ def test_backends_data_mongo_read_with_ignore_errors(mongo, mongo_backend, caplo assert ( "ralph.backends.data.mongo", logging.WARNING, - "Failed to convert document to bytes: " - "Object of type ObjectId is not JSON serializable", + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1", ) in caplog.record_tuples backend.close() @@ -382,33 +385,35 @@ def test_backends_data_mongo_read_without_ignore_errors(mongo, mongo_backend, ca """ backend = mongo_backend() + unparsable_value = ObjectId() documents = [ {"_id": ObjectId("64945e53a4ee2699573e0d6f"), "id": "foo"}, - {"_id": ObjectId("64945e530468d817b1f756da"), "id": ObjectId()}, + {"_id": ObjectId("64945e530468d817b1f756da"), "id": unparsable_value}, {"_id": ObjectId("64945e530468d817b1f756db"), "id": "baz"}, ] - expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}' + expected = b'{"_id": "64945e53a4ee2699573e0d6f", "id": "foo"}\n' backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) kwargs = {"raw_output": True, "ignore_errors": False} msg = ( - "Failed to convert document to bytes: " - "Object of type ObjectId is not JSON serializable" + "Failed to encode JSON: Object of type ObjectId is not " + "JSON serializable, for document: {'_id': '64945e530468d817b1f756da', " + f"'id': ObjectId('{unparsable_value}')}}, at line 1" ) with caplog.at_level(logging.ERROR): - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs) assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, target="foobar") assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, chunk_size=2) assert next(result) == expected next(result) - with pytest.raises(BackendException, match=msg): + with pytest.raises(BackendException, match=re.escape(msg)): result = backend.read(**kwargs, chunk_size=1000) assert next(result) == expected next(result) @@ -785,7 +790,7 @@ def test_backends_data_mongo_write_with_unparsable_documents(mongo_backend, capl backend = mongo_backend() msg = ( "Failed to decode JSON: Expecting value: line 1 column 1 (char 0), " - "for document: b'not valid JSON!'" + "for document: b'not valid JSON!', at line 0" ) msg_regex = msg.replace("(", r"\(").replace(")", r"\)") with caplog.at_level(logging.ERROR): diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py index 37c8db62d..83e376ef5 100644 --- a/tests/backends/data/test_s3.py +++ b/tests/backends/data/test_s3.py @@ -3,6 +3,8 @@ import datetime import json import logging +import re +from collections import 
namedtuple import boto3 import pytest @@ -319,7 +321,8 @@ def test_backends_data_s3_read_with_invalid_output_should_log_the_error( assert ( "ralph.backends.data.s3", logging.ERROR, - "Raised error: Expecting value: line 1 column 1 (char 0)", + "Failed to decode JSON: Expecting value: line 1 column 1 (char 0)," + " for document: b'some contents in the body', at line 0", ) in caplog.record_tuples backend.clean_history(lambda *_: True) @@ -425,13 +428,18 @@ def test_backends_data_s3_read_with_iter_error_should_log_the_error( Body=body, ) - def mock_read_raw(*args, **kwargs): - raise ResponseStreamingError(error="error") + def mock_get_object(*args, **kwargs): # pylint: disable=unused-argument + """Mock the boto3 client.get_object method raising an exception on iteration.""" + + def raising_iter_chunks(*_, **__): # pylint: disable=unused-argument + raise ResponseStreamingError(error="error") + + return {"Body": namedtuple("_", "iter_chunks")(raising_iter_chunks)} with caplog.at_level(logging.ERROR): with pytest.raises(BackendException): backend = s3_backend() - monkeypatch.setattr(backend, "_read_raw", mock_read_raw) + monkeypatch.setattr(backend.client, "get_object", mock_get_object) backend.clean_history(lambda *_: True) list(backend.read(query=object_name, target=bucket_name, raw_output=True)) @@ -581,11 +589,13 @@ def test_backends_data_s3_write_with_create_index_operation( data = [{"some": "content", "datetime": date}] - error = "Object of type datetime is not JSON serializable" - - with caplog.at_level(logging.ERROR): + msg = ( + "Failed to encode JSON: Object of type datetime is not JSON serializable, " + f"for document: {data[0]}, at line 0" + ) + with caplog.at_level(logging.WARNING): # Without ignoring error - with pytest.raises(BackendException, match=error): + with pytest.raises(BackendException, match=re.escape(msg)): response = backend.write( data=data, target=object_name, @@ -603,18 +613,14 @@ def test_backends_data_s3_write_with_create_index_operation( assert list( filter( - lambda record: record[1] == logging.ERROR, + lambda record: record[1] in [logging.ERROR, logging.WARNING], caplog.record_tuples, ) ) == ( [ - ( - "ralph.backends.data.s3", - logging.ERROR, - f"Failed to encode JSON: {error}, for document {data[0]}", - ) + ("ralph.backends.data.s3", logging.ERROR, msg), + ("ralph.backends.data.s3", logging.WARNING, msg), ] - * 2 ) backend.close() diff --git a/tests/backends/data/test_swift.py b/tests/backends/data/test_swift.py index 2d98be057..cb7f03910 100644 --- a/tests/backends/data/test_swift.py +++ b/tests/backends/data/test_swift.py @@ -457,7 +457,11 @@ def mock_get_object_1(*args, **kwargs): result = backend.read(ignore_errors=False, query="2020-06-02.gz") assert isinstance(result, Iterable) assert next(result) == valid_dictionary - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 1" + ) + with pytest.raises(BackendException, match=msg): next(result) # When the `read` method fails to read a file entirely, then no entry should be @@ -474,7 +478,11 @@ def mock_get_object_2(*args, **kwargs): # method should raise a `BackendException` at the second line. 
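The refactored Swift `write` only encodes and counts records while `put_object` consumes its data generator, which is why `mock_put_object` below now drains `contents`; a sketch of that counting-generator pattern, with illustrative names:

    from typing import Iterable, Iterator


    def count_consumed(records: Iterable[bytes], counter: dict) -> Iterator[bytes]:
        """Yield each record while counting how many records were consumed."""
        for record in records:
            yield record
            counter["count"] += 1


    def fake_put_object(contents: Iterable[bytes]) -> None:
        """Stand-in for `swiftclient.Connection.put_object` draining `contents`."""
        for _ in contents:
            pass


    counter = {"count": 0}
    fake_put_object(count_consumed([b"foo\n", b"bar\n"], counter))
    assert counter["count"] == 2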
result = backend.read(ignore_errors=False, query="2020-06-03.gz") assert isinstance(result, Iterable) - with pytest.raises(BackendException, match="Raised error:"): + msg = ( + r"Failed to decode JSON: Expecting value: line 1 column 1 \(char 0\), " + r"for document: b'baz\\n', at line 0" + ) + with pytest.raises(BackendException, match=msg): next(result) backend.close() @@ -625,7 +633,8 @@ def test_backends_data_swift_write_without_target( def mock_get_container(*args, **kwargs): return (None, [x["name"] for x in listing]) - def mock_put_object(*args, **kwargs): + def mock_put_object(container, obj, contents): + list(contents) return 1 def mock_head_object(*args, **kwargs): diff --git a/tests/backends/http/test_async_lrs.py b/tests/backends/http/test_async_lrs.py index 9d7dfca01..096036b70 100644 --- a/tests/backends/http/test_async_lrs.py +++ b/tests/backends/http/test_async_lrs.py @@ -672,11 +672,14 @@ async def test_backends_http_async_lrs_write_without_operation( max_num_simultaneous=max_num_simultaneous, ) + # If no chunk_size is provided, a default value (500) should be used. + if chunk_size is None: + chunk_size = 500 + assert ( "ralph.backends.http.async_lrs", logging.DEBUG, - f"Start writing to the {base_url}{target} endpoint (chunk size: " - f"{chunk_size})", + f"Start writing to the {base_url}{target} endpoint (chunk size: {chunk_size})", ) in caplog.record_tuples assert (