feat(esp32): pattern upload route and ws controller ip

Made-with: Cursor
This commit is contained in:
2026-04-19 23:27:33 +12:00
parent a2cd2f8dc2
commit 5a8866add7

View File

@@ -3,11 +3,16 @@ from machine import WDT
import network import network
import utime import utime
import asyncio import asyncio
import json
from microdot import Microdot from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
from controller_messages import process_data from controller_messages import process_data
from hello import broadcast_hello_udp from hello import broadcast_hello_udp
try:
import uos as os
except ImportError:
import os
settings = Settings() settings = Settings()
print(settings) print(settings)
@@ -38,24 +43,111 @@ print(sta_if.ifconfig())
app = Microdot() app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws") @app.route("/ws")
@with_websocket @with_websocket
async def ws_handler(request, ws): async def ws_handler(request, ws):
print("WS client connected") print("WS client connected")
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
print("WS controller_ip:", controller_ip)
try: try:
while True: while True:
data = await ws.receive() data = await ws.receive()
if not data: if not data:
print("WS client disconnected (closed)") print("WS client disconnected (closed)")
break break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data) print(data)
process_data(data, settings, presets) process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e: except WebSocketError as e:
print("WS client disconnected:", e) print("WS client disconnected:", e)
except OSError as e: except OSError as e:
print("WS client dropped (OSError):", e) print("WS client dropped (OSError):", e)
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
print("patterns/upload body_bytes:", len(body))
try:
code = body.decode("utf-8")
except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
print("patterns/upload writing:", path)
with open(path, "w") as f:
f.write(code)
if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def presets_loop(): async def presets_loop():
while True: while True:
presets.tick() presets.tick()