Replace the Microdot-only entrypoint with a CombinedASGI app that handles FastAPI routes (audio API, websocket, dev live-reload) while delegating the rest to Microdot. Suppress noisy /__dev/ access logs during live-reload polling. Co-authored-by: Cursor <cursoragent@cursor.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
"""ASGI bridge for existing Microdot route handlers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from microdot.microdot import Microdot, NoCaseDict, Request, Response
|
|
|
|
|
|
class MicrodotASGI:
    """Dispatch HTTP requests to a :class:`Microdot` application.

    Instances are ASGI callables. Only ``http`` scopes are handled; any other
    scope type is ignored so a wrapping application can route it elsewhere.
    """

    def __init__(self, microdot_app: Microdot):
        """Store the Microdot application that will receive the requests."""
        self.app = microdot_app

    async def __call__(self, scope: dict, receive: Any, send: Any) -> None:
        """Handle one ASGI connection by forwarding it to Microdot.

        The request body is read to completion, a Microdot ``Request`` is
        built from the ASGI scope, and the response produced by
        ``dispatch_request`` is written back through ``send``.

        Args:
            scope: ASGI connection scope.
            receive: ASGI awaitable returning the next incoming event.
            send: ASGI awaitable accepting outgoing events.
        """
        if scope.get("type") != "http":
            return

        # Collect the chunks and join once instead of growing a bytes object.
        chunks: list[bytes] = []
        while True:
            message = await receive()
            kind = message["type"]
            if kind == "http.disconnect":
                # The client went away before the body was complete. Servers
                # keep answering receive() with this event, so polling again
                # would never terminate; there is nobody left to respond to.
                return
            if kind != "http.request":
                continue
            chunks.append(message.get("body", b""))
            if not message.get("more_body"):
                break
        body = b"".join(chunks)

        # ASGI delivers headers as (name, value) byte pairs.
        headers = NoCaseDict()
        for key, value in scope.get("headers", ()):
            headers[key.decode("latin-1")] = value.decode("latin-1")

        # Rebuild the request target (path plus optional query string).
        path = scope.get("path", "/") or "/"
        query = scope.get("query_string", b"").decode("latin-1")
        url = path + (f"?{query}" if query else "")

        # Fall back to a loopback address when the scope carries no client.
        client = scope.get("client") or ("127.0.0.1", 0)
        req = Request(
            self.app,
            client,
            scope.get("method", "GET"),
            url,
            "1.1",
            headers,
            body=body,
        )

        res = await self.app.dispatch_request(req)
        if res is Response.already_handled:
            # The handler signalled that no response should be written here.
            return
        await _send_microdot_response(res, send)
|
|
|
|
|
|
async def _send_microdot_response(res: Response, send: Any) -> None:
    """Write a Microdot response to the client as ASGI events.

    The response is finalized with ``res.complete()``, its headers are
    flattened into the ``(name, value)`` byte pairs ASGI expects, and the body
    is sent in a single ``http.response.body`` event. A body that is neither
    ``str`` nor ``bytes`` is drained through ``res.body_iter()`` and buffered
    in memory before it is sent.

    Args:
        res: Response object returned by the Microdot dispatcher.
        send: ASGI awaitable accepting outgoing events.
    """
    res.complete()

    headers: list[tuple[bytes, bytes]] = []
    for name, value in res.headers.items():
        # A list value yields one header line per item.
        values = value if isinstance(value, list) else [value]
        headers.extend(
            (name.lower().encode("latin-1"), str(item).encode("latin-1"))
            for item in values
        )

    body = res.body
    if isinstance(body, str):
        payload = body.encode()
    elif isinstance(body, bytes):
        payload = body
    else:
        body_iter = res.body_iter()
        parts: list[bytes] = []
        try:
            async for chunk in body_iter:
                parts.append(chunk.encode() if isinstance(chunk, str) else chunk)
        finally:
            # Give the iterator a chance to release what it holds (e.g. an
            # open file) even when iteration fails part-way through.
            close = getattr(body_iter, "aclose", None)
            if close is not None:
                await close()
        payload = b"".join(parts)

    await send(
        {
            "type": "http.response.start",
            "status": res.status_code,
            "headers": headers,
        }
    )
    await send({"type": "http.response.body", "body": payload})
|