Compare commits
2 Commits
62be2d30f3
...
master
Author | SHA1 | Date | |
---|---|---|---|
c3141df775 | |||
0217d98d4f |
@@ -6,11 +6,12 @@ from fastapi.exceptions import HTTPException
|
|||||||
async def auth_hook(request: Request):
|
async def auth_hook(request: Request):
|
||||||
try:
|
try:
|
||||||
text = await request.body()
|
text = await request.body()
|
||||||
|
json = await request.json()
|
||||||
except:
|
except:
|
||||||
raise HTTPException(status_code=204, detail="Missing or bad content")
|
raise HTTPException(status_code=204, detail="Missing or bad content")
|
||||||
header_signature = request.headers.get('X-Hub-Signature')
|
|
||||||
|
|
||||||
|
|
||||||
|
header_signature = request.headers.get('X-Hub-Signature')
|
||||||
if not header_signature:
|
if not header_signature:
|
||||||
raise HTTPException(status_code=400, detail="Missing signature")
|
raise HTTPException(status_code=400, detail="Missing signature")
|
||||||
|
|
||||||
@@ -27,6 +28,7 @@ async def auth_hook(request: Request):
|
|||||||
mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1')
|
mac = hmac.new(secret_key.encode(), msg=text, digestmod='sha1')
|
||||||
|
|
||||||
# verify the digest matches the signature
|
# verify the digest matches the signature
|
||||||
|
print(f'{mac.hexdigest()} {signature}')
|
||||||
if not hmac.compare_digest(mac.hexdigest(), signature):
|
if not hmac.compare_digest(mac.hexdigest(), signature):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
|
@@ -46,16 +46,6 @@ def test_auth():
|
|||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
assert response.text == '{"detail":"Unauthorized"}'
|
assert response.text == '{"detail":"Unauthorized"}'
|
||||||
|
|
||||||
|
|
||||||
# def test_branch():
|
|
||||||
# payload = {"ref": "refs/heads/master"}
|
|
||||||
# response = client.post("/test_ref", json= payload)
|
|
||||||
# assert response.status_code == 202
|
|
||||||
|
|
||||||
# payload = {"ref": "refs/heads/test"}
|
|
||||||
# response = client.post("/test_ref", json= payload)
|
|
||||||
# assert response.status_code == 403
|
|
||||||
|
|
||||||
def test_web():
|
def test_web():
|
||||||
response = client.get('/test_web?token={}'.format(getenv("TOKEN")))
|
response = client.get('/test_web?token={}'.format(getenv("TOKEN")))
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
Reference in New Issue
Block a user