Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Cap max number of relations returned #2403

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion nucliadb/src/nucliadb/search/requesters/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ async def node_query(
pb_query: SuggestRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
timeout: Optional[float] = None,
retry_on_primary: bool = True,
) -> tuple[list[SuggestResponse], bool, list[tuple[AbstractIndexNode, str]]]: ...


Expand All @@ -97,6 +99,8 @@ async def node_query(
pb_query: ParagraphSearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
timeout: Optional[float] = None,
retry_on_primary: bool = True,
) -> tuple[list[ParagraphSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ...


Expand All @@ -107,6 +111,8 @@ async def node_query(
pb_query: SearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
timeout: Optional[float] = None,
retry_on_primary: bool = True,
) -> tuple[list[SearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ...


Expand All @@ -117,6 +123,8 @@ async def node_query(
pb_query: RelationSearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
timeout: Optional[float] = None,
retry_on_primary: bool = True,
) -> tuple[list[RelationSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]: ...


Expand All @@ -126,7 +134,10 @@ async def node_query(
pb_query: REQUEST_TYPE,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
timeout: Optional[float] = None,
retry_on_primary: bool = True,
) -> tuple[Sequence[Union[T, BaseException]], bool, list[tuple[AbstractIndexNode, str]]]:
timeout = timeout or settings.search_timeout
use_read_replica_nodes = use_read_replica_nodes and has_feature(
const.Features.READ_REPLICA_SEARCHES, context={"kbid": kbid}
)
Expand Down Expand Up @@ -171,7 +182,7 @@ async def node_query(
try:
results: list[Union[T, BaseException]] = await asyncio.wait_for(
asyncio.gather(*ops, return_exceptions=True),
timeout=settings.search_timeout,
timeout=timeout,
)
except asyncio.TimeoutError as exc: # pragma: no cover
logger.warning(
Expand All @@ -195,6 +206,7 @@ async def node_query(
error.status_code >= 500
and use_read_replica_nodes
and any([node.is_read_replica() for node, _ in queried_nodes])
and retry_on_primary
):
# We had an error querying a secondary node, instead of raising an
# error directly, retry query to primaries and hope it works
Expand Down
1 change: 1 addition & 0 deletions nucliadb/src/nucliadb/search/search/chat/ask.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ async def get_relations_results(self) -> Relations:
kbid=self.kbid,
text_answer=self._answer_text,
target_shard_replicas=self.ask_request.shards,
timeout=5.0,
)
return self._relations

Expand Down
9 changes: 8 additions & 1 deletion nucliadb/src/nucliadb/search/search/chat/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ async def get_find_results(


async def get_relations_results(
*, kbid: str, text_answer: str, target_shard_replicas: Optional[list[str]]
*,
kbid: str,
text_answer: str,
target_shard_replicas: Optional[list[str]],
timeout: Optional[float] = None,
) -> Relations:
try:
predict = get_predict()
Expand All @@ -202,6 +206,9 @@ async def get_relations_results(
Method.RELATIONS,
relation_request,
target_shard_replicas=target_shard_replicas,
timeout=timeout,
use_read_replica_nodes=True,
retry_on_primary=False,
)
return await merge_relations_results(relations_results, relation_request.subgraph)
except Exception as exc:
Expand Down
8 changes: 6 additions & 2 deletions nucliadb_relations2/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use crate::{io_maps, schema};

const FUZZY_DISTANCE: u8 = 1;
const NUMBER_OF_RESULTS_SUGGEST: usize = 10;
// Hard limit until we have pagination in place
const MAX_NUM_RELATIONS_RESULTS: usize = 500;

pub struct RelationsReaderService {
index: Index,
Expand Down Expand Up @@ -258,10 +260,12 @@ impl RelationsReaderService {

let query = BooleanQuery::from(queries);
let searcher = self.reader.searcher();
let matching_docs = searcher.search(&query, &DocSetCollector)?;

let topdocs = TopDocs::with_limit(MAX_NUM_RELATIONS_RESULTS);
let matching_docs = searcher.search(&query, &topdocs)?;
let mut response = EntitiesSubgraphResponse::default();

for doc_addr in matching_docs {
for (_, doc_addr) in matching_docs {
let source = searcher.doc(doc_addr)?;
let relation = io_maps::doc_to_relation(&self.schema, &source);
response.relations.push(relation);
Expand Down
Loading