From 5a8866add76678681b4d73882e03b0399e28ca87 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Sun, 19 Apr 2026 23:27:33 +1200 Subject: [PATCH] feat(esp32): pattern upload route and ws controller ip Made-with: Cursor --- src/main.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index a3dcb21..43d333e 100644 --- a/src/main.py +++ b/src/main.py @@ -3,11 +3,16 @@ from machine import WDT import network import utime import asyncio +import json from microdot import Microdot from microdot.websocket import WebSocketError, with_websocket from presets import Presets from controller_messages import process_data from hello import broadcast_hello_udp +try: + import uos as os +except ImportError: + import os settings = Settings() print(settings) @@ -38,24 +43,111 @@ print(sta_if.ifconfig()) app = Microdot() +def _safe_pattern_filename(name): + if not isinstance(name, str): + return False + if not name.endswith(".py"): + return False + if "/" in name or "\\" in name or ".." in name: + return False + return True + + @app.route("/ws") @with_websocket async def ws_handler(request, ws): print("WS client connected") + controller_ip = None + try: + client_addr = getattr(request, "client_addr", None) + if isinstance(client_addr, (tuple, list)) and client_addr: + controller_ip = client_addr[0] + elif isinstance(client_addr, str): + controller_ip = client_addr + except Exception: + controller_ip = None + print("WS controller_ip:", controller_ip) try: while True: data = await ws.receive() if not data: print("WS client disconnected (closed)") break + print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data))) print(data) - process_data(data, settings, presets) + process_data(data, settings, presets, controller_ip=controller_ip) except WebSocketError as e: print("WS client disconnected:", e) except OSError as e: print("WS client dropped (OSError):", e) +@app.post("/patterns/upload") +async def upload_pattern(request): + """Receive one pattern file body from led-controller and reload patterns.""" + raw_name = request.args.get("name") + reload_raw = request.args.get("reload", "1") + reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off") + print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns}) + + if not isinstance(raw_name, str) or not raw_name.strip(): + return json.dumps({"error": "name is required"}), 400, { + "Content-Type": "application/json" + } + body = request.body + if not isinstance(body, (bytes, bytearray)) or not body: + print("patterns/upload rejected: empty body") + return json.dumps({"error": "code is required"}), 400, { + "Content-Type": "application/json" + } + print("patterns/upload body_bytes:", len(body)) + try: + code = body.decode("utf-8") + except UnicodeError: + print("patterns/upload rejected: body not utf-8") + return json.dumps({"error": "body must be utf-8 text"}), 400, { + "Content-Type": "application/json" + } + if not code.strip(): + return json.dumps({"error": "code is required"}), 400, { + "Content-Type": "application/json" + } + + name = raw_name.strip() + if not name.endswith(".py"): + name += ".py" + if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"): + return json.dumps({"error": "invalid pattern filename"}), 400, { + "Content-Type": "application/json" + } + + try: + os.mkdir("patterns") + except OSError: + pass + + path = "patterns/" + name + try: + print("patterns/upload writing:", path) + with open(path, "w") as f: + f.write(code) + if reload_patterns: + print("patterns/upload reloading patterns") + presets.reload_patterns() + except OSError as e: + print("patterns/upload failed:", e) + return json.dumps({"error": str(e)}), 500, { + "Content-Type": "application/json" + } + print("patterns/upload success:", {"name": name, "reloaded": reload_patterns}) + + return json.dumps({ + "message": "pattern uploaded", + "name": name, + "reloaded": reload_patterns, + }), 201, {"Content-Type": "application/json"} + + async def presets_loop(): while True: presets.tick()