Skip to content

Commit

Permalink
Fix: Concurrent calls to stop() caused issues. (#415)
Browse files Browse the repository at this point in the history
Fix: Concurrent calls to stop() caused issues.

In particular, when recording the resource usage of the VM, the database raised the error `UNIQUE constraint failed: records.uuid
External`.
  • Loading branch information
hoh authored Oct 3, 2023
1 parent 14096a8 commit b2d6314
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions vm_supervisor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class VmExecution:
ready_event: asyncio.Event
concurrent_runs: int
runs_done_event: asyncio.Event
stop_pending_lock: asyncio.Lock
expire_task: Optional[asyncio.Task] = None
update_task: Optional[asyncio.Task] = None

Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
self.ready_event = asyncio.Event()
self.concurrent_runs = 0
self.runs_done_event = asyncio.Event()
self.stop_pending_lock = asyncio.Lock()
self.snapshot_manager = snapshot_manager

def to_dict(self) -> Dict:
Expand Down Expand Up @@ -215,19 +217,23 @@ def cancel_update(self) -> bool:
return False

async def stop(self):
if self.times.stopped_at is not None:
logger.debug(f"VM={self.vm.vm_id} already stopped")
return
await self.all_runs_complete()
self.times.stopping_at = datetime.now()
await self.record_usage()
await self.vm.teardown()
self.times.stopped_at = datetime.now()
self.cancel_expiration()
self.cancel_update()

if isinstance(self.message, InstanceContent):
await self.snapshot_manager.stop_for(self.vm_hash)
"""Stop the VM and release resources"""

# Prevent concurrent calls to stop() using a Lock
async with self.stop_pending_lock:
if self.times.stopped_at is not None:
logger.debug(f"VM={self.vm.vm_id} already stopped")
return
await self.all_runs_complete()
self.times.stopping_at = datetime.now()
await self.record_usage()
await self.vm.teardown()
self.times.stopped_at = datetime.now()
self.cancel_expiration()
self.cancel_update()

if isinstance(self.message, InstanceContent):
await self.snapshot_manager.stop_for(self.vm_hash)

def start_watching_for_updates(self, pubsub: PubSub):
if not self.update_task:
Expand Down

0 comments on commit b2d6314

Please sign in to comment.