Skip to content

Commit

Permalink
Delete inflight processing callbacks (#1792)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem authored Feb 1, 2024
1 parent a177521 commit 87c61c1
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
37 changes: 37 additions & 0 deletions nucliadb/nucliadb/ingest/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,14 @@ def __init__(
self.nuclia_internal_push_v2 = (
f"{nuclia_processing_cluster_url}/api/internal/v2/processing/push"
)
self.nuclia_internal_delete = f"{nuclia_processing_cluster_url}/api/internal/v2/processing/delete-requests"
self.nuclia_external_push = f"{self.nuclia_public_url}/api/v1/processing/push"
self.nuclia_external_push_v2 = (
f"{self.nuclia_public_url}/api/v2/processing/push"
)
self.nuclia_external_delete = (
f"{self.nuclia_public_url}/api/v2/processing/delete-requests"
)

self.nuclia_jwt_key = nuclia_jwt_key
self.days_to_keep = days_to_keep
Expand Down Expand Up @@ -459,6 +463,34 @@ async def send_to_process(
queue=QueueType(queue_type) if queue_type is not None else None,
)

async def delete_from_processing(
    self, *, kbid: str, resource_id: Optional[str] = None
) -> None:
    """
    Delete a resource from processing. This prevents inflight resources from being processed
    and wasting resources.

    Ideally, this is done by publishing an event to NATS; however, since we also need to work
    for hybrid on-prem installations, this is a simple way to handle it.
    Long term, if we want to publish object events out to a NATS stream, we can implement
    that instead of this method.

    The request is best-effort: a non-200 response is logged as a warning and
    is not raised to the caller.

    Args:
        kbid: Identifier of the knowledge box the deletion applies to.
        resource_id: Identifier of the resource to delete. When ``None`` the
            payload carries a null ``resource_id`` (callers use this when a
            whole knowledge box is deleted).
    """
    headers = {"CONTENT-TYPE": "application/json"}
    data = {"kbid": kbid, "resource_id": resource_id}
    if self.onprem is False:
        # Hosted deployment: call the internal processing cluster endpoint.
        url = self.nuclia_internal_delete
    else:
        # On-prem deployment: call the public endpoint, authenticated with
        # the service account key.
        url = self.nuclia_external_delete
        headers.update({"X-NUCLIA-NUAKEY": f"Bearer {self.nuclia_service_account}"})

    # Send the payload with `json=` so it is serialized as a JSON document,
    # matching the declared content type. Passing a dict through `data=`
    # would form-encode it instead (assuming an aiohttp-style session --
    # TODO confirm against the session created in `initialize`).
    resp = await self.session.post(url=url, json=data, headers=headers)
    if resp.status != 200:
        logger.warning(
            "Error deleting from processing", extra={"status": resp.status}
        )


class DummyProcessingEngine(ProcessingEngine):
def __init__(self):
Expand Down Expand Up @@ -508,3 +540,8 @@ async def send_to_process(
return ProcessingInfo(
seqid=len(self.calls), account_seq=0, queue=QueueType.SHARED
)

async def delete_from_processing(
    self, *, kbid: str, resource_id: Optional[str] = None
) -> None:
    """
    Record a deletion request instead of contacting any processing service.

    Appends ``[kbid, resource_id]`` to ``self.calls`` so the arguments of the
    call can be inspected afterwards.
    """
    recorded_call = [kbid, resource_id]
    self.calls.append(recorded_call)
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,33 @@ async def test_send_to_process(onprem, mock_payload, ingest_util):
await processing_engine.send_to_process(payload, partition=0)

await processing_engine.finalize()


@pytest.mark.parametrize("onprem", [True, False])
@pytest.mark.asyncio
async def test_delete_from_processing(onprem, ingest_util):
    """
    Test that delete_from_processing does not fail, both for on-prem
    (``onprem=True``) and hosted (``onprem=False``) configurations.
    """

    from nucliadb.ingest.processing import ProcessingEngine

    fake_nuclia_proxy_url = "http://fake_proxy"
    processing_engine = ProcessingEngine(
        onprem=onprem,
        nuclia_cluster_url=fake_nuclia_proxy_url,
        nuclia_public_url=fake_nuclia_proxy_url,
    )
    await processing_engine.initialize()

    # Swap in a mocked session that answers POST with a 200, so no real HTTP
    # request is sent to the fake proxy URL.
    processing_engine.session = get_mocked_session(
        "POST",
        200,
        json={"kbid": "kbid", "resource_id": "resource_id"},
        context_manager=False,
    )
    await processing_engine.delete_from_processing(
        kbid="kbid", resource_id="resource_id"
    )

    await processing_engine.finalize()
6 changes: 6 additions & 0 deletions nucliadb/nucliadb/writer/api/v1/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import asyncio

from fastapi import HTTPException, Response
from fastapi_versioning import version # type: ignore
from nucliadb_protos.knowledgebox_pb2 import (
Expand All @@ -31,6 +33,7 @@
from starlette.requests import Request

from nucliadb.writer.api.v1.router import KB_PREFIX, KBS_PREFIX, api
from nucliadb.writer.utilities import get_processing
from nucliadb_models.resource import (
KnowledgeBoxConfig,
KnowledgeBoxObj,
Expand Down Expand Up @@ -139,4 +142,7 @@ async def delete_kb(request: Request, kbid: str):
elif kbobj.status == KnowledgeBoxResponseStatus.ERROR:
raise HTTPException(status_code=500, detail="Error on deleting knowledge box")

processing = get_processing()
asyncio.create_task(processing.delete_from_processing(kbid=kbid))

return Response(status_code=204)
4 changes: 4 additions & 0 deletions nucliadb/nucliadb/writer/api/v1/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import asyncio
import contextlib
from time import time
from typing import Optional
Expand Down Expand Up @@ -546,6 +547,9 @@ async def _delete_resource(
# Create processing message
await transaction.commit(writer, partition, wait=x_synchronous)

processing = get_processing()
asyncio.create_task(processing.delete_from_processing(kbid=kbid, resource_id=rid))

return Response(status_code=204)


Expand Down

3 comments on commit 87c61c1

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 87c61c1 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13003.150834839334 iter/sec (stddev: 7.015648010025978e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 87c61c1 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13103.727215151313 iter/sec (stddev: 9.392814873521937e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 0.99

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 87c61c1 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13054.186167924041 iter/sec (stddev: 6.236202571783795e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.