From fcad2573e0392b5cd30c8c394d2dbecf9e86ab91 Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Fri, 15 Nov 2024 12:56:04 +0100 Subject: [PATCH] Shard assigation on ingesting processing message --- .../nucliadb/ingest/orm/processor/__init__.py | 136 +++++++++++------- 1 file changed, 85 insertions(+), 51 deletions(-) diff --git a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py index eda0abfb3c..8ee4209aba 100644 --- a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py +++ b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py @@ -28,6 +28,7 @@ from nucliadb.common import datamanagers, locking from nucliadb.common.cluster.settings import settings as cluster_settings from nucliadb.common.cluster.utils import get_shard_manager +from nucliadb.common.external_index_providers.base import ExternalIndexManager from nucliadb.common.external_index_providers.manager import get_external_index_manager from nucliadb.common.maindb.driver import Driver, Transaction from nucliadb.common.maindb.exceptions import ConflictError, MaindbServerError @@ -154,7 +155,9 @@ async def process( elif message.type == writer_pb2.BrokerMessage.MessageType.ROLLBACK: await self.rollback(message, seqid, partition) - async def get_resource_uuid(self, kb: KnowledgeBox, message: writer_pb2.BrokerMessage) -> str: + async def get_resource_uuid( + self, kb: KnowledgeBox, message: writer_pb2.BrokerMessage + ) -> Optional[str]: if message.uuid is None: uuid = await kb.get_resource_uuid_by_slug(message.slug) else: @@ -172,8 +175,16 @@ async def delete_resource( async with self.driver.transaction() as txn: try: kb = KnowledgeBox(txn, self.storage, message.kbid) - uuid = await self.get_resource_uuid(kb, message) + if uuid is None: + logger.warning( + "Resource not found for deletion", + extra={"kbid": message.kbid, "rid": message.uuid}, + ) + return + + await pgcatalog_delete(txn, message.kbid, uuid) + async with locking.distributed_lock( locking.RESOURCE_INDEX_LOCK.format(kbid=message.kbid, resource_id=uuid) ): @@ -183,16 +194,21 @@ async def delete_resource( txn, kbid=message.kbid, rid=uuid ) if shard_id is None: - logger.warning(f"Resource {uuid} does not exist") + logger.info( + "Resource shard not found for deletion", + extra={"rid": uuid, "kbid": message.kbid}, + ) else: shard = await kb.get_resource_shard(shard_id) if shard is None: raise AttributeError("Shard not available") - await self._maybe_external_index_delete_resource(message.kbid, uuid) - await pgcatalog_delete(txn, message.kbid, uuid) - await self.index_node_shard_manager.delete_resource( - shard, message.uuid, seqid, partition, message.kbid - ) + external_index_manager = await get_external_index_manager(kbid=message.kbid) + if external_index_manager is not None: + await self.external_index_delete_resource(external_index_manager, uuid) + else: + await self.index_node_shard_manager.delete_resource( + shard, message.uuid, seqid, partition, message.kbid + ) try: await kb.delete_resource(message.uuid) except Exception as exc: @@ -273,6 +289,7 @@ async def txn( created = created or _created if resource: + uuid = resource.uuid await resource.compute_global_text() await resource.compute_global_tags(resource.indexer) await resource.compute_security(resource.indexer) @@ -281,17 +298,18 @@ async def txn( resource.replace_indexer(await resource.generate_index_message(reindex=True)) if resource and resource.modified: - await self.index_resource( # noqa - resource=resource, - txn=txn, - uuid=uuid, - kbid=kbid, - seqid=seqid, - partition=partition, - kb=kb, - source=messages_source(messages), - ) - + await pgcatalog_update(txn, kbid, resource) + if message.source == writer_pb2.BrokerMessage.MessageSource.PROCESSOR: + await self.index_resource( + resource=resource, + txn=txn, + uuid=resource.uuid, + kbid=kbid, + seqid=seqid, + partition=partition, + kb=kb, + source=messages_source(messages), + ) if transaction_check: await sequence_manager.set_last_seqid(txn, partition, seqid) await txn.commit() @@ -317,7 +335,7 @@ async def txn( seqid=seqid, multi=multi, kbid=kbid, - rid=uuid, + rid=resource.uuid, source=message.source, ) logger.warning("This message did not modify the resource") @@ -337,7 +355,7 @@ async def txn( seqid=seqid, multi=multi, kbid=kbid, - rid=uuid, + rid=uuid or "unknown", source=message.source, ) raise @@ -351,7 +369,7 @@ async def txn( seqid=seqid, multi=multi, kbid=kbid, - rid=uuid, + rid=uuid or "unknown", source=message.source, ) handled_exception = exc @@ -409,9 +427,18 @@ async def index_resource( txn, kbid=kbid, rid=uuid, shard=shard.shard ) - if shard is not None: - index_message = resource.indexer.brain - await self._maybe_external_index_add_resource(kbid, uuid, index_message) + if shard is None: + raise AttributeError("Shard is not available") + + index_message = resource.indexer.brain + external_index_manager = await get_external_index_manager(kbid=kbid) + if external_index_manager is not None: + await self.external_index_add_resource( + external_index_manager, + uuid, + index_message, + ) + else: await self.index_node_shard_manager.add_resource( shard, index_message, @@ -421,39 +448,46 @@ async def index_resource( source=source, ) - await pgcatalog_update(txn, kbid, resource) - else: - raise AttributeError("Shard is not available") + def skip_external_index(self, kbid: str, provider_type: str) -> bool: + """ + This is a safety measure to skip external indexing in case that the external index provider is not working. + As we don't want to block the ingestion pipeline, this is a temporary measure until we implement async consumers + to index to external indexes. + """ + return has_feature( + const.Features.SKIP_EXTERNAL_INDEX, + context={"kbid": kbid, "provider": provider_type}, + default=False, + ) - async def _maybe_external_index_delete_resource(self, kbid: str, resource_uuid: str): - external_index_manager = await get_external_index_manager(kbid=kbid) - if external_index_manager is None: + async def external_index_delete_resource( + self, external_index_manager: ExternalIndexManager, resource_uuid: str + ): + kbid = external_index_manager.kbid + provider_type = external_index_manager.type.value + if self.skip_external_index(kbid, provider_type): + logger.warning( + "Skipping external index delete resource", + extra={ + "kbid": kbid, + "rid": resource_uuid, + "provider": provider_type, + }, + ) return await external_index_manager.delete_resource(resource_uuid=resource_uuid) - async def _maybe_external_index_add_resource( + async def external_index_add_resource( self, - kbid: str, + external_index_manager: ExternalIndexManager, resource_uuid: str, index_message: PBBrainResource, ): if not has_vectors_operation(index_message): return - - external_index_manager = await get_external_index_manager(kbid=kbid) - if external_index_manager is None: - # No external index manager, nothing to do - return - + kbid = external_index_manager.kbid provider_type = external_index_manager.type.value - if has_feature( - const.Features.SKIP_EXTERNAL_INDEX, - context={"kbid": kbid, "provider": provider_type}, - default=False, - ): - # This is a safety measure to skip external indexing in case that the external index provider is not working. - # As we don't want to block the ingestion pipeline, this is a temporary measure until we implement async consumers - # to index to external indexes. + if self.skip_external_index(kbid, provider_type): logger.warning( "Skipping external index for resource", extra={ @@ -462,10 +496,10 @@ async def _maybe_external_index_add_resource( "provider": provider_type, }, ) - else: - await external_index_manager.index_resource( - resource_uuid=resource_uuid, resource_data=index_message - ) + return + await external_index_manager.index_resource( + resource_uuid=resource_uuid, resource_data=index_message + ) async def multi(self, message: writer_pb2.BrokerMessage, seqid: int) -> None: self.messages.setdefault(message.multiid, []).append(message)