From 0e80072b3b3cc0e81809cd7fc8d500f6a2564bfa Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Wed, 27 Sep 2023 13:49:49 +0200 Subject: [PATCH] Fix: Concurrent calls to stop() caused issues. In particular, when recording the resource usage of the VM, the database raised the error `UNIQUE constraint failed: records.uuid External`. --- vm_supervisor/models.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/vm_supervisor/models.py b/vm_supervisor/models.py index 912fa0796..f4d803d5e 100644 --- a/vm_supervisor/models.py +++ b/vm_supervisor/models.py @@ -68,6 +68,7 @@ class VmExecution: ready_event: asyncio.Event concurrent_runs: int runs_done_event: asyncio.Event + stop_pending: asyncio.Semaphore expire_task: Optional[asyncio.Task] = None update_task: Optional[asyncio.Task] = None @@ -108,6 +109,7 @@ def __init__( self.ready_event = asyncio.Event() self.concurrent_runs = 0 self.runs_done_event = asyncio.Event() + self.stop_pending = asyncio.Semaphore() self.snapshot_manager = snapshot_manager def to_dict(self) -> Dict: @@ -215,19 +217,23 @@ def cancel_update(self) -> bool: return False async def stop(self): - if self.times.stopped_at is not None: - logger.debug(f"VM={self.vm.vm_id} already stopped") - return - await self.all_runs_complete() - self.times.stopping_at = datetime.now() - await self.record_usage() - await self.vm.teardown() - self.times.stopped_at = datetime.now() - self.cancel_expiration() - self.cancel_update() - - if isinstance(self.message, InstanceContent): - await self.snapshot_manager.stop_for(self.vm_hash) + """Stop the VM and release resources""" + + # Prevent concurrent calls to stop() using a semaphore + async with self.stop_pending: + if self.times.stopped_at is not None: + logger.debug(f"VM={self.vm.vm_id} already stopped") + return + await self.all_runs_complete() + self.times.stopping_at = datetime.now() + await self.record_usage() + await self.vm.teardown() + self.times.stopped_at = datetime.now() + self.cancel_expiration() + self.cancel_update() + + if isinstance(self.message, InstanceContent): + await self.snapshot_manager.stop_for(self.vm_hash) def start_watching_for_updates(self, pubsub: PubSub): if not self.update_task: