From 0ba9a4d34a3ce33b7e70779269128bda2e87a615 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 16 Jul 2024 15:53:15 +0200
Subject: [PATCH 01/18] Move predict models to internal module

---
 .../src/nucliadb_models/internal/predict.py   | 65 +++++++++++++++++++
 nucliadb_models/src/nucliadb_models/search.py | 28 --------
 2 files changed, 65 insertions(+), 28 deletions(-)
 create mode 100644 nucliadb_models/src/nucliadb_models/internal/predict.py

diff --git a/nucliadb_models/src/nucliadb_models/internal/predict.py b/nucliadb_models/src/nucliadb_models/internal/predict.py
new file mode 100644
index 0000000000..92c79d9391
--- /dev/null
+++ b/nucliadb_models/src/nucliadb_models/internal/predict.py
@@ -0,0 +1,65 @@
+# Copyright (C) 2021 Bosutech XXI S.L.
+#
+# nucliadb is offered under the AGPL v3.0 and as commercial software.
+# For commercial licensing, contact us at info@nuclia.com.
+#
+# AGPL:
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""
+Models for Predict API v1.
+
+ATTENTION! Keep these models in sync with the models on Predict API
+"""
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class SentenceSearch(BaseModel):
+    data: List[float] = Field(deprecated=True, default_factory=list)
+    vectors: dict[str, List[float]] = Field(description="Sentence vectors for each semantic model")
+    time: float = Field(deprecated=True)
+    timings: dict[str, float] = Field(
+        description="Time taken to compute the sentence vector for each semantic model"
+    )
+
+
+class Ner(BaseModel):
+    text: str
+    ner: str
+    start: int
+    end: int
+
+
+class TokenSearch(BaseModel):
+    tokens: List[Ner] = []
+    time: float
+    input_tokens: int = 0
+
+
+class QueryInfo(BaseModel):
+    language: str
+    stop_words: List[str]
+    semantic_threshold: float = Field(deprecated=True)
+    semantic_thresholds: dict[str, float] = Field(
+        description="Semantic threshold for each semantic model"
+    )
+    visual_llm: bool
+    max_context: int
+    entities: Optional[TokenSearch]
+    sentence: Optional[SentenceSearch]
+    query: str
diff --git a/nucliadb_models/src/nucliadb_models/search.py b/nucliadb_models/src/nucliadb_models/search.py
index 479f4f4701..fe6cc83e8e 100644
--- a/nucliadb_models/src/nucliadb_models/search.py
+++ b/nucliadb_models/src/nucliadb_models/search.py
@@ -284,34 +284,6 @@ class EntitySubgraph(BaseModel):
     # path: List[DirectionalRelation]
 
 
-class SentenceSearch(BaseModel):
-    data: List[float] = []
-    time: float
-
-
-class Ner(BaseModel):
-    text: str
-    ner: str
-    start: int
-    end: int
-
-
-class TokenSearch(BaseModel):
-    tokens: List[Ner] = []
-    time: float
-
-
-class QueryInfo(BaseModel):
-    language: Optional[str] = None
-    stop_words: List[str] = []
-    semantic_threshold: Optional[float] = None
-    visual_llm: bool
-    max_context: int
-    entities: TokenSearch
-    sentence: SentenceSearch
-    query: str
-
-
 class Relations(BaseModel):
     entities: Dict[str, EntitySubgraph]
     # TODO: implement in the next iteration of knowledge graph search
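
The move above is purely mechanical: the four models keep their shape and only
their import path changes (patch 08 below restores the old path as an alias).
As a minimal sketch of consuming them from the new location (the model name
"my-semantic-model" is made up for illustration):

    from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch

    info = QueryInfo(
        language="en",
        stop_words=[],
        semantic_threshold=0.7,  # deprecated, kept for older Predict APIs
        semantic_thresholds={"my-semantic-model": 0.7},
        visual_llm=False,
        max_context=1000,
        entities=TokenSearch(tokens=[Ner(text="Newton", ner="scientist", start=0, end=1)], time=0.0),
        sentence=SentenceSearch(
            data=[0.1, 0.2],  # deprecated single-model vector
            vectors={"my-semantic-model": [0.1, 0.2]},
            time=0.01,
            timings={"my-semantic-model": 0.01},
        ),
        query="What relates Newton and Becquer?",
    )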

From c36a6ba42ed8ce46954ffadb601b48e37a0e3cbe Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 16 Jul 2024 15:53:36 +0200
Subject: [PATCH 02/18] Add feature flag for vectorsets v0

---
 nucliadb_utils/src/nucliadb_utils/const.py           | 1 +
 nucliadb_utils/src/nucliadb_utils/featureflagging.py | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/nucliadb_utils/src/nucliadb_utils/const.py b/nucliadb_utils/src/nucliadb_utils/const.py
index 3bfaf79442..89863e2ead 100644
--- a/nucliadb_utils/src/nucliadb_utils/const.py
+++ b/nucliadb_utils/src/nucliadb_utils/const.py
@@ -81,3 +81,4 @@ class Features:
     FIND_MERGE_ORDER_FIX = "nucliadb_find_merge_order_fix"
     PG_CATALOG_READ = "nucliadb_pg_catalog_read"
     PG_CATALOG_WRITE = "nucliadb_pg_catalog_write"
+    VECTORSETS_V0 = "vectorsets_v0_new_kbs_with_multiple_vectorsets"
diff --git a/nucliadb_utils/src/nucliadb_utils/featureflagging.py b/nucliadb_utils/src/nucliadb_utils/featureflagging.py
index 9bbb2aac62..0eab147c2b 100644
--- a/nucliadb_utils/src/nucliadb_utils/featureflagging.py
+++ b/nucliadb_utils/src/nucliadb_utils/featureflagging.py
@@ -73,6 +73,10 @@ class Settings(pydantic_settings.BaseSettings):
         "rollout": 0,
         "variants": {"environment": ["local"]},
     },
+    const.Features.VECTORSETS_V0: {
+        "rollout": 0,
+        "variants": {"environment": ["local"]},
+    },
 }

From cca55eeb559c6baf6253debdd31d74722878a864 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 23 Jul 2024 15:43:57 +0200
Subject: [PATCH 03/18] Fully bw/c predict models with population of new fields

---
 .../src/nucliadb_models/internal/predict.py   | 29 +++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/nucliadb_models/src/nucliadb_models/internal/predict.py b/nucliadb_models/src/nucliadb_models/internal/predict.py
index 92c79d9391..51b8dbf7e7 100644
--- a/nucliadb_models/src/nucliadb_models/internal/predict.py
+++ b/nucliadb_models/src/nucliadb_models/internal/predict.py
@@ -26,17 +26,28 @@
 
 from typing import List, Optional
 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, model_validator
 
 
 class SentenceSearch(BaseModel):
     data: List[float] = Field(deprecated=True, default_factory=list)
-    vectors: dict[str, List[float]] = Field(description="Sentence vectors for each semantic model")
+    vectors: dict[str, List[float]] = Field(
+        default_factory=dict, description="Sentence vectors for each semantic model"
+    )
     time: float = Field(deprecated=True)
     timings: dict[str, float] = Field(
+        default_factory=dict,
-        description="Time taken to compute the sentence vector for each semantic model"
+        description="Time taken to compute the sentence vector for each semantic model",
     )
+
+    @model_validator(mode="after")
+    def keep_backwards_compatibility(self):
+        if len(self.vectors) == 0:
+            self.vectors["__default__"] = self.data
+        if len(self.timings) == 0:
+            self.timings["__default__"] = self.time
+        return self
 
 
 class Ner(BaseModel):
     text: str
@@ -52,14 +63,20 @@ class TokenSearch(BaseModel):
 
 
 class QueryInfo(BaseModel):
-    language: str
-    stop_words: List[str]
+    language: Optional[str]
+    stop_words: List[str] = Field(default_factory=list)
     semantic_threshold: float = Field(deprecated=True)
     semantic_thresholds: dict[str, float] = Field(
-        description="Semantic threshold for each semantic model"
+        default_factory=dict, description="Semantic threshold for each semantic model"
     )
     visual_llm: bool
     max_context: int
     entities: Optional[TokenSearch]
     sentence: Optional[SentenceSearch]
     query: str
+
+    @model_validator(mode="after")
+    def keep_backwards_compatibility(self):
+        if len(self.semantic_thresholds) == 0:
+            self.semantic_thresholds["__default__"] = self.semantic_threshold
+        return self
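
The two validators above are the backwards-compatibility shim: a payload from
an older Predict API that only carries the deprecated data/time/
semantic_threshold fields gets those values mirrored into the new per-model
dicts under a "__default__" key. A quick sanity check of that behaviour
(pydantic v2 is assumed, since model_validator requires it; note that the next
patch removes the shim again once Predict populates the new fields itself):

    from nucliadb_models.internal.predict import SentenceSearch

    # Old-style payload: only the deprecated fields are present.
    legacy = SentenceSearch(data=[0.1, 0.2, 0.3], time=0.05)

    # The model_validator mirrors the legacy values into the new dicts.
    assert legacy.vectors == {"__default__": [0.1, 0.2, 0.3]}
    assert legacy.timings == {"__default__": 0.05}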

From 607a720d96f0c813451b9c7201c28fb7662209e5 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Wed, 24 Jul 2024 15:57:25 +0200
Subject: [PATCH 04/18] Less bw/c predict models (will be deployed before)

---
 .../src/nucliadb_models/internal/predict.py   | 16 +---------------
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/nucliadb_models/src/nucliadb_models/internal/predict.py b/nucliadb_models/src/nucliadb_models/internal/predict.py
index 51b8dbf7e7..6a1f312536 100644
--- a/nucliadb_models/src/nucliadb_models/internal/predict.py
+++ b/nucliadb_models/src/nucliadb_models/internal/predict.py
@@ -26,7 +26,7 @@
 
 from typing import List, Optional
 
-from pydantic import BaseModel, Field, model_validator
+from pydantic import BaseModel, Field
 
 
 class SentenceSearch(BaseModel):
@@ -40,14 +40,6 @@ class SentenceSearch(BaseModel):
         description="Time taken to compute the sentence vector for each semantic model",
     )
 
-    @model_validator(mode="after")
-    def keep_backwards_compatibility(self):
-        if len(self.vectors) == 0:
-            self.vectors["__default__"] = self.data
-        if len(self.timings) == 0:
-            self.timings["__default__"] = self.time
-        return self
-
 
 class Ner(BaseModel):
     text: str
@@ -74,9 +66,3 @@ class QueryInfo(BaseModel):
     entities: Optional[TokenSearch]
     sentence: Optional[SentenceSearch]
     query: str
-
-    @model_validator(mode="after")
-    def keep_backwards_compatibility(self):
-        if len(self.semantic_thresholds) == 0:
-            self.semantic_thresholds["__default__"] = self.semantic_threshold
-        return self

From 4f31a2a56c6caa0a84648f80b78164d8d294530f Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Thu, 25 Jul 2024 08:55:46 +0200
Subject: [PATCH 05/18] Change predict model imports

---
 nucliadb/src/nucliadb/search/predict.py       |  5 +----
 nucliadb/src/nucliadb/search/search/query.py  |  2 +-
 .../integration/search/test_autofilters.py    | 19 +++++++++++++++++--
 3 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/nucliadb/src/nucliadb/search/predict.py b/nucliadb/src/nucliadb/search/predict.py
index b6e7e5e233..207b2debf1 100644
--- a/nucliadb/src/nucliadb/search/predict.py
+++ b/nucliadb/src/nucliadb/search/predict.py
@@ -29,17 +29,14 @@
 
 from nucliadb.search import logger
 from nucliadb.tests.vectors import Q, Qm2023
+from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch
 from nucliadb_models.search import (
     ChatModel,
     FeedbackRequest,
-    Ner,
-    QueryInfo,
     RephraseModel,
-    SentenceSearch,
     SummarizedResource,
     SummarizedResponse,
     SummarizeModel,
-    TokenSearch,
 )
 from nucliadb_protos.utils_pb2 import RelationNode
 from nucliadb_telemetry import errors, metrics
diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py
index cb1d78d3f2..abb4bfd7b2 100644
--- a/nucliadb/src/nucliadb/search/search/query.py
+++ b/nucliadb/src/nucliadb/search/search/query.py
@@ -41,13 +41,13 @@
     query_parse_dependency_observer,
 )
 from nucliadb.search.utilities import get_predict
+from nucliadb_models.internal.predict import QueryInfo
 from nucliadb_models.labels import translate_system_to_alias_label
 from nucliadb_models.metadata import ResourceProcessingStatus
 from nucliadb_models.search import (
     Filter,
     MaxTokens,
     MinScore,
-    QueryInfo,
     SearchOptions,
     SortField,
     SortFieldMap,
diff --git a/nucliadb/tests/nucliadb/integration/search/test_autofilters.py b/nucliadb/tests/nucliadb/integration/search/test_autofilters.py
index 73684ac45b..477a583718 100644
--- a/nucliadb/tests/nucliadb/integration/search/test_autofilters.py
+++ b/nucliadb/tests/nucliadb/integration/search/test_autofilters.py
@@ -23,7 +23,7 @@
 from httpx import AsyncClient
 
 from nucliadb.tests.vectors import Q
-from nucliadb_models.search import Ner, QueryInfo, SentenceSearch, TokenSearch
+from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch
 from nucliadb_utils.utilities import Utility, set_utility
 
 
@@ -40,6 +40,12 @@ async def test_autofilters_are_returned(
 
     predict_mock.query = AsyncMock(
         return_value=QueryInfo(
+            language="en",
+            stop_words=[],
+            semantic_threshold=0.7,
+            semantic_thresholds={
+                "my-semantic-model": 0.7,
+            },
             entities=TokenSearch(
                 tokens=[
                     Ner(text="Newton", ner="scientist", start=0, end=1),
@@ -47,7 +53,16 @@ async def test_autofilters_are_returned(
                 ],
                 time=0.1,
             ),
-            sentence=SentenceSearch(data=Q, time=0.1),
+            sentence=SentenceSearch(
+                data=Q,
+                vectors={
+                    "my-semantic-model": Q,
+                },
+                time=0.1,
+                timings={
+                    "my-semantic-model": 0.1,
+                },
+            ),
             visual_llm=False,
             max_context=10000,
             query="What relates Newton and Becquer?",

From 53d1dbb27d0a24aea6870dab674765691de87e76 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Thu, 25 Jul 2024 16:19:22 +0200
Subject: [PATCH 06/18] Default vectorsets return the first if defined

---
 .../common/datamanagers/vectorsets.py         | 42 +++++++++++--------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py b/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
index 73642b75cd..efe54f4c3b 100644
--- a/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
+++ b/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
@@ -48,24 +48,30 @@ async def get_default_vectorset(
 ) -> knowledgebox_pb2.VectorSetConfig:
     from . import kb
 
-    vectorset_id = "__default__"
-    semantic_model = await kb.get_model_metadata(txn, kbid=kbid)
-    vector_dimension = semantic_model.vector_dimension
-    similarity = semantic_model.similarity_function
-    matryoshka_dimensions = semantic_model.matryoshka_dimensions
-    normalize_vectors = len(matryoshka_dimensions) > 0
-
-    return knowledgebox_pb2.VectorSetConfig(
-        vectorset_id=vectorset_id,
-        vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
-            vector_dimension=vector_dimension,
-            similarity=similarity,
-            # we only support this for now
-            vector_type=nodewriter_pb2.VectorType.DENSE_F32,
-            normalize_vectors=normalize_vectors,
-        ),
-        matryoshka_dimensions=matryoshka_dimensions,
-    )
+    async for _, vectorset in iter(txn, kbid=kbid):
+        # KB with populated vectorsets, the default is the first we find
+        return vectorset
+    else:
+        # KB without populated vectorsets, we retrieve information from other
+        # maindb keys and compose a "default" vectorset
+        vectorset_id = "__default__"
+        semantic_model = await kb.get_model_metadata(txn, kbid=kbid)
+        vector_dimension = semantic_model.vector_dimension
+        similarity = semantic_model.similarity_function
+        matryoshka_dimensions = semantic_model.matryoshka_dimensions
+        normalize_vectors = len(matryoshka_dimensions) > 0
+
+        return knowledgebox_pb2.VectorSetConfig(
+            vectorset_id=vectorset_id,
+            vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
+                vector_dimension=vector_dimension,
+                similarity=similarity,
+                # we only support this for now
+                vector_type=nodewriter_pb2.VectorType.DENSE_F32,
+                normalize_vectors=normalize_vectors,
+            ),
+            matryoshka_dimensions=matryoshka_dimensions,
+        )
 
 
 async def exists(txn, *, kbid: str, vectorset_id: str) -> bool:
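
Note the async for ... else construct above: the else arm only runs when the
loop finishes without returning, i.e. when the KB has no vectorsets stored, in
which case the legacy single-model metadata is repackaged as a synthetic
"__default__" vectorset. The control flow in isolation (a toy, not the real
datamanager):

    import asyncio

    async def vectorsets():
        # Simulates a KB with no vectorsets key populated: yields nothing.
        return
        yield  # unreachable, but makes this an async generator

    async def first_or_default():
        async for vectorset in vectorsets():
            return vectorset  # the first one found wins
        else:
            return "__default__"  # loop was empty: compose a default

    print(asyncio.run(first_or_default()))  # -> "__default__"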

From 74c77da8973717e8280ba02e8c67b52a4c09f88a Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Thu, 25 Jul 2024 16:20:08 +0200
Subject: [PATCH 07/18] Select and forward vectorset to predict API

---
 nucliadb/src/nucliadb/search/predict.py       | 37 +++++++++--
 nucliadb/src/nucliadb/search/search/query.py  | 64 ++++++++++++++-----
 .../nucliadb/integration/test_vectorsets.py   |  2 +-
 3 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/nucliadb/src/nucliadb/search/predict.py b/nucliadb/src/nucliadb/search/predict.py
index 207b2debf1..83a55e0bc6 100644
--- a/nucliadb/src/nucliadb/search/predict.py
+++ b/nucliadb/src/nucliadb/search/predict.py
@@ -40,7 +40,7 @@
 )
 from nucliadb_protos.utils_pb2 import RelationNode
 from nucliadb_telemetry import errors, metrics
-from nucliadb_utils import const
+from nucliadb_utils.const import Features
 from nucliadb_utils.exceptions import LimitsExceededError
 from nucliadb_utils.settings import nuclia_settings
 from nucliadb_utils.utilities import Utility, has_feature, set_utility
@@ -226,7 +226,7 @@ def get_predict_url(self, endpoint: str, kbid: str) -> str:
             # /api/v1/predict/rephrase/{kbid}
             return f"{self.public_url}{PUBLIC_PREDICT}{endpoint}/{kbid}"
         else:
-            if has_feature(const.Features.VERSIONED_PRIVATE_PREDICT):
+            if has_feature(Features.VERSIONED_PRIVATE_PREDICT):
                 return f"{self.cluster_url}{VERSIONED_PRIVATE_PREDICT}{endpoint}"
             else:
                 return f"{self.cluster_url}{PRIVATE_PREDICT}{endpoint}"
@@ -375,6 +375,7 @@ async def query(
         self,
         kbid: str,
         sentence: str,
+        semantic_model: Optional[str] = None,
         generative_model: Optional[str] = None,
         rephrase: Optional[bool] = False,
     ) -> QueryInfo:
@@ -385,10 +386,13 @@ async def query(
             logger.warning(error)
             raise SendToPredictError(error)
 
-        params = {
+        params: dict[str, Any] = {
             "text": sentence,
             "rephrase": str(rephrase),
         }
+        if has_feature(Features.VECTORSETS_V0, context={"kbid": kbid}):
+            if semantic_model is not None:
+                params["semantic_models"] = [semantic_model]
         if generative_model is not None:
             params["generative_model"] = generative_model
 
@@ -516,6 +520,7 @@ async def query(
         self,
         kbid: str,
         sentence: str,
+        semantic_model: Optional[str] = None,
         generative_model: Optional[str] = None,
         rephrase: Optional[bool] = False,
     ) -> QueryInfo:
@@ -525,10 +530,20 @@ async def query(
                 language="en",
                 stop_words=[],
                 semantic_threshold=0.7,
+                semantic_thresholds={semantic_model or "": 0.7},
                 visual_llm=True,
                 max_context=self.max_context,
                 entities=TokenSearch(tokens=[Ner(text="text", ner="PERSON", start=0, end=2)], time=0.0),
-                sentence=SentenceSearch(data=Qm2023, time=0.0),
+                sentence=SentenceSearch(
+                    data=Qm2023,
+                    vectors={
+                        semantic_model or "": Qm2023,
+                    },
+                    time=0.0,
+                    timings={
+                        semantic_model or "": 0.0,
+                    },
+                ),
                 query=sentence,
             )
         else:
@@ -536,10 +551,22 @@ async def query(
                 language="en",
                 stop_words=[],
                 semantic_threshold=0.7,
+                semantic_thresholds={
+                    semantic_model or "": 0.7,
+                },
                 visual_llm=True,
                 max_context=self.max_context,
                 entities=TokenSearch(tokens=[Ner(text="text", ner="PERSON", start=0, end=2)], time=0.0),
-                sentence=SentenceSearch(data=Q, time=0.0),
+                sentence=SentenceSearch(
+                    data=Q,
+                    vectors={
+                        semantic_model or "": Q,
+                    },
+                    time=0.0,
+                    timings={
+                        semantic_model or "": 0.0,
+                    },
+                ),
                 query=sentence,
             )
diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py
index abb4bfd7b2..f948ad1b1d 100644
--- a/nucliadb/src/nucliadb/search/search/query.py
+++ b/nucliadb/src/nucliadb/search/search/query.py
@@ -59,6 +59,8 @@
 from nucliadb_models.security import RequestSecurity
 from nucliadb_protos import knowledgebox_pb2, nodereader_pb2, utils_pb2
 from nucliadb_protos.noderesources_pb2 import Resource
+from nucliadb_utils.const import Features
+from nucliadb_utils.utilities import has_feature
 
 from .exceptions import InvalidQueryError
 
@@ -134,7 +136,12 @@ def __init__(
         self.range_modification_end = range_modification_end
         self.fields = fields or []
         self.user_vector = user_vector
-        self.vectorset = vectorset
+        # until vectorsets is properly implemented, we'll have this parameter
+        # under FF and always set None for anyone else
+        if has_feature(Features.VECTORSETS_V0, context={"kbid": kbid}):
+            self.vectorset = vectorset
+        else:
+            self.vectorset = None
         self.with_duplicates = with_duplicates
         self.with_status = with_status
         self.with_synonyms = with_synonyms
@@ -160,7 +167,9 @@ def has_relations_search(self) -> bool:
     def _get_query_information(self) -> Awaitable[QueryInfo]:
         if self._query_information_task is None:  # pragma: no cover
             self._query_information_task = asyncio.create_task(
-                query_information(self.kbid, self.query, self.generative_model, self.rephrase)
+                query_information(
+                    self.kbid, self.query, self.vectorset, self.generative_model, self.rephrase
+                )
             )
         return self._query_information_task
 
@@ -237,6 +246,8 @@ async def parse(self) -> tuple[nodereader_pb2.SearchRequest, bool, list[str]]:
         request.with_duplicates = self.with_duplicates
 
         self.parse_sorting(request)
+        if self.has_vector_search:
+            await self.select_vectorset(request)
 
         await self._schedule_dependency_tasks()
 
@@ -367,6 +378,31 @@ def parse_paragraph_search(self, request: nodereader_pb2.SearchRequest) -> None:
             request.paragraph = True
             node_features.inc({"type": "paragraphs"})
 
+    async def select_vectorset(self, request: nodereader_pb2.SearchRequest):
+        """Validate the vectorset parameter and override it with a default if
+        needed.
+        """
+        if not has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}):
+            return
+        if self.vectorset:
+            # validate vectorset
+            async with datamanagers.with_ro_transaction() as txn:
+                if not await datamanagers.vectorsets.exists(
+                    txn, kbid=self.kbid, vectorset_id=self.vectorset
+                ):
+                    raise InvalidQueryError(
+                        "vectorset",
+                        f"Vectorset {self.vectorset} doesn't exist in your Knowledge Box",
+                    )
+            request.vectorset = self.vectorset
+        else:
+            # no vectorset specified, get the default one
+            async with datamanagers.with_ro_transaction() as txn:
+                default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
+                    txn, kbid=self.kbid
+                )
+                request.vectorset = default_vectorset.vectorset_id
+
     async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bool:
         if not self.has_vector_search:
             return False
@@ -383,23 +419,20 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo
             incomplete = True
         else:
             if query_info and query_info.sentence:
-                query_vector = query_info.sentence.data
+                if self.vectorset and has_feature(
+                    Features.VECTORSETS_V0, context={"kbid": self.kbid}
+                ):
+                    if self.vectorset in query_info.sentence.vectors:
+                        query_vector = query_info.sentence.vectors[self.vectorset]
+                    else:
+                        incomplete = True
+                else:
+                    query_vector = query_info.sentence.data
             else:
                 incomplete = True
        else:
             query_vector = self.user_vector
 
-        if self.vectorset:
-            async with get_driver().transaction(read_only=True) as txn:
-                if not await datamanagers.vectorsets.exists(
-                    txn, kbid=self.kbid, vectorset_id=self.vectorset
-                ):
-                    raise InvalidQueryError(
-                        "vectorset",
-                        f"Vectorset {self.vectorset} doesn't exist in you Knowledge Box",
-                    )
-            request.vectorset = self.vectorset
-
         if query_vector is not None:
             matryoshka_dimension = await self._get_matryoshka_dimension()
             if matryoshka_dimension is not None:
diff --git a/nucliadb/tests/nucliadb/integration/test_vectorsets.py b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
index d9767cecbd..2b1047592d 100644
--- a/nucliadb/tests/nucliadb/integration/test_vectorsets.py
+++ b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
@@ -108,7 +108,7 @@ async def test_vectorsets(
 
 @pytest.mark.parametrize(
     "vectorset,expected",
-    [(None, ""), ("", ""), ("myvectorset", "myvectorset")],
+    [(None, "__default__"), ("", "__default__"), ("myvectorset", "myvectorset")],
 )
 @pytest.mark.asyncio
 @pytest.mark.parametrize("knowledgebox", ("EXPERIMENTAL", "STABLE"), indirect=True)
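
With this patch the vectorset choice travels end to end: select_vectorset
resolves and validates it, query() forwards it to Predict as a semantic_models
parameter (only when the VECTORSETS_V0 flag is on), and the per-model vector
comes back in sentence.vectors. A hypothetical call against the new signature,
assuming a configured predict utility and an existing "my-semantic-model"
vectorset in the KB:

    from nucliadb.search.utilities import get_predict

    async def semantic_query_vector(kbid: str, text: str) -> list[float]:
        predict = get_predict()
        query_info = await predict.query(kbid, text, semantic_model="my-semantic-model")
        assert query_info.sentence is not None  # sentence search is optional in the model
        # Each requested semantic model gets its own entry in the vectors dict.
        return query_info.sentence.vectors["my-semantic-model"]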

From 4f598672f76915c8c11e58288000bcbb416dd7aa Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Fri, 26 Jul 2024 12:20:11 +0200
Subject: [PATCH 08/18] Bw/c import in models

---
 nucliadb_models/src/nucliadb_models/search.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/nucliadb_models/src/nucliadb_models/search.py b/nucliadb_models/src/nucliadb_models/search.py
index fe6cc83e8e..2a0f9ac1ce 100644
--- a/nucliadb_models/src/nucliadb_models/search.py
+++ b/nucliadb_models/src/nucliadb_models/search.py
@@ -39,6 +39,9 @@ from nucliadb_protos.writer_pb2 import ShardObject as PBShardObject
 from nucliadb_protos.writer_pb2 import Shards as PBShards
 
+# Bw/c import to avoid breaking users
+from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch  # noqa isort: skip
+
 _T = TypeVar("_T")
 
 ANSWER_JSON_SCHEMA_EXAMPLE = {

From a41be769360cc3286fbfaba599b45cb035fbf74f Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 30 Jul 2024 19:08:38 +0200
Subject: [PATCH 09/18] Assume all KBs have vectorset key populated

---
 .../common/datamanagers/vectorsets.py        | 34 +++++--------------
 nucliadb/src/nucliadb/search/search/query.py | 14 +++++---
 2 files changed, 19 insertions(+), 29 deletions(-)

diff --git a/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py b/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
index efe54f4c3b..26bff4c842 100644
--- a/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
+++ b/nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
@@ -21,11 +21,15 @@
 
 from nucliadb.common.datamanagers.utils import get_kv_pb
 from nucliadb.common.maindb.driver import Transaction
-from nucliadb_protos import knowledgebox_pb2, nodewriter_pb2
+from nucliadb_protos import knowledgebox_pb2
 
 KB_VECTORSETS = "/kbs/{kbid}/vectorsets"
 
 
+class BrokenInvariant(Exception):
+    pass
+
+
 async def initialize(txn: Transaction, *, kbid: str):
     key = KB_VECTORSETS.format(kbid=kbid)
     await txn.set(key, knowledgebox_pb2.KnowledgeBoxVectorSetsConfig().SerializeToString())
@@ -46,32 +50,12 @@ async def get_default_vectorset(
     *,
     kbid: str,
 ) -> knowledgebox_pb2.VectorSetConfig:
-    from . import kb
-
+    """XXX: For now, default vectorset is the first on the list, we should
+    implement an API to let users decide which is their default though
+    """
     async for _, vectorset in iter(txn, kbid=kbid):
-        # KB with populated vectorsets, the default is the first we find
         return vectorset
-    else:
-        # KB without populated vectorsets, we retrieve information from other
-        # maindb keys and compose a "default" vectorset
-        vectorset_id = "__default__"
-        semantic_model = await kb.get_model_metadata(txn, kbid=kbid)
-        vector_dimension = semantic_model.vector_dimension
-        similarity = semantic_model.similarity_function
-        matryoshka_dimensions = semantic_model.matryoshka_dimensions
-        normalize_vectors = len(matryoshka_dimensions) > 0
-
-        return knowledgebox_pb2.VectorSetConfig(
-            vectorset_id=vectorset_id,
-            vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
-                vector_dimension=vector_dimension,
-                similarity=similarity,
-                # we only support this for now
-                vector_type=nodewriter_pb2.VectorType.DENSE_F32,
-                normalize_vectors=normalize_vectors,
-            ),
-            matryoshka_dimensions=matryoshka_dimensions,
-        )
+    raise BrokenInvariant("KB without vectorsets, this shouldn't be possible!")
 
 
 async def exists(txn, *, kbid: str, vectorset_id: str) -> bool:
diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py
index f948ad1b1d..7c5f181aab 100644
--- a/nucliadb/src/nucliadb/search/search/query.py
+++ b/nucliadb/src/nucliadb/search/search/query.py
@@ -25,6 +25,7 @@
 from async_lru import alru_cache
 
 from nucliadb.common import datamanagers
+from nucliadb.common.datamanagers.vectorsets import BrokenInvariant
 from nucliadb.common.maindb.utils import get_driver
 from nucliadb.search import logger
 from nucliadb.search.predict import SendToPredictError, convert_relations
@@ -398,10 +399,15 @@ async def select_vectorset(self, request: nodereader_pb2.SearchRequest):
         else:
             # no vectorset specified, get the default one
             async with datamanagers.with_ro_transaction() as txn:
-                default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
-                    txn, kbid=self.kbid
-                )
-                request.vectorset = default_vectorset.vectorset_id
+                try:
+                    default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
+                        txn, kbid=self.kbid
+                    )
+                except BrokenInvariant as exc:
+                    logger.exception("KB has no default vectorset", extra={"kbid": self.kbid})
+                    raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc
+                else:
+                    request.vectorset = default_vectorset.vectorset_id

From 6d6fba4f3fc0556c233f6b256e28c504bbafb843 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 30 Jul 2024 19:15:34 +0200
Subject: [PATCH 10/18] Cache default vectorset and use it always with predict

---
 nucliadb/src/nucliadb/search/search/query.py | 37 +++++++++++---------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py
index 7c5f181aab..9ca8733112 100644
--- a/nucliadb/src/nucliadb/search/search/query.py
+++ b/nucliadb/src/nucliadb/search/search/query.py
@@ -167,13 +167,15 @@ def has_relations_search(self) -> bool:
     def _get_query_information(self) -> Awaitable[QueryInfo]:
         if self._query_information_task is None:  # pragma: no cover
-            self._query_information_task = asyncio.create_task(
-                query_information(
-                    self.kbid, self.query, self.vectorset, self.generative_model, self.rephrase
-                )
-            )
+            self._query_information_task = asyncio.create_task(self._query_information())
         return self._query_information_task
 
+    async def _query_information(self) -> QueryInfo:
+        vectorset = await self.select_vectorset()
+        return await query_information(
+            self.kbid, self.query, vectorset, self.generative_model, self.rephrase
+        )
+
     def _get_matryoshka_dimension(self) -> Awaitable[Optional[int]]:
         if self._get_matryoshka_dimension_task is None:
             self._get_matryoshka_dimension_task = asyncio.create_task(
@@ -247,8 +249,6 @@ async def parse(self) -> tuple[nodereader_pb2.SearchRequest, bool, list[str]]:
         request.with_duplicates = self.with_duplicates
 
         self.parse_sorting(request)
-        if self.has_vector_search:
-            await self.select_vectorset(request)
 
         await self._schedule_dependency_tasks()
 
@@ -379,12 +379,13 @@ def parse_paragraph_search(self, request: nodereader_pb2.SearchRequest) -> None:
             request.paragraph = True
             node_features.inc({"type": "paragraphs"})
 
-    async def select_vectorset(self, request: nodereader_pb2.SearchRequest):
+    @alru_cache(maxsize=1)
+    async def select_vectorset(self) -> Optional[str]:
         """Validate the vectorset parameter and override it with a default if
         needed.
         """
         if not has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}):
-            return
+            return None
         if self.vectorset:
             # validate vectorset
             async with datamanagers.with_ro_transaction() as txn:
@@ -392,19 +393,20 @@ async def select_vectorset(self) -> Optional[str]:
                         "vectorset",
                         f"Vectorset {self.vectorset} doesn't exist in your Knowledge Box",
                     )
-            request.vectorset = self.vectorset
+            return self.vectorset
         else:
             # no vectorset specified, get the default one
             async with datamanagers.with_ro_transaction() as txn:
                 try:
                     default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
                         txn, kbid=self.kbid
                     )
                 except BrokenInvariant as exc:
                     logger.exception("KB has no default vectorset", extra={"kbid": self.kbid})
                     raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc
                 else:
-                    request.vectorset = default_vectorset.vectorset_id
+                    return default_vectorset.vectorset_id
+        return None
 
     async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bool:
         if not self.has_vector_search:
@@ -410,8 +412,13 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo
             return False
 
         node_features.inc({"type": "vectors"})
 
         incomplete = False
+
+        vectorset = await self.select_vectorset()
+        if vectorset is not None:
+            request.vectorset = vectorset
+
         query_vector = None
         if self.user_vector is None:
             try:
@@ -425,11 +432,9 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo
             incomplete = True
         else:
             if query_info and query_info.sentence:
-                if self.vectorset and has_feature(
-                    Features.VECTORSETS_V0, context={"kbid": self.kbid}
-                ):
-                    if self.vectorset in query_info.sentence.vectors:
-                        query_vector = query_info.sentence.vectors[self.vectorset]
+                if vectorset and has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}):
+                    if vectorset in query_info.sentence.vectors:
+                        query_vector = query_info.sentence.vectors[vectorset]
                 else:
                     incomplete = True
                 else:
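
The @alru_cache(maxsize=1) above turns select_vectorset into a cached
coroutine, so the vectorset is resolved (and its maindb lookups paid) only
once per parser instance, even though it is now awaited both from
_query_information and from parse_vector_search. The caching behaviour in
isolation, as a small self-contained sketch:

    import asyncio
    from async_lru import alru_cache

    calls = 0

    @alru_cache(maxsize=1)
    async def resolve() -> str:
        global calls
        calls += 1
        return "my-semantic-model"  # stands in for the real maindb lookup

    async def main():
        await resolve()
        await resolve()  # the second await is served from the cache
        assert calls == 1

    asyncio.run(main())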

From 77aed0c8ca3ca00fd1724afcacd67e60d7571f05 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 30 Jul 2024 19:23:15 +0200
Subject: [PATCH 11/18] Be more relaxed for testing

---
 nucliadb/src/nucliadb/search/search/query.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py
index 9ca8733112..d04aa76004 100644
--- a/nucliadb/src/nucliadb/search/search/query.py
+++ b/nucliadb/src/nucliadb/search/search/query.py
@@ -404,9 +404,12 @@ async def select_vectorset(self) -> Optional[str]:
                 default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
                     txn, kbid=self.kbid
                 )
-            except BrokenInvariant as exc:
-                logger.exception("KB has no default vectorset", extra={"kbid": self.kbid})
-                raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc
+            except BrokenInvariant:
+                # XXX: fix to avoid tests complaining too much, it should be
+                # an error at some point though
+                return None
+                # logger.exception("KB has no default vectorset", extra={"kbid": self.kbid})
+                # raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc
             else:
                 return default_vectorset.vectorset_id
""" if not has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}): - return + return None if self.vectorset: # validate vectorset async with datamanagers.with_ro_transaction() as txn: @@ -395,7 +396,7 @@ async def select_vectorset(self, request: nodereader_pb2.SearchRequest): "vectorset", f"Vectorset {self.vectorset} doesn't exist in you Knowledge Box", ) - request.vectorset = self.vectorset + return self.vectorset else: # no vectorset specified, get the default one async with datamanagers.with_ro_transaction() as txn: @@ -407,7 +408,8 @@ async def select_vectorset(self, request: nodereader_pb2.SearchRequest): logger.exception("KB has no default vectorset", extra={"kbid": self.kbid}) raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc else: - request.vectorset = default_vectorset.vectorset_id + return default_vectorset.vectorset_id + return None async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bool: if not self.has_vector_search: @@ -416,6 +418,11 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo node_features.inc({"type": "vectors"}) incomplete = False + + vectorset = await self.select_vectorset() + if vectorset is not None: + request.vectorset = vectorset + query_vector = None if self.user_vector is None: try: @@ -425,11 +432,9 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo incomplete = True else: if query_info and query_info.sentence: - if self.vectorset and has_feature( - Features.VECTORSETS_V0, context={"kbid": self.kbid} - ): - if self.vectorset in query_info.sentence.vectors: - query_vector = query_info.sentence.vectors[self.vectorset] + if vectorset and has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}): + if vectorset in query_info.sentence.vectors: + query_vector = query_info.sentence.vectors[vectorset] else: incomplete = True else: From 99d2d19abcc84983bf64e913b84fbbdb459eb990 Mon Sep 17 00:00:00 2001 From: Joan Antoni RE Date: Tue, 30 Jul 2024 19:23:15 +0200 Subject: [PATCH 11/18] Be more relaxed for testing --- nucliadb/src/nucliadb/search/search/query.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nucliadb/src/nucliadb/search/search/query.py b/nucliadb/src/nucliadb/search/search/query.py index 9ca8733112..d04aa76004 100644 --- a/nucliadb/src/nucliadb/search/search/query.py +++ b/nucliadb/src/nucliadb/search/search/query.py @@ -404,9 +404,12 @@ async def select_vectorset(self) -> Optional[str]: default_vectorset = await datamanagers.vectorsets.get_default_vectorset( txn, kbid=self.kbid ) - except BrokenInvariant as exc: - logger.exception("KB has no default vectorset", extra={"kbid": self.kbid}) - raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc + except BrokenInvariant: + # XXX: fix to avoid tests complaining too much, it should be + # an error at some point though + return None + # logger.exception("KB has no default vectorset", extra={"kbid": self.kbid}) + # raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc else: return default_vectorset.vectorset_id return None From 77aed0c8ca3ca00fd1724afcacd67e60d7571f05 Mon Sep 17 00:00:00 2001 From: Joan Antoni RE Date: Tue, 30 Jul 2024 19:23:34 +0200 Subject: [PATCH 12/18] Widely used KB fixture now uses vectorsets keys :) --- nucliadb/tests/ingest/fixtures.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nucliadb/tests/ingest/fixtures.py b/nucliadb/tests/ingest/fixtures.py 

From 71a41724bcde4b1de28ff7dcc0ee8e7152ba64f3 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 30 Jul 2024 19:40:52 +0200
Subject: [PATCH 13/18] Fix vectorset test

---
 nucliadb/tests/nucliadb/integration/test_vectorsets.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nucliadb/tests/nucliadb/integration/test_vectorsets.py b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
index 2b1047592d..d9767cecbd 100644
--- a/nucliadb/tests/nucliadb/integration/test_vectorsets.py
+++ b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
@@ -108,7 +108,7 @@ async def test_vectorsets(
 
 @pytest.mark.parametrize(
     "vectorset,expected",
-    [(None, "__default__"), ("", "__default__"), ("myvectorset", "myvectorset")],
+    [(None, ""), ("", ""), ("myvectorset", "myvectorset")],
 )
 @pytest.mark.asyncio
 @pytest.mark.parametrize("knowledgebox", ("EXPERIMENTAL", "STABLE"), indirect=True)

From 21e1f02d674fae3a845fc46b31ae4fa9955905d6 Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Tue, 30 Jul 2024 19:41:14 +0200
Subject: [PATCH 14/18] Add missing __init__ file for nucliadb_models.internal

---
 .../src/nucliadb_models/internal/__init__.py  | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
 create mode 100644 nucliadb_models/src/nucliadb_models/internal/__init__.py

diff --git a/nucliadb_models/src/nucliadb_models/internal/__init__.py b/nucliadb_models/src/nucliadb_models/internal/__init__.py
new file mode 100644
index 0000000000..3b734776ac
--- /dev/null
+++ b/nucliadb_models/src/nucliadb_models/internal/__init__.py
@@ -0,0 +1,19 @@
+# Copyright (C) 2021 Bosutech XXI S.L.
+#
+# nucliadb is offered under the AGPL v3.0 and as commercial software.
+# For commercial licensing, contact us at info@nuclia.com.
+#
+# AGPL:
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#

From 5b7379b7211984e44ea1c5cad637670396c6bc3d Mon Sep 17 00:00:00 2001
From: Joan Antoni RE
Date: Wed, 31 Jul 2024 14:30:48 +0200
Subject: [PATCH 15/18] Update vectorset tests as now we get a default one

---
 .../nucliadb/integration/test_vectorsets.py   | 57 +++++++++++++------
 1 file changed, 40 insertions(+), 17 deletions(-)

diff --git a/nucliadb/tests/nucliadb/integration/test_vectorsets.py b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
index d9767cecbd..a7c8e2e7a3 100644
--- a/nucliadb/tests/nucliadb/integration/test_vectorsets.py
+++ b/nucliadb/tests/nucliadb/integration/test_vectorsets.py
@@ -201,11 +201,13 @@ async def query_shard_wrapper(
         query = (spy, result, None)
         return result
 
-    def predict_query_wrapper(original, dimension):
+    def predict_query_wrapper(original, dimension: int, vectorset_dimensions: dict[str, int]):
        @functools.wraps(original)
         async def inner(*args, **kwargs):
             query_info = await original(*args, **kwargs)
             query_info.sentence.data = [1.0] * dimension
+            for vectorset_id, vectorset_dimension in vectorset_dimensions.items():
+                query_info.sentence.vectors[vectorset_id] = [1.0] * vectorset_dimension
             return query_info
 
         return inner
@@ -234,7 +236,9 @@
     ):
         with (
             patch.object(
-                dummy_predict, "query", side_effect=predict_query_wrapper(dummy_predict.query, 768)
+                dummy_predict,
+                "query",
+                side_effect=predict_query_wrapper(dummy_predict.query, 768, {"model": 768}),
             ),
         ):
             resp = await nucliadb_reader.post(
@@ -246,10 +250,12 @@
             assert resp.status_code == 200
 
             node_search_spy, result, error = query
+            assert result is not None
             assert error is None
 
             request = node_search_spy.call_args[0][0]
-            assert request.vectorset == ""
+            # there's only one model and we get it as the default
+            assert request.vectorset == "model"
             assert len(request.vector) == 768
 
             resp = await nucliadb_reader.post(
@@ -262,6 +268,7 @@
             assert resp.status_code == 200
 
             node_search_spy, result, error = query
+            assert result is not None
             assert error is None
 
             request = node_search_spy.call_args[0][0]
@@ -297,7 +304,9 @@
     ):
         with (
             patch.object(
-                dummy_predict, "query", side_effect=predict_query_wrapper(dummy_predict.query, 768)
+                dummy_predict,
+                "query",
+                side_effect=predict_query_wrapper(dummy_predict.query, 500, {"model-A": 768}),
             ),
         ):
             resp = await nucliadb_reader.post(
@@ -310,6 +319,7 @@
             assert resp.status_code == 200
 
             node_search_spy, result, error = query
+            assert result is not None
             assert error is None
 
             request = node_search_spy.call_args[0][0]
@@ -318,7 +328,9 @@
     ):
         with (
             patch.object(
-                dummy_predict, "query", side_effect=predict_query_wrapper(dummy_predict.query, 1024)
+                dummy_predict,
+                "query",
+                side_effect=predict_query_wrapper(dummy_predict.query, 500, {"model-B": 1024}),
             ),
         ):
             resp = await nucliadb_reader.post(
@@ -331,24 +343,35 @@
             assert resp.status_code == 200
 
             node_search_spy, result, error = query
+            assert result is not None
             assert error is None
 
             request = node_search_spy.call_args[0][0]
             assert request.vectorset == "model-B"
             assert len(request.vector) == 1024
 
-    resp = await nucliadb_reader.get(
-        f"/kb/{kbid}/find",
-        params={
-            "query": "foo",
-        },
-    )
-    assert resp.status_code == 500
-    node_search_spy, result, error = query
-    request = node_search_spy.call_args[0][0]
-    assert result is None
-    assert request.vectorset == ""
-    assert "Query without vectorset but shard has multiple vector indexes" in str(error)
+    with (
+        patch.object(
+            dummy_predict,
+            "query",
+            side_effect=predict_query_wrapper(
+                dummy_predict.query, 500, {"model-A": 768, "model-B": 1024}
+            ),
+        ),
+    ):
+        resp = await nucliadb_reader.get(
+            f"/kb/{kbid}/find",
+            params={
+                "query": "foo",
+            },
+        )
+        assert resp.status_code == 200
+        node_search_spy, result, error = query
+        request = node_search_spy.call_args[0][0]
+        assert result is not None
+        assert error is None
+        # with more than one vectorset, we get the first one
+        assert request.vectorset == "model-A"

From 758e7e7a8011ed704b9dd881a33d27efdcf00c5e Mon Sep 17 00:00:00 2001
From: Javier Torres
Date: Thu, 1 Aug 2024 15:27:51 +0200
Subject: [PATCH 16/18] Make test less flaky

---
 nucliadb/tests/nucliadb/integration/common/test_locking.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nucliadb/tests/nucliadb/integration/common/test_locking.py b/nucliadb/tests/nucliadb/integration/common/test_locking.py
index 53c8bf40e5..f4b544dc66 100644
--- a/nucliadb/tests/nucliadb/integration/common/test_locking.py
+++ b/nucliadb/tests/nucliadb/integration/common/test_locking.py
@@ -72,7 +72,7 @@ async def test_lock(for_seconds: float):
 
     tasks = []
     for _ in range(5):
-        tasks.append(asyncio.create_task(test_lock(random.uniform(0, 0.2))))
+        tasks.append(asyncio.create_task(test_lock(random.uniform(0.1, 0.2))))
 
     results = await asyncio.gather(*tasks, return_exceptions=True)
     # Check that 4 out of 5 tasks returned ResourceLocked error

From 45fa56744961c67aa75b5050640e8a184149bf48 Mon Sep 17 00:00:00 2001
From: Javier Torres
Date: Thu, 1 Aug 2024 15:38:20 +0200
Subject: [PATCH 17/18] Fix mocks

---
 nucliadb/tests/search/unit/search/test_query.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/nucliadb/tests/search/unit/search/test_query.py b/nucliadb/tests/search/unit/search/test_query.py
index d562fe650a..5df01bf0cb 100644
--- a/nucliadb/tests/search/unit/search/test_query.py
+++ b/nucliadb/tests/search/unit/search/test_query.py
@@ -184,7 +184,8 @@ async def test_query_without_vectorset_nor_matryoshka(
             "nucliadb.search.search.query.get_matryoshka_dimension_cached",
             new=AsyncMock(return_value=matryoshka_dimension),
         ),
-        patch("nucliadb.search.search.query.get_driver"),
+        patch("nucliadb.common.datamanagers.utils.get_driver"),
+        patch("nucliadb.common.datamanagers.vectorsets.get_kv_pb"),
     ):
         request, incomplete, _ = await parser.parse()
         assert not incomplete
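
The mock fix above follows the usual unittest.mock rule: patch a name where it
is looked up, not where it is defined. After patch 09, get_driver and
get_kv_pb are reached through the datamanagers modules, so patching
nucliadb.search.search.query.get_driver no longer intercepts anything. The
same rule in a self-contained toy:

    import json
    from json import dumps as bound_dumps
    from unittest.mock import patch

    def via_module(obj):
        return json.dumps(obj)  # looked up through the module at call time

    def via_from_import(obj):
        return bound_dumps(obj)  # bound at import time, so the patch misses it

    with patch("json.dumps", return_value="patched"):
        assert via_module({}) == "patched"
        assert via_from_import({}) == "{}"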

From 15e695581a865a75a0c025e5bb8fbbc089b9aeb6 Mon Sep 17 00:00:00 2001
From: Javier Torres
Date: Thu, 1 Aug 2024 17:03:31 +0200
Subject: [PATCH 18/18] Fall back to __default__ index if there are no other
 vectorsets

---
 nucliadb_node/src/shards/shard_reader.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nucliadb_node/src/shards/shard_reader.rs b/nucliadb_node/src/shards/shard_reader.rs
index f45ae1990f..4757cfa72d 100644
--- a/nucliadb_node/src/shards/shard_reader.rs
+++ b/nucliadb_node/src/shards/shard_reader.rs
@@ -695,6 +695,10 @@ impl ShardReader {
         let reader = vector_readers.get(vectorset);
         if let Some(reader) = reader {
             reader.search(request, context)
+        } else if vector_readers.len() == 1 && vector_readers.contains_key(DEFAULT_VECTORS_INDEX_NAME) {
+            // Only one vectorset with default name, use it!
+            // We can remove this once all vectorsets are named (there are no default vectorsets)
+            vector_readers.get(DEFAULT_VECTORS_INDEX_NAME).unwrap().search(request, context)
         } else {
             Err(node_error!("Vectorset '{vectorset}' not found"))
         }