aiohttp-security/aiohttp_security/jwt_identity.py

46 lines
1.2 KiB
Python
Raw Normal View History

2018-04-25 20:52:36 +00:00
"""Identity policy for storing info in the jwt token.
"""
from .abc import AbstractIdentityPolicy
2018-04-25 20:52:36 +00:00
try:
import jwt
except ImportError: # pragma: no cover
jwt = None
# Name of the HTTP request header that JWTIdentityPolicy.identify reads.
AUTH_HEADER_NAME = 'Authorization'
# Prefix the header value must start with (checked via str.startswith in
# JWTIdentityPolicy.identify); note that the trailing space is part of it.
AUTH_SCHEME = 'Bearer '
2018-04-25 20:52:36 +00:00
class JWTIdentityPolicy(AbstractIdentityPolicy):
    """Identity policy that obtains the identity from a JWT bearer token.

    The token is read from the ``Authorization`` request header, whose value
    must start with ``Bearer ``, and is decoded with ``jwt.decode`` using the
    secret and algorithm given at construction time.
    """

    def __init__(self, secret, algorithm='HS256'):
        """Store the parameters used to decode incoming tokens.

        :param secret: key handed to ``jwt.decode`` for every token.
        :param algorithm: the single algorithm name allowed when decoding.
        :raises RuntimeError: if the optional ``jwt`` module (PyJWT) could
            not be imported.
        """
        if jwt is None:
            raise RuntimeError('Please install `PyJWT`')

        self.secret = secret
        self.algorithm = algorithm

    async def identify(self, request):
        """Decode the bearer token carried by ``request``.

        :param request: object exposing a ``headers`` mapping with ``get``.
        :returns: ``None`` when the ``Authorization`` header is absent,
            otherwise whatever ``jwt.decode`` returns for the token.
        :raises ValueError: if the header does not start with ``Bearer ``.

        Any exception raised by ``jwt.decode`` (for example for a malformed
        or empty token) is not caught here and propagates to the caller.
        """
        header_identity = request.headers.get(AUTH_HEADER_NAME)
        if header_identity is None:
            return None

        if not header_identity.startswith(AUTH_SCHEME):
            raise ValueError('Invalid authorization scheme. '
                             'Should be `Bearer <token>`')

        # Drop the already-verified scheme prefix, then take the first
        # whitespace-delimited field. Unlike ``split(' ')[1]`` this does not
        # yield an empty token when several spaces separate the scheme from
        # the token; anything after the token is still ignored.
        fields = header_identity[len(AUTH_SCHEME):].split()
        # An empty token is still passed on so that ``jwt.decode`` reports
        # it, keeping the exception type callers already see.
        token = fields[0] if fields else ''

        return jwt.decode(token, self.secret, algorithms=[self.algorithm])

    async def remember(self, *args, **kwargs):  # pragma: no cover
        """Do nothing; this policy writes no identity data to the response."""
        pass

    async def forget(self, request, response):  # pragma: no cover
        """Do nothing; this policy keeps no identity data to discard."""
        pass