Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Predict is compatible with vectorsets #2370

Merged
merged 19 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions nucliadb/src/nucliadb/common/datamanagers/vectorsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@

from nucliadb.common.datamanagers.utils import get_kv_pb
from nucliadb.common.maindb.driver import Transaction
from nucliadb_protos import knowledgebox_pb2, nodewriter_pb2
from nucliadb_protos import knowledgebox_pb2

KB_VECTORSETS = "/kbs/{kbid}/vectorsets"


class BrokenInvariant(Exception):
pass


async def initialize(txn: Transaction, *, kbid: str):
key = KB_VECTORSETS.format(kbid=kbid)
await txn.set(key, knowledgebox_pb2.KnowledgeBoxVectorSetsConfig().SerializeToString())
Expand All @@ -46,26 +50,12 @@ async def get_default_vectorset(
*,
kbid: str,
) -> knowledgebox_pb2.VectorSetConfig:
from . import kb

vectorset_id = "__default__"
semantic_model = await kb.get_model_metadata(txn, kbid=kbid)
vector_dimension = semantic_model.vector_dimension
similarity = semantic_model.similarity_function
matryoshka_dimensions = semantic_model.matryoshka_dimensions
normalize_vectors = len(matryoshka_dimensions) > 0

return knowledgebox_pb2.VectorSetConfig(
vectorset_id=vectorset_id,
vectorset_index_config=nodewriter_pb2.VectorIndexConfig(
vector_dimension=vector_dimension,
similarity=similarity,
# we only support this for now
vector_type=nodewriter_pb2.VectorType.DENSE_F32,
normalize_vectors=normalize_vectors,
),
matryoshka_dimensions=matryoshka_dimensions,
)
"""XXX: For now, default vectorset is the first on the list, we should
implement an API to let users decide which is their default though
"""
async for _, vectorset in iter(txn, kbid=kbid):
return vectorset
raise BrokenInvariant("KB without vectorsets this shouldn't be possible!")


async def exists(txn, *, kbid: str, vectorset_id: str) -> bool:
Expand Down
42 changes: 33 additions & 9 deletions nucliadb/src/nucliadb/search/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,18 @@

from nucliadb.search import logger
from nucliadb.tests.vectors import Q, Qm2023
from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch
from nucliadb_models.search import (
ChatModel,
FeedbackRequest,
Ner,
QueryInfo,
RephraseModel,
SentenceSearch,
SummarizedResource,
SummarizedResponse,
SummarizeModel,
TokenSearch,
)
from nucliadb_protos.utils_pb2 import RelationNode
from nucliadb_telemetry import errors, metrics
from nucliadb_utils import const
from nucliadb_utils.const import Features
from nucliadb_utils.exceptions import LimitsExceededError
from nucliadb_utils.settings import nuclia_settings
from nucliadb_utils.utilities import Utility, has_feature, set_utility
Expand Down Expand Up @@ -229,7 +226,7 @@ def get_predict_url(self, endpoint: str, kbid: str) -> str:
# /api/v1/predict/rephrase/{kbid}
return f"{self.public_url}{PUBLIC_PREDICT}{endpoint}/{kbid}"
else:
if has_feature(const.Features.VERSIONED_PRIVATE_PREDICT):
if has_feature(Features.VERSIONED_PRIVATE_PREDICT):
return f"{self.cluster_url}{VERSIONED_PRIVATE_PREDICT}{endpoint}"
else:
return f"{self.cluster_url}{PRIVATE_PREDICT}{endpoint}"
Expand Down Expand Up @@ -378,6 +375,7 @@ async def query(
self,
kbid: str,
sentence: str,
semantic_model: Optional[str] = None,
generative_model: Optional[str] = None,
rephrase: Optional[bool] = False,
) -> QueryInfo:
Expand All @@ -388,10 +386,13 @@ async def query(
logger.warning(error)
raise SendToPredictError(error)

params = {
params: dict[str, Any] = {
"text": sentence,
"rephrase": str(rephrase),
}
if has_feature(Features.VECTORSETS_V0, context={"kbid": kbid}):
if semantic_model is not None:
params["semantic_models"] = [semantic_model]
if generative_model is not None:
params["generative_model"] = generative_model

Expand Down Expand Up @@ -519,6 +520,7 @@ async def query(
self,
kbid: str,
sentence: str,
semantic_model: Optional[str] = None,
generative_model: Optional[str] = None,
rephrase: Optional[bool] = False,
) -> QueryInfo:
Expand All @@ -528,21 +530,43 @@ async def query(
language="en",
stop_words=[],
semantic_threshold=0.7,
semantic_thresholds={semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": 0.7},
visual_llm=True,
max_context=self.max_context,
entities=TokenSearch(tokens=[Ner(text="text", ner="PERSON", start=0, end=2)], time=0.0),
sentence=SentenceSearch(data=Qm2023, time=0.0),
sentence=SentenceSearch(
data=Qm2023,
vectors={
semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": Qm2023,
},
time=0.0,
timings={
semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": 0.0,
},
),
query=sentence,
)
else:
return QueryInfo(
language="en",
stop_words=[],
semantic_threshold=0.7,
semantic_thresholds={
semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": 0.7,
},
visual_llm=True,
max_context=self.max_context,
entities=TokenSearch(tokens=[Ner(text="text", ner="PERSON", start=0, end=2)], time=0.0),
sentence=SentenceSearch(data=Q, time=0.0),
sentence=SentenceSearch(
data=Q,
vectors={
semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": Q,
},
time=0.0,
timings={
semantic_model or "<MUST-PROVIDE-SEMANTIC-MODEL>": 0.0,
},
),
query=sentence,
)

Expand Down
84 changes: 66 additions & 18 deletions nucliadb/src/nucliadb/search/search/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from async_lru import alru_cache

from nucliadb.common import datamanagers
from nucliadb.common.datamanagers.vectorsets import BrokenInvariant
from nucliadb.common.maindb.utils import get_driver
from nucliadb.search import logger
from nucliadb.search.predict import SendToPredictError, convert_relations
Expand All @@ -41,13 +42,13 @@
query_parse_dependency_observer,
)
from nucliadb.search.utilities import get_predict
from nucliadb_models.internal.predict import QueryInfo
from nucliadb_models.labels import translate_system_to_alias_label
from nucliadb_models.metadata import ResourceProcessingStatus
from nucliadb_models.search import (
Filter,
MaxTokens,
MinScore,
QueryInfo,
SearchOptions,
SortField,
SortFieldMap,
Expand All @@ -59,6 +60,8 @@
from nucliadb_models.security import RequestSecurity
from nucliadb_protos import knowledgebox_pb2, nodereader_pb2, utils_pb2
from nucliadb_protos.noderesources_pb2 import Resource
from nucliadb_utils.const import Features
from nucliadb_utils.utilities import has_feature

from .exceptions import InvalidQueryError

Expand Down Expand Up @@ -134,7 +137,12 @@ def __init__(
self.range_modification_end = range_modification_end
self.fields = fields or []
self.user_vector = user_vector
self.vectorset = vectorset
# until vectorsets is properly implemented, we'll have this parameter
# under FF and always set None for anyone else
if has_feature(Features.VECTORSETS_V0, context={"kbid": kbid}):
self.vectorset = vectorset
else:
self.vectorset = None
self.with_duplicates = with_duplicates
self.with_status = with_status
self.with_synonyms = with_synonyms
Expand All @@ -159,11 +167,15 @@ def has_relations_search(self) -> bool:

def _get_query_information(self) -> Awaitable[QueryInfo]:
if self._query_information_task is None: # pragma: no cover
self._query_information_task = asyncio.create_task(
query_information(self.kbid, self.query, self.generative_model, self.rephrase)
)
self._query_information_task = asyncio.create_task(self._query_information())
return self._query_information_task

async def _query_information(self) -> QueryInfo:
vectorset = await self.select_vectorset()
return await query_information(
self.kbid, self.query, vectorset, self.generative_model, self.rephrase
)

def _get_matryoshka_dimension(self) -> Awaitable[Optional[int]]:
if self._get_matryoshka_dimension_task is None:
self._get_matryoshka_dimension_task = asyncio.create_task(
Expand Down Expand Up @@ -367,13 +379,53 @@ def parse_paragraph_search(self, request: nodereader_pb2.SearchRequest) -> None:
request.paragraph = True
node_features.inc({"type": "paragraphs"})

@alru_cache(maxsize=1)
async def select_vectorset(self) -> Optional[str]:
"""Validate the vectorset parameter and override it with a default if
needed.
"""
if not has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}):
return None
if self.vectorset:
# validate vectorset
async with datamanagers.with_ro_transaction() as txn:
if not await datamanagers.vectorsets.exists(
txn, kbid=self.kbid, vectorset_id=self.vectorset
):
raise InvalidQueryError(
"vectorset",
f"Vectorset {self.vectorset} doesn't exist in you Knowledge Box",
)
return self.vectorset
else:
# no vectorset specified, get the default one
async with datamanagers.with_ro_transaction() as txn:
try:
default_vectorset = await datamanagers.vectorsets.get_default_vectorset(
txn, kbid=self.kbid
)
except BrokenInvariant:
# XXX: fix to avoid tests complaining too much, it should be
# an error at some point though
return None
# logger.exception("KB has no default vectorset", extra={"kbid": self.kbid})
# raise InvalidQueryError("vectorset", f"KB has no default vectorset") from exc
else:
return default_vectorset.vectorset_id
return None

async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bool:
if not self.has_vector_search:
return False

node_features.inc({"type": "vectors"})

incomplete = False

vectorset = await self.select_vectorset()
if vectorset is not None:
request.vectorset = vectorset

query_vector = None
if self.user_vector is None:
try:
Expand All @@ -383,23 +435,18 @@ async def parse_vector_search(self, request: nodereader_pb2.SearchRequest) -> bo
incomplete = True
else:
if query_info and query_info.sentence:
query_vector = query_info.sentence.data
if vectorset and has_feature(Features.VECTORSETS_V0, context={"kbid": self.kbid}):
if vectorset in query_info.sentence.vectors:
query_vector = query_info.sentence.vectors[vectorset]
else:
incomplete = True
else:
query_vector = query_info.sentence.data
else:
incomplete = True
else:
query_vector = self.user_vector

if self.vectorset:
async with get_driver().transaction(read_only=True) as txn:
if not await datamanagers.vectorsets.exists(
txn, kbid=self.kbid, vectorset_id=self.vectorset
):
raise InvalidQueryError(
"vectorset",
f"Vectorset {self.vectorset} doesn't exist in you Knowledge Box",
)
request.vectorset = self.vectorset

if query_vector is not None:
matryoshka_dimension = await self._get_matryoshka_dimension()
if matryoshka_dimension is not None:
Expand Down Expand Up @@ -554,11 +601,12 @@ async def paragraph_query_to_pb(
async def query_information(
kbid: str,
query: str,
semantic_model: Optional[str],
generative_model: Optional[str] = None,
rephrase: bool = False,
) -> QueryInfo:
predict = get_predict()
return await predict.query(kbid, query, generative_model, rephrase)
return await predict.query(kbid, query, semantic_model, generative_model, rephrase)


@query_parse_dependency_observer.wrap({"type": "detect_entities"})
Expand Down
4 changes: 3 additions & 1 deletion nucliadb/tests/ingest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ async def knowledgebox_ingest(storage, maindb_driver: Driver, shard_manager, lea
model = SemanticModelMetadata(
similarity_function=upb.VectorSimilarity.COSINE, vector_dimension=len(V1)
)
await KnowledgeBox.create(maindb_driver, kbid=kbid, slug=kbslug, semantic_model=model)
await KnowledgeBox.create(
maindb_driver, kbid=kbid, slug=kbslug, semantic_models={"my-semantic-model": model}
)

yield kbid

Expand Down
2 changes: 1 addition & 1 deletion nucliadb/tests/nucliadb/integration/common/test_locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def test_lock(for_seconds: float):

tasks = []
for _ in range(5):
tasks.append(asyncio.create_task(test_lock(random.uniform(0, 0.2))))
tasks.append(asyncio.create_task(test_lock(random.uniform(0.1, 0.2))))
results = await asyncio.gather(*tasks, return_exceptions=True)

# Check that 4 out of 5 tasks returned ResourceLocked error
Expand Down
19 changes: 17 additions & 2 deletions nucliadb/tests/nucliadb/integration/search/test_autofilters.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from httpx import AsyncClient

from nucliadb.tests.vectors import Q
from nucliadb_models.search import Ner, QueryInfo, SentenceSearch, TokenSearch
from nucliadb_models.internal.predict import Ner, QueryInfo, SentenceSearch, TokenSearch
from nucliadb_utils.utilities import Utility, set_utility


Expand All @@ -40,14 +40,29 @@ async def test_autofilters_are_returned(

predict_mock.query = AsyncMock(
return_value=QueryInfo(
language="en",
stop_words=[],
semantic_threshold=0.7,
semantic_thresholds={
"my-semantic-model": 0.7,
},
entities=TokenSearch(
tokens=[
Ner(text="Newton", ner="scientist", start=0, end=1),
Ner(text="Becquer", ner="poet", start=0, end=1),
],
time=0.1,
),
sentence=SentenceSearch(data=Q, time=0.1),
sentence=SentenceSearch(
data=Q,
vectors={
"my-semantic-model": Q,
},
time=0.1,
timings={
"my-semantic-model": 0.1,
},
),
visual_llm=False,
max_context=10000,
query="What relates Newton and Becquer?",
Expand Down
Loading
Loading