Skip to content

Commit

Permalink
Fix: ASGI processes did not support lifetime events
Browse files Browse the repository at this point in the history
  • Loading branch information
hoh committed Apr 20, 2023
1 parent 9225eda commit a1e35ea
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions runtimes/aleph-alpine-3.13-python/init1.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from io import StringIO
from os import system
from shutil import make_archive
from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable
from typing import Optional, Dict, Any, Tuple, List, NewType, Union, AsyncIterable, Literal

import aiohttp
import msgpack
Expand Down Expand Up @@ -164,6 +164,36 @@ def setup_volumes(volumes: List[Volume]):
system("mount")


async def wait_for_lifespan_event_completion(application: ASGIApplication,
event: Union[Literal['startup', 'shutdown']]):
"""
Send the startup lifespan signal to the ASGI app.
Specification: https://asgi.readthedocs.io/en/latest/specs/lifespan.html
"""

lifespan_completion = asyncio.Event()

def receive():
return {
'type': f'lifespan.{event}',
}

def send(type: str):
if type == f'lifespan.{event}.complete':
pass
else:
logger.warning(f"Unexpected response to startup: {type}")

while not lifespan_completion.is_set():
await application(
scope={
'type': 'lifespan',
},
receive=receive,
send=send,
)


def setup_code_asgi(
code: bytes, encoding: Encoding, entrypoint: str
) -> ASGIApplication:
Expand Down Expand Up @@ -200,6 +230,8 @@ def setup_code_asgi(
app = locals[entrypoint]
else:
raise ValueError(f"Unknown encoding '{encoding}'")

asyncio.run(wait_for_lifespan_event_completion(application=app, event='startup'))
return app


Expand Down Expand Up @@ -364,7 +396,10 @@ async def process_instruction(
logger.debug("Application terminated")
# application.communicate()
else:
# Close the cached session in aleph_client:
assert isinstance(application, ASGIApplication)
await wait_for_lifespan_event_completion(application=application, event='shutdown')

# Close the cached session in aleph_client: TODO: remove this, use SDK
from aleph_client.asynchronous import get_fallback_session

session: aiohttp.ClientSession = get_fallback_session()
Expand Down

0 comments on commit a1e35ea

Please sign in to comment.