from settings import Settings import machine import network import utime import asyncio import json import gc from microdot import Microdot from microdot.websocket import WebSocketError, with_websocket from presets import Presets from controller_messages import process_data from hello import broadcast_hello_udp try: import uos as os except ImportError: import os machine.freq(160000000) settings = Settings() print(settings) wdt = machine.WDT(timeout=10000) wdt.feed() gc.collect() print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()}) presets = Presets(settings["led_pin"], settings["num_leds"]) presets.load(settings) presets.b = settings.get("brightness", 255) presets.debug = bool(settings.get("debug", False)) gc.collect() print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()}) default_preset = settings.get("default", "") if default_preset and default_preset in presets.presets: if presets.select(default_preset): print(f"Selected startup preset: {default_preset}") else: print("Startup preset failed (invalid pattern?):", default_preset) # On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated. # Reset both interfaces and collect before bringing STA up. ap_if = network.WLAN(network.AP_IF) ap_if.active(False) sta_if = network.WLAN(network.STA_IF) if sta_if.active(): sta_if.active(False) utime.sleep_ms(100) gc.collect() sta_if.active(True) sta_if.config(pm=network.WLAN.PM_NONE) sta_if.connect(settings["ssid"], settings["password"]) while not sta_if.isconnected(): utime.sleep(1) wdt.feed() print(sta_if.ifconfig()) app = Microdot() def _safe_pattern_filename(name): if not isinstance(name, str): return False if not name.endswith(".py"): return False if "/" in name or "\\" in name or ".." in name: return False return True @app.route("/ws") @with_websocket async def ws_handler(request, ws): print("WS client connected") controller_ip = None try: client_addr = getattr(request, "client_addr", None) if isinstance(client_addr, (tuple, list)) and client_addr: controller_ip = client_addr[0] elif isinstance(client_addr, str): controller_ip = client_addr except Exception: controller_ip = None print("WS controller_ip:", controller_ip) try: while True: data = await ws.receive() if not data: print("WS client disconnected (closed)") break print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data))) print(data) process_data(data, settings, presets, controller_ip=controller_ip) except WebSocketError as e: print("WS client disconnected:", e) except OSError as e: print("WS client dropped (OSError):", e) @app.post("/patterns/upload") async def upload_pattern(request): """Receive one pattern file body from led-controller and reload patterns.""" raw_name = request.args.get("name") reload_raw = request.args.get("reload", "1") reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off") print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns}) if not isinstance(raw_name, str) or not raw_name.strip(): return json.dumps({"error": "name is required"}), 400, { "Content-Type": "application/json" } body = request.body if not isinstance(body, (bytes, bytearray)) or not body: print("patterns/upload rejected: empty body") return json.dumps({"error": "code is required"}), 400, { "Content-Type": "application/json" } print("patterns/upload body_bytes:", len(body)) try: code = body.decode("utf-8") except UnicodeError: print("patterns/upload rejected: body not utf-8") return json.dumps({"error": "body must be utf-8 text"}), 400, { "Content-Type": "application/json" } if not code.strip(): return json.dumps({"error": "code is required"}), 400, { "Content-Type": "application/json" } name = raw_name.strip() if not name.endswith(".py"): name += ".py" if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"): return json.dumps({"error": "invalid pattern filename"}), 400, { "Content-Type": "application/json" } try: os.mkdir("patterns") except OSError: pass path = "patterns/" + name try: print("patterns/upload writing:", path) with open(path, "w") as f: f.write(code) if reload_patterns: print("patterns/upload reloading patterns") presets.reload_patterns() except OSError as e: print("patterns/upload failed:", e) return json.dumps({"error": str(e)}), 500, { "Content-Type": "application/json" } print("patterns/upload success:", {"name": name, "reloaded": reload_patterns}) return json.dumps({ "message": "pattern uploaded", "name": name, "reloaded": reload_patterns, }), 201, {"Content-Type": "application/json"} async def presets_loop(): last_mem_log = utime.ticks_ms() while True: presets.tick() wdt.feed() if bool(getattr(presets, "debug", False)): now = utime.ticks_ms() if utime.ticks_diff(now, last_mem_log) >= 5000: gc.collect() print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()}) last_mem_log = now # tick() does not await; yield so UDP hello and HTTP/WebSocket can run. await asyncio.sleep(0) async def _udp_hello_after_http_ready(): """Hello must run after the HTTP server binds, or discovery clients time out on /ws.""" await asyncio.sleep(1) print("UDP hello: broadcasting…") try: broadcast_hello_udp( sta_if, settings.get("name", ""), wait_reply=False, wdt=wdt, dual_destinations=True, ) except Exception as ex: print("UDP hello broadcast failed:", ex) async def main(port=80): asyncio.create_task(presets_loop()) asyncio.create_task(_udp_hello_after_http_ready()) await app.start_server(host="0.0.0.0", port=port) if __name__ == "__main__": asyncio.run(main(port=80))