-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Internal: executor selection policy system (#12)
Problem: for testing, we wish to switch between different ways of selecting executors (ex: repeat tests on a specific list of CRNs, use local executors for integration tests). Solution: introduce the `ExecutorSelectionPolicy` class. Using this class, the caller can parameterize the way executors are selected, blacklisted, etc.
- Loading branch information
1 parent
bb91c12
commit 7cb7e58
Showing
4 changed files
with
121 additions
and
82 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import abc | ||
import json | ||
from pathlib import Path | ||
from typing import List, Dict, Any, AsyncIterator | ||
import random | ||
|
||
import aiohttp | ||
from aleph_message.models import ItemHash | ||
|
||
from aleph_vrf.models import Executor, Node, AlephExecutor, ComputeResourceNode | ||
from aleph_vrf.settings import settings | ||
|
||
|
||
class ExecutorSelectionPolicy(abc.ABC): | ||
@abc.abstractmethod | ||
async def select_executors(self, nb_executors: int) -> List[Executor]: | ||
... | ||
|
||
|
||
async def _get_corechannel_aggregate() -> Dict[str, Any]: | ||
async with aiohttp.ClientSession(settings.API_HOST) as session: | ||
url = ( | ||
f"/api/v0/aggregates/{settings.CORECHANNEL_AGGREGATE_ADDRESS}.json?" | ||
f"keys={settings.CORECHANNEL_AGGREGATE_KEY}" | ||
) | ||
async with session.get(url) as response: | ||
if response.status != 200: | ||
raise ValueError(f"CRN list not available") | ||
|
||
return await response.json() | ||
|
||
|
||
class ExecuteOnAleph(ExecutorSelectionPolicy): | ||
def __init__(self, vm_function: ItemHash): | ||
self.vm_function = vm_function | ||
|
||
@staticmethod | ||
async def _list_compute_nodes() -> AsyncIterator[ComputeResourceNode]: | ||
content = await _get_corechannel_aggregate() | ||
|
||
if ( | ||
not content["data"]["corechannel"] | ||
or not content["data"]["corechannel"]["resource_nodes"] | ||
): | ||
raise ValueError(f"Bad CRN list format") | ||
|
||
resource_nodes = content["data"]["corechannel"]["resource_nodes"] | ||
|
||
for resource_node in resource_nodes: | ||
# Filter nodes by score, with linked status | ||
if resource_node["status"] == "linked" and resource_node["score"] > 0.9: | ||
node_address = resource_node["address"].strip("/") | ||
node = ComputeResourceNode( | ||
hash=resource_node["hash"], | ||
address=node_address, | ||
score=resource_node["score"], | ||
) | ||
yield node | ||
|
||
@staticmethod | ||
def _get_unauthorized_nodes() -> List[str]: | ||
unauthorized_nodes_list_path = Path(__file__).with_name( | ||
"unauthorized_node_list.json" | ||
) | ||
if unauthorized_nodes_list_path.is_file(): | ||
with open(unauthorized_nodes_list_path, "rb") as fd: | ||
return json.load(fd) | ||
|
||
return [] | ||
|
||
async def select_executors(self, nb_executors: int) -> List[Executor]: | ||
compute_nodes = self._list_compute_nodes() | ||
blacklisted_nodes = self._get_unauthorized_nodes() | ||
whitelisted_nodes = ( | ||
node | ||
async for node in compute_nodes | ||
if node.address not in blacklisted_nodes | ||
) | ||
executors = [ | ||
AlephExecutor(node=node, vm_function=self.vm_function) | ||
async for node in whitelisted_nodes | ||
] | ||
|
||
if len(executors) < nb_executors: | ||
raise ValueError( | ||
f"Not enough CRNs linked, only {len(executors)} " | ||
f"available from {nb_executors} requested" | ||
) | ||
return random.sample(executors, nb_executors) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters