diff --git a/src/aleph/vm/orchestrator/messages.py b/src/aleph/vm/orchestrator/messages.py index 3ed4f5cc..5ae67102 100644 --- a/src/aleph/vm/orchestrator/messages.py +++ b/src/aleph/vm/orchestrator/messages.py @@ -1,10 +1,12 @@ import asyncio import copy -from aiohttp import ClientConnectorError, ClientResponseError +from aiohttp import ClientConnectorError, ClientResponseError, ClientSession from aiohttp.web_exceptions import HTTPNotFound, HTTPServiceUnavailable from aleph_message.models import ExecutableMessage, ItemHash, MessageType +from aleph_message.status import MessageStatus +from aleph.vm.conf import settings from aleph.vm.storage import get_latest_amend, get_message @@ -69,3 +71,19 @@ async def load_updated_message( message = copy.deepcopy(original_message) await update_message(message) return message, original_message + + +async def get_message_status(item_hash: ItemHash) -> MessageStatus: + """ + Fetch the status of an execution from the reference API server. + We use a normal API call to the CCN instead to use the connector because we want to get the updated status of the + message and bypass the messages cache. + """ + async with ClientSession() as session: + url = f"{settings.API_SERVER}/api/v0/messages/{item_hash}" + resp = await session.get(url) + # Raise an error if the request failed + resp.raise_for_status() + + resp_data = await resp.json() + return resp_data["status"] diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index 006de6e6..fab864a6 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -16,13 +16,14 @@ ProgramMessage, parse_message, ) +from aleph_message.status import MessageStatus from yarl import URL from aleph.vm.conf import settings from aleph.vm.pool import VmPool from aleph.vm.utils import create_task_log_exceptions -from .messages import load_updated_message +from .messages import get_message_status, load_updated_message from .payment import ( compute_required_balance, compute_required_flow, @@ -148,6 +149,14 @@ async def monitor_payments(app: web.Application): while True: await asyncio.sleep(settings.PAYMENT_MONITOR_INTERVAL) + # Check if the executions continues existing or are forgotten before checking the payment + for vm_hash in pool.executions.keys(): + message_status = await get_message_status(vm_hash) + if message_status != MessageStatus.PROCESSED: + logger.debug(f"Stopping {vm_hash} execution due to {message_status} message status") + await pool.stop_vm(vm_hash) + pool.forget_vm(vm_hash) + # Check if the balance held in the wallet is sufficient holder tier resources (Not do it yet) for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.hold).items(): for chain, executions in chains.items(): @@ -171,6 +180,7 @@ async def monitor_payments(app: web.Application): logger.debug( f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" ) + required_stream = await compute_required_flow(executions) logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") # Stop executions until the required stream is reached