diff --git a/nucliadb/nucliadb/ingest/processing.py b/nucliadb/nucliadb/ingest/processing.py
index 326846658e..59878a9904 100644
--- a/nucliadb/nucliadb/ingest/processing.py
+++ b/nucliadb/nucliadb/ingest/processing.py
@@ -183,10 +183,14 @@ def __init__(
         self.nuclia_internal_push_v2 = (
             f"{nuclia_processing_cluster_url}/api/internal/v2/processing/push"
         )
+        self.nuclia_internal_delete = f"{nuclia_processing_cluster_url}/api/internal/v2/processing/delete-requests"
         self.nuclia_external_push = f"{self.nuclia_public_url}/api/v1/processing/push"
         self.nuclia_external_push_v2 = (
             f"{self.nuclia_public_url}/api/v2/processing/push"
         )
+        self.nuclia_external_delete = (
+            f"{self.nuclia_public_url}/api/v2/processing/delete-requests"
+        )
         self.nuclia_jwt_key = nuclia_jwt_key
         self.days_to_keep = days_to_keep
@@ -459,6 +463,34 @@ async def send_to_process(
             queue=QueueType(queue_type) if queue_type is not None else None,
         )
 
+    async def delete_from_processing(
+        self, *, kbid: str, resource_id: Optional[str] = None
+    ) -> None:
+        """
+        Delete a resource from processing. This prevents in-flight resources from being processed
+        and wasting resources.
+
+        Ideally, this is done by publishing an event to NATS; however, since we also need to work
+        for hybrid on-prem installations, this is a simple way to handle it.
+
+        Long term, if we want to publish object events out to a NATS stream, we can implement
+        that instead of this method.
+        """
+        headers = {"CONTENT-TYPE": "application/json"}
+        data = {"kbid": kbid, "resource_id": resource_id}
+        if self.onprem is False:
+            # Hosted deployment: use the internal processing endpoint
+            url = self.nuclia_internal_delete
+        else:
+            url = self.nuclia_external_delete
+            headers.update({"X-NUCLIA-NUAKEY": f"Bearer {self.nuclia_service_account}"})
+
+        resp = await self.session.post(url=url, data=data, headers=headers)
+        if resp.status != 200:
+            logger.warning(
+                "Error deleting from processing", extra={"status": resp.status}
+            )
+
 
 class DummyProcessingEngine(ProcessingEngine):
     def __init__(self):
@@ -508,3 +540,8 @@ async def send_to_process(
         return ProcessingInfo(
             seqid=len(self.calls), account_seq=0, queue=QueueType.SHARED
         )
+
+    async def delete_from_processing(
+        self, *, kbid: str, resource_id: Optional[str] = None
+    ) -> None:
+        self.calls.append([kbid, resource_id])
diff --git a/nucliadb/nucliadb/ingest/tests/integration/ingest/test_processing_engine.py b/nucliadb/nucliadb/ingest/tests/integration/ingest/test_processing_engine.py
index dd9c2c252e..b73b4e1893 100644
--- a/nucliadb/nucliadb/ingest/tests/integration/ingest/test_processing_engine.py
+++ b/nucliadb/nucliadb/ingest/tests/integration/ingest/test_processing_engine.py
@@ -73,3 +73,33 @@ async def test_send_to_process(onprem, mock_payload, ingest_util):
     await processing_engine.send_to_process(payload, partition=0)
 
     await processing_engine.finalize()
+
+
+@pytest.mark.parametrize("onprem", [True, False])
+@pytest.mark.asyncio
+async def test_delete_from_processing(onprem, ingest_util):
+    """
+    Test that delete_from_processing does not fail
+    """
+
+    from nucliadb.ingest.processing import ProcessingEngine
+
+    fake_nuclia_proxy_url = "http://fake_proxy"
+    processing_engine = ProcessingEngine(
+        onprem=onprem,
+        nuclia_cluster_url=fake_nuclia_proxy_url,
+        nuclia_public_url=fake_nuclia_proxy_url,
+    )
+    await processing_engine.initialize()
+
+    processing_engine.session = get_mocked_session(
+        "POST",
+        200,
+        json={"kbid": "kbid", "resource_id": "resource_id"},
+        context_manager=False,
+    )
+    await processing_engine.delete_from_processing(
+        kbid="kbid", resource_id="resource_id"
+    )
+
+    await processing_engine.finalize()
diff --git a/nucliadb/nucliadb/writer/api/v1/knowledgebox.py b/nucliadb/nucliadb/writer/api/v1/knowledgebox.py
index c2399c5882..3546a6da69 100644
--- a/nucliadb/nucliadb/writer/api/v1/knowledgebox.py
+++ b/nucliadb/nucliadb/writer/api/v1/knowledgebox.py
@@ -17,6 +17,8 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
+import asyncio
+
 from fastapi import HTTPException, Response
 from fastapi_versioning import version  # type: ignore
 from nucliadb_protos.knowledgebox_pb2 import (
@@ -31,6 +33,7 @@
 from starlette.requests import Request
 
 from nucliadb.writer.api.v1.router import KB_PREFIX, KBS_PREFIX, api
+from nucliadb.writer.utilities import get_processing
 from nucliadb_models.resource import (
     KnowledgeBoxConfig,
     KnowledgeBoxObj,
@@ -139,4 +142,7 @@ async def delete_kb(request: Request, kbid: str):
     elif kbobj.status == KnowledgeBoxResponseStatus.ERROR:
         raise HTTPException(status_code=500, detail="Error on deleting knowledge box")
 
+    processing = get_processing()
+    asyncio.create_task(processing.delete_from_processing(kbid=kbid))
+
     return Response(status_code=204)
diff --git a/nucliadb/nucliadb/writer/api/v1/resource.py b/nucliadb/nucliadb/writer/api/v1/resource.py
index 089ca31ef9..afeb7ca860 100644
--- a/nucliadb/nucliadb/writer/api/v1/resource.py
+++ b/nucliadb/nucliadb/writer/api/v1/resource.py
@@ -17,6 +17,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
+import asyncio
 import contextlib
 from time import time
 from typing import Optional
@@ -546,6 +547,9 @@ async def _delete_resource(
     # Create processing message
     await transaction.commit(writer, partition, wait=x_synchronous)
 
+    processing = get_processing()
+    asyncio.create_task(processing.delete_from_processing(kbid=kbid, resource_id=rid))
+
     return Response(status_code=204)