From dd89ebf8554099578d33bd5b0cffbbac350deaef Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Fri, 27 Sep 2024 15:50:14 +0200 Subject: [PATCH] Problem: If a user wants to launch a VRF request with a fixed score threshold and there aren't enough node, the request fails. Solution: Change the filtering to use a percentile score threshold instead a hard one. --- .../coordinator/executor_selection.py | 32 +++++++++++++++++-- src/aleph_vrf/utils.py | 16 ++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/aleph_vrf/coordinator/executor_selection.py b/src/aleph_vrf/coordinator/executor_selection.py index b8f2310..92a3add 100644 --- a/src/aleph_vrf/coordinator/executor_selection.py +++ b/src/aleph_vrf/coordinator/executor_selection.py @@ -6,12 +6,13 @@ from typing import Any, AsyncIterator, Dict, List, Optional import aiohttp +import math from aleph_message.models import ItemHash from aleph_vrf.exceptions import AlephNetworkError, NotEnoughExecutors from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, VRFExecutor from aleph_vrf.settings import settings - +from aleph_vrf.utils import percentile logger = logging.getLogger(__name__) @@ -82,7 +83,12 @@ class ExecuteOnAleph(ExecutorSelectionPolicy): Select executors at random on the aleph.im network. """ - def __init__(self, vm_function: ItemHash, aggregate_address: Optional[str] = None, crn_score_threshold: float = 0.95): + def __init__( + self, + vm_function: ItemHash, + aggregate_address: Optional[str] = None, + crn_score_threshold: Optional[float] = None + ): self.vm_function = vm_function self.crn_score_threshold = crn_score_threshold self.aggregate_address = aggregate_address @@ -103,11 +109,15 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]: resource_nodes = content["data"]["corechannel"]["resource_nodes"] + if not self.crn_score_threshold: + self.crn_score_threshold = self._get_minimum_score_threshold(resource_nodes) + print(f"Filtering CRNs with score better than {self.crn_score_threshold}") + for resource_node in resource_nodes: # Filter nodes by score, with linked status if ( resource_node["status"] == "linked" - and resource_node["score"] > self.crn_score_threshold + and resource_node["score"] >= self.crn_score_threshold ): node_address = resource_node["address"].strip("/") node = ComputeResourceNode( @@ -117,6 +127,22 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]: ) yield node + @staticmethod + def _get_minimum_score_threshold( + resource_nodes: List[ComputeResourceNode], + percentile_value: int = 75 + ) -> float: + """ + Returns the 75 percentile of all CRN scores as a minimum score threshold + """ + # Returns score and filter by linked status + scores = [resource_node["score"] for resource_node in resource_nodes if resource_node["status"] == "linked"] + + score_percentile = percentile(scores, percentile_value) + # Round down minimum score to 3 decimals + rounded_score = math.floor(score_percentile * 1000) / 1000 + return rounded_score + @staticmethod def _get_unauthorized_nodes_file(unauthorized_nodes_list_path: Optional[Path]) -> List[str]: """ diff --git a/src/aleph_vrf/utils.py b/src/aleph_vrf/utils.py index 8330e97..1dcd884 100644 --- a/src/aleph_vrf/utils.py +++ b/src/aleph_vrf/utils.py @@ -1,3 +1,5 @@ +import math + from hashlib import sha3_256 from random import randint from typing import List, Tuple, TypeVar @@ -41,3 +43,17 @@ def generate(n: int, nonce: Nonce) -> Tuple[bytes, str]: def verify(random_number: bytes, nonce: int, random_hash: str) -> bool: """Verifies that the random number was generated by the given nonce.""" return random_hash == sha3_256(random_number + int_to_bytes(nonce)).hexdigest() + + +def percentile(value_list, percentile_value) -> float: + """ + Find the percentile of a list of values + + @parameter value_list - A list of values. + @parameter percentile_value - A in value from 0 to 100 + + @return - The percentile of the values. + """ + data_sorted = sorted(value_list) # Sort in ascending order + index = math.ceil(percentile_value / 100 * len(data_sorted)) + return data_sorted[index]