Skip to content

Commit

Permalink
Add safe-guard for external indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran committed Aug 5, 2024
1 parent a1b9f55 commit 7eb2a38
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from pydantic import BaseModel

from nucliadb.common.external_index_providers.exceptions import ExternalIndexingError
from nucliadb_models.external_index_providers import ExternalIndexProviderType
from nucliadb_protos.knowledgebox_pb2 import (
CreateExternalIndexProviderMetadata,
Expand Down Expand Up @@ -156,7 +157,10 @@ async def index_resource(self, resource_uuid: str, resource_data: Resource) -> N
},
)
with manager_observer({"operation": "index_resource", "provider": self.type.value}):
await self._index_resource(resource_uuid, resource_data)
try:
await self._index_resource(resource_uuid, resource_data)
except Exception as ex:
raise ExternalIndexingError() from ex

async def query(self, request: SearchRequest) -> QueryResults:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ def __init__(self, provider: str, message: str):
self.provider = provider
self.message = message
super().__init__(f"{provider} index creation error: {message}")


class ExternalIndexingError(Exception):
    """Signal that indexing a resource into an external index failed."""
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
ExternalIndexProviderType,
StoredExternalIndexProviderMetadata,
)
from nucliadb_utils.utilities import decrypt
from nucliadb_utils.utilities import get_endecryptor


async def get_external_index_manager(kbid: str) -> Optional[ExternalIndexManager]:
Expand All @@ -39,7 +39,7 @@ async def get_external_index_manager(kbid: str) -> Optional[ExternalIndexManager
metadata = await get_external_index_metadata(kbid)
if metadata is None or metadata.type != ExternalIndexProviderType.PINECONE:
return None
api_key = decrypt(metadata.pinecone_config.encrypted_api_key)
api_key = get_endecryptor().decrypt(metadata.pinecone_config.encrypted_api_key)
default_vectorset = await get_default_vectorset_id(kbid)
return PineconeIndexManager(
kbid=kbid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from nucliadb_utils.aiopynecone.exceptions import MetadataTooLargeError, PineconeAPIError
from nucliadb_utils.aiopynecone.models import QueryResponse
from nucliadb_utils.aiopynecone.models import Vector as PineconeVector
from nucliadb_utils.utilities import decrypt, encrypt, get_pinecone
from nucliadb_utils.utilities import get_endecryptor, get_pinecone

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -178,7 +178,7 @@ async def create_indexes(
type=kb_pb2.ExternalIndexProviderType.PINECONE
)
api_key = request.pinecone_config.api_key
metadata.pinecone_config.encrypted_api_key = encrypt(api_key)
metadata.pinecone_config.encrypted_api_key = get_endecryptor().encrypt(api_key)
metadata.pinecone_config.serverless_cloud = request.pinecone_config.serverless_cloud
pinecone = get_pinecone().control_plane(api_key=api_key)
serverless_cloud = to_pinecone_serverless_cloud_payload(request.pinecone_config.serverless_cloud)
Expand Down Expand Up @@ -231,7 +231,7 @@ async def delete_indexes(
kbid: str,
stored: kb_pb2.StoredExternalIndexProviderMetadata,
) -> None:
api_key = decrypt(stored.pinecone_config.encrypted_api_key)
api_key = get_endecryptor().decrypt(stored.pinecone_config.encrypted_api_key)
control_plane = get_pinecone().control_plane(api_key=api_key)
# Delete all indexes stored in the config and passed as parameters
for index_metadata in stored.pinecone_config.indexes.values():
Expand Down
26 changes: 22 additions & 4 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from nucliadb_utils import const
from nucliadb_utils.cache.pubsub import PubSubDriver
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import get_storage
from nucliadb_utils.utilities import get_storage, has_feature

from .pgcatalog import pgcatalog_delete, pgcatalog_update

Expand Down Expand Up @@ -444,9 +444,27 @@ async def _maybe_external_index_add_resource(
# No external index manager, nothing to do
return

await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)
provider_type = external_index_manager.type.value
if has_feature(
const.Features.SKIP_EXTERNAL_INDEX,
context={"kbid": kbid, "provider": provider_type},
default=False,
):
# Safety measure: skip external indexing when the external index provider is not working.
# We don't want to block the ingestion pipeline, so this is a temporary measure until we implement async consumers
# that index into external indexes.
logger.warning(
"Skipping external index for resource",
extra={
"kbid": kbid,
"rid": resource_uuid,
"provider": provider_type,
},
)
else:
await external_index_manager.index_resource(
resource_uuid=resource_uuid, resource_data=index_message
)
self._clear_external_index_fields(index_message)

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/tests/nucliadb/integration/test_pinecone_kb.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async def test_get_kb(
)
assert resp.status_code == 200, resp.text
config = resp.json()["config"]
assert "external_index_provider" not in config
assert not config.get("external_index_provider")
assert config["configured_external_index_provider"]["type"] == "pinecone"


Expand Down
1 change: 1 addition & 0 deletions nucliadb_utils/src/nucliadb_utils/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ class Features:
PG_CATALOG_READ = "nucliadb_pg_catalog_read"
PG_CATALOG_WRITE = "nucliadb_pg_catalog_write"
VECTORSETS_V0 = "vectorsets_v0_new_kbs_with_multiple_vectorsets"
SKIP_EXTERNAL_INDEX = "nucliadb_skip_external_index"
4 changes: 4 additions & 0 deletions nucliadb_utils/src/nucliadb_utils/featureflagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class Settings(pydantic_settings.BaseSettings):
"rollout": 0,
"variants": {"environment": ["local"]},
},
const.Features.SKIP_EXTERNAL_INDEX: {
"rollout": 0,
"variants": {"environment": ["none"]},
},
}


Expand Down
10 changes: 0 additions & 10 deletions nucliadb_utils/src/nucliadb_utils/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,6 @@ def get_endecryptor() -> EndecryptorUtility:
return util


def encrypt(text: str) -> str:
    """Return ``text`` encrypted by the utility that ``get_endecryptor()`` provides."""
    return get_endecryptor().encrypt(text)


def decrypt(text: str) -> str:
    """Return ``text`` decrypted by the utility that ``get_endecryptor()`` provides."""
    return get_endecryptor().decrypt(text)


def get_pinecone() -> PineconeSession:
util = get_utility(Utility.PINECONE_SESSION)
if util is not None:
Expand Down

0 comments on commit 7eb2a38

Please sign in to comment.