This commit is contained in:
Rick Voormolen 2021-01-14 13:51:19 +00:00 committed by GitHub
commit d40489cd6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 102 additions and 2 deletions

View File

@ -0,0 +1,5 @@
from .abc import AbstractAuthorizationPolicy as AbstractAuthorizationPolicy, AbstractIdentityPolicy as AbstractIdentityPolicy
from .api import authorized_userid as authorized_userid, check_authorized as check_authorized, check_permission as check_permission, forget as forget, has_permission as has_permission, is_anonymous as is_anonymous, login_required as login_required, permits as permits, remember as remember, setup as setup
from .cookies_identity import CookiesIdentityPolicy as CookiesIdentityPolicy
from .jwt_identity import JWTIdentityPolicy as JWTIdentityPolicy
from .session_identity import SessionIdentityPolicy as SessionIdentityPolicy

25
aiohttp_security/abc.pyi Normal file
View File

@ -0,0 +1,25 @@
import abc
import enum
from typing import Any, Optional, Union
import aiohttp.web
Context = object
OptionalContext = Optional[Context]
Permission = Union[str, enum.Enum]
UserId = str
Identity = str
class AbstractIdentityPolicy(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def identify(self, request: aiohttp.web.Request) -> Optional[Identity]: ...
@abc.abstractmethod
async def remember(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse, identity: Identity, **kwargs: Any) -> None: ...
@abc.abstractmethod
async def forget(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse) -> None: ...
class AbstractAuthorizationPolicy(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def permits(self, identity: Identity, permission: Permission, context: OptionalContext = ...) -> bool: ...
@abc.abstractmethod
async def authorized_userid(self, identity: Identity) -> Optional[UserId]: ...

28
aiohttp_security/api.pyi Normal file
View File

@ -0,0 +1,28 @@
from typing import Any, Optional, Callable, TypeVar
import aiohttp.web
from .abc import AbstractAuthorizationPolicy as AbstractAuthorizationPolicy
from .abc import AbstractIdentityPolicy as AbstractIdentityPolicy
from .abc import Permission as Permission
from .abc import UserId as UserId
from .abc import OptionalContext as OptionalContext
from .abc import Identity as Identity
IDENTITY_KEY: str
AUTZ_KEY: str
_Fn = TypeVar("_Fn")
async def remember(request: aiohttp.web.Request, response: aiohttp.web.StreamResponse, identity: Identity, **kwargs: Any) -> None: ...
async def forget(request: aiohttp.web.Request, response: aiohttp.web.StreamResponse) -> None: ...
async def authorized_userid(request: aiohttp.web.Request) -> Optional[UserId]: ...
async def permits(request: aiohttp.web.Request, permission: Permission, context: OptionalContext = ...) -> bool: ...
async def is_anonymous(request: aiohttp.web.Request) -> bool: ...
async def check_authorized(request: aiohttp.web.Request) -> UserId: ...
async def check_permission(request: aiohttp.web.Request, permission: Permission, context: OptionalContext = ...) -> None: ...
def setup(app: aiohttp.web.Application, identity_policy: AbstractIdentityPolicy, autz_policy: AbstractAuthorizationPolicy) -> None: ...
# Deprecated since 0.3
def login_required(fn: _Fn) -> _Fn: ...
def has_permission(permission: Permission, context: OptionalContext = ...) -> Callable[[_Fn], _Fn]: ...

View File

@ -4,11 +4,11 @@ Use mostly for demonstration purposes, SessionIdentityPolicy is much
more handy.
"""
from .abc import AbstractIdentityPolicy
sentinel = object()
class sentinel:
pass
class CookiesIdentityPolicy(AbstractIdentityPolicy):

View File

@ -0,0 +1,12 @@
from .abc import AbstractIdentityPolicy, Identity
from typing import Any, Union
import aiohttp.web
class sentinel: ...
class CookiesIdentityPolicy(AbstractIdentityPolicy):
def __init__(self) -> None: ...
async def identify(self, request: aiohttp.web.Request) -> Identity: ...
async def remember(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse, identity: Identity, max_age: Union[sentinel, int]=..., **kwargs: Any) -> None: ...
async def forget(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse) -> None: ...

View File

@ -0,0 +1,17 @@
from typing import Any, Optional, Union
import aiohttp.web
from cryptography.hazmat.primitives.asymmetric import rsa
from .abc import AbstractIdentityPolicy, Identity
AUTH_HEADER_NAME: str
AUTH_SCHEME: str
class JWTIdentityPolicy(AbstractIdentityPolicy):
secret: Union[str, bytes, rsa.RSAPublicKey, rsa.RSAPrivateKey] = ...
algorithm: str = ...
def __init__(self, secret: Union[str, bytes, rsa.RSAPublicKey, rsa.RSAPrivateKey], algorithm: str = ...) -> None: ...
async def identify(self, request: aiohttp.web.Request) -> Optional[Identity]: ...
async def remember(self, *args: Any, **kwargs: Any) -> None: ...
async def forget(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse) -> None: ...

View File

@ -0,0 +1,13 @@
from typing import Any, Optional
import aiohttp.web
from .abc import AbstractIdentityPolicy, Identity
HAS_AIOHTTP_SESSION: bool
class SessionIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, session_key: str = ...) -> None: ...
async def identify(self, request: aiohttp.web.Request) -> Optional[Identity]: ...
async def remember(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse, identity: Identity, **kwargs: Any) -> None: ...
async def forget(self, request: aiohttp.web.Request, response: aiohttp.web.StreamResponse) -> None: ...