jwt/server.py

53 lines
1.4 KiB
Python

# examples/server_simple.py
from aiohttp import web
from aiohttp.web import middleware
import jwt
import rsa
# Read the key pair as raw bytes at import time: ``private_key`` is used by
# main() to sign the demo token, ``public_key`` by the auth middleware to
# verify incoming tokens.
with open('key', 'rb') as private_file:
    private_key = private_file.read()
with open('key.pub', 'rb') as public_file:
    public_key = public_file.read()
async def handle(request):
    """Return a plain-text greeting for the request.

    The greeting uses the ``name`` entry of ``request.match_info`` and falls
    back to "Anonymous" when the matched route carries no such entry.
    """
    who = request.match_info.get('name', "Anonymous")
    return web.Response(text=f"Hello, {who}")
@middleware
async def auth(request, handler):
    """Allow a request through only if it carries a valid RS256 bearer token.

    The ``Authorization`` header must have the form ``Bearer <token>``, and
    the token must verify against the module-level ``public_key``.

    Args:
        request: The incoming aiohttp request.
        handler: The next handler in the middleware chain.

    Returns:
        The downstream handler's response when the token is valid, or a
        401 "Invalid token" response when verification fails.

    Raises:
        web.HTTPUnauthorized: The ``Authorization`` header is missing.
        web.HTTPForbidden: The header is not exactly ``<scheme> <token>``
            or the scheme is not ``bearer`` (case-insensitive).
    """
    # HTTP errors are raised rather than returned: aiohttp deprecates
    # returning HTTPException instances from handlers and middlewares.
    try:
        scheme, token = request.headers['Authorization'].strip().split(' ')
    except KeyError:
        raise web.HTTPUnauthorized(
            reason='Missing authorization header',
        ) from None
    except ValueError:
        # Raised by the tuple unpacking when the header does not split into
        # exactly two space-separated parts.
        raise web.HTTPForbidden(
            reason='Invalid authorization header',
        ) from None

    if scheme.lower() != 'bearer':
        raise web.HTTPForbidden(reason='Invalid token scheme')

    # The raw token is deliberately not printed: it is a bearer credential.
    # Only the decode call is guarded, so a jwt.InvalidTokenError raised by
    # the downstream handler is not mistaken for a bad client token.
    try:
        # ``algorithms`` must be a list; a bare string would turn the
        # allow-list membership test into a substring match.
        payload = jwt.decode(token, public_key, algorithms=['RS256'])
    except jwt.InvalidTokenError:
        print("Invalid token")
        return web.Response(text="Invalid token", status=401)

    print(payload)
    return await handler(request)
def main():
    """Print a freshly signed demo token, then run the protected web app.

    The token is signed with the module-level ``private_key`` using RS256 and
    printed together with its length, so it can be copied into an
    ``Authorization: Bearer ...`` header when calling the server.
    """
    demo_claims = {"servers": ["a", "b", "c"]}
    token = jwt.encode(demo_claims, private_key, algorithm="RS256")

    print("==========Token==========")
    print(len(token), token)
    print("=========================")

    # Every request passes through the ``auth`` middleware before ``handle``.
    routes = [web.get('/', handle)]
    app = web.Application(middlewares=[auth])
    app.add_routes(routes)
    web.run_app(app)
# Start the demo server only when this file is executed as a script.
if __name__ == "__main__":
    main()