aiohttp-security/demo/database_auth/db_auth.py

63 lines
2.4 KiB
Python
Raw Permalink Normal View History

2020-12-18 17:58:38 +00:00
from enum import Enum
from typing import Any, Optional, Union
2015-07-08 17:30:24 +00:00
2020-12-18 17:58:38 +00:00
import sqlalchemy as sa
2016-02-01 17:25:14 +00:00
from aiohttp_security.abc import AbstractAuthorizationPolicy
from passlib.hash import sha256_crypt
2015-07-08 17:30:24 +00:00
2015-11-21 07:37:44 +00:00
from . import db
2015-07-08 17:30:24 +00:00
2015-11-21 06:45:08 +00:00
class DBAuthorizationPolicy(AbstractAuthorizationPolicy):
2020-12-18 17:58:38 +00:00
def __init__(self, dbengine: Any):
2015-11-26 18:09:00 +00:00
self.dbengine = dbengine
2015-07-08 17:30:24 +00:00
2020-12-18 17:58:38 +00:00
async def authorized_userid(self, identity: str) -> Optional[str]:
2018-06-13 20:19:25 +00:00
async with self.dbengine.acquire() as conn:
where = sa.and_(db.users.c.login == identity,
2020-12-18 17:58:38 +00:00
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call]
query = db.users.count().where(where)
2017-12-13 14:51:46 +00:00
ret = await conn.scalar(query)
2015-11-21 07:37:44 +00:00
if ret:
return identity
else:
return None
2020-12-18 17:58:38 +00:00
async def permits(self, identity: str, permission: Union[str, Enum],
context: None = None) -> bool:
2018-06-13 20:19:25 +00:00
async with self.dbengine.acquire() as conn:
where = sa.and_(db.users.c.login == identity,
2020-12-18 17:58:38 +00:00
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call]
query = db.users.select().where(where)
2017-12-13 14:51:46 +00:00
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
user_id = user[0]
is_superuser = user[3]
if is_superuser:
return True
where = db.permissions.c.user_id == user_id
query = db.permissions.select().where(where)
2017-12-13 14:51:46 +00:00
ret = await conn.execute(query)
result = await ret.fetchall()
if ret is not None:
for record in result:
if record.perm_name == permission:
return True
return False
2020-12-18 17:58:38 +00:00
async def check_credentials(db_engine: Any, username: str, password: str) -> bool:
2018-06-13 20:19:25 +00:00
async with db_engine.acquire() as conn:
where = sa.and_(db.users.c.login == username,
2020-12-18 17:58:38 +00:00
sa.not_(db.users.c.disabled)) # type: ignore[no-untyped-call]
query = db.users.select().where(where)
2017-12-13 14:51:46 +00:00
ret = await conn.execute(query)
user = await ret.fetchone()
if user is not None:
2019-12-10 21:10:54 +00:00
hashed = user[2]
2020-12-18 17:58:38 +00:00
return sha256_crypt.verify(password, hashed) # type: ignore[no-any-return]
return False