Skip to content

Commit

Permalink
Added Qemu automatic tests (#615)
Browse files Browse the repository at this point in the history
* Feature: Added automatic tests to check if QEmu runs.

* Fix: Added code quality fixes.

* Fix: Changed runtime generation script name.

* Fix: Solve conflicts with main branch.
  • Loading branch information
nesitor authored May 7, 2024
1 parent c942e27 commit d26888f
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 20 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-using-pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ jobs:
find /opt
- name: "Build custom runtime"
- name: "Build custom runtimes"
run: |
sudo apt update
sudo apt install -y debootstrap ndppd acl cloud-image-utils qemu-utils qemu-system-x86
cd runtimes/aleph-debian-12-python && sudo ./create_disk_image.sh && cd ../..
cd runtimes/instance-rootfs && sudo ./create-ubuntu-22-04-qemu-disk.sh && cd ../..
- name: "Build example volume"
run: |
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ classifiers = [
"Topic :: System :: Distributed Computing",
]
dependencies = [
"pydantic[dotenv]==1.10.13",
"pydantic[dotenv]~=1.10.13",
"aiohttp==3.8.6",
"aiodns==3.1.0",
"setproctitle==1.3.3",
"pyyaml==6.0.1",
"aleph-message==0.4.4",
"jwskate==0.8.0",
"eth-account==0.9.0",
"eth-account~=0.10",
"sentry-sdk==1.31.0",
"aioredis==1.3.1",
"psutil==5.9.5",
Expand Down Expand Up @@ -140,7 +140,7 @@ pythonpath = [
testpaths = [
"tests"
]
ignore = [
norecursedirs = [
"runtimes/aleph-debian-11-python/rootfs/",
"runtimes/aleph-debian-12-python/rootfs/",
]
Expand Down
18 changes: 18 additions & 0 deletions runtimes/instance-rootfs/create-ubuntu-22-04-qemu-disk.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Download the Ubuntu 22.04 (jammy) KVM cloud image and store it as ./rootfs.img
# in the current working directory.

set -euf

# Variables
ROOTFS_FILENAME="./rootfs.img"
IMAGE_URL="https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64-disk-kvm.img"
IMAGE_NAME="jammy-server-cloudimg-amd64-disk-kvm.img"

# Cleanup previous run
rm -f "$ROOTFS_FILENAME"

# Download Ubuntu image
echo "Downloading Ubuntu 22.04 image"
# --fail makes curl exit non-zero on HTTP errors (404, 5xx, ...). Without it the
# server's error page would be written to disk and renamed to rootfs.img below,
# and `set -e` would never see a failure.
curl --fail -L "$IMAGE_URL" -o "$IMAGE_NAME"

# Rename final file
mv "$IMAGE_NAME" "$ROOTFS_FILENAME"
4 changes: 3 additions & 1 deletion src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,15 @@ class Settings(BaseSettings):
)

USE_FAKE_INSTANCE_BASE = False
FAKE_INSTANCE_BASE = Path(abspath(join(__file__, "../../runtimes/instance-debian-rootfs/rootfs.ext4")))
FAKE_INSTANCE_BASE = Path(abspath(join(__file__, "../../runtimes/instance-rootfs/rootfs.ext4")))
FAKE_QEMU_INSTANCE_BASE = Path(abspath(join(__file__, "../../../../runtimes/instance-rootfs/rootfs.img")))
FAKE_INSTANCE_ID: str = Field(
default="decadecadecadecadecadecadecadecadecadecadecadecadecadecadecadeca",
description="Identifier used for the 'fake instance' message defined in "
"examples/instance_message_from_aleph.json",
)
FAKE_INSTANCE_MESSAGE = Path(abspath(join(__file__, "../../../../examples/instance_message_from_aleph.json")))
FAKE_INSTANCE_QEMU_MESSAGE = Path(abspath(join(__file__, "../../../../examples/qemu_message_from_aleph.json")))

CHECK_FASTAPI_VM_ID = "63faf8b5db1cf8d965e6a464a0cb8062af8e7df131729e48738342d956f29ace"
LEGACY_CHECK_FASTAPI_VM_ID = "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8"
Expand Down
20 changes: 15 additions & 5 deletions src/aleph/vm/controllers/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import logging
import signal
import sys
from asyncio.subprocess import Process
from pathlib import Path
from typing import Union

from aleph.vm.hypervisors.firecracker.microvm import MicroVM
from aleph.vm.hypervisors.qemu.qemuvm import QemuVM
Expand Down Expand Up @@ -54,7 +56,7 @@ def parse_args(args):
return parser.parse_args(args)


async def run_persistent_vm(config: Configuration):
async def execute_persistent_vm(config: Configuration):
if config.hypervisor == HypervisorType.firecracker:
assert isinstance(config.vm_configuration, VMConfiguration)
execution = MicroVM(
Expand All @@ -73,17 +75,25 @@ async def run_persistent_vm(config: Configuration):
execution = QemuVM(config.vm_configuration)
process = await execution.start()

# Catch the terminating signal and send a proper message to the VM to stop it so it closes files properly
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, execution.send_shutdown_message)
return execution, process


async def handle_persistent_vm(config: Configuration, execution: Union[MicroVM, QemuVM], process: Process):
    """Supervise an already started persistent VM until its process exits.

    Registers a SIGTERM handler that asks the VM to shut down, starts printing
    the VM logs when ``config.settings.PRINT_SYSTEM_LOGS`` is set, then waits
    for ``process`` to terminate and logs its return code.

    Args:
        config: Controller configuration; only ``settings.PRINT_SYSTEM_LOGS`` is read here.
        execution: The hypervisor wrapper of the running VM.
        process: The subprocess of the running VM.
    """
    # Catch the terminating signal and send a proper message to the VM to stop it so it closes files properly.
    # This code runs inside a coroutine, so the running loop is the one that must own the handler.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, execution.send_shutdown_message)

    if config.settings.PRINT_SYSTEM_LOGS:
        execution.start_printing_logs()

    await process.wait()
    logger.info(f"Process terminated with {process.returncode}")

return execution

async def run_persistent_vm(config: Configuration):
    """Start the persistent VM described by ``config`` and handle it until its process exits.

    Returns:
        The ``(execution, process)`` pair produced by ``execute_persistent_vm``.
    """
    started = await execute_persistent_vm(config)
    vm, vm_process = started
    await handle_persistent_vm(config=config, execution=vm, process=vm_process)
    return vm, vm_process


def main():
Expand Down
3 changes: 3 additions & 0 deletions src/aleph/vm/controllers/qemu/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instance import AlephQemuInstance

# `__all__` must be a sequence of names: a bare string is iterated character by
# character by `from ... import *`, which then fails looking up "A", "l", ...
__all__ = ["AlephQemuInstance"]
21 changes: 11 additions & 10 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
AlephFirecrackerResources,
AlephProgramResources,
)
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
from aleph.vm.controllers.interface import AlephVmControllerInterface
from aleph.vm.controllers.qemu.instance import AlephQemuInstance, AlephQemuResources
from aleph.vm.network.interfaces import TapInterface
Expand All @@ -34,12 +35,9 @@
)
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json

if TYPE_CHECKING:
from aleph.vm.controllers.firecracker.snapshot_manager import SnapshotManager
from aleph.vm.systemd import SystemDManager

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -81,14 +79,17 @@ class VmExecution:
expire_task: Optional[asyncio.Task] = None
update_task: Optional[asyncio.Task] = None

snapshot_manager: Optional[SnapshotManager]
systemd_manager: Optional[SystemDManager]

persistent: bool = False

@property
def is_running(self) -> bool:
return (
bool(self.times.starting_at and not self.times.stopping_at)
if not self.persistent
else self.systemd_manager.is_service_active(self.controller_service)
self.systemd_manager.is_service_active(self.controller_service)
if self.persistent and self.systemd_manager
else bool(self.times.starting_at and not self.times.stopping_at)
)

@property
Expand Down Expand Up @@ -141,8 +142,8 @@ def __init__(
vm_hash: ItemHash,
message: ExecutableContent,
original: ExecutableContent,
snapshot_manager: "SnapshotManager",
systemd_manager: "SystemDManager",
snapshot_manager: Optional[SnapshotManager],
systemd_manager: Optional[SystemDManager],
persistent: bool,
):
self.uuid = uuid.uuid1() # uuid1() includes the hardware address and timestamp
Expand Down Expand Up @@ -322,7 +323,7 @@ async def stop(self) -> None:
self.cancel_expiration()
self.cancel_update()

if self.vm.support_snapshot:
if self.vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.stop_for(self.vm_hash)
self.stop_event.set()

Expand Down
176 changes: 176 additions & 0 deletions tests/supervisor/test_qemu_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import asyncio
import logging
from asyncio.subprocess import Process
from pathlib import Path
from typing import Optional

import pytest
from aleph_message.models import ItemHash

from aleph.vm.conf import settings
from aleph.vm.controllers.__main__ import configuration_from_file, execute_persistent_vm
from aleph.vm.controllers.qemu import AlephQemuInstance
from aleph.vm.hypervisors.qemu.qemuvm import QemuVM
from aleph.vm.models import VmExecution
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.orchestrator import metrics
from aleph.vm.storage import get_message
from aleph.vm.systemd import SystemDManager
from aleph.vm.vm_type import VmType


class MockSystemDManager(SystemDManager):
    """Test double for ``SystemDManager`` that starts the VM through ``execute_persistent_vm``.

    This is a helper, not a test case, so it carries no pytest marker.
    """

    # Set by enable_and_start() and cleared by stop_and_disable().
    execution: Optional[QemuVM] = None
    process: Optional[Process] = None

    async def enable_and_start(self, vm_hash: str):
        """Load the controller configuration stored for ``vm_hash`` and start the VM.

        Returns the ``(execution, process)`` pair of the started VM.
        """
        config_path = Path(f"{settings.EXECUTION_ROOT}/{vm_hash}-controller.json")
        config = configuration_from_file(config_path)
        self.execution, self.process = await execute_persistent_vm(config)
        return self.execution, self.process

    def is_service_enabled(self, service: str):
        """Report the service as enabled while a VM process is tracked."""
        return self.process is not None

    def is_service_active(self, service: str):
        """Report the service as active while a VM process is tracked."""
        return self.process is not None

    async def stop_and_disable(self, vm_hash: str):
        """Kill the tracked VM process, if any, and forget it.

        Returns the now cleared ``(execution, process)`` pair, i.e. ``(None, None)``.
        """
        if self.process:
            self.process.kill()
            # Reap the child so that no zombie process or unclosed transport
            # outlives the test.
            await self.process.wait()
        self.process = None
        self.execution = None
        return self.execution, self.process


@pytest.mark.asyncio
async def test_create_qemu_instance():
    """
    Create a QEMU instance without networking and check that it starts and stops properly.
    """

    settings.USE_FAKE_INSTANCE_BASE = True
    settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
    settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
    settings.ALLOW_VM_NETWORKING = False
    settings.USE_JAILER = False

    logging.basicConfig(level=logging.DEBUG)
    settings.PRINT_SYSTEM_LOGS = True

    # Ensure that the settings are correct and required files present.
    settings.setup()
    settings.check()

    # The database is required for the metrics and is currently not optional.
    engine = metrics.setup_engine()
    await metrics.create_tables(engine)

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    message = await get_message(ref=vm_hash)

    mock_systemd_manager = MockSystemDManager()

    execution = VmExecution(
        vm_hash=vm_hash,
        message=message.content,
        original=message.content,
        snapshot_manager=None,
        systemd_manager=None,
        persistent=True,
    )

    await asyncio.wait_for(execution.prepare(), timeout=60)
    vm_id = 3

    vm = execution.create(vm_id=vm_id, tap_interface=None)

    # Test that the VM is created correctly. It is not started yet.
    assert isinstance(vm, AlephQemuInstance)
    assert vm.vm_id == vm_id

    await execution.start()
    try:
        qemu_execution, _ = await mock_systemd_manager.enable_and_start(execution.vm_hash)
        assert isinstance(qemu_execution, QemuVM)
        assert qemu_execution.qemu_process is not None
    finally:
        # Always stop the QEMU process, even when an assertion above fails,
        # so that a failing test does not leave a VM running.
        stopped_execution, _ = await mock_systemd_manager.stop_and_disable(execution.vm_hash)
    await execution.stop()
    assert stopped_execution is None


@pytest.mark.asyncio
async def test_create_qemu_instance_online():
    """
    Create a QEMU instance with VM networking enabled and check that it starts,
    initializes and stops properly.
    """

    settings.USE_FAKE_INSTANCE_BASE = True
    settings.FAKE_INSTANCE_MESSAGE = settings.FAKE_INSTANCE_QEMU_MESSAGE
    settings.FAKE_INSTANCE_BASE = settings.FAKE_QEMU_INSTANCE_BASE
    settings.ALLOW_VM_NETWORKING = True
    settings.USE_JAILER = False

    logging.basicConfig(level=logging.DEBUG)
    settings.PRINT_SYSTEM_LOGS = True

    # Ensure that the settings are correct and required files present.
    settings.setup()
    settings.check()

    # The database is required for the metrics and is currently not optional.
    engine = metrics.setup_engine()
    await metrics.create_tables(engine)

    vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
    message = await get_message(ref=vm_hash)

    mock_systemd_manager = MockSystemDManager()

    # This test enables VM networking above and needs a tap interface, so the
    # network is always built; a `None` network would only fail later at
    # `network.prepare_tap`.
    network = Network(
        vm_ipv4_address_pool_range=settings.IPV4_ADDRESS_POOL,
        vm_network_size=settings.IPV4_NETWORK_PREFIX_LENGTH,
        external_interface=settings.NETWORK_INTERFACE,
        ipv6_allocator=make_ipv6_allocator(
            allocation_policy=settings.IPV6_ALLOCATION_POLICY,
            address_pool=settings.IPV6_ADDRESS_POOL,
            subnet_prefix=settings.IPV6_SUBNET_PREFIX,
        ),
        use_ndp_proxy=False,
        ipv6_forwarding_enabled=False,
    )

    execution = VmExecution(
        vm_hash=vm_hash,
        message=message.content,
        original=message.content,
        snapshot_manager=None,
        systemd_manager=None,
        persistent=True,
    )

    await asyncio.wait_for(execution.prepare(), timeout=60)
    vm_id = 3

    vm_type = VmType.from_message_content(message.content)
    tap_interface = await network.prepare_tap(vm_id, vm_hash, vm_type)
    await network.create_tap(vm_id, tap_interface)

    vm = execution.create(vm_id=vm_id, tap_interface=tap_interface)

    # Test that the VM is created correctly. It is not started yet.
    assert isinstance(vm, AlephQemuInstance)
    assert vm.vm_id == vm_id

    await execution.start()
    try:
        qemu_execution, _ = await mock_systemd_manager.enable_and_start(execution.vm_hash)
        assert isinstance(qemu_execution, QemuVM)
        assert qemu_execution.qemu_process is not None
        await execution.wait_for_init()
    finally:
        # Always stop the QEMU process, even when an assertion or the init
        # wait above fails, so that a failing test does not leave a VM running.
        stopped_execution, _ = await mock_systemd_manager.stop_and_disable(execution.vm_hash)
    await execution.stop()
    assert stopped_execution is None

0 comments on commit d26888f

Please sign in to comment.