refactor(led-driver): transport layout, fixed tcp port, server_ip settings

Made-with: Cursor
This commit is contained in:
pi
2026-04-05 20:59:30 +12:00
parent 7bfdcd9bee
commit 7e3aca491c
2 changed files with 58 additions and 60 deletions

View File

@@ -11,13 +11,15 @@ import select
import socket import socket
import ubinascii import ubinascii
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
CONTROLLER_TCP_PORT = 8765
settings = Settings() settings = Settings()
print(settings) print(settings)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
# Use the default preset name from settings (set via controller or defaults)
default_preset = settings.get("default", "") default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets: if default_preset and default_preset in presets.presets:
presets.select(default_preset) presets.select(default_preset)
@@ -27,19 +29,18 @@ wdt = WDT(timeout=10000)
wdt.feed() wdt.feed()
# --- Controller JSON (bytes or str): parse v1, then apply -------------------------
def process_data(payload):
"""Read one controller message; json.loads (bytes or str), then apply fields."""
def process_data(msg):
"""Read one ESPNow message and decode JSON dict payload."""
try: try:
data = json.loads(msg) data = json.loads(payload)
print(msg) print(payload)
if data.get("v", "") != "1": if data.get("v", "") != "1":
return None return
except (ValueError, TypeError): except (ValueError, TypeError):
return None return
if "b" in data: if "b" in data:
apply_brightness(data) apply_brightness(data)
if "presets" in data: if "presets" in data:
@@ -53,7 +54,6 @@ def process_data(msg):
def apply_brightness(data): def apply_brightness(data):
"""Apply and persist global brightness from payload."""
try: try:
presets.b = max(0, min(255, int(data["b"]))) presets.b = max(0, min(255, int(data["b"])))
settings["brightness"] = presets.b settings["brightness"] = presets.b
@@ -62,9 +62,7 @@ def apply_brightness(data):
def apply_presets(data): def apply_presets(data):
"""Create/update preset definitions from payload.""" presets_map = data["presets"]
presets_map = data.get("presets")
for id, preset_data in presets_map.items(): for id, preset_data in presets_map.items():
if not preset_data: if not preset_data:
continue continue
@@ -81,12 +79,9 @@ def apply_presets(data):
def apply_select(data): def apply_select(data):
"""Select preset for this device when addressed.""" select_map = data["select"]
select_map = data.get("select")
device_name = settings["name"] device_name = settings["name"]
select_list = select_map.get(device_name, [])
# Case-sensitive: select key must match device name exactly.
select_list = select_map.get(device_name)
if not select_list: if not select_list:
return return
preset_name = select_list[0] preset_name = select_list[0]
@@ -95,8 +90,8 @@ def apply_select(data):
def apply_default(data): def apply_default(data):
targets = data.get("targets") targets = data.get("targets") or []
default_name = data.get("default", "") default_name = data["default"]
if ( if (
settings["name"] in targets settings["name"] in targets
and isinstance(default_name, str) and isinstance(default_name, str)
@@ -105,41 +100,52 @@ def apply_default(data):
settings["default"] = default_name settings["default"] = default_name
def receive_data(e): # --- TCP framing (bytes) → process_data -------------------------------------------
_, msg = e.recv()
if not msg:
return None
try:
return msg.decode()
except UnicodeError:
return None
def tcp_append_and_drain_lines(buf, chunk):
"""Return (new_buf, list of non-empty stripped line byte strings)."""
buf += chunk
lines = []
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.strip()
if line:
lines.append(line)
return buf, lines
# --- Network + hello --------------------------------------------------------------
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
sta_if.active(True) sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
mac = sta_if.config("mac") mac = sta_if.config("mac")
hello = (json.dumps({ hello = {
"v": "1", "v": "1",
"device_name": settings.get("name", ""), "device_name": settings.get("name", ""),
"mac": ubinascii.hexlify(mac).decode().lower(), "mac": ubinascii.hexlify(mac).decode().lower(),
}) + "\n").encode("utf-8") }
hello_bytes = json.dumps(hello).encode("utf-8")
hello_line = hello_bytes + b"\n"
if settings["transport_type"] == "espnow": if settings["transport_type"] == "espnow":
sta_if.disconnect() sta_if.disconnect()
sta_if.config(channel=settings.get("wifi_channel", 1)) sta_if.config(channel=settings.get("wifi_channel", 1))
e = ESPNow() e = ESPNow()
e.active(True) e.active(True)
e.add_peer(b"\xff\xff\xff\xff\xff\xff") e.add_peer(BROADCAST_MAC)
e.add_peer(mac) e.add_peer(mac)
e.send(hello) e.send(BROADCAST_MAC, hello_bytes)
while True: while True:
if e.any() and (data := receive_data(e)) is not None: if e.any():
process_data(data) _peer, msg = e.recv()
if msg:
process_data(msg)
presets.tick() presets.tick()
wdt.feed() wdt.feed()
elif settings["transport_type"] == "wifi": elif settings["transport_type"] == "wifi":
sta_if.connect(settings["ssid"], settings["password"]) sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected(): while not sta_if.isconnected():
@@ -155,11 +161,11 @@ elif settings["transport_type"] == "wifi":
now = utime.ticks_ms() now = utime.ticks_ms()
if client is None and utime.ticks_diff(now, next_connect_at) >= 0: if client is None and utime.ticks_diff(now, next_connect_at) >= 0:
c = None
try: try:
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect((settings["server_ip"], "8765")) c.connect((settings["server_ip"], CONTROLLER_TCP_PORT))
# On connect, send a single JSON message with device name. c.send(hello_line)
c.send(hello)
c.setblocking(False) c.setblocking(False)
p = select.poll() p = select.poll()
p.register(c, select.POLLIN) p.register(c, select.POLLIN)
@@ -168,10 +174,11 @@ elif settings["transport_type"] == "wifi":
buf = b"" buf = b""
print("TCP connected") print("TCP connected")
except Exception: except Exception:
try: if c is not None:
c.close() try:
except Exception: c.close()
pass except Exception:
pass
next_connect_at = utime.ticks_add(now, reconnect_ms) next_connect_at = utime.ticks_add(now, reconnect_ms)
if client is not None and poller is not None: if client is not None and poller is not None:
@@ -196,14 +203,9 @@ elif settings["transport_type"] == "wifi":
reconnect_needed = True reconnect_needed = True
break break
buf += chunk buf, lines = tcp_append_and_drain_lines(buf, chunk)
for raw_line in lines:
# Newline-delimited JSON from controller TCP endpoint. process_data(raw_line)
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.strip()
if line:
process_data(line)
if reconnect_needed: if reconnect_needed:
print("TCP disconnected, reconnecting...") print("TCP disconnected, reconnecting...")
@@ -220,9 +222,5 @@ elif settings["transport_type"] == "wifi":
buf = b"" buf = b""
next_connect_at = utime.ticks_add(now, reconnect_ms) next_connect_at = utime.ticks_add(now, reconnect_ms)
# Always advance patterns and feed WDT each loop
presets.tick() presets.tick()
wdt.feed() wdt.feed()

View File

@@ -21,11 +21,12 @@ class Settings(dict):
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["brightness"] = 32 self["brightness"] = 32
# STA + TCP to led-controller (Pi): leave wifi_ssid empty for ESP-NOW-only (channel 1). self["transport_type"] = "espnow"
self["wifi_ssid"] = "" self["wifi_channel"] = 1
self["wifi_password"] = "" # Wi-Fi + TCP to Pi: leave ssid empty for ESP-NOW-only.
self["controller_host"] = "" self["ssid"] = ""
self["controller_port"] = 8765 self["password"] = ""
self["server_ip"] = ""
def save(self): def save(self):
try: try:
@@ -47,7 +48,6 @@ class Settings(dict):
self.set_defaults() self.set_defaults()
self.save() self.save()
def get_color_order(self, color_order): def get_color_order(self, color_order):
"""Convert color order string to tuple of hex string indices.""" """Convert color order string to tuple of hex string indices."""
color_orders = { color_orders = {