Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred resource to shard assignation #2640

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 85 additions & 51 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from nucliadb.common import datamanagers, locking
from nucliadb.common.cluster.settings import settings as cluster_settings
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.external_index_providers.base import ExternalIndexManager
from nucliadb.common.external_index_providers.manager import get_external_index_manager
from nucliadb.common.maindb.driver import Driver, Transaction
from nucliadb.common.maindb.exceptions import ConflictError, MaindbServerError
Expand Down Expand Up @@ -154,7 +155,9 @@ async def process(
elif message.type == writer_pb2.BrokerMessage.MessageType.ROLLBACK:
await self.rollback(message, seqid, partition)

async def get_resource_uuid(self, kb: KnowledgeBox, message: writer_pb2.BrokerMessage) -> str:
async def get_resource_uuid(
self, kb: KnowledgeBox, message: writer_pb2.BrokerMessage
) -> Optional[str]:
if message.uuid is None:
uuid = await kb.get_resource_uuid_by_slug(message.slug)
else:
Expand All @@ -172,8 +175,16 @@ async def delete_resource(
async with self.driver.transaction() as txn:
try:
kb = KnowledgeBox(txn, self.storage, message.kbid)

uuid = await self.get_resource_uuid(kb, message)
if uuid is None:
logger.warning(
"Resource not found for deletion",
extra={"kbid": message.kbid, "rid": message.uuid},
)
return

await pgcatalog_delete(txn, message.kbid, uuid)

async with locking.distributed_lock(
locking.RESOURCE_INDEX_LOCK.format(kbid=message.kbid, resource_id=uuid)
):
Expand All @@ -183,16 +194,21 @@ async def delete_resource(
txn, kbid=message.kbid, rid=uuid
)
if shard_id is None:
logger.warning(f"Resource {uuid} does not exist")
logger.info(
"Resource shard not found for deletion",
extra={"rid": uuid, "kbid": message.kbid},
)
else:
shard = await kb.get_resource_shard(shard_id)
if shard is None:
raise AttributeError("Shard not available")
await self._maybe_external_index_delete_resource(message.kbid, uuid)
await pgcatalog_delete(txn, message.kbid, uuid)
await self.index_node_shard_manager.delete_resource(
shard, message.uuid, seqid, partition, message.kbid
)
external_index_manager = await get_external_index_manager(kbid=message.kbid)
if external_index_manager is not None:
await self.external_index_delete_resource(external_index_manager, uuid)
else:
await self.index_node_shard_manager.delete_resource(
shard, message.uuid, seqid, partition, message.kbid
)
try:
await kb.delete_resource(message.uuid)
except Exception as exc:
Expand Down Expand Up @@ -273,6 +289,7 @@ async def txn(
created = created or _created

if resource:
uuid = resource.uuid
await resource.compute_global_text()
await resource.compute_global_tags(resource.indexer)
await resource.compute_security(resource.indexer)
Expand All @@ -281,17 +298,18 @@ async def txn(
resource.replace_indexer(await resource.generate_index_message(reindex=True))

if resource and resource.modified:
await self.index_resource( # noqa
resource=resource,
txn=txn,
uuid=uuid,
kbid=kbid,
seqid=seqid,
partition=partition,
kb=kb,
source=messages_source(messages),
)

await pgcatalog_update(txn, kbid, resource)
if message.source == writer_pb2.BrokerMessage.MessageSource.PROCESSOR:
await self.index_resource(
resource=resource,
txn=txn,
uuid=resource.uuid,
kbid=kbid,
seqid=seqid,
partition=partition,
kb=kb,
source=messages_source(messages),
)
if transaction_check:
await sequence_manager.set_last_seqid(txn, partition, seqid)
await txn.commit()
Expand All @@ -317,7 +335,7 @@ async def txn(
seqid=seqid,
multi=multi,
kbid=kbid,
rid=uuid,
rid=resource.uuid,
source=message.source,
)
logger.warning("This message did not modify the resource")
Expand All @@ -337,7 +355,7 @@ async def txn(
seqid=seqid,
multi=multi,
kbid=kbid,
rid=uuid,
rid=uuid or "unknown",
source=message.source,
)
raise
Expand All @@ -351,7 +369,7 @@ async def txn(
seqid=seqid,
multi=multi,
kbid=kbid,
rid=uuid,
rid=uuid or "unknown",
source=message.source,
)
handled_exception = exc
Expand Down Expand Up @@ -409,9 +427,18 @@ async def index_resource(
txn, kbid=kbid, rid=uuid, shard=shard.shard
)

if shard is not None:
index_message = resource.indexer.brain
await self._maybe_external_index_add_resource(kbid, uuid, index_message)
if shard is None:
raise AttributeError("Shard is not available")

index_message = resource.indexer.brain
external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is not None:
await self.external_index_add_resource(
external_index_manager,
uuid,
index_message,
)
else:
await self.index_node_shard_manager.add_resource(
shard,
index_message,
Expand All @@ -421,39 +448,46 @@ async def index_resource(
source=source,
)

await pgcatalog_update(txn, kbid, resource)
else:
raise AttributeError("Shard is not available")
def skip_external_index(self, kbid: str, provider_type: str) -> bool:
"""
This is a safety measure to skip external indexing in case that the external index provider is not working.
As we don't want to block the ingestion pipeline, this is a temporary measure until we implement async consumers
to index to external indexes.
"""
return has_feature(
const.Features.SKIP_EXTERNAL_INDEX,
context={"kbid": kbid, "provider": provider_type},
default=False,
)

async def _maybe_external_index_delete_resource(self, kbid: str, resource_uuid: str):
external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is None:
async def external_index_delete_resource(
self, external_index_manager: ExternalIndexManager, resource_uuid: str
):
kbid = external_index_manager.kbid
provider_type = external_index_manager.type.value
if self.skip_external_index(kbid, provider_type):
logger.warning(
"Skipping external index delete resource",
extra={
"kbid": kbid,
"rid": resource_uuid,
"provider": provider_type,
},
)
return
await external_index_manager.delete_resource(resource_uuid=resource_uuid)

async def _maybe_external_index_add_resource(
async def external_index_add_resource(
self,
kbid: str,
external_index_manager: ExternalIndexManager,
resource_uuid: str,
index_message: PBBrainResource,
):
if not has_vectors_operation(index_message):
return

external_index_manager = await get_external_index_manager(kbid=kbid)
if external_index_manager is None:
# No external index manager, nothing to do
return

kbid = external_index_manager.kbid
provider_type = external_index_manager.type.value
if has_feature(
const.Features.SKIP_EXTERNAL_INDEX,
context={"kbid": kbid, "provider": provider_type},
default=False,
):
# This is a safety measure to skip external indexing in case that the external index provider is not working.
# As we don't want to block the ingestion pipeline, this is a temporary measure until we implement async consumers
# to index to external indexes.
if self.skip_external_index(kbid, provider_type):
logger.warning(
"Skipping external index for resource",
extra={
Expand All @@ -462,10 +496,10 @@ async def _maybe_external_index_add_resource(
"provider": provider_type,
},
)
else:
await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)
return
await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)

async def multi(self, message: writer_pb2.BrokerMessage, seqid: int) -> None:
self.messages.setdefault(message.multiid, []).append(message)
Expand Down
Loading