Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processing v2 integration #1719

Merged
merged 15 commits into from
Jan 20, 2024
132 changes: 132 additions & 0 deletions nucliadb/nucliadb/common/http_clients/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
from datetime import datetime
from typing import Any, Optional

import aiohttp
Expand All @@ -41,6 +42,19 @@ def get_processing_api_url() -> str:
return nuclia_settings.nuclia_cluster_url + "/api/internal/processing"


def get_processing_api_url_v2() -> str:
    """Return the base URL of the v2 processing API.

    When a service account is set in ``nuclia_settings``, the URL is the
    public URL template formatted with the configured zone, followed by
    ``/api/v2/processing``. Otherwise it is the processing cluster URL
    followed by ``/api/internal/v2/processing``.
    """
    if not nuclia_settings.nuclia_service_account:
        # No service account configured: use the internal route.
        cluster_url = nuclia_settings.nuclia_processing_cluster_url
        return cluster_url + "/api/internal/v2/processing"

    public_url = nuclia_settings.nuclia_public_url.format(
        zone=nuclia_settings.nuclia_zone
    )
    return public_url + "/api/v2/processing"


def check_status(resp: aiohttp.ClientResponse, resp_text: str) -> None:
if resp.status < 300:
return
Expand Down Expand Up @@ -125,3 +139,121 @@ async def pull(self, partition: str) -> PullResponse:
check_proxy_telemetry_headers(resp)
check_status(resp, resp_text)
return PullResponse.parse_raw(resp_text)


class StatusResultV2(pydantic.BaseModel):
    # One entry of the list returned by the v2 processing status endpoint.
    #
    # Fields declared with ``default=...`` have no default value. Note that
    # ``completed_at`` and ``scheduled_at`` are typed Optional but are still
    # declared without a default.
    #
    # NOTE: documented with comments instead of a class docstring so that the
    # schema generated from this model is not altered.
    processing_id: str = pydantic.Field(
        default=...,
        title="Processing ID",
        description="Processing ID of the resource.",
    )
    resource_id: str = pydantic.Field(
        default=...,
        title="Resource ID",
        description="Resource ID.",
    )
    kbid: str = pydantic.Field(default=..., title="KnowledgeBox ID")
    title: Optional[str] = pydantic.Field(
        default=None,
        title="Title",
        description="Title of the resource.",
    )
    labels: list[str] = pydantic.Field(
        default=[],
        title="Labels",
        description="Labels of the resource.",
    )

    # State flags.
    completed: bool = pydantic.Field(
        default=...,
        title="Completed",
        description="Whether the resource has been completed",
    )
    scheduled: bool = pydantic.Field(
        default=...,
        title="Scheduled",
        description="Whether the resource has been scheduled",
    )

    # Timestamps.
    timestamp: datetime = pydantic.Field(
        default=...,
        title="Timestamp",
        description="Timestamp of when the resource was first scheduled.",
    )
    completed_at: Optional[datetime] = pydantic.Field(
        default=...,
        title="Completed At",
        description="Timestamp of when the resource was completed",
    )
    scheduled_at: Optional[datetime] = pydantic.Field(
        default=...,
        title="Scheduled At",
        description="Timestamp of when the resource was first scheduled.",
    )

    # Failure / retry bookkeeping and scheduling estimate.
    failed: bool = pydantic.Field(
        default=False,
        title="Failed",
        description="Whether the resource has failed to process",
    )
    retries: int = pydantic.Field(
        default=0,
        title="Retries",
        description="Number of retries for the resource.",
    )
    schedule_eta: float = pydantic.Field(
        default=0.0,
        title="Schedule ETA",
        description="Estimated time until the resource is scheduled.",
    )


class StatusResultsV2(pydantic.BaseModel):
    # A page of processing status entries plus the cursor for the next page.
    #
    # NOTE: documented with comments instead of a class docstring so that the
    # schema generated from this model is not altered.
    results: list[StatusResultV2] = pydantic.Field(
        default=[],
        title="Results",
        description="List of results.",
    )
    cursor: Optional[str] = pydantic.Field(
        default=None,
        title="Cursor",
        description="Cursor to use for the next page of results.",
    )


class ProcessingV2HTTPClient:
    """Async HTTP client for the v2 processing API.

    The client owns an ``aiohttp.ClientSession`` created at construction
    time. Call :meth:`close` when done, or use the client as an async
    context manager, which closes the session on exit.
    """

    def __init__(self):
        self.session = aiohttp.ClientSession()
        self.base_url = get_processing_api_url_v2()
        # The NUA key header is only sent when a service account is set.
        service_account = nuclia_settings.nuclia_service_account
        self.headers = (
            {}
            if service_account is None
            else {"X-STF-NUAKEY": f"Bearer {service_account}"}
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def close(self):
        """Close the underlying HTTP session."""
        await self.session.close()

    async def status(
        self,
        cursor: Optional[str] = None,
        scheduled: Optional[bool] = None,
        kbid: Optional[str] = None,
        limit: int = 20,
    ) -> StatusResultsV2:
        """Fetch one page of processing status results.

        ``limit`` is always sent as a query parameter; ``cursor``,
        ``scheduled`` and ``kbid`` are sent only when they are not ``None``.
        The response text is passed through ``check_status`` and then parsed
        into a ``StatusResultsV2``.
        """
        endpoint = self.base_url + "/status"

        # Candidate filters, in the order they are added to the query.
        optional_params = (
            ("cursor", cursor),
            ("scheduled", scheduled if scheduled is None else str(scheduled)),
            ("kbid", kbid),
        )
        params: dict[str, str] = {"limit": str(limit)}
        params.update(
            (key, value) for key, value in optional_params if value is not None
        )

        async with self.session.get(
            endpoint, headers=self.headers, params=params
        ) as resp:
            body = await resp.text()
            check_status(resp, body)
            return StatusResultsV2.parse_raw(body)
3 changes: 0 additions & 3 deletions nucliadb/nucliadb/ingest/consumer/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import nats
import nats.errors
from aiohttp.client_exceptions import ClientConnectorError
from nats.aio.subscription import Subscription
from nucliadb_protos.writer_pb2 import BrokerMessage, BrokerMessageBlobReference

from nucliadb.common.http_clients.processing import ProcessingHTTPClient
Expand All @@ -48,8 +47,6 @@ class PullWorker:
The processing pull endpoint is also described as the "processing proxy" at times.
"""

subscriptions: list[Subscription]

def __init__(
self,
driver: Driver,
Expand Down
49 changes: 38 additions & 11 deletions nucliadb/nucliadb/ingest/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
import base64
import datetime
import logging
import uuid
from collections import defaultdict
from contextlib import AsyncExitStack
Expand All @@ -39,11 +40,13 @@
from nucliadb_models.configuration import KBConfiguration
from nucliadb_models.resource import QueueType
from nucliadb_telemetry import metrics
from nucliadb_utils import logger
from nucliadb_utils import const
from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
from nucliadb_utils.settings import nuclia_settings, storage_settings
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import Utility, get_ingest, set_utility
from nucliadb_utils.utilities import Utility, get_ingest, has_feature, set_utility

logger = logging.getLogger(__name__)

_T = TypeVar("_T")

Expand Down Expand Up @@ -72,9 +75,9 @@ class Source(SourceValue, Enum): # type: ignore


class ProcessingInfo(BaseModel):
seqid: int
seqid: Optional[int]
account_seq: Optional[int]
queue: QueueType
queue: Optional[QueueType] = None


class PushPayload(BaseModel):
Expand All @@ -85,6 +88,8 @@ class PushPayload(BaseModel):
source: Optional[Source] = None
userid: str

title: Optional[str] = None

genericfield: dict[str, models.Text] = {}

# New File
Expand Down Expand Up @@ -127,6 +132,7 @@ async def start_processing_engine():
onprem=nuclia_settings.onprem,
nuclia_jwt_key=nuclia_settings.nuclia_jwt_key,
nuclia_cluster_url=nuclia_settings.nuclia_cluster_url,
nuclia_processing_cluster_url=nuclia_settings.nuclia_processing_cluster_url,
nuclia_public_url=nuclia_settings.nuclia_public_url,
driver=storage_settings.file_backend,
days_to_keep=storage_settings.upload_token_expiration,
Expand All @@ -142,6 +148,7 @@ def __init__(
nuclia_zone: Optional[str] = None,
nuclia_public_url: Optional[str] = None,
nuclia_cluster_url: Optional[str] = None,
nuclia_processing_cluster_url: Optional[str] = None,
onprem: Optional[bool] = False,
nuclia_jwt_key: Optional[str] = None,
days_to_keep: int = 3,
Expand Down Expand Up @@ -173,7 +180,13 @@ def __init__(
self.nuclia_internal_push = (
f"{self.nuclia_cluster_url}/api/internal/processing/push"
)
self.nuclia_internal_push_v2 = (
f"{nuclia_processing_cluster_url}/api/internal/v2/processing/push"
)
self.nuclia_external_push = f"{self.nuclia_public_url}/api/v1/processing/push"
self.nuclia_external_push_v2 = (
f"{self.nuclia_public_url}/api/v2/processing/push"
)

self.nuclia_jwt_key = nuclia_jwt_key
self.days_to_keep = days_to_keep
Expand Down Expand Up @@ -389,22 +402,34 @@ async def send_to_process(
headers = {"CONTENT-TYPE": "application/json"}
if self.onprem is False:
# Upload the payload
url = self.nuclia_internal_push
if has_feature(
const.Features.PROCESSING_V2,
context={
"kbid": item.kbid,
},
):
url = self.nuclia_internal_push_v2
item.partition = partition
resp = await self.session.post(
url=f"{self.nuclia_internal_push}",
data=item.json(),
headers=headers,
url=url, data=item.json(), headers=headers
)
else:
url = self.nuclia_external_push + "?partition=" + str(partition)
if has_feature(
const.Features.PROCESSING_V2,
context={
"kbid": item.kbid,
},
):
url = self.nuclia_external_push_v2
item.learning_config = await self.get_configuration(item.kbid)
headers.update(
{"X-STF-NUAKEY": f"Bearer {self.nuclia_service_account}"}
)
# Upload the payload
resp = await self.session.post(
url=self.nuclia_external_push + "?partition=" + str(partition),
data=item.json(),
headers=headers,
url=url, data=item.json(), headers=headers
)
if resp.status == 200:
data = await resp.json()
Expand All @@ -429,7 +454,9 @@ async def send_to_process(
)

return ProcessingInfo(
seqid=seqid, account_seq=account_seq, queue=QueueType(queue_type)
seqid=seqid,
account_seq=account_seq,
queue=QueueType(queue_type) if queue_type is not None else None,
)


Expand Down
34 changes: 33 additions & 1 deletion nucliadb/nucliadb/reader/api/v1/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from typing import Union
from typing import Optional, Union

from fastapi import HTTPException
from fastapi.responses import StreamingResponse
Expand Down Expand Up @@ -48,6 +48,7 @@
from nucliadb.common.context import ApplicationContext
from nucliadb.common.context.fastapi import get_app_context
from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
from nucliadb.common.http_clients import processing
from nucliadb.models.responses import HTTPClientError
from nucliadb.reader.api.v1.router import KB_PREFIX, api
from nucliadb.reader.reader.notifications import kb_notifications_stream
Expand Down Expand Up @@ -348,3 +349,34 @@ async def notifications_endpoint(
async def exists_kb(context: ApplicationContext, kbid: str) -> bool:
dm = KnowledgeBoxDataManager(context.kv_driver)
return await dm.exists_kb(kbid)


@api.get(
    f"/{KB_PREFIX}/{{kbid}}/processing-status",
    status_code=200,
    name="Knowledge Box Processing Status",
    description="Provides the status of the processing of the given Knowledge Box.",
    tags=["Knowledge Box Services"],
    response_model=processing.StatusResultsV2,
    responses={
        "404": {"description": "Knowledge Box not found"},
    },
)
@requires(NucliaDBRoles.READER)
@version(1)
async def processing_status(
    request: Request,
    kbid: str,
    cursor: Optional[str] = None,
    scheduled: Optional[bool] = None,
    limit: int = 20,
) -> Union[processing.StatusResultsV2, HTTPClientError]:
    """Return a page of processing status results for a Knowledge Box.

    Responds with a 404 ``HTTPClientError`` when ``exists_kb`` reports that
    the Knowledge Box does not exist. Otherwise the query is forwarded to
    ``ProcessingV2HTTPClient.status`` with the same cursor, scheduled filter,
    kbid and limit.
    """
    kb_found = await exists_kb(get_app_context(request.app), kbid)
    if not kb_found:
        return HTTPClientError(status_code=404, detail="Knowledge Box not found")

    # A short-lived client is opened per request and closed on exit.
    async with processing.ProcessingV2HTTPClient() as client:
        results = await client.status(
            cursor=cursor,
            scheduled=scheduled,
            kbid=kbid,
            limit=limit,
        )
    return results
Loading
Loading