Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect already running Persistent VMs #541

Merged
merged 7 commits into from
Feb 20, 2024
5 changes: 4 additions & 1 deletion src/aleph/vm/controllers/firecracker/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,11 @@ async def start_guest_api(self):
logger.debug(f"started guest API for {self.vm_id}")

async def stop_guest_api(self):
if self.guest_api_process and self.guest_api_process._popen:
if self.guest_api_process and self.guest_api_process.is_alive():
self.guest_api_process.terminate()
await asyncio.sleep(5)
if self.guest_api_process.is_alive():
self.guest_api_process.kill()

async def teardown(self):
if self.fvm:
Expand Down
1 change: 0 additions & 1 deletion src/aleph/vm/orchestrator/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def setup_engine():

async def create_tables(engine: Engine):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)


Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/views/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def operate_reboot(request: web.Request, authenticated_sender: str) -> web
if execution.is_running:
logger.info(f"Rebooting {execution.vm_hash}")
if execution.persistent:
await pool.systemd_manager.restart(execution.controller_service)
pool.systemd_manager.restart(execution.controller_service)
hoh marked this conversation as resolved.
Show resolved Hide resolved
else:
await pool.stop_vm(vm_hash)
pool.forget_vm(vm_hash)
Expand All @@ -197,7 +197,7 @@ async def operate_erase(request: web.Request, authenticated_sender: str) -> web.

# Stop the VM
await pool.stop_vm(execution.vm_hash)
await pool.forget_vm(execution.vm_hash)
pool.forget_vm(execution.vm_hash)
nesitor marked this conversation as resolved.
Show resolved Hide resolved

# Delete all data
if execution.resources is not None:
Expand Down
12 changes: 7 additions & 5 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ async def stop_persistent_execution(self, execution):
assert execution.persistent, "Execution isn't persistent"
self.systemd_manager.stop_and_disable(execution.controller_service)
await execution.stop()
execution.persistent = False

def forget_vm(self, vm_hash: ItemHash) -> None:
"""Remove a VM from the executions pool.
Expand All @@ -209,7 +208,7 @@ async def _load_persistent_executions(self):
for saved_execution in saved_executions:
# Avoid loading the same execution twice
if self.executions.get(saved_execution.vm_hash):
break
continue

vm_id = saved_execution.vm_id
message_dict = json.loads(saved_execution.message)
Expand Down Expand Up @@ -249,25 +248,28 @@ async def stop(self):
await asyncio.gather(*(execution.stop() for vm_hash, execution in self.get_ephemeral_executions()))

def get_ephemeral_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and not execution.persistent
)
return executions or []

def get_persistent_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and execution.persistent
)
return executions or []

def get_instance_executions(self) -> Iterable[VmExecution]:
return (
executions = (
execution
for _vm_hash, execution in self.executions.items()
if execution.is_running and execution.is_instance
)
return executions or []

def get_executions_by_sender(self, payment_type: PaymentType) -> Dict[str, Dict[str, list[VmExecution]]]:
"""Return all executions of the given type, grouped by sender and by chain."""
Expand Down
7 changes: 2 additions & 5 deletions src/aleph/vm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
import aiodns
import msgpack
from aleph_message.models import ExecutableContent, InstanceContent, ProgramContent
from aleph_message.models.execution.base import MachineType
from eth_typing import HexAddress, HexStr
from eth_utils import hexstr_if_str, is_address, to_hex

logger = logging.getLogger(__name__)


def get_message_executable_content(message_dict: Dict) -> ExecutableContent:
if message_dict["type"] == MachineType.vm_function:
try:
return ProgramContent.parse_obj(message_dict)
elif message_dict["type"] == MachineType.vm_instance:
except ValueError as error:
return InstanceContent.parse_obj(message_dict)
hoh marked this conversation as resolved.
Show resolved Hide resolved
else:
raise ValueError(f"Unknown message type {message_dict['type']}")


class MsgpackSerializable:
Expand Down
Loading