Files
led-driver/src/main.py
Jimmy 170a0e05ab feat(patterns): align manual and auto behaviour
Unify manual/auto timing semantics for key patterns, add preset background support, and improve runtime observability while keeping the driver responsive under beat-triggered selects.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 20:07:58 +12:00

199 lines
5.5 KiB
Python

from settings import Settings
import machine
import network
import utime
import asyncio
import json
import gc
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets
from controller_messages import process_data
from hello import broadcast_hello_udp
try:
import uos as os
except ImportError:
import os
# Arm the hardware watchdog before anything else so a hang during start-up
# resets the board. The timeout is given in milliseconds (10 s here).
wdt = machine.WDT(timeout=10000)
wdt.feed()
# Run the CPU at 160 MHz.
machine.freq(160000000)
settings = Settings()
# Collect before and after building the presets so the allocations below start
# from as much free heap as possible.
gc.collect()
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
# Optional settings: "brightness" defaults to 255 and "debug" to False.
presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
# Select the configured start-up preset, if one is named and actually loaded.
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
    # select() returning a falsy value is reported but does not abort start-up.
    if not presets.select(default_preset):
        print("Startup preset failed (invalid pattern?):", default_preset)
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
    sta_if.active(False)
    # Short pause after deactivating, before the interface is re-enabled below.
    utime.sleep_ms(100)
gc.collect()
sta_if.active(True)
# Turn Wi-Fi power management off for this interface.
sta_if.config(pm=network.WLAN.PM_NONE)
# "ssid" and "password" are required settings: a missing key raises here.
sta_if.connect(settings["ssid"], settings["password"])
# Block until the station is connected. There is no timeout: the loop polls
# once per second and feeds the watchdog on every pass, so a slow or failing
# join keeps waiting instead of triggering a watchdog reset.
while not sta_if.isconnected():
    print("Waiting for network connection...")
    utime.sleep(1)
    wdt.feed()
def _print_network_ips(controller_ip=None):
    """Print this driver's STA address next to the led-controller address.

    ``controller_ip`` is the WebSocket peer address when one is known; a falsy
    value is printed as "(not connected)". If the STA address cannot be read,
    "?" is printed in its place.
    """
    try:
        driver_ip = sta_if.ifconfig()[0]
    except Exception:
        # Logging must never fail: fall back to a placeholder address.
        driver_ip = "?"
    print(
        "led-driver IP:",
        driver_ip,
        " led-controller IP:",
        controller_ip or "(not connected)",
    )
# Log the station address once at boot; no controller address is passed here,
# so it is printed as "(not connected)".
_print_network_ips()
# Microdot application object; the route decorators below register handlers on it.
app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
    """Serve one WebSocket session, passing every received message to process_data.

    The peer address is looked up on a best-effort basis, logged, and forwarded
    to ``process_data`` as ``controller_ip``. The session ends when an empty
    message is received or when the socket raises ``WebSocketError``/``OSError``.
    """
    # client_addr may be absent, a string, or an (ip, port)-style sequence;
    # any failure while inspecting it simply leaves the address unknown.
    try:
        peer = getattr(request, "client_addr", None)
        if isinstance(peer, str):
            controller_ip = peer
        elif isinstance(peer, (tuple, list)) and peer:
            controller_ip = peer[0]
        else:
            controller_ip = None
    except Exception:
        controller_ip = None

    _print_network_ips(controller_ip)

    try:
        message = await ws.receive()
        while message:
            process_data(message, settings, presets, controller_ip=controller_ip)
            message = await ws.receive()
    except WebSocketError as e:
        print("WS client disconnected:", e)
    except OSError as e:
        print("WS client dropped (OSError):", e)
@app.post("/patterns/upload")
async def upload_pattern(request):
    """Store one uploaded pattern file under ``patterns/`` and optionally reload.

    Query arguments:
        name:   target filename; ".py" is appended when missing.
        reload: "0", "false", "no" or "off" (case-insensitive) skips the
                reload; any other value, or omitting it, reloads.

    The request body is the pattern source and must be non-blank UTF-8 text.

    Returns a ``(json_body, status, headers)`` tuple: 201 on success, 400 for
    a rejected name or body, 500 when writing or reloading raises ``OSError``.

    NOTE(review): no authentication is visible in this handler, and the stored
    file presumably gets imported by ``presets.reload_patterns()`` - confirm
    that only trusted hosts can reach this endpoint.
    """
    json_headers = {"Content-Type": "application/json"}

    def reply(payload, status):
        return json.dumps(payload), status, json_headers

    def fail(message, status=400):
        return reply({"error": message}, status)

    raw_name = request.args.get("name")
    reload_flag = str(request.args.get("reload", "1")).strip().lower()
    reload_patterns = reload_flag not in ("0", "false", "no", "off")

    if not (isinstance(raw_name, str) and raw_name.strip()):
        return fail("name is required")

    body = request.body
    if not (isinstance(body, (bytes, bytearray)) and body):
        return fail("code is required")
    try:
        code = body.decode("utf-8")
    except UnicodeError:
        return fail("body must be utf-8 text")
    if not code.strip():
        return fail("code is required")

    name = raw_name.strip()
    if not name.endswith(".py"):
        name += ".py"
    if name in ("__init__.py", "main.py") or not _safe_pattern_filename(name):
        return fail("invalid pattern filename")

    try:
        os.mkdir("patterns")
    except OSError:
        # Ignored on purpose (typically the directory already exists); if the
        # directory is really unusable, the open() below fails and reports it.
        pass

    try:
        with open("patterns/" + name, "w") as f:
            f.write(code)
        # Reload only after the file has been closed.
        if reload_patterns:
            presets.reload_patterns()
    except OSError as e:
        print("patterns/upload failed:", e)
        return fail(str(e), 500)

    return reply(
        {"message": "pattern uploaded", "name": name, "reloaded": reload_patterns},
        201,
    )
async def presets_loop():
    """Advance the active preset forever, feeding the watchdog on every pass."""
    while True:
        presets.tick()
        # Fed only after a tick returns: if tick() hangs or raises, feeding
        # from this loop stops.
        wdt.feed()
        # Yield to the scheduler so the HTTP/WebSocket handlers can run
        # between ticks.
        await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
    """Send the UDP hello broadcast one second after this task starts.

    The delay is there so the HTTP server has bound first; otherwise discovery
    clients time out on /ws. A failing broadcast is logged and otherwise
    ignored.
    """
    await asyncio.sleep(1)
    try:
        hello_options = dict(wait_reply=False, wdt=wdt, dual_destinations=True)
        broadcast_hello_udp(sta_if, settings.get("name", ""), **hello_options)
    except Exception as ex:
        print("UDP hello broadcast failed:", ex)
async def main(port=80):
    """Schedule the background tasks, then serve HTTP/WebSocket on *port*."""
    # Both coroutines are scheduled before the server starts; the hello task
    # delays itself so that it fires after the server is listening.
    for background in (presets_loop(), _udp_hello_after_http_ready()):
        asyncio.create_task(background)
    await app.start_server(host="0.0.0.0", port=port)


if __name__ == "__main__":
    asyncio.run(main(port=80))