16 Commits

Author SHA1 Message Date
pyup-bot
140804e4b5 Update aiohttp from 3.0.7 to 3.0.9 2018-03-19 20:14:07 +07:00
pyup-bot
6d9f5f03ed Update cryptography from 2.1.4 to 2.2 2018-03-19 20:14:06 +07:00
pyup.io bot
363f3b71c0 Scheduled weekly dependency update for week 10 (#138)
* Update pytest from 3.4.1 to 3.4.2

* Update aiohttp from 3.0.6 to 3.0.7
2018-03-12 19:20:07 +02:00
Oleh Kuchuk
9da55517b2 correct example linking (#136) 2018-03-06 14:21:40 +02:00
pyup.io bot
d375b22f1b Scheduled weekly dependency update for week 09 (#135)
* Update pytest from 3.3.2 to 3.4.1

* Update coverage from 4.4.2 to 4.5.1

* Update sphinx from 1.6.6 to 1.7.1

* Update aiohttp-session from 2.1.0 to 2.3.0

* Update aioredis from 1.0.0 to 1.1.0

* Update aiohttp from 2.3.9 to 3.0.6
2018-03-05 19:38:35 +02:00
Andrew Svetlov
4506c306a7 Relax travis matrix 2018-02-26 17:50:56 +02:00
Phil Elson
9b9b848fdd Two trivial typos. (#129) 2018-02-01 10:50:58 +02:00
Eduard Nabokov
1679f6713b Update docs and demo with login required, has permission (#128)
* Work on

* Update docs with login_required and has_permission
2018-01-24 12:29:20 +02:00
pyup.io bot
f9628b0ac1 Update aiohttp from 2.3.7 to 2.3.9 (#127) 2018-01-23 11:31:22 +02:00
pyup.io bot
f8940c0696 Scheduled weekly dependency update for week 01 (#126)
* Update pytest from 3.3.1 to 3.3.2

* Update sphinx from 1.6.5 to 1.6.6

* Update aiopg from 0.13.1 to 0.13.2
2018-01-08 23:42:12 +02:00
Grygorii Yermolenko
5d1195b85d Small updates in readme and docs (#125)
- add info about installation with extra ([]session])
- add links to examples
- move `Public api` header into related section
2018-01-03 12:09:11 +02:00
Andrew Svetlov
8360095011 Fix years 2018-01-02 14:31:39 +02:00
Grygorii Yermolenko
d89f6b7e3d Fix in demos related to async-await syntax (#123)
- remove 3.4 version from setup.py
- add cryprography package to requirements since it is used in
dictionary_auth
- add a few missed async kwords
2018-01-02 14:31:19 +02:00
pyup.io bot
db7dbd9b07 Scheduled weekly dependency update for week 00 (#124)
* Update pytest from 3.2.5 to 3.3.1

* Update aiohttp-session from 1.2.1 to 2.1.0

* Update aiohttp from 2.3.3 to 2.3.7

* Update pytest-aiohttp from 0.1.3 to 0.3.0
2018-01-02 14:28:32 +02:00
Andrew Svetlov
5b2ff779c3 Switch to async/await syntax 2017-12-13 16:51:46 +02:00
pyup.io bot
b9dee120c3 Scheduled weekly dependency update for week 47 (#115)
* Update pytest from 3.2.3 to 3.2.5

* Update aiohttp-session from 1.2.0 to 1.2.1

* Update aioredis from 0.3.5 to 1.0.0

* Update aiohttp from 2.3.2 to 2.3.3
2017-11-22 19:59:14 +02:00
24 changed files with 380 additions and 526 deletions

View File

@@ -1,11 +1,15 @@
language: python
python:
- 3.4.3
- 3.5
- 3.6
- 3.7-dev
- nightly
matrix:
allow_failures:
- python: 3.7-dev
- python: nightly
install:
- pip install --upgrade pip
- pip install -r requirements-dev.txt

View File

@@ -186,7 +186,7 @@ Apache License
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2015-2016 Andrew Svetlov
Copyright 2015-2018 Andrew Svetlov and aio-libs team.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@@ -9,17 +9,34 @@ aiohttp_security
.. image:: https://img.shields.io/pypi/v/aiohttp-security.svg
:target: https://pypi.python.org/pypi/aiohttp-security
The library provides identity and autorization for `aiohttp.web`__.
The library provides identity and authorization for `aiohttp.web`__.
.. _aiohttp_web: http://aiohttp.readthedocs.org/en/latest/web.html
__ aiohttp_web_
Usage
-----
To install type ``pip install aiohttp_security``.
Launch ``make doc`` and see examples or look under **demo** directory for a
sample project.
Installation
------------
Simplest case (authorization via cookies) ::
$ pip install aiohttp_security
With `aiohttp-session` support ::
$ pip install aiohttp_security[session]
Examples
--------
Take a look at examples:
`Basic example`_
`Example with DB auth`_
.. _`Basic example`: docs/example.rst
.. _`Example with db auth`: docs/example_db_auth.rst
and demos at **demo** directory.
Documentation
-------------

View File

@@ -1,21 +1,18 @@
import abc
import asyncio
# see http://plope.com/pyramid_auth_design_api_postmortem
class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
@asyncio.coroutine
@abc.abstractmethod
def identify(self, request):
async def identify(self, request):
"""Return the claimed identity of the user associated request or
``None`` if no identity can be found associated with the request."""
pass
@asyncio.coroutine
@abc.abstractmethod
def remember(self, request, response, identity, **kwargs):
async def remember(self, request, response, identity, **kwargs):
"""Remember identity.
Modify response object by filling it's headers with remembered user.
@@ -25,9 +22,8 @@ class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
"""
pass
@asyncio.coroutine
@abc.abstractmethod
def forget(self, request, response):
async def forget(self, request, response):
""" Modify response which can be used to 'forget' the
current identity on subsequent requests."""
pass
@@ -35,9 +31,8 @@ class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta):
@asyncio.coroutine
@abc.abstractmethod
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
"""Check user permissions.
Return True if the identity is allowed the permission in the
@@ -45,9 +40,8 @@ class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta):
"""
pass
@asyncio.coroutine
@abc.abstractmethod
def authorized_userid(self, identity):
async def authorized_userid(self, identity):
"""Retrieve authorized user id.
Return the user_id of the user identified by the identity

View File

@@ -1,4 +1,3 @@
import asyncio
import enum
from aiohttp import web
from aiohttp_security.abc import (AbstractIdentityPolicy,
@@ -9,13 +8,12 @@ IDENTITY_KEY = 'aiohttp_security_identity_policy'
AUTZ_KEY = 'aiohttp_security_autz_policy'
@asyncio.coroutine
def remember(request, response, identity, **kwargs):
async def remember(request, response, identity, **kwargs):
"""Remember identity into response.
The action is performed by identity_policy.remember()
Usually the idenity is stored in user cookies homehow but may be
Usually the identity is stored in user cookies somehow but may be
pushed into custom header also.
"""
assert isinstance(identity, str), identity
@@ -28,11 +26,10 @@ def remember(request, response, identity, **kwargs):
# output and rendered page we add same message to *reason* and
# *text* arguments.
raise web.HTTPInternalServerError(reason=text, text=text)
yield from identity_policy.remember(request, response, identity, **kwargs)
await identity_policy.remember(request, response, identity, **kwargs)
@asyncio.coroutine
def forget(request, response):
async def forget(request, response):
"""Forget previously remembered identity.
Usually it clears cookie or server-side storage to forget user
@@ -46,38 +43,35 @@ def forget(request, response):
# output and rendered page we add same message to *reason* and
# *text* arguments.
raise web.HTTPInternalServerError(reason=text, text=text)
yield from identity_policy.forget(request, response)
await identity_policy.forget(request, response)
@asyncio.coroutine
def authorized_userid(request):
async def authorized_userid(request):
identity_policy = request.app.get(IDENTITY_KEY)
autz_policy = request.app.get(AUTZ_KEY)
if identity_policy is None or autz_policy is None:
return None
identity = yield from identity_policy.identify(request)
identity = await identity_policy.identify(request)
if identity is None:
return None # non-registered user has None user_id
user_id = yield from autz_policy.authorized_userid(identity)
user_id = await autz_policy.authorized_userid(identity)
return user_id
@asyncio.coroutine
def permits(request, permission, context=None):
async def permits(request, permission, context=None):
assert isinstance(permission, (str, enum.Enum)), permission
assert permission
identity_policy = request.app.get(IDENTITY_KEY)
autz_policy = request.app.get(AUTZ_KEY)
if identity_policy is None or autz_policy is None:
return True
identity = yield from identity_policy.identify(request)
identity = await identity_policy.identify(request)
# non-registered user still may has some permissions
access = yield from autz_policy.permits(identity, permission, context)
access = await autz_policy.permits(identity, permission, context)
return access
@asyncio.coroutine
def is_anonymous(request):
async def is_anonymous(request):
"""Check if user is anonymous.
User is considered anonymous if there is not identity
@@ -86,7 +80,7 @@ def is_anonymous(request):
identity_policy = request.app.get(IDENTITY_KEY)
if identity_policy is None:
return True
identity = yield from identity_policy.identify(request)
identity = await identity_policy.identify(request)
if identity is None:
return True
return False
@@ -98,9 +92,8 @@ def login_required(fn):
User is considered authorized if authorized_userid
returns some value.
"""
@asyncio.coroutine
@wraps(fn)
def wrapped(*args, **kwargs):
async def wrapped(*args, **kwargs):
request = args[-1]
if not isinstance(request, web.BaseRequest):
msg = ("Incorrect decorator usage. "
@@ -108,11 +101,11 @@ def login_required(fn):
"or `def handler(self, request)`.")
raise RuntimeError(msg)
userid = yield from authorized_userid(request)
userid = await authorized_userid(request)
if userid is None:
raise web.HTTPUnauthorized
ret = yield from fn(*args, **kwargs)
ret = await fn(*args, **kwargs)
return ret
return wrapped
@@ -130,9 +123,8 @@ def has_permission(
raises HTTPForbidden.
"""
def wrapper(fn):
@asyncio.coroutine
@wraps(fn)
def wrapped(*args, **kwargs):
async def wrapped(*args, **kwargs):
request = args[-1]
if not isinstance(request, web.BaseRequest):
msg = ("Incorrect decorator usage. "
@@ -140,14 +132,14 @@ def has_permission(
"or `def handler(self, request)`.")
raise RuntimeError(msg)
userid = yield from authorized_userid(request)
userid = await authorized_userid(request)
if userid is None:
raise web.HTTPUnauthorized
allowed = yield from permits(request, permission, context)
allowed = await permits(request, permission, context)
if not allowed:
raise web.HTTPForbidden
ret = yield from fn(*args, **kwargs)
ret = await fn(*args, **kwargs)
return ret
return wrapped

View File

@@ -5,8 +5,6 @@ more handy.
"""
import asyncio
from .abc import AbstractIdentityPolicy
@@ -19,19 +17,16 @@ class CookiesIdentityPolicy(AbstractIdentityPolicy):
self._cookie_name = 'AIOHTTP_SECURITY'
self._max_age = 30 * 24 * 3600
@asyncio.coroutine
def identify(self, request):
async def identify(self, request):
identity = request.cookies.get(self._cookie_name)
return identity
@asyncio.coroutine
def remember(self, request, response, identity, max_age=sentinel,
**kwargs):
async def remember(self, request, response, identity, max_age=sentinel,
**kwargs):
if max_age is sentinel:
max_age = self._max_age
response.set_cookie(self._cookie_name, identity,
max_age=max_age, **kwargs)
@asyncio.coroutine
def forget(self, request, response):
async def forget(self, request, response):
response.del_cookie(self._cookie_name)

View File

@@ -4,8 +4,6 @@ aiohttp_session.setup() should be called on application initialization
to configure aiohttp_session properly.
"""
import asyncio
try:
from aiohttp_session import get_session
HAS_AIOHTTP_SESSION = True
@@ -24,17 +22,14 @@ class SessionIdentityPolicy(AbstractIdentityPolicy):
raise ImportError(
'SessionIdentityPolicy requires `aiohttp_session`')
@asyncio.coroutine
def identify(self, request):
session = yield from get_session(request)
async def identify(self, request):
session = await get_session(request)
return session.get(self._session_key)
@asyncio.coroutine
def remember(self, request, response, identity, **kwargs):
session = yield from get_session(request)
async def remember(self, request, response, identity, **kwargs):
session = await get_session(request)
session[self._session_key] = identity
@asyncio.coroutine
def forget(self, request, response):
session = yield from get_session(request)
async def forget(self, request, response):
session = await get_session(request)
session.pop(self._session_key, None)

View File

@@ -1,5 +1,3 @@
import asyncio
import sqlalchemy as sa
from aiohttp_security.abc import AbstractAuthorizationPolicy
@@ -12,29 +10,27 @@ class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
def __init__(self, dbengine):
self.dbengine = dbengine
@asyncio.coroutine
def authorized_userid(self, identity):
with (yield from self.dbengine) as conn:
async def authorized_userid(self, identity):
async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled))
query = db.users.count().where(where)
ret = yield from conn.scalar(query)
ret = await conn.scalar(query)
if ret:
return identity
else:
return None
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
if identity is None:
return False
with (yield from self.dbengine) as conn:
async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled))
query = db.users.select().where(where)
ret = yield from conn.execute(query)
user = yield from ret.fetchone()
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
user_id = user[0]
is_superuser = user[3]
@@ -43,8 +39,8 @@ class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
where = db.permissions.c.user_id == user_id
query = db.permissions.select().where(where)
ret = yield from conn.execute(query)
result = yield from ret.fetchall()
ret = await conn.execute(query)
result = await ret.fetchall()
if ret is not None:
for record in result:
if record.perm_name == permission:
@@ -53,14 +49,13 @@ class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
return False
@asyncio.coroutine
def check_credentials(db_engine, username, password):
with (yield from db_engine) as conn:
async def check_credentials(db_engine, username, password):
async with db_engine as conn:
where = sa.and_(db.users.c.login == username,
sa.not_(db.users.c.disabled))
query = db.users.select().where(where)
ret = yield from conn.execute(query)
user = yield from ret.fetchone()
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
hash = user[2]
return sha256_crypt.verify(password, hash)

View File

@@ -1,48 +1,34 @@
import asyncio
import functools
from textwrap import dedent
from aiohttp import web
from aiohttp_security import remember, forget, authorized_userid, permits
from aiohttp_security import (
remember, forget, authorized_userid,
has_permission, login_required,
)
from .db_auth import check_credentials
def require(permission):
def wrapper(f):
@asyncio.coroutine
@functools.wraps(f)
def wrapped(self, request):
has_perm = yield from permits(request, permission)
if not has_perm:
message = 'User has no permission {}'.format(permission)
raise web.HTTPForbidden(body=message.encode())
return (yield from f(self, request))
return wrapped
return wrapper
class Web(object):
index_template = """
<!doctype html>
<head>
</head>
<body>
<p>{message}</p>
<form action="/login" method="post">
Login:
<input type="text" name="login">
Password:
<input type="password" name="password">
<input type="submit" value="Login">
</form>
<a href="/logout">Logout</a>
</body>
"""
index_template = dedent("""
<!doctype html>
<head></head>
<body>
<p>{message}</p>
<form action="/login" method="post">
Login:
<input type="text" name="login">
Password:
<input type="password" name="password">
<input type="submit" value="Login">
</form>
<a href="/logout">Logout</a>
</body>
""")
@asyncio.coroutine
def index(self, request):
username = yield from authorized_userid(request)
async def index(self, request):
username = await authorized_userid(request)
if username:
template = self.index_template.format(
message='Hello, {username}!'.format(username=username))
@@ -51,37 +37,33 @@ class Web(object):
response = web.Response(body=template.encode())
return response
@asyncio.coroutine
def login(self, request):
async def login(self, request):
response = web.HTTPFound('/')
form = yield from request.post()
form = await request.post()
login = form.get('login')
password = form.get('password')
db_engine = request.app.db_engine
if (yield from check_credentials(db_engine, login, password)):
yield from remember(request, response, login)
if await check_credentials(db_engine, login, password):
await remember(request, response, login)
return response
return web.HTTPUnauthorized(
body=b'Invalid username/password combination')
@require('public')
@asyncio.coroutine
def logout(self, request):
@login_required
async def logout(self, request):
response = web.Response(body=b'You have been logged out')
yield from forget(request, response)
await forget(request, response)
return response
@require('public')
@asyncio.coroutine
def internal_page(self, request):
@has_permission('public')
async def internal_page(self, request):
response = web.Response(
body=b'This page is visible for all registered users')
return response
@require('protected')
@asyncio.coroutine
def protected_page(self, request):
@has_permission('protected')
async def protected_page(self, request):
response = web.Response(body=b'You are on protected page')
return response

View File

@@ -9,17 +9,16 @@ from aiopg.sa import create_engine
from aioredis import create_pool
from demo.db_auth import DBAuthorizationPolicy
from demo.handlers import Web
from demo.database_auth.db_auth import DBAuthorizationPolicy
from demo.database_auth.handlers import Web
@asyncio.coroutine
def init(loop):
redis_pool = yield from create_pool(('localhost', 6379))
db_engine = yield from create_engine(user='aiohttp_security',
password='aiohttp_security',
database='aiohttp_security',
host='127.0.0.1')
async def init(loop):
redis_pool = await create_pool(('localhost', 6379))
db_engine = await create_engine(user='aiohttp_security',
password='aiohttp_security',
database='aiohttp_security',
host='127.0.0.1')
app = web.Application(loop=loop)
app.db_engine = db_engine
setup_session(app, RedisStorage(redis_pool))
@@ -31,21 +30,20 @@ def init(loop):
web_handlers.configure(app)
handler = app.make_handler()
srv = yield from loop.create_server(handler, '127.0.0.1', 8080)
srv = await loop.create_server(handler, '127.0.0.1', 8080)
print('Server started at http://127.0.0.1:8080')
return srv, app, handler
@asyncio.coroutine
def finalize(srv, app, handler):
async def finalize(srv, app, handler):
sock = srv.sockets[0]
app.loop.remove_reader(sock.fileno())
sock.close()
yield from handler.finish_connections(1.0)
await handler.finish_connections(1.0)
srv.close()
yield from srv.wait_closed()
yield from app.finish()
await srv.wait_closed()
await app.finish()
def main():

View File

@@ -1,44 +1,30 @@
import asyncio
import functools
from textwrap import dedent
from aiohttp import web
from aiohttp_security import remember, forget, authorized_userid, permits
from aiohttp_security import (
remember, forget, authorized_userid,
has_permission, login_required,
)
from .authz import check_credentials
def require(permission):
def wrapper(f):
@asyncio.coroutine
@functools.wraps(f)
def wrapped(request):
has_perm = yield from permits(request, permission)
if not has_perm:
message = 'User has no permission {}'.format(permission)
raise web.HTTPForbidden(body=message.encode())
return (yield from f(request))
return wrapped
return wrapper
index_template = dedent("""
<!doctype html>
<head>
</head>
<body>
<p>{message}</p>
<form action="/login" method="post">
Login:
<input type="text" name="username">
Password:
<input type="password" name="password">
<input type="submit" value="Login">
</form>
<a href="/logout">Logout</a>
</body>
""")
<head></head>
<body>
<p>{message}</p>
<form action="/login" method="post">
Login:
<input type="text" name="username">
Password:
<input type="password" name="password">
<input type="submit" value="Login">
</form>
<a href="/logout">Logout</a>
</body>
""")
async def index(request):
@@ -60,7 +46,8 @@ async def login(request):
username = form.get('username')
password = form.get('password')
verified = await check_credentials(request.app.user_map, username, password)
verified = await check_credentials(
request.app.user_map, username, password)
if verified:
await remember(request, response, username)
return response
@@ -68,7 +55,7 @@ async def login(request):
return web.HTTPUnauthorized(body='Invalid username / password combination')
@require('public')
@login_required
async def logout(request):
response = web.Response(
text='You have been logged out',
@@ -78,7 +65,7 @@ async def logout(request):
return response
@require('public')
@has_permission('public')
async def internal_page(request):
# pylint: disable=unused-argument
response = web.Response(
@@ -88,7 +75,7 @@ async def internal_page(request):
return response
@require('protected')
@has_permission('protected')
async def protected_page(request):
# pylint: disable=unused-argument
response = web.Response(

View File

@@ -6,9 +6,9 @@ from aiohttp_session.cookie_storage import EncryptedCookieStorage
from aiohttp_security import setup as setup_security
from aiohttp_security import SessionIdentityPolicy
from .authz import DictionaryAuthorizationPolicy
from .handlers import configure_handlers
from .users import user_map
from demo.dictionary_auth.authz import DictionaryAuthorizationPolicy
from demo.dictionary_auth.handlers import configure_handlers
from demo.dictionary_auth.users import user_map
def make_app():

View File

@@ -10,16 +10,14 @@ Simple example::
import asyncio
from aiohttp import web
@asyncio.coroutine
def root_handler(request):
async def root_handler(request):
text = "Alive and kicking!"
return web.Response(body=text.encode('utf-8'))
# option 2: auth at a higher level?
# set user_id and allowed in the wsgi handler
@protect('view_user')
@asyncio.coroutine
def user_handler(request):
async def user_handler(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(body=text.encode('utf-8'))
@@ -27,14 +25,12 @@ Simple example::
# option 3: super low
# wsgi doesn't do anything
@asyncio.coroutine
def user_update_handler(request):
async def user_update_handler(request):
# identity, asked_permission
user_id = yield from identity_policy.identify(request)
identity = yield from auth_policy.authorized_user_id(user_id)
allowed = yield from request.auth_policy.permits(
identity, asked_permission
)
user_id = await identity_policy.identify(request)
identity = await auth_policy.authorized_userid(user_id)
allowed = await request.auth_policy.permits(
identity, asked_permission)
if not allowed:
# how is this pluggable as well?
# ? return NotAllowedStream()
@@ -42,8 +38,7 @@ Simple example::
update_user()
@asyncio.coroutine
def init(loop):
async def init(loop):
# set up identity and auth
auth_policy = DictionaryAuthorizationPolicy({'me': ('view_user',),
'you': ('view_user',
@@ -60,8 +55,8 @@ Simple example::
app.router.add_route('GET', '/{user}/edit', user_update_handler)
# get it started
srv = yield from loop.create_server(app.make_handler(),
'127.0.0.1', 8080)
srv = await loop.create_server(app.make_handler(),
'127.0.0.1', 8080)
print("Server started at http://127.0.0.1:8080")
return srv

View File

@@ -21,12 +21,14 @@ Launch these sql scripts to init database and fill it with sample data:
``psql template1 < demo/sql/init_db.sql``
and then
and
``psql template1 < demo/sql/sample_data.sql``
You will have two tables for storing users and their permissions
Now you have two tables:
- for storing users
+--------------+
| users |
@@ -42,7 +44,7 @@ You will have two tables for storing users and their permissions
| disabled |
+--------------+
and second table is permissions table:
- for storing their permissions
+-----------------+
| permissions |
@@ -63,48 +65,46 @@ First one should have these methods: *identify*, *remember* and *forget*.
For second one: *authorized_userid* and *permits*. We will use built-in
*SessionIdentityPolicy* and write our own database-based authorization policy.
In our example we will lookup database by user login and if present return
In our example we will lookup database by user login and if presents then return
this identity::
@asyncio.coroutine
def authorized_userid(self, identity):
with (yield from self.dbengine) as conn:
async def authorized_userid(self, identity):
async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled))
query = db.users.count().where(where)
ret = yield from conn.scalar(query)
ret = await conn.scalar(query)
if ret:
return identity
else:
return None
For permission check we will fetch the user first, check if he is superuser
For permission checking we will fetch the user first, check if he is superuser
(all permissions are allowed), otherwise check if permission is explicitly set
for that user::
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
if identity is None:
return False
with (yield from self.dbengine) as conn:
async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled))
query = db.users.select().where(where)
ret = yield from conn.execute(query)
user = yield from ret.fetchone()
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
user_id = user[0]
is_superuser = user[4]
is_superuser = user[3]
if is_superuser:
return True
where = db.permissions.c.user_id == user_id
query = db.permissions.select().where(where)
ret = yield from conn.execute(query)
result = yield from ret.fetchall()
ret = await conn.execute(query)
result = await ret.fetchall()
if ret is not None:
for record in result:
if record.perm_name == permission:
@@ -127,13 +127,12 @@ Once we have all the code in place we can install it for our application::
from .db_auth import DBAuthorizationPolicy
@asyncio.coroutine
def init(loop):
redis_pool = yield from create_pool(('localhost', 6379))
dbengine = yield from create_engine(user='aiohttp_security',
password='aiohttp_security',
database='aiohttp_security',
host='127.0.0.1')
async def init(loop):
redis_pool = await create_pool(('localhost', 6379))
dbengine = await create_engine(user='aiohttp_security',
password='aiohttp_security',
database='aiohttp_security',
host='127.0.0.1')
app = web.Application(loop=loop)
setup_session(app, RedisStorage(redis_pool))
setup_security(app,
@@ -143,39 +142,33 @@ Once we have all the code in place we can install it for our application::
Now we have authorization and can decorate every other view with access rights
based on permissions. This simple decorator (for class-based handlers) will
help to do that::
based on permissions. There are already implemented two decorators::
def require(permission):
def wrapper(f):
@asyncio.coroutine
@functools.wraps(f)
def wrapped(self, request):
has_perm = yield from permits(request, permission)
if not has_perm:
message = 'User has no permission {}'.format(permission)
raise web.HTTPForbidden(body=message.encode())
return (yield from f(self, request))
return wrapped
return wrapper
from aiohttp_security import has_permission, login_required
For each view you need to protect just apply the decorator on it::
For each view you need to protect - just apply the decorator on it::
class Web:
@require('protected')
@asyncio.coroutine
def protected_page(self, request):
@has_permission('protected')
async def protected_page(self, request):
response = web.Response(body=b'You are on protected page')
return response
or::
If someone will try to access this protected page he will see::
class Web:
@login_required
async def logout(self, request):
response = web.Response(body=b'You have been logged out')
await forget(request, response)
return response
403, User has no permission "protected"
If someone try to access that protected page he will see::
403: Forbidden
The best part about it is that you can implement any logic you want until it
The best part of it - you can implement any logic you want until it
follows the API conventions.
Launch application
@@ -183,18 +176,17 @@ Launch application
For working with passwords there is a good library passlib_. Once you've
created some users you want to check their credentials on login. Similar
function may do what you trying to accomplish::
function may do what you are trying to accomplish::
from passlib.hash import sha256_crypt
@asyncio.coroutine
def check_credentials(db_engine, username, password):
with (yield from db_engine) as conn:
async def check_credentials(db_engine, username, password):
async with db_engine as conn:
where = sa.and_(db.users.c.login == username,
sa.not_(db.users.c.disabled))
query = db.users.select().where(where)
ret = yield from conn.execute(query)
user = yield from ret.fetchone()
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
hash = user[2]
return sha256_crypt.verify(password, hash)
@@ -203,8 +195,8 @@ function may do what you trying to accomplish::
Final step is to launch your application::
python demo/main.py
python demo/database_auth/main.py
Try to login with admin/moderator/user accounts (with *password* password)
Try to login with admin/moderator/user accounts (with **password** password)
and access **/public** or **/protected** endpoints.

View File

@@ -3,16 +3,9 @@ aiohttp_security
The library provides security for :ref:`aiohttp.web<aiohttp-web>`.
Usage
-----
License
-------
``aiohttp_security`` is offered under the Apache 2 license.
Contents:
Contents
--------
.. toctree::
:maxdepth: 2
@@ -23,7 +16,10 @@ Contents:
example_db_auth
glossary
License
-------
``aiohttp_security`` is offered under the Apache 2 license.
Indices and tables
==================

View File

@@ -21,7 +21,7 @@ Public API functions
The action is performed by registered
:meth:`AbstractIdentityPolicy.remember`.
Usually the *idenity* is stored in user cookies homehow for using by
Usually the *identity* is stored in user cookies somehow for using by
:func:`authorized_userid` and :func:`permits`.
:param request: :class:`aiohttp.web.Request` object.

View File

@@ -13,6 +13,10 @@ First of all, what is *aiohttp_security* about?
It is a set of public API functions and standard for implementation details.
Public API
==========
API is implementation agnostic, all client code should not call policy
code (see below) directly but use API only.
@@ -27,9 +31,6 @@ base classes for both concepts as well as several implementations
shipped with the library. End user is free to build own implemetations
if needed.
Public API
==========
Authentication
==============
@@ -43,11 +44,6 @@ knowledge is there the user still registered in DB.
If :class:`aiohttp.web.Request` has an :term:`identity` it means the user has
some ID that should be checked by :term:`authorization` policy.
identity is a string shared between browser and server.
:term:`identity` is a string shared between browser and server.
Thus it's not supposed to be database primary key, user login/email etc.
Random string like uuid or hash is better choice.

View File

@@ -1,14 +1,15 @@
-e .
flake8==3.5.0
pytest==3.2.3
pytest==3.4.2
pytest-cov==2.5.1
coverage==4.4.2
sphinx==1.6.5
coverage==4.5.1
sphinx==1.7.1
pep257==0.7.0
aiohttp-session==1.2.0
aiopg[sa]==0.13.1
aioredis==0.3.5
aiohttp-session==2.3.0
aiopg[sa]==0.13.2
aioredis==1.1.0
hiredis==0.2.0
passlib==1.7.1
aiohttp==2.3.2
pytest-aiohttp==0.1.3
cryptography==2.2
aiohttp==3.0.9
pytest-aiohttp==0.3.0

View File

@@ -42,7 +42,6 @@ setup(name='aiohttp-security',
'Intended Audience :: Developers',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Internet :: WWW/HTTP',

View File

@@ -1,5 +1,3 @@
import asyncio
from aiohttp import web
from aiohttp_security import (remember, forget,
AbstractAuthorizationPolicy)
@@ -10,47 +8,39 @@ from aiohttp_security.api import IDENTITY_KEY
class Autz(AbstractAuthorizationPolicy):
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
pass
@asyncio.coroutine
def authorized_userid(self, identity):
async def authorized_userid(self, identity):
pass
@asyncio.coroutine
def test_remember(loop, test_client):
async def test_remember(loop, test_client):
@asyncio.coroutine
def handler(request):
async def handler(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', handler)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert 200 == resp.status
assert 'Andrew' == resp.cookies['AIOHTTP_SECURITY'].value
yield from resp.release()
@asyncio.coroutine
def test_identify(loop, test_client):
async def test_identify(loop, test_client):
@asyncio.coroutine
def create(request):
async def create(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def check(request):
async def check(request):
policy = request.app[IDENTITY_KEY]
user_id = yield from policy.identify(request)
user_id = await policy.identify(request)
assert 'Andrew' == user_id
return web.Response()
@@ -58,32 +48,27 @@ def test_identify(loop, test_client):
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create)
client = yield from test_client(app)
resp = yield from client.post('/')
client = await test_client(app)
resp = await client.post('/')
assert 200 == resp.status
yield from resp.release()
resp = yield from client.get('/')
await resp.release()
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_forget(loop, test_client):
async def test_forget(loop, test_client):
@asyncio.coroutine
def index(request):
async def index(request):
return web.Response()
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def logout(request):
async def logout(request):
response = web.HTTPFound(location='/')
yield from forget(request, response)
await forget(request, response)
return response
app = web.Application(loop=loop)
@@ -91,18 +76,17 @@ def test_forget(loop, test_client):
app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = yield from test_client(app)
resp = yield from client.post('/login')
client = await test_client(app)
resp = await client.post('/login')
assert 200 == resp.status
assert str(resp.url).endswith('/')
cookies = client.session.cookie_jar.filter_cookies(
client.make_url('/'))
assert 'Andrew' == cookies['AIOHTTP_SECURITY'].value
yield from resp.release()
resp = yield from client.post('/logout')
resp = await client.post('/logout')
assert 200 == resp.status
assert str(resp.url).endswith('/')
cookies = client.session.cookie_jar.filter_cookies(
client.make_url('/'))
assert 'AIOHTTP_SECURITY' not in cookies
yield from resp.release()

View File

@@ -1,4 +1,3 @@
import asyncio
import enum
from aiohttp import web
@@ -11,33 +10,28 @@ from aiohttp_security.cookies_identity import CookiesIdentityPolicy
class Autz(AbstractAuthorizationPolicy):
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
if identity == 'UserID':
return permission in {'read', 'write'}
else:
return False
@asyncio.coroutine
def authorized_userid(self, identity):
async def authorized_userid(self, identity):
if identity == 'UserID':
return 'Andrew'
else:
return None
@asyncio.coroutine
def test_authorized_userid(loop, test_client):
async def test_authorized_userid(loop, test_client):
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'UserID')
await remember(request, response, 'UserID')
return response
@asyncio.coroutine
def check(request):
userid = yield from authorized_userid(request)
async def check(request):
userid = await authorized_userid(request)
assert 'Andrew' == userid
return web.Response(text=userid)
@@ -45,36 +39,31 @@ def test_authorized_userid(loop, test_client):
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/login', login)
client = yield from test_client(app)
client = await test_client(app)
resp = yield from client.post('/login')
resp = await client.post('/login')
assert 200 == resp.status
txt = yield from resp.text()
txt = await resp.text()
assert 'Andrew' == txt
yield from resp.release()
@asyncio.coroutine
def test_authorized_userid_not_authorized(loop, test_client):
async def test_authorized_userid_not_authorized(loop, test_client):
@asyncio.coroutine
def check(request):
userid = yield from authorized_userid(request)
async def check(request):
userid = await authorized_userid(request)
assert userid is None
return web.Response()
app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check)
client = yield from test_client(app)
client = await test_client(app)
resp = yield from client.get('/')
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_permits_enum_permission(loop, test_client):
async def test_permits_enum_permission(loop, test_client):
class Permission(enum.Enum):
READ = '101'
WRITE = '102'
@@ -82,33 +71,29 @@ def test_permits_enum_permission(loop, test_client):
class Autz(AbstractAuthorizationPolicy):
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
if identity == 'UserID':
return permission in {Permission.READ, Permission.WRITE}
else:
return False
@asyncio.coroutine
def authorized_userid(self, identity):
async def authorized_userid(self, identity):
if identity == 'UserID':
return 'Andrew'
else:
return None
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'UserID')
await remember(request, response, 'UserID')
return response
@asyncio.coroutine
def check(request):
ret = yield from permits(request, Permission.READ)
async def check(request):
ret = await permits(request, Permission.READ)
assert ret
ret = yield from permits(request, Permission.WRITE)
ret = await permits(request, Permission.WRITE)
assert ret
ret = yield from permits(request, Permission.UNKNOWN)
ret = await permits(request, Permission.UNKNOWN)
assert not ret
return web.Response()
@@ -116,54 +101,46 @@ def test_permits_enum_permission(loop, test_client):
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/login', login)
client = yield from test_client(app)
resp = yield from client.post('/login')
client = await test_client(app)
resp = await client.post('/login')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_permits_unauthorized(loop, test_client):
async def test_permits_unauthorized(loop, test_client):
@asyncio.coroutine
def check(request):
ret = yield from permits(request, 'read')
async def check(request):
ret = await permits(request, 'read')
assert not ret
ret = yield from permits(request, 'write')
ret = await permits(request, 'write')
assert not ret
ret = yield from permits(request, 'unknown')
ret = await permits(request, 'unknown')
assert not ret
return web.Response()
app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_is_anonymous(loop, test_client):
async def test_is_anonymous(loop, test_client):
@asyncio.coroutine
def index(request):
is_anon = yield from is_anonymous(request)
async def index(request):
is_anon = await is_anonymous(request)
if is_anon:
return web.HTTPUnauthorized()
return web.HTTPOk()
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'UserID')
await remember(request, response, 'UserID')
return response
@asyncio.coroutine
def logout(request):
async def logout(request):
response = web.HTTPFound(location='/')
yield from forget(request, response)
await forget(request, response)
return response
app = web.Application(loop=loop)
@@ -171,36 +148,32 @@ def test_is_anonymous(loop, test_client):
app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
yield from client.post('/login')
resp = yield from client.get('/')
await client.post('/login')
resp = await client.get('/')
assert web.HTTPOk.status_code == resp.status
yield from client.post('/logout')
resp = yield from client.get('/')
await client.post('/logout')
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
@asyncio.coroutine
def test_login_required(loop, test_client):
async def test_login_required(loop, test_client):
@login_required
@asyncio.coroutine
def index(request):
async def index(request):
return web.HTTPOk()
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'UserID')
await remember(request, response, 'UserID')
return response
@asyncio.coroutine
def logout(request):
async def logout(request):
response = web.HTTPFound(location='/')
yield from forget(request, response)
await forget(request, response)
return response
app = web.Application(loop=loop)
@@ -208,47 +181,41 @@ def test_login_required(loop, test_client):
app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
yield from client.post('/login')
resp = yield from client.get('/')
await client.post('/login')
resp = await client.get('/')
assert web.HTTPOk.status_code == resp.status
yield from client.post('/logout')
resp = yield from client.get('/')
await client.post('/logout')
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
@asyncio.coroutine
def test_has_permission(loop, test_client):
async def test_has_permission(loop, test_client):
@has_permission('read')
@asyncio.coroutine
def index_read(request):
async def index_read(request):
return web.HTTPOk()
@has_permission('write')
@asyncio.coroutine
def index_write(request):
async def index_write(request):
return web.HTTPOk()
@has_permission('forbid')
@asyncio.coroutine
def index_forbid(request):
async def index_forbid(request):
return web.HTTPOk()
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'UserID')
await remember(request, response, 'UserID')
return response
@asyncio.coroutine
def logout(request):
async def logout(request):
response = web.HTTPFound(location='/')
yield from forget(request, response)
await forget(request, response)
return response
app = web.Application(loop=loop)
@@ -258,27 +225,27 @@ def test_has_permission(loop, test_client):
app.router.add_route('GET', '/permission/forbid', index_forbid)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = yield from test_client(app)
client = await test_client(app)
resp = yield from client.get('/permission/read')
resp = await client.get('/permission/read')
assert web.HTTPUnauthorized.status_code == resp.status
resp = yield from client.get('/permission/write')
resp = await client.get('/permission/write')
assert web.HTTPUnauthorized.status_code == resp.status
resp = yield from client.get('/permission/forbid')
resp = await client.get('/permission/forbid')
assert web.HTTPUnauthorized.status_code == resp.status
yield from client.post('/login')
resp = yield from client.get('/permission/read')
await client.post('/login')
resp = await client.get('/permission/read')
assert web.HTTPOk.status_code == resp.status
resp = yield from client.get('/permission/write')
resp = await client.get('/permission/write')
assert web.HTTPOk.status_code == resp.status
resp = yield from client.get('/permission/forbid')
resp = await client.get('/permission/forbid')
assert web.HTTPForbidden.status_code == resp.status
yield from client.post('/logout')
resp = yield from client.get('/permission/read')
await client.post('/logout')
resp = await client.get('/permission/read')
assert web.HTTPUnauthorized.status_code == resp.status
resp = yield from client.get('/permission/write')
resp = await client.get('/permission/write')
assert web.HTTPUnauthorized.status_code == resp.status
resp = yield from client.get('/permission/forbid')
resp = await client.get('/permission/forbid')
assert web.HTTPUnauthorized.status_code == resp.status

View File

@@ -1,42 +1,34 @@
import asyncio
from aiohttp import web
from aiohttp_security import authorized_userid, permits
@asyncio.coroutine
def test_authorized_userid(loop, test_client):
async def test_authorized_userid(loop, test_client):
@asyncio.coroutine
def check(request):
userid = yield from authorized_userid(request)
async def check(request):
userid = await authorized_userid(request)
assert userid is None
return web.Response()
app = web.Application(loop=loop)
app.router.add_route('GET', '/', check)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_permits(loop, test_client):
async def test_permits(loop, test_client):
@asyncio.coroutine
def check(request):
ret = yield from permits(request, 'read')
async def check(request):
ret = await permits(request, 'read')
assert ret
ret = yield from permits(request, 'write')
ret = await permits(request, 'write')
assert ret
ret = yield from permits(request, 'unknown')
ret = await permits(request, 'unknown')
assert ret
return web.Response()
app = web.Application(loop=loop)
app.router.add_route('GET', '/', check)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()

View File

@@ -1,42 +1,34 @@
import asyncio
from aiohttp import web
from aiohttp_security import remember, forget
@asyncio.coroutine
def test_remember(loop, test_client):
async def test_remember(loop, test_client):
@asyncio.coroutine
def do_remember(request):
async def do_remember(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
app = web.Application(loop=loop)
app.router.add_route('POST', '/', do_remember)
client = yield from test_client(app)
resp = yield from client.post('/')
client = await test_client(app)
resp = await client.post('/')
assert 500 == resp.status
assert (('Security subsystem is not initialized, '
'call aiohttp_security.setup(...) first') ==
resp.reason)
yield from resp.release()
@asyncio.coroutine
def test_forget(loop, test_client):
async def test_forget(loop, test_client):
@asyncio.coroutine
def do_forget(request):
async def do_forget(request):
response = web.Response()
yield from forget(request, response)
await forget(request, response)
app = web.Application(loop=loop)
app.router.add_route('POST', '/', do_forget)
client = yield from test_client(app)
resp = yield from client.post('/')
client = await test_client(app)
resp = await client.post('/')
assert 500 == resp.status
assert (('Security subsystem is not initialized, '
'call aiohttp_security.setup(...) first') ==
resp.reason)
yield from resp.release()

View File

@@ -1,4 +1,3 @@
import asyncio
import pytest
from aiohttp import web
@@ -13,12 +12,10 @@ from aiohttp_session import setup as setup_session
class Autz(AbstractAuthorizationPolicy):
@asyncio.coroutine
def permits(self, identity, permission, context=None):
async def permits(self, identity, permission, context=None):
pass
@asyncio.coroutine
def authorized_userid(self, identity):
async def authorized_userid(self, identity):
pass
@@ -30,81 +27,67 @@ def make_app(loop):
return app
@asyncio.coroutine
def test_remember(make_app, test_client):
async def test_remember(make_app, test_client):
@asyncio.coroutine
def handler(request):
async def handler(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def check(request):
session = yield from get_session(request)
async def check(request):
session = await get_session(request)
assert session['AIOHTTP_SECURITY'] == 'Andrew'
return web.HTTPOk()
app = make_app()
app.router.add_route('GET', '/', handler)
app.router.add_route('GET', '/check', check)
client = yield from test_client(app)
resp = yield from client.get('/')
client = await test_client(app)
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
resp = yield from client.get('/check')
resp = await client.get('/check')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_identify(make_app, test_client):
async def test_identify(make_app, test_client):
@asyncio.coroutine
def create(request):
async def create(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def check(request):
async def check(request):
policy = request.app[IDENTITY_KEY]
user_id = yield from policy.identify(request)
user_id = await policy.identify(request)
assert 'Andrew' == user_id
return web.Response()
app = make_app()
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create)
client = yield from test_client(app)
resp = yield from client.post('/')
client = await test_client(app)
resp = await client.post('/')
assert 200 == resp.status
yield from resp.release()
resp = yield from client.get('/')
resp = await client.get('/')
assert 200 == resp.status
yield from resp.release()
@asyncio.coroutine
def test_forget(make_app, test_client):
async def test_forget(make_app, test_client):
@asyncio.coroutine
def index(request):
session = yield from get_session(request)
async def index(request):
session = await get_session(request)
return web.HTTPOk(text=session.get('AIOHTTP_SECURITY', ''))
@asyncio.coroutine
def login(request):
async def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'Andrew')
await remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def logout(request):
async def logout(request):
response = web.HTTPFound('/')
yield from forget(request, response)
await forget(request, response)
return response
app = make_app()
@@ -112,18 +95,16 @@ def test_forget(make_app, test_client):
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = yield from test_client(app)
client = await test_client(app)
resp = yield from client.post('/login')
resp = await client.post('/login')
assert 200 == resp.status
assert str(resp.url).endswith('/')
txt = yield from resp.text()
txt = await resp.text()
assert 'Andrew' == txt
yield from resp.release()
resp = yield from client.post('/logout')
resp = await client.post('/logout')
assert 200 == resp.status
assert str(resp.url).endswith('/')
txt = yield from resp.text()
txt = await resp.text()
assert '' == txt
yield from resp.release()