from settings import Settings from machine import WDT from espnow import ESPNow import utime import network from presets import Presets from utils import convert_and_reorder_colors import json import time import select import socket import ubinascii settings = Settings() print(settings) presets = Presets(settings["led_pin"], settings["num_leds"]) presets.load(settings) presets.b = settings.get("brightness", 255) # Use the default preset name from settings (set via controller or defaults) default_preset = settings.get("default", "") if default_preset and default_preset in presets.presets: presets.select(default_preset) print(f"Selected startup preset: {default_preset}") wdt = WDT(timeout=10000) wdt.feed() def process_data(msg): """Read one ESPNow message and decode JSON dict payload.""" try: data = json.loads(msg) print(msg) if data.get("v", "") != "1": return None except (ValueError, TypeError): return None if "b" in data: apply_brightness(data) if "presets" in data: apply_presets(data) if "select" in data: apply_select(data) if "default" in data: apply_default(data) if "save" in data and ("presets" in data or "default" in data): presets.save() def apply_brightness(data): """Apply and persist global brightness from payload.""" try: presets.b = max(0, min(255, int(data["b"]))) settings["brightness"] = presets.b except (TypeError, ValueError): pass def apply_presets(data): """Create/update preset definitions from payload.""" presets_map = data.get("presets") for id, preset_data in presets_map.items(): if not preset_data: continue color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) if color_key is not None: try: preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key], settings ) except (TypeError, ValueError, KeyError): continue presets.edit(id, preset_data) print(f"Edited preset {id}: {preset_data.get('name', '')}") def apply_select(data): """Select preset for this device when addressed.""" select_map = data.get("select") device_name = settings["name"] # Case-sensitive: select key must match device name exactly. select_list = select_map.get(device_name) if not select_list: return preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None presets.select(preset_name, step=step) def apply_default(data): targets = data.get("targets") default_name = data.get("default", "") if ( settings["name"] in targets and isinstance(default_name, str) and default_name in presets.presets ): settings["default"] = default_name def receive_data(e): _, msg = e.recv() if not msg: return None try: return msg.decode() except UnicodeError: return None sta_if = network.WLAN(network.STA_IF) sta_if.active(True) sta_if.config(pm=network.WLAN.PM_NONE) mac = sta_if.config("mac") hello = (json.dumps({ "v": "1", "device_name": settings.get("name", ""), "mac": ubinascii.hexlify(mac).decode().lower(), }) + "\n").encode("utf-8") if settings["transport_type"] == "espnow": sta_if.disconnect() sta_if.config(channel=settings.get("wifi_channel", 1)) e = ESPNow() e.active(True) e.add_peer(b"\xff\xff\xff\xff\xff\xff") e.add_peer(mac) e.send(hello) while True: if e.any() and (data := receive_data(e)) is not None: process_data(data) presets.tick() wdt.feed() elif settings["transport_type"] == "wifi": sta_if.connect(settings["ssid"], settings["password"]) while not sta_if.isconnected(): time.sleep(1) print(f"WiFi connected {sta_if.ifconfig()[0]}") reconnect_ms = 1000 next_connect_at = 0 client = None poller = None buf = b"" while True: now = utime.ticks_ms() if client is None and utime.ticks_diff(now, next_connect_at) >= 0: try: c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) c.connect((settings["server_ip"], "8765")) # On connect, send a single JSON message with device name. c.send(hello) c.setblocking(False) p = select.poll() p.register(c, select.POLLIN) client = c poller = p buf = b"" print("TCP connected") except Exception: try: c.close() except Exception: pass next_connect_at = utime.ticks_add(now, reconnect_ms) if client is not None and poller is not None: try: events = poller.poll(0) except Exception: events = [] reconnect_needed = False for fd, event in events: if (event & select.POLLHUP) or (event & select.POLLERR): reconnect_needed = True break if event & select.POLLIN: try: chunk = client.recv(512) except OSError: reconnect_needed = True break if not chunk: reconnect_needed = True break buf += chunk # Newline-delimited JSON from controller TCP endpoint. while b"\n" in buf: line, buf = buf.split(b"\n", 1) line = line.strip() if line: process_data(line) if reconnect_needed: print("TCP disconnected, reconnecting...") try: poller.unregister(client) except Exception: pass try: client.close() except Exception: pass client = None poller = None buf = b"" next_connect_at = utime.ticks_add(now, reconnect_ms) # Always advance patterns and feed WDT each loop presets.tick() wdt.feed()