From 6b7403650fb8e64aafa5a20e1f9007d5e2c9ed8a Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Wed, 29 Jun 2022 16:38:18 +0200 Subject: [PATCH 1/6] Feature: VM executions can be marked as long running --- vm_supervisor/models.py | 8 +++++++- vm_supervisor/pool.py | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/vm_supervisor/models.py b/vm_supervisor/models.py index 75070b7dc..3117c0d6e 100644 --- a/vm_supervisor/models.py +++ b/vm_supervisor/models.py @@ -57,6 +57,8 @@ class VmExecution: expire_task: Optional[asyncio.Task] = None update_task: Optional[asyncio.Task] = None + marked_as_long_running: bool = False + @property def is_running(self): return self.times.starting_at and not self.times.stopping_at @@ -117,7 +119,11 @@ async def create(self, vm_id: int) -> AlephFirecrackerVM: await vm.teardown() raise - def stop_after_timeout(self, timeout: float = 5.0) -> Task: + def stop_after_timeout(self, timeout: float = 5.0) -> Optional[Task]: + if self.marked_as_long_running: + logger.debug("VM marked as long running. Ignoring timeout.") + return + if self.expire_task: logger.debug("VM already has a timeout. 
Extending it.") self.expire_task.cancel() diff --git a/vm_supervisor/pool.py b/vm_supervisor/pool.py index e3b25e393..a584a2bf3 100644 --- a/vm_supervisor/pool.py +++ b/vm_supervisor/pool.py @@ -90,3 +90,8 @@ async def stop(self): await asyncio.gather( *(execution.stop() for vm_hash, execution in self.executions.items()) ) + + def get_long_running_executions(self) -> Iterable[VmExecution]: + for vm_hash, execution in self.executions.items(): + if execution.marked_as_long_running and execution.is_running: + yield execution From a174a0ddb3bc70502c4cd93f4442cd487a49c1fa Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Wed, 29 Jun 2022 16:41:08 +0200 Subject: [PATCH 2/6] WIP: Add API to update long-running VM allocations --- vm_supervisor/resources.py | 8 ++++++++ vm_supervisor/run.py | 28 ++++++++++++++++++++++++++++ vm_supervisor/supervisor.py | 5 ++++- vm_supervisor/views.py | 37 +++++++++++++++++++++++++++++++++++-- 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/vm_supervisor/resources.py b/vm_supervisor/resources.py index b2fd8968f..0755ed01d 100644 --- a/vm_supervisor/resources.py +++ b/vm_supervisor/resources.py @@ -1,5 +1,7 @@ + from datetime import datetime, timezone from functools import lru_cache +from typing import Set, Optional from typing import Tuple import cpuinfo @@ -117,3 +119,9 @@ async def about_system_usage(request: web.Request): return web.json_response( text=usage.json(exclude_none=True), ) + + +class Allocation(BaseModel): + long_running_vms: Set[str] + on_demand_vms: Optional[Set[str]] = None + jobs: Optional[Set] = None diff --git a/vm_supervisor/run.py b/vm_supervisor/run.py index 3280276ee..db2a114c7 100644 --- a/vm_supervisor/run.py +++ b/vm_supervisor/run.py @@ -213,3 +213,31 @@ async def run_code_on_event(vm_hash: VmHash, event, pubsub: PubSub): execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT) else: await execution.stop() + + +async def start_long_running(vm_hash: VmHash, pubsub: PubSub) -> VmExecution: + 
execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash) + + if not execution: + execution = await create_vm_execution(vm_hash=vm_hash)\ + + # If the VM was already running in lambda mode, it should not expire + # as long as it is also scheduled as long-running + execution.marked_as_long_running = True + execution.cancel_expiration() + + await execution.becomes_ready() + + # if settings.WATCH_FOR_UPDATES: + # # FIXME: Is this added for every request ? + # execution.start_watching_for_updates(pubsub=request.app["pubsub"]) + + return execution + + +async def stop_long_running(vm_hash: VmHash) -> Optional[VmExecution]: + logger.info(f"Stopping long running {vm_hash}") + execution = await pool.get_running_vm(vm_hash) + if execution: + await execution.stop() + return execution diff --git a/vm_supervisor/supervisor.py b/vm_supervisor/supervisor.py index b70d891ae..868ed6ab3 100644 --- a/vm_supervisor/supervisor.py +++ b/vm_supervisor/supervisor.py @@ -24,7 +24,9 @@ about_executions, about_config, status_check_fastapi, - about_execution_records, status_check_version, + about_execution_records, + status_check_version, + update_allocations, ) logger = logging.getLogger(__name__) @@ -52,6 +54,7 @@ async def server_version_middleware( web.get("/about/executions/records", about_execution_records), web.get("/about/usage/system", about_system_usage), web.get("/about/config", about_config), + web.post("/control/allocations", update_allocations), web.get("/status/check/fastapi", status_check_fastapi), web.get("/status/check/version", status_check_version), web.route("*", "/vm/{ref}{suffix:.*}", run_code_from_path), diff --git a/vm_supervisor/views.py b/vm_supervisor/views.py index f065e7f06..f669fd64b 100644 --- a/vm_supervisor/views.py +++ b/vm_supervisor/views.py @@ -3,18 +3,20 @@ import os.path from string import Template from typing import Awaitable, Optional -from packaging.version import Version, InvalidVersion import aiodns import aiohttp from aiohttp 
import web from aiohttp.web_exceptions import HTTPNotFound +from packaging.version import Version, InvalidVersion +from pydantic import ValidationError from . import status, __version__ from .conf import settings from .metrics import get_execution_records from .models import VmHash -from .run import run_code_on_request, pool +from .resources import Allocation +from .run import run_code_on_request, pool, start_long_running from .utils import b32_to_b16, get_ref_from_dns, dumps_for_json logger = logging.getLogger(__name__) @@ -166,3 +168,34 @@ async def status_check_version(request: web.Request): ) else: return web.HTTPForbidden(text=f"Outdated: version {current} < {reference}") + + +async def update_allocations(request: web.Request): + # TODO: Add some form of authentication + try: + data = await request.json() + allocation = Allocation(**data) + except ValidationError as error: + return web.json_response( + text=error.json(), status=web.HTTPBadRequest.status_code + ) + + pubsub = request.app["pubsub"] + + for vm_hash in allocation.long_running_vms: + vm_hash = VmHash(vm_hash) + logger.info(f"Starting long running VM {vm_hash}") + await start_long_running(vm_hash, pubsub) + + for execution in pool.get_long_running_executions(): + if execution.vm_hash not in allocation.long_running_vms: + logger.info(f"Stopping long running VM {execution.vm_hash}") + await execution.stop() + execution.marked_as_long_running = False + + if allocation.on_demand_vms: + logger.info("Not supported yet: 'allocation.on_demand_vms'") + if allocation.jobs: + logger.info("Not supported yet: 'allocation.jobs'") + + return web.json_response(data={"success": True}) From 74a8d541835cd2a1fded1c904ec2dbcf44ecc461 Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Thu, 15 Sep 2022 10:48:58 +0200 Subject: [PATCH 3/6] WIP: Updating allocations was not authenticated Use a simple authentication method: the hash of the signature should match the value in the settings --- vm_supervisor/conf.py | 3 
+++ vm_supervisor/views.py | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/vm_supervisor/conf.py b/vm_supervisor/conf.py index f26b22546..0ab8c0c41 100644 --- a/vm_supervisor/conf.py +++ b/vm_supervisor/conf.py @@ -105,6 +105,9 @@ class Settings(BaseSettings): MAX_PROGRAM_ARCHIVE_SIZE = 10_000_000 # 10 MB MAX_DATA_ARCHIVE_SIZE = 10_000_000 # 10 MB + # hashlib.sha256(b"secret-token").hexdigest() + ALLOCATION_TOKEN_HASH = "151ba92f2eb90bce67e912af2f7a5c17d8654b3d29895b042107ea312a7eebda" + FAKE_DATA_PROGRAM: Optional[Path] = None BENCHMARK_FAKE_DATA_PROGRAM = Path( abspath(join(__file__, "../../examples/example_fastapi")) diff --git a/vm_supervisor/views.py b/vm_supervisor/views.py index f669fd64b..28146f530 100644 --- a/vm_supervisor/views.py +++ b/vm_supervisor/views.py @@ -1,6 +1,7 @@ import binascii import logging import os.path +from hashlib import sha256 from string import Template from typing import Awaitable, Optional @@ -170,8 +171,26 @@ async def status_check_version(request: web.Request): return web.HTTPForbidden(text=f"Outdated: version {current} < {reference}") +def authenticate_api_request(request: web.Request) -> bool: + """Authenticate an API request to update the VM allocations. 
+ """ + signature: str = request.headers.get('X-Auth-Signature') + # body: bytes = await request.read() + if not signature: + raise web.HTTPUnauthorized(text="Authentication token is missing") + + # Use a simple authentication method: the hash of the signature should match the value in the settings + if sha256(signature).hexdigest() != settings.ALLOCATION_TOKEN_HASH: + raise web.HTTPUnauthorized(text="Authentication token received is invalid") + + return True + + + async def update_allocations(request: web.Request): - # TODO: Add some form of authentication + if not authenticate_api_request(request): + return web.HTTPUnauthorized(text="Invalid authentication") + try: data = await request.json() allocation = Allocation(**data) From 639111679956ddb93350cbdd920a84ffb1b55bdb Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Tue, 20 Sep 2022 12:02:30 +0100 Subject: [PATCH 4/6] Dependencies: persistent was not supported by aleph-message --- docker/vm_supervisor-dev.dockerfile | 2 +- examples/volumes/Dockerfile | 2 +- packaging/Makefile | 2 +- vm_supervisor/README.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/vm_supervisor-dev.dockerfile b/docker/vm_supervisor-dev.dockerfile index e4be2a238..d30422061 100644 --- a/docker/vm_supervisor-dev.dockerfile +++ b/docker/vm_supervisor-dev.dockerfile @@ -19,7 +19,7 @@ RUN curl -fsSL -o /opt/firecracker/vmlinux.bin https://s3.amazonaws.com/spec.ccf RUN ln /opt/firecracker/release-*/firecracker-v* /opt/firecracker/firecracker RUN ln /opt/firecracker/release-*/jailer-v* /opt/firecracker/jailer -RUN pip3 install typing-extensions 'aleph-message>=0.1.19' +RUN pip3 install typing-extensions 'aleph-message>=0.2.0' RUN mkdir -p /var/lib/aleph/vm/jailer diff --git a/examples/volumes/Dockerfile b/examples/volumes/Dockerfile index f80fdea60..a450a0a2e 100644 --- a/examples/volumes/Dockerfile +++ b/examples/volumes/Dockerfile @@ -6,6 +6,6 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y \ && rm 
-rf /var/lib/apt/lists/* RUN python3 -m venv /opt/venv -RUN /opt/venv/bin/pip install 'aleph-message>=0.1.19' +RUN /opt/venv/bin/pip install 'aleph-message>=0.2.0' CMD mksquashfs /opt/venv /mnt/volume-venv.squashfs diff --git a/packaging/Makefile b/packaging/Makefile index 22bfb4404..62b187a89 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -16,7 +16,7 @@ debian-package-code: cp ../examples/message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/message_from_aleph.json cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes - pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message>=0.1.19' + pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.2.0' python3 -m compileall ./aleph-vm/opt/aleph-vm/ debian-package-resources: firecracker-bins vmlinux diff --git a/vm_supervisor/README.md b/vm_supervisor/README.md index ac06beccf..e2dde9d23 100644 --- a/vm_supervisor/README.md +++ b/vm_supervisor/README.md @@ -87,7 +87,7 @@ is used to parse and validate Aleph messages. ```shell apt install -y --no-install-recommends --no-install-suggests python3-pip pip3 install pydantic[dotenv] -pip3 install aleph-message +pip3 install 'aleph-message==0.2.0' ``` ### 2.f. Create the jailer working directory: From 555dcc8a42ac2aa094a918d72165a6f944393710 Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Tue, 20 Sep 2022 18:36:38 +0100 Subject: [PATCH 5/6] WIP: Fix encoding must use bytes --- vm_supervisor/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vm_supervisor/views.py b/vm_supervisor/views.py index 28146f530..3d6ddb4a9 100644 --- a/vm_supervisor/views.py +++ b/vm_supervisor/views.py @@ -174,7 +174,7 @@ async def status_check_version(request: web.Request): def authenticate_api_request(request: web.Request) -> bool: """Authenticate an API request to update the VM allocations. 
""" - signature: str = request.headers.get('X-Auth-Signature') + signature: bytes = request.headers.get('X-Auth-Signature', '').encode() # body: bytes = await request.read() if not signature: raise web.HTTPUnauthorized(text="Authentication token is missing") From d93ad97f6bea7bf049e992d09797563928f3164f Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Wed, 5 Oct 2022 15:00:58 +0200 Subject: [PATCH 6/6] WIP: Add support for IPv4 port publishing --- firecracker/microvm.py | 17 +++++++++++++++ vm_supervisor/models.py | 11 +++++++--- vm_supervisor/pool.py | 8 ++++--- vm_supervisor/resources.py | 10 +++++++-- vm_supervisor/run.py | 28 +++++++++++++++++++------ vm_supervisor/vm/firecracker_microvm.py | 7 ++++++- 6 files changed, 66 insertions(+), 15 deletions(-) diff --git a/firecracker/microvm.py b/firecracker/microvm.py index f101d50ec..eb630be63 100644 --- a/firecracker/microvm.py +++ b/firecracker/microvm.py @@ -11,6 +11,8 @@ from tempfile import NamedTemporaryFile from typing import Optional, Tuple, List +from aleph_message.models.program import PortMapping + from .config import FirecrackerConfig from .config import Drive @@ -327,6 +329,17 @@ async def create_network_interface(self, interface: str = "eth0") -> str: return host_dev_name + def publish_ports(self, port_mappings: List[PortMapping]): + interface = self.network_interface + + for mapping in port_mappings: + system(f"iptables -A PREROUTING -t nat " + f"-i {interface} " + f"-p {mapping.protocol} " + f"--dport {mapping.public_port} " + f"-j DNAT " + f"--to {self.guest_ip}:{mapping.port}") + async def print_logs(self): while not self.proc: await asyncio.sleep(0.01) # Todo: Use signal here @@ -444,6 +457,10 @@ async def teardown(self): f"iptables -D FORWARD -i {self.network_tap} -o {self.network_interface} -j ACCEPT" ) + # system( + # ... 
# TODO remove nat rules for the VM + # ) + if self._unix_socket: logger.debug("Closing unix socket") self._unix_socket.close() diff --git a/vm_supervisor/models.py b/vm_supervisor/models.py index 3117c0d6e..a7e951a62 100644 --- a/vm_supervisor/models.py +++ b/vm_supervisor/models.py @@ -5,16 +5,17 @@ from asyncio import Task from dataclasses import dataclass from datetime import datetime -from typing import NewType, Optional, Dict +from typing import NewType, Optional, Dict, List from aleph_message.models import ProgramContent +from aleph_message.models.program import PortMapping +from .conf import settings from .metrics import save_record, save_execution_data, ExecutionRecord from .pubsub import PubSub from .utils import dumps_for_json from .vm import AlephFirecrackerVM from .vm.firecracker_microvm import AlephFirecrackerResources -from .conf import settings logger = logging.getLogger(__name__) @@ -58,6 +59,7 @@ class VmExecution: update_task: Optional[asyncio.Task] = None marked_as_long_running: bool = False + port_mappings: List[PortMapping] @property def is_running(self): @@ -68,7 +70,8 @@ def becomes_ready(self): return self.ready_event.wait def __init__( - self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent + self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent, + port_mappings: Optional[List[PortMapping]] = None ): self.uuid = uuid.uuid1() # uuid1() includes the hardware address and timestamp self.vm_hash = vm_hash @@ -78,6 +81,7 @@ def __init__( self.ready_event = asyncio.Event() self.concurrent_runs = 0 self.runs_done_event = asyncio.Event() + self.port_mappings = port_mappings or [] def to_dict(self) -> Dict: return { @@ -109,6 +113,7 @@ async def create(self, vm_id: int) -> AlephFirecrackerVM: ) try: await vm.setup() + await vm.publish_ports(self.port_mappings) await vm.start() await vm.configure() await vm.start_guest_api() diff --git a/vm_supervisor/pool.py b/vm_supervisor/pool.py index a584a2bf3..b36458122 
100644 --- a/vm_supervisor/pool.py +++ b/vm_supervisor/pool.py @@ -1,8 +1,9 @@ import asyncio import logging -from typing import Dict, Optional, Iterable +from typing import Dict, Optional, Iterable, List from aleph_message.models import ProgramContent, ProgramMessage +from aleph_message.models.program import PortMapping from .conf import settings from .models import VmHash, VmExecution @@ -28,10 +29,11 @@ def __init__(self): self.executions = {} async def create_a_vm( - self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent + self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent, + port_mappings: Optional[List[PortMapping]] = None, ) -> VmExecution: """Create a new Aleph Firecracker VM from an Aleph function message.""" - execution = VmExecution(vm_hash=vm_hash, program=program, original=original) + execution = VmExecution(vm_hash=vm_hash, program=program, original=original, port_mappings=port_mappings) self.executions[vm_hash] = execution await execution.prepare() vm_id = self.get_unique_vm_id() diff --git a/vm_supervisor/resources.py b/vm_supervisor/resources.py index 0755ed01d..7e23e4043 100644 --- a/vm_supervisor/resources.py +++ b/vm_supervisor/resources.py @@ -1,16 +1,17 @@ from datetime import datetime, timezone from functools import lru_cache -from typing import Set, Optional +from typing import Set, Optional, List from typing import Tuple import cpuinfo import psutil from aiohttp import web -from aleph_message.models.program import CpuProperties +from aleph_message.models.program import CpuProperties, PortMapping from pydantic import BaseModel from .conf import settings +from .models import VmHash class Period(BaseModel): @@ -121,6 +122,11 @@ async def about_system_usage(request: web.Request): ) +class ProgramAllocation(BaseModel): + program_id: VmHash + port_mappings: Optional[List[PortMapping]] = None + + class Allocation(BaseModel): long_running_vms: Set[str] on_demand_vms: Optional[Set[str]] = None diff --git 
a/vm_supervisor/run.py b/vm_supervisor/run.py index db2a114c7..a866669aa 100644 --- a/vm_supervisor/run.py +++ b/vm_supervisor/run.py @@ -1,10 +1,14 @@ import asyncio import logging -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List import msgpack from aiohttp import web -from aiohttp.web_exceptions import HTTPBadRequest, HTTPInternalServerError +from aiohttp.web_exceptions import ( + HTTPBadRequest, + HTTPInternalServerError, +) +from aleph_message.models.program import PortMapping from msgpack import UnpackValueError from firecracker.microvm import MicroVMFailedInit @@ -13,6 +17,7 @@ from .models import VmHash, VmExecution from .pool import VmPool from .pubsub import PubSub +from .resources import ProgramAllocation from .vm.firecracker_microvm import ( ResourceDownloadError, VmSetupError, @@ -44,8 +49,17 @@ async def build_event_scope(event) -> Dict[str, Any]: } -async def create_vm_execution(vm_hash: VmHash) -> VmExecution: +async def create_vm_execution(vm_hash: VmHash, port_mappings: Optional[List[PortMapping]] = None) -> VmExecution: + + # try: + # message, original_message = try_load_updated_message(vm_hash, attempts=5) + # except HTTPNotFound as error: + # raise HTTPServiceUnavailable(text=f"Message could not be loaded {error.args}") + message, original_message = await load_updated_message(vm_hash) + + # TODO: Verify that port mappings match ports published + pool.message_cache[vm_hash] = message try: @@ -53,6 +67,7 @@ async def create_vm_execution(vm_hash: VmHash) -> VmExecution: vm_hash=vm_hash, program=message.content, original=original_message.content, + port_mappings=port_mappings, ) except ResourceDownloadError as error: logger.exception(error) @@ -215,11 +230,12 @@ async def run_code_on_event(vm_hash: VmHash, event, pubsub: PubSub): await execution.stop() -async def start_long_running(vm_hash: VmHash, pubsub: PubSub) -> VmExecution: - execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash) 
+async def start_long_running(program_allocation: ProgramAllocation, pubsub: PubSub) -> VmExecution: + execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=program_allocation.program_id) if not execution: - execution = await create_vm_execution(vm_hash=vm_hash)\ + execution = await create_vm_execution(vm_hash=program_allocation.program_id, + port_mappings=program_allocation.port_mappings) # If the VM was already running in lambda mode, it should not expire # as long as it is also scheduled as long-running diff --git a/vm_supervisor/vm/firecracker_microvm.py b/vm_supervisor/vm/firecracker_microvm.py index 525e7ea74..ff8fb67d4 100644 --- a/vm_supervisor/vm/firecracker_microvm.py +++ b/vm_supervisor/vm/firecracker_microvm.py @@ -19,7 +19,7 @@ from aiohttp import ClientResponseError from aleph_message.models import ProgramContent -from aleph_message.models.program import MachineResources, Encoding +from aleph_message.models.program import MachineResources, Encoding, PortMapping from firecracker.config import ( BootSource, Drive, @@ -320,6 +320,11 @@ async def setup(self): await fvm.teardown() raise + async def publish_ports(self, port_mappings: List[PortMapping]): + if not self.fvm: + raise ValueError("MicroVM is not defined yet") + return self.fvm.publish_ports(port_mappings=port_mappings) + async def start(self): logger.debug(f"starting vm {self.vm_id}") if not self.fvm: