Restore existing snapshots #383

Merged
2 changes: 1 addition & 1 deletion packaging/Makefile
@@ -17,7 +17,7 @@ debian-package-code:
cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.0' 'jwskate==0.8.0' 'eth-account==0.9.0' 'aioipfs==0.6.2' 'aleph-sdk-python==0.7.0'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux
1 change: 1 addition & 0 deletions packaging/debian-12.dockerfile
@@ -5,6 +5,7 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y \
git \
curl \
sudo \
libssl-dev build-essential automake pkg-config libtool libffi-dev libgmp-dev libyaml-cpp-dev \
python3-pip \
&& rm -rf /var/lib/apt/lists/*

17 changes: 15 additions & 2 deletions vm_supervisor/messages.py
@@ -1,13 +1,17 @@
import asyncio
import copy
from typing import Tuple
import logging
from typing import List, Tuple

from aiohttp import ClientConnectorError, ClientResponseError
from aiohttp.web_exceptions import HTTPNotFound, HTTPServiceUnavailable
from aleph_message.models import ExecutableMessage, ItemHash, MessageType
from aleph.sdk.client import AlephClient
from aleph_message.models import ExecutableMessage, ItemHash, MessageType, StoreMessage

from .storage import get_latest_amend, get_message

logger = logging.getLogger(__name__)


async def try_get_message(ref: str) -> ExecutableMessage:
"""Get the message or raise an aiohttp HTTP error"""
@@ -78,3 +82,12 @@ async def load_updated_message(
message = copy.deepcopy(original_message)
await update_message(message)
return message, original_message


async def try_get_store_messages_sdk(ref: str) -> List[StoreMessage]:
async with AlephClient(api_server="https://official.aleph.cloud") as client:
response = await client.get_messages(
message_type=MessageType.store,
refs=[ref],
)
return response.messages
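
For reference, here is a minimal, illustrative sketch of calling this helper on its own; the vm_hash placeholder and the asyncio entry point are assumptions for the example, while the snapshot_<hash> ref format mirrors the one used by the snapshot upload code in vm_supervisor/snapshots.py:

```python
import asyncio

from vm_supervisor.messages import try_get_store_messages_sdk


async def print_latest_snapshot(vm_hash: str) -> None:
    # STORE messages are filtered by ref; the restore flow treats the
    # first returned message as the most recent snapshot.
    messages = await try_get_store_messages_sdk(f"snapshot_{vm_hash}")
    if not messages:
        print("No snapshot has been published for this VM yet")
        return
    print(messages[0].item_hash)


# asyncio.run(print_latest_snapshot("<vm-item-hash>"))
```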
1 change: 1 addition & 0 deletions vm_supervisor/snapshot_manager.py
@@ -43,6 +43,7 @@ async def do_execution_snapshot(

return None
except ValueError:
execution.snapshot_running = False
raise ValueError("Something failed taking a snapshot")


40 changes: 38 additions & 2 deletions vm_supervisor/snapshots.py
@@ -4,14 +4,20 @@

from aleph_message.models import ItemHash

from .conf import SnapshotCompressionAlgorithm
from .conf import SnapshotCompressionAlgorithm, settings
from .ipfs import (
ipfs_remove_file,
ipfs_upload_file,
send_forget_ipfs_message,
send_store_ipfs_message,
)
from .storage import compress_volume_snapshot, create_volume_snapshot
from .messages import try_get_store_messages_sdk
from .storage import (
compress_volume_snapshot,
create_volume_snapshot,
decompress_volume_snapshot,
get_persistent_path,
)

logger = logging.getLogger(__name__)

@@ -45,6 +51,11 @@ def __init__(
def delete(self) -> None:
self.path.unlink(missing_ok=True)

async def decompress(self, algorithm: SnapshotCompressionAlgorithm):
decompressed_snapshot = await decompress_volume_snapshot(self.path, algorithm)
decompressed = DiskVolumeSnapshot(path=decompressed_snapshot)
return decompressed

async def upload(self, vm_hash: ItemHash) -> ItemHash:
ref = f"snapshot_{vm_hash}"
snapshot_hash = await ipfs_upload_file(self.path)
@@ -92,3 +103,28 @@ class DiskVolume(DiskVolumeFile):
async def take_snapshot(self) -> DiskVolumeSnapshot:
snapshot = await create_volume_snapshot(self.path)
return DiskVolumeSnapshot(snapshot)


async def get_last_snapshot_by_ref(
ref: str, namespace: str
) -> Optional[DiskVolumeSnapshot]:
messages = await try_get_store_messages_sdk(ref)
if len(messages) == 0:
return None

message = messages[0]
logger.debug(f"Last snapshot message found: {message}")
snapshot_path = (
Path(settings.PERSISTENT_VOLUMES_DIR) / namespace / message.item_hash
)
if not snapshot_path.is_file():
compressed_snapshot_path = Path(f"{snapshot_path}.gz")
downloaded_snapshot_path = await get_persistent_path(message.item_hash)
downloaded_snapshot_path.rename(compressed_snapshot_path)
compressed_snapshot = CompressedDiskVolumeSnapshot(
compressed_snapshot_path, SnapshotCompressionAlgorithm.gz
)
snapshot = await compressed_snapshot.decompress(SnapshotCompressionAlgorithm.gz)
else:
snapshot = DiskVolumeSnapshot(snapshot_path)
return snapshot
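
As a usage sketch (every name below other than get_last_snapshot_by_ref is hypothetical), a caller only needs the decompressed snapshot path, or None when nothing has been published for that ref yet:

```python
from pathlib import Path
from typing import Optional

from vm_supervisor.snapshots import get_last_snapshot_by_ref


async def find_restore_path(vm_hash: str, namespace: str) -> Optional[Path]:
    # Downloads and gunzips the published snapshot on a cache miss,
    # otherwise reuses the file already present under PERSISTENT_VOLUMES_DIR.
    snapshot = await get_last_snapshot_by_ref(f"snapshot_{vm_hash}", namespace)
    return snapshot.path if snapshot else None
```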
47 changes: 43 additions & 4 deletions vm_supervisor/storage.py
@@ -11,7 +11,8 @@
from datetime import datetime
from pathlib import Path
from shutil import copy2, disk_usage, make_archive
from typing import Union
from subprocess import CalledProcessError
from typing import Optional, Union

import aiohttp
from aleph_message.models import (
@@ -188,6 +189,13 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path:
return cache_path


async def get_persistent_path(ref: str) -> Path:
cache_path = Path(settings.PERSISTENT_VOLUMES_DIR) / ref
url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
await download_file(url, cache_path)
return cache_path


async def create_ext4(path: Path, size_mib: int) -> bool:
if path.is_file():
logger.debug(f"File already exists, skipping ext4 creation on {path}")
@@ -245,13 +253,21 @@ async def create_mapped_device(device_name: str, table_command: str) -> None:
async def resize_and_tune_file_system(device_path: Path, mount_path: Path) -> None:
# This tune is needed to assign a random fsid to BTRFS device to be able to mount it
await run_in_subprocess(["btrfstune", "-m", str(device_path)])
await run_in_subprocess(["mount", str(device_path), str(mount_path)])
try:
await run_in_subprocess(["mount", str(device_path), str(mount_path)])
except CalledProcessError:
# Sometimes BTRFS does not unmount cleanly; in those cases, try to rescue it by zeroing the log tree and mounting again
await run_in_subprocess(["btrfs", "rescue", "zero-log", str(device_path)])
await run_in_subprocess(["mount", str(device_path), str(mount_path)])

await run_in_subprocess(["btrfs", "filesystem", "resize", "max", str(mount_path)])
await run_in_subprocess(["umount", str(mount_path)])


async def create_devmapper(
volume: Union[PersistentVolume, RootfsVolume], namespace: str
volume: Union[PersistentVolume, RootfsVolume],
namespace: str,
snapshot_path: Optional[Path] = None,
) -> Path:
"""It creates a /dev/mapper/DEVICE inside the VM, that is an extended mapped device of the volume specified.
We follow the steps described here: https://community.aleph.im/t/deploying-mutable-vm-instances-on-aleph/56/2
@@ -277,7 +293,10 @@ async def create_devmapper(
base_table_command = f"0 {image_block_size} linear {image_loop_device} 0"
await create_mapped_device(image_volume_name, base_table_command)

volume_path = await create_volume_file(volume, namespace)
if snapshot_path:
volume_path = snapshot_path
else:
volume_path = await create_volume_file(volume, namespace)
extended_block_size: int = await get_block_size(volume_path)

mapped_volume_name_base = f"{namespace}_base"
@@ -368,6 +387,26 @@ async def compress_volume_snapshot(
return new_path


async def decompress_volume_snapshot(
path: Path,
algorithm: SnapshotCompressionAlgorithm = SnapshotCompressionAlgorithm.gz,
) -> Path:
if algorithm != SnapshotCompressionAlgorithm.gz:
raise NotImplementedError

new_path = Path(str(path).split(".gz")[0])

await run_in_subprocess(
[
"gzip",
"-d",
str(path),
]
)

return new_path


def check_disk_space(bytes_to_use: int) -> bool:
host_disk_usage = disk_usage("/")
return host_disk_usage.free >= bytes_to_use
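
The two gzip helpers are meant to be symmetric: compressing <path> yields <path>.gz and decompressing strips the suffix again. Below is a minimal round-trip sketch; the snapshot path is hypothetical, and it assumes compress_volume_snapshot() accepts the same (path, algorithm) arguments as its decompress counterpart, with the gzip binary available on the host as both helpers require:

```python
from pathlib import Path

from vm_supervisor.conf import SnapshotCompressionAlgorithm
from vm_supervisor.storage import (
    compress_volume_snapshot,
    decompress_volume_snapshot,
)


async def round_trip(snapshot: Path) -> Path:
    # e.g. snapshot = Path("/var/lib/aleph/persistent/ns/QmSnapshotHash")
    compressed = await compress_volume_snapshot(
        snapshot, SnapshotCompressionAlgorithm.gz
    )
    # "gzip -d" consumes the .gz file and recreates the original path.
    return await decompress_volume_snapshot(
        compressed, SnapshotCompressionAlgorithm.gz
    )
```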
10 changes: 9 additions & 1 deletion vm_supervisor/vm/firecracker/instance.py
@@ -23,6 +23,7 @@
CompressedDiskVolumeSnapshot,
DiskVolume,
DiskVolumeSnapshot,
get_last_snapshot_by_ref,
)
from vm_supervisor.storage import (
NotEnoughDiskSpace,
@@ -44,8 +45,15 @@

class AlephInstanceResources(AlephFirecrackerResources):
async def download_runtime(self):
ref = f"snapshot_{self.namespace}"
snapshot_path = None
snapshot = await get_last_snapshot_by_ref(ref, self.namespace)
if snapshot:
logger.debug(f"Snapshot found on path {snapshot.path}")
snapshot_path = snapshot.path

self.rootfs_path = await create_devmapper(
self.message_content.rootfs, self.namespace
self.message_content.rootfs, self.namespace, snapshot_path
)
assert (
self.rootfs_path.is_block_device()