added simplistic dictionary_auth example (#105)
This commit is contained in:
parent
b0895806af
commit
1a9ab6424e
|
@ -0,0 +1,34 @@
|
||||||
|
from aiohttp_security.abc import AbstractAuthorizationPolicy
|
||||||
|
|
||||||
|
|
||||||
|
class DictionaryAuthorizationPolicy(AbstractAuthorizationPolicy):
|
||||||
|
def __init__(self, user_map):
|
||||||
|
super().__init__()
|
||||||
|
self.user_map = user_map
|
||||||
|
|
||||||
|
async def authorized_userid(self, identity):
|
||||||
|
"""Retrieve authorized user id.
|
||||||
|
Return the user_id of the user identified by the identity
|
||||||
|
or 'None' if no user exists related to the identity.
|
||||||
|
"""
|
||||||
|
if identity in self.user_map:
|
||||||
|
return identity
|
||||||
|
|
||||||
|
async def permits(self, identity, permission, context=None):
|
||||||
|
"""Check user permissions.
|
||||||
|
Return True if the identity is allowed the permission in the
|
||||||
|
current context, else return False.
|
||||||
|
"""
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
user = self.user_map.get(identity)
|
||||||
|
if not user:
|
||||||
|
return False
|
||||||
|
return permission in user.permissions
|
||||||
|
|
||||||
|
|
||||||
|
async def check_credentials(user_map, username, password):
|
||||||
|
user = user_map.get(username)
|
||||||
|
if not user:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return user.password == password
|
|
@ -0,0 +1,107 @@
|
||||||
|
import asyncio
|
||||||
|
import functools
|
||||||
|
from textwrap import dedent
|
||||||
|
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
|
from aiohttp_security import remember, forget, authorized_userid, permits
|
||||||
|
|
||||||
|
from .authz import check_credentials
|
||||||
|
|
||||||
|
|
||||||
|
def require(permission):
|
||||||
|
def wrapper(f):
|
||||||
|
@asyncio.coroutine
|
||||||
|
@functools.wraps(f)
|
||||||
|
def wrapped(request):
|
||||||
|
has_perm = yield from permits(request, permission)
|
||||||
|
if not has_perm:
|
||||||
|
message = 'User has no permission {}'.format(permission)
|
||||||
|
raise web.HTTPForbidden(body=message.encode())
|
||||||
|
return (yield from f(request))
|
||||||
|
return wrapped
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
index_template = dedent("""
|
||||||
|
<!doctype html>
|
||||||
|
<head>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<p>{message}</p>
|
||||||
|
<form action="/login" method="post">
|
||||||
|
Login:
|
||||||
|
<input type="text" name="username">
|
||||||
|
Password:
|
||||||
|
<input type="password" name="password">
|
||||||
|
<input type="submit" value="Login">
|
||||||
|
</form>
|
||||||
|
<a href="/logout">Logout</a>
|
||||||
|
</body>
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
async def index(request):
|
||||||
|
username = await authorized_userid(request)
|
||||||
|
if username:
|
||||||
|
template = index_template.format(
|
||||||
|
message='Hello, {username}!'.format(username=username))
|
||||||
|
else:
|
||||||
|
template = index_template.format(message='You need to login')
|
||||||
|
return web.Response(
|
||||||
|
text=template,
|
||||||
|
content_type='text/html',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def login(request):
|
||||||
|
response = web.HTTPFound('/')
|
||||||
|
form = await request.post()
|
||||||
|
username = form.get('username')
|
||||||
|
password = form.get('password')
|
||||||
|
|
||||||
|
verified = await check_credentials(request.app.user_map, username, password)
|
||||||
|
if verified:
|
||||||
|
await remember(request, response, username)
|
||||||
|
return response
|
||||||
|
|
||||||
|
return web.HTTPUnauthorized(body='Invalid username / password combination')
|
||||||
|
|
||||||
|
|
||||||
|
@require('public')
|
||||||
|
async def logout(request):
|
||||||
|
response = web.Response(
|
||||||
|
text='You have been logged out',
|
||||||
|
content_type='text/html',
|
||||||
|
)
|
||||||
|
await forget(request, response)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@require('public')
|
||||||
|
async def internal_page(request):
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
response = web.Response(
|
||||||
|
text='This page is visible for all registered users',
|
||||||
|
content_type='text/html',
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@require('protected')
|
||||||
|
async def protected_page(request):
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
response = web.Response(
|
||||||
|
text='You are on protected page',
|
||||||
|
content_type='text/html',
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def configure_handlers(app):
|
||||||
|
router = app.router
|
||||||
|
router.add_get('/', index, name='index')
|
||||||
|
router.add_post('/login', login, name='login')
|
||||||
|
router.add_get('/logout', logout, name='logout')
|
||||||
|
router.add_get('/public', internal_page, name='public')
|
||||||
|
router.add_get('/protected', protected_page, name='protected')
|
|
@ -0,0 +1,33 @@
|
||||||
|
import base64
|
||||||
|
from cryptography import fernet
|
||||||
|
from aiohttp import web
|
||||||
|
from aiohttp_session import setup as setup_session
|
||||||
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
||||||
|
from aiohttp_security import setup as setup_security
|
||||||
|
from aiohttp_security import SessionIdentityPolicy
|
||||||
|
|
||||||
|
from .authz import DictionaryAuthorizationPolicy
|
||||||
|
from .handlers import configure_handlers
|
||||||
|
from .users import user_map
|
||||||
|
|
||||||
|
|
||||||
|
def make_app():
|
||||||
|
app = web.Application()
|
||||||
|
app.user_map = user_map
|
||||||
|
configure_handlers(app)
|
||||||
|
|
||||||
|
# secret_key must be 32 url-safe base64-encoded bytes
|
||||||
|
fernet_key = fernet.Fernet.generate_key()
|
||||||
|
secret_key = base64.urlsafe_b64decode(fernet_key)
|
||||||
|
|
||||||
|
storage = EncryptedCookieStorage(secret_key, cookie_name='API_SESSION')
|
||||||
|
setup_session(app, storage)
|
||||||
|
|
||||||
|
policy = SessionIdentityPolicy()
|
||||||
|
setup_security(app, policy, DictionaryAuthorizationPolicy(user_map))
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
web.run_app(make_app(), port=9000)
|
|
@ -0,0 +1,10 @@
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
User = namedtuple('User', ['username', 'password', 'permissions'])
|
||||||
|
|
||||||
|
user_map = {
|
||||||
|
user.username: user for user in [
|
||||||
|
User('devin', 'password', ('public',)),
|
||||||
|
User('jack', 'password', ('public', 'protected',)),
|
||||||
|
]
|
||||||
|
}
|
Loading…
Reference in New Issue