compose title for processing status results #1769

Merged 3 commits on Jan 24, 2024
12 changes: 6 additions & 6 deletions .github/workflows/nucliadb_search.yml
@@ -77,15 +77,15 @@ jobs:
       max-parallel: 2
       matrix:
         include:
-          - maindb_driver: "tikv"
-            storage_backend: "gcs"
-            python-version: "3.11"
+          # - maindb_driver: "tikv"
+          #   storage_backend: "gcs"
+          #   python-version: "3.11"
           - maindb_driver: "tikv"
             storage_backend: "s3"
             python-version: "3.11"
-          - maindb_driver: "pg"
-            storage_backend: "gcs"
-            python-version: "3.11"
+          # - maindb_driver: "pg"
+          #   storage_backend: "gcs"
+          #   python-version: "3.11"
 
     steps:
       - name: Checkout the repository
4 changes: 2 additions & 2 deletions nucliadb/nucliadb/common/http_clients/processing.py
@@ -179,12 +179,12 @@ class StatusResultV2(pydantic.BaseModel):
         description="Timestamp of when the resource was first scheduled.",
     )
     completed_at: Optional[datetime] = pydantic.Field(
-        ...,
+        None,
         title="Completed At",
         description="Timestamp of when the resource was completed",
     )
     scheduled_at: Optional[datetime] = pydantic.Field(
-        ...,
+        None,
         title="Scheduled At",
         description="Timestamp of when the resource was first scheduled.",
     )
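A note on the change above: in pydantic, passing `...` (Ellipsis) as the first argument to `Field` marks the field as required even when its annotation is `Optional`, so payloads missing `completed_at` or `scheduled_at` would fail validation. Passing `None` makes the field genuinely optional. A minimal standalone sketch of the difference — the model names here are illustrative, not part of NucliaDB:

from datetime import datetime
from typing import Optional

import pydantic


class RequiredTimestamp(pydantic.BaseModel):
    # `...` means "no default": the key must be present, even though None is an allowed value.
    completed_at: Optional[datetime] = pydantic.Field(..., title="Completed At")


class OptionalTimestamp(pydantic.BaseModel):
    # A `None` default makes the field truly optional; a missing key becomes None.
    completed_at: Optional[datetime] = pydantic.Field(None, title="Completed At")


print(OptionalTimestamp())  # completed_at=None
try:
    RequiredTimestamp()
except pydantic.ValidationError as exc:
    print(exc)  # completed_at: field required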
33 changes: 30 additions & 3 deletions nucliadb/nucliadb/reader/api/v1/services.py
@@ -17,7 +17,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
-
+import asyncio
 from typing import Optional, Union
 
 from fastapi import HTTPException
@@ -49,7 +49,10 @@
 from nucliadb.common.context.fastapi import get_app_context
 from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
 from nucliadb.common.http_clients import processing
+from nucliadb.common.maindb.utils import get_driver
+from nucliadb.ingest.orm.knowledgebox import KnowledgeBox
 from nucliadb.models.responses import HTTPClientError
+from nucliadb.reader import SERVICE_NAME
 from nucliadb.reader.api.v1.router import KB_PREFIX, api
 from nucliadb.reader.reader.notifications import kb_notifications_stream
 from nucliadb_models.configuration import KBConfiguration
@@ -59,7 +62,7 @@
 from nucliadb_models.synonyms import KnowledgeBoxSynonyms
 from nucliadb_models.vectors import VectorSet, VectorSets
 from nucliadb_utils.authentication import requires
-from nucliadb_utils.utilities import get_ingest
+from nucliadb_utils.utilities import get_ingest, get_storage
 
 
 @api.get(
@@ -377,6 +380,30 @@ async def processing_status(
         return HTTPClientError(status_code=404, detail="Knowledge Box not found")
 
     async with processing.ProcessingV2HTTPClient() as client:
-        return await client.status(
+        results = await client.status(
             cursor=cursor, scheduled=scheduled, kbid=kbid, limit=limit
         )
+
+    storage = await get_storage(service_name=SERVICE_NAME)
+    driver = get_driver()
+
+    async with driver.transaction(wait_for_abort=False, read_only=True) as txn:
+        kb = KnowledgeBox(txn, storage, kbid)
+
+        max_simultaneous = asyncio.Semaphore(10)
+
+        async def _composition(result: processing.StatusResultV2) -> None:
+            async with max_simultaneous:
+                resource = await kb.get(result.resource_id)
+                if resource is None:
+                    return
+
+                basic = await resource.get_basic()
+                if basic is None:
+                    return
+
+                result.title = basic.title
+
+        await asyncio.gather(*[_composition(result) for result in results.results])
+
+        return results
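The new handler body follows a common asyncio pattern: fan out one lookup per result with asyncio.gather while an asyncio.Semaphore caps how many lookups run at once (10 here). Stripped of the NucliaDB specifics, the pattern reduces to the sketch below; StatusResult and fetch_title are illustrative stand-ins for the real result model and the kb.get() plus resource.get_basic() lookup, not actual APIs:

import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class StatusResult:
    resource_id: str
    title: Optional[str] = None


async def fetch_title(resource_id: str) -> Optional[str]:
    # Stand-in for the real lookup (kb.get(...) followed by resource.get_basic()).
    await asyncio.sleep(0.01)
    return f"Title of {resource_id}"


async def compose_titles(results: list[StatusResult], max_concurrency: int = 10) -> None:
    # The semaphore caps simultaneous lookups so a large page of results
    # does not open an unbounded number of concurrent reads.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _compose(result: StatusResult) -> None:
        async with semaphore:
            title = await fetch_title(result.resource_id)
            if title is not None:
                result.title = title

    # Each result is enriched in place; gather waits for all of them.
    await asyncio.gather(*[_compose(result) for result in results])


async def main() -> None:
    results = [StatusResult(resource_id=f"rid-{i}") for i in range(25)]
    await compose_titles(results)
    print(results[0].title)  # Title of rid-0


asyncio.run(main())

In the handler above, a missing resource or missing basic metadata simply leaves the title unset, which keeps the endpoint tolerant of results that reference resources no longer present in the knowledge box.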
9 changes: 4 additions & 5 deletions nucliadb/nucliadb/reader/tests/fixtures.py
@@ -100,17 +100,16 @@ def broker_simple_resource(knowledgebox: str, number: int) -> BrokerMessage:
 
 
 @pytest.fixture(scope="function")
-async def test_pagination_resources(
-    processor, knowledgebox_ingest, test_settings_reader
-):
+async def test_resources(processor, knowledgebox_ingest, test_settings_reader):
     """
     Create a set of resources with only basic information to test pagination
     """
 
-    amount = 10
+    resources = []
     for i in range(1, 10 + 1):
         message = broker_simple_resource(knowledgebox_ingest, i)
         await processor.process(message=message, seqid=i)
+        resources.append(message.uuid)
     # Give processed data some time to reach the node
 
     from time import time
@@ -134,4 +133,4 @@ async def test_pagination_resources(
             break
         print(f"got {count}, retrying")
 
-    yield knowledgebox_ingest
+    yield knowledgebox_ingest, resources
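The renamed fixture now yields a (kbid, resource_ids) pair instead of only the kbid, so tests that need just the knowledge box id index into the tuple while the new processing-status test unpacks both values. A tiny illustration of the pattern, using made-up fixture and test names rather than the real ones:

import pytest


@pytest.fixture
def seeded_kb():
    # Hypothetical stand-in for test_resources: yield the kb id together
    # with the ids of the resources created for it.
    kbid = "kb-1"
    resource_ids = ["r-1", "r-2"]
    yield kbid, resource_ids


def test_only_needs_kbid(seeded_kb: tuple[str, list[str]]) -> None:
    kbid = seeded_kb[0]
    assert kbid == "kb-1"


def test_needs_both(seeded_kb: tuple[str, list[str]]) -> None:
    kbid, resource_ids = seeded_kb
    assert len(resource_ids) == 2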
40 changes: 40 additions & 0 deletions nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py
@@ -19,12 +19,15 @@
 #
 import asyncio
 from collections.abc import AsyncGenerator
+from datetime import datetime
 from typing import Callable
 from unittest import mock
+from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
 from httpx import AsyncClient
 
+from nucliadb.common.http_clients import processing
 from nucliadb.reader.api.v1.router import KB_PREFIX
 from nucliadb_models.notifications import (
     Notification,
@@ -136,3 +139,40 @@ async def test_activity_kb_not_found(
     async with reader_api(roles=[NucliaDBRoles.READER]) as client:
         resp = await client.get(f"/{KB_PREFIX}/foobar/notifications")
         assert resp.status_code == 404
+
+
+async def test_processing_status(
+    reader_api,
+    test_resources: tuple[str, list[str]],
+):
+    kbid, resources = test_resources
+
+    processing_client = MagicMock()
+    processing_client.__aenter__ = AsyncMock(return_value=processing_client)
+    processing_client.__aexit__ = AsyncMock(return_value=None)
+    processing_client.status = AsyncMock(
+        return_value=processing.StatusResultsV2(  # type: ignore
+            results=[
+                processing.StatusResultV2(  # type: ignore
+                    processing_id="processing_id",
+                    resource_id=resource_id,
+                    kbid=kbid,
+                    completed=False,
+                    scheduled=False,
+                    timestamp=datetime.now(),
+                )
+                for resource_id in resources
+            ]
+        )
+    )
+    with patch(
+        "nucliadb.reader.api.v1.services.processing.ProcessingV2HTTPClient",
+        return_value=processing_client,
+    ):
+        async with reader_api(roles=[NucliaDBRoles.READER]) as client:
+            resp = await client.get(f"/{KB_PREFIX}/{kbid}/processing-status")
+            assert resp.status_code == 200
+
+            data = processing.StatusResultsV2.parse_obj(resp.json())
+
+            assert all([result.title is not None for result in data.results])
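The mock setup in this test is the standard unittest.mock recipe for an object used as an async context manager: a plain MagicMock does not make __aenter__/__aexit__ awaitable, so both are assigned explicit AsyncMock instances. A condensed, self-contained sketch of the same recipe — the helper name and payload below are illustrative, not NucliaDB code:

import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_async_client_mock(status_payload: dict) -> MagicMock:
    client = MagicMock()
    # __aenter__ must return whatever the `as` target should bind to;
    # __aexit__ returning None (falsy) means exceptions are not swallowed.
    client.__aenter__ = AsyncMock(return_value=client)
    client.__aexit__ = AsyncMock(return_value=None)
    client.status = AsyncMock(return_value=status_payload)
    return client


async def demo() -> None:
    mock_client = make_async_client_mock({"results": []})
    async with mock_client as client:
        assert await client.status() == {"results": []}
    mock_client.__aexit__.assert_awaited_once()


asyncio.run(demo())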
4 changes: 2 additions & 2 deletions nucliadb/nucliadb/reader/tests/test_list_resources.py
@@ -46,13 +46,13 @@
 )
 async def test_list_resources(
     reader_api: Callable[..., AsyncClient],
-    test_pagination_resources: str,
+    test_resources: tuple[str, list[str]],
     page: Optional[int],
     size: Optional[int],
     expected_resources_count: int,
     expected_is_last_page: bool,
 ) -> None:
-    kbid = test_pagination_resources
+    kbid = test_resources[0]
 
     query_params = {}
     if page is not None:
24 changes: 0 additions & 24 deletions nucliadb/nucliadb/writer/api/v1/field.py
@@ -113,7 +113,6 @@ async def finish_field_put(
     transaction = get_transaction_utility()
     processing = get_processing()
 
-    await _add_basic_data_to_processing_payload(toprocess)
     processing_info = await processing.send_to_process(toprocess, partition)
 
     writer.source = BrokerMessage.MessageSource.WRITER
@@ -704,7 +703,6 @@ async def _append_messages_to_conversation_field(
     field.messages.extend(messages)
 
     await parse_conversation_field(field_id, field, writer, toprocess, kbid, rid)
-    await _add_basic_data_to_processing_payload(toprocess)
 
     try:
         processing_info = await processing.send_to_process(toprocess, partition)
@@ -800,7 +798,6 @@ async def _append_blocks_to_layout_field(
         field = models.InputLayoutField(body=models.InputLayoutContent())
     field.body.blocks.update(blocks)
     await parse_layout_field(field_id, field, writer, toprocess, kbid, rid)
-    await _add_basic_data_to_processing_payload(toprocess)
 
     try:
         processing_info = await processing.send_to_process(toprocess, partition)
@@ -972,24 +969,3 @@ async def reprocess_file_field(
     await transaction.commit(writer, partition, wait=False)
 
     return ResourceUpdated(seqid=processing_info.seqid)
-
-
-async def _add_basic_data_to_processing_payload(toprocess: PushPayload):
-    """
-    Add basic data to processing payload like title and anything
-    else that is on the resource level.
-    """
-    storage = await get_storage(service_name=SERVICE_NAME)
-    driver = get_driver()
-    async with driver.transaction() as txn:
-        kb = KnowledgeBox(txn, storage, toprocess.kbid)
-
-        resource = await kb.get(toprocess.uuid)
-        if resource is None:
-            return
-
-        basic = await resource.get_basic()
-        if basic is None:
-            return
-
-        toprocess.title = basic.title