aiohttp-security/tests/test_jwt_identity.py

83 lines
2.2 KiB
Python
Raw Normal View History

import jwt
2018-04-25 20:52:36 +00:00
import pytest
from aiohttp import web
2018-04-25 20:52:36 +00:00
from aiohttp_security import setup as _setup
from aiohttp_security import AbstractAuthorizationPolicy
2018-04-25 20:52:36 +00:00
from aiohttp_security.api import IDENTITY_KEY
from aiohttp_security.jwt_identity import JWTIdentityPolicy
@pytest.fixture
def make_token():
    """Provide a helper that signs a payload into an HS256 JWT."""

    def factory(payload, secret):
        # Hand back exactly what PyJWT produces; callers deal with its type.
        encoded = jwt.encode(payload, secret, algorithm='HS256')
        return encoded

    return factory
2018-04-25 20:52:36 +00:00
class Autz(AbstractAuthorizationPolicy):
    """Authorization policy stub whose hooks do nothing.

    The tests in this module pass an instance to ``setup`` alongside the
    identity policy under test.
    """

    async def permits(self, identity, permission, context=None):
        """No-op; always yields ``None``."""
        return None

    async def authorized_userid(self, identity):
        """No-op; always yields ``None``."""
        return None
async def test_no_pyjwt_installed(mocker):
    """Building the policy must raise RuntimeError when ``jwt`` is None."""
    # Replace the ``jwt`` name inside aiohttp_security.jwt_identity with None
    # for the duration of this test.
    mocker.patch('aiohttp_security.jwt_identity.jwt', new=None)

    with pytest.raises(RuntimeError):
        JWTIdentityPolicy('secret')
2018-09-06 10:06:55 +00:00
async def test_identify(loop, make_token, aiohttp_client):
    """A valid ``Bearer`` token is identified and its payload is exposed.

    The handler asks the configured identity policy to identify the request
    and checks the ``login`` claim; the test then expects a 200 response.
    """
    kwt_secret_key = 'Key'
    token = make_token({'login': 'Andrew'}, kwt_secret_key)
    # PyJWT < 2.0 returns bytes from encode(); PyJWT >= 2.0 returns str.
    # Normalise to str so the header is built correctly on either version
    # (calling .decode() on a str would raise AttributeError).
    if isinstance(token, bytes):
        token = token.decode('utf-8')

    async def check(request):
        policy = request.app[IDENTITY_KEY]
        identity = await policy.identify(request)
        assert 'Andrew' == identity['login']
        return web.Response()

    app = web.Application()
    _setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
    app.router.add_route('GET', '/', check)

    client = await aiohttp_client(app)
    headers = {'Authorization': 'Bearer {}'.format(token)}
    resp = await client.get('/', headers=headers)
    assert 200 == resp.status
2018-09-06 10:06:55 +00:00
async def test_identify_broken_scheme(loop, make_token, aiohttp_client):
    """A non-``Bearer`` authorization scheme is reported as a 400 response.

    The handler converts a ``ValueError`` raised by ``identify`` into
    ``HTTPBadRequest`` with the exception text as the reason; the test
    expects status 400 and a reason mentioning the invalid scheme.
    """
    kwt_secret_key = 'Key'
    token = make_token({'login': 'Andrew'}, kwt_secret_key)
    # PyJWT < 2.0 returns bytes from encode(); PyJWT >= 2.0 returns str.
    # Normalise to str so the header is built correctly on either version
    # (calling .decode() on a str would raise AttributeError).
    if isinstance(token, bytes):
        token = token.decode('utf-8')

    async def check(request):
        policy = request.app[IDENTITY_KEY]

        try:
            await policy.identify(request)
        except ValueError as exc:
            raise web.HTTPBadRequest(reason=str(exc))

        return web.Response()

    app = web.Application()
    _setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
    app.router.add_route('GET', '/', check)

    client = await aiohttp_client(app)
    # Deliberately use the wrong scheme ("Token" instead of "Bearer").
    headers = {'Authorization': 'Token {}'.format(token)}
    resp = await client.get('/', headers=headers)
    assert 400 == resp.status
    assert 'Invalid authorization scheme' in resp.reason