Vectorsets with pinecone (#2346)
lferran authored Jul 26, 2024
1 parent 25a84b5 commit 78f4d7c
Showing 16 changed files with 657 additions and 251 deletions.
7 changes: 7 additions & 0 deletions nucliadb/src/nucliadb/common/external_index_providers/base.py
@@ -86,6 +86,13 @@ class ExternalIndexManager(abc.ABC, metaclass=abc.ABCMeta):
def __init__(self, kbid: str):
self.kbid = kbid

@classmethod
def get_index_name(cls, kbid: str, vectorset_id: str) -> str: # pragma: no cover
"""
Returns the name of the index in the external index provider.
"""
raise NotImplementedError()

async def delete_resource(self, resource_uuid: str) -> None:
"""
Deletes a resource from the external index provider.
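The new abstract `get_index_name` classmethod lets each external index provider define how per-vectorset index names are derived. A minimal, hypothetical sketch of the idea follows; the `{kbid}--{vectorset_id}` pattern is an assumption inferred from the `{kbid}--default` name used elsewhere in this diff, not taken from the real `PineconeIndexManager`.

```python
# Hypothetical sketch, not the actual nucliadb implementation: one plausible
# way to derive an external index name per (kbid, vectorset) pair. The
# "{kbid}--{vectorset_id}" pattern is an assumption inferred from the
# "{kbid}--default" naming visible elsewhere in this diff.
def get_index_name(kbid: str, vectorset_id: str) -> str:
    return f"{kbid}--{vectorset_id}"


print(get_index_name("my-kb", "default"))       # my-kb--default
print(get_index_name("my-kb", "multilingual"))  # my-kb--multilingual
```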
13 changes: 8 additions & 5 deletions nucliadb/src/nucliadb/common/external_index_providers/manager.py
@@ -33,20 +33,23 @@


async def get_external_index_manager(kbid: str) -> Optional[ExternalIndexManager]:
"""
Returns an ExternalIndexManager for the given kbid
"""
metadata = await get_external_index_metadata(kbid)
if metadata is None or metadata.type != ExternalIndexProviderType.PINECONE:
return None

encrypted_api_key = metadata.pinecone_config.encrypted_api_key
endecryptor = get_endecryptor()
api_key = endecryptor.decrypt(encrypted_api_key)
main_index_name = f"{kbid}--default"
if main_index_name not in metadata.pinecone_config.index_hosts: # pragma: no cover
raise KeyError(f"Host not found for main index '{main_index_name}' in Pinecone configuration")
main_index_host = metadata.pinecone_config.index_hosts[main_index_name]
index_hosts: dict[str, str] = {}
for index_name, index_metadata in metadata.pinecone_config.indexes.items():
index_hosts[index_name] = index_metadata.index_host
return PineconeIndexManager(
kbid=kbid,
api_key=api_key,
index_host=main_index_host,
index_hosts=index_hosts,
upsert_parallelism=settings.pinecone_upsert_parallelism,
delete_parallelism=settings.pinecone_delete_parallelism,
upsert_timeout=settings.pinecone_upsert_timeout,
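Callers of `get_external_index_manager` are unaffected by the switch from a single `index_host` to an `index_hosts` mapping. A hedged usage sketch, where the wrapper function and `resource_uuid` are hypothetical and only the import path and the two called functions come from this diff:

```python
# Hedged usage sketch: the wrapper and resource_uuid are hypothetical;
# get_external_index_manager is the function defined in the file above and
# delete_resource is declared on ExternalIndexManager in base.py.
from nucliadb.common.external_index_providers.manager import get_external_index_manager


async def remove_from_external_index(kbid: str, resource_uuid: str) -> None:
    manager = await get_external_index_manager(kbid)
    if manager is None:
        # The KB has no Pinecone provider configured; nothing external to delete.
        return
    # The manager now carries one Pinecone host per vectorset index, but
    # callers still talk to a single object.
    await manager.delete_resource(resource_uuid)
```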
343 changes: 254 additions & 89 deletions nucliadb/src/nucliadb/common/external_index_providers/pinecone.py

Large diffs are not rendered by default.
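Since that large diff is not rendered, here is a rough, assumed sketch of the shape implied by the `manager.py` call site above: the manager receives `index_hosts` (index name to host) and resolves the host backing a given vectorset. The class, method, and attribute names below are hypothetical.

```python
# Rough, assumed sketch only; the real PineconeIndexManager in pinecone.py is
# not shown in this diff. All names below are hypothetical.
class IndexHostResolver:
    def __init__(self, kbid: str, index_hosts: dict[str, str]):
        self.kbid = kbid
        # One Pinecone host per index name, e.g. {"<kbid>--default": "<host>"}.
        self.index_hosts = index_hosts

    def host_for_vectorset(self, vectorset_id: str) -> str:
        # Assumed "{kbid}--{vectorset_id}" naming, mirroring the sketch above.
        index_name = f"{self.kbid}--{vectorset_id}"
        return self.index_hosts[index_name]


resolver = IndexHostResolver("kb1", {"kb1--default": "host-a.pinecone.io"})
print(resolver.host_for_vectorset("default"))  # host-a.pinecone.io
```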

110 changes: 72 additions & 38 deletions nucliadb/src/nucliadb/ingest/orm/knowledgebox.py
@@ -17,6 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from typing import AsyncGenerator, Optional, Sequence, cast
@@ -36,6 +37,7 @@
KB_RESOURCE_SLUG_BASE,
)
from nucliadb.common.external_index_providers.exceptions import ExternalIndexCreationError
from nucliadb.common.external_index_providers.pinecone import PineconeIndexManager
from nucliadb.common.maindb.driver import Driver, Transaction
from nucliadb.ingest import SERVICE_NAME, logger
from nucliadb.ingest.orm.exceptions import (
Expand Down Expand Up @@ -86,6 +88,12 @@
KB_VECTORSET_TO_DELETE = f"{KB_VECTORSET_TO_DELETE_BASE}/{{kbid}}/{{vectorset}}"


@dataclass
class VectorsetExternalIndex:
vectorset_id: str
dimension: int


class KnowledgeBox:
def __init__(self, txn: Transaction, storage: Storage, kbid: str):
self.txn = txn
Expand Down Expand Up @@ -166,30 +174,30 @@ async def create(
)
kb_shards.similarity = semantic_model.similarity_function
kb_shards.model.CopyFrom(semantic_model)
stored_external_index_provider = await cls._maybe_create_external_index(
stored_external_index_provider = await cls._maybe_create_external_indexes(
kbid,
request=external_index_provider,
vector_dimension=semantic_model.vector_dimension,
indexes=[
VectorsetExternalIndex(
vectorset_id="default", dimension=semantic_model.vector_dimension
)
],
)
rollback_ops.append(
partial(cls._maybe_delete_external_index, kbid, stored_external_index_provider)
partial(cls._maybe_delete_external_indexes, kbid, stored_external_index_provider)
)
else:
stored_external_index_provider = StoredExternalIndexProviderMetadata(
type=external_index_provider.type
)
if external_index_provider.type != ExternalIndexProviderType.UNSET:
raise KnowledgeBoxCreationError(
"External index provider is only supported with a single semantic model for now"
)

vs_external_indexes = []
for vectorset_id, semantic_model in semantic_models.items(): # type: ignore
# if this KB uses a matryoshka model, we can choose a different
# dimension
if len(semantic_model.matryoshka_dimensions) > 0:
dimension = choose_matryoshka_dimension(semantic_model.matryoshka_dimensions)
else:
dimension = semantic_model.vector_dimension
vs_external_indexes.append(
VectorsetExternalIndex(vectorset_id=vectorset_id, dimension=dimension)
)
vectorset_config = knowledgebox_pb2.VectorSetConfig(
vectorset_id=vectorset_id,
vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
@@ -202,6 +210,19 @@
matryoshka_dimensions=semantic_model.matryoshka_dimensions,
)
await datamanagers.vectorsets.set(txn, kbid=kbid, config=vectorset_config)

stored_external_index_provider = await cls._maybe_create_external_indexes(
kbid, request=external_index_provider, indexes=vs_external_indexes
)
rollback_ops.append(
partial(
cls._maybe_delete_external_indexes,
kbid,
stored_external_index_provider,
vs_external_indexes,
)
)

config = KnowledgeBoxConfig(
title=title,
description=description,
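The per-vectorset external indexes are created before the rest of KB creation finishes, so a cleanup callback is registered immediately via `functools.partial`. A condensed, self-contained illustration of that rollback pattern (the function, index names, and simulated failure are made up for the example):

```python
# Self-contained illustration of the rollback pattern used above; the KB
# creation flow, index names, and failure are made up for the example.
import asyncio
from functools import partial


async def delete_external_indexes(kbid: str, index_names: list[str]) -> None:
    print(f"rolling back {index_names} for kb {kbid}")


async def create_kb_sketch() -> None:
    rollback_ops: list[partial] = []
    created = ["kb1--default", "kb1--multilingual"]
    # Register the cleanup right after the external indexes are created...
    rollback_ops.append(partial(delete_external_indexes, "kb1", created))
    try:
        raise RuntimeError("simulated failure later in KB creation")
    except RuntimeError:
        # ...so a failure later in the flow can undo the Pinecone side effects.
        for op in reversed(rollback_ops):
            await op()


asyncio.run(create_kb_sketch())
```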
Expand Down Expand Up @@ -306,7 +327,7 @@ async def delete(cls, driver: Driver, kbid: str):
await txn.commit()

if kb_config is not None:
await cls._maybe_delete_external_index(kbid, kb_config.external_index_provider)
await cls._maybe_delete_external_indexes(kbid, kb_config.external_index_provider)

usage_utility = get_usage_utility()
if usage_utility is not None:
Expand Down Expand Up @@ -501,55 +522,68 @@ async def delete_vectorset(self, vectorset_id: str):
await shard_manager.delete_vectorset(self.kbid, vectorset_id)

@classmethod
async def _maybe_create_external_index(
async def _maybe_create_external_indexes(
cls,
kbid: str,
request: CreateExternalIndexProviderMetadata,
vector_dimension: int,
indexes: list[VectorsetExternalIndex],
) -> StoredExternalIndexProviderMetadata:
metadata = StoredExternalIndexProviderMetadata(type=request.type)
if request.type != ExternalIndexProviderType.PINECONE:
# Only pinecone is supported for now
return metadata

index_name = f"{kbid}--default"
logger.info(
"Creating pincone index",
extra={"kbid": kbid, "index_name": index_name, "vector_dimension": vector_dimension},
)
api_key = request.pinecone_config.api_key
pinecone = get_pinecone().control_plane(api_key=api_key)
try:
index_host = await pinecone.create_index(
name=index_name,
dimension=vector_dimension,
)
except PineconeAPIError as exc:
raise ExternalIndexCreationError("pinecone", exc.message) from exc
endecryptor = get_endecryptor()
encrypted_api_key = endecryptor.encrypt(api_key)
metadata.pinecone_config.encrypted_api_key = encrypted_api_key
metadata.pinecone_config.index_hosts[index_name] = index_host
pinecone = get_pinecone().control_plane(api_key=api_key)
for index in indexes:
index_name = PineconeIndexManager.get_index_name(kbid, index.vectorset_id)
logger.info(
"Creating pincone index",
extra={"kbid": kbid, "index_name": index_name, "vector_dimension": index.dimension},
)
try:
index_host = await pinecone.create_index(
name=index_name,
dimension=index.dimension,
)
except PineconeAPIError as exc:
raise ExternalIndexCreationError("pinecone", exc.message) from exc
metadata.pinecone_config.indexes[index_name].index_host = index_host
metadata.pinecone_config.indexes[index_name].vector_dimension = index.dimension
return metadata

@classmethod
async def _maybe_delete_external_index(
cls, kbid: str, external_index_provider: StoredExternalIndexProviderMetadata
async def _maybe_delete_external_indexes(
cls,
kbid: str,
external_index_provider: StoredExternalIndexProviderMetadata,
created_external_indexes: Optional[list[VectorsetExternalIndex]] = None,
):
if external_index_provider.type != ExternalIndexProviderType.PINECONE:
# Only pinecone is supported for now
return

index_name = f"{kbid}--default"
logger.info("Deleting pincone index", extra={"kbid": kbid, "index_name": index_name})
encrypted_api_key = external_index_provider.pinecone_config.encrypted_api_key
endecryptor = get_endecryptor()
api_key = endecryptor.decrypt(encrypted_api_key)
pinecone = get_pinecone().control_plane(api_key=api_key)
try:
await pinecone.delete_index(name=index_name)
except Exception:
logger.exception(
"Error deleting pinecone index", extra={"kbid": kbid, "index_name": index_name}
)

index_names_to_delete: set[str] = set()
index_names_to_delete.update(
{index_name for index_name in external_index_provider.pinecone_config.indexes.keys()}
)
index_names_to_delete.update({ei.vectorset_id for ei in created_external_indexes or []})
for index_name in index_names_to_delete:
logger.info("Deleting pincone index", extra={"kbid": kbid, "index_name": index_name})
try:
await pinecone.delete_index(name=index_name)
except Exception:
logger.exception(
"Error deleting pinecone index", extra={"kbid": kbid, "index_name": index_name}
)


def release_channel_for_kb(
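For orientation, this is roughly the stored metadata shape that `_maybe_create_external_indexes` produces and that both `manager.py` and `_maybe_delete_external_indexes` consume, written with plain dicts instead of the real protobuf messages (all values are made up):

```python
# Plain-dict illustration (made-up values) of the stored external index
# metadata; the real objects are protobuf messages, not dicts.
stored_external_index_provider = {
    "type": "PINECONE",
    "pinecone_config": {
        "encrypted_api_key": "<encrypted>",
        "indexes": {
            "kb1--default": {"index_host": "host-a.pinecone.io", "vector_dimension": 768},
            "kb1--multilingual": {"index_host": "host-b.pinecone.io", "vector_dimension": 1024},
        },
    },
}

# manager.py builds its host mapping from the per-index entries:
index_hosts = {
    name: meta["index_host"]
    for name, meta in stored_external_index_provider["pinecone_config"]["indexes"].items()
}
print(index_hosts)
```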
1 change: 0 additions & 1 deletion nucliadb/src/nucliadb/ingest/service/writer.py
@@ -165,7 +165,6 @@ async def NewKnowledgeBoxV2(
return writer_pb2.NewKnowledgeBoxV2Response(
status=KnowledgeBoxResponseStatus.ERROR,
)

# Hosted KBs are created through backend endpoints. We assume learning
# configuration has already been created for it and we are given the
# model metadata in the request