Skip to content

Commit

Permalink
Index only upon processing message received
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran committed Nov 15, 2024
1 parent 7ca8964 commit 3e5d508
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from nucliadb.common import datamanagers, locking
from nucliadb.common.cluster.settings import settings as cluster_settings
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.external_index_providers.base import ExternalIndexManager
from nucliadb.common.external_index_providers.manager import get_external_index_manager
from nucliadb.common.maindb.driver import Driver, Transaction
from nucliadb.common.maindb.exceptions import ConflictError, MaindbServerError
Expand Down Expand Up @@ -188,11 +189,14 @@ async def delete_resource(
shard = await kb.get_resource_shard(shard_id)
if shard is None:
raise AttributeError("Shard not available")
await self._maybe_external_index_delete_resource(message.kbid, uuid)
await pgcatalog_delete(txn, message.kbid, uuid)
await self.index_node_shard_manager.delete_resource(
shard, message.uuid, seqid, partition, message.kbid
)
external_index_manager = await get_external_index_manager(kbid=message.kbid)
if external_index_manager is not None:
await self.external_index_delete_resource(external_index_manager, uuid)
else:
await self.index_node_shard_manager.delete_resource(
shard, message.uuid, seqid, partition, message.kbid
)
try:
await kb.delete_resource(message.uuid)
except Exception as exc:
Expand Down Expand Up @@ -281,17 +285,18 @@ async def txn(
resource.replace_indexer(await resource.generate_index_message(reindex=True))

if resource and resource.modified:
await self.index_resource( # noqa
resource=resource,
txn=txn,
uuid=uuid,
kbid=kbid,
seqid=seqid,
partition=partition,
kb=kb,
source=messages_source(messages),
)

await pgcatalog_update(txn, kbid, resource)
if message.source == writer_pb2.BrokerMessage.MessageSource.PROCESSOR:
await self.index_resource( # noqa
resource=resource,
txn=txn,
uuid=uuid,
kbid=kbid,
seqid=seqid,
partition=partition,
kb=kb,
source=messages_source(messages),
)
if transaction_check:
await sequence_manager.set_last_seqid(txn, partition, seqid)
await txn.commit()
Expand Down Expand Up @@ -409,9 +414,14 @@ async def index_resource(
txn, kbid=kbid, rid=uuid, shard=shard.shard
)

if shard is not None:
index_message = resource.indexer.brain
await self._maybe_external_index_add_resource(kbid, uuid, index_message)
if shard is None:
raise AttributeError("Shard is not available")

index_message = resource.indexer.brain
external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is not None:
await self.external_index_add_resource(external_index_manager, uuid, index_message)
else:
await self.index_node_shard_manager.add_resource(
shard,
index_message,
Expand All @@ -421,39 +431,47 @@ async def index_resource(
source=source,
)

await pgcatalog_update(txn, kbid, resource)
else:
raise AttributeError("Shard is not available")
def skip_external_index(self, kbid: str, provider_type: str) -> bool:
    """
    Tell whether indexing to the external index provider must be skipped.

    The decision is delegated to the SKIP_EXTERNAL_INDEX feature flag, which is
    evaluated with the knowledge box id and the provider type as context and
    with ``default=False``.

    This is a safety valve for when the external index provider is not working:
    we do not want a broken provider to block the ingestion pipeline. It is a
    temporary measure until async consumers are implemented to index to
    external indexes.
    """
    flag_context = {"kbid": kbid, "provider": provider_type}
    return has_feature(
        const.Features.SKIP_EXTERNAL_INDEX,
        context=flag_context,
        default=False,
    )

async def _maybe_external_index_delete_resource(self, kbid: str, resource_uuid: str):
external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is None:
async def external_index_delete_resource(
    self, external_index_manager: ExternalIndexManager, resource_uuid: str
):
    """
    Delete a resource from the external index handled by the given manager.

    When `skip_external_index` returns True for this knowledge box and provider,
    nothing is sent to the provider and a warning is logged instead.
    """
    kbid = external_index_manager.kbid
    provider_type = external_index_manager.type.value

    if not self.skip_external_index(kbid, provider_type):
        await external_index_manager.delete_resource(resource_uuid=resource_uuid)
        return

    # The skip flag is on: leave a trace of what was not deleted and where.
    log_extra = {
        "kbid": kbid,
        "rid": resource_uuid,
        "provider": provider_type,
    }
    logger.warning("Skipping external index delete resource", extra=log_extra)

async def _maybe_external_index_add_resource(
async def external_index_add_resource(
self,
kbid: str,
external_index_manager: ExternalIndexManager,
resource_uuid: str,
index_message: PBBrainResource,
):
if not has_vectors_operation(index_message):
return

external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is None:
# No external index manager, nothing to do
return

kbid = external_index_manager.kbid
provider_type = external_index_manager.type.value
if has_feature(
const.Features.SKIP_EXTERNAL_INDEX,
context={"kbid": kbid, "provider": provider_type},
default=False,
):
# This is a safety measure to skip external indexing in case that the external index provider is not working.
# As we don't want to block the ingestion pipeline, this is a temporary measure until we implement async consumers
# to index to external indexes.
if self.skip_external_index(kbid, provider_type):
logger.warning(
"Skipping external index for resource",
extra={
Expand All @@ -462,10 +480,10 @@ async def _maybe_external_index_add_resource(
"provider": provider_type,
},
)
else:
await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)
return
await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)

async def multi(self, message: writer_pb2.BrokerMessage, seqid: int) -> None:
self.messages.setdefault(message.multiid, []).append(message)
Expand Down

0 comments on commit 3e5d508

Please sign in to comment.