diff --git a/nucliadb/nucliadb/common/http_clients/processing.py b/nucliadb/nucliadb/common/http_clients/processing.py
index 31fe123e85..409d5f3e7c 100644
--- a/nucliadb/nucliadb/common/http_clients/processing.py
+++ b/nucliadb/nucliadb/common/http_clients/processing.py
@@ -18,6 +18,7 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 import logging
+from datetime import datetime
 from typing import Any, Optional
 
 import aiohttp
@@ -41,6 +42,19 @@ def get_processing_api_url() -> str:
         return nuclia_settings.nuclia_cluster_url + "/api/internal/processing"
 
 
+def get_processing_api_url_v2() -> str:
+    if nuclia_settings.nuclia_service_account:
+        return (
+            nuclia_settings.nuclia_public_url.format(zone=nuclia_settings.nuclia_zone)
+            + "/api/v2/processing"
+        )
+    else:
+        return (
+            nuclia_settings.nuclia_processing_cluster_url
+            + "/api/internal/v2/processing"
+        )
+
+
 def check_status(resp: aiohttp.ClientResponse, resp_text: str) -> None:
     if resp.status < 300:
         return
@@ -125,3 +139,121 @@ async def pull(self, partition: str) -> PullResponse:
             check_proxy_telemetry_headers(resp)
             check_status(resp, resp_text)
             return PullResponse.parse_raw(resp_text)
+
+
+class StatusResultV2(pydantic.BaseModel):
+    processing_id: str = pydantic.Field(
+        ...,
+        title="Processing ID",
+        description="Processing ID of the resource.",
+    )
+    resource_id: str = pydantic.Field(
+        ...,
+        title="Resource ID",
+        description="Resource ID.",
+    )
+    kbid: str = pydantic.Field(..., title="KnowledgeBox ID")
+    title: Optional[str] = pydantic.Field(
+        None,
+        title="Title",
+        description="Title of the resource.",
+    )
+    labels: list[str] = pydantic.Field(
+        [],
+        title="Labels",
+        description="Labels of the resource.",
+    )
+    completed: bool = pydantic.Field(
+        ...,
+        title="Completed",
+        description="Whether the resource has been completed",
+    )
+    scheduled: bool = pydantic.Field(
+        ...,
+        title="Scheduled",
+        description="Whether the resource has been scheduled",
+    )
+    timestamp: datetime = pydantic.Field(
+        ...,
+        title="Timestamp",
+        description="Timestamp of when the resource was first scheduled.",
+    )
+    completed_at: Optional[datetime] = pydantic.Field(
+        ...,
+        title="Completed At",
+        description="Timestamp of when the resource was completed",
+    )
+    scheduled_at: Optional[datetime] = pydantic.Field(
+        ...,
+        title="Scheduled At",
+        description="Timestamp of when the resource was first scheduled.",
+    )
+    failed: bool = pydantic.Field(
+        False,
+        title="Failed",
+        description="Whether the resource has failed to process",
+    )
+    retries: int = pydantic.Field(
+        0,
+        title="Retries",
+        description="Number of retries for the resource.",
+    )
+    schedule_eta: float = pydantic.Field(
+        0.0,
+        title="Schedule ETA",
+        description="Estimated time until the resource is scheduled.",
+    )
+
+
+class StatusResultsV2(pydantic.BaseModel):
+    results: list[StatusResultV2] = pydantic.Field(
+        [],
+        title="Results",
+        description="List of results.",
+    )
+    cursor: Optional[str] = pydantic.Field(
+        None,
+        title="Cursor",
+        description="Cursor to use for the next page of results.",
+    )
+
+
+class ProcessingV2HTTPClient:
+    def __init__(self):
+        self.session = aiohttp.ClientSession()
+        self.base_url = get_processing_api_url_v2()
+        self.headers = {}
+        if nuclia_settings.nuclia_service_account is not None:
+            self.headers[
+                "X-STF-NUAKEY"
+            ] = f"Bearer {nuclia_settings.nuclia_service_account}"
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, *args):
+        await self.close()
+
+    async def close(self):
+        await self.session.close()
+
+    async def status(
+        self,
+        cursor: Optional[str] = None,
+        scheduled: Optional[bool] = None,
+        kbid: Optional[str] = None,
+        limit: int = 20,
+    ) -> StatusResultsV2:
+        url = self.base_url + "/status"
+        params: dict[str, str] = {"limit": str(limit)}
+        if cursor is not None:
+            params["cursor"] = cursor
+        if scheduled is not None:
+            params["scheduled"] = str(scheduled)
+        if kbid is not None:
+            params["kbid"] = kbid
+
+        async with self.session.get(url, headers=self.headers, params=params) as resp:
+            resp_text = await resp.text()
+            check_status(resp, resp_text)
+            return StatusResultsV2.parse_raw(resp_text)
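Reviewer note: the client above is cursor-paginated. A minimal usage sketch (not part of the diff; it assumes nuclia settings are already configured in the running process, and the helper name is illustrative):

```python
from typing import AsyncIterator, Optional

from nucliadb.common.http_clients.processing import (
    ProcessingV2HTTPClient,
    StatusResultV2,
)


async def iter_statuses(kbid: str) -> AsyncIterator[StatusResultV2]:
    """Yield every status entry for a KB, following the cursor until exhausted."""
    async with ProcessingV2HTTPClient() as client:
        cursor: Optional[str] = None
        while True:
            page = await client.status(cursor=cursor, kbid=kbid, limit=50)
            for result in page.results:
                yield result
            if page.cursor is None:
                break
            cursor = page.cursor
```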
diff --git a/nucliadb/nucliadb/ingest/consumer/pull.py b/nucliadb/nucliadb/ingest/consumer/pull.py
index c717ae5d28..017a1cfe76 100644
--- a/nucliadb/nucliadb/ingest/consumer/pull.py
+++ b/nucliadb/nucliadb/ingest/consumer/pull.py
@@ -24,7 +24,6 @@
 import nats
 import nats.errors
 from aiohttp.client_exceptions import ClientConnectorError
-from nats.aio.subscription import Subscription
 from nucliadb_protos.writer_pb2 import BrokerMessage, BrokerMessageBlobReference
 
 from nucliadb.common.http_clients.processing import ProcessingHTTPClient
@@ -48,8 +47,6 @@ class PullWorker:
     The processing pull endpoint is also described as the "processing proxy" at times.
     """
 
-    subscriptions: list[Subscription]
-
     def __init__(
         self,
         driver: Driver,
diff --git a/nucliadb/nucliadb/ingest/processing.py b/nucliadb/nucliadb/ingest/processing.py
index 33b891e1ed..326846658e 100644
--- a/nucliadb/nucliadb/ingest/processing.py
+++ b/nucliadb/nucliadb/ingest/processing.py
@@ -19,6 +19,7 @@
 #
 import base64
 import datetime
+import logging
 import uuid
 from collections import defaultdict
 from contextlib import AsyncExitStack
@@ -39,11 +40,13 @@
 from nucliadb_models.configuration import KBConfiguration
 from nucliadb_models.resource import QueueType
 from nucliadb_telemetry import metrics
-from nucliadb_utils import logger
+from nucliadb_utils import const
 from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
 from nucliadb_utils.settings import nuclia_settings, storage_settings
 from nucliadb_utils.storages.storage import Storage
-from nucliadb_utils.utilities import Utility, get_ingest, set_utility
+from nucliadb_utils.utilities import Utility, get_ingest, has_feature, set_utility
+
+logger = logging.getLogger(__name__)
 
 _T = TypeVar("_T")
 
@@ -72,9 +75,9 @@ class Source(SourceValue, Enum):  # type: ignore
 
 
 class ProcessingInfo(BaseModel):
-    seqid: int
+    seqid: Optional[int]
     account_seq: Optional[int]
-    queue: QueueType
+    queue: Optional[QueueType] = None
 
 
 class PushPayload(BaseModel):
@@ -85,6 +88,8 @@ class PushPayload(BaseModel):
     source: Optional[Source] = None
     userid: str
 
+    title: Optional[str] = None
+
     genericfield: dict[str, models.Text] = {}
 
     # New File
@@ -127,6 +132,7 @@ async def start_processing_engine():
         onprem=nuclia_settings.onprem,
         nuclia_jwt_key=nuclia_settings.nuclia_jwt_key,
         nuclia_cluster_url=nuclia_settings.nuclia_cluster_url,
+        nuclia_processing_cluster_url=nuclia_settings.nuclia_processing_cluster_url,
         nuclia_public_url=nuclia_settings.nuclia_public_url,
         driver=storage_settings.file_backend,
         days_to_keep=storage_settings.upload_token_expiration,
@@ -142,6 +148,7 @@ def __init__(
         nuclia_zone: Optional[str] = None,
         nuclia_public_url: Optional[str] = None,
         nuclia_cluster_url: Optional[str] = None,
+        nuclia_processing_cluster_url: Optional[str] = None,
         onprem: Optional[bool] = False,
         nuclia_jwt_key: Optional[str] = None,
         days_to_keep: int = 3,
@@ -173,7 +180,13 @@ def __init__(
         self.nuclia_internal_push = (
             f"{self.nuclia_cluster_url}/api/internal/processing/push"
         )
+        self.nuclia_internal_push_v2 = (
+            f"{nuclia_processing_cluster_url}/api/internal/v2/processing/push"
+        )
         self.nuclia_external_push = f"{self.nuclia_public_url}/api/v1/processing/push"
+        self.nuclia_external_push_v2 = (
+            f"{self.nuclia_public_url}/api/v2/processing/push"
+        )
         self.nuclia_jwt_key = nuclia_jwt_key
         self.days_to_keep = days_to_keep
@@ -389,22 +402,34 @@ async def send_to_process(
         headers = {"CONTENT-TYPE": "application/json"}
         if self.onprem is False:
             # Upload the payload
+            url = self.nuclia_internal_push
+            if has_feature(
+                const.Features.PROCESSING_V2,
+                context={
+                    "kbid": item.kbid,
+                },
+            ):
+                url = self.nuclia_internal_push_v2
             item.partition = partition
             resp = await self.session.post(
-                url=f"{self.nuclia_internal_push}",
-                data=item.json(),
-                headers=headers,
+                url=url, data=item.json(), headers=headers
             )
         else:
+            url = self.nuclia_external_push + "?partition=" + str(partition)
+            if has_feature(
+                const.Features.PROCESSING_V2,
+                context={
+                    "kbid": item.kbid,
+                },
+            ):
+                url = self.nuclia_external_push_v2
             item.learning_config = await self.get_configuration(item.kbid)
             headers.update(
                 {"X-STF-NUAKEY": f"Bearer {self.nuclia_service_account}"}
             )
             # Upload the payload
             resp = await self.session.post(
-                url=self.nuclia_external_push + "?partition=" + str(partition),
-                data=item.json(),
-                headers=headers,
+                url=url, data=item.json(), headers=headers
            )
         if resp.status == 200:
             data = await resp.json()
@@ -429,7 +454,9 @@ async def send_to_process(
         )
 
         return ProcessingInfo(
-            seqid=seqid, account_seq=account_seq, queue=QueueType(queue_type)
+            seqid=seqid,
+            account_seq=account_seq,
+            queue=QueueType(queue_type) if queue_type is not None else None,
         )
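The routing added to `send_to_process` keeps the v1 endpoints as the default and flips individual KBs over to v2 via the feature flag. A condensed restatement of that decision (a sketch: the `engine` object and the helper name are illustrative; `has_feature` and `const` are the real imports used in the diff):

```python
from nucliadb_utils import const
from nucliadb_utils.utilities import has_feature


def pick_push_url(engine, kbid: str, partition: str) -> str:
    """Mirror of the per-KB v1/v2 routing in ProcessingEngine.send_to_process."""
    v2 = has_feature(const.Features.PROCESSING_V2, context={"kbid": kbid})
    if engine.onprem:
        # v1 external pushes carry the partition as a query parameter; v2 does not.
        if v2:
            return engine.nuclia_external_push_v2
        return engine.nuclia_external_push + "?partition=" + str(partition)
    return engine.nuclia_internal_push_v2 if v2 else engine.nuclia_internal_push
```

Note that callers also see a contract change here: a v2 push returns a `ProcessingInfo` whose `seqid` and `queue` may be `None`, which is why both fields become `Optional` above.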
diff --git a/nucliadb/nucliadb/reader/api/v1/services.py b/nucliadb/nucliadb/reader/api/v1/services.py
index 0bbcfd8f4e..670e43fdac 100644
--- a/nucliadb/nucliadb/reader/api/v1/services.py
+++ b/nucliadb/nucliadb/reader/api/v1/services.py
@@ -18,7 +18,7 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
-from typing import Union
+from typing import Optional, Union
 
 from fastapi import HTTPException
 from fastapi.responses import StreamingResponse
@@ -48,6 +48,7 @@
 from nucliadb.common.context import ApplicationContext
 from nucliadb.common.context.fastapi import get_app_context
 from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
+from nucliadb.common.http_clients import processing
 from nucliadb.models.responses import HTTPClientError
 from nucliadb.reader.api.v1.router import KB_PREFIX, api
 from nucliadb.reader.reader.notifications import kb_notifications_stream
@@ -348,3 +349,34 @@ async def notifications_endpoint(
 async def exists_kb(context: ApplicationContext, kbid: str) -> bool:
     dm = KnowledgeBoxDataManager(context.kv_driver)
     return await dm.exists_kb(kbid)
+
+
+@api.get(
+    f"/{KB_PREFIX}/{{kbid}}/processing-status",
+    status_code=200,
+    name="Knowledge Box Processing Status",
+    description="Provides the status of the processing of the given Knowledge Box.",
+    tags=["Knowledge Box Services"],
+    response_model=processing.StatusResultsV2,
+    responses={
+        "404": {"description": "Knowledge Box not found"},
+    },
+)
+@requires(NucliaDBRoles.READER)
+@version(1)
+async def processing_status(
+    request: Request,
+    kbid: str,
+    cursor: Optional[str] = None,
+    scheduled: Optional[bool] = None,
+    limit: int = 20,
+) -> Union[processing.StatusResultsV2, HTTPClientError]:
+    context = get_app_context(request.app)
+
+    if not await exists_kb(context, kbid):
+        return HTTPClientError(status_code=404, detail="Knowledge Box not found")
+
+    async with processing.ProcessingV2HTTPClient() as client:
+        return await client.status(
+            cursor=cursor, scheduled=scheduled, kbid=kbid, limit=limit
+        )
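A hedged example of exercising the new reader endpoint (assumes a local NucliaDB listening on port 8080 with the standard `/api/v1` prefix; the kbid and the roles header are placeholders):

```python
import asyncio

import aiohttp


async def show_pending(kbid: str) -> None:
    """Print resources that are scheduled but not yet completed."""
    url = f"http://localhost:8080/api/v1/kb/{kbid}/processing-status"
    headers = {"X-NUCLIADB-ROLES": "READER"}
    params = {"scheduled": "true", "limit": "20"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, params=params) as resp:
            body = await resp.json()
    for result in body["results"]:
        print(result["resource_id"], result["completed"], result["schedule_eta"])


asyncio.run(show_pending("my-kbid"))
```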
diff --git a/nucliadb/nucliadb/writer/api/v1/field.py b/nucliadb/nucliadb/writer/api/v1/field.py
index f93ecdd09e..e07fb60da6 100644
--- a/nucliadb/nucliadb/writer/api/v1/field.py
+++ b/nucliadb/nucliadb/writer/api/v1/field.py
@@ -108,11 +108,12 @@ async def finish_field_put(
     toprocess: PushPayload,
     partition: int,
     wait_on_commit: bool,
-) -> int:
+) -> Optional[int]:
     # Create processing message
     transaction = get_transaction_utility()
     processing = get_processing()
 
+    await _add_basic_data_to_processing_payload(toprocess)
     processing_info = await processing.send_to_process(toprocess, partition)
 
     writer.source = BrokerMessage.MessageSource.WRITER
@@ -703,6 +704,7 @@ async def _append_messages_to_conversation_field(
     field.messages.extend(messages)
     await parse_conversation_field(field_id, field, writer, toprocess, kbid, rid)
 
+    await _add_basic_data_to_processing_payload(toprocess)
     try:
         processing_info = await processing.send_to_process(toprocess, partition)
@@ -798,6 +800,7 @@ async def _append_blocks_to_layout_field(
     field = models.InputLayoutField(body=models.InputLayoutContent())
     field.body.blocks.update(blocks)
     await parse_layout_field(field_id, field, writer, toprocess, kbid, rid)
+    await _add_basic_data_to_processing_payload(toprocess)
 
     try:
         processing_info = await processing.send_to_process(toprocess, partition)
@@ -938,6 +941,9 @@ async def reprocess_file_field(
     if resource is None:
         raise HTTPException(status_code=404, detail="Resource does not exist")
 
+    if resource.basic is not None:
+        toprocess.title = resource.basic.title
+
     try:
         await extract_file_field(
             field_id,
@@ -966,3 +972,24 @@ async def reprocess_file_field(
         await transaction.commit(writer, partition, wait=False)
 
     return ResourceUpdated(seqid=processing_info.seqid)
+
+
+async def _add_basic_data_to_processing_payload(toprocess: PushPayload):
+    """
+    Add basic data to the processing payload, such as the title and anything
+    else that lives at the resource level.
+    """
+    storage = await get_storage(service_name=SERVICE_NAME)
+    driver = get_driver()
+    async with driver.transaction() as txn:
+        kb = KnowledgeBox(txn, storage, toprocess.kbid)
+
+        resource = await kb.get(toprocess.uuid)
+        if resource is None:
+            return
+
+        basic = await resource.get_basic()
+        if basic is None:
+            return
+
+        toprocess.title = basic.title
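A tiny illustration (ids and the title value are placeholders) of the payload change these writer endpoints rely on: `title` starts unset on `PushPayload`, and the new helper backfills it from the stored `Basic` just before `send_to_process`:

```python
from nucliadb.ingest.processing import PushPayload

toprocess = PushPayload(kbid="kb1", uuid="r1", partition=1, userid="writer")
assert toprocess.title is None

# Effectively what `await _add_basic_data_to_processing_payload(toprocess)`
# does when the resource exists and has a Basic record stored:
toprocess.title = "Stored resource title"
```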
""" + Add basic data to processing payload like title and anything + else that is on the resource level. + """ + storage = await get_storage(service_name=SERVICE_NAME) + driver = get_driver() + async with driver.transaction() as txn: + kb = KnowledgeBox(txn, storage, toprocess.kbid) + + resource = await kb.get(toprocess.uuid) + if resource is None: + return + + basic = await resource.get_basic() + if basic is None: + return + + toprocess.title = basic.title diff --git a/nucliadb/nucliadb/writer/api/v1/resource.py b/nucliadb/nucliadb/writer/api/v1/resource.py index 93026eaa57..089ca31ef9 100644 --- a/nucliadb/nucliadb/writer/api/v1/resource.py +++ b/nucliadb/nucliadb/writer/api/v1/resource.py @@ -128,6 +128,7 @@ async def create_resource( writer.uuid = uuid toprocess.uuid = uuid toprocess.source = Source.HTTP + toprocess.title = item.title if item.slug: if await resource_slug_exists(kbid, item.slug): @@ -342,6 +343,7 @@ async def modify_resource( set_status_modify(writer.basic, item) + toprocess.title = writer.basic.title seqid = await maybe_send_to_process(writer, toprocess, partition) writer.source = BrokerMessage.MessageSource.WRITER diff --git a/nucliadb/nucliadb/writer/api/v1/upload.py b/nucliadb/nucliadb/writer/api/v1/upload.py index 28179c22f7..2b7812c654 100644 --- a/nucliadb/nucliadb/writer/api/v1/upload.py +++ b/nucliadb/nucliadb/writer/api/v1/upload.py @@ -834,7 +834,7 @@ async def store_file_on_nuclia_db( md5: Optional[str] = None, item: Optional[CreateResourcePayload] = None, wait_on_commit: bool = False, -) -> int: +) -> Optional[int]: # File is on NucliaDB Storage at path partitioning = get_partitioning() @@ -873,6 +873,8 @@ async def store_file_on_nuclia_db( if item.extra is not None: parse_extra(writer.extra, item.extra) + toprocess.title = writer.basic.title + await parse_fields( writer=writer, item=item, @@ -884,6 +886,7 @@ async def store_file_on_nuclia_db( if override_resource_title and filename is not None: set_title(writer, toprocess, filename) + writer.basic.icon = content_type writer.basic.created.FromDatetime(datetime.now()) diff --git a/nucliadb/nucliadb/writer/resource/basic.py b/nucliadb/nucliadb/writer/resource/basic.py index d7b4fb97b5..a95be12762 100644 --- a/nucliadb/nucliadb/writer/resource/basic.py +++ b/nucliadb/nucliadb/writer/resource/basic.py @@ -225,10 +225,19 @@ def set_status_modify(basic: Basic, item: UpdateResourcePayload): def set_processing_info(bm: BrokerMessage, processing_info: ProcessingInfo): - bm.basic.last_seqid = processing_info.seqid + """ + Processing V2 does not have this awkward processing info data field and storage + but keeping for b/w compatibility. + + Once V1 is removed, this code can be removed because status checking will be done + in a separate API that is not part of NucliaDB. 
+ """ + if processing_info.seqid is not None: + bm.basic.last_seqid = processing_info.seqid if processing_info.account_seq is not None: bm.basic.last_account_seq = processing_info.account_seq - bm.basic.queue = bm.basic.QueueType.Value(processing_info.queue.name) + if processing_info.queue is not None: + bm.basic.queue = bm.basic.QueueType.Value(processing_info.queue.name) def validate_classifications(paragraph: ParagraphAnnotation): diff --git a/nucliadb_utils/nucliadb_utils/const.py b/nucliadb_utils/nucliadb_utils/const.py index 7f1ac254a2..bdde85ff29 100644 --- a/nucliadb_utils/nucliadb_utils/const.py +++ b/nucliadb_utils/nucliadb_utils/const.py @@ -76,3 +76,4 @@ class Features: EXPERIMENTAL_KB = "nucliadb_experimental_kb" READ_REPLICA_SEARCHES = "nucliadb_read_replica_searches" SUMMARIZATION = "nuclia_summarization" + PROCESSING_V2 = "nucliadb_processing_v2" diff --git a/nucliadb_utils/nucliadb_utils/featureflagging.py b/nucliadb_utils/nucliadb_utils/featureflagging.py index e14cfc15b8..bdc5756c1d 100644 --- a/nucliadb_utils/nucliadb_utils/featureflagging.py +++ b/nucliadb_utils/nucliadb_utils/featureflagging.py @@ -55,6 +55,10 @@ class Settings(pydantic.BaseSettings): "rollout": 0, "variants": {"environment": ["local"]}, }, + const.Features.PROCESSING_V2: { + "rollout": 0, + "variants": {"environment": ["testing"]}, + }, } diff --git a/nucliadb_utils/nucliadb_utils/settings.py b/nucliadb_utils/nucliadb_utils/settings.py index 72b690265e..e086ba23a9 100644 --- a/nucliadb_utils/nucliadb_utils/settings.py +++ b/nucliadb_utils/nucliadb_utils/settings.py @@ -125,6 +125,9 @@ class NucliaSettings(BaseSettings): nuclia_service_account: Optional[str] = None nuclia_public_url: str = "https://{zone}.nuclia.cloud" nuclia_cluster_url: str = "http://nucliadb_proxy.processing.svc.cluster.local:8080" + nuclia_processing_cluster_url: str = ( + "http://processing-api.processing.svc.cluster.local:8080" + ) nuclia_inner_predict_url: str = "http://predict.learning.svc.cluster.local:8080" nuclia_zone: str = "europe-1"