♻️(backends) use common utilities among backends
During data backend unification work, some shared utilities were
developed to factor out duplication: `parse_dict_to_bytes`,
`read_raw` and `iter_by_batch`.
However, some backends still used their own implementations of
these utilities, leading to minor behavioral differences among
backends.
Thus, for consistency, we update backends to use the common
utilities.
SergioSim committed Nov 24, 2023
1 parent 89b53c0 commit d40480a
Showing 25 changed files with 590 additions and 525 deletions.
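For context, two of the shared helpers this commit standardizes on could look roughly as follows. This is a minimal sketch inferred from the call sites in the diff below (`iter_by_batch(data, chunk_size)` and `parse_iterable_to_dict(data, ignore_errors, logger)`); the actual implementations live in `ralph.utils` and may differ in details.

import json
from typing import Iterable, Iterator, List

from ralph.exceptions import BackendException


def iter_by_batch(data: Iterable, batch_size: int) -> Iterator[List]:
    """Yield lists of up to `batch_size` items taken from `data`."""
    batch = []
    for item in data:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def parse_iterable_to_dict(
    raw_documents: Iterable[bytes], ignore_errors: bool, logger
) -> Iterator[dict]:
    """Decode an iterable of JSON documents (bytes) into dictionaries."""
    for raw_document in raw_documents:
        try:
            yield json.loads(raw_document)
        except (TypeError, json.JSONDecodeError) as error:
            msg = "Failed to decode JSON: %s, for document: %s"
            if ignore_errors:
                logger.warning(msg, error, raw_document)
                continue
            logger.error(msg, error, raw_document)
            raise BackendException(msg % (error, raw_document)) from error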
1 change: 0 additions & 1 deletion CHANGELOG.md
@@ -32,7 +32,6 @@ methods under the unified `lrs` backend interface [BC]
   have an authority field matching that of the user
 - CLI: change `push` to `write` and `fetch` to `read` [BC]
 - Upgrade `fastapi` to `0.104.1`
-- Upgrade `more-itertools` to `10.1.0`
 - Upgrade `sentry_sdk` to `1.34.0`
 - Upgrade `uvicorn` to `0.24.0.post1`
 - API: Invalid parameters now return 400 status code
1 change: 0 additions & 1 deletion pyproject.toml
@@ -56,7 +56,6 @@ backend-ldp = [
 ]
 backend-lrs = [
     "httpx<0.25.0", # pin as Python 3.7 is no longer supported from release 0.25.0
-    "more-itertools==10.1.0",
 ]
 backend-mongo = [
     "motor[srv]>=3.3.0",
38 changes: 24 additions & 14 deletions src/ralph/backends/data/async_es.py
@@ -18,7 +18,7 @@
 )
 from ralph.backends.data.es import ESDataBackend, ESDataBackendSettings, ESQuery
 from ralph.exceptions import BackendException, BackendParameterException
-from ralph.utils import parse_bytes_to_dict, read_raw
+from ralph.utils import async_parse_dict_to_bytes, parse_iterable_to_dict

 logger = logging.getLogger(__name__)
 Settings = TypeVar("Settings", bound=ESDataBackendSettings)
@@ -131,7 +131,8 @@ async def read(  # noqa: PLR0912, PLR0913
             chunk_size (int or None): The chunk size when reading documents by batches.
                 If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`.
             raw_output (bool): Controls whether to yield dictionaries or bytes.
-            ignore_errors (bool): Ignored.
+            ignore_errors (bool): No impact as encoding errors are not expected in
+                Elasticsearch results.

         Yield:
             bytes: The next raw document if `raw_output` is True.
@@ -140,10 +141,23 @@ async def read(  # noqa: PLR0912, PLR0913
         Raise:
             BackendException: If a failure occurs during Elasticsearch connection.
         """
+        if raw_output:
+            documents = self.read(
+                query=query,
+                target=target,
+                chunk_size=chunk_size,
+                raw_output=False,
+                ignore_errors=ignore_errors,
+            )
+            async for document in async_parse_dict_to_bytes(
+                documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
+            ):
+                yield document
+
+            return
+
         target = target if target else self.settings.DEFAULT_INDEX
         chunk_size = chunk_size if chunk_size else self.settings.DEFAULT_CHUNK_SIZE
-        if ignore_errors:
-            logger.warning("The `ignore_errors` argument is ignored")

         if not query.pit.keep_alive:
             query.pit.keep_alive = self.settings.POINT_IN_TIME_KEEP_ALIVE
@@ -183,10 +197,6 @@ async def read(  # noqa: PLR0912, PLR0913
             if count:
                 query.search_after = [str(part) for part in documents[-1]["sort"]]
                 kwargs["search_after"] = query.search_after
-            if raw_output:
-                documents = read_raw(
-                    documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
-                )
             for document in documents:
                 yield document

@@ -206,9 +216,9 @@ async def write(  # noqa: PLR0913
             If target is `None`, the `DEFAULT_INDEX` is used instead.
             chunk_size (int or None): The number of documents to write in one batch.
                 If chunk_size is `None` it defaults to `DEFAULT_CHUNK_SIZE`.
-            ignore_errors (bool): If `True`, errors during the write operation
-                will be ignored and logged. If `False` (default), a `BackendException`
-                will be raised if an error occurs.
+            ignore_errors (bool): If `True`, errors during decoding, encoding and
+                sending batches of documents are ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.
             operation_type (BaseOperationType or None): The mode of the write operation.
                 If `operation_type` is `None`, the `default_operation_type` is used
                 instead. See `BaseOperationType`.
@@ -217,8 +227,8 @@ async def write(  # noqa: PLR0913
             int: The number of documents written.

         Raise:
-            BackendException: If a failure occurs while writing to Elasticsearch or
-                during document decoding and `ignore_errors` is set to `False`.
+            BackendException: If any failure occurs during the write operation or
+                if an inescapable failure occurs and `ignore_errors` is set to `True`.
             BackendParameterException: If the `operation_type` is `APPEND` as it is not
                 supported.
         """
@@ -240,7 +250,7 @@ async def write(  # noqa: PLR0913

         data = chain((first_record,), data)
         if isinstance(first_record, bytes):
-            data = parse_bytes_to_dict(data, ignore_errors, logger)
+            data = parse_iterable_to_dict(data, ignore_errors, logger)

         logger.debug(
             "Start writing to the %s index (chunk size: %d)", target, chunk_size
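Both the Elasticsearch and MongoDB `read` methods now handle `raw_output` the same way: they delegate to the dict-yielding path and encode the result with `async_parse_dict_to_bytes`. A minimal sketch of that helper, assuming the signature used in the calls above (the real implementation lives in `ralph.utils` and may differ, e.g. in newline handling):

import json
from typing import AsyncIterable, AsyncIterator, Dict

from ralph.exceptions import BackendException


async def async_parse_dict_to_bytes(
    documents: AsyncIterable[Dict], encoding: str, ignore_errors: bool, logger
) -> AsyncIterator[bytes]:
    """Encode each dictionary yielded by `documents` as a JSON line."""
    async for document in documents:
        try:
            yield f"{json.dumps(document)}\n".encode(encoding)
        except (TypeError, ValueError) as error:
            msg = "Failed to encode JSON: %s, for document: %s"
            if ignore_errors:
                logger.warning(msg, error, document)
                continue
            logger.error(msg, error, document)
            raise BackendException(msg % (error, document)) from error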
68 changes: 35 additions & 33 deletions src/ralph/backends/data/async_mongo.py
@@ -1,10 +1,9 @@
 """Async MongoDB data backend for Ralph."""

-import json
 import logging
 from io import IOBase
 from itertools import chain
-from typing import Any, Dict, Iterable, Iterator, Optional, TypeVar, Union
+from typing import Iterable, Iterator, Optional, TypeVar, Union

 from bson.errors import BSONError
 from motor.motor_asyncio import AsyncIOMotorClient
@@ -18,7 +17,7 @@
     MongoQuery,
 )
 from ralph.exceptions import BackendException, BackendParameterException
-from ralph.utils import parse_bytes_to_dict
+from ralph.utils import async_parse_dict_to_bytes, iter_by_batch, parse_iterable_to_dict

 from ..data.base import (
     AsyncListable,
@@ -140,16 +139,34 @@ async def read(  # noqa: PLR0913
             chunk_size (int or None): The chunk size when reading documents by batches.
                 If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead.
             raw_output (bool): Whether to yield dictionaries or bytes.
-            ignore_errors (bool): Whether to ignore errors when reading documents.
+            ignore_errors (bool): If `True`, encoding errors during the read operation
+                will be ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.

         Yield:
             bytes: The next raw document if `raw_output` is True.
             dict: The next JSON parsed document if `raw_output` is False.

         Raise:
-            BackendException: If a failure occurs during MongoDB connection.
-            BackendParameterException: If a failure occurs with MongoDB collection.
+            BackendException: If a failure occurs during MongoDB connection or
+                during encoding documents and `ignore_errors` is set to `False`.
+            BackendParameterException: If the `target` is not a valid collection name.
         """
+        if raw_output:
+            documents = self.read(
+                query=query,
+                target=target,
+                chunk_size=chunk_size,
+                raw_output=False,
+                ignore_errors=ignore_errors,
+            )
+            async for document in async_parse_dict_to_bytes(
+                documents, self.settings.LOCALE_ENCODING, ignore_errors, logger
+            ):
+                yield document
+
+            return
+
         if not chunk_size:
             chunk_size = self.settings.DEFAULT_CHUNK_SIZE

@@ -163,20 +180,10 @@ async def read(  # noqa: PLR0913
             logger.error(msg, target, error)
             raise BackendParameterException(msg % (target, error)) from error

-        reader = self._read_raw if raw_output else lambda _: _
         try:
             async for document in collection.find(batch_size=chunk_size, **query):
                 document.update({"_id": str(document.get("_id"))})
-                try:
-                    yield reader(document)
-                except (TypeError, ValueError) as error:
-                    msg = "Failed to encode MongoDB document with ID %s: %s"
-                    document_id = document.get("_id")
-                    logger.error(msg, document_id, error)
-                    if ignore_errors:
-                        logger.warning(msg, document_id, error)
-                        continue
-                    raise BackendException(msg % (document_id, error)) from error
+                yield document
         except (PyMongoError, IndexError, TypeError, ValueError) as error:
             msg = "Failed to execute MongoDB query: %s"
             logger.error(msg, error)
@@ -197,7 +204,9 @@ async def write(  # noqa: PLR0913
             target (str or None): The target MongoDB collection name.
             chunk_size (int or None): The number of documents to write in one batch.
                 If chunk_size is `None` the `DEFAULT_CHUNK_SIZE` is used instead.
-            ignore_errors (bool): Whether to ignore errors or not.
+            ignore_errors (bool): If `True`, errors during decoding, encoding and
+                sending batches of documents are ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.
             operation_type (BaseOperationType or None): The mode of the write operation.
                 If `operation_type` is `None`, the `default_operation_type` is used
                 instead. See `BaseOperationType`.
@@ -206,8 +215,8 @@ async def write(  # noqa: PLR0913
             int: The number of documents written.

         Raise:
-            BackendException: If a failure occurs while writing to MongoDB or
-                during document decoding and `ignore_errors` is set to `False`.
+            BackendException: If any failure occurs during the write operation or
+                if an inescapable failure occurs and `ignore_errors` is set to `True`.
             BackendParameterException: If the `operation_type` is `APPEND` as it is not
                 supported.
         """
@@ -235,29 +244,26 @@ async def write(  # noqa: PLR0913
         try:
             first_record = next(data)
         except StopIteration:
-            logger.warning("Data Iterator is empty; skipping write to target.")
+            logger.info("Data Iterator is empty; skipping write to target.")
             return count

         data = chain([first_record], data)
         if isinstance(first_record, bytes):
-            data = parse_bytes_to_dict(data, ignore_errors, logger)
+            data = parse_iterable_to_dict(data, ignore_errors, logger)

         if operation_type == BaseOperationType.UPDATE:
-            for batch in MongoDataBackend.iter_by_batch(
-                MongoDataBackend.to_replace_one(data), chunk_size
-            ):
+            data = MongoDataBackend.to_replace_one(data)
+            for batch in iter_by_batch(data, chunk_size):
                 count += await self._bulk_update(batch, ignore_errors, collection)
             logger.info("Updated %d documents with success", count)
         elif operation_type == BaseOperationType.DELETE:
-            for batch in MongoDataBackend.iter_by_batch(
-                MongoDataBackend.to_ids(data), chunk_size
-            ):
+            for batch in iter_by_batch(MongoDataBackend.to_ids(data), chunk_size):
                 count += await self._bulk_delete(batch, ignore_errors, collection)
             logger.info("Deleted %d documents with success", count)
         else:
             data = MongoDataBackend.to_documents(
                 data, ignore_errors, operation_type, logger
             )
-            for batch in MongoDataBackend.iter_by_batch(data, chunk_size):
+            for batch in iter_by_batch(data, chunk_size):
                 count += await self._bulk_import(batch, ignore_errors, collection)
             logger.info("Inserted %d documents with success", count)

@@ -326,7 +332,3 @@ async def _bulk_update(batch: list, ignore_errors: bool, collection: Collection)
         modified_count = updated_documents.modified_count
         logger.debug("Updated %d documents chunk with success", modified_count)
         return modified_count
-
-    def _read_raw(self, document: Dict[str, Any]) -> bytes:
-        """Read the `document` dictionary and return bytes."""
-        return json.dumps(document).encode(self.settings.LOCALE_ENCODING)
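All three write branches above now share the same batching flow: decode records if needed, group them with `iter_by_batch`, then issue one bulk call per batch. A hypothetical illustration (values are made up; `chunk_size` comes from the backend settings in practice):

# Documents are grouped into fixed-size batches before each bulk call.
documents = ({"id": i} for i in range(7))  # any iterator of documents

for batch in iter_by_batch(documents, 3):
    print(batch)
# [{'id': 0}, {'id': 1}, {'id': 2}]
# [{'id': 3}, {'id': 4}, {'id': 5}]
# [{'id': 6}]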
38 changes: 18 additions & 20 deletions src/ralph/backends/data/base.py
@@ -102,9 +102,8 @@ def write(  # noqa: PLR0913
             chunk_size (int or None): The number of records or bytes to write in one
                 batch, depending on whether `data` contains dictionaries or bytes.
                 If `chunk_size` is `None`, a default value is used instead.
-            ignore_errors (bool): If `True`, errors during the write operation
-                are ignored and logged. If `False` (default), a `BackendException`
-                is raised if an error occurs.
+            ignore_errors (bool): If `True`, escapable errors are ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.
             operation_type (BaseOperationType or None): The mode of the write operation.
                 If `operation_type` is `None`, the `default_operation_type` is used
                 instead. See `BaseOperationType`.
@@ -113,8 +112,8 @@ def write(  # noqa: PLR0913
             int: The number of written records.

         Raise:
-            BackendException: If a failure during the write operation occurs and
-                `ignore_errors` is set to `False`.
+            BackendException: If any failure occurs during the write operation or
+                if an inescapable failure occurs and `ignore_errors` is set to `True`.
             BackendParameterException: If a backend argument value is not valid.
         """

@@ -262,17 +261,17 @@ def read(  # noqa: PLR0913
                 are encoded as JSON.
                 If the records are bytes and `raw_output` is set to `False`, they are
                 decoded as JSON by line.
-            ignore_errors (bool): If `True`, errors during the read operation
-                are be ignored and logged. If `False` (default), a `BackendException`
-                is raised if an error occurs.
+            ignore_errors (bool): If `True`, encoding errors during the read operation
+                will be ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.

         Yield:
             dict: If `raw_output` is False.
             bytes: If `raw_output` is True.

         Raise:
-            BackendException: If a failure during the read operation occurs and
-                `ignore_errors` is set to `False`.
+            BackendException: If a failure during the read operation occurs or
+                during encoding records and `ignore_errors` is set to `False`.
             BackendParameterException: If a backend argument value is not valid.
         """

@@ -322,9 +321,8 @@ async def write(  # noqa: PLR0913
             chunk_size (int or None): The number of records or bytes to write in one
                 batch, depending on whether `data` contains dictionaries or bytes.
                 If `chunk_size` is `None`, a default value is used instead.
-            ignore_errors (bool): If `True`, errors during the write operation
-                are ignored and logged. If `False` (default), a `BackendException`
-                is raised if an error occurs.
+            ignore_errors (bool): If `True`, escapable errors are ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.
             operation_type (BaseOperationType or None): The mode of the write operation.
                 If `operation_type` is `None`, the `default_operation_type` is used
                 instead. See `BaseOperationType`.
@@ -333,8 +331,8 @@ async def write(  # noqa: PLR0913
             int: The number of written records.

         Raise:
-            BackendException: If a failure during the write operation occurs and
-                `ignore_errors` is set to `False`.
+            BackendException: If any failure occurs during the write operation or
+                if an inescapable failure occurs and `ignore_errors` is set to `True`.
             BackendParameterException: If a backend argument value is not valid.
         """

@@ -447,17 +445,17 @@ async def read(  # noqa: PLR0913
                 are encoded as JSON.
                 If the records are bytes and `raw_output` is set to `False`, they are
                 decoded as JSON by line.
-            ignore_errors (bool): If `True`, errors during the read operation
-                are be ignored and logged. If `False` (default), a `BackendException`
-                is raised if an error occurs.
+            ignore_errors (bool): If `True`, encoding errors during the read operation
+                will be ignored and logged.
+                If `False` (default), a `BackendException` is raised on any error.

         Yield:
             dict: If `raw_output` is False.
             bytes: If `raw_output` is True.

         Raise:
-            BackendException: If a failure during the read operation occurs and
-                `ignore_errors` is set to `False`.
+            BackendException: If a failure during the read operation occurs or
+                during encoding records and `ignore_errors` is set to `False`.
             BackendParameterException: If a backend argument value is not valid.
         """

