diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 141f790dc..6f0e25f36 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -243,7 +243,8 @@ class Settings(BaseSettings): SNAPSHOT_FREQUENCY: int = Field( default=60, - description="Snapshot frequency interval in minutes. It will create a VM snapshot every X minutes.", + description="Snapshot frequency interval in minutes. It will create a VM snapshot every X minutes. " + "If set to zero, snapshots are disabled.", ) SNAPSHOT_COMPRESSION_ALGORITHM: SnapshotCompressionAlgorithm = Field( diff --git a/src/aleph/vm/controllers/firecracker/snapshot_manager.py b/src/aleph/vm/controllers/firecracker/snapshot_manager.py index 2a42774e0..d5f81d1eb 100644 --- a/src/aleph/vm/controllers/firecracker/snapshot_manager.py +++ b/src/aleph/vm/controllers/firecracker/snapshot_manager.py @@ -86,7 +86,7 @@ def __init__(self): self.executions = {} self._scheduler = Scheduler() - def run_snapshots(self) -> None: + def run_in_thread(self) -> None: job_thread = threading.Thread( target=infinite_run_scheduler_jobs, args=[self._scheduler], diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index 82f6979cb..f40031d48 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -156,6 +156,10 @@ def run(): app.on_cleanup.append(stop_balances_monitoring_task) app.on_cleanup.append(stop_all_vms) + logger.info("Loading existing executions ...") + asyncio.run(pool.load_persistent_executions()) + + logger.info(f"Starting the web server on http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}") web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT) except OSError as e: if e.errno == 98: diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index 0c1673cee..80f7327ea 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -41,7 +41,7 @@ class VmPool: executions: dict[ItemHash, VmExecution] message_cache: dict[str, ExecutableMessage] = {} network: Optional[Network] - snapshot_manager: SnapshotManager + snapshot_manager: Optional[SnapshotManager] = None systemd_manager: SystemDManager def __init__(self): @@ -65,18 +65,18 @@ def __init__(self): else None ) self.systemd_manager = SystemDManager() - self.snapshot_manager = SnapshotManager() - logger.debug("Initializing SnapshotManager ...") - self.snapshot_manager.run_snapshots() - - logger.debug("Loading existing executions ...") - asyncio.run(self._load_persistent_executions()) + if settings.SNAPSHOT_FREQUENCY > 0: + self.snapshot_manager = SnapshotManager() def setup(self) -> None: """Set up the VM pool and the network.""" if self.network: self.network.setup() + if self.snapshot_manager: + logger.debug("Initializing SnapshotManager ...") + self.snapshot_manager.run_in_thread() + def teardown(self) -> None: """Stop the VM pool and the network properly.""" if self.network: @@ -123,7 +123,7 @@ async def create_a_vm( if execution.is_program and execution.vm: await execution.vm.load_configuration() - if execution.vm and execution.vm.support_snapshot: + if execution.vm and execution.vm.support_snapshot and self.snapshot_manager: await self.snapshot_manager.start_for(vm=execution.vm) except Exception: # ensure the VM is removed from the pool on creation error @@ -209,7 +209,7 @@ async def forget_on_stop(stop_event: asyncio.Event): _ = asyncio.create_task(forget_on_stop(stop_event=execution.stop_event)) - async def _load_persistent_executions(self): + async def load_persistent_executions(self): """Load persistent executions from the database.""" saved_executions = await get_execution_records() for saved_execution in saved_executions: