From bdf53677905ca843029a43613d85251ee6bba837 Mon Sep 17 00:00:00 2001 From: Hugo Herter Date: Thu, 20 Apr 2023 17:03:35 +0200 Subject: [PATCH] Fix: ASGI processes did not support lifetime events ASGI specification https://asgi.readthedocs.io/en/latest/specs/lifespan.html Fixes #293 Replaces #294 --- examples/example_fastapi/main.py | 16 +++++++ runtimes/aleph-alpine-3.13-python/init1.py | 49 +++++++++++++++++++--- vm_supervisor/__main__.py | 1 + vm_supervisor/status.py | 8 ++++ vm_supervisor/views.py | 1 + 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/examples/example_fastapi/main.py b/examples/example_fastapi/main.py index 4ad5f108f..693a4812a 100644 --- a/examples/example_fastapi/main.py +++ b/examples/example_fastapi/main.py @@ -28,6 +28,14 @@ app = AlephApp(http_app=http_app) cache = VmCache() +startup_lifespan_executed: bool = False + + +@app.on_event("startup") +async def startup_event(): + global startup_lifespan_executed + startup_lifespan_executed = True + @app.get("/") async def index(): @@ -45,6 +53,14 @@ async def index(): } +@app.get("/lifespan") +async def check_lifespan(): + """ + Check that ASGI lifespan startup signal has been received + """ + return {"Lifetime": startup_lifespan_executed} + + @app.get("/environ") async def environ() -> Dict[str, str]: """List environment variables""" diff --git a/runtimes/aleph-alpine-3.13-python/init1.py b/runtimes/aleph-alpine-3.13-python/init1.py index 47f435f62..b3a659964 100644 --- a/runtimes/aleph-alpine-3.13-python/init1.py +++ b/runtimes/aleph-alpine-3.13-python/init1.py @@ -23,7 +23,7 @@ from io import StringIO from os import system from shutil import make_archive -from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable +from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable, Literal import aiohttp import msgpack @@ -164,7 +164,39 @@ def setup_volumes(volumes: List[Volume]): system("mount") -def setup_code_asgi( +async def wait_for_lifespan_event_completion(application: ASGIApplication, + event: Union[Literal['startup', 'shutdown']]): + """ + Send the startup lifespan signal to the ASGI app. + Specification: https://asgi.readthedocs.io/en/latest/specs/lifespan.html + """ + + lifespan_completion = asyncio.Event() + + async def receive(): + return { + 'type': f'lifespan.{event}', + } + + async def send(response: Dict): + response_type = response.get('type') + if response_type == f'lifespan.{event}.complete': + lifespan_completion.set() + return + else: + logger.warning(f"Unexpected response to {event}: {response_type}") + + while not lifespan_completion.is_set(): + await application( + scope={ + 'type': 'lifespan', + }, + receive=receive, + send=send, + ) + + +async def setup_code_asgi( code: bytes, encoding: Encoding, entrypoint: str ) -> ASGIApplication: # Allow importing packages from /opt/packages @@ -200,6 +232,8 @@ def setup_code_asgi( app = locals[entrypoint] else: raise ValueError(f"Unknown encoding '{encoding}'") + + await wait_for_lifespan_event_completion(application=app, event='startup') return app @@ -235,12 +269,12 @@ def setup_code_executable( return process -def setup_code( +async def setup_code( code: bytes, encoding: Encoding, entrypoint: str, interface: Interface ) -> Union[ASGIApplication, subprocess.Popen]: if interface == Interface.asgi: - return setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint) + return await setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint) elif interface == Interface.executable: return setup_code_executable( code=code, encoding=encoding, entrypoint=entrypoint @@ -364,7 +398,10 @@ async def process_instruction( logger.debug("Application terminated") # application.communicate() else: - # Close the cached session in aleph_client: + assert isinstance(application, ASGIApplication) + await wait_for_lifespan_event_completion(application=application, event='shutdown') + + # Close the cached session in aleph_client: TODO: remove this, use SDK from aleph_client.asynchronous import get_fallback_session session: aiohttp.ClientSession = get_fallback_session() @@ -480,7 +517,7 @@ async def main(): setup_system(config) try: - app: Union[ASGIApplication, subprocess.Popen] = setup_code( + app: Union[ASGIApplication, subprocess.Popen] = await setup_code( config.code, config.encoding, config.entrypoint, config.interface ) client.send(msgpack.dumps({"success": True})) diff --git a/vm_supervisor/__main__.py b/vm_supervisor/__main__.py index acf34ae24..3c2b95647 100644 --- a/vm_supervisor/__main__.py +++ b/vm_supervisor/__main__.py @@ -171,6 +171,7 @@ async def fake_read() -> bytes: settings.REUSE_TIMEOUT = 0.1 for path in ( "/", + "/lifespan", "/environ", "/messages", "/internet", diff --git a/vm_supervisor/status.py b/vm_supervisor/status.py index 37156f81e..90187a2c8 100644 --- a/vm_supervisor/status.py +++ b/vm_supervisor/status.py @@ -30,6 +30,14 @@ async def check_index(session: ClientSession) -> bool: return False +async def check_lifespan(session: ClientSession) -> bool: + try: + result: Dict = await get_json_from_vm(session, "/lifespan") + return result["Lifetime"] is True + except ClientResponseError: + return False + + async def check_environ(session: ClientSession) -> bool: try: result: Dict = await get_json_from_vm(session, "/environ") diff --git a/vm_supervisor/views.py b/vm_supervisor/views.py index 4b21dd2f9..7804feb9f 100644 --- a/vm_supervisor/views.py +++ b/vm_supervisor/views.py @@ -141,6 +141,7 @@ async def status_check_fastapi(request: web.Request): async with aiohttp.ClientSession() as session: result = { "index": await status.check_index(session), + "lifespan": await status.check_lifespan(session), "environ": await status.check_environ(session), "messages": await status.check_messages(session), "internet": await status.check_internet(session),