Compare commits

..

7 Commits

Author SHA1 Message Date
c3141df775 Check json 2022-02-23 10:25:19 +00:00
0217d98d4f Remove ref check 2022-02-23 10:24:26 +00:00
62be2d30f3 print json 2022-02-21 02:25:35 +00:00
0c113739ee Remove json request 2022-02-21 02:25:20 +00:00
a04f24fcff Change is to == 2022-02-20 05:53:10 +00:00
8c0872ea1a Remove ref check 2022-02-20 05:52:15 +00:00
de0a1809a4 Update readme 2021-07-28 21:55:02 +12:00
4 changed files with 14 additions and 27 deletions

View File

@@ -1,6 +1,6 @@
# Github Web Hook # Github Web Hook
Example of how to use Github weeb hooks using Fastapi Example of how to use Github web hooks using Fastapi
```pipenv sync``` ```pipenv sync```
@@ -8,5 +8,8 @@ Example of how to use Github weeb hooks using Fastapi
```cp .env.sample .env``` ```cp .env.sample .env```
```pipenv run dev```
```docker-compose up --build```

View File

@@ -3,20 +3,15 @@ import hmac
from fastapi import Request from fastapi import Request
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
async def check_ref(request: Request):
json = await request.json()
if json["ref"] and json["ref"] == f"refs/heads/{getenv('BRANCH')}":
return
raise HTTPException(status_code=202, detail="Invalid branch")
async def auth_hook(request: Request): async def auth_hook(request: Request):
try: try:
json = await request.json()
text = await request.body() text = await request.body()
json = await request.json()
except: except:
raise HTTPException(status_code=204, detail="Missing or bad content") raise HTTPException(status_code=204, detail="Missing or bad content")
header_signature = request.headers.get('X-Hub-Signature')
header_signature = request.headers.get('X-Hub-Signature')
if not header_signature: if not header_signature:
raise HTTPException(status_code=400, detail="Missing signature") raise HTTPException(status_code=400, detail="Missing signature")
@@ -33,12 +28,13 @@ async def auth_hook(request: Request):
mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1') mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1')
# verify the digest matches the signature # verify the digest matches the signature
print(f'{mac.hexdigest()} {signature}')
if not hmac.compare_digest(mac.hexdigest(), signature): if not hmac.compare_digest(mac.hexdigest(), signature):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
async def auth_web(request: Request): async def auth_web(request: Request):
token = request._query_params.get("token") token = request._query_params.get("token")
if token is None or token is "": if token == None or token == "":
raise HTTPException(status_code=400, detail="Missing token") raise HTTPException(status_code=400, detail="Missing token")
if token == getenv("TOKEN"): if token == getenv("TOKEN"):
return return

View File

@@ -1,14 +1,16 @@
from fastapi import FastAPI, Request, Depends from fastapi import FastAPI, Request, Depends
from fastapi_responses import custom_openapi from fastapi_responses import custom_openapi
from app.dependencies import auth_hook, auth_web, check_ref from app.dependencies import auth_hook, auth_web
app = FastAPI() app = FastAPI()
app.openapi = custom_openapi(app) app.openapi = custom_openapi(app)
@app.get("/", dependencies=[Depends(auth_web)]) @app.get("/", dependencies=[Depends(auth_web)])
@app.post("/", dependencies=[Depends(auth_hook), Depends(check_ref)]) @app.post("/", dependencies=[Depends(auth_hook)])
async def hook(req: Request): async def hook(req: Request):
json = await req.json()
print(json)
return "Update" return "Update"

View File

@@ -3,7 +3,7 @@ from fastapi import FastAPI, Request, Depends
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
import hmac import hmac
from app.main import app from app.main import app
from app.dependencies import auth_hook, auth_web, check_ref from app.dependencies import auth_hook, auth_web
from os import environ, getenv from os import environ, getenv
import json import json
@@ -18,10 +18,6 @@ client = TestClient(app)
async def auth_test_handler(request: Request): async def auth_test_handler(request: Request):
return 200 return 200
@app.post("/test_ref", dependencies=[Depends(check_ref)])
async def auth_test_handler(request: Request):
return 200
@app.get("/test_web", dependencies=[Depends(auth_web)]) @app.get("/test_web", dependencies=[Depends(auth_web)])
async def web_test_hnadler(request: Request): async def web_test_hnadler(request: Request):
return 200 return 200
@@ -50,16 +46,6 @@ def test_auth():
assert response.status_code == 403 assert response.status_code == 403
assert response.text == '{"detail":"Unauthorized"}' assert response.text == '{"detail":"Unauthorized"}'
def test_branch():
payload = {"ref": "refs/heads/master"}
response = client.post("/test_ref", json= payload)
assert response.status_code == 202
payload = {"ref": "refs/heads/test"}
response = client.post("/test_ref", json= payload)
assert response.status_code == 403
def test_web(): def test_web():
response = client.get('/test_web?token={}'.format(getenv("TOKEN"))) response = client.get('/test_web?token={}'.format(getenv("TOKEN")))
assert response.status_code == 200 assert response.status_code == 200