2 Commits

Author SHA1 Message Date
pyup-bot
140804e4b5 Update aiohttp from 3.0.7 to 3.0.9 2018-03-19 20:14:07 +07:00
pyup-bot
6d9f5f03ed Update cryptography from 2.1.4 to 2.2 2018-03-19 20:14:06 +07:00
40 changed files with 354 additions and 985 deletions

View File

@@ -1,38 +0,0 @@
name: Test
on: pull_request
jobs:
mypy:
name: Check annotations with Mypy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: pip install aiohttp mypy
- run: mypy
test:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install --upgrade pip
pip install -r requirements-dev.txt
pip install codecov
- name: Run tests
run: |
make coverage
- name: Upload coverage to Codecov
run: |
codecov

1
.gitignore vendored
View File

@@ -56,4 +56,3 @@ docs/_build/
target/ target/
coverage coverage
.pytest_cache

View File

@@ -1,37 +0,0 @@
[mypy]
files = aiohttp_security, demo, tests
check_untyped_defs = True
follow_imports_for_stubs = True
disallow_any_decorated = True
disallow_any_generics = True
disallow_incomplete_defs = True
disallow_subclassing_any = True
disallow_untyped_calls = True
disallow_untyped_decorators = True
disallow_untyped_defs = True
implicit_reexport = False
no_implicit_optional = True
show_error_codes = True
strict_equality = True
warn_incomplete_stub = True
warn_redundant_casts = True
warn_unreachable = True
warn_unused_ignores = True
disallow_any_unimported = True
warn_return_any = True
[mypy-aiohttp_security.abc.*]
disallow_any_decorated = False
[mypy-tests.*]
disallow_any_decorated = False
disallow_untyped_defs = False
[mypy-aiopg.*]
ignore_missing_imports = True
[mypy-aioredis.*]
ignore_missing_imports = True
[mypy-passlib.*]
ignore_missing_imports = True

38
.travis.yml Normal file
View File

@@ -0,0 +1,38 @@
language: python
python:
- 3.5
- 3.6
- 3.7-dev
- nightly
matrix:
allow_failures:
- python: 3.7-dev
- python: nightly
install:
- pip install --upgrade pip
- pip install -r requirements-dev.txt
- pip install codecov
script:
- make coverage
after_success:
- codecov
env:
matrix:
- PYTHONASYNCIODEBUG=x
- PYTHONASYNCIODEBUG=
deploy:
provider: pypi
user: andrew.svetlov
password:
secure: "JdBvuOBA/198ognVDOY/qZpIKGXfCx47725kyJo/SpQ3nP+x0GLZb3PMQkR0jfSWWkx6Sisk3vOCYsoWclPyPzp+o4ZpfM8yAjHNFmtbr+k+XJdUEApEiWb6/Y3g7DCyY2Qa/L8IYlyABPWrrJI/nld2sKm5kmhFpR/z3HfeFtINP6Ivp34dUOkeRP6kOvCi9d6GyWnvTRnhlybAnk/Ngrroh8XrbKHdDv0zkQkshF8+pmxVzwao4C6S5ld5cFXIYZHLBA9lNC3zgvOMuFeGPUEN9vab3q77MvaiMIuTC9QjcgIhfw3gabH2u7knqfFzqqzXMaVptx5z8o1JtsxMyYt5NVBqS4NPIljpZjaoS/CASHJlRxniJiYfjvjOtFEcfGMNtZj8ZYsGR0nuP2jwzgpEHHWIs4qL0Y8h9t7pGirxCuQcnY10sr+Y+JKaZNJsugNLgbqE2aaZUye5gjDcEj9WY8kKNZXucLP7c0McJuwPqplDEO4CQouMttcKSYkA0QoETmpAFqaXCaMs3p/glOoU2ZyHSH9mXWir69yo84ymb2NlGPMTAstXlv/g/oLmLMSq7lbl6cSUnO1/wxBGlyfv5AAq/75YUaqsgYofzN5CjUgA3m6NedvbWxLUJaxVQ7nduYGEQKDvGEBmzCNv6CdVRCjQ9J1xX3XzkVheQGc="
distributions: "sdist bdist_wheel"
on:
tags: true
all_branches: true
python: 3.6

View File

@@ -1,31 +1,10 @@
Changes Changes
======= =======
0.4.0 (2018-09-27)
------------------
- Bump minimal supported ``aiohttp`` version to 3.2
- Use ``request.config_dict`` for accessing ``jinja2`` environment. It
allows to reuse jinja rendering engine from parent application.
0.3.0 (2018-09-06)
------------------
- Deprecate ``login_required`` and ``has_permission`` decorators.
Use ``check_authorized`` and ``check_permission`` helper functions instead.
- Bump supported ``aiohttp`` version to 3.0+
- Enable strong warnings mode for test suite, clean-up all deprecation
warnings.
- Polish documentation
0.2.0 (2017-11-17) 0.2.0 (2017-11-17)
------------------ ------------------
- Add ``is_anonymous``, ``login_required``, ``has_permission`` helpers (#114) - Add `is_anonymous`, `login_required`, `has_permission` helpers (#114)
0.1.2 (2017-10-17) 0.1.2 (2017-10-17)
------------------ ------------------

View File

@@ -1,7 +1,7 @@
aiohttp_security aiohttp_security
================ ================
.. image:: https://travis-ci.com/aio-libs/aiohttp-security.svg?branch=master .. image:: https://travis-ci.org/aio-libs/aiohttp-security.svg?branch=master
:target: https://travis-ci.com/aio-libs/aiohttp-security :target: https://travis-ci.org/aio-libs/aiohttp-security
.. image:: https://codecov.io/github/aio-libs/aiohttp-security/coverage.svg?branch=master .. image:: https://codecov.io/github/aio-libs/aiohttp-security/coverage.svg?branch=master
:target: https://codecov.io/github/aio-libs/aiohttp-security :target: https://codecov.io/github/aio-libs/aiohttp-security
.. image:: https://readthedocs.org/projects/aiohttp-security/badge/?version=latest .. image:: https://readthedocs.org/projects/aiohttp-security/badge/?version=latest
@@ -46,7 +46,7 @@ https://aiohttp-security.readthedocs.io/
Develop Develop
------- -------
``pip install -r requirements-dev.txt`` ``pip install -r requirements-dev``
License License

View File

@@ -1,18 +1,14 @@
from .abc import AbstractAuthorizationPolicy, AbstractIdentityPolicy from .abc import AbstractAuthorizationPolicy, AbstractIdentityPolicy
from .api import (authorized_userid, forget, has_permission, from .api import (authorized_userid, forget, has_permission, is_anonymous,
is_anonymous, login_required, permits, remember, login_required, permits, remember, setup)
setup, check_authorized, check_permission)
from .cookies_identity import CookiesIdentityPolicy from .cookies_identity import CookiesIdentityPolicy
from .session_identity import SessionIdentityPolicy from .session_identity import SessionIdentityPolicy
from .jwt_identity import JWTIdentityPolicy
__version__ = '0.4.0' __version__ = '0.2.0'
__all__ = ('AbstractIdentityPolicy', 'AbstractAuthorizationPolicy', __all__ = ('AbstractIdentityPolicy', 'AbstractAuthorizationPolicy',
'CookiesIdentityPolicy', 'SessionIdentityPolicy', 'CookiesIdentityPolicy', 'SessionIdentityPolicy',
'JWTIdentityPolicy',
'remember', 'forget', 'authorized_userid', 'remember', 'forget', 'authorized_userid',
'permits', 'setup', 'is_anonymous', 'permits', 'setup', 'is_anonymous',
'login_required', 'has_permission', 'login_required', 'has_permission')
'check_authorized', 'check_permission')

View File

@@ -1,8 +1,4 @@
import abc import abc
from enum import Enum
from typing import Any, Optional, Union
from aiohttp import web
# see http://plope.com/pyramid_auth_design_api_postmortem # see http://plope.com/pyramid_auth_design_api_postmortem
@@ -10,14 +6,13 @@ from aiohttp import web
class AbstractIdentityPolicy(metaclass=abc.ABCMeta): class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
async def identify(self, request: web.Request) -> Optional[str]: async def identify(self, request):
"""Return the claimed identity of the user associated request or """Return the claimed identity of the user associated request or
``None`` if no identity can be found associated with the request.""" ``None`` if no identity can be found associated with the request."""
pass pass
@abc.abstractmethod @abc.abstractmethod
async def remember(self, request: web.Request, response: web.StreamResponse, async def remember(self, request, response, identity, **kwargs):
identity: str, **kwargs: Any) -> None:
"""Remember identity. """Remember identity.
Modify response object by filling it's headers with remembered user. Modify response object by filling it's headers with remembered user.
@@ -28,7 +23,7 @@ class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
pass pass
@abc.abstractmethod @abc.abstractmethod
async def forget(self, request: web.Request, response: web.StreamResponse) -> None: async def forget(self, request, response):
""" Modify response which can be used to 'forget' the """ Modify response which can be used to 'forget' the
current identity on subsequent requests.""" current identity on subsequent requests."""
pass pass
@@ -37,8 +32,7 @@ class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta): class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
async def permits(self, identity: str, permission: Union[str, Enum], async def permits(self, identity, permission, context=None):
context: Any = None) -> bool:
"""Check user permissions. """Check user permissions.
Return True if the identity is allowed the permission in the Return True if the identity is allowed the permission in the
@@ -47,7 +41,7 @@ class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta):
pass pass
@abc.abstractmethod @abc.abstractmethod
async def authorized_userid(self, identity: str) -> Optional[str]: async def authorized_userid(self, identity):
"""Retrieve authorized user id. """Retrieve authorized user id.
Return the user_id of the user identified by the identity Return the user_id of the user identified by the identity

View File

@@ -1,23 +1,14 @@
import enum import enum
import warnings
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, Union
from aiohttp import web from aiohttp import web
from aiohttp_security.abc import AbstractAuthorizationPolicy, AbstractIdentityPolicy from aiohttp_security.abc import (AbstractIdentityPolicy,
AbstractAuthorizationPolicy)
from functools import wraps
IDENTITY_KEY = 'aiohttp_security_identity_policy' IDENTITY_KEY = 'aiohttp_security_identity_policy'
AUTZ_KEY = 'aiohttp_security_autz_policy' AUTZ_KEY = 'aiohttp_security_autz_policy'
# _AIP/_AAP are shorthand for Optional[policy] when we retrieve from request.
_AAP = Optional[AbstractAuthorizationPolicy]
_AIP = Optional[AbstractIdentityPolicy]
_Handler = TypeVar('_Handler', bound=Union[Callable[[web.Request], Any],
Callable[[object, web.Request], Any]])
async def remember(request, response, identity, **kwargs):
async def remember(request: web.Request, response: web.StreamResponse,
identity: str, **kwargs: Any) -> None:
"""Remember identity into response. """Remember identity into response.
The action is performed by identity_policy.remember() The action is performed by identity_policy.remember()
@@ -27,7 +18,7 @@ async def remember(request: web.Request, response: web.StreamResponse,
""" """
assert isinstance(identity, str), identity assert isinstance(identity, str), identity
assert identity assert identity
identity_policy = request.config_dict.get(IDENTITY_KEY) identity_policy = request.app.get(IDENTITY_KEY)
if identity_policy is None: if identity_policy is None:
text = ("Security subsystem is not initialized, " text = ("Security subsystem is not initialized, "
"call aiohttp_security.setup(...) first") "call aiohttp_security.setup(...) first")
@@ -38,13 +29,13 @@ async def remember(request: web.Request, response: web.StreamResponse,
await identity_policy.remember(request, response, identity, **kwargs) await identity_policy.remember(request, response, identity, **kwargs)
async def forget(request: web.Request, response: web.StreamResponse) -> None: async def forget(request, response):
"""Forget previously remembered identity. """Forget previously remembered identity.
Usually it clears cookie or server-side storage to forget user Usually it clears cookie or server-side storage to forget user
session. session.
""" """
identity_policy = request.config_dict.get(IDENTITY_KEY) identity_policy = request.app.get(IDENTITY_KEY)
if identity_policy is None: if identity_policy is None:
text = ("Security subsystem is not initialized, " text = ("Security subsystem is not initialized, "
"call aiohttp_security.setup(...) first") "call aiohttp_security.setup(...) first")
@@ -55,9 +46,9 @@ async def forget(request: web.Request, response: web.StreamResponse) -> None:
await identity_policy.forget(request, response) await identity_policy.forget(request, response)
async def authorized_userid(request: web.Request) -> Optional[str]: async def authorized_userid(request):
identity_policy: _AIP = request.config_dict.get(IDENTITY_KEY) identity_policy = request.app.get(IDENTITY_KEY)
autz_policy: _AAP = request.config_dict.get(AUTZ_KEY) autz_policy = request.app.get(AUTZ_KEY)
if identity_policy is None or autz_policy is None: if identity_policy is None or autz_policy is None:
return None return None
identity = await identity_policy.identify(request) identity = await identity_policy.identify(request)
@@ -67,27 +58,26 @@ async def authorized_userid(request: web.Request) -> Optional[str]:
return user_id return user_id
async def permits(request: web.Request, permission: Union[str, enum.Enum], async def permits(request, permission, context=None):
context: Any = None) -> bool:
assert isinstance(permission, (str, enum.Enum)), permission assert isinstance(permission, (str, enum.Enum)), permission
assert permission assert permission
identity_policy: _AIP = request.config_dict.get(IDENTITY_KEY) identity_policy = request.app.get(IDENTITY_KEY)
autz_policy: _AAP = request.config_dict.get(AUTZ_KEY) autz_policy = request.app.get(AUTZ_KEY)
if identity_policy is None or autz_policy is None: if identity_policy is None or autz_policy is None:
return True return True
identity = await identity_policy.identify(request) identity = await identity_policy.identify(request)
# non-registered user still may have some permissions # non-registered user still may has some permissions
access = await autz_policy.permits(identity, permission, context) access = await autz_policy.permits(identity, permission, context)
return access return access
async def is_anonymous(request: web.Request) -> bool: async def is_anonymous(request):
"""Check if user is anonymous. """Check if user is anonymous.
User is considered anonymous if there is not identity User is considered anonymous if there is not identity
in request. in request.
""" """
identity_policy = request.config_dict.get(IDENTITY_KEY) identity_policy = request.app.get(IDENTITY_KEY)
if identity_policy is None: if identity_policy is None:
return True return True
identity = await identity_policy.identify(request) identity = await identity_policy.identify(request)
@@ -96,85 +86,68 @@ async def is_anonymous(request: web.Request) -> bool:
return False return False
async def check_authorized(request: web.Request) -> str: def login_required(fn):
"""Checker that raises HTTPUnauthorized for anonymous users.
"""
userid = await authorized_userid(request)
if userid is None:
raise web.HTTPUnauthorized()
return userid
def login_required(fn: _Handler) -> _Handler:
"""Decorator that restrict access only for authorized users. """Decorator that restrict access only for authorized users.
User is considered authorized if authorized_userid User is considered authorized if authorized_userid
returns some value. returns some value.
""" """
@wraps(fn) @wraps(fn)
async def wrapped(*args: Union[object, web.Request]) -> Any: async def wrapped(*args, **kwargs):
request = args[-1] request = args[-1]
if not isinstance(request, web.Request): if not isinstance(request, web.BaseRequest):
msg = ("Incorrect decorator usage. " msg = ("Incorrect decorator usage. "
"Expecting `def handler(request)` " "Expecting `def handler(request)` "
"or `def handler(self, request)`.") "or `def handler(self, request)`.")
raise RuntimeError(msg) raise RuntimeError(msg)
await check_authorized(request) userid = await authorized_userid(request)
return await fn(*args) # type: ignore[arg-type] if userid is None:
raise web.HTTPUnauthorized
warnings.warn("login_required decorator is deprecated, " ret = await fn(*args, **kwargs)
"use check_authorized instead", return ret
DeprecationWarning)
return wrapped # type: ignore[return-value] return wrapped
async def check_permission(request: web.Request, permission: Union[str, enum.Enum], def has_permission(
context: Any = None) -> None: permission,
"""Checker that passes only to authoraised users with given permission. context=None,
):
If user is not authorized - raises HTTPUnauthorized, """Decorator that restrict access only for authorized users
if user is authorized and does not have permission -
raises HTTPForbidden.
"""
await check_authorized(request)
allowed = await permits(request, permission, context)
if not allowed:
raise web.HTTPForbidden()
def has_permission(permission: Union[str, enum.Enum], context: Any = None): # type: ignore
"""Decorator that restricts access only for authorized users
with correct permissions. with correct permissions.
If user is not authorized - raises HTTPUnauthorized, If user is not authorized - raises HTTPUnauthorized,
if user is authorized and does not have permission - if user is authorized and does not have permission -
raises HTTPForbidden. raises HTTPForbidden.
""" """
def wrapper(fn): # type: ignore def wrapper(fn):
@wraps(fn) @wraps(fn)
async def wrapped(*args, **kwargs): # type: ignore async def wrapped(*args, **kwargs):
request = args[-1] request = args[-1]
if not isinstance(request, web.Request): if not isinstance(request, web.BaseRequest):
msg = ("Incorrect decorator usage. " msg = ("Incorrect decorator usage. "
"Expecting `def handler(request)` " "Expecting `def handler(request)` "
"or `def handler(self, request)`.") "or `def handler(self, request)`.")
raise RuntimeError(msg) raise RuntimeError(msg)
await check_permission(request, permission, context) userid = await authorized_userid(request)
return await fn(*args, **kwargs) if userid is None:
raise web.HTTPUnauthorized
allowed = await permits(request, permission, context)
if not allowed:
raise web.HTTPForbidden
ret = await fn(*args, **kwargs)
return ret
return wrapped return wrapped
warnings.warn("has_permission decorator is deprecated, "
"use check_permission instead",
DeprecationWarning)
return wrapper return wrapper
def setup(app: web.Application, identity_policy: AbstractIdentityPolicy, def setup(app, identity_policy, autz_policy):
autz_policy: AbstractAuthorizationPolicy) -> None:
assert isinstance(identity_policy, AbstractIdentityPolicy), identity_policy assert isinstance(identity_policy, AbstractIdentityPolicy), identity_policy
assert isinstance(autz_policy, AbstractAuthorizationPolicy), autz_policy assert isinstance(autz_policy, AbstractAuthorizationPolicy), autz_policy

View File

@@ -5,32 +5,28 @@ more handy.
""" """
from aiohttp import web
from typing import Any, NewType, Optional, Union, cast
from .abc import AbstractIdentityPolicy from .abc import AbstractIdentityPolicy
_Sentinel = NewType('_Sentinel', object)
sentinel = _Sentinel(object()) sentinel = object()
class CookiesIdentityPolicy(AbstractIdentityPolicy): class CookiesIdentityPolicy(AbstractIdentityPolicy):
def __init__(self) -> None: def __init__(self):
self._cookie_name = 'AIOHTTP_SECURITY' self._cookie_name = 'AIOHTTP_SECURITY'
self._max_age = 30 * 24 * 3600 self._max_age = 30 * 24 * 3600
async def identify(self, request: web.Request) -> Optional[str]: async def identify(self, request):
return request.cookies.get(self._cookie_name) identity = request.cookies.get(self._cookie_name)
return identity
async def remember(self, request: web.Request, response: web.StreamResponse, async def remember(self, request, response, identity, max_age=sentinel,
identity: str, max_age: Union[_Sentinel, Optional[int]] = sentinel, **kwargs):
**kwargs: Any) -> None:
if max_age is sentinel: if max_age is sentinel:
max_age = self._max_age max_age = self._max_age
max_age = cast(Optional[int], max_age)
response.set_cookie(self._cookie_name, identity, response.set_cookie(self._cookie_name, identity,
max_age=max_age, **kwargs) max_age=max_age, **kwargs)
async def forget(self, request: web.Request, response: web.StreamResponse) -> None: async def forget(self, request, response):
response.del_cookie(self._cookie_name) response.del_cookie(self._cookie_name)

View File

@@ -1,51 +0,0 @@
"""Identity policy for storing info in the jwt token.
"""
from typing import Optional
from aiohttp import web
from .abc import AbstractIdentityPolicy
try:
import jwt
HAS_JWT = True
except ImportError: # pragma: no cover
HAS_JWT = False
AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '
class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret: str, algorithm: str = 'HS256'):
if not HAS_JWT:
raise RuntimeError('Please install `PyJWT`')
self.secret = secret
self.algorithm = algorithm
async def identify(self, request: web.Request) -> Optional[str]:
header_identity = request.headers.get(AUTH_HEADER_NAME)
if header_identity is None:
return None
if not header_identity.startswith(AUTH_SCHEME):
raise ValueError('Invalid authorization scheme. ' +
'Should be `{}<token>`'.format(AUTH_SCHEME))
token = header_identity.split(' ')[1].strip()
identity = jwt.decode(token,
self.secret,
algorithms=[self.algorithm])
return identity
async def remember(self, request: web.Request, response: web.StreamResponse,
identity: str, **kwargs: None) -> None:
pass
async def forget(self, request: web.Request, response: web.StreamResponse) -> None:
pass

View File

@@ -4,7 +4,6 @@ aiohttp_session.setup() should be called on application initialization
to configure aiohttp_session properly. to configure aiohttp_session properly.
""" """
from aiohttp import web
try: try:
from aiohttp_session import get_session from aiohttp_session import get_session
HAS_AIOHTTP_SESSION = True HAS_AIOHTTP_SESSION = True
@@ -16,22 +15,21 @@ from .abc import AbstractIdentityPolicy
class SessionIdentityPolicy(AbstractIdentityPolicy): class SessionIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, session_key: str = 'AIOHTTP_SECURITY'): def __init__(self, session_key='AIOHTTP_SECURITY'):
self._session_key = session_key self._session_key = session_key
if not HAS_AIOHTTP_SESSION: # pragma: no cover if not HAS_AIOHTTP_SESSION: # pragma: no cover
raise ImportError( raise ImportError(
'SessionIdentityPolicy requires `aiohttp_session`') 'SessionIdentityPolicy requires `aiohttp_session`')
async def identify(self, request: web.Request) -> str: async def identify(self, request):
session = await get_session(request) session = await get_session(request)
return session.get(self._session_key) return session.get(self._session_key)
async def remember(self, request: web.Request, response: web.StreamResponse, async def remember(self, request, response, identity, **kwargs):
identity: str, **kwargs: None) -> None:
session = await get_session(request) session = await get_session(request)
session[self._session_key] = identity session[self._session_key] = identity
async def forget(self, request: web.Request, response: web.StreamResponse) -> None: async def forget(self, request, response):
session = await get_session(request) session = await get_session(request)
session.pop(self._session_key, None) session.pop(self._session_key, None)

View File

View File

@@ -1,7 +1,5 @@
from enum import Enum
from typing import Any, Optional, Union
import sqlalchemy as sa import sqlalchemy as sa
from aiohttp_security.abc import AbstractAuthorizationPolicy from aiohttp_security.abc import AbstractAuthorizationPolicy
from passlib.hash import sha256_crypt from passlib.hash import sha256_crypt
@@ -9,13 +7,13 @@ from . import db
class DBAuthorizationPolicy(AbstractAuthorizationPolicy): class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
def __init__(self, dbengine: Any): def __init__(self, dbengine):
self.dbengine = dbengine self.dbengine = dbengine
async def authorized_userid(self, identity: str) -> Optional[str]: async def authorized_userid(self, identity):
async with self.dbengine.acquire() as conn: async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity, where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call] sa.not_(db.users.c.disabled))
query = db.users.count().where(where) query = db.users.count().where(where)
ret = await conn.scalar(query) ret = await conn.scalar(query)
if ret: if ret:
@@ -23,11 +21,13 @@ class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
else: else:
return None return None
async def permits(self, identity: str, permission: Union[str, Enum], async def permits(self, identity, permission, context=None):
context: None = None) -> bool: if identity is None:
async with self.dbengine.acquire() as conn: return False
async with self.dbengine as conn:
where = sa.and_(db.users.c.login == identity, where = sa.and_(db.users.c.login == identity,
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call] sa.not_(db.users.c.disabled))
query = db.users.select().where(where) query = db.users.select().where(where)
ret = await conn.execute(query) ret = await conn.execute(query)
user = await ret.fetchone() user = await ret.fetchone()
@@ -49,14 +49,14 @@ class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
return False return False
async def check_credentials(db_engine: Any, username: str, password: str) -> bool: async def check_credentials(db_engine, username, password):
async with db_engine.acquire() as conn: async with db_engine as conn:
where = sa.and_(db.users.c.login == username, where = sa.and_(db.users.c.login == username,
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call] sa.not_(db.users.c.disabled))
query = db.users.select().where(where) query = db.users.select().where(where)
ret = await conn.execute(query) ret = await conn.execute(query)
user = await ret.fetchone() user = await ret.fetchone()
if user is not None: if user is not None:
hashed = user[2] hash = user[2]
return sha256_crypt.verify(password, hashed) # type: ignore[no-any-return] return sha256_crypt.verify(password, hash)
return False return False

View File

@@ -1,11 +1,10 @@
from textwrap import dedent from textwrap import dedent
from typing import NoReturn
from aiohttp import web from aiohttp import web
from aiohttp_security import ( from aiohttp_security import (
remember, forget, authorized_userid, remember, forget, authorized_userid,
check_permission, check_authorized, has_permission, login_required,
) )
from .db_auth import check_credentials from .db_auth import check_credentials
@@ -28,7 +27,7 @@ class Web(object):
</body> </body>
""") """)
async def index(self, request: web.Request) -> web.Response: async def index(self, request):
username = await authorized_userid(request) username = await authorized_userid(request)
if username: if username:
template = self.index_template.format( template = self.index_template.format(
@@ -38,41 +37,37 @@ class Web(object):
response = web.Response(body=template.encode()) response = web.Response(body=template.encode())
return response return response
async def login(self, request: web.Request) -> NoReturn: async def login(self, request):
invalid_resp = web.HTTPUnauthorized(body=b'Invalid username/password combination') response = web.HTTPFound('/')
form = await request.post() form = await request.post()
login = form.get('login') login = form.get('login')
password = form.get('password') password = form.get('password')
db_engine = request.app['db_engine'] db_engine = request.app.db_engine
if not (isinstance(login, str) and isinstance(password, str)):
raise invalid_resp
if await check_credentials(db_engine, login, password): if await check_credentials(db_engine, login, password):
response = web.HTTPFound('/')
await remember(request, response, login) await remember(request, response, login)
raise response return response
raise invalid_resp return web.HTTPUnauthorized(
body=b'Invalid username/password combination')
async def logout(self, request: web.Request) -> web.Response: @login_required
await check_authorized(request) async def logout(self, request):
response = web.Response(body=b'You have been logged out') response = web.Response(body=b'You have been logged out')
await forget(request, response) await forget(request, response)
return response return response
async def internal_page(self, request: web.Request) -> web.Response: @has_permission('public')
await check_permission(request, 'public') async def internal_page(self, request):
response = web.Response( response = web.Response(
body=b'This page is visible for all registered users') body=b'This page is visible for all registered users')
return response return response
async def protected_page(self, request: web.Request) -> web.Response: @has_permission('protected')
await check_permission(request, 'protected') async def protected_page(self, request):
response = web.Response(body=b'You are on protected page') response = web.Response(body=b'You are on protected page')
return response return response
def configure(self, app: web.Application) -> None: def configure(self, app):
router = app.router router = app.router
router.add_route('GET', '/', self.index, name='index') router.add_route('GET', '/', self.index, name='index')
router.add_route('POST', '/login', self.login, name='login') router.add_route('POST', '/login', self.login, name='login')

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
from typing import Any, Tuple
from aiohttp import web from aiohttp import web
from aiohttp_session import setup as setup_session from aiohttp_session import setup as setup_session
@@ -14,14 +13,14 @@ from demo.database_auth.db_auth import DBAuthorizationPolicy
from demo.database_auth.handlers import Web from demo.database_auth.handlers import Web
async def init(loop: asyncio.AbstractEventLoop) -> Tuple[Any, ...]: async def init(loop):
redis_pool = await create_pool(('localhost', 6379)) redis_pool = await create_pool(('localhost', 6379))
db_engine = await create_engine(user='aiohttp_security', db_engine = await create_engine(user='aiohttp_security',
password='aiohttp_security', password='aiohttp_security',
database='aiohttp_security', database='aiohttp_security',
host='127.0.0.1') host='127.0.0.1')
app = web.Application() app = web.Application(loop=loop)
app['db_engine'] = db_engine app.db_engine = db_engine
setup_session(app, RedisStorage(redis_pool)) setup_session(app, RedisStorage(redis_pool))
setup_security(app, setup_security(app,
SessionIdentityPolicy(), SessionIdentityPolicy(),
@@ -36,7 +35,7 @@ async def init(loop: asyncio.AbstractEventLoop) -> Tuple[Any, ...]:
return srv, app, handler return srv, app, handler
async def finalize(srv: Any, app: Any, handler: Any) -> None: async def finalize(srv, app, handler):
sock = srv.sockets[0] sock = srv.sockets[0]
app.loop.remove_reader(sock.fileno()) app.loop.remove_reader(sock.fileno())
sock.close() sock.close()
@@ -47,7 +46,7 @@ async def finalize(srv: Any, app: Any, handler: Any) -> None:
await app.finish() await app.finish()
def main() -> None: def main():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
srv, app, handler = loop.run_until_complete(init(loop)) srv, app, handler = loop.run_until_complete(init(loop))
try: try:

View File

@@ -1,25 +1,20 @@
from enum import Enum
from typing import Dict, Optional, Union
from aiohttp_security.abc import AbstractAuthorizationPolicy from aiohttp_security.abc import AbstractAuthorizationPolicy
from .users import User
class DictionaryAuthorizationPolicy(AbstractAuthorizationPolicy): class DictionaryAuthorizationPolicy(AbstractAuthorizationPolicy):
def __init__(self, user_map: Dict[str, User]): def __init__(self, user_map):
super().__init__() super().__init__()
self.user_map = user_map self.user_map = user_map
async def authorized_userid(self, identity: str) -> Optional[str]: async def authorized_userid(self, identity):
"""Retrieve authorized user id. """Retrieve authorized user id.
Return the user_id of the user identified by the identity Return the user_id of the user identified by the identity
or 'None' if no user exists related to the identity. or 'None' if no user exists related to the identity.
""" """
return identity if identity in self.user_map else None if identity in self.user_map:
return identity
async def permits(self, identity: str, permission: Union[str, Enum], async def permits(self, identity, permission, context=None):
context: None = None) -> bool:
"""Check user permissions. """Check user permissions.
Return True if the identity is allowed the permission in the Return True if the identity is allowed the permission in the
current context, else return False. current context, else return False.
@@ -31,7 +26,7 @@ class DictionaryAuthorizationPolicy(AbstractAuthorizationPolicy):
return permission in user.permissions return permission in user.permissions
async def check_credentials(user_map: Dict[str, User], username: str, password: str) -> bool: async def check_credentials(user_map, username, password):
user = user_map.get(username) user = user_map.get(username)
if not user: if not user:
return False return False

View File

@@ -1,15 +1,13 @@
from textwrap import dedent from textwrap import dedent
from typing import Dict, NoReturn
from aiohttp import web from aiohttp import web
from aiohttp_security import ( from aiohttp_security import (
remember, forget, authorized_userid, remember, forget, authorized_userid,
check_permission, check_authorized, has_permission, login_required,
) )
from .authz import check_credentials from .authz import check_credentials
from .users import User
index_template = dedent(""" index_template = dedent("""
@@ -29,7 +27,7 @@ index_template = dedent("""
""") """)
async def index(request: web.Request) -> web.Response: async def index(request):
username = await authorized_userid(request) username = await authorized_userid(request)
if username: if username:
template = index_template.format( template = index_template.format(
@@ -42,27 +40,23 @@ async def index(request: web.Request) -> web.Response:
) )
async def login(request: web.Request) -> NoReturn: async def login(request):
user_map: Dict[str, User] = request.app['user_map'] response = web.HTTPFound('/')
invalid_response = web.HTTPUnauthorized(body='Invalid username / password combination')
form = await request.post() form = await request.post()
username = form.get('username') username = form.get('username')
password = form.get('password') password = form.get('password')
if not (isinstance(username, str) and isinstance(password, str)): verified = await check_credentials(
raise invalid_response request.app.user_map, username, password)
verified = await check_credentials(user_map, username, password)
if verified: if verified:
response = web.HTTPFound('/')
await remember(request, response, username) await remember(request, response, username)
raise response return response
raise invalid_response return web.HTTPUnauthorized(body='Invalid username / password combination')
async def logout(request: web.Request) -> web.Response: @login_required
await check_authorized(request) async def logout(request):
response = web.Response( response = web.Response(
text='You have been logged out', text='You have been logged out',
content_type='text/html', content_type='text/html',
@@ -71,8 +65,9 @@ async def logout(request: web.Request) -> web.Response:
return response return response
async def internal_page(request: web.Request) -> web.Response: @has_permission('public')
await check_permission(request, 'public') async def internal_page(request):
# pylint: disable=unused-argument
response = web.Response( response = web.Response(
text='This page is visible for all registered users', text='This page is visible for all registered users',
content_type='text/html', content_type='text/html',
@@ -80,8 +75,9 @@ async def internal_page(request: web.Request) -> web.Response:
return response return response
async def protected_page(request: web.Request) -> web.Response: @has_permission('protected')
await check_permission(request, 'protected') async def protected_page(request):
# pylint: disable=unused-argument
response = web.Response( response = web.Response(
text='You are on protected page', text='You are on protected page',
content_type='text/html', content_type='text/html',
@@ -89,7 +85,7 @@ async def protected_page(request: web.Request) -> web.Response:
return response return response
def configure_handlers(app: web.Application) -> None: def configure_handlers(app):
router = app.router router = app.router
router.add_get('/', index, name='index') router.add_get('/', index, name='index')
router.add_post('/login', login, name='login') router.add_post('/login', login, name='login')

View File

@@ -11,9 +11,9 @@ from demo.dictionary_auth.handlers import configure_handlers
from demo.dictionary_auth.users import user_map from demo.dictionary_auth.users import user_map
def make_app() -> web.Application: def make_app():
app = web.Application() app = web.Application()
app['user_map'] = user_map app.user_map = user_map
configure_handlers(app) configure_handlers(app)
# secret_key must be 32 url-safe base64-encoded bytes # secret_key must be 32 url-safe base64-encoded bytes

View File

@@ -1,11 +1,6 @@
from typing import NamedTuple, Tuple from collections import namedtuple
class User(NamedTuple):
username: str
password: str
permissions: Tuple[str, ...]
User = namedtuple('User', ['username', 'password', 'permissions'])
user_map = { user_map = {
user.username: user for user in [ user.username: user for user in [

View File

@@ -1,96 +0,0 @@
from enum import Enum
from typing import NoReturn, Optional, Union
from aiohttp import web
from aiohttp_session import SimpleCookieStorage, session_middleware
from aiohttp_security import check_permission, \
is_anonymous, remember, forget, \
setup as setup_security, SessionIdentityPolicy
from aiohttp_security.abc import AbstractAuthorizationPolicy
# Demo authorization policy for only one user.
# User 'jack' has only 'listen' permission.
# For more complicated authorization policies see examples
# in the 'demo' directory.
class SimpleJack_AuthorizationPolicy(AbstractAuthorizationPolicy):
async def authorized_userid(self, identity: str) -> Optional[str]:
"""Retrieve authorized user id.
Return the user_id of the user identified by the identity
or 'None' if no user exists related to the identity.
"""
return identity if identity == 'jack' else None
async def permits(self, identity: str, permission: Union[str, Enum],
context: None = None) -> bool:
"""Check user permissions.
Return True if the identity is allowed the permission
in the current context, else return False.
"""
return identity == 'jack' and permission in ('listen',)
async def handler_root(request: web.Request) -> web.Response:
is_logged = not await is_anonymous(request)
return web.Response(text='''<html><head></head><body>
Hello, I'm Jack, I'm {logged} logged in.<br /><br />
<a href="/login">Log me in</a><br />
<a href="/logout">Log me out</a><br /><br />
Check my permissions,
when i'm logged in and logged out.<br />
<a href="/listen">Can I listen?</a><br />
<a href="/speak">Can I speak?</a><br />
</body></html>'''.format(
logged='' if is_logged else 'NOT',
), content_type='text/html')
async def handler_login_jack(request: web.Request) -> NoReturn:
redirect_response = web.HTTPFound('/')
await remember(request, redirect_response, 'jack')
raise redirect_response
async def handler_logout(request: web.Request) -> NoReturn:
redirect_response = web.HTTPFound('/')
await forget(request, redirect_response)
raise redirect_response
async def handler_listen(request: web.Request) -> web.Response:
await check_permission(request, 'listen')
return web.Response(body="I can listen!")
async def handler_speak(request: web.Request) -> web.Response:
await check_permission(request, 'speak')
return web.Response(body="I can speak!")
async def make_app() -> web.Application:
#
# WARNING!!!
# Never use SimpleCookieStorage on production!!!
# Its highly insecure!!!
#
# make app
middleware = session_middleware(SimpleCookieStorage())
app = web.Application(middlewares=[middleware])
# add the routes
app.router.add_route('GET', '/', handler_root)
app.router.add_route('GET', '/login', handler_login_jack)
app.router.add_route('GET', '/logout', handler_logout)
app.router.add_route('GET', '/listen', handler_listen)
app.router.add_route('GET', '/speak', handler_speak)
# set up policies
policy = SessionIdentityPolicy()
setup_security(app, policy, SimpleJack_AuthorizationPolicy())
return app
if __name__ == '__main__':
web.run_app(make_app(), port=9000)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

@@ -7,97 +7,63 @@ How to Make a Simple Server With Authorization
Simple example:: Simple example::
import asyncio
from aiohttp import web from aiohttp import web
from aiohttp_session import SimpleCookieStorage, session_middleware
from aiohttp_security import check_permission, \ async def root_handler(request):
is_anonymous, remember, forget, \ text = "Alive and kicking!"
setup as setup_security, SessionIdentityPolicy return web.Response(body=text.encode('utf-8'))
from aiohttp_security.abc import AbstractAuthorizationPolicy
# option 2: auth at a higher level?
# set user_id and allowed in the wsgi handler
@protect('view_user')
async def user_handler(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(body=text.encode('utf-8'))
# Demo authorization policy for only one user. # option 3: super low
# User 'jack' has only 'listen' permission. # wsgi doesn't do anything
# For more complicated authorization policies see examples async def user_update_handler(request):
# in the 'demo' directory. # identity, asked_permission
class SimpleJack_AuthorizationPolicy(AbstractAuthorizationPolicy): user_id = await identity_policy.identify(request)
async def authorized_userid(self, identity): identity = await auth_policy.authorized_userid(user_id)
"""Retrieve authorized user id. allowed = await request.auth_policy.permits(
Return the user_id of the user identified by the identity identity, asked_permission)
or 'None' if no user exists related to the identity. if not allowed:
""" # how is this pluggable as well?
if identity == 'jack': # ? return NotAllowedStream()
return identity raise NotAllowedResponse()
async def permits(self, identity, permission, context=None): update_user()
"""Check user permissions.
Return True if the identity is allowed the permission
in the current context, else return False.
"""
return identity == 'jack' and permission in ('listen',)
async def init(loop):
# set up identity and auth
auth_policy = DictionaryAuthorizationPolicy({'me': ('view_user',),
'you': ('view_user',
'edit_user',)})
identity_policy = CookieIdentityPolicy()
auth = authorization_middleware(auth_policy, identity_policy)
async def handler_root(request): # wsgi app
is_logged = not await is_anonymous(request) app = web.Application(loop=loop, middlewares=*auth)
return web.Response(text='''<html><head></head><body>
Hello, I'm Jack, I'm {logged} logged in.<br /><br />
<a href="/login">Log me in</a><br />
<a href="/logout">Log me out</a><br /><br />
Check my permissions,
when i'm logged in and logged out.<br />
<a href="/listen">Can I listen?</a><br />
<a href="/speak">Can I speak?</a><br />
</body></html>'''.format(
logged='' if is_logged else 'NOT',
), content_type='text/html')
async def handler_login_jack(request):
redirect_response = web.HTTPFound('/')
await remember(request, redirect_response, 'jack')
raise redirect_response
async def handler_logout(request):
redirect_response = web.HTTPFound('/')
await forget(request, redirect_response)
raise redirect_response
async def handler_listen(request):
await check_permission(request, 'listen')
return web.Response(body="I can listen!")
async def handler_speak(request):
await check_permission(request, 'speak')
return web.Response(body="I can speak!")
async def make_app():
#
# WARNING!!!
# Never use SimpleCookieStorage on production!!!
# Its highly insecure!!!
#
# make app
middleware = session_middleware(SimpleCookieStorage())
app = web.Application(middlewares=[middleware])
# add the routes # add the routes
app.add_routes([ app.router.add_route('GET', '/', root_handler)
web.get('/', handler_root), app.router.add_route('GET', '/{user}', user_handler)
web.get('/login', handler_login_jack), app.router.add_route('GET', '/{user}/edit', user_update_handler)
web.get('/logout', handler_logout),
web.get('/listen', handler_listen),
web.get('/speak', handler_speak)])
# set up policies # get it started
policy = SessionIdentityPolicy() srv = await loop.create_server(app.make_handler(),
setup_security(app, policy, SimpleJack_AuthorizationPolicy()) '127.0.0.1', 8080)
print("Server started at http://127.0.0.1:8080")
return app return srv
if __name__ == '__main__': loop = asyncio.get_event_loop()
web.run_app(make_app(), port=9000) loop.run_until_complete(init(loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass # TODO put handler cleanup here

View File

@@ -53,7 +53,7 @@ Now you have two tables:
+-----------------+ +-----------------+
| user_id | | user_id |
+-----------------+ +-----------------+
| perm_name | | permission_name |
+-----------------+ +-----------------+
@@ -133,7 +133,7 @@ Once we have all the code in place we can install it for our application::
password='aiohttp_security', password='aiohttp_security',
database='aiohttp_security', database='aiohttp_security',
host='127.0.0.1') host='127.0.0.1')
app = web.Application() app = web.Application(loop=loop)
setup_session(app, RedisStorage(redis_pool)) setup_session(app, RedisStorage(redis_pool))
setup_security(app, setup_security(app,
SessionIdentityPolicy(), SessionIdentityPolicy(),
@@ -142,23 +142,23 @@ Once we have all the code in place we can install it for our application::
Now we have authorization and can decorate every other view with access rights Now we have authorization and can decorate every other view with access rights
based on permissions. There are already implemented two helpers:: based on permissions. There are already implemented two decorators::
from aiohttp_security import check_authorized, check_permission from aiohttp_security import has_permission, login_required
For each view you need to protect - just apply the decorator on it:: For each view you need to protect - just apply the decorator on it::
class Web: class Web:
@has_permission('protected')
async def protected_page(self, request): async def protected_page(self, request):
await check_permission(request, 'protected')
response = web.Response(body=b'You are on protected page') response = web.Response(body=b'You are on protected page')
return response return response
or:: or::
class Web: class Web:
@login_required
async def logout(self, request): async def logout(self, request):
await check_authorized(request)
response = web.Response(body=b'You have been logged out') response = web.Response(body=b'You have been logged out')
await forget(request, response) await forget(request, response)
return response return response

View File

@@ -3,7 +3,6 @@ aiohttp_security
The library provides security for :ref:`aiohttp.web<aiohttp-web>`. The library provides security for :ref:`aiohttp.web<aiohttp-web>`.
The current version is |version|
Contents Contents
-------- --------

View File

@@ -13,19 +13,6 @@
Public API functions Public API functions
==================== ====================
.. function:: setup(app, identity_policy, autz_policy)
Setup :mod:`aiohttp` application with security policies.
:param app: aiohttp :class:`aiohttp.web.Application` instance.
:param identity_policy: indentification policy, an
:class:`AbstractIdentityPolicy` instance.
:param autz_policy: authorization policy, an
:class:`AbstractAuthorizationPolicy` instance.
.. coroutinefunction:: remember(request, response, identity, **kwargs) .. coroutinefunction:: remember(request, response, identity, **kwargs)
Remember *identity* in *response*, e.g. by storing a cookie or Remember *identity* in *response*, e.g. by storing a cookie or
@@ -63,41 +50,6 @@ Public API functions
descendants like :class:`aiohttp.web.Response`. descendants like :class:`aiohttp.web.Response`.
.. coroutinefunction:: check_authorized(request)
Checker that doesn't pass if user is not authorized by *request*.
:param request: :class:`aiohttp.web.Request` object.
:return str: authorized user ID if success
:raise: :class:`aiohttp.web.HTTPUnauthorized` for anonymous users.
Usage::
async def handler(request):
await check_authorized(request)
# this line is never executed for anonymous users
.. coroutinefunction:: check_permission(request, permission)
Checker that doesn't pass if user has no requested permission.
:param request: :class:`aiohttp.web.Request` object.
:raise: :class:`aiohttp.web.HTTPUnauthorized` for anonymous users.
:raise: :class:`aiohttp.web.HTTPForbidden` if user is
authorized but has no access rights.
Usage::
async def handler(request):
await check_permission(request, 'read')
# this line is never executed if a user has no read permission
.. coroutinefunction:: authorized_userid(request) .. coroutinefunction:: authorized_userid(request)
Retrieve :term:`userid`. Retrieve :term:`userid`.
@@ -126,8 +78,7 @@ Public API functions
:param request: :class:`aiohttp.web.Request` object. :param request: :class:`aiohttp.web.Request` object.
:param permission: Requested :term:`permission`. :class:`str` or :param permission: Requested :term:`permission`. :class:`str` or :class:`enum.Enum` object.
:class:`enum.Enum` object.
:param context: additional object may be passed into :param context: additional object may be passed into
:meth:`AbstractAuthorizationPolicy.permission` :meth:`AbstractAuthorizationPolicy.permission`
@@ -141,8 +92,7 @@ Public API functions
Checks if user is anonymous user. Checks if user is anonymous user.
Return ``True`` if user is not remembered in request, otherwise Return ``True`` if user is not remembered in request, otherwise returns ``False``.
returns ``False``.
:param request: :class:`aiohttp.web.Request` object. :param request: :class:`aiohttp.web.Request` object.
@@ -153,27 +103,29 @@ Public API functions
Raises :class:`aiohttp.web.HTTPUnauthorized` if user is not authorized. Raises :class:`aiohttp.web.HTTPUnauthorized` if user is not authorized.
.. deprecated:: 0.3
Use :func:`check_authorized` async function.
.. decorator:: has_permission(permission) .. decorator:: has_permission(permission)
Decorator for handlers that checks if user is authorized Decorator for handlers that checks if user is authorized
and has correct permission. and has correct permission.
Raises :class:`aiohttp.web.HTTPUnauthorized` if user is not Raises :class:`aiohttp.web.HTTPUnauthorized` if user is not authorized.
authorized. Raises :class:`aiohttp.web.HTTPForbidden` if user is authorized but has no access rights.
Raises :class:`aiohttp.web.HTTPForbidden` if user is
authorized but has no access rights.
:param str permission: requested :term:`permission`. :param str permission: requested :term:`permission`.
.. deprecated:: 0.3
Use :func:`check_authorized` async function. .. function:: setup(app, identity_policy, autz_policy)
Setup :mod:`aiohttp` application with security policies.
:param app: aiohttp :class:`aiohttp.web.Application` instance.
:param identity_policy: indentification policy, an
:class:`AbstractIdentityPolicy` instance.
:param autz_policy: authorization policy, an
:class:`AbstractAuthorizationPolicy` instance.
Abstract policies Abstract policies

View File

@@ -11,132 +11,39 @@
First of all, what is *aiohttp_security* about? First of all, what is *aiohttp_security* about?
*aiohttp-security* is a set of public API functions as well as a It is a set of public API functions and standard for implementation details.
reference standard for implementation details for securing access to
assets served by a wsgi server.
Assets are secured using authentication and authorization as explained
below. *aiohttp-security* is part of the
`aio-libs <https://github.com/aio-libs>`_ project which takes advantage
of asynchronous processing using Python's asyncio library.
Public API Public API
========== ==========
The API is agnostic to the low level implementation details such that API is implementation agnostic, all client code should not call policy
all client code only needs to implement the endpoints as provided by code (see below) directly but use API only.
the API (instead of calling policy code directly (see explanation
below)).
Via the API an application can:
(i) remember a user in a local session (:func:`remember`),
(ii) forget a user in a local session (:func:`forget`),
(iii) retrieve the :term:`userid` (:func:`authorized_userid`) of a
remembered user from an :term:`identity` (discussed below), and
(iv) check the :term:`permission` of a remembered user (:func:`permits`).
The library internals are built on top of two concepts:
1) :term:`authentication`, and
2) :term:`authorization`.
There are abstract base classes for both types as well as several
pre-built implementations that are shipped with the library. However,
the end user is free to build their own implementations.
The library comes with two pre-built identity policies; one that uses
cookies, and one that uses sessions [#f1]_. It is envisioned that in
most use cases developers will use one of the provided identity
policies (Cookie or Session) and implement their own authorization
policy.
The workflow is as follows:
1) User is authenticated. This has to be implemented by the developer.
2) Once user is authenticated an identity string has to be created for
that user. This has to be implemented by the developer.
3) The identity string is passed to the Identity Policy's remember
method and the user is now remembered (Cookie or Session if using
built-in). *Only once a user is remembered can the other API
methods:* :func:`permits`, :func:`forget`, *and*
:func:`authorized_userid` *be invoked* .
4) If the user tries to access a restricted asset the :func:`permits`
method is called. Usually assets are protected using the
:func:`check_permission` helper. This should return True if
permission is granted.
The :func:`permits` method is implemented by the developer as part of
the :class:`AbstractAuthorizationPolicy` and passed to the
application at runtime via setup.
In addition a :func:`check_authorized` also
exists that requires no permissions (i.e. doesn't call :func:`permits`
method) but only requires that the user is remembered
(i.e. authenticated/logged in).
Via API application can remember/forget user in local session
(:func:`remember`/:func:`forget`), retrieve :term:`userid`
(:func:`authorized_userid`) and check :term:`permission` for
remembered user (:func:`permits`).
The library internals are built on top of two policies:
:term:`authentication` and :term:`authorization`. There are abstract
base classes for both concepts as well as several implementations
shipped with the library. End user is free to build own implemetations
if needed.
Authentication Authentication
============== ==============
Authentication is the process where a user's identity is verified. It Actions related to retrieving, storing and removing user's
confirms who the user is. This is traditionally done using a user name :term:`identity`.
and password (note: this is not the only way).
A authenticated user has no access rights, rather an authenticated Authenticated user has no access rights, the system even has no
user merely confirms that the user exists and that the user is who knowledge is there the user still registered in DB.
they say they are.
In *aiohttp_security* the developer is responsible for their own If :class:`aiohttp.web.Request` has an :term:`identity` it means the user has
authentication mechanism. *aiohttp_security* only requires that the some ID that should be checked by :term:`authorization` policy.
authentication result in a identity string which corresponds to a
user's id in the underlying system.
.. note:: :term:`identity` is a string shared between browser and server.
Thus it's not supposed to be database primary key, user login/email etc.
:term:`identity` is a string that is shared between the browser and Random string like uuid or hash is better choice.
the server. Therefore it is recommended that a random string
such as a uuid or hash is used rather than things like a
database primary key, user login/email, etc.
Identity Policy
===============
Once a user is authenticated the *aiohttp_security* API is invoked for
storing, retrieving, and removing a user's :term:`identity`. This is
accommplished via AbstractIdentityPolicy's :func:`remember`,
:func:`identify`, and :func:`forget` methods. The Identity Policy is
therefore the mechanism by which a authenticated user is persisted in
the system.
*aiohttp_security* has two built in identity policy's for this
purpose. :class:`CookiesIdentityPolicy` that uses cookies and
:class:`SessionIdentityPolicy` that uses sessions via
``aiohttp-session`` library.
Authorization
==============
Once a user is authenticated (see above) it means that the user has an
:term:`identity`. This :term:`identity` can now be used for checking
access rights or :term:`permission` using a :term:`authorization`
policy.
The authorization policy's :func:`permits()` method is used for this purpose.
When :class:`aiohttp.web.Request` has an :term:`identity` it means the
user has been authenticated and therefore has an :term:`identity` that
can be checked by the :term:`authorization` policy.
As noted above, :term:`identity` is a string that is shared between
the browser and the server. Therefore it is recommended that a
random string such as a uuid or hash is used rather than things like
a database primary key, user login/email, etc.
.. rubric:: Footnotes
.. [#f1] jwt - json web tokens in the works

View File

@@ -1,18 +1,15 @@
-e . -e .
flake8==3.8.4 flake8==3.5.0
async-timeout==3.0.1 pytest==3.4.2
pytest==6.1.2 pytest-cov==2.5.1
pytest-cov==2.10.1 coverage==4.5.1
pytest-mock==3.3.1 sphinx==1.7.1
coverage==5.3
sphinx==3.3.1
pep257==0.7.0 pep257==0.7.0
aiohttp-session==2.9.0 aiohttp-session==2.3.0
aiopg[sa]==1.1.0 aiopg[sa]==0.13.2
aioredis==1.3.1 aioredis==1.1.0
hiredis==1.1.0 hiredis==0.2.0
passlib==1.7.4 passlib==1.7.1
cryptography==3.3.1 cryptography==2.2
aiohttp==3.7.3 aiohttp==3.0.9
pytest-aiohttp==0.3.0 pytest-aiohttp==0.3.0
pyjwt==1.7.1

View File

@@ -1,4 +0,0 @@
[tool:pytest]
testpaths = tests
filterwarnings=
error

View File

@@ -1,3 +1,4 @@
import codecs
from setuptools import setup, find_packages from setuptools import setup, find_packages
import os import os
import re import re
@@ -15,8 +16,8 @@ class PyTest(TestCommand):
raise SystemExit(errno) raise SystemExit(errno)
with open(os.path.join(os.path.abspath(os.path.dirname( with codecs.open(os.path.join(os.path.abspath(os.path.dirname(
__file__)), 'aiohttp_security', '__init__.py'), 'r', encoding='latin1') as fp: __file__)), 'aiohttp_security', '__init__.py'), 'r', 'latin1') as fp:
try: try:
version = re.findall(r"^__version__ = '([^']+)'$", fp.read(), re.M)[0] version = re.findall(r"^__version__ = '([^']+)'$", fp.read(), re.M)[0]
except IndexError: except IndexError:
@@ -27,7 +28,7 @@ def read(f):
return open(os.path.join(os.path.dirname(__file__), f)).read().strip() return open(os.path.join(os.path.dirname(__file__), f)).read().strip()
install_requires = ['aiohttp>=3.2.0'] install_requires = ['aiohttp>=0.18']
tests_require = install_requires + ['pytest'] tests_require = install_requires + ['pytest']
extras_require = {'session': 'aiohttp-session'} extras_require = {'session': 'aiohttp-session'}
@@ -43,7 +44,6 @@ setup(name='aiohttp-security',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: Internet :: WWW/HTTP', 'Topic :: Internet :: WWW/HTTP',
'Framework :: AsyncIO', 'Framework :: AsyncIO',
], ],

View File

View File

@@ -15,23 +15,23 @@ class Autz(AbstractAuthorizationPolicy):
pass pass
async def test_remember(loop, aiohttp_client): async def test_remember(loop, test_client):
async def handler(request): async def handler(request):
response = web.Response() response = web.Response()
await remember(request, response, 'Andrew') await remember(request, response, 'Andrew')
return response return response
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', handler) app.router.add_route('GET', '/', handler)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status
assert 'Andrew' == resp.cookies['AIOHTTP_SECURITY'].value assert 'Andrew' == resp.cookies['AIOHTTP_SECURITY'].value
async def test_identify(loop, aiohttp_client): async def test_identify(loop, test_client):
async def create(request): async def create(request):
response = web.Response() response = web.Response()
@@ -44,11 +44,11 @@ async def test_identify(loop, aiohttp_client):
assert 'Andrew' == user_id assert 'Andrew' == user_id
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create) app.router.add_route('POST', '/', create)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/') resp = await client.post('/')
assert 200 == resp.status assert 200 == resp.status
await resp.release() await resp.release()
@@ -56,7 +56,7 @@ async def test_identify(loop, aiohttp_client):
assert 200 == resp.status assert 200 == resp.status
async def test_forget(loop, aiohttp_client): async def test_forget(loop, test_client):
async def index(request): async def index(request):
return web.Response() return web.Response()
@@ -64,19 +64,19 @@ async def test_forget(loop, aiohttp_client):
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'Andrew') await remember(request, response, 'Andrew')
raise response return response
async def logout(request): async def logout(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await forget(request, response) await forget(request, response)
raise response return response
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', index) app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout) app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/login') resp = await client.post('/login')
assert 200 == resp.status assert 200 == resp.status
assert str(resp.url).endswith('/') assert str(resp.url).endswith('/')

View File

@@ -1,12 +1,10 @@
import enum import enum
import pytest
from aiohttp import web from aiohttp import web
from aiohttp_security import setup as _setup from aiohttp_security import setup as _setup
from aiohttp_security import (AbstractAuthorizationPolicy, authorized_userid, from aiohttp_security import (AbstractAuthorizationPolicy, authorized_userid,
forget, has_permission, is_anonymous, forget, has_permission, is_anonymous,
login_required, permits, remember, login_required, permits, remember)
check_authorized, check_permission)
from aiohttp_security.cookies_identity import CookiesIdentityPolicy from aiohttp_security.cookies_identity import CookiesIdentityPolicy
@@ -25,23 +23,23 @@ class Autz(AbstractAuthorizationPolicy):
return None return None
async def test_authorized_userid(loop, aiohttp_client): async def test_authorized_userid(loop, test_client):
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'UserID') await remember(request, response, 'UserID')
raise response return response
async def check(request): async def check(request):
userid = await authorized_userid(request) userid = await authorized_userid(request)
assert 'Andrew' == userid assert 'Andrew' == userid
return web.Response(text=userid) return web.Response(text=userid)
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/login') resp = await client.post('/login')
assert 200 == resp.status assert 200 == resp.status
@@ -49,23 +47,23 @@ async def test_authorized_userid(loop, aiohttp_client):
assert 'Andrew' == txt assert 'Andrew' == txt
async def test_authorized_userid_not_authorized(loop, aiohttp_client): async def test_authorized_userid_not_authorized(loop, test_client):
async def check(request): async def check(request):
userid = await authorized_userid(request) userid = await authorized_userid(request)
assert userid is None assert userid is None
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status
async def test_permits_enum_permission(loop, aiohttp_client): async def test_permits_enum_permission(loop, test_client):
class Permission(enum.Enum): class Permission(enum.Enum):
READ = '101' READ = '101'
WRITE = '102' WRITE = '102'
@@ -88,7 +86,7 @@ async def test_permits_enum_permission(loop, aiohttp_client):
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'UserID') await remember(request, response, 'UserID')
raise response return response
async def check(request): async def check(request):
ret = await permits(request, Permission.READ) ret = await permits(request, Permission.READ)
@@ -99,16 +97,16 @@ async def test_permits_enum_permission(loop, aiohttp_client):
assert not ret assert not ret
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/login') resp = await client.post('/login')
assert 200 == resp.status assert 200 == resp.status
async def test_permits_unauthorized(loop, aiohttp_client): async def test_permits_unauthorized(loop, test_client):
async def check(request): async def check(request):
ret = await permits(request, 'read') ret = await permits(request, 'read')
@@ -119,38 +117,38 @@ async def test_permits_unauthorized(loop, aiohttp_client):
assert not ret assert not ret
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status
async def test_is_anonymous(loop, aiohttp_client): async def test_is_anonymous(loop, test_client):
async def index(request): async def index(request):
is_anon = await is_anonymous(request) is_anon = await is_anonymous(request)
if is_anon: if is_anon:
raise web.HTTPUnauthorized() return web.HTTPUnauthorized()
return web.Response() return web.HTTPOk()
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'UserID') await remember(request, response, 'UserID')
raise response return response
async def logout(request): async def logout(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await forget(request, response) await forget(request, response)
raise response return response
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', index) app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout) app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status assert web.HTTPUnauthorized.status_code == resp.status
@@ -163,63 +161,27 @@ async def test_is_anonymous(loop, aiohttp_client):
assert web.HTTPUnauthorized.status_code == resp.status assert web.HTTPUnauthorized.status_code == resp.status
async def test_login_required(loop, aiohttp_client): async def test_login_required(loop, test_client):
with pytest.raises(DeprecationWarning): @login_required
@login_required
async def index(request):
return web.Response()
async def login(request):
response = web.HTTPFound(location='/')
await remember(request, response, 'UserID')
raise response
async def logout(request):
response = web.HTTPFound(location='/')
await forget(request, response)
raise response
app = web.Application()
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app)
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
await client.post('/login')
resp = await client.get('/')
assert web.HTTPOk.status_code == resp.status
await client.post('/logout')
resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status
async def test_check_authorized(loop, aiohttp_client):
async def index(request): async def index(request):
await check_authorized(request) return web.HTTPOk()
return web.Response()
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'UserID') await remember(request, response, 'UserID')
raise response return response
async def logout(request): async def logout(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await forget(request, response) await forget(request, response)
raise response return response
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/', index) app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout) app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert web.HTTPUnauthorized.status_code == resp.status assert web.HTTPUnauthorized.status_code == resp.status
@@ -232,97 +194,38 @@ async def test_check_authorized(loop, aiohttp_client):
assert web.HTTPUnauthorized.status_code == resp.status assert web.HTTPUnauthorized.status_code == resp.status
async def test_has_permission(loop, aiohttp_client): async def test_has_permission(loop, test_client):
with pytest.warns(DeprecationWarning):
@has_permission('read')
async def index_read(request):
return web.Response()
@has_permission('write')
async def index_write(request):
return web.Response()
@has_permission('forbid')
async def index_forbid(request):
return web.Response()
async def login(request):
response = web.HTTPFound(location='/')
await remember(request, response, 'UserID')
return response
async def logout(request):
response = web.HTTPFound(location='/')
await forget(request, response)
raise response
app = web.Application()
_setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/permission/read', index_read)
app.router.add_route('GET', '/permission/write', index_write)
app.router.add_route('GET', '/permission/forbid', index_forbid)
app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app)
resp = await client.get('/permission/read')
assert web.HTTPUnauthorized.status_code == resp.status
resp = await client.get('/permission/write')
assert web.HTTPUnauthorized.status_code == resp.status
resp = await client.get('/permission/forbid')
assert web.HTTPUnauthorized.status_code == resp.status
await client.post('/login')
resp = await client.get('/permission/read')
assert web.HTTPOk.status_code == resp.status
resp = await client.get('/permission/write')
assert web.HTTPOk.status_code == resp.status
resp = await client.get('/permission/forbid')
assert web.HTTPForbidden.status_code == resp.status
await client.post('/logout')
resp = await client.get('/permission/read')
assert web.HTTPUnauthorized.status_code == resp.status
resp = await client.get('/permission/write')
assert web.HTTPUnauthorized.status_code == resp.status
resp = await client.get('/permission/forbid')
assert web.HTTPUnauthorized.status_code == resp.status
async def test_check_permission(loop, aiohttp_client):
@has_permission('read')
async def index_read(request): async def index_read(request):
await check_permission(request, 'read') return web.HTTPOk()
return web.Response()
@has_permission('write')
async def index_write(request): async def index_write(request):
await check_permission(request, 'write') return web.HTTPOk()
return web.Response()
@has_permission('forbid')
async def index_forbid(request): async def index_forbid(request):
await check_permission(request, 'forbid') return web.HTTPOk()
return web.Response()
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'UserID') await remember(request, response, 'UserID')
raise response return response
async def logout(request): async def logout(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await forget(request, response) await forget(request, response)
raise response return response
app = web.Application() app = web.Application(loop=loop)
_setup(app, CookiesIdentityPolicy(), Autz()) _setup(app, CookiesIdentityPolicy(), Autz())
app.router.add_route('GET', '/permission/read', index_read) app.router.add_route('GET', '/permission/read', index_read)
app.router.add_route('GET', '/permission/write', index_write) app.router.add_route('GET', '/permission/write', index_write)
app.router.add_route('GET', '/permission/forbid', index_forbid) app.router.add_route('GET', '/permission/forbid', index_forbid)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout) app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/permission/read') resp = await client.get('/permission/read')
assert web.HTTPUnauthorized.status_code == resp.status assert web.HTTPUnauthorized.status_code == resp.status

View File

@@ -1,82 +0,0 @@
import jwt
import pytest
from aiohttp import web
from aiohttp_security import setup as _setup
from aiohttp_security import AbstractAuthorizationPolicy
from aiohttp_security.api import IDENTITY_KEY
from aiohttp_security.jwt_identity import JWTIdentityPolicy
@pytest.fixture
def make_token():
def factory(payload, secret):
return jwt.encode(
payload,
secret,
algorithm='HS256',
)
return factory
class Autz(AbstractAuthorizationPolicy):
async def permits(self, identity, permission, context=None):
pass
async def authorized_userid(self, identity):
pass
async def test_no_pyjwt_installed(mocker):
mocker.patch('aiohttp_security.jwt_identity.jwt', None)
with pytest.raises(RuntimeError):
JWTIdentityPolicy('secret')
async def test_identify(loop, make_token, aiohttp_client):
kwt_secret_key = 'Key'
token = make_token({'login': 'Andrew'}, kwt_secret_key)
async def check(request):
policy = request.app[IDENTITY_KEY]
identity = await policy.identify(request)
assert 'Andrew' == identity['login']
return web.Response()
app = web.Application()
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route('GET', '/', check)
client = await aiohttp_client(app)
headers = {'Authorization': 'Bearer {}'.format(token.decode('utf-8'))}
resp = await client.get('/', headers=headers)
assert 200 == resp.status
async def test_identify_broken_scheme(loop, make_token, aiohttp_client):
kwt_secret_key = 'Key'
token = make_token({'login': 'Andrew'}, kwt_secret_key)
async def check(request):
policy = request.app[IDENTITY_KEY]
try:
await policy.identify(request)
except ValueError as exc:
raise web.HTTPBadRequest(reason=str(exc))
return web.Response()
app = web.Application()
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route('GET', '/', check)
client = await aiohttp_client(app)
headers = {'Authorization': 'Token {}'.format(token.decode('utf-8'))}
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Invalid authorization scheme' in resp.reason

View File

@@ -2,21 +2,21 @@ from aiohttp import web
from aiohttp_security import authorized_userid, permits from aiohttp_security import authorized_userid, permits
async def test_authorized_userid(loop, aiohttp_client): async def test_authorized_userid(loop, test_client):
async def check(request): async def check(request):
userid = await authorized_userid(request) userid = await authorized_userid(request)
assert userid is None assert userid is None
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status
async def test_permits(loop, aiohttp_client): async def test_permits(loop, test_client):
async def check(request): async def check(request):
ret = await permits(request, 'read') ret = await permits(request, 'read')
@@ -27,8 +27,8 @@ async def test_permits(loop, aiohttp_client):
assert ret assert ret
return web.Response() return web.Response()
app = web.Application() app = web.Application(loop=loop)
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status

View File

@@ -2,15 +2,15 @@ from aiohttp import web
from aiohttp_security import remember, forget from aiohttp_security import remember, forget
async def test_remember(loop, aiohttp_client): async def test_remember(loop, test_client):
async def do_remember(request): async def do_remember(request):
response = web.Response() response = web.Response()
await remember(request, response, 'Andrew') await remember(request, response, 'Andrew')
app = web.Application() app = web.Application(loop=loop)
app.router.add_route('POST', '/', do_remember) app.router.add_route('POST', '/', do_remember)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/') resp = await client.post('/')
assert 500 == resp.status assert 500 == resp.status
assert (('Security subsystem is not initialized, ' assert (('Security subsystem is not initialized, '
@@ -18,15 +18,15 @@ async def test_remember(loop, aiohttp_client):
resp.reason) resp.reason)
async def test_forget(loop, aiohttp_client): async def test_forget(loop, test_client):
async def do_forget(request): async def do_forget(request):
response = web.Response() response = web.Response()
await forget(request, response) await forget(request, response)
app = web.Application() app = web.Application(loop=loop)
app.router.add_route('POST', '/', do_forget) app.router.add_route('POST', '/', do_forget)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/') resp = await client.post('/')
assert 500 == resp.status assert 500 == resp.status
assert (('Security subsystem is not initialized, ' assert (('Security subsystem is not initialized, '

View File

@@ -20,14 +20,14 @@ class Autz(AbstractAuthorizationPolicy):
@pytest.fixture @pytest.fixture
def make_app(): def make_app(loop):
app = web.Application() app = web.Application(loop=loop)
setup_session(app, SimpleCookieStorage()) setup_session(app, SimpleCookieStorage())
setup_security(app, SessionIdentityPolicy(), Autz()) setup_security(app, SessionIdentityPolicy(), Autz())
return app return app
async def test_remember(make_app, aiohttp_client): async def test_remember(make_app, test_client):
async def handler(request): async def handler(request):
response = web.Response() response = web.Response()
@@ -37,12 +37,12 @@ async def test_remember(make_app, aiohttp_client):
async def check(request): async def check(request):
session = await get_session(request) session = await get_session(request)
assert session['AIOHTTP_SECURITY'] == 'Andrew' assert session['AIOHTTP_SECURITY'] == 'Andrew'
return web.Response() return web.HTTPOk()
app = make_app() app = make_app()
app.router.add_route('GET', '/', handler) app.router.add_route('GET', '/', handler)
app.router.add_route('GET', '/check', check) app.router.add_route('GET', '/check', check)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.get('/') resp = await client.get('/')
assert 200 == resp.status assert 200 == resp.status
@@ -50,7 +50,7 @@ async def test_remember(make_app, aiohttp_client):
assert 200 == resp.status assert 200 == resp.status
async def test_identify(make_app, aiohttp_client): async def test_identify(make_app, test_client):
async def create(request): async def create(request):
response = web.Response() response = web.Response()
@@ -66,7 +66,7 @@ async def test_identify(make_app, aiohttp_client):
app = make_app() app = make_app()
app.router.add_route('GET', '/', check) app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create) app.router.add_route('POST', '/', create)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/') resp = await client.post('/')
assert 200 == resp.status assert 200 == resp.status
@@ -74,28 +74,28 @@ async def test_identify(make_app, aiohttp_client):
assert 200 == resp.status assert 200 == resp.status
async def test_forget(make_app, aiohttp_client): async def test_forget(make_app, test_client):
async def index(request): async def index(request):
session = await get_session(request) session = await get_session(request)
return web.Response(text=session.get('AIOHTTP_SECURITY', '')) return web.HTTPOk(text=session.get('AIOHTTP_SECURITY', ''))
async def login(request): async def login(request):
response = web.HTTPFound(location='/') response = web.HTTPFound(location='/')
await remember(request, response, 'Andrew') await remember(request, response, 'Andrew')
raise response return response
async def logout(request): async def logout(request):
response = web.HTTPFound('/') response = web.HTTPFound('/')
await forget(request, response) await forget(request, response)
raise response return response
app = make_app() app = make_app()
app.router.add_route('GET', '/', index) app.router.add_route('GET', '/', index)
app.router.add_route('POST', '/login', login) app.router.add_route('POST', '/login', login)
app.router.add_route('POST', '/logout', logout) app.router.add_route('POST', '/logout', logout)
client = await aiohttp_client(app) client = await test_client(app)
resp = await client.post('/login') resp = await client.post('/login')
assert 200 == resp.status assert 200 == resp.status