Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with stopping VMs #554

Merged
4 commits merged on Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@
else self.systemd_manager.is_service_active(self.controller_service)
)

+ @property
+ def is_stopping(self) -> bool:
+ return bool(self.times.stopping_at and not self.times.stopped_at)

Check warning on line 95 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L95

Added line #L95 was not covered by tests

@property
def is_program(self):
return isinstance(self.message, ProgramContent)
Expand Down Expand Up @@ -289,8 +293,8 @@
if self.times.stopped_at is not None:
logger.debug(f"VM={self.vm.vm_id} already stopped")
return
- await self.all_runs_complete()
self.times.stopping_at = datetime.now(tz=timezone.utc)
+ await self.all_runs_complete()

Check warning on line 297 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L297

Added line #L297 was not covered by tests
await self.record_usage()
await self.vm.teardown()
self.times.stopped_at = datetime.now(tz=timezone.utc)
Expand Down
8 changes: 4 additions & 4 deletions src/aleph/vm/orchestrator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
Execute the code corresponding to the 'code id' in the path.
"""

- execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
+ execution: Optional[VmExecution] = pool.get_running_vm(vm_hash=vm_hash)

Check warning on line 120 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L120

Added line #L120 was not covered by tests

if not execution:
execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash, pool=pool)
Expand Down Expand Up @@ -214,7 +214,7 @@
Execute code in response to an event.
"""

- execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
+ execution: Optional[VmExecution] = pool.get_running_vm(vm_hash=vm_hash)

Check warning on line 217 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L217

Added line #L217 was not covered by tests

if not execution:
execution = await create_vm_execution_or_raise_http_error(vm_hash=vm_hash, pool=pool)
Expand Down Expand Up @@ -260,7 +260,7 @@


async def start_persistent_vm(vm_hash: ItemHash, pubsub: Optional[PubSub], pool: VmPool) -> VmExecution:
- execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
+ execution: Optional[VmExecution] = pool.get_running_vm(vm_hash=vm_hash)

Check warning on line 263 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L263

Added line #L263 was not covered by tests

if not execution:
logger.info(f"Starting persistent virtual machine with id: {vm_hash}")
Expand All @@ -280,7 +280,7 @@

async def stop_persistent_vm(vm_hash: ItemHash, pool: VmPool) -> Optional[VmExecution]:
logger.info(f"Stopping persistent VM {vm_hash}")
- execution = await pool.get_running_vm(vm_hash)
+ execution = pool.get_running_vm(vm_hash)

Check warning on line 283 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L283

Added line #L283 was not covered by tests

if execution:
await execution.stop()
Expand Down
11 changes: 6 additions & 5 deletions src/aleph/vm/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@

# Check if an execution is already present for this VM, then return it.
# Do not `await` in this section.
- try:
- return self.executions[vm_hash]
- except KeyError:
+ current_execution = self.get_running_vm(vm_hash)

Check warning on line 92 in src/aleph/vm/pool.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/pool.py#L92

Added line #L92 was not covered by tests
+ if current_execution:
+ return current_execution

Check warning on line 94 in src/aleph/vm/pool.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/pool.py#L94

Added line #L94 was not covered by tests
+ else:
execution = VmExecution(
vm_hash=vm_hash,
message=message,
Expand Down Expand Up @@ -161,10 +162,10 @@
msg = "No available value for vm_id."
raise ValueError(msg)

- async def get_running_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]:
+ def get_running_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]:
"""Return a running VM or None. Disables the VM expiration task."""
execution = self.executions.get(vm_hash)
- if execution and execution.is_running:
+ if execution and execution.is_running and not execution.is_stopping:
execution.cancel_expiration()
return execution
else:
Expand Down
Loading