aiohttp-security/demo/db_auth.py

36 lines
1.1 KiB
Python
Raw Normal View History

2015-07-08 17:30:24 +00:00
import asyncio
2015-11-21 07:37:44 +00:00
import sqlalchemy as sa
2015-07-08 17:30:24 +00:00
2016-02-01 17:25:14 +00:00
from aiohttp_security.abc import AbstractAuthorizationPolicy
2015-07-08 17:30:24 +00:00
2015-11-21 07:37:44 +00:00
from . import db
2015-07-08 17:30:24 +00:00
2015-11-21 06:45:08 +00:00
class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
    """Authorization policy that validates identities against ``db.users``.

    An identity is considered valid when a row exists in ``db.users`` whose
    ``login`` column equals the identity and whose ``disabled`` column is
    false.
    """

    def __init__(self, dbengine):
        """Store the engine used to obtain database connections.

        ``dbengine`` must support the ``with (yield from engine) as conn``
        acquisition form used below (presumably an aiopg.sa engine -- TODO
        confirm against the application setup code).
        """
        self.dbengine = dbengine
        # Placeholder in-memory permission map (identity -> container of
        # permission names) consulted by permits().  Nothing in this class
        # fills it, so every permission is denied until the real,
        # database-backed checker is implemented.
        self.data = {}

    @asyncio.coroutine
    def _is_active_user(self, conn, identity):
        """Return True if an enabled user with login ``identity`` exists."""
        # sa.not_() emits a SQL NOT.  Python's ``not`` operator would instead
        # try to evaluate the column object's truth value on the client side
        # and never reach the database.
        where = sa.and_(db.users.c.login == identity,
                        sa.not_(db.users.c.disabled))
        query = db.users.count().where(where)
        matches = yield from conn.scalar(query)
        return bool(matches)

    @asyncio.coroutine
    def authorized_user_id(self, identity):
        """Return ``identity`` if it names an enabled user, else ``None``."""
        with (yield from self.dbengine) as conn:
            active = yield from self._is_active_user(conn, identity)
        return identity if active else None

    @asyncio.coroutine
    def permits(self, identity, permission, context=None):
        """Return True if ``identity`` is allowed ``permission``.

        Fails closed: a ``None`` identity, an unknown or disabled user, or a
        user with no recorded permissions is denied.  ``context`` is accepted
        for interface compatibility and is not used.
        """
        if identity is None:
            return False

        with (yield from self.dbengine) as conn:
            active = yield from self._is_active_user(conn, identity)
        if not active:
            return False

        # TODO: implement actual permission checker
        record = self.data.get(identity)
        return record is not None and permission in record