Skip to content

Commit

Permalink
Fix: Solve stability issues implementing an executor health_check end…
Browse files Browse the repository at this point in the history
…point and using it on the preparation script.
  • Loading branch information
nesitor committed Jul 22, 2024
1 parent b49e2c0 commit c06d7b1
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 29 deletions.
56 changes: 48 additions & 8 deletions deployment/prepare_vrf_vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import json
import sys
from pathlib import Path
from typing import Tuple, List
from typing import Tuple, List, Optional

from aleph.sdk.client import AuthenticatedAlephHttpClient
from aleph_message.models import ItemHash

from aleph_vrf.coordinator.executor_selection import ExecuteOnAleph
from aleph_vrf.coordinator.vrf import prepare_executor_api_request
from aleph_vrf.coordinator.vrf import prepare_executor_api_request, prepare_executor_health_check
from aleph_vrf.models import Executor
from aleph_vrf.settings import settings


async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Executor], List[Executor]]:
Expand All @@ -19,11 +21,18 @@ async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Exe
prepare_tasks = [asyncio.create_task(prepare_executor_api_request(executor.api_url))
for executor in executors]

vrf_prepare_responses = await asyncio.gather(
await asyncio.gather(
*prepare_tasks, return_exceptions=True
)

prepare_results = dict(zip(executors, vrf_prepare_responses))
prepare_tasks = [asyncio.create_task(prepare_executor_health_check(executor.test_api_url))
for executor in executors]

vrf_health_responses = await asyncio.gather(
*prepare_tasks, return_exceptions=True
)

prepare_results = dict(zip(executors, vrf_health_responses))

failed_nodes = []
success_nodes = []
Expand All @@ -36,25 +45,50 @@ async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Exe
return failed_nodes, success_nodes


def create_unauthorized_file(unauthorized_executors: List[Executor]):
source_dir = Path(__file__).parent.parent / "src"
def create_unauthorized_file(unauthorized_executors: List[Executor], destination_path: Optional[Path] = None):
if not destination_path:
source_dir = Path(__file__).parent.parent / "src"
destination_path = source_dir / "aleph_vrf" / "coordinator"

unauthorized_nodes = [executor.node.address for executor in unauthorized_executors]
unauthorized_nodes_list_path = source_dir / "aleph_vrf" / "coordinator" / "unauthorized_node_list.json"
unauthorized_nodes_list_path = destination_path / "unauthorized_node_list.json"
unauthorized_nodes_list_path.write_text(json.dumps(unauthorized_nodes))

print(f"Unauthorized node list file created on {unauthorized_nodes_list_path}")


async def update_unauthorized_aggregate(unauthorized_executors: List[Executor]):
account = settings.aleph_account()
async with AuthenticatedAlephHttpClient(
account=account,
api_server=settings.API_HOST,
# Avoid going through the VM connector on aleph.im CRNs
allow_unix_sockets=False,
) as client:
node_list = [executor.node.address for executor in unauthorized_executors]
content = {
"unauthorized_nodes": node_list
}
await client.create_aggregate(key="vrf", content=content)


async def main(args: argparse.Namespace):
vrf_function = args.vrf_function

source_dir = Path(__file__).parent.parent / "src"
unauthorized_nodes_list_path = source_dir / "aleph_vrf" / "coordinator"

failed_nodes, _ = await prepare_executor_nodes(vrf_function)

print("Aleph.im VRF VMs nodes prepared.")

if failed_nodes:
print(f"{len(failed_nodes)} preload nodes failed.")
create_unauthorized_file(failed_nodes)
create_unauthorized_file(failed_nodes, unauthorized_nodes_list_path)
if args.save_in_aggregate:
account = settings.aleph_account()
await update_unauthorized_aggregate(failed_nodes)
print(f"Unauthorized node list saved on an aggregate on address {account.get_address()}")


def parse_args(args) -> argparse.Namespace:
Expand All @@ -67,6 +101,12 @@ def parse_args(args) -> argparse.Namespace:
action="store",
help="VRF VM ItemHash",
)
parser.add_argument(
"--save-aggregate",
dest="save_in_aggregate",
action="store_true",
help="Save node VRF list on an aggregate",
)
return parser.parse_args(args)


Expand Down
68 changes: 60 additions & 8 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import abc
import json
import logging
import random
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Union
from typing import Any, AsyncIterator, Dict, List, Optional

import aiohttp
from aleph_message.models import ItemHash
Expand All @@ -12,6 +13,9 @@
from aleph_vrf.settings import settings


logger = logging.getLogger(__name__)


class ExecutorSelectionPolicy(abc.ABC):
"""
How the coordinator selects executors.
Expand Down Expand Up @@ -43,12 +47,42 @@ async def _get_corechannel_aggregate() -> Dict[str, Any]:
return await response.json()


async def _get_unauthorized_node_list_aggregate(aggregate_address: str) -> List[str]:
"""
Returns the "vrf_unauthorized_nodes" list aggregate.
This aggregate contains an up-to-date list of nodes not allowed to run a VRF request.
"""
async with aiohttp.ClientSession(settings.API_HOST) as session:
url = (
f"/api/v0/aggregates/{aggregate_address}.json?"
f"keys={settings.VRF_AGGREGATE_KEY}"
)
async with session.get(url) as response:
if response.status != 200:
logger.debug("No VRF unauthorized nodes list found")
return []

content = await response.json()

if (
not content["data"]["vrf"]
or not content["data"]["vrf"]["unauthorized_nodes"]
):
logger.error(f"Bad VRF unauthorized nodes list format")
return []

unauthorized_nodes = content["data"]["vrf"]["unauthorized_nodes"]

unauthorized_list = [str(unauthorized_node) for unauthorized_node in unauthorized_nodes]
return unauthorized_list


class ExecuteOnAleph(ExecutorSelectionPolicy):
"""
Select executors at random on the aleph.im network.
"""

def __init__(self, vm_function: ItemHash, crn_score_threshold: float = 0.9):
def __init__(self, vm_function: ItemHash, crn_score_threshold: float = 0.95):
self.vm_function = vm_function
self.crn_score_threshold = crn_score_threshold

Expand Down Expand Up @@ -83,28 +117,46 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]:
yield node

@staticmethod
def _get_unauthorized_nodes() -> List[str]:
def _get_unauthorized_nodes_file(unauthorized_nodes_list_path: Optional[Path]) -> List[str]:
"""
Returns a list of unauthorized nodes.
The caller may provide a blacklist of nodes by specifying a list of URLs in a file
named `unauthorized_node_list.json` in the working directory.
"""
unauthorized_nodes_list_path = Path(__file__).with_name(
"unauthorized_node_list.json"
)

if not unauthorized_nodes_list_path:
unauthorized_nodes_list_path = Path(__file__).with_name(
"unauthorized_node_list.json"
)
if unauthorized_nodes_list_path.is_file():
with open(unauthorized_nodes_list_path, "rb") as fd:
return json.load(fd)

return []

async def _get_unauthorized_nodes(self, unauthorized_nodes_list_path: Optional[Path] = None) -> List[str]:
"""
Returns a list of unauthorized nodes.
The caller may provide a blacklist of nodes by specifying a list of URLs in a file
named `unauthorized_node_list.json` in the working directory.
"""
aggregate_unauthorized_list = []
if settings.VRF_AGGREGATE_ADDRESS:
aggregate_unauthorized_list = await _get_unauthorized_node_list_aggregate(settings.VRF_AGGREGATE_ADDRESS)

file_unauthorized_nodes_list = self._get_unauthorized_nodes_file(
unauthorized_nodes_list_path=unauthorized_nodes_list_path
)

return aggregate_unauthorized_list + file_unauthorized_nodes_list

async def select_executors(self, nb_executors: int) -> List[VRFExecutor]:
"""
Selects nb_executors compute resource nodes at random from the aleph.im network.
"""

compute_nodes = self._list_compute_nodes()
blacklisted_nodes = self._get_unauthorized_nodes()
blacklisted_nodes = await self._get_unauthorized_nodes()

executors = [
AlephExecutor(node=node, vm_function=self.vm_function)
Expand All @@ -118,7 +170,7 @@ async def select_executors(self, nb_executors: int) -> List[VRFExecutor]:

async def get_candidate_executors(self) -> List[VRFExecutor]:
compute_nodes = self._list_compute_nodes()
blacklisted_nodes = self._get_unauthorized_nodes()
blacklisted_nodes = await self._get_unauthorized_nodes()
executors: List[VRFExecutor] = [
AlephExecutor(node=node, vm_function=self.vm_function)
async for node in compute_nodes
Expand Down
23 changes: 20 additions & 3 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def post_executor_api_request(url: str, model: Type[M]) -> M:
async def prepare_executor_api_request(url: str) -> bool:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=30) as resp:
async with session.get(url, timeout=10) as resp:
resp.raise_for_status()
response = await resp.json()
return response["name"] == "vrf_generate_api"
Expand All @@ -80,6 +80,23 @@ async def prepare_executor_api_request(url: str) -> bool:
) from error


async def prepare_executor_health_check(url: str) -> bool:
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, timeout=10) as resp:
resp.raise_for_status()
response = await resp.json()
return "data" in response
except aiohttp.ClientResponseError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error
except asyncio.TimeoutError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error


async def _generate_vrf(
aleph_client: AuthenticatedAlephHttpClient,
nb_executors: int,
Expand Down Expand Up @@ -205,7 +222,7 @@ async def send_generate_requests(

for executor, result in generation_results.items():
if isinstance(result, Exception):
raise RandomNumberGenerationFailed(executor=executor) from result
raise RandomNumberGenerationFailed(executor=executor, error=str(result)) from result

return generation_results

Expand All @@ -232,7 +249,7 @@ async def send_publish_requests(

for executor, result in publication_results.items():
if isinstance(result, Exception):
raise RandomNumberPublicationFailed(executor=executor) from result
raise RandomNumberPublicationFailed(executor=executor, error=str(result)) from result

return publication_results

Expand Down
9 changes: 5 additions & 4 deletions src/aleph_vrf/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ class ExecutorError(Exception):
An error occurred while communicating with an executor.
"""

def __init__(self, executor: Executor):
def __init__(self, executor: Executor, error: str):
self.executor = executor
self.error = error

def __str__(self):
return f"Executor failed for executor {self.executor.api_url}."
return f"Executor failed for executor {self.executor.api_url} with error: {self.error}"


class RandomNumberGenerationFailed(ExecutorError):
Expand All @@ -46,7 +47,7 @@ class RandomNumberGenerationFailed(ExecutorError):
"""

def __str__(self):
return f"Random number generation failed for executor {self.executor.api_url}."
return f"Random number generation failed for executor {self.executor.api_url} with error: {self.error}"


class RandomNumberPublicationFailed(ExecutorError):
Expand All @@ -55,7 +56,7 @@ class RandomNumberPublicationFailed(ExecutorError):
"""

def __str__(self):
return f"Random number publication failed for executor {self.executor.api_url}."
return f"Random number publication failed for executor {self.executor.api_url} with error: {self.error}"


class HashesDoNotMatch(VrfException):
Expand Down
Loading

0 comments on commit c06d7b1

Please sign in to comment.