Skip to content

Commit

Permalink
Retry query to primary if secondary (read replica) fails (#1789)
Browse files Browse the repository at this point in the history
* Unrelated doc fix

* Use function to get queried nodes debug info

* Do not return redundant node_id nor queried_shards

* Fallback to primary if secondary fails

* More unrelated docs

* Fix mocks on ingest consumer unit tests

* Fix find debug flag

* Fix warning due to marking all tests as async

* Fix test

* Lint...

* Add test and fix debug_nodes_info

* Update models for new nodes debug info
  • Loading branch information
jotare authored Feb 1, 2024
1 parent 87c61c1 commit 0a1f431
Show file tree
Hide file tree
Showing 21 changed files with 248 additions and 78 deletions.
10 changes: 5 additions & 5 deletions nucliadb/nucliadb/common/cluster/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,18 @@ async def get_shards_by_kbid(self, kbid: str) -> list[writer_pb2.ShardObject]:
async def apply_for_all_shards(
self,
kbid: str,
aw: Callable[[AbstractIndexNode, str, str], Awaitable[Any]],
aw: Callable[[AbstractIndexNode, str], Awaitable[Any]],
timeout: float,
) -> list[Any]:
shards = await self.get_shards_by_kbid(kbid)
ops = []

for shard_obj in shards:
node, shard_id, node_id = choose_node(shard_obj)
node, shard_id = choose_node(shard_obj)
if shard_id is None:
raise ShardNotFound("Fount a node but not a shard")

ops.append(aw(node, shard_id, node_id))
ops.append(aw(node, shard_id))

try:
results = await asyncio.wait_for(
Expand Down Expand Up @@ -512,7 +512,7 @@ def choose_node(
*,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = False,
) -> tuple[AbstractIndexNode, str, str]:
) -> tuple[AbstractIndexNode, str]:
"""Choose an arbitrary node storing `shard` following these rules:
- nodes containing a shard replica from `target_replicas` are the preferred
- when enabled, read replica nodes are preferred over primaries
Expand Down Expand Up @@ -548,7 +548,7 @@ def choose_node(

top = ranked_nodes[max(ranked_nodes)]
selected_node, shard_replica_id = random.choice(top)
return selected_node, shard_replica_id, selected_node.id
return selected_node, shard_replica_id


def check_enough_nodes():
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/ingest/consumer/auditing.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def process_kb(self, kbid: str) -> None:
total_paragraphs = 0

for shard_obj in shard_groups:
node, shard_id, _ = choose_node(shard_obj)
node, shard_id = choose_node(shard_obj)
shard: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/ingest/consumer/shard_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def process_kb(self, kbid: str) -> None:
kb_shards = await self.shard_manager.get_shards_by_kbid_inner(kbid)
current_shard: writer_pb2.ShardObject = kb_shards.shards[kb_shards.actual]

node, shard_id, _ = choose_node(current_shard)
node, shard_id = choose_node(current_shard)
shard: nodereader_pb2.Shard = await node.reader.GetShard(
nodereader_pb2.GetShardRequest(shard_id=noderesources_pb2.ShardId(id=shard_id)) # type: ignore
)
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/nucliadb/ingest/orm/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async def get_indexed_entities_group(self, group: str) -> Optional[EntitiesGroup
shard_manager = get_shard_manager()

async def do_entities_search(
node: AbstractIndexNode, shard_id: str, node_id: str
node: AbstractIndexNode, shard_id: str
) -> RelationSearchResponse:
request = RelationSearchRequest(
shard_id=shard_id,
Expand Down Expand Up @@ -288,7 +288,7 @@ async def get_indexed_entities_groups_names(self) -> set[str]:
shard_manager = get_shard_manager()

async def query_indexed_entities_group_names(
node: AbstractIndexNode, shard_id: str, node_id: str
node: AbstractIndexNode, shard_id: str
) -> TypeList:
return await node.reader.RelationTypes(ShardId(id=shard_id)) # type: ignore

Expand Down
5 changes: 4 additions & 1 deletion nucliadb/nucliadb/ingest/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ class DriverSettings(BaseSettings):
)
driver_tikv_url: Optional[list[str]] = Field(
default=None,
description="TiKV PD (Placement Dricer) URL. The URL to the cluster manager of TiKV. Example: tikv-pd.svc:2379",
description=(
"TiKV PD (Placement Driver) URLs. The URL to the cluster manager of"
"TiKV. Example: '[\"tikv-pd.svc:2379\"]'"
),
)
driver_local_url: Optional[str] = Field(
default=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def shard_manager(reader):
"nucliadb.ingest.consumer.auditing.get_shard_manager", return_value=nm
), patch(
"nucliadb.ingest.consumer.auditing.choose_node",
return_value=(node, "shard_id", None),
return_value=(node, "shard_id"),
):
yield nm

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def shard_manager(reader):
"nucliadb.ingest.consumer.shard_creator.get_shard_manager", return_value=sm
), patch(
"nucliadb.ingest.consumer.shard_creator.choose_node",
return_value=(node, "shard_id", None),
return_value=(node, "shard_id"),
):
yield sm

Expand Down
2 changes: 1 addition & 1 deletion nucliadb/nucliadb/search/api/v1/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def knowledgebox_counters(
queried_shards = []
for shard_object in shard_groups:
try:
node, shard_id, _ = choose_node(shard_object)
node, shard_id = choose_node(shard_object)
except KeyError:
raise HTTPException(
status_code=500,
Expand Down
7 changes: 4 additions & 3 deletions nucliadb/nucliadb/search/api/v1/resource/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from nucliadb.models.responses import HTTPClientError
from nucliadb.search.api.v1.router import KB_PREFIX, RESOURCE_PREFIX, api
from nucliadb.search.api.v1.utils import fastapi_query
from nucliadb.search.requesters.utils import Method, node_query
from nucliadb.search.requesters.utils import Method, debug_nodes_info, node_query
from nucliadb.search.search.exceptions import InvalidQueryError
from nucliadb.search.search.merge import merge_paragraphs_results
from nucliadb.search.search.query import paragraph_query_to_pb
Expand Down Expand Up @@ -118,7 +118,7 @@ async def resource_search(
except InvalidQueryError as exc:
return HTTPClientError(status_code=412, detail=str(exc))

results, incomplete_results, queried_nodes, queried_shards = await node_query(
results, incomplete_results, queried_nodes = await node_query(
kbid, Method.PARAGRAPH, pb_query, shards
)

Expand All @@ -136,7 +136,8 @@ async def resource_search(

response.status_code = 206 if incomplete_results else 200
if debug:
search_results.nodes = queried_nodes
search_results.nodes = debug_nodes_info(queried_nodes)

queried_shards = [shard_id for _, shard_id in queried_nodes]
search_results.shards = queried_shards
return search_results
9 changes: 5 additions & 4 deletions nucliadb/nucliadb/search/api/v1/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from nucliadb.models.responses import HTTPClientError
from nucliadb.search.api.v1.router import KB_PREFIX, api
from nucliadb.search.api.v1.utils import fastapi_query
from nucliadb.search.requesters.utils import Method, node_query
from nucliadb.search.requesters.utils import Method, debug_nodes_info, node_query
from nucliadb.search.search.exceptions import InvalidQueryError
from nucliadb.search.search.merge import merge_results
from nucliadb.search.search.query import QueryParser
Expand Down Expand Up @@ -237,7 +237,7 @@ async def catalog(
)
pb_query, _, _ = await query_parser.parse()

(results, _, _, _) = await node_query(
(results, _, _) = await node_query(
kbid,
Method.SEARCH,
pb_query,
Expand Down Expand Up @@ -359,7 +359,7 @@ async def search(
)
pb_query, incomplete_results, autofilters = await query_parser.parse()

results, query_incomplete_results, queried_nodes, queried_shards = await node_query(
results, query_incomplete_results, queried_nodes = await node_query(
kbid, Method.SEARCH, pb_query, target_shard_replicas=item.shards
)

Expand Down Expand Up @@ -391,8 +391,9 @@ async def search(
len(search_results.resources),
)
if item.debug:
search_results.nodes = queried_nodes
search_results.nodes = debug_nodes_info(queried_nodes)

queried_shards = [shard_id for _, shard_id in queried_nodes]
search_results.shards = queried_shards
search_results.autofilters = autofilters
return search_results, incomplete_results
4 changes: 3 additions & 1 deletion nucliadb/nucliadb/search/api/v1/suggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def suggest(
range_modification_start,
range_modification_end,
)
results, incomplete_results, _, queried_shards = await node_query(
results, incomplete_results, queried_nodes = await node_query(
kbid, Method.SUGGEST, pb_query
)

Expand All @@ -162,6 +162,8 @@ async def suggest(
)

response.status_code = 206 if incomplete_results else 200

queried_shards = [shard_id for _, shard_id in queried_nodes]
if debug and queried_shards:
search_results.shards = queried_shards

Expand Down
74 changes: 49 additions & 25 deletions nucliadb/nucliadb/search/requesters/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from nucliadb_protos.writer_pb2 import ShardObject as PBShardObject

from nucliadb.common.cluster import manager as cluster_manager
from nucliadb.common.cluster.base import AbstractIndexNode
from nucliadb.common.cluster.exceptions import ShardsNotFound
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.search import logger
Expand Down Expand Up @@ -86,7 +87,7 @@ async def node_query(
pb_query: SuggestRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
) -> tuple[list[SuggestResponse], bool, list[tuple[str, str, str]], list[str]]:
) -> tuple[list[SuggestResponse], bool, list[tuple[AbstractIndexNode, str]]]:
...


Expand All @@ -97,7 +98,7 @@ async def node_query(
pb_query: ParagraphSearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
) -> tuple[list[ParagraphSearchResponse], bool, list[tuple[str, str, str]], list[str]]:
) -> tuple[list[ParagraphSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]:
...


Expand All @@ -108,7 +109,7 @@ async def node_query(
pb_query: SearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
) -> tuple[list[SearchResponse], bool, list[tuple[str, str, str]], list[str]]:
) -> tuple[list[SearchResponse], bool, list[tuple[AbstractIndexNode, str]]]:
...


Expand All @@ -119,7 +120,7 @@ async def node_query(
pb_query: RelationSearchRequest,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
) -> tuple[list[RelationSearchResponse], bool, list[tuple[str, str, str]], list[str]]:
) -> tuple[list[RelationSearchResponse], bool, list[tuple[AbstractIndexNode, str]]]:
...


Expand All @@ -129,7 +130,7 @@ async def node_query(
pb_query: REQUEST_TYPE,
target_shard_replicas: Optional[list[str]] = None,
use_read_replica_nodes: bool = True,
) -> tuple[list[T], bool, list[tuple[str, str, str]], list[str]]:
) -> tuple[list[T], bool, list[tuple[AbstractIndexNode, str]]]:
use_read_replica_nodes = use_read_replica_nodes and has_feature(
const.Features.READ_REPLICA_SEARCHES, context={"kbid": kbid}
)
Expand All @@ -144,13 +145,12 @@ async def node_query(
)

ops = []
queried_shards = []
queried_nodes = []
incomplete_results = False

for shard_obj in shard_groups:
try:
node, shard_id, node_id = cluster_manager.choose_node(
node, shard_id = cluster_manager.choose_node(
shard_obj,
use_read_replica_nodes=use_read_replica_nodes,
target_shard_replicas=target_shard_replicas,
Expand All @@ -163,8 +163,7 @@ async def node_query(
# let's add it ot the query list if has a valid value
func = METHODS[method]
ops.append(func(node, shard_id, pb_query)) # type: ignore
queried_nodes.append((node.label, shard_id, node_id))
queried_shards.append(shard_id)
queried_nodes.append((node, shard_id))

if not ops:
logger.warning(f"No node found for any of this resources shards {kbid}")
Expand All @@ -179,30 +178,39 @@ async def node_query(
timeout=settings.search_timeout,
)
except asyncio.TimeoutError as exc: # pragma: no cover
queried_nodes_details = []
for _, shard_id, node_id in queried_nodes:
queried_node = cluster_manager.get_index_node(node_id)
if queried_node is None:
node_address = "unknown"
else:
node_address = node.address
queried_nodes_details.append(
{
"id": node_id,
"shard_id": shard_id,
"address": node_address,
}
)
logger.warning(
"Timeout while querying nodes", extra={"nodes": queried_nodes_details}
"Timeout while querying nodes",
extra={"nodes": debug_nodes_info(queried_nodes)},
)
results = [exc]

error = validate_node_query_results(results or [])
if error is not None:
if (
error.status_code >= 500
and use_read_replica_nodes
and any([node.is_read_replica() for node, _ in queried_nodes])
):
# We had an error querying a secondary node, instead of raising an
# error directly, retry query to primaries and hope it works
logger.warning(
"Query to read replica failed. Trying again with primary",
extra={"nodes": debug_nodes_info(queried_nodes)},
)

results, incomplete_results, primary_queried_nodes = await node_query( # type: ignore
kbid,
method,
pb_query,
target_shard_replicas,
use_read_replica_nodes=False,
)
queried_nodes.extend(primary_queried_nodes)
return results, incomplete_results, queried_nodes

raise error

return results, incomplete_results, queried_nodes, queried_shards
return results, incomplete_results, queried_nodes


def validate_node_query_results(results: list[Any]) -> Optional[HTTPException]:
Expand Down Expand Up @@ -241,3 +249,19 @@ def validate_node_query_results(results: list[Any]) -> Optional[HTTPException]:
return HTTPException(status_code=status_code, detail=reason)

return None


def debug_nodes_info(
    nodes: list[tuple[AbstractIndexNode, str]]
) -> list[dict[str, str]]:
    """Summarize queried (node, shard) pairs for debug output and logs.

    Each entry reports the node id, the shard replica queried on that node
    and the node address. When the node exposes a truthy ``primary_id``
    (i.e. it is a read replica), that id is included as well.
    """
    summaries: list[dict[str, str]] = []
    for index_node, shard in nodes:
        entry = {
            "id": index_node.id,
            "shard_id": shard,
            "address": index_node.address,
        }
        # Only read replicas carry a primary reference; falsy values are omitted.
        primary = index_node.primary_id
        if primary:
            entry["primary_id"] = primary
        summaries.append(entry)
    return summaries
1 change: 0 additions & 1 deletion nucliadb/nucliadb/search/search/chat/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ async def get_relations_results(
relations_results,
_,
_,
_,
) = await node_query(
kbid,
Method.RELATIONS,
Expand Down
7 changes: 4 additions & 3 deletions nucliadb/nucliadb/search/search/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
from time import time

from nucliadb.search.requesters.utils import Method, node_query
from nucliadb.search.requesters.utils import Method, debug_nodes_info, node_query
from nucliadb.search.search.find_merge import find_merge_results
from nucliadb.search.search.query import QueryParser
from nucliadb.search.search.utils import should_disable_vector_search
Expand Down Expand Up @@ -70,7 +70,7 @@ async def find(
security=item.security,
)
pb_query, incomplete_results, autofilters = await query_parser.parse()
results, query_incomplete_results, queried_nodes, queried_shards = await node_query(
results, query_incomplete_results, queried_nodes = await node_query(
kbid, Method.SEARCH, pb_query, target_shard_replicas=item.shards
)
incomplete_results = incomplete_results or query_incomplete_results
Expand Down Expand Up @@ -100,8 +100,9 @@ async def find(
len(search_results.resources),
)
if item.debug:
search_results.nodes = queried_nodes
search_results.nodes = debug_nodes_info(queried_nodes)

queried_shards = [shard_id for _, shard_id in queried_nodes]
search_results.shards = queried_shards
search_results.autofilters = autofilters
return search_results, incomplete_results
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ async def test_vector_result_metadata(
),
).parse()

results, _, _, _ = await node_query(kbid, Method.SEARCH, pb_query)
results, _, _ = await node_query(kbid, Method.SEARCH, pb_query)
assert len(results[0].vector.documents) > 0
assert results[0].vector.documents[0].HasField("metadata")
Loading

3 comments on commit 0a1f431

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 0a1f431 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13255.231283578116 iter/sec (stddev: 1.493888450567394e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 0.98

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 0a1f431 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13623.196121712066 iter/sec (stddev: 0.0000011466736447082302) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 0.96

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 0a1f431 Previous: d4afd82 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 13017.964760051957 iter/sec (stddev: 6.702670275419219e-7) 13028.533525895236 iter/sec (stddev: 4.192637045977425e-7) 1.00

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.