Skip to content

Commit

Permalink
Fix: ASGI processes did not support lifetime events
Browse files Browse the repository at this point in the history
  • Loading branch information
hoh committed May 12, 2023
1 parent 95a204f commit bdf5367
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 6 deletions.
16 changes: 16 additions & 0 deletions examples/example_fastapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
app = AlephApp(http_app=http_app)
cache = VmCache()

startup_lifespan_executed: bool = False


@app.on_event("startup")
async def startup_event():
global startup_lifespan_executed
startup_lifespan_executed = True


@app.get("/")
async def index():
Expand All @@ -45,6 +53,14 @@ async def index():
}


@app.get("/lifespan")
async def check_lifespan():
"""
Check that ASGI lifespan startup signal has been received
"""
return {"Lifetime": startup_lifespan_executed}


@app.get("/environ")
async def environ() -> Dict[str, str]:
"""List environment variables"""
Expand Down
49 changes: 43 additions & 6 deletions runtimes/aleph-alpine-3.13-python/init1.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from io import StringIO
from os import system
from shutil import make_archive
from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable
from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable, Literal

import aiohttp
import msgpack
Expand Down Expand Up @@ -164,7 +164,39 @@ def setup_volumes(volumes: List[Volume]):
system("mount")


def setup_code_asgi(
async def wait_for_lifespan_event_completion(application: ASGIApplication,
event: Union[Literal['startup', 'shutdown']]):
"""
Send the startup lifespan signal to the ASGI app.
Specification: https://asgi.readthedocs.io/en/latest/specs/lifespan.html
"""

lifespan_completion = asyncio.Event()

async def receive():
return {
'type': f'lifespan.{event}',
}

async def send(response: Dict):
response_type = response.get('type')
if response_type == f'lifespan.{event}.complete':
lifespan_completion.set()
return
else:
logger.warning(f"Unexpected response to {event}: {response_type}")

while not lifespan_completion.is_set():
await application(
scope={
'type': 'lifespan',
},
receive=receive,
send=send,
)


async def setup_code_asgi(
code: bytes, encoding: Encoding, entrypoint: str
) -> ASGIApplication:
# Allow importing packages from /opt/packages
Expand Down Expand Up @@ -200,6 +232,8 @@ def setup_code_asgi(
app = locals[entrypoint]
else:
raise ValueError(f"Unknown encoding '{encoding}'")

await wait_for_lifespan_event_completion(application=app, event='startup')
return app


Expand Down Expand Up @@ -235,12 +269,12 @@ def setup_code_executable(
return process


def setup_code(
async def setup_code(
code: bytes, encoding: Encoding, entrypoint: str, interface: Interface
) -> Union[ASGIApplication, subprocess.Popen]:

if interface == Interface.asgi:
return setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint)
return await setup_code_asgi(code=code, encoding=encoding, entrypoint=entrypoint)
elif interface == Interface.executable:
return setup_code_executable(
code=code, encoding=encoding, entrypoint=entrypoint
Expand Down Expand Up @@ -364,7 +398,10 @@ async def process_instruction(
logger.debug("Application terminated")
# application.communicate()
else:
# Close the cached session in aleph_client:
assert isinstance(application, ASGIApplication)
await wait_for_lifespan_event_completion(application=application, event='shutdown')

# Close the cached session in aleph_client: TODO: remove this, use SDK
from aleph_client.asynchronous import get_fallback_session

session: aiohttp.ClientSession = get_fallback_session()
Expand Down Expand Up @@ -480,7 +517,7 @@ async def main():
setup_system(config)

try:
app: Union[ASGIApplication, subprocess.Popen] = setup_code(
app: Union[ASGIApplication, subprocess.Popen] = await setup_code(
config.code, config.encoding, config.entrypoint, config.interface
)
client.send(msgpack.dumps({"success": True}))
Expand Down
1 change: 1 addition & 0 deletions vm_supervisor/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ async def fake_read() -> bytes:
settings.REUSE_TIMEOUT = 0.1
for path in (
"/",
"/lifespan",
"/environ",
"/messages",
"/internet",
Expand Down
8 changes: 8 additions & 0 deletions vm_supervisor/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ async def check_index(session: ClientSession) -> bool:
return False


async def check_lifespan(session: ClientSession) -> bool:
try:
result: Dict = await get_json_from_vm(session, "/lifespan")
return result["Lifetime"] is True
except ClientResponseError:
return False


async def check_environ(session: ClientSession) -> bool:
try:
result: Dict = await get_json_from_vm(session, "/environ")
Expand Down
1 change: 1 addition & 0 deletions vm_supervisor/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ async def status_check_fastapi(request: web.Request):
async with aiohttp.ClientSession() as session:
result = {
"index": await status.check_index(session),
"lifespan": await status.check_lifespan(session),
"environ": await status.check_environ(session),
"messages": await status.check_messages(session),
"internet": await status.check_internet(session),
Expand Down

0 comments on commit bdf5367

Please sign in to comment.