diff --git a/packaging/Makefile b/packaging/Makefile index d5140c8ad..b8aba680c 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -17,7 +17,7 @@ debian-package-code: cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes - pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0' + pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0' 'aioipfs==0.6.2' 'aleph-sdk-python==0.7.0' python3 -m compileall ./aleph-vm/opt/aleph-vm/ debian-package-resources: firecracker-bins vmlinux diff --git a/packaging/debian-12.dockerfile b/packaging/debian-12.dockerfile index 2e62644dd..4fde64fa7 100644 --- a/packaging/debian-12.dockerfile +++ b/packaging/debian-12.dockerfile @@ -5,6 +5,7 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y \ git \ curl \ sudo \ + libssl-dev build-essential automake pkg-config libtool libffi-dev libgmp-dev libyaml-cpp-dev \ python3-pip \ && rm -rf /var/lib/apt/lists/* diff --git a/vm_supervisor/messages.py b/vm_supervisor/messages.py index c5167a1d0..8a8c19d99 100644 --- a/vm_supervisor/messages.py +++ b/vm_supervisor/messages.py @@ -1,13 +1,17 @@ import asyncio import copy -from typing import Tuple +import logging +from typing import List, Tuple from aiohttp import ClientConnectorError, ClientResponseError from aiohttp.web_exceptions import HTTPNotFound, HTTPServiceUnavailable -from aleph_message.models import ExecutableMessage, ItemHash, MessageType +from aleph.sdk.client import AlephClient +from aleph_message.models import ExecutableMessage, ItemHash, MessageType, StoreMessage from .storage import get_latest_amend, get_message +logger = logging.getLogger(__name__) + async def try_get_message(ref: str) -> ExecutableMessage: """Get the message or raise an aiohttp HTTP error""" @@ -78,3 +82,12 @@ async def load_updated_message( message = copy.deepcopy(original_message) await update_message(message) return message, original_message + + +async def try_get_store_messages_sdk(ref: str) -> List[StoreMessage]: + async with AlephClient(api_server="https://official.aleph.cloud") as client: + response = await client.get_messages( + message_type=MessageType.store, + refs=[ref], + ) + return response.messages diff --git a/vm_supervisor/snapshot_manager.py b/vm_supervisor/snapshot_manager.py index 3ddb9c06d..d1fdeaee0 100644 --- a/vm_supervisor/snapshot_manager.py +++ b/vm_supervisor/snapshot_manager.py @@ -43,6 +43,7 @@ async def do_execution_snapshot( return None except ValueError: + execution.snapshot_running = False raise ValueError("Something failed taking an snapshot") diff --git a/vm_supervisor/snapshots.py b/vm_supervisor/snapshots.py index cb8723a27..13dce3b2f 100644 --- a/vm_supervisor/snapshots.py +++ b/vm_supervisor/snapshots.py @@ -4,14 +4,20 @@ from aleph_message.models import ItemHash -from .conf import SnapshotCompressionAlgorithm +from .conf import SnapshotCompressionAlgorithm, settings from .ipfs import ( ipfs_remove_file, ipfs_upload_file, send_forget_ipfs_message, send_store_ipfs_message, ) -from .storage import compress_volume_snapshot, create_volume_snapshot +from .messages import try_get_store_messages_sdk +from .storage import ( + compress_volume_snapshot, + create_volume_snapshot, + decompress_volume_snapshot, + get_persistent_path, +) logger = logging.getLogger(__name__) @@ -45,6 +51,11 @@ def __init__( def delete(self) -> None: self.path.unlink(missing_ok=True) + async def decompress(self, algorithm: SnapshotCompressionAlgorithm): + decompressed_snapshot = await decompress_volume_snapshot(self.path, algorithm) + decompressed = DiskVolumeSnapshot(path=decompressed_snapshot) + return decompressed + async def upload(self, vm_hash: ItemHash) -> ItemHash: ref = f"snapshot_{vm_hash}" snapshot_hash = await ipfs_upload_file(self.path) @@ -92,3 +103,28 @@ class DiskVolume(DiskVolumeFile): async def take_snapshot(self) -> DiskVolumeSnapshot: snapshot = await create_volume_snapshot(self.path) return DiskVolumeSnapshot(snapshot) + + +async def get_last_snapshot_by_ref( + ref: str, namespace: str +) -> Optional[DiskVolumeSnapshot]: + messages = await try_get_store_messages_sdk(ref) + if len(messages) == 0: + return None + + message = messages[0] + logger.debug(f"Last snapshot message found: {message}") + snapshot_path = ( + Path(settings.PERSISTENT_VOLUMES_DIR) / namespace / message.item_hash + ) + if not snapshot_path.is_file(): + compressed_snapshot_path = Path(f"{snapshot_path}.gz") + downloaded_snapshot_path = await get_persistent_path(message.item_hash) + downloaded_snapshot_path.rename(compressed_snapshot_path) + compressed_snapshot = CompressedDiskVolumeSnapshot( + compressed_snapshot_path, SnapshotCompressionAlgorithm.gz + ) + snapshot = await compressed_snapshot.decompress(SnapshotCompressionAlgorithm.gz) + else: + snapshot = DiskVolumeSnapshot(snapshot_path) + return snapshot diff --git a/vm_supervisor/storage.py b/vm_supervisor/storage.py index 82ff332bc..309c432f3 100644 --- a/vm_supervisor/storage.py +++ b/vm_supervisor/storage.py @@ -11,7 +11,8 @@ from datetime import datetime from pathlib import Path from shutil import copy2, disk_usage, make_archive -from typing import Union +from subprocess import CalledProcessError +from typing import Optional, Union import aiohttp from aleph_message.models import ( @@ -188,6 +189,13 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path: return cache_path +async def get_persistent_path(ref: str) -> Path: + cache_path = Path(settings.PERSISTENT_VOLUMES_DIR) / ref + url = f"{settings.CONNECTOR_URL}/download/data/{ref}" + await download_file(url, cache_path) + return cache_path + + async def create_ext4(path: Path, size_mib: int) -> bool: if path.is_file(): logger.debug(f"File already exists, skipping ext4 creation on {path}") @@ -245,13 +253,21 @@ async def create_mapped_device(device_name: str, table_command: str) -> None: async def resize_and_tune_file_system(device_path: Path, mount_path: Path) -> None: # This tune is needed to assign a random fsid to BTRFS device to be able to mount it await run_in_subprocess(["btrfstune", "-m", str(device_path)]) - await run_in_subprocess(["mount", str(device_path), str(mount_path)]) + try: + await run_in_subprocess(["mount", str(device_path), str(mount_path)]) + except CalledProcessError: + # Sometime BTRFS don't unmount well, for this cases, try to rescue it cleaning disk logs and mount it again + await run_in_subprocess(["btrfs", "rescue", "zero-log", str(device_path)]) + await run_in_subprocess(["mount", str(device_path), str(mount_path)]) + await run_in_subprocess(["btrfs", "filesystem", "resize", "max", str(mount_path)]) await run_in_subprocess(["umount", str(mount_path)]) async def create_devmapper( - volume: Union[PersistentVolume, RootfsVolume], namespace: str + volume: Union[PersistentVolume, RootfsVolume], + namespace: str, + snapshot_path: Optional[Path] = None, ) -> Path: """It creates a /dev/mapper/DEVICE inside the VM, that is an extended mapped device of the volume specified. We follow the steps described here: https://community.aleph.im/t/deploying-mutable-vm-instances-on-aleph/56/2 @@ -277,7 +293,10 @@ async def create_devmapper( base_table_command = f"0 {image_block_size} linear {image_loop_device} 0" await create_mapped_device(image_volume_name, base_table_command) - volume_path = await create_volume_file(volume, namespace) + if snapshot_path: + volume_path = snapshot_path + else: + volume_path = await create_volume_file(volume, namespace) extended_block_size: int = await get_block_size(volume_path) mapped_volume_name_base = f"{namespace}_base" @@ -368,6 +387,26 @@ async def compress_volume_snapshot( return new_path +async def decompress_volume_snapshot( + path: Path, + algorithm: SnapshotCompressionAlgorithm = SnapshotCompressionAlgorithm.gz, +) -> Path: + if algorithm != SnapshotCompressionAlgorithm.gz: + raise NotImplementedError + + new_path = Path(str(path).split(".gz")[0]) + + await run_in_subprocess( + [ + "gzip", + "-d", + str(path), + ] + ) + + return new_path + + def check_disk_space(bytes_to_use: int) -> bool: host_disk_usage = disk_usage("/") return host_disk_usage.free >= bytes_to_use diff --git a/vm_supervisor/vm/firecracker/instance.py b/vm_supervisor/vm/firecracker/instance.py index 5cc4b6411..d97c7baf8 100644 --- a/vm_supervisor/vm/firecracker/instance.py +++ b/vm_supervisor/vm/firecracker/instance.py @@ -23,6 +23,7 @@ CompressedDiskVolumeSnapshot, DiskVolume, DiskVolumeSnapshot, + get_last_snapshot_by_ref, ) from vm_supervisor.storage import ( NotEnoughDiskSpace, @@ -44,8 +45,15 @@ class AlephInstanceResources(AlephFirecrackerResources): async def download_runtime(self): + ref = f"snapshot_{self.namespace}" + snapshot_path = None + snapshot = await get_last_snapshot_by_ref(ref, self.namespace) + if snapshot: + logger.debug(f"Snapshot found on path {snapshot.path}") + snapshot_path = snapshot.path + self.rootfs_path = await create_devmapper( - self.message_content.rootfs, self.namespace + self.message_content.rootfs, self.namespace, snapshot_path ) assert ( self.rootfs_path.is_block_device()