Restructure

This commit is contained in:
2021-07-28 21:51:48 +12:00
parent 683a4b487e
commit 3ff1800869
6 changed files with 29 additions and 44 deletions

0
app/__init__.py Normal file
View File

45
app/dependencies.py Normal file
View File

@@ -0,0 +1,45 @@
from os import getenv
import hmac
from fastapi import Request
from fastapi.exceptions import HTTPException
async def check_ref(request: Request):
json = await request.json()
if json["ref"] and json["ref"] == f"refs/heads/{getenv('BRANCH')}":
return
raise HTTPException(status_code=202, detail="Invalid branch")
async def auth_hook(request: Request):
try:
json = await request.json()
text = await request.body()
except:
raise HTTPException(status_code=204, detail="Missing or bad content")
header_signature = request.headers.get('X-Hub-Signature')
if not header_signature:
raise HTTPException(status_code=400, detail="Missing signature")
# separate the signature from the sha1 indication
sha_name, signature = header_signature.split('=')
if sha_name != 'sha1':
raise HTTPException(status_code=400, detail="Invalid signature")
secret_key = getenv('WEBHOOK_SECRET')
if secret_key is None:
raise HTTPException(status_code=503, detail="Missing WEBHOOK_SECRET")
# create a new hmac with the secret key and the request data
mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1')
# verify the digest matches the signature
if not hmac.compare_digest(mac.hexdigest(), signature):
raise HTTPException(status_code=403, detail="Unauthorized")
async def auth_web(request: Request):
token = request._query_params.get("token")
if token is None or token is "":
raise HTTPException(status_code=400, detail="Missing token")
if token == getenv("TOKEN"):
return
raise HTTPException(status_code=403, detail="Invalid token")

14
app/main.py Normal file
View File

@@ -0,0 +1,14 @@
from fastapi import FastAPI, Request, Depends
from fastapi_responses import custom_openapi
from app.dependencies import auth_hook, auth_web, check_ref
app = FastAPI()
app.openapi = custom_openapi(app)
@app.get("/", dependencies=[Depends(auth_web)])
@app.post("/", dependencies=[Depends(auth_hook), Depends(check_ref)])
async def hook(req: Request):
return "Update"

3
app/test/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

71
app/test/test.py Normal file
View File

@@ -0,0 +1,71 @@
from fastapi import FastAPI, Request, Depends
from fastapi.testclient import TestClient
import hmac
from app.main import app
from app.dependencies import auth_hook, auth_web, check_ref
from os import environ, getenv
import json
environ['WEBHOOK_SECRET'] = "dfsgdsjghhgdaehlsdfjhjkdh"
environ["BRANCH"] = "master"
environ["TOKEN"] = "assdcvfgvh"
secret_key = getenv('WEBHOOK_SECRET')
client = TestClient(app)
@app.post("/test_auth", dependencies=[Depends(auth_hook)])
async def auth_test_handler(request: Request):
return 200
@app.post("/test_ref", dependencies=[Depends(check_ref)])
async def auth_test_handler(request: Request):
return 200
@app.get("/test_web", dependencies=[Depends(auth_web)])
async def web_test_hnadler(request: Request):
return 200
def test_auth():
payload = {"Hello":"World"}
msg = json.dumps(payload).encode()
mac = hmac.new(secret_key.encode(), msg=msg, digestmod='sha1').hexdigest()
response = client.post("/test_auth", json= payload, headers={"X-Hub-Signature": "sha1="+mac})
assert response.status_code == 200
response = client.post("/test_auth", headers={"X-Hub-Signature": "sha1="+mac})
assert response.status_code == 204
assert response.text == '{"detail":"Missing or bad content"}'
response = client.post("/test_auth", json= payload, headers={"X-Hub-Signature": "sha="+mac})
assert response.status_code == 400
assert response.text == '{"detail":"Invalid signature"}'
response = client.post("/test_auth", json=payload)
assert response.status_code == 400
assert response.text == '{"detail":"Missing signature"}'
response = client.post("/test_auth", json= payload, headers={"X-Hub-Signature": "sha1="+mac+"a"})
assert response.status_code == 403
assert response.text == '{"detail":"Unauthorized"}'
def test_branch():
payload = {"ref": "refs/heads/master"}
response = client.post("/test_ref", json= payload)
assert response.status_code == 202
payload = {"ref": "refs/heads/test"}
response = client.post("/test_ref", json= payload)
assert response.status_code == 403
def test_web():
response = client.get('/test_web?token={}'.format(getenv("TOKEN")))
assert response.status_code == 200
response = client.get('/test_web')
assert response.status_code == 400
response = client.get('/test_web?token=a')
assert response.status_code == 403