JWTIdentityPolicy polishing (#161)

* Polishing JWT policy

* Simplify tests

* Minor cleaning
This commit is contained in:
Oleh Kuchuk 2018-05-21 22:07:14 +03:00 committed by Andrew Svetlov
parent ff2171d6c5
commit 42769df454
2 changed files with 65 additions and 28 deletions

View File

@ -3,6 +3,7 @@
"""
from .abc import AbstractIdentityPolicy
try:
import jwt
except ImportError: # pragma: no cover
@ -10,22 +11,32 @@ except ImportError: # pragma: no cover
AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '
class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret, algorithm=None):
def __init__(self, secret, algorithm='HS256'):
if jwt is None:
raise RuntimeError("Please install pyjwt")
raise RuntimeError('Please install `PyJWT`')
self.secret = secret
self.algorithm = 'HS256' if algorithm is None else algorithm
self.algorithm = algorithm
async def identify(self, request):
header_identity = request.headers.get(AUTH_HEADER_NAME)
identity = jwt.decode(header_identity,
if header_identity is None:
return
if not header_identity.startswith(AUTH_SCHEME):
raise ValueError('Invalid authorization scheme. ' +
'Should be `Bearer <token>`')
token = header_identity.split(' ')[1].strip()
identity = jwt.decode(token,
self.secret,
algorithm=self.algorithm)
return identity['identity']
return identity
async def remember(self, *args, **kwargs): # pragma: no cover
pass

View File

@ -1,10 +1,23 @@
import jwt
import pytest
from aiohttp import web
from aiohttp_security import AbstractAuthorizationPolicy
from aiohttp_security import setup as _setup
from aiohttp_security.jwt_identity import JWTIdentityPolicy
from aiohttp_security import AbstractAuthorizationPolicy
from aiohttp_security.api import IDENTITY_KEY
import jwt
from aiohttp_security.jwt_identity import JWTIdentityPolicy
@pytest.fixture
def make_token():
def factory(payload, secret):
return jwt.encode(
payload,
secret,
algorithm='HS256',
)
return factory
class Autz(AbstractAuthorizationPolicy):
@ -22,35 +35,48 @@ async def test_no_pyjwt_installed(mocker):
JWTIdentityPolicy('secret')
async def test_identify(loop, test_client):
async def test_identify(loop, make_token, test_client):
kwt_secret_key = 'Key'
async def create(request):
response = web.Response()
data = await request.post()
encoded_identity = jwt.encode({'identity': data['login']},
kwt_secret_key,
algorithm='HS256')
response.text = encoded_identity.decode('utf-8')
return response
token = make_token({'login': 'Andrew'}, kwt_secret_key)
async def check(request):
policy = request.app[IDENTITY_KEY]
user_id = await policy.identify(request)
assert 'Andrew' == user_id
identity = await policy.identify(request)
assert 'Andrew' == identity['login']
return web.Response()
app = web.Application(loop=loop)
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route('GET', '/', check)
app.router.add_route('POST', '/', create)
client = await test_client(app)
resp = await client.post('/', data={'login': 'Andrew'})
jwt_token = await resp.content.read()
assert 200 == resp.status
await resp.release()
headers = {'Authorization': str(jwt_token.decode('utf-8'))}
headers = {'Authorization': 'Bearer {}'.format(token.decode('utf-8'))}
resp = await client.get('/', headers=headers)
assert 200 == resp.status
async def test_identify_broken_scheme(loop, make_token, test_client):
kwt_secret_key = 'Key'
token = make_token({'login': 'Andrew'}, kwt_secret_key)
async def check(request):
policy = request.app[IDENTITY_KEY]
try:
await policy.identify(request)
except ValueError as exc:
raise web.HTTPBadRequest(reason=exc)
return web.Response()
app = web.Application(loop=loop)
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route('GET', '/', check)
client = await test_client(app)
headers = {'Authorization': 'Token {}'.format(token.decode('utf-8'))}
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Invalid authorization scheme' in resp.reason