From 4f61b754bd2856522c10d34842b3fd336950a0b8 Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Fri, 15 Nov 2024 12:01:46 +0100 Subject: [PATCH] Remove local maindb driver (#2636) --- .github/workflows/ci.yml | 3 - .../nucliadb/common/datamanagers/resources.py | 21 +- nucliadb/src/nucliadb/common/maindb/local.py | 228 ------------------ nucliadb/src/nucliadb/common/maindb/utils.py | 15 -- nucliadb/src/nucliadb/health.py | 3 +- nucliadb/src/nucliadb/ingest/settings.py | 7 +- nucliadb/src/nucliadb/search/app.py | 3 +- nucliadb/src/nucliadb/standalone/config.py | 11 +- nucliadb/tests/fixtures.py | 71 +----- nucliadb/tests/ndbfixtures/maindb.py | 68 +----- .../integration/common/maindb/test_drivers.py | 10 - .../integration/search/test_search.py | 173 ------------- .../nucliadb/unit/common/maindb/test_utils.py | 12 - nucliadb/tests/search/node.py | 13 +- nucliadb/tests/writer/test_resources.py | 4 - nucliadb_dataset/tests/conftest.py | 6 +- .../src/nucliadb_sdk/tests/fixtures.py | 47 +++- nucliadb_sdk/tests/conftest.py | 1 + 18 files changed, 74 insertions(+), 622 deletions(-) delete mode 100644 nucliadb/src/nucliadb/common/maindb/local.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a26790478..477c717ca8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -343,7 +343,6 @@ jobs: strategy: fail-fast: false matrix: - maindb_driver: ["pg"] shard: [0, 1, 2] steps: @@ -365,8 +364,6 @@ jobs: - name: Run NucliaDB tests # These tests can be flaky, let's retry them... uses: nick-fields/retry@v3 - env: - TESTING_MAINDB_DRIVERS: ${{ matrix.maindb_driver }} with: max_attempts: 2 retry_on: error diff --git a/nucliadb/src/nucliadb/common/datamanagers/resources.py b/nucliadb/src/nucliadb/common/datamanagers/resources.py index 2180a460c0..895ce4084f 100644 --- a/nucliadb/src/nucliadb/common/datamanagers/resources.py +++ b/nucliadb/src/nucliadb/common/datamanagers/resources.py @@ -26,7 +26,6 @@ from nucliadb.common.maindb.exceptions import ConflictError, NotFoundError # These should be refactored -from nucliadb.ingest.settings import settings as ingest_settings from nucliadb_protos import resources_pb2 from nucliadb_utils.utilities import get_storage @@ -37,7 +36,6 @@ KB_RESOURCE_BASIC = "/kbs/{kbid}/r/{uuid}" -KB_RESOURCE_BASIC_FS = "/kbs/{kbid}/r/{uuid}/basic" # Only used on FS driver KB_RESOURCE_ORIGIN = "/kbs/{kbid}/r/{uuid}/origin" KB_RESOURCE_EXTRA = "/kbs/{kbid}/r/{uuid}/extra" KB_RESOURCE_SECURITY = "/kbs/{kbid}/r/{uuid}/security" @@ -128,24 +126,15 @@ async def get_basic(txn: Transaction, *, kbid: str, rid: str) -> Optional[resour async def get_basic_raw(txn: Transaction, *, kbid: str, rid: str) -> Optional[bytes]: - if ingest_settings.driver == "local": - raw_basic = await txn.get(KB_RESOURCE_BASIC_FS.format(kbid=kbid, uuid=rid)) - else: - raw_basic = await txn.get(KB_RESOURCE_BASIC.format(kbid=kbid, uuid=rid)) + raw_basic = await txn.get(KB_RESOURCE_BASIC.format(kbid=kbid, uuid=rid)) return raw_basic async def set_basic(txn: Transaction, *, kbid: str, rid: str, basic: resources_pb2.Basic): - if ingest_settings.driver == "local": - await txn.set( - KB_RESOURCE_BASIC_FS.format(kbid=kbid, uuid=rid), - basic.SerializeToString(), - ) - else: - await txn.set( - KB_RESOURCE_BASIC.format(kbid=kbid, uuid=rid), - basic.SerializeToString(), - ) + await txn.set( + KB_RESOURCE_BASIC.format(kbid=kbid, uuid=rid), + basic.SerializeToString(), + ) # Origin diff --git a/nucliadb/src/nucliadb/common/maindb/local.py b/nucliadb/src/nucliadb/common/maindb/local.py deleted file mode 100644 index f238bebe1e..0000000000 --- a/nucliadb/src/nucliadb/common/maindb/local.py +++ /dev/null @@ -1,228 +0,0 @@ -# Copyright (C) 2021 Bosutech XXI S.L. -# -# nucliadb is offered under the AGPL v3.0 and as commercial software. -# For commercial licensing, contact us at info@nuclia.com. -# -# AGPL: -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -import glob -import os -from contextlib import asynccontextmanager -from typing import AsyncGenerator, Optional - -from nucliadb.common.maindb.driver import ( - DEFAULT_BATCH_SCAN_LIMIT, - DEFAULT_SCAN_LIMIT, - Driver, - Transaction, -) - -try: - import aiofiles - - FILES = True -except ImportError: # pragma: no cover - FILES = False - - -class LocalTransaction(Transaction): - modified_keys: dict[str, bytes] - visited_keys: dict[str, bytes] - deleted_keys: list[str] - - def __init__(self, url: str, driver: Driver): - self.url = url - self.open = True - self.driver = driver - self.modified_keys = {} - self.visited_keys = {} - self.deleted_keys = [] - - def clean(self): - self.modified_keys.clear() - self.visited_keys.clear() - self.deleted_keys.clear() - - async def abort(self): - self.clean() - self.open = False - - def compute_path(self, key: str): - return f"{self.url}{key.rstrip('/')}/__data__" - - async def save(self, key: str, value: bytes): - key = self.compute_path(key) - folder = os.path.dirname(key) - if not os.path.exists(folder): - os.makedirs(folder, exist_ok=True) - async with aiofiles.open(key, "wb+") as resp: - await resp.write(value) - - async def remove(self, key: str): - try: - path = self.compute_path(key) - os.remove(path) - except FileNotFoundError: - # Deleting a key that does not exist - pass - - async def read(self, key: str) -> Optional[bytes]: - try: - async with aiofiles.open(self.compute_path(key), "rb") as resp: - return await resp.read() - except FileNotFoundError: - return None - except IsADirectoryError: - return None - except NotADirectoryError: - return None - - async def commit(self): - if len(self.modified_keys) == 0 and len(self.deleted_keys) == 0: - self.clean() - return - - not_to_check = [] - count = 0 - for key, value in self.modified_keys.items(): - await self.save(key, value) - count += 1 - for key in self.deleted_keys: - await self.remove(key) - not_to_check.append(count) - count += 1 - self.clean() - self.open = False - - async def batch_get(self, keys: list[str], for_update: bool = False) -> list[Optional[bytes]]: - results: list[Optional[bytes]] = [] - for key in keys: - obj = await self.get(key) - if obj: - results.append(obj) - else: - results.append(None) - - for idx, key in enumerate(keys): - if key in self.deleted_keys: - results[idx] = None - if key in self.modified_keys: - results[idx] = self.modified_keys[key] - if key in self.visited_keys: - results[idx] = self.visited_keys[key] - - return results - - async def get(self, key: str, for_update: bool = False) -> Optional[bytes]: - if key in self.deleted_keys: - raise KeyError(f"Not found {key}") - - if key in self.modified_keys: - return self.modified_keys[key] - - if key in self.visited_keys: - return self.visited_keys[key] - - else: - obj = await self.read(key) - if obj is not None: - self.visited_keys[key] = obj - return obj - - async def set(self, key: str, value: bytes): - if key in self.deleted_keys: - self.deleted_keys.remove(key) - - if key in self.visited_keys: - del self.visited_keys[key] - - self.modified_keys[key] = value - - async def delete(self, key: str): - if key not in self.deleted_keys: - self.deleted_keys.append(key) - - if key in self.visited_keys: - del self.visited_keys[key] - - if key in self.modified_keys: - del self.modified_keys[key] - - async def delete_by_prefix(self, prefix: str) -> None: - keys = [] - for key in self.modified_keys.keys(): - if key.startswith(prefix): - keys.append(key) - for key in keys: - await self.delete(key) - - async def keys(self, match: str, count: int = DEFAULT_SCAN_LIMIT, include_start: bool = True): - prev_key = None - - get_all_keys = count == -1 - total_count = DEFAULT_BATCH_SCAN_LIMIT if get_all_keys else count - real_count = 0 - path = self.compute_path(match).replace("/__data__", "") + "/" - for str_key in glob.glob(path + "**", recursive=True): - real_key = str_key.replace("/__data__", "") - if real_key in self.deleted_keys: - continue - if os.path.isdir(str_key) or not os.path.exists(str_key): - continue - for new_key in self.modified_keys.keys(): - if ( - match in new_key - and prev_key is not None - and prev_key < new_key - and new_key < real_key - ): - yield new_key.replace(self.url, "") - - yield real_key.replace(self.url, "") - if real_count >= total_count: - break - real_count += 1 - prev_key = real_key - if prev_key is None: - for new_key in self.modified_keys.keys(): - if match in new_key: - yield new_key.replace(self.url, "") - - async def count(self, match: str) -> int: - value = 0 - async for _ in self.keys(match, count=-1): - value += 1 - return value - - -class LocalDriver(Driver): - url: str - - def __init__(self, url: str): - self.url = os.path.abspath(url.rstrip("/")) - - async def initialize(self): - if self.initialized is False and os.path.exists(self.url) is False: - os.makedirs(self.url, exist_ok=True) - self.initialized = True - - async def finalize(self): - pass - - @asynccontextmanager - async def transaction(self, read_only: bool = False) -> AsyncGenerator[Transaction, None]: - if self.url is None: - raise AttributeError("Invalid url") - yield LocalTransaction(self.url, self) diff --git a/nucliadb/src/nucliadb/common/maindb/utils.py b/nucliadb/src/nucliadb/common/maindb/utils.py index 2e928878e1..d81572ba4c 100644 --- a/nucliadb/src/nucliadb/common/maindb/utils.py +++ b/nucliadb/src/nucliadb/common/maindb/utils.py @@ -29,13 +29,6 @@ except ImportError: # pragma: no cover PG = False -try: - from nucliadb.common.maindb.local import LocalDriver - - FILES = True -except ImportError: # pragma: no cover - FILES = False - def get_driver() -> Driver: driver = get_utility(Utility.MAINDB_DRIVER) @@ -61,14 +54,6 @@ async def setup_driver() -> Driver: acquire_timeout_ms=settings.driver_pg_connection_pool_acquire_timeout_ms, ) set_utility(Utility.MAINDB_DRIVER, pg_driver) - elif settings.driver == DriverConfig.LOCAL: - if not FILES: - raise ConfigurationError("`aiofiles` python package not installed.") - if settings.driver_local_url is None: - raise ConfigurationError("No DRIVER_LOCAL_URL env var defined.") - - local_driver = LocalDriver(settings.driver_local_url) - set_utility(Utility.MAINDB_DRIVER, local_driver) else: raise ConfigurationError(f"Invalid DRIVER defined configured: {settings.driver}") diff --git a/nucliadb/src/nucliadb/health.py b/nucliadb/src/nucliadb/health.py index 64aaa147a3..455f7b8453 100644 --- a/nucliadb/src/nucliadb/health.py +++ b/nucliadb/src/nucliadb/health.py @@ -42,9 +42,8 @@ def nats_manager_healthy() -> bool: def nodes_health_check() -> bool: from nucliadb.common.cluster import manager - from nucliadb.ingest.settings import DriverConfig, settings - return len(manager.INDEX_NODES) > 0 or settings.driver == DriverConfig.LOCAL + return len(manager.INDEX_NODES) > 0 def pubsub_check() -> bool: diff --git a/nucliadb/src/nucliadb/ingest/settings.py b/nucliadb/src/nucliadb/ingest/settings.py index 822f06170b..bbda4e8dd1 100644 --- a/nucliadb/src/nucliadb/ingest/settings.py +++ b/nucliadb/src/nucliadb/ingest/settings.py @@ -26,7 +26,6 @@ class DriverConfig(Enum): PG = "pg" - LOCAL = "local" NOT_SET = "notset" # setting not provided @classmethod @@ -40,11 +39,7 @@ def _missing_(cls, value): class DriverSettings(BaseSettings): - driver: DriverConfig = Field(default=DriverConfig.NOT_SET, description="K/V storage driver") - driver_local_url: Optional[str] = Field( - default=None, - description="Local path to store data on file system. Example: /nucliadb/data/main", - ) + driver: DriverConfig = Field(default=DriverConfig.PG, description="K/V storage driver") driver_pg_url: Optional[str] = Field( default=None, description="PostgreSQL DSN. The connection string to the PG server. Example: postgres://username:password@postgres:5432/nucliadb.", # noqa diff --git a/nucliadb/src/nucliadb/search/app.py b/nucliadb/src/nucliadb/search/app.py index 3845331f19..63fb315576 100644 --- a/nucliadb/src/nucliadb/search/app.py +++ b/nucliadb/src/nucliadb/search/app.py @@ -31,7 +31,6 @@ from nucliadb.search import API_PREFIX from nucliadb.search.api.v1.router import api as api_v1 from nucliadb.search.lifecycle import lifespan -from nucliadb.search.settings import settings from nucliadb_telemetry import errors from nucliadb_telemetry.fastapi.utils import ( client_disconnect_handler, @@ -106,7 +105,7 @@ async def node_members(request: Request) -> JSONResponse: async def alive(request: Request) -> JSONResponse: - if len(manager.get_index_nodes()) == 0 and settings.driver != "local": + if len(manager.get_index_nodes()) == 0: return JSONResponse({"status": "error"}, status_code=503) else: return JSONResponse({"status": "ok"}) diff --git a/nucliadb/src/nucliadb/standalone/config.py b/nucliadb/src/nucliadb/standalone/config.py index 4a997cc61c..77857fb11e 100644 --- a/nucliadb/src/nucliadb/standalone/config.py +++ b/nucliadb/src/nucliadb/standalone/config.py @@ -44,11 +44,7 @@ def config_standalone_driver(nucliadb_args: Settings): if ingest_settings.driver == DriverConfig.NOT_SET: # no driver specified, for standalone, we force defaulting to local here - ingest_settings.driver = DriverConfig.LOCAL - - if ingest_settings.driver == DriverConfig.LOCAL and ingest_settings.driver_local_url is None: - # also provide default path for local driver when none provided - ingest_settings.driver_local_url = "./data/main" + ingest_settings.driver = DriverConfig.PG if storage_settings.file_backend == FileBackendConfig.NOT_SET: # no driver specified, for standalone, we try to automate some settings here @@ -57,11 +53,6 @@ def config_standalone_driver(nucliadb_args: Settings): if storage_settings.file_backend == FileBackendConfig.LOCAL and storage_settings.local_files is None: storage_settings.local_files = "./data/blob" - if ingest_settings.driver_local_url is not None and not os.path.isdir( - ingest_settings.driver_local_url - ): - os.makedirs(ingest_settings.driver_local_url, exist_ok=True) - # need to force inject this to env var if "DATA_PATH" not in os.environ: os.environ["DATA_PATH"] = nucliadb_args.data_path diff --git a/nucliadb/tests/fixtures.py b/nucliadb/tests/fixtures.py index 128810e93f..e6d091e755 100644 --- a/nucliadb/tests/fixtures.py +++ b/nucliadb/tests/fixtures.py @@ -21,7 +21,6 @@ import logging import os import tempfile -from typing import AsyncIterator from unittest.mock import AsyncMock, Mock, patch import psycopg @@ -29,12 +28,10 @@ from grpc import aio from httpx import AsyncClient from pytest_docker_fixtures import images -from pytest_lazy_fixtures import lazy_fixture from nucliadb.common.cluster import manager as cluster_manager from nucliadb.common.maindb.driver import Driver from nucliadb.common.maindb.exceptions import UnsetUtility -from nucliadb.common.maindb.local import LocalDriver from nucliadb.common.maindb.pg import PGDriver from nucliadb.common.maindb.utils import get_driver from nucliadb.ingest.settings import DriverConfig, DriverSettings @@ -524,31 +521,6 @@ def metrics_registry(): yield prometheus_client.registry.REGISTRY -@pytest.fixture(scope="function") -def local_maindb_settings(tmpdir): - return DriverSettings( - driver=DriverConfig.LOCAL, - driver_local_url=f"{tmpdir}/main", - ) - - -@pytest.fixture(scope="function") -async def local_maindb_driver(local_maindb_settings) -> AsyncIterator[Driver]: - path = local_maindb_settings.driver_local_url - ingest_settings.driver = DriverConfig.LOCAL - ingest_settings.driver_local_url = path - - driver: Driver = LocalDriver(url=path) - await driver.initialize() - - yield driver - - await driver.finalize() - - ingest_settings.driver_local_url = None - clean_utility(Utility.MAINDB_DRIVER) - - @pytest.fixture(scope="function") async def pg_maindb_settings(pg): url = f"postgresql://postgres:postgres@{pg[0]}:{pg[1]}/postgres" @@ -596,45 +568,14 @@ async def pg_maindb_driver(pg_maindb_settings: DriverSettings): await driver.finalize() -# Coma separated list of drivers -DEFAULT_MAINDB_DRIVER = "pg" - - -def maindb_settings_lazy_fixtures(default_drivers: str = DEFAULT_MAINDB_DRIVER): - driver_types = os.environ.get("TESTING_MAINDB_DRIVERS", default_drivers) - return [lazy_fixture.lf(f"{driver_type}_maindb_settings") for driver_type in driver_types.split(",")] - - -@pytest.fixture(scope="function", params=maindb_settings_lazy_fixtures()) -def maindb_settings(request): - """ - Allows dynamically loading the driver fixtures via env vars. - - TESTING_MAINDB_DRIVERS=redis,local pytest nucliadb/tests/ - - Any test using the nucliadb fixture will be run twice, once with redis driver and once with local driver. - """ - yield request.param - - -def maindb_driver_lazy_fixtures(default_drivers: str = DEFAULT_MAINDB_DRIVER): - """ - Allows running tests using maindb_driver for each supported driver type via env vars. - - TESTING_MAINDB_DRIVERS=redis,local pytest nucliadb/tests/ingest - - Any test using the maindb_driver fixture will be run twice, once with redis_driver and once with local_driver. - """ - driver_types = os.environ.get("TESTING_MAINDB_DRIVERS", default_drivers) - return [lazy_fixture.lf(f"{driver_type}_maindb_driver") for driver_type in driver_types.split(",")] +@pytest.fixture(scope="function") +def maindb_settings(pg_maindb_settings): + yield pg_maindb_settings -@pytest.fixture( - scope="function", - params=maindb_driver_lazy_fixtures(), -) -async def maindb_driver(request): - driver = request.param +@pytest.fixture(scope="function") +async def maindb_driver(pg_maindb_driver): + driver = pg_maindb_driver set_utility(Utility.MAINDB_DRIVER, driver) yield driver diff --git a/nucliadb/tests/ndbfixtures/maindb.py b/nucliadb/tests/ndbfixtures/maindb.py index ebce5bebce..2608669fe3 100644 --- a/nucliadb/tests/ndbfixtures/maindb.py +++ b/nucliadb/tests/ndbfixtures/maindb.py @@ -18,19 +18,14 @@ # along with this program. If not, see . # import logging -import os -from pathlib import Path from typing import AsyncIterator from unittest.mock import patch import psycopg import pytest -from pytest import FixtureRequest from pytest_docker_fixtures import images -from pytest_lazy_fixtures import lazy_fixture from nucliadb.common.maindb.driver import Driver -from nucliadb.common.maindb.local import LocalDriver from nucliadb.common.maindb.pg import PGDriver from nucliadb.ingest.settings import DriverConfig, DriverSettings from nucliadb.ingest.settings import settings as ingest_settings @@ -49,41 +44,14 @@ images.settings["postgresql"]["env"]["POSTGRES_PASSWORD"] = "postgres" -def maindb_settings_lazy_fixtures(default_drivers="local"): - driver_types = os.environ.get("TESTING_MAINDB_DRIVERS", default_drivers) - return [lazy_fixture.lf(f"{driver_type}_maindb_settings") for driver_type in driver_types.split(",")] - - -@pytest.fixture(scope="function", params=maindb_settings_lazy_fixtures()) -def maindb_settings(request: FixtureRequest): - """ - Allows dynamically loading the driver fixtures via env vars. - - TESTING_MAINDB_DRIVERS=redis,local pytest nucliadb/tests/ - - Any test using the nucliadb fixture will be run twice, once with redis driver and once with local driver. - """ - yield request.param - - -def maindb_driver_lazy_fixtures(default_drivers: str = "pg"): - """ - Allows running tests using maindb_driver for each supported driver type via env vars. - - TESTING_MAINDB_DRIVERS=redis,local pytest nucliadb/tests/ingest - - Any test using the maindb_driver fixture will be run twice, once with redis_driver and once with local_driver. - """ - driver_types = os.environ.get("TESTING_MAINDB_DRIVERS", default_drivers) - return [lazy_fixture.lf(f"{driver_type}_maindb_driver") for driver_type in driver_types.split(",")] +@pytest.fixture(scope="function") +def maindb_settings(pg_maindb_settings): + yield pg_maindb_settings -@pytest.fixture( - scope="function", - params=maindb_driver_lazy_fixtures(), -) -async def maindb_driver(request: FixtureRequest) -> AsyncIterator[Driver]: - driver: Driver = request.param +@pytest.fixture(scope="function") +async def maindb_driver(pg_maindb_driver) -> AsyncIterator[Driver]: + driver: Driver = pg_maindb_driver set_utility(Utility.MAINDB_DRIVER, driver) yield driver @@ -148,27 +116,3 @@ async def pg_maindb_driver(pg_maindb_settings: DriverSettings): yield driver await driver.finalize() - - -@pytest.fixture(scope="function") -def local_maindb_settings(tmp_path: Path): - return DriverSettings( - driver=DriverConfig.LOCAL, - driver_local_url=str((tmp_path / "main").absolute()), - ) - - -@pytest.fixture(scope="function") -async def local_maindb_driver(local_maindb_settings: DriverSettings) -> AsyncIterator[Driver]: - path = local_maindb_settings.driver_local_url - assert path is not None - with ( - patch.object(ingest_settings, "driver", DriverConfig.LOCAL), - patch.object(ingest_settings, "driver_local_url", path), - ): - driver: Driver = LocalDriver(url=path) - await driver.initialize() - - yield driver - - await driver.finalize() diff --git a/nucliadb/tests/nucliadb/integration/common/maindb/test_drivers.py b/nucliadb/tests/nucliadb/integration/common/maindb/test_drivers.py index 553bf26b1a..b432c77c44 100644 --- a/nucliadb/tests/nucliadb/integration/common/maindb/test_drivers.py +++ b/nucliadb/tests/nucliadb/integration/common/maindb/test_drivers.py @@ -17,7 +17,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # -import os import psycopg_pool import pytest @@ -25,16 +24,7 @@ from nucliadb.common.maindb.driver import Driver from nucliadb.common.maindb.pg import PGDriver -TESTING_MAINDB_DRIVERS = os.environ.get("TESTING_MAINDB_DRIVERS", "pg,local").split(",") - -@pytest.mark.skip(reason="Local driver doesn't implement saving info in intermediate nodes") -@pytest.mark.skipif("local" not in TESTING_MAINDB_DRIVERS, reason="local not in TESTING_MAINDB_DRIVERS") -async def test_local_driver(local_driver): - await driver_basic(local_driver) - - -@pytest.mark.skipif("pg" not in TESTING_MAINDB_DRIVERS, reason="pg not in TESTING_MAINDB_DRIVERS") async def test_pg_driver_pool_timeout(pg): url = f"postgresql://postgres:postgres@{pg[0]}:{pg[1]}/postgres" driver = PGDriver(url, connection_pool_min_size=1, connection_pool_max_size=1) diff --git a/nucliadb/tests/nucliadb/integration/search/test_search.py b/nucliadb/tests/nucliadb/integration/search/test_search.py index e6320122a8..5694555e3f 100644 --- a/nucliadb/tests/nucliadb/integration/search/test_search.py +++ b/nucliadb/tests/nucliadb/integration/search/test_search.py @@ -19,7 +19,6 @@ # import asyncio import math -import os from datetime import datetime, timedelta from unittest import mock from unittest.mock import AsyncMock, Mock, patch @@ -51,8 +50,6 @@ ) from tests.utils import broker_resource, inject_message -TESTING_MAINDB_DRIVERS = os.environ.get("TESTING_MAINDB_DRIVERS", "pg,local").split(",") - @pytest.mark.asyncio async def test_simple_search_sc_2062( @@ -330,88 +327,6 @@ async def test_paragraph_search_with_filters( assert paragraph_results[0]["field"] == "summary" -@pytest.mark.asyncio -@pytest.mark.skipif("pg" in TESTING_MAINDB_DRIVERS, reason="PG catalog does not support with_status") -async def test_catalog_can_filter_by_processing_status( - nucliadb_reader: AsyncClient, - nucliadb_grpc: WriterStub, - knowledgebox, -): - """ - Test description: - - Creates a resource for each processing status value. - - Checks that if not specified, search returns all resources. - - Checks that search is able to filter by each value. - - Checks that we can get counts for each processing status - """ - valid_status = ["PROCESSED", "PENDING", "ERROR"] - - created = 0 - for status_name, status_value in rpb.Metadata.Status.items(): - if status_name not in valid_status: - continue - bm = broker_resource(knowledgebox) - bm.basic.metadata.status = status_value - await inject_message(nucliadb_grpc, bm) - created += 1 - - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == created - - # Two should be PROCESSED (the ERROR is counted as processed) - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "with_status": "PROCESSED", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == 2 - - # One should be PENDING - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "with_status": "PENDING", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == 1 - - # Get the list of PENDING - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "filters": ["/metadata.status/PENDING"], - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == 1 - - # Check facets by processing status - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "faceted": ["/metadata.status"], - }, - ) - assert resp.status_code == 200 - resp_json = resp.json() - facets = resp_json["fulltext"]["facets"] - for status in valid_status: - assert facets["/metadata.status"][f"/metadata.status/{status}"] == 1 - - @pytest.mark.skip(reason="Needs sc-5626") @pytest.mark.asyncio async def test_( @@ -651,68 +566,6 @@ async def test_search_relations( assert expected_relation in entities[entity]["related_to"] -@pytest.mark.asyncio -@pytest.mark.skipif("pg" in TESTING_MAINDB_DRIVERS, reason="PG catalog does not support with_status") -async def test_processing_status_doesnt_change_on_search_after_processed( - nucliadb_reader: AsyncClient, - nucliadb_writer: AsyncClient, - nucliadb_grpc: WriterStub, - knowledgebox, -): - # Inject a resource with status=PROCESSED - bm = broker_resource(knowledgebox) - bm.basic.metadata.status = rpb.Metadata.Status.PROCESSED - await inject_message(nucliadb_grpc, bm) - - # Check that search for resource list shows it - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "with_status": "PROCESSED", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == 1 - - # Edit the resource so that status=PENDING - assert ( - await nucliadb_writer.patch( - f"/kb/{knowledgebox}/resource/{bm.uuid}", - json={ - "title": "My new title", - }, - ) - ).status_code == 200 - - # Wait a bit until for the node to index it - await asyncio.sleep(1) - - # Check that search for resource list still shows it - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "with_status": "PROCESSED", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["resources"]) == 1 - - # Check that facets count it as PENDING though - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "faceted": ["/metadata.status"], - }, - ) - assert resp.status_code == 200 - resp_json = resp.json() - facets = resp_json["fulltext"]["facets"] - assert facets["/metadata.status"] == {"/metadata.status/PENDING": 1} - - @pytest.mark.asyncio async def test_search_automatic_relations( nucliadb_reader: AsyncClient, nucliadb_writer: AsyncClient, knowledgebox @@ -1483,32 +1336,6 @@ async def test_search_handles_limits_exceeded_error( assert resp.json() == {"detail": "over the quota"} -@pytest.mark.asyncio -@pytest.mark.skipif("pg" in TESTING_MAINDB_DRIVERS, reason="PG catalog cannot do shards") -async def test_catalog_returns_shard_and_node_data( - nucliadb_reader: AsyncClient, - knowledgebox, -): - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["shards"]) > 0 - - resp = await nucliadb_reader.get( - f"/kb/{knowledgebox}/catalog", - params={ - "query": "", - "debug": "true", - }, - ) - assert resp.status_code == 200 - assert len(resp.json()["nodes"]) > 0 - - @pytest.mark.asyncio async def test_catalog_post( nucliadb_reader: AsyncClient, diff --git a/nucliadb/tests/nucliadb/unit/common/maindb/test_utils.py b/nucliadb/tests/nucliadb/unit/common/maindb/test_utils.py index e46d192e8b..9f814122a5 100644 --- a/nucliadb/tests/nucliadb/unit/common/maindb/test_utils.py +++ b/nucliadb/tests/nucliadb/unit/common/maindb/test_utils.py @@ -46,18 +46,6 @@ async def test_setup_driver_pg(): mock.initialize.assert_awaited_once() -@pytest.mark.asyncio -async def test_setup_driver_local(): - mock = AsyncMock(initialized=False) - with ( - patch.object(settings, "driver", DriverConfig("local")), - patch.object(settings, "driver_local_url", "driver_local_url"), - patch("nucliadb.common.maindb.utils.LocalDriver", return_value=mock), - ): - assert await setup_driver() == mock - mock.initialize.assert_awaited_once() - - @pytest.mark.asyncio async def test_setup_driver_error(): with ( diff --git a/nucliadb/tests/search/node.py b/nucliadb/tests/search/node.py index f783827e59..5e0fcb4ffd 100644 --- a/nucliadb/tests/search/node.py +++ b/nucliadb/tests/search/node.py @@ -241,15 +241,16 @@ def __init__(self, natsd, storage: NodeStorage): def start(self): docker_platform_name = self.docker_client.api.version()["Platform"]["Name"].upper() - if "GITHUB_ACTION" not in os.environ and ( - "DESKTOP" in docker_platform_name - # newer versions use community - or "DOCKER ENGINE - COMMUNITY" == docker_platform_name - ): + if "GITHUB_ACTION" in os.environ: + # Valid when using github actions + docker_internal_host = "172.17.0.1" + elif docker_platform_name == "DOCKER ENGINE - COMMUNITY": + # for linux users + docker_internal_host = "172.17.0.1" + elif "DOCKER DESKTOP" in docker_platform_name: # Valid when using Docker desktop docker_internal_host = "host.docker.internal" else: - # Valid when using github actions docker_internal_host = "172.17.0.1" self.volume_node_1 = self.docker_client.volumes.create(driver="local") diff --git a/nucliadb/tests/writer/test_resources.py b/nucliadb/tests/writer/test_resources.py index f1ce038037..7494e41f63 100644 --- a/nucliadb/tests/writer/test_resources.py +++ b/nucliadb/tests/writer/test_resources.py @@ -26,7 +26,6 @@ import nucliadb_models from nucliadb.common import datamanagers -from nucliadb.common.maindb.local import LocalDriver from nucliadb.ingest.orm.resource import Resource from nucliadb.ingest.processing import PushPayload from nucliadb.writer.api.v1.router import ( @@ -233,9 +232,6 @@ async def test_reprocess_resource( mocker, maindb_driver, ) -> None: - if isinstance(maindb_driver, (LocalDriver)): - pytest.skip("Keys might not be ordered correctly in this driver") - rsc = test_resource kbid = rsc.kb.kbid rid = rsc.uuid diff --git a/nucliadb_dataset/tests/conftest.py b/nucliadb_dataset/tests/conftest.py index 2a75c404e4..0cf54c2c3d 100644 --- a/nucliadb_dataset/tests/conftest.py +++ b/nucliadb_dataset/tests/conftest.py @@ -17,4 +17,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # -pytest_plugins = ["nucliadb_sdk.tests.fixtures", "nucliadb_dataset.tests.fixtures"] +pytest_plugins = [ + "pytest_docker_fixtures", + "nucliadb_sdk.tests.fixtures", + "nucliadb_dataset.tests.fixtures", +] diff --git a/nucliadb_sdk/src/nucliadb_sdk/tests/fixtures.py b/nucliadb_sdk/src/nucliadb_sdk/tests/fixtures.py index 679c4aae96..7cd1fa8746 100644 --- a/nucliadb_sdk/src/nucliadb_sdk/tests/fixtures.py +++ b/nucliadb_sdk/src/nucliadb_sdk/tests/fixtures.py @@ -27,6 +27,7 @@ from typing import Optional from uuid import uuid4 +import docker # type: ignore import pytest import requests from pytest_docker_fixtures import images # type: ignore @@ -34,11 +35,14 @@ import nucliadb_sdk +images.settings["postgresql"]["version"] = "11" +images.settings["postgresql"]["env"]["POSTGRES_PASSWORD"] = "postgres" +images.settings["postgresql"]["env"]["POSTGRES_DB"] = "postgres" + images.settings["nucliadb"] = { "image": "nuclia/nucliadb", "version": "latest", "env": { - "DRIVER": "local", "NUCLIADB_DISABLE_ANALYTICS": "True", "dummy_predict": "True", "dummy_processing": "True", @@ -80,8 +84,35 @@ class NucliaFixture: container: Optional[NucliaDB] = None +def get_docker_internal_host(): + """ + This is needed for the case when we are starting a nucliadb container for testing, + it needs to know the docker internal host to connect to pg container that is on the same network. + """ + docker_client = docker.from_env(version=BaseImage.docker_version) + docker_platform_name = docker_client.api.version()["Platform"]["Name"].upper() + if "GITHUB_ACTION" in os.environ: + # Valid when using github actions + docker_internal_host = "172.17.0.1" + elif docker_platform_name == "DOCKER ENGINE - COMMUNITY": + # for linux users + docker_internal_host = "172.17.0.1" + elif "DOCKER DESKTOP" in docker_platform_name: + # Valid when using Docker desktop + docker_internal_host = "host.docker.internal" + else: + docker_internal_host = "172.17.0.1" + return docker_internal_host + + @pytest.fixture(scope="session") -def nucliadb(): +def nucliadb(pg): + pg_host, pg_port = pg + # Setup the connection to pg for the maindb driver + url = f"postgresql://postgres:postgres@{pg_host}:{pg_port}/postgres" + images.settings["nucliadb"]["env"]["DRIVER"] = "PG" + images.settings["nucliadb"]["env"]["DRIVER_PG_URL"] = url + if os.environ.get("TEST_LOCAL_NUCLIADB"): host = os.environ.get("TEST_LOCAL_NUCLIADB") child = None @@ -115,14 +146,16 @@ def nucliadb(): if child: child.kill() else: + # We need to replace the localhost with the internal docker host to allow container-to-container communication + images.settings["nucliadb"]["env"]["DRIVER_PG_URL"] = images.settings["nucliadb"]["env"][ + "DRIVER_PG_URL" + ].replace("localhost", get_docker_internal_host()) + print("NucliaDB running on", images.settings["nucliadb"]["env"]["DRIVER_PG_URL"]) container = NucliaDB() host, port = container.run() network = container.container_obj.attrs["NetworkSettings"] - if os.environ.get("TESTING", "") == "jenkins": - grpc = 8060 - else: - service_port = "8060/tcp" - grpc = network["Ports"][service_port][0]["HostPort"] + service_port = "8060/tcp" + grpc = network["Ports"][service_port][0]["HostPort"] yield NucliaFixture( host=host, port=port, diff --git a/nucliadb_sdk/tests/conftest.py b/nucliadb_sdk/tests/conftest.py index 9be5041653..61587db9d3 100644 --- a/nucliadb_sdk/tests/conftest.py +++ b/nucliadb_sdk/tests/conftest.py @@ -18,5 +18,6 @@ # along with this program. If not, see . # pytest_plugins = [ + "pytest_docker_fixtures", "nucliadb_sdk.tests.fixtures", ]