Skip to content

Commit

Permalink
Endpoint to get notifications of KB activity (#1702)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Jan 8, 2024
1 parent d246f90 commit 64b15f2
Show file tree
Hide file tree
Showing 13 changed files with 490 additions and 5 deletions.
2 changes: 1 addition & 1 deletion charts/nucliadb_reader/templates/reader.vs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
- method:
regex: "GET|OPTIONS"
uri:
regex: '^/api/v\d+/kb/[^/]+/(entitiesgroup|labelset|vectorset|custom-synonyms|export|import)s?.*'
regex: '^/api/v\d+/kb/[^/]+/(entitiesgroup|labelset|vectorset|custom-synonyms|export|import|activity)s?.*'
- method:
regex: OPTIONS
uri:
Expand Down
41 changes: 41 additions & 0 deletions nucliadb/nucliadb/reader/api/v1/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from typing import Union

from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from fastapi_versioning import version # type: ignore
from google.protobuf.json_format import MessageToDict
from nucliadb_protos.knowledgebox_pb2 import KnowledgeBoxID
Expand All @@ -40,7 +44,12 @@
)
from starlette.requests import Request

from nucliadb.common.context import ApplicationContext
from nucliadb.common.context.fastapi import get_app_context
from nucliadb.common.datamanagers.kb import KnowledgeBoxDataManager
from nucliadb.models.responses import HTTPClientError
from nucliadb.reader.api.v1.router import KB_PREFIX, api
from nucliadb.reader.reader.activity import kb_activity_stream
from nucliadb_models.configuration import KBConfiguration
from nucliadb_models.entities import EntitiesGroup, KnowledgeBoxEntities
from nucliadb_models.labels import KnowledgeBoxLabels, LabelSet
Expand Down Expand Up @@ -298,3 +307,35 @@ async def get_configuration(request: Request, kbid: str):
raise HTTPException(
status_code=500, detail="Error getting configuration of a Knowledge box"
)


@api.get(
    f"/{KB_PREFIX}/{{kbid}}/activity",
    status_code=200,
    name="Knowledge Box Activity Stream",
    description="Provides a stream of resource's activity notifications for the given Knowledge Box",
    tags=["Knowledge Box Services"],
    response_model=None,
)
@requires(NucliaDBRoles.READER)
@version(1)
async def activity_endpoint(
    request: Request, kbid: str
) -> Union[StreamingResponse, HTTPClientError]:
    """
    Stream activity notifications of a Knowledge Box to the caller.

    Responds with a 404 client error when the Knowledge Box does not exist;
    otherwise returns a streaming response fed by `kb_activity_stream`.
    """
    app_context = get_app_context(request.app)

    # Check existence up front: once the stream has started, the status
    # code can no longer be changed.
    kb_found = await exists_kb(app_context, kbid)
    if not kb_found:
        return HTTPClientError(status_code=404, detail="Knowledge Box not found")

    return StreamingResponse(
        content=kb_activity_stream(kbid),
        status_code=200,
        media_type="binary/octet-stream",
    )


async def exists_kb(context: ApplicationContext, kbid: str) -> bool:
    """Return whether the Knowledge Box `kbid` exists, using the context's kv driver."""
    return await KnowledgeBoxDataManager(context.kv_driver).exists_kb(kbid)
19 changes: 19 additions & 0 deletions nucliadb/nucliadb/reader/reader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
143 changes: 143 additions & 0 deletions nucliadb/nucliadb/reader/reader/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import asyncio
import contextlib
import uuid
from collections.abc import AsyncGenerator

from nucliadb.reader import logger
from nucliadb_models.activity import (
Notification,
NotificationType,
ResourceActionType,
ResourceNotificationData,
ResourceOperationType,
)
from nucliadb_protos import writer_pb2
from nucliadb_telemetry.errors import capture_exception
from nucliadb_utils import const
from nucliadb_utils.cache.pubsub import Callback, PubSubDriver
from nucliadb_utils.utilities import get_pubsub

# Maximum number of notifications buffered per activity stream. When the
# queue is full, kb_notifications logs a warning and drops the notification.
MAX_QUEUE_SIZE = 1000


async def kb_activity_stream(kbid: str) -> AsyncGenerator[bytes, None]:
    """
    Yield the activity of the given Knowledge Box as newline-terminated
    encoded notifications, one per pubsub notification received.
    """
    async for pb_notif in kb_notifications(kbid):
        encoded_line = encode_streamed_notification(pb_notif)
        yield encoded_line + b"\n"


async def kb_notifications(kbid: str) -> AsyncGenerator[writer_pb2.Notification, None]:
    """
    Async generator over the pubsub notifications of the given kbid.

    Subscribes to the resource-notify channel of the Knowledge Box for as
    long as the generator is being consumed. If pubsub is not configured,
    a warning is logged and nothing is yielded.
    """
    pubsub = await get_pubsub()
    if pubsub is None:  # pragma: no cover
        logger.warning("PubSub is not configured")
        return

    # Bounded buffer between the pubsub callback and the consumer of this generator
    pending: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
    channel = const.PubSubChannels.RESOURCE_NOTIFY.format(kbid=kbid)

    def subscription_handler(raw_data: bytes):
        payload = pubsub.parse(raw_data)
        parsed = writer_pb2.Notification()
        parsed.ParseFromString(payload)
        # The embedded broker message is not needed downstream and can
        # potentially be very big, so drop it before buffering.
        parsed.ClearField("message")
        try:
            pending.put_nowait(parsed)
        except asyncio.QueueFull:  # pragma: no cover
            logger.warning("Queue is full, dropping notification", extra={"kbid": kbid})

    async with managed_subscription(
        pubsub, key=channel, handler=subscription_handler
    ):
        try:
            while True:
                yield await pending.get()
        except Exception as ex:
            # Report the failure and end the stream; leaving the
            # `async with` block releases the subscription.
            capture_exception(ex)
            logger.error(
                "Error while streaming activity", exc_info=True, extra={"kbid": kbid}
            )


@contextlib.asynccontextmanager
async def managed_subscription(pubsub: PubSubDriver, key: str, handler: Callback):
    """
    Async context manager that subscribes `handler` to `key` on entry and
    unsubscribes on exit. Errors raised while unsubscribing are logged as
    warnings and not propagated.
    """
    # A random value is used both as group and as subscription id, so that
    # each reader gets all notifications.
    random_id = uuid.uuid4().hex

    await pubsub.subscribe(
        handler=handler,
        key=key,
        group=random_id,
        subscription_id=random_id,
    )
    try:
        yield
    finally:
        try:
            await pubsub.unsubscribe(key=key, subscription_id=random_id)
        except Exception:  # pragma: no cover
            logger.warning(
                "Error while unsubscribing from activity stream", exc_info=True
            )


# Translation table from the protobuf write type of a notification to the
# operation type exposed by the API model. serialize_notification falls back
# to ResourceOperationType.CREATED for values missing from this table.
RESOURCE_OP_PB_TO_MODEL = {
    writer_pb2.Notification.WriteType.CREATED: ResourceOperationType.CREATED,
    writer_pb2.Notification.WriteType.MODIFIED: ResourceOperationType.MODIFIED,
    writer_pb2.Notification.WriteType.DELETED: ResourceOperationType.DELETED,
}

# Translation table from the protobuf action of a notification to the action
# type exposed by the API model (note that the protobuf ABORT maps to ABORTED).
# serialize_notification falls back to ResourceActionType.COMMIT for values
# missing from this table.
RESOURCE_ACTION_PB_TO_MODEL = {
    writer_pb2.Notification.Action.COMMIT: ResourceActionType.COMMIT,
    writer_pb2.Notification.Action.INDEXED: ResourceActionType.INDEXED,
    writer_pb2.Notification.Action.ABORT: ResourceActionType.ABORTED,
}


def serialize_notification(pb: writer_pb2.Notification) -> Notification:
    """
    Convert a protobuf notification into the API `Notification` model.

    Write types and actions without an entry in the translation tables are
    reported as CREATED and COMMIT respectively.
    """
    resource_data = ResourceNotificationData(
        kbid=pb.kbid,
        resource_uuid=pb.uuid,
        seqid=pb.seqid,
        operation=RESOURCE_OP_PB_TO_MODEL.get(
            pb.write_type, ResourceOperationType.CREATED
        ),
        action=RESOURCE_ACTION_PB_TO_MODEL.get(pb.action, ResourceActionType.COMMIT),
    )
    return Notification(type=NotificationType.RESOURCE, data=resource_data)


def encode_streamed_notification(pb: writer_pb2.Notification) -> bytes:
    """Return the UTF-8 encoded JSON of the API model built from `pb`."""
    return serialize_notification(pb).json().encode("utf-8")
81 changes: 81 additions & 0 deletions nucliadb/nucliadb/reader/tests/integration/api/v1/test_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import asyncio
from collections.abc import AsyncGenerator
from typing import Callable
from unittest import mock

import pytest
from httpx import AsyncClient

from nucliadb.reader.api.v1.router import KB_PREFIX
from nucliadb_models.activity import Notification, ResourceNotification
from nucliadb_models.resource import NucliaDBRoles
from nucliadb_protos import writer_pb2


@pytest.fixture(scope="function")
def kb_notifications():
    """
    Patch the reader's `kb_notifications` with a fake generator that yields
    ten notifications (seqid 0..9) for the requested kbid.
    """

    async def _fake_kb_notifications(
        kbid: str,
    ) -> AsyncGenerator[writer_pb2.Notification, None]:
        for seqid in range(10):
            await asyncio.sleep(0.001)
            yield writer_pb2.Notification(
                kbid=kbid, seqid=seqid, uuid=f"resource-{seqid}"
            )

    patcher = mock.patch(
        "nucliadb.reader.reader.activity.kb_notifications",
        new=_fake_kb_notifications,
    )
    with patcher as mocked:
        yield mocked


@pytest.mark.asyncio
async def test_activity(
    kb_notifications,
    reader_api,
    knowledgebox_ingest,
):
    """
    The activity endpoint streams one JSON notification per line.

    With `kb_notifications` patched to emit ten notifications, every streamed
    line must parse as a resource notification for the requested Knowledge
    Box, and exactly ten notifications must be received.
    """
    kbid = knowledgebox_ingest
    async with reader_api(roles=[NucliaDBRoles.READER]) as client:
        async with client.stream(
            method="GET",
            url=f"/{KB_PREFIX}/{kbid}/activity",
        ) as resp:
            assert resp.status_code == 200

            notifs = []
            async for line in resp.aiter_lines():
                assert Notification.parse_raw(line).type == "resource"
                notif = ResourceNotification.parse_raw(line)
                # These comparisons must be asserted: as bare expressions
                # they are evaluated and discarded, verifying nothing.
                assert notif.type == "resource"
                # Compare against the actual kbid, not the literal "kbid".
                assert notif.data.kbid == kbid
                assert notif.data.resource_uuid.startswith("resource-")
                notifs.append(notif)

            assert len(notifs) == 10


@pytest.mark.asyncio
async def test_activity_kb_not_found(
    reader_api: Callable[..., AsyncClient],
):
    """Requesting the activity of a Knowledge Box that does not exist returns 404."""
    missing_kb_url = f"/{KB_PREFIX}/foobar/activity"
    async with reader_api(roles=[NucliaDBRoles.READER]) as client:
        response = await client.get(missing_kb_url)
        assert response.status_code == 404
19 changes: 19 additions & 0 deletions nucliadb/nucliadb/reader/tests/integration/reader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
Loading

1 comment on commit 64b15f2

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 64b15f2 Previous: 5a633b0 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 12688.939254911718 iter/sec (stddev: 0.0000020817366244606582) 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.