diff --git a/.github/workflows/nucliadb_search.yml b/.github/workflows/nucliadb_search.yml index 211745e58a..e375e649fb 100644 --- a/.github/workflows/nucliadb_search.yml +++ b/.github/workflows/nucliadb_search.yml @@ -77,15 +77,15 @@ jobs: max-parallel: 2 matrix: include: - - maindb_driver: "tikv" - storage_backend: "gcs" - python-version: "3.11" + # - maindb_driver: "tikv" + # storage_backend: "gcs" + # python-version: "3.11" - maindb_driver: "tikv" storage_backend: "s3" python-version: "3.11" - - maindb_driver: "pg" - storage_backend: "gcs" - python-version: "3.11" + # - maindb_driver: "pg" + # storage_backend: "gcs" + # python-version: "3.11" steps: - name: Checkout the repository diff --git a/nucliadb/nucliadb/common/http_clients/processing.py b/nucliadb/nucliadb/common/http_clients/processing.py index 49ae8cbd71..75ff46ea9a 100644 --- a/nucliadb/nucliadb/common/http_clients/processing.py +++ b/nucliadb/nucliadb/common/http_clients/processing.py @@ -179,12 +179,12 @@ class StatusResultV2(pydantic.BaseModel): description="Timestamp of when the resource was first scheduled.", ) completed_at: Optional[datetime] = pydantic.Field( - ..., + None, title="Completed At", description="Timestamp of when the resource was completed", ) scheduled_at: Optional[datetime] = pydantic.Field( - ..., + None, title="Scheduled At", description="Timestamp of when the resource was first scheduled.", ) diff --git a/nucliadb/nucliadb/reader/api/v1/services.py b/nucliadb/nucliadb/reader/api/v1/services.py index 670e43fdac..39833107a0 100644 --- a/nucliadb/nucliadb/reader/api/v1/services.py +++ b/nucliadb/nucliadb/reader/api/v1/services.py @@ -17,7 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # - +import asyncio from typing import Optional, Union from fastapi import HTTPException @@ -49,7 +49,10 @@ from nucliadb.common.context.fastapi import get_app_context from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager from nucliadb.common.http_clients import processing +from nucliadb.common.maindb.utils import get_driver +from nucliadb.ingest.orm.knowledgebox import KnowledgeBox from nucliadb.models.responses import HTTPClientError +from nucliadb.reader import SERVICE_NAME from nucliadb.reader.api.v1.router import KB_PREFIX, api from nucliadb.reader.reader.notifications import kb_notifications_stream from nucliadb_models.configuration import KBConfiguration @@ -59,7 +62,7 @@ from nucliadb_models.synonyms import KnowledgeBoxSynonyms from nucliadb_models.vectors import VectorSet, VectorSets from nucliadb_utils.authentication import requires -from nucliadb_utils.utilities import get_ingest +from nucliadb_utils.utilities import get_ingest, get_storage @api.get( @@ -377,6 +380,30 @@ async def processing_status( return HTTPClientError(status_code=404, detail="Knowledge Box not found") async with processing.ProcessingV2HTTPClient() as client: - return await client.status( + results = await client.status( cursor=cursor, scheduled=scheduled, kbid=kbid, limit=limit ) + + storage = await get_storage(service_name=SERVICE_NAME) + driver = get_driver() + + async with driver.transaction(wait_for_abort=False, read_only=True) as txn: + kb = KnowledgeBox(txn, storage, kbid) + + max_simultaneous = asyncio.Semaphore(10) + + async def _composition(result: processing.StatusResultV2) -> None: + async with max_simultaneous: + resource = await kb.get(result.resource_id) + if resource is None: + return + + basic = await resource.get_basic() + if basic is None: + return + + result.title = basic.title + + await asyncio.gather(*[_composition(result) for result in results.results]) + + return results diff --git a/nucliadb/nucliadb/reader/tests/fixtures.py b/nucliadb/nucliadb/reader/tests/fixtures.py index 46f7c12d2a..0f3537792f 100644 --- a/nucliadb/nucliadb/reader/tests/fixtures.py +++ b/nucliadb/nucliadb/reader/tests/fixtures.py @@ -100,17 +100,16 @@ def broker_simple_resource(knowledgebox: str, number: int) -> BrokerMessage: @pytest.fixture(scope="function") -async def test_pagination_resources( - processor, knowledgebox_ingest, test_settings_reader -): +async def test_resources(processor, knowledgebox_ingest, test_settings_reader): """ Create a set of resources with only basic information to test pagination """ - + resources = [] amount = 10 for i in range(1, 10 + 1): message = broker_simple_resource(knowledgebox_ingest, i) await processor.process(message=message, seqid=i) + resources.append(message.uuid) # Give processed data some time to reach the node from time import time @@ -134,4 +133,4 @@ async def test_pagination_resources( break print(f"got {count}, retrying") - yield knowledgebox_ingest + yield knowledgebox_ingest, resources diff --git a/nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py b/nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py index c5880c2758..524e1a7822 100644 --- a/nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py +++ b/nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py @@ -19,12 +19,15 @@ # import asyncio from collections.abc import AsyncGenerator +from datetime import datetime from typing import Callable from unittest import mock +from unittest.mock import AsyncMock, MagicMock, patch import pytest from httpx import AsyncClient +from nucliadb.common.http_clients import processing from nucliadb.reader.api.v1.router import KB_PREFIX from nucliadb_models.notifications import ( Notification, @@ -136,3 +139,40 @@ async def test_activity_kb_not_found( async with reader_api(roles=[NucliaDBRoles.READER]) as client: resp = await client.get(f"/{KB_PREFIX}/foobar/notifications") assert resp.status_code == 404 + + +async def test_processing_status( + reader_api, + test_resources: tuple[str, list[str]], +): + kbid, resources = test_resources + + processing_client = MagicMock() + processing_client.__aenter__ = AsyncMock(return_value=processing_client) + processing_client.__aexit__ = AsyncMock(return_value=None) + processing_client.status = AsyncMock( + return_value=processing.StatusResultsV2( # type: ignore + results=[ + processing.StatusResultV2( # type: ignore + processing_id="processing_id", + resource_id=resource_id, + kbid=kbid, + completed=False, + scheduled=False, + timestamp=datetime.now(), + ) + for resource_id in resources + ] + ) + ) + with patch( + "nucliadb.reader.api.v1.services.processing.ProcessingV2HTTPClient", + return_value=processing_client, + ): + async with reader_api(roles=[NucliaDBRoles.READER]) as client: + resp = await client.get(f"/{KB_PREFIX}/{kbid}/processing-status") + assert resp.status_code == 200 + + data = processing.StatusResultsV2.parse_obj(resp.json()) + + assert all([result.title is not None for result in data.results]) diff --git a/nucliadb/nucliadb/reader/tests/test_list_resources.py b/nucliadb/nucliadb/reader/tests/test_list_resources.py index c4a7f149c0..8b71b37053 100644 --- a/nucliadb/nucliadb/reader/tests/test_list_resources.py +++ b/nucliadb/nucliadb/reader/tests/test_list_resources.py @@ -46,13 +46,13 @@ ) async def test_list_resources( reader_api: Callable[..., AsyncClient], - test_pagination_resources: str, + test_resources: tuple[str, list[str]], page: Optional[int], size: Optional[int], expected_resources_count: int, expected_is_last_page: bool, ) -> None: - kbid = test_pagination_resources + kbid = test_resources[0] query_params = {} if page is not None: diff --git a/nucliadb/nucliadb/writer/api/v1/field.py b/nucliadb/nucliadb/writer/api/v1/field.py index e07fb60da6..54285e418f 100644 --- a/nucliadb/nucliadb/writer/api/v1/field.py +++ b/nucliadb/nucliadb/writer/api/v1/field.py @@ -113,7 +113,6 @@ async def finish_field_put( transaction = get_transaction_utility() processing = get_processing() - await _add_basic_data_to_processing_payload(toprocess) processing_info = await processing.send_to_process(toprocess, partition) writer.source = BrokerMessage.MessageSource.WRITER @@ -704,7 +703,6 @@ async def _append_messages_to_conversation_field( field.messages.extend(messages) await parse_conversation_field(field_id, field, writer, toprocess, kbid, rid) - await _add_basic_data_to_processing_payload(toprocess) try: processing_info = await processing.send_to_process(toprocess, partition) @@ -800,7 +798,6 @@ async def _append_blocks_to_layout_field( field = models.InputLayoutField(body=models.InputLayoutContent()) field.body.blocks.update(blocks) await parse_layout_field(field_id, field, writer, toprocess, kbid, rid) - await _add_basic_data_to_processing_payload(toprocess) try: processing_info = await processing.send_to_process(toprocess, partition) @@ -972,24 +969,3 @@ async def reprocess_file_field( await transaction.commit(writer, partition, wait=False) return ResourceUpdated(seqid=processing_info.seqid) - - -async def _add_basic_data_to_processing_payload(toprocess: PushPayload): - """ - Add basic data to processing payload like title and anything - else that is on the resource level. - """ - storage = await get_storage(service_name=SERVICE_NAME) - driver = get_driver() - async with driver.transaction() as txn: - kb = KnowledgeBox(txn, storage, toprocess.kbid) - - resource = await kb.get(toprocess.uuid) - if resource is None: - return - - basic = await resource.get_basic() - if basic is None: - return - - toprocess.title = basic.title