From 7bfdcd9beeaaaec380fb61ff381170e74c2fc71c Mon Sep 17 00:00:00 2001 From: pi Date: Sun, 5 Apr 2026 20:20:47 +1200 Subject: [PATCH] feat(led-driver): wifi tcp transport, hex mac in utf-8 hello Made-with: Cursor --- src/main.py | 180 ++++++++++++++++++++++++++++++++++++++---------- src/settings.py | 5 ++ 2 files changed, 147 insertions(+), 38 deletions(-) diff --git a/src/main.py b/src/main.py index bf3b4b4..86a2de2 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,10 @@ import network from presets import Presets from utils import convert_and_reorder_colors import json +import time +import select +import socket +import ubinascii settings = Settings() print(settings) @@ -22,34 +26,30 @@ if default_preset and default_preset in presets.presets: wdt = WDT(timeout=10000) wdt.feed() -sta_if = network.WLAN(network.STA_IF) -sta_if.active(True) -sta_if.disconnect() -sta_if.config(channel=1) -e = ESPNow() -e.active(True) -def as_dict(value): - return value if isinstance(value, dict) else {} -def as_list(value): - return value if isinstance(value, list) else [] - -def receive_data(receiver): +def process_data(msg): """Read one ESPNow message and decode JSON dict payload.""" try: - host, msg = receiver.recv() data = json.loads(msg) print(msg) - data = as_dict(data) if data.get("v", "") != "1": return None - return data except (ValueError, TypeError): return None + if "b" in data: + apply_brightness(data) + if "presets" in data: + apply_presets(data) + if "select" in data: + apply_select(data) + if "default" in data: + apply_default(data) + if "save" in data and ("presets" in data or "default" in data): + presets.save() def apply_brightness(data): @@ -63,9 +63,9 @@ def apply_brightness(data): def apply_presets(data): """Create/update preset definitions from payload.""" - presets_map = as_dict(data.get("presets")) + presets_map = data.get("presets") + for id, preset_data in presets_map.items(): - preset_data = as_dict(preset_data) if not preset_data: continue color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) @@ -82,21 +82,20 @@ def apply_presets(data): def apply_select(data): """Select preset for this device when addressed.""" - select_map = as_dict(data.get("select")) + select_map = data.get("select") device_name = settings["name"] # Case-sensitive: select key must match device name exactly. - select_list = as_list(select_map.get(device_name)) + select_list = select_map.get(device_name) if not select_list: return preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None - if isinstance(preset_name, str): - presets.select(preset_name, step=step) + presets.select(preset_name, step=step) def apply_default(data): - targets = as_list(data.get("targets")) + targets = data.get("targets") default_name = data.get("default", "") if ( settings["name"] in targets @@ -106,19 +105,124 @@ def apply_default(data): settings["default"] = default_name -while True: - wdt.feed() - presets.tick() - if e.any(): - if (data := receive_data(e)) is None: - continue - if "b" in data: - apply_brightness(data) - if "presets" in data: - apply_presets(data) - if "select" in data: - apply_select(data) - if "default" in data: - apply_default(data) - if "save" in data and ("presets" in data or "default" in data): - presets.save() +def receive_data(e): + _, msg = e.recv() + if not msg: + return None + try: + return msg.decode() + except UnicodeError: + return None + + +sta_if = network.WLAN(network.STA_IF) +sta_if.active(True) +sta_if.config(pm=network.WLAN.PM_NONE) + +mac = sta_if.config("mac") +hello = (json.dumps({ + "v": "1", + "device_name": settings.get("name", ""), + "mac": ubinascii.hexlify(mac).decode().lower(), +}) + "\n").encode("utf-8") + + +if settings["transport_type"] == "espnow": + sta_if.disconnect() + sta_if.config(channel=settings.get("wifi_channel", 1)) + e = ESPNow() + e.active(True) + e.add_peer(b"\xff\xff\xff\xff\xff\xff") + e.add_peer(mac) + e.send(hello) + while True: + if e.any() and (data := receive_data(e)) is not None: + process_data(data) + presets.tick() + wdt.feed() +elif settings["transport_type"] == "wifi": + sta_if.connect(settings["ssid"], settings["password"]) + while not sta_if.isconnected(): + time.sleep(1) + print(f"WiFi connected {sta_if.ifconfig()[0]}") + reconnect_ms = 1000 + next_connect_at = 0 + client = None + poller = None + buf = b"" + + while True: + now = utime.ticks_ms() + + if client is None and utime.ticks_diff(now, next_connect_at) >= 0: + try: + c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + c.connect((settings["server_ip"], "8765")) + # On connect, send a single JSON message with device name. + c.send(hello) + c.setblocking(False) + p = select.poll() + p.register(c, select.POLLIN) + client = c + poller = p + buf = b"" + print("TCP connected") + except Exception: + try: + c.close() + except Exception: + pass + next_connect_at = utime.ticks_add(now, reconnect_ms) + + if client is not None and poller is not None: + try: + events = poller.poll(0) + except Exception: + events = [] + + reconnect_needed = False + for fd, event in events: + if (event & select.POLLHUP) or (event & select.POLLERR): + reconnect_needed = True + break + if event & select.POLLIN: + try: + chunk = client.recv(512) + except OSError: + reconnect_needed = True + break + + if not chunk: + reconnect_needed = True + break + + buf += chunk + + # Newline-delimited JSON from controller TCP endpoint. + while b"\n" in buf: + line, buf = buf.split(b"\n", 1) + line = line.strip() + if line: + process_data(line) + + if reconnect_needed: + print("TCP disconnected, reconnecting...") + try: + poller.unregister(client) + except Exception: + pass + try: + client.close() + except Exception: + pass + client = None + poller = None + buf = b"" + next_connect_at = utime.ticks_add(now, reconnect_ms) + + # Always advance patterns and feed WDT each loop + presets.tick() + wdt.feed() + + + diff --git a/src/settings.py b/src/settings.py index ff76a3b..c8822f5 100644 --- a/src/settings.py +++ b/src/settings.py @@ -21,6 +21,11 @@ class Settings(dict): self["debug"] = False self["default"] = "on" self["brightness"] = 32 + # STA + TCP to led-controller (Pi): leave wifi_ssid empty for ESP-NOW-only (channel 1). + self["wifi_ssid"] = "" + self["wifi_password"] = "" + self["controller_host"] = "" + self["controller_port"] = 8765 def save(self): try: