Skip to content

Commit

Permalink
Prepare executor VMs (#26)
Browse files Browse the repository at this point in the history
* Problem: Launch executor VMS the first time, take some time for new executor functions.

Feature: Added a preload script of the executor function to candidate CRNs, and if the CRN doesn't respond, add it to unauthorized_node_list file.

* Fix: Apply black formatter and isort

* Fix: Applied PR review fixes.

* Fix: Solved logic PR issue.

* Fix: Fixed code quality issues.

* Fix: Removed unused imports.

* Fix: Solve mypy issues

* Fix: Implement better typing

* Fix: Solve module imports for old python versions and remove unused python 3.8 version.

---------

Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
nesitor and Andres D. Molins authored Jun 20, 2024
1 parent ccc24b2 commit 3a8b592
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 54 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v3
Expand Down
33 changes: 32 additions & 1 deletion deployment/deploy_vrf_vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from aleph_message.status import MessageStatus

from aleph_vrf.settings import settings
from prepare_vrf_vms import prepare_executor_nodes, create_unauthorized_file

# Debian 12 with Aleph SDK 0.9.1
DEBIAN12_RUNTIME = ItemHash(
Expand Down Expand Up @@ -62,6 +63,7 @@ async def deploy_python_program(
venv_hash: ItemHash,
channel: str,
environment: Optional[Dict[str, str]] = None,
timeout_seconds: Optional[int] = None,
) -> ProgramMessage:
program_message, status = await aleph_client.create_program(
program_ref=code_volume_hash,
Expand All @@ -80,6 +82,7 @@ async def deploy_python_program(
sync=True,
environment_variables=environment,
channel=channel,
timeout_seconds=timeout_seconds,
)

if status == MessageStatus.REJECTED:
Expand All @@ -96,15 +99,17 @@ async def deploy_python_program(
deploy_coordinator_vm = partial(
deploy_python_program,
entrypoint="aleph_vrf.coordinator.main:app",
timeout_seconds=60,
)


async def deploy_vrf(
source_dir: Path, venv_dir: Path, deploy_coordinator: bool = True
source_dir: Path, venv_dir: Path, deploy_coordinator: bool = True, prepare_nodes: bool = False
) -> Tuple[ProgramMessage, Optional[ProgramMessage]]:
private_key = get_fallback_private_key()
account = ETHAccount(private_key)
channel = "vrf-tests"
unauthorized_list = False

async with AuthenticatedAlephHttpClient(
account=account, api_server=settings.API_HOST
Expand Down Expand Up @@ -135,7 +140,24 @@ async def deploy_vrf(
environment={"PYTHONPATH": "/opt/packages"},
)

if prepare_nodes:
failed_nodes, _ = await prepare_executor_nodes(executor_program_message.item_hash)

if len(failed_nodes) > 0:
create_unauthorized_file(failed_nodes)
unauthorized_list = True

if deploy_coordinator:
if unauthorized_list:
# Upload the code volume for coordinator with unauthorized list
print("Uploading coordinator code volume...")
code_volume_hash = await upload_dir_as_volume(
aleph_client=aleph_client,
dir_path=source_dir,
channel=channel,
volume_path=Path("aleph-vrf-coordinator.squashfs"),
)

print("Creating coordinator VM...")
coordinator_program_message = await deploy_coordinator_vm(
aleph_client=aleph_client,
Expand All @@ -155,6 +177,7 @@ async def deploy_vrf(

async def main(args: argparse.Namespace):
deploy_coordinator = args.deploy_coordinator
prepare_nodes = args.prepare_nodes
root_dir = Path(__file__).parent.parent

with TemporaryDirectory() as venv_dir_str:
Expand All @@ -165,6 +188,7 @@ async def main(args: argparse.Namespace):
source_dir=root_dir / "src",
venv_dir=venv_dir,
deploy_coordinator=deploy_coordinator,
prepare_nodes=prepare_nodes,
)

print("Aleph.im VRF VMs were successfully deployed.")
Expand All @@ -188,6 +212,13 @@ def parse_args(args) -> argparse.Namespace:
default=True,
help="Deploy the coordinator as an aleph.im VM function",
)
parser.add_argument(
"--prepare-nodes",
dest="prepare_nodes",
action="store_true",
default=False,
help="Preload executor VMs on Aleph CRNs",
)
return parser.parse_args(args)


Expand Down
72 changes: 72 additions & 0 deletions deployment/prepare_vrf_vms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import argparse
import asyncio
import json
import sys
from pathlib import Path
from typing import Tuple, List

from aleph_message.models import ItemHash

from aleph_vrf.coordinator.executor_selection import ExecuteOnAleph
from aleph_vrf.coordinator.vrf import prepare_executor_api_request
from aleph_vrf.models import Executor


async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Executor], List[Executor]]:
aleph_selection_policy = ExecuteOnAleph(vm_function=executor_item_hash)
executors = await aleph_selection_policy.get_candidate_executors()
prepare_tasks = [asyncio.create_task(prepare_executor_api_request(executor.api_url))
for executor in executors]

vrf_prepare_responses = await asyncio.gather(
*prepare_tasks, return_exceptions=True
)
prepare_results = dict(zip(executors, vrf_prepare_responses))

failed_nodes = []
success_nodes = []
for executor, result in prepare_results.items():
if f"{result}" != "True":
failed_nodes.append(executor)
else:
success_nodes.append(executor)

return failed_nodes, success_nodes


def create_unauthorized_file(unauthorized_executors: List[Executor]):
source_dir = Path(__file__).parent.parent / "src"
unauthorized_nodes = [executor.node.address for executor in unauthorized_executors]
unauthorized_nodes_list_path = source_dir / "aleph_vrf" / "coordinator" / "unauthorized_node_list.json"
unauthorized_nodes_list_path.write_text(json.dumps(unauthorized_nodes))

print(f"Unauthorized node list file created on {unauthorized_nodes_list_path}")


async def main(args: argparse.Namespace):
vrf_function = args.vrf_function

failed_nodes, _ = await prepare_executor_nodes(vrf_function)

print("Aleph.im VRF VMs nodes prepared.")

if failed_nodes:
print(f"{len(failed_nodes)} preload nodes failed.")
create_unauthorized_file(failed_nodes)


def parse_args(args) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="prepare_vrf_vms", description="Prepare executor VRF VMs on the aleph.im network."
)
parser.add_argument(
"--vrf_function",
dest="vrf_function",
action="store",
help="VRF VM ItemHash",
)
return parser.parse_args(args)


if __name__ == "__main__":
asyncio.run(main(args=parse_args(sys.argv[1:])))
30 changes: 19 additions & 11 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import json
import random
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List
from typing import Any, AsyncIterator, Dict, List, Union

import aiohttp
from aleph_message.models import ItemHash

from aleph_vrf.exceptions import AlephNetworkError, NotEnoughExecutors
from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, Node
from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, VRFExecutor
from aleph_vrf.settings import settings


Expand Down Expand Up @@ -98,37 +98,45 @@ def _get_unauthorized_nodes() -> List[str]:

return []

async def select_executors(self, nb_executors: int) -> List[Executor]:
async def select_executors(self, nb_executors: int) -> List[VRFExecutor]:
"""
Selects nb_executors compute resource nodes at random from the aleph.im network.
"""

compute_nodes = self._list_compute_nodes()
blacklisted_nodes = self._get_unauthorized_nodes()
whitelisted_nodes = (
node
async for node in compute_nodes
if node.address not in blacklisted_nodes
)

executors = [
AlephExecutor(node=node, vm_function=self.vm_function)
async for node in whitelisted_nodes
async for node in compute_nodes
if node.address not in blacklisted_nodes
]

if len(executors) < nb_executors:
raise NotEnoughExecutors(requested=nb_executors, available=len(executors))
return random.sample(executors, nb_executors)

async def get_candidate_executors(self) -> List[VRFExecutor]:
compute_nodes = self._list_compute_nodes()
blacklisted_nodes = self._get_unauthorized_nodes()
executors: List[VRFExecutor] = [
AlephExecutor(node=node, vm_function=self.vm_function)
async for node in compute_nodes
if node.address not in blacklisted_nodes
]

return executors


class UsePredeterminedExecutors(ExecutorSelectionPolicy):
"""
Use a hardcoded list of executors.
"""

def __init__(self, executors: List[Executor]):
def __init__(self, executors: List[VRFExecutor]):
self.executors = executors

async def select_executors(self, nb_executors: int) -> List[Executor]:
async def select_executors(self, nb_executors: int) -> List[VRFExecutor]:
"""
Returns nb_executors from the hardcoded list of executors.
If nb_executors is lower than the total number of executors, this method
Expand Down
15 changes: 12 additions & 3 deletions src/aleph_vrf/coordinator/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Union
from typing import List, Optional, Union

from pydantic import BaseModel

Expand All @@ -17,7 +17,14 @@

logger.debug("local imports")
from aleph_vrf.coordinator.vrf import generate_vrf
from aleph_vrf.models import APIError, APIResponse, PublishedVRFResponse, AlephExecutor, ComputeResourceNode
from aleph_vrf.models import (
AlephExecutor,
APIError,
APIResponse,
ComputeResourceNode,
PublishedVRFResponse,
VRFExecutor,
)

logger.debug("imports done")

Expand Down Expand Up @@ -81,7 +88,9 @@ async def receive_test_vrf(
"https://CRN_URL" # CRN main URL, like https://hetzner.staging.aleph.sh
)
executor_node = ComputeResourceNode(address=executor_url, hash="", score=0)
executors = [AlephExecutor(node=executor_node, vm_function=settings.FUNCTION)]
executors: List[VRFExecutor] = [
AlephExecutor(node=executor_node, vm_function=settings.FUNCTION)
]
executor_policy = UsePredeterminedExecutors(executors)
response = await generate_vrf(
account=account,
Expand Down
12 changes: 5 additions & 7 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,13 @@ async def prepare_executor_api_request(url: str) -> bool:
async with session.get(url, timeout=120) as resp:
try:
resp.raise_for_status()
response = await resp.json()
return response["name"] == "vrf_generate_api"
except aiohttp.ClientResponseError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error

response = await resp.json()

return response["name"] == "vrf_generate_api"


async def _generate_vrf(
aleph_client: AuthenticatedAlephHttpClient,
Expand Down Expand Up @@ -164,7 +162,7 @@ async def generate_vrf(
aleph_api_server: Optional[str] = None,
executor_selection_policy: Optional[ExecutorSelectionPolicy] = None,
):
vrf_function = vrf_function or settings.FUNCTION
vrf_function = vrf_function or ItemHash(settings.FUNCTION)

async with AuthenticatedAlephHttpClient(
account=account,
Expand All @@ -177,7 +175,7 @@ async def generate_vrf(
request_id=request_id,
nb_executors=nb_executors or settings.NB_EXECUTORS,
nb_bytes=nb_bytes or settings.NB_BYTES,
vrf_function=vrf_function or settings.FUNCTION,
vrf_function=vrf_function,
executor_selection_policy=executor_selection_policy
or ExecuteOnAleph(vm_function=vrf_function),
)
Expand Down Expand Up @@ -350,7 +348,7 @@ async def get_existing_vrf_message(
async def get_existing_message(
aleph_client: AuthenticatedAlephHttpClient,
item_hash: ItemHash,
) -> Optional[PostMessage]:
) -> PostMessage:
logger.debug(
f"Getting VRF message on {aleph_client.api_server} for item_hash {item_hash}"
)
Expand Down
3 changes: 3 additions & 0 deletions src/aleph_vrf/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class ExecutorError(Exception):
def __init__(self, executor: Executor):
self.executor = executor

def __str__(self):
return f"Executor failed for executor {self.executor.api_url}."


class RandomNumberGenerationFailed(ExecutorError):
"""
Expand Down
6 changes: 5 additions & 1 deletion src/aleph_vrf/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

from typing import Generic, List, TypeVar
from typing import Generic, List, TypeVar, Union

import fastapi
from aleph_message.models import ItemHash, PostMessage
from aleph_message.models.abstract import HashableModel
from pydantic import BaseModel, ValidationError
from pydantic.generics import GenericModel
from typing_extensions import TypeAlias

from aleph_vrf.types import ExecutionId, Nonce, RequestId

Expand Down Expand Up @@ -46,6 +47,9 @@ class VRFRequest(BaseModel):
node_list_hash: str


VRFExecutor: TypeAlias = Union[Executor, AlephExecutor]


def get_vrf_request_from_message(message: PostMessage) -> VRFRequest:
content = message.content.content
try:
Expand Down
9 changes: 5 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
- https://docs.pytest.org/en/stable/fixture.html
- https://docs.pytest.org/en/stable/writing_plugins.html
"""

import multiprocessing
import os
import socket
from contextlib import contextmanager, ExitStack, AsyncExitStack
from contextlib import AsyncExitStack, ExitStack, contextmanager
from time import sleep
from typing import Union, Tuple, ContextManager
from typing import ContextManager, Tuple, Union

import aiohttp
import fastapi.applications
Expand All @@ -20,10 +21,10 @@
import uvicorn
from aleph.sdk.chains.common import generate_key
from hexbytes import HexBytes
from malicious_executor import app as malicious_executor_app
from mock_ccn import app as mock_ccn_app

from aleph_vrf.settings import settings
from mock_ccn import app as mock_ccn_app
from malicious_executor import app as malicious_executor_app


def wait_for_server(host: str, port: int, nb_retries: int = 10, wait_time: int = 0.1):
Expand Down
Loading

0 comments on commit 3a8b592

Please sign in to comment.