REST Framework for aiohttp
Usage example
Create some authentication middleware that patch default request and add .user
attribute
import re
from xml.etree import ElementTree as etree
from json.decoder import JSONDecodeError
from multidict import MultiDict, MultiDictProxy
from aiohttp import web
from aiohttp.hdrs import (
METH_POST, METH_PUT, METH_PATCH, METH_DELETE
)
from aiorestframework import exceptions
from aiorestframework import serializers
from aiorestframework import Response
from aiorestframework.views import BaseViewSet
from aiorestframework.permissions import set_permissions, AllowAny
from aiorestframework.app import APIApplication
from my_project.settings import Settings # Generated by aiohttp-devtools
TOKEN_RE = re.compile(r'^\s*BEARER\s{,3}(\S{64})\s*$')
async def token_authentication(app, handler):
"""
Authorization middleware
Catching Authorization: BEARER <token> from request headers
Found user in Tarantool by token and bind User or AnonymousUser to request
"""
async def middleware_handler(request):
# Check that `Authorization` header exists
if 'authorization' in request.headers:
authorization = request.headers['authorization']
# Check matches in header value
match = TOKEN_RE.match(authorization)
if not match:
setattr(request, 'user', AnonymousUser())
return await handler(request)
else:
token = match[1]
elif 'authorization_token' in request.query:
token = request.query['authorization_token']
else:
setattr(request, 'user', AnonymousUser())
return await handler(request)
# Try select user auth record from Tarantool by token index
res = await app['tnt'].select('auth', [token, ])
cached = res.body
if not cached:
raise exceptions.AuthenticationFailed()
# Build basic user data and bind it to User instance
record = cached[0]
user = User()
user.bind_cached_tarantool(record)
# Add User to request
setattr(request, 'user', user)
return await handler(request)
return middleware_handler
And for data extraction from request:
DATA_METHODS = [METH_POST, METH_PUT, METH_PATCH, METH_DELETE]
JSON_CONTENT = ['application/json', ]
XML_CONTENT = ['application/xml', ]
FORM_CONTENT = ['application/x-www-form-urlencoded', 'multipart/form-data']
async def request_data_handler(app, handler):
"""
Request .data middleware
Try extract POST data or application/json from request body
"""
async def middleware_handler(request):
data = None
if request.method in DATA_METHODS:
if request.has_body:
if request.content_type in JSON_CONTENT:
# If request has body - try to decode it to JSON
try:
data = await request.json()
except JSONDecodeError:
raise exceptions.ParseError()
elif request.content_type in XML_CONTENT:
if request.charset is not None:
encoding = request.charset
else:
encoding = api_settings.DEFAULT_CHARSET
parser = etree.XMLParser(encoding=encoding)
try:
text = await request.text()
tree = etree.XML(text, parser=parser)
except (etree.ParseError, ValueError) as exc:
raise exceptions.ParseError(
detail='XML parse error - %s' % str(exc))
data = tree
elif request.content_type in FORM_CONTENT:
data = await request.post()
if data is None:
# If not catch any data create empty MultiDictProxy
data = MultiDictProxy(MultiDict())
# Attach extracted data to request
setattr(request, 'data', data)
return await handler(request)
return middleware_handler
Create few serializers:
class UserRegisterSerializer(s.Serializer):
"""Register new user"""
email = s.EmailField(max_length=256)
password = s.CharField(min_length=8, max_length=64)
first_name = s.CharField(min_length=2, max_length=64)
middle_name = s.CharField(default='', min_length=2, max_length=64,
required=False, allow_blank=True)
last_name = s.CharField(min_length=2, max_length=64)
phone = s.CharField(max_length=32, required=False,
allow_blank=True, default='')
async def register_user(self, app):
user = User()
data = self.validated_data
try:
await user.register_user(data, app)
except Error as e:
resolve_db_exception(e, self)
return user
And few ViewSets. It may be nested in bindings['custom']
class UserViewSet(BaseViewSet):
name = 'user'
lookup_url_kwarg = '{user_id:[0-9a-f]{32}}'
permission_classes = [AllowAny, ]
bindings = {
'list': {
'retrieve': 'get',
'update': 'put'
},
'custom': {
'list': {
'set_status': 'post',
'create_new_sip_password': 'post',
'get_registration_domain': 'get',
'report': UserReportViewSet
}
}
}
@staticmethod
async def resolve_sip_host(data, user, app):
sip_host = await resolve_registration_switch(user, app)
data.update({'sip_host': sip_host})
async def retrieve(self, request):
user = User()
await user.load_from_db(request.match_info['user_id'], request.app)
serializer = user_ser.UserProfileSerializer(instance=user)
data = serializer.data
await self.resolve_sip_host(data, user, request.app)
return Response(data=data)
@atomic
@set_permissions([AuthenticatedOnly, IsCompanyMember, CompanyIsEnabled])
async def update(self, request):
serializer = user_ser.UserProfileSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = await serializer.update_user(user_id=request.user.id,
app=request.app)
serializer = user_ser.UserProfileSerializer(instance=user)
data = serializer.data
await self.resolve_sip_host(data, user, request.app)
return Response(data=data)
Register Viewsets and run Application:
def setup_routes(app: APIApplication):
"""Add app routes here"""
# Auth API
app.router.register_viewset('/auth', auth_vws.AuthViewSet())
# User API
app.router.register_viewset('/user', user_vws.UserViewSet())
# Root redirection to Swagger
redirect = app.router.add_resource('/', name='home_redirect')
redirect.add_route('*', swagger_redirect)
def create_api_app():
sentry = get_sentry_middleware(settings.SENTRY_CONNECT_STRING, settings.SENTRY_ENVIRONMENT)
middlewares = [sentry, token_authentication, request_data_handler]
api_app = APIApplication(name='api', middlewares=middlewares,
client_max_size=10*(1024**2))
api_app.on_startup.append(startup.startup_api)
api_app.on_shutdown.append(startup.shutdown_api)
api_app.on_cleanup.append(startup.cleanup_api)
setup_routes(api_app)
if __name__ == '__main__':
app = create_api_app()
web.run_app(app)