Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Qemu automatic tests #615

Merged
merged 5 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test-on-droplets-matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ jobs:

find /opt

- name: "Build custom runtime"
- name: "Build custom runtimes"
run: |
sudo apt update
sudo apt install -y debootstrap ndppd acl cloud-image-utils qemu-utils qemu-system-x86
cd runtimes/aleph-debian-12-python && sudo ./create_disk_image.sh && cd ../..
cd runtimes/instance-rootfs && sudo ./create_debian-12-qemu-disk.sh && cd ../..

- name: "Build example volume"
run: |
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ classifiers = [
"Topic :: System :: Distributed Computing",
]
dependencies = [
"pydantic[dotenv]==1.10.13",
"pydantic[dotenv]~=1.10.13",
"aiohttp==3.8.6",
"aiodns==3.1.0",
"setproctitle==1.3.3",
"pyyaml==6.0.1",
"aleph-message==0.4.4",
"jwskate==0.8.0",
"eth-account==0.9.0",
"eth-account~=0.10",
"sentry-sdk==1.31.0",
"aioredis==1.3.1",
"psutil==5.9.5",
Expand Down Expand Up @@ -140,7 +140,7 @@ pythonpath = [
testpaths = [
"tests"
]
ignore = [
norecursedirs = [
"runtimes/aleph-debian-11-python/rootfs/",
"runtimes/aleph-debian-12-python/rootfs/",
]
Expand Down
18 changes: 18 additions & 0 deletions runtimes/instance-rootfs/create-ubuntu-22-04-qemu-disk.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
#
# Download the Ubuntu 22.04 (jammy) KVM cloud image and store it as
# ./rootfs.img in the current directory.

# -e: abort on the first failing command, -u: error on unset variables,
# -f: disable filename globbing.
set -euf

# Variables
ROOTFS_FILENAME="./rootfs.img"
IMAGE_URL="https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64-disk-kvm.img"
IMAGE_NAME="jammy-server-cloudimg-amd64-disk-kvm.img"

# Cleanup previous run
rm -f "$ROOTFS_FILENAME"

# Download Ubuntu image
echo "Downloading Ubuntu 22.04 image"
# --fail makes curl exit non-zero on an HTTP error (e.g. 404), so that
# `set -e` aborts here instead of renaming an error page to rootfs.img.
curl --fail -L "$IMAGE_URL" -o "$IMAGE_NAME"

# Rename final file
mv "$IMAGE_NAME" "$ROOTFS_FILENAME"
4 changes: 3 additions & 1 deletion src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,15 @@ class Settings(BaseSettings):
)

USE_FAKE_INSTANCE_BASE = False
FAKE_INSTANCE_BASE = Path(abspath(join(__file__, "../../runtimes/instance-debian-rootfs/rootfs.ext4")))
# Resolve from the same base (four levels above this file) as FAKE_QEMU_INSTANCE_BASE
# and FAKE_INSTANCE_MESSAGE. NOTE(review): confirm this matches the deployed layout.
FAKE_INSTANCE_BASE = Path(abspath(join(__file__, "../../../../runtimes/instance-rootfs/rootfs.ext4")))
nesitor marked this conversation as resolved.
Show resolved Hide resolved
FAKE_QEMU_INSTANCE_BASE = Path(abspath(join(__file__, "../../../../runtimes/instance-rootfs/rootfs.img")))
FAKE_INSTANCE_ID: str = Field(
default="decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca",
description="Identifier used for the 'fake instance' message defined in "
"examples/instance_message_from_aleph.json",
)
FAKE_INSTANCE_MESSAGE = Path(abspath(join(__file__, "../../../../examples/instance_message_from_aleph.json")))
FAKE_INSTANCE_QEMU_MESSAGE = Path(abspath(join(__file__, "../../../../examples/qemu_message_from_aleph.json")))

CHECK_FASTAPI_VM_ID = "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace"
LEGACY_CHECK_FASTAPI_VM_ID = "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8"
Expand Down
20 changes: 15 additions & 5 deletions src/aleph/vm/controllers/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import logging
import signal
import sys
from asyncio.subprocess import Process
from pathlib import Path
from typing import Union

from aleph.vm.hypervisors.firecracker.microvm import MicroVM
from aleph.vm.hypervisors.qemu.qemuvm import QemuVM
Expand Down Expand Up @@ -54,7 +56,7 @@ def parse_args(args):
return parser.parse_args(args)


async def run_persistent_vm(config: Configuration):
async def execute_persistent_vm(config: Configuration):
if config.hypervisor == HypervisorType.firecracker:
assert isinstance(config.vm_configuration, VMConfiguration)
execution = MicroVM(
Expand All @@ -73,17 +75,25 @@ async def run_persistent_vm(config: Configuration):
execution = QemuVM(config.vm_configuration)
process = await execution.start()

# Catch the terminating signal and send a proper message to the vm to stop it so it close files properly
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, execution.send_shutdown_message)
return execution, process


async def handle_persistent_vm(config: Configuration, execution: Union[MicroVM, QemuVM], process: Process):
    """Supervise an already started persistent VM until its process exits.

    Registers a SIGTERM handler that calls ``execution.send_shutdown_message``,
    starts printing the VM logs when ``config.settings.PRINT_SYSTEM_LOGS`` is
    set, then waits for ``process`` to terminate and logs its return code.
    """
    # Catch the terminating signal and send a proper message to the VM to stop it,
    # so that it closes its files properly.
    # A coroutine always runs inside a loop, so ask for the running loop explicitly.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, execution.send_shutdown_message)

    if config.settings.PRINT_SYSTEM_LOGS:
        execution.start_printing_logs()

    await process.wait()
    logger.info("Process terminated with %s", process.returncode)

return execution

async def run_persistent_vm(config: Configuration):
    """Start the persistent VM described by ``config`` and supervise it.

    Delegates the start to ``execute_persistent_vm`` and the supervision to
    ``handle_persistent_vm``; returns the ``(execution, process)`` pair once
    the supervision coroutine has finished.
    """
    started = await execute_persistent_vm(config)
    vm_execution, vm_process = started
    await handle_persistent_vm(config=config, execution=vm_execution, process=vm_process)
    return vm_execution, vm_process


def main():
Expand Down
3 changes: 3 additions & 0 deletions src/aleph/vm/controllers/qemu/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instance import AlephQemuInstance

# ``__all__`` must be a sequence of names: a bare string would be iterated
# character by character by ``from ... import *``.
__all__ = ["AlephQemuInstance"]
21 changes: 11 additions & 10 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
AlephFirecrackerResources,
AlephProgramResources,
)
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
from aleph.vm.controllers.interface import AlephVmControllerInterface
from aleph.vm.controllers.qemu.instance import AlephQemuInstance, AlephQemuResources
from aleph.vm.network.interfaces import TapInterface
Expand All @@ -34,12 +35,9 @@
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

if TYPE_CHECKING:
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
from aleph.vm.systemd import SystemDManager

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -81,14 +79,17 @@ class VmExecution:
expire_task: Optional[asyncio.Task] = None
update_task: Optional[asyncio.Task] = None

snapshot_manager: Optional[SnapshotManager]
systemd_manager: Optional[SystemDManager]

persistent: bool = False

@property
def is_running(self) -> bool:
return (
bool(self.times.starting_at and not self.times.stopping_at)
if not self.persistent
else self.systemd_manager.is_service_active(self.controller_service)
self.systemd_manager.is_service_active(self.controller_service)
if self.persistent and self.systemd_manager
else bool(self.times.starting_at and not self.times.stopping_at)
)

@property
Expand Down Expand Up @@ -141,8 +142,8 @@ def __init__(
vm_hash: ItemHash,
message: ExecutableContent,
original: ExecutableContent,
snapshot_manager: "SnapshotManager",
systemd_manager: "SystemDManager",
snapshot_manager: Optional[SnapshotManager],
systemd_manager: Optional[SystemDManager],
persistent: bool,
):
self.uuid = uuid.uuid1() # uuid1() includes the hardware address and timestamp
Expand Down Expand Up @@ -322,7 +323,7 @@ async def stop(self) -> None:
self.cancel_expiration()
self.cancel_update()

if self.vm.support_snapshot:
if self.vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.stop_for(self.vm_hash)
self.stop_event.set()

Expand Down
176 changes: 176 additions & 0 deletions tests/supervisor/test_qemu_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import asyncio
import logging
from asyncio.subprocess import Process
from pathlib import Path
from typing import Optional

import pytest
from aleph_message.models import ItemHash

from aleph.vm.conf import settings
from aleph.vm.controllers.__main__ import configuration_from_file, execute_persistent_vm
from aleph.vm.controllers.qemu import AlephQemuInstance
from aleph.vm.hypervisors.qemu.qemuvm import QemuVM
from aleph.vm.models import VmExecution
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.orchestrator import metrics
from aleph.vm.storage import get_message
from aleph.vm.systemd import SystemDManager
from aleph.vm.vm_type import VmType


class MockSystemDManager(SystemDManager):
    """Test double for ``SystemDManager``.

    Instead of managing a service, it loads the controller configuration and
    calls ``execute_persistent_vm`` directly, keeping the resulting execution
    and process on the instance.
    """

    # Set by enable_and_start(), reset to None by stop_and_disable().
    execution: Optional[QemuVM] = None
    process: Optional[Process] = None

    async def enable_and_start(self, vm_hash: str):
        """Start the VM from ``<EXECUTION_ROOT>/<vm_hash>-controller.json``.

        Returns the ``(execution, process)`` pair from ``execute_persistent_vm``.
        """
        config_path = Path(f"{settings.EXECUTION_ROOT}/{vm_hash}-controller.json")
        config = configuration_from_file(config_path)
        self.execution, self.process = await execute_persistent_vm(config)
        return self.execution, self.process

    def is_service_enabled(self, service: str):
        """Report the service as enabled while a process is being tracked."""
        return self.process is not None

    def is_service_active(self, service: str):
        """Report the service as active while a process is being tracked."""
        return self.process is not None

    async def stop_and_disable(self, vm_hash: str):
        """Kill the tracked process, if any, and forget it.

        Returns ``(None, None)``, i.e. the reset ``(execution, process)`` pair.
        """
        if self.process:
            try:
                self.process.kill()
            except ProcessLookupError:
                # The process has already exited; there is nothing left to kill.
                pass
            # Reap the child so that it does not outlive the test as a zombie.
            await self.process.wait()
        self.process = None
        self.execution = None
        return self.execution, self.process


@pytest.mark.asyncio
async def test_create_qemu_instance():
    """
    Create a QEMU instance with VM networking disabled and check that it
    is created, started and stopped properly.
    """

    settings.USE_FAKE_INSTANCE_BASE = True
    settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
    settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
    settings.ALLOW_VM_NETWORKING = False
    settings.USE_JAILER = False

    logging.basicConfig(level=logging.DEBUG)
    settings.PRINT_SYSTEM_LOGS = True

    # Ensure that the settings are correct and required files present.
    settings.setup()
    settings.check()

    # The database is required for the metrics and is currently not optional.
    engine = metrics.setup_engine()
    await metrics.create_tables(engine)

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    message = await get_message(ref=vm_hash)

    mock_systemd_manager = MockSystemDManager()

    execution = VmExecution(
        vm_hash=vm_hash,
        message=message.content,
        original=message.content,
        snapshot_manager=None,
        systemd_manager=None,
        persistent=True,
    )

    await asyncio.wait_for(execution.prepare(), timeout=60)
    vm_id = 3

    vm = execution.create(vm_id=vm_id, tap_interface=None)

    # Test that the VM is created correctly. It is not started yet.
    assert isinstance(vm, AlephQemuInstance)
    assert vm.vm_id == vm_id

    await execution.start()
    try:
        qemu_execution, _ = await mock_systemd_manager.enable_and_start(execution.vm_hash)
        assert isinstance(qemu_execution, QemuVM)
        assert qemu_execution.qemu_process is not None
    finally:
        # Always tear down, so that a failed assertion does not leave a QEMU process running.
        qemu_execution, _ = await mock_systemd_manager.stop_and_disable(execution.vm_hash)
        await execution.stop()
    assert qemu_execution is None


@pytest.mark.asyncio
async def test_create_qemu_instance_online():
    """
    Create a QEMU instance with VM networking enabled (tap interface) and
    check that it is created, started, initialised and stopped properly.
    """

    settings.USE_FAKE_INSTANCE_BASE = True
    settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
    settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
    settings.ALLOW_VM_NETWORKING = True
    settings.USE_JAILER = False

    logging.basicConfig(level=logging.DEBUG)
    settings.PRINT_SYSTEM_LOGS = True

    # Ensure that the settings are correct and required files present.
    settings.setup()
    settings.check()

    # The database is required for the metrics and is currently not optional.
    engine = metrics.setup_engine()
    await metrics.create_tables(engine)

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    message = await get_message(ref=vm_hash)

    mock_systemd_manager = MockSystemDManager()

    network = (
        Network(
            vm_ipv4_address_pool_range=settings.IPV4_ADDRESS_POOL,
            vm_network_size=settings.IPV4_NETWORK_PREFIX_LENGTH,
            external_interface=settings.NETWORK_INTERFACE,
            ipv6_allocator=make_ipv6_allocator(
                allocation_policy=settings.IPV6_ALLOCATION_POLICY,
                address_pool=settings.IPV6_ADDRESS_POOL,
                subnet_prefix=settings.IPV6_SUBNET_PREFIX,
            ),
            use_ndp_proxy=False,
            ipv6_forwarding_enabled=False,
        )
        if settings.ALLOW_VM_NETWORKING
        else None
    )

    execution = VmExecution(
        vm_hash=vm_hash,
        message=message.content,
        original=message.content,
        snapshot_manager=None,
        systemd_manager=None,
        persistent=True,
    )

    await asyncio.wait_for(execution.prepare(), timeout=60)
    vm_id = 3

    # Prepare and create the tap interface that the VM is attached to.
    vm_type = VmType.from_message_content(message.content)
    tap_interface = await network.prepare_tap(vm_id, vm_hash, vm_type)
    await network.create_tap(vm_id, tap_interface)

    vm = execution.create(vm_id=vm_id, tap_interface=tap_interface)

    # Test that the VM is created correctly. It is not started yet.
    assert isinstance(vm, AlephQemuInstance)
    assert vm.vm_id == vm_id

    await execution.start()
    try:
        qemu_execution, _ = await mock_systemd_manager.enable_and_start(execution.vm_hash)
        assert isinstance(qemu_execution, QemuVM)
        assert qemu_execution.qemu_process is not None
        await execution.wait_for_init()
    finally:
        # Always tear down, so that a failed assertion does not leave a QEMU process running.
        qemu_execution, _ = await mock_systemd_manager.stop_and_disable(execution.vm_hash)
        await execution.stop()
    assert qemu_execution is None
Loading