Add session identity

This commit is contained in:
Andrew Svetlov 2015-11-19 15:25:10 +02:00
parent ab2ef11bf1
commit 52d5513838
5 changed files with 169 additions and 2 deletions

View File

@ -1,3 +1,10 @@
"""Identity polocy for storing info directly into HTTP cookie.
Use mostly for demonstration purposes, SessionIdentityPolicy is much
more handy.
"""
import asyncio import asyncio
from .abc import AbstractIdentityPolicy from .abc import AbstractIdentityPolicy

View File

@ -0,0 +1,35 @@
"""Identity policy for storing info into aiohttp_session session.
aiohttp_session.setup() should be called on application initialization
to conffigure aiohttp_session properly.
"""
import asyncio
from aiohttp_session import get_session
from .abc import AbstractIdentityPolicy
sentinel = object()
class SessionIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, session_key='AIOHTTP_SECURITY'):
self._session_key = session_key
@asyncio.coroutine
def identify(self, request):
session = yield from get_session(request)
return session.get(self._session_key)
@asyncio.coroutine
def remember(self, request, response, identity, **kwargs):
session = yield from get_session(request)
session[self._session_key] = identity
@asyncio.coroutine
def forget(self, request, response):
session = yield from get_session(request)
session.pop(self._session_key, None)

View File

@ -6,4 +6,4 @@ coverage
sphinx sphinx
alabaster>=0.6.2 alabaster>=0.6.2
pep257 pep257
aiohttp_session

View File

@ -29,7 +29,7 @@ def read(f):
install_requires = ['aiohttp>=0.18'] install_requires = ['aiohttp>=0.18']
tests_require = install_requires + ['pytest'] tests_require = install_requires + ['pytest']
extras_require = {} extras_require = {'session': 'aiohttp_session'}
setup(name='aiohttp_security', setup(name='aiohttp_security',
version=version, version=version,

View File

@ -0,0 +1,125 @@
import asyncio
import pytest
from aiohttp import web
from aiohttp_security import (remember, forget,
AbstractAuthorizationPolicy)
from aiohttp_security import setup as setup_security
from aiohttp_security.session_identity import SessionIdentityPolicy
from aiohttp_security.api import IDENTITY_KEY
from aiohttp_session import (SimpleCookieStorage, session_middleware,
get_session)
class Autz(AbstractAuthorizationPolicy):
@asyncio.coroutine
def permits(self, identity, permission, context=None):
pass
@asyncio.coroutine
def authorized_userid(self, identity):
pass
@pytest.fixture
def create_app_and_client2(create_app_and_client):
@asyncio.coroutine
def maker(*args, **kwargs):
app, client = yield from create_app_and_client(*args, **kwargs)
app.middlewares.append(session_middleware(SimpleCookieStorage()))
setup_security(app, SessionIdentityPolicy(), Autz())
return app, client
return maker
@pytest.mark.run_loop
def test_remember(create_app_and_client2):
@asyncio.coroutine
def handler(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def check(request):
session = yield from get_session(request)
assert session['AIOHTTP_SECURITY'] == 'Andrew'
return web.HTTPOk()
app, client = yield from create_app_and_client2()
app.router.add_route('GET', '/', handler)
app.router.add_route('GET', '/check', check)
resp = yield from client.get('/')
assert 200 == resp.status
yield from resp.release()
resp = yield from client.get('/check')
assert 200 == resp.status
yield from resp.release()
@pytest.mark.run_loop
def test_identify(create_app_and_client2):
@asyncio.coroutine
def create(request):
response = web.Response()
yield from remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def check(request):
policy = request.app[IDENTITY_KEY]
user_id = yield from policy.identify(request)
assert 'Andrew' == user_id
return web.Response()
app, client = yield from create_app_and_client2()
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create)
resp = yield from client.post('/')
assert 200 == resp.status
yield from resp.release()
resp = yield from client.get('/')
assert 200 == resp.status
yield from resp.release()
@pytest.mark.run_loop
def test_forget(create_app_and_client2):
@asyncio.coroutine
def index(request):
session = yield from get_session(request)
return web.HTTPOk(text=session.get('AIOHTTP_SECURITY', ''))
@asyncio.coroutine
def login(request):
response = web.HTTPFound(location='/')
yield from remember(request, response, 'Andrew')
return response
@asyncio.coroutine
def logout(request):
response = web.HTTPFound(location='/')
yield from forget(request, response)
return response
app, client = yield from create_app_and_client2()
app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
resp = yield from client.post('/login')
assert 200 == resp.status
assert resp.url.endswith('/')
txt = yield from resp.text()
assert 'Andrew' == txt
yield from resp.release()
resp = yield from client.post('/logout')
assert 200 == resp.status
assert resp.url.endswith('/')
txt = yield from resp.text()
assert '' == txt
yield from resp.release()