From 3743ab72ec03a70c6c81d2085622ff85e24cc4ab Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Mon, 19 Aug 2024 17:39:33 +0200 Subject: [PATCH] Cap max number of relations returned --- nucliadb/src/nucliadb/search/requesters/utils.py | 14 +++++++++++++- nucliadb/src/nucliadb/search/search/chat/ask.py | 1 + nucliadb/src/nucliadb/search/search/chat/query.py | 9 ++++++++- nucliadb_relations2/src/reader.rs | 8 ++++++-- 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/nucliadb/src/nucliadb/search/requesters/utils.py b/nucliadb/src/nucliadb/search/requesters/utils.py index eb853515e0..72bccbb742 100644 --- a/nucliadb/src/nucliadb/search/requesters/utils.py +++ b/nucliadb/src/nucliadb/search/requesters/utils.py @@ -87,6 +87,8 @@ async def node_query( pb_query: SuggestRequest, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = True, + timeout: Optional[float] = None, + retry_on_primary: bool = True, ) -> tuple[list[SuggestResponse], bool, list[tuple[AbstractIndexNode, str]]]: ... @@ -97,6 +99,8 @@ async def node_query( pb_query: ParagraphSearchRequest, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = True, + timeout: Optional[float] = None, + retry_on_primary: bool = True, ) -> tuple[list[ParagraphSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ... @@ -107,6 +111,8 @@ async def node_query( pb_query: SearchRequest, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = True, + timeout: Optional[float] = None, + retry_on_primary: bool = True, ) -> tuple[list[SearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ... @@ -117,6 +123,8 @@ async def node_query( pb_query: RelationSearchRequest, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = True, + timeout: Optional[float] = None, + retry_on_primary: bool = True, ) -> tuple[list[RelationSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ... @@ -126,7 +134,10 @@ async def node_query( pb_query: REQUEST_TYPE, target_shard_replicas: Optional[list[str]] = None, use_read_replica_nodes: bool = True, + timeout: Optional[float] = None, + retry_on_primary: bool = True, ) -> tuple[Sequence[Union[T, BaseException]], bool, list[tuple[AbstractIndexNode, str]]]: + timeout = timeout or settings.search_timeout use_read_replica_nodes = use_read_replica_nodes and has_feature( const.Features.READ_REPLICA_SEARCHES, context={"kbid": kbid} ) @@ -171,7 +182,7 @@ async def node_query( try: results: list[Union[T, BaseException]] = await asyncio.wait_for( asyncio.gather(*ops, return_exceptions=True), - timeout=settings.search_timeout, + timeout=timeout, ) except asyncio.TimeoutError as exc: # pragma: no cover logger.warning( @@ -195,6 +206,7 @@ async def node_query( error.status_code >= 500 and use_read_replica_nodes and any([node.is_read_replica() for node, _ in queried_nodes]) + and retry_on_primary ): # We had an error querying a secondary node, instead of raising an # error directly, retry query to primaries and hope it works diff --git a/nucliadb/src/nucliadb/search/search/chat/ask.py b/nucliadb/src/nucliadb/search/search/chat/ask.py index 7fc8a3cb0c..988ec9ff6a 100644 --- a/nucliadb/src/nucliadb/search/search/chat/ask.py +++ b/nucliadb/src/nucliadb/search/search/chat/ask.py @@ -301,6 +301,7 @@ async def get_relations_results(self) -> Relations: kbid=self.kbid, text_answer=self._answer_text, target_shard_replicas=self.ask_request.shards, + timeout=5.0, ) return self._relations diff --git a/nucliadb/src/nucliadb/search/search/chat/query.py b/nucliadb/src/nucliadb/search/search/chat/query.py index ddf56ea4b4..d1568a94db 100644 --- a/nucliadb/src/nucliadb/search/search/chat/query.py +++ b/nucliadb/src/nucliadb/search/search/chat/query.py @@ -183,7 +183,11 @@ async def get_find_results( async def get_relations_results( - *, kbid: str, text_answer: str, target_shard_replicas: Optional[list[str]] + *, + kbid: str, + text_answer: str, + target_shard_replicas: Optional[list[str]], + timeout: Optional[float] = None, ) -> Relations: try: predict = get_predict() @@ -202,6 +206,9 @@ async def get_relations_results( Method.RELATIONS, relation_request, target_shard_replicas=target_shard_replicas, + timeout=timeout, + use_read_replica_nodes=True, + retry_on_primary=False, ) return await merge_relations_results(relations_results, relation_request.subgraph) except Exception as exc: diff --git a/nucliadb_relations2/src/reader.rs b/nucliadb_relations2/src/reader.rs index 3c94e0aefe..1e5db91546 100644 --- a/nucliadb_relations2/src/reader.rs +++ b/nucliadb_relations2/src/reader.rs @@ -39,6 +39,8 @@ use crate::{io_maps, schema}; const FUZZY_DISTANCE: u8 = 1; const NUMBER_OF_RESULTS_SUGGEST: usize = 10; +// Hard limit until we have pagination in place +const MAX_NUM_RELATIONS_RESULTS: usize = 500; pub struct RelationsReaderService { index: Index, @@ -258,10 +260,12 @@ impl RelationsReaderService { let query = BooleanQuery::from(queries); let searcher = self.reader.searcher(); - let matching_docs = searcher.search(&query, &DocSetCollector)?; + + let topdocs = TopDocs::with_limit(MAX_NUM_RELATIONS_RESULTS); + let matching_docs = searcher.search(&query, &topdocs)?; let mut response = EntitiesSubgraphResponse::default(); - for doc_addr in matching_docs { + for (_, doc_addr) in matching_docs { let source = searcher.doc(doc_addr)?; let relation = io_maps::doc_to_relation(&self.schema, &source); response.relations.push(relation);