webhook/app/dependencies.py

41 lines
1.5 KiB
Python
Raw Normal View History

2021-07-28 09:51:48 +00:00
from os import getenv
2021-07-26 09:01:43 +00:00
import hmac
from fastapi import Request
from fastapi.exceptions import HTTPException
async def auth_hook(request: Request):
    """Validate the HMAC-SHA1 signature of an incoming webhook request.

    The raw request body is signed with the ``WEBHOOK_SECRET`` environment
    variable and the result is compared with the ``X-Hub-Signature`` header,
    which is expected in the form ``sha1=<hexdigest>``.

    Returns:
        None when the signature matches.

    Raises:
        HTTPException: 204 if reading the body or parsing it as JSON fails;
            400 if the signature header is missing or its scheme is not
            ``sha1``; 503 if ``WEBHOOK_SECRET`` is not set; 403 if the
            digest does not match.
    """
    try:
        text = await request.body()
        # Parsed only to reject payloads that are not JSON; the value is unused.
        await request.json()
    except Exception as exc:
        # NOTE(review): 204 is a success status; kept for backward
        # compatibility with existing senders -- confirm whether 400 is wanted.
        raise HTTPException(
            status_code=204, detail="Missing or bad content"
        ) from exc

    header_signature = request.headers.get('X-Hub-Signature')
    if not header_signature:
        raise HTTPException(status_code=400, detail="Missing signature")

    # Separate the signature from the sha1 indication. ``partition`` never
    # raises, so a header without '=' (or with several) is rejected below
    # instead of surfacing as an unhandled ValueError.
    sha_name, _, signature = header_signature.partition('=')
    if sha_name != 'sha1':
        raise HTTPException(status_code=400, detail="Invalid signature")

    secret_key = getenv('WEBHOOK_SECRET')
    if secret_key is None:
        raise HTTPException(status_code=503, detail="Missing WEBHOOK_SECRET")

    # Create a new hmac with the secret key and the raw request data.
    mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1')

    # Verify the digest matches the signature. The comparison is done on
    # bytes because compare_digest() rejects non-ASCII str arguments, and the
    # expected digest is deliberately not printed or logged.
    if not hmac.compare_digest(mac.hexdigest().encode(), signature.encode()):
        raise HTTPException(status_code=403, detail="Unauthorized")
async def auth_web(request: Request):
    """Check the ``token`` query parameter against the ``TOKEN`` env variable.

    Returns:
        None when the supplied token equals the value of ``TOKEN``.

    Raises:
        HTTPException: 400 if the token is absent or empty; 403 if it does
            not match, which includes the case where ``TOKEN`` is not set.
    """
    # ``query_params`` is the public accessor; ``_query_params`` is a private
    # attribute and not part of the documented Request interface.
    token = request.query_params.get("token")
    if not token:
        raise HTTPException(status_code=400, detail="Missing token")

    expected = getenv("TOKEN")
    # Constant-time comparison on bytes, so the check does not reveal how
    # many leading characters of the secret were guessed correctly.
    if expected is not None and hmac.compare_digest(
        token.encode(), expected.encode()
    ):
        return
    raise HTTPException(status_code=403, detail="Invalid token")