Skip to content

Commit

Permalink
Fix: asyncio.run was sometimes used within a coroutine
Browse files Browse the repository at this point in the history
`asyncio.run` was called when initializing a pool object using `VmPool.__init__(...)`.

This caused two issues:
  1. The pool was sometimes created from within a coroutine in the context of tests, and this would raise an error.
  2. Having side effects inside the `__init__` method makes objects more difficult to manipulate and test.
  3. Tests should not load persistent executions automatically.
  4. The network was configured after loading persistent executions, which could cause networking issues.

A related issue is the snapshot manager being started when initializing the `VmPool`, while this is not always desirable.

Solution proposed:
  1. Explicitly load the persistent executions using `pool.load_persistent_executions()` from the `supervisor.run()` function. This is now called after `VmPool.setup()` and therefore after the networking of the host has been configured.
  2. The snapshot manager is now started by `VmPool.setup()` instead of `VmPool.__init__`. This function is almost always called just after initializing the pool.
  3. Configuring `settings.SNAPSHOT_FREQUENCY` to zero now disables the snapshot manager.
  4. `SnapshotManager.run_snapshots` is renamed `SnapshotManager.run_in_thread` to make its behaviour more explicit.
  • Loading branch information
hoh committed Mar 6, 2024
1 parent 2401133 commit 5f810ad
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ class Settings(BaseSettings):

SNAPSHOT_FREQUENCY: int = Field(
default=60,
description="Snapshot frequency interval in minutes. It will create a VM snapshot every X minutes.",
description="Snapshot frequency interval in minutes. It will create a VM snapshot every X minutes. "
"If set to zero, snapshots are disabled.",
)

SNAPSHOT_COMPRESSION_ALGORITHM: SnapshotCompressionAlgorithm = Field(
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/controllers/firecracker/snapshot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(self):
self.executions = {}
self._scheduler = Scheduler()

def run_snapshots(self) -> None:
def run_in_thread(self) -> None:
job_thread = threading.Thread(
target=infinite_run_scheduler_jobs,
args=[self._scheduler],
Expand Down
4 changes: 4 additions & 0 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def run():
app.on_cleanup.append(stop_balances_monitoring_task)
app.on_cleanup.append(stop_all_vms)

logger.info("Loading existing executions ...")
asyncio.run(pool.load_persistent_executions())

Check warning on line 160 in src/aleph/vm/orchestrator/supervisor.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/supervisor.py#L159-L160

Added lines #L159 - L160 were not covered by tests

logger.info(f"Starting the web server on http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}")

Check warning on line 162 in src/aleph/vm/orchestrator/supervisor.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/supervisor.py#L162

Added line #L162 was not covered by tests
web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT)
except OSError as e:
if e.errno == 98:
Expand Down
18 changes: 9 additions & 9 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class VmPool:
executions: dict[ItemHash, VmExecution]
message_cache: dict[str, ExecutableMessage] = {}
network: Optional[Network]
snapshot_manager: SnapshotManager
snapshot_manager: Optional[SnapshotManager] = None
systemd_manager: SystemDManager

def __init__(self):
Expand All @@ -65,18 +65,18 @@ def __init__(self):
else None
)
self.systemd_manager = SystemDManager()
self.snapshot_manager = SnapshotManager()
logger.debug("Initializing SnapshotManager ...")
self.snapshot_manager.run_snapshots()

logger.debug("Loading existing executions ...")
asyncio.run(self._load_persistent_executions())
if settings.SNAPSHOT_FREQUENCY > 0:
self.snapshot_manager = SnapshotManager()

Check warning on line 69 in src/aleph/vm/pool.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/pool.py#L69

Added line #L69 was not covered by tests

def setup(self) -> None:
"""Set up the VM pool and the network."""
if self.network:
self.network.setup()

if self.snapshot_manager:
logger.debug("Initializing SnapshotManager ...")
self.snapshot_manager.run_in_thread()

Check warning on line 78 in src/aleph/vm/pool.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/pool.py#L77-L78

Added lines #L77 - L78 were not covered by tests

def teardown(self) -> None:
"""Stop the VM pool and the network properly."""
if self.network:
Expand Down Expand Up @@ -123,7 +123,7 @@ async def create_a_vm(
if execution.is_program and execution.vm:
await execution.vm.load_configuration()

if execution.vm and execution.vm.support_snapshot:
if execution.vm and execution.vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.start_for(vm=execution.vm)
except Exception:
# ensure the VM is removed from the pool on creation error
Expand Down Expand Up @@ -209,7 +209,7 @@ async def forget_on_stop(stop_event: asyncio.Event):

_ = asyncio.create_task(forget_on_stop(stop_event=execution.stop_event))

async def _load_persistent_executions(self):
async def load_persistent_executions(self):
"""Load persistent executions from the database."""
saved_executions = await get_execution_records()
for saved_execution in saved_executions:
Expand Down

0 comments on commit 5f810ad

Please sign in to comment.