3 Commits

Author SHA1 Message Date
3286c4002d chore(settings): update default num_leds and color_order
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:56 +12:00
68eb547ec4 feat(espnow): add debug logging and channel diagnostics
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:54 +12:00
8403df531d feat(espnow): improve bridge transport and driver sync
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-28 00:38:09 +12:00
9 changed files with 279 additions and 20 deletions

8
bulk.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
PORT="${1:-/dev/ttyACM0}"
while true; do
ls "$PORT" && led-cli -p "$PORT" --erase --src --patterns && led-cli -p "$PORT" --reset -f
sleep 0.5
done

View File

@@ -68,7 +68,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
set_groups = bool(data.get("set_groups")) set_groups = bool(data.get("set_groups"))
groups = data.get("groups") groups = data.get("groups")
if set_groups and isinstance(groups, list): if set_groups and isinstance(groups, list):
dg.groups_replace(groups) dg.groups_replace(groups, settings)
print("groups set", dg.list_groups()) print("groups set", dg.list_groups())
elif isinstance(groups, list) and groups: elif isinstance(groups, list) and groups:
if not any(dg.in_group(str(g)) for g in groups): if not any(dg.in_group(str(g)) for g in groups):
@@ -96,6 +96,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
settings.save() settings.save()
if "save" in data and "device_config" in data: if "save" in data and "device_config" in data:
settings.save() settings.save()
_flush_pending_select(settings, presets)
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"}) _VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
@@ -196,6 +197,18 @@ def _run_select(presets, settings, preset_name, step=None):
return False return False
def _flush_pending_select(settings, presets):
global _pending_select
if _pending_select is None:
return
preset_name, step = _pending_select
if preset_name not in presets.presets and preset_name not in ("on", "off"):
return
_pending_select = None
if not _run_select(presets, settings, preset_name, step):
print("select failed (pending):", preset_name)
def apply_presets(data, settings, presets): def apply_presets(data, settings, presets):
global _pending_select global _pending_select
presets_map = data["presets"] presets_map = data["presets"]
@@ -219,13 +232,10 @@ def apply_presets(data, settings, presets):
pass pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
# Same message often carries select; apply now while presets are loaded. # Same message often carries select; apply now while presets are loaded.
if "select" in data: if "select" in data or "s" in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
elif _pending_select is not None: else:
preset_name, step = _pending_select _flush_pending_select(settings, presets)
_pending_select = None
if preset_name in presets.presets or preset_name in ("on", "off"):
_run_select(presets, settings, preset_name, step)
def _select_list_for_this_device(select_val, settings): def _select_list_for_this_device(select_val, settings):
@@ -276,6 +286,11 @@ def apply_select(data, settings, presets):
if not preset_name: if not preset_name:
return return
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
if preset_name not in presets.presets and preset_name not in ("on", "off"):
try:
presets.load(settings)
except Exception:
pass
if preset_name not in presets.presets and preset_name not in ("on", "off"): if preset_name not in presets.presets and preset_name not in ("on", "off"):
_pending_select = (preset_name, step) _pending_select = (preset_name, step)
print("select deferred (preset not loaded yet):", preset_name) print("select deferred (preset not loaded yet):", preset_name)

View File

@@ -1,11 +1,23 @@
"""In-memory group membership for GROUP_CMD filtering.""" """Group membership for GROUP_CMD filtering; persisted in settings.json."""
_groups = [] _groups = []
def groups_replace(group_ids): def load_from_settings(settings):
global _groups
g = settings.get("groups") if settings is not None else None
if isinstance(g, list):
_groups = [str(x) for x in g if str(x).strip()]
else:
_groups = []
def groups_replace(group_ids, settings=None, *, persist=True):
global _groups global _groups
_groups = [str(g) for g in group_ids] _groups = [str(g) for g in group_ids]
if persist and settings is not None:
settings["groups"] = list(_groups)
settings.save()
def in_group(group_id): def in_group(group_id):

View File

@@ -2,6 +2,7 @@
import asyncio import asyncio
import urandom import urandom
import ubinascii
import espnow import espnow
import network import network
@@ -31,10 +32,17 @@ _PING_DELAY_MS_MAX = 500
_esp = None _esp = None
_groups_received = False _groups_received = False
_debug = False
def _dlog(*parts):
if _debug:
print(*parts)
def init_espnow(settings): def init_espnow(settings):
global _esp global _esp, _debug
_debug = bool(settings.get("debug", False))
try: try:
ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT)) ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT))
except (TypeError, ValueError): except (TypeError, ValueError):
@@ -43,13 +51,22 @@ def init_espnow(settings):
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
sta.config(pm=network.WLAN.PM_NONE) sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=ch) try:
sta.config(channel=ch)
except Exception as e:
print("espnow sta channel set failed:", e)
_esp = espnow.ESPNow() _esp = espnow.ESPNow()
_esp.active(True) _esp.active(True)
try: try:
_esp.add_peer(BROADCAST_MAC) _esp.add_peer(BROADCAST_MAC)
_dlog("espnow add bcast ok")
except Exception as e:
print("espnow add bcast failed:", e)
try:
actual_ch = sta.config("channel")
except Exception: except Exception:
pass actual_ch = "?"
print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug)
return _esp return _esp
@@ -63,12 +80,13 @@ def _send_ping_rsp(host, settings, ping_id, delay_ms):
try: try:
try: try:
_esp.add_peer(host) _esp.add_peer(host)
except Exception: _dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode())
pass except Exception as e:
_dlog("espnow ping add_peer skip", repr(e))
_esp.send(host, pkt) _esp.send(host, pkt)
print("espnow ping rsp", ping_id, delay_ms, "ms") print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode())
except Exception as e: except Exception as e:
print("espnow ping rsp failed:", e) print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt))
async def _send_ping_rsp_delayed(host, settings, ping_id): async def _send_ping_rsp_delayed(host, settings, ping_id):
@@ -112,7 +130,7 @@ def _handle_packet(host, pkt, settings, presets):
if mt == MSG_GROUPS: if mt == MSG_GROUPS:
ids = parse_groups(pkt) ids = parse_groups(pkt)
if ids is not None: if ids is not None:
dg.groups_replace(ids) dg.groups_replace(ids, settings)
_groups_received = True _groups_received = True
print("groups", ids) print("groups", ids)
return return

View File

@@ -6,6 +6,7 @@ import gc
import json import json
import network import network
import espnow import espnow
import device_groups as dg
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern, process_data from controller_messages import apply_startup_pattern, process_data
from espnow_transport import _handle_packet, init_espnow from espnow_transport import _handle_packet, init_espnow
@@ -17,6 +18,7 @@ wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
dg.load_from_settings(settings)
print(settings) print(settings)
gc.collect() gc.collect()

View File

@@ -16,9 +16,9 @@ class Settings(dict):
def set_defaults(self): def set_defaults(self):
self["led_pin"] = 10 self["led_pin"] = 10
self["num_leds"] = 119 self["num_leds"] = 200
self["color_order"] = "rgb" self["color_order"] = "grb"
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
@@ -35,7 +35,8 @@ class Settings(dict):
self["brightness"] = 32 self["brightness"] = 32
self["wifi_channel"] = WIFI_CHANNEL_DEFAULT self["wifi_channel"] = WIFI_CHANNEL_DEFAULT
self["groups"] = []
def save(self): def save(self):
try: try:

View File

@@ -0,0 +1,116 @@
"""Device-side radio diagnostic test (MicroPython).
Checks:
1) STA/AP bring-up on channel 5
2) ESP-NOW init and broadcast peer add
3) Broadcast TX test packet send
4) RX wait window to see any incoming ESP-NOW frames
"""
import espnow
import machine
import network
import time
import ubinascii
CHANNEL = 5
RX_WINDOW_MS = 3000
WDT_TIMEOUT_MS = 10000
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00"
def _mac_hex(mac_bytes):
try:
return ubinascii.hexlify(mac_bytes).decode()
except Exception:
return "?"
def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
print("diag start")
print("cpu freq", machine.freq())
sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
# Clean start
try:
sta.active(False)
ap.active(False)
time.sleep_ms(100)
except Exception as e:
print("wifi reset failed", repr(e))
# STA setup
try:
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac")))
except Exception as e:
print("sta setup failed", repr(e))
# AP setup
try:
ap.active(True)
try:
ap.config(essid="diag-ap", channel=channel, hidden=True)
except TypeError:
ap.config(essid="diag-ap", channel=channel)
print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac")))
except Exception as e:
print("ap setup failed", repr(e))
wdt.feed()
# ESP-NOW setup
try:
e = espnow.ESPNow()
e.active(True)
print("espnow active ok")
except Exception as e_err:
print("espnow init failed", repr(e_err))
return
# Add broadcast peer
try:
e.add_peer(BROADCAST_MAC, channel=channel)
print("add bcast peer ok")
except TypeError:
try:
e.add_peer(BROADCAST_MAC)
print("add bcast peer ok (no channel arg)")
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
# TX test
try:
ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True)
print("tx bcast", ok, "len", len(TEST_PAYLOAD))
except Exception as e_err:
print("tx bcast failed", repr(e_err))
# RX window
print("rx window ms", rx_window_ms)
t_end = time.ticks_add(time.ticks_ms(), rx_window_ms)
rx_count = 0
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
wdt.feed()
host, msg = e.recv(100)
if host:
rx_count += 1
print("rx", rx_count, _mac_hex(host), "len", len(msg))
time.sleep_ms(5)
print("diag done rx_count", rx_count)
if __name__ == "__main__":
run_diag()

View File

@@ -0,0 +1,40 @@
"""Device test: receive ESP-NOW packets on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
import time
CHANNEL = 5
TIMEOUT_MS = 1000
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
print("recv ready ch", channel)
while True:
wdt.feed()
host, msg = e.recv(timeout_ms)
if host:
mac_hex = ubinascii.hexlify(host).decode()
print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode())
else:
print("rx timeout")
time.sleep_ms(10)
if __name__ == "__main__":
recv_loop()

View File

@@ -0,0 +1,47 @@
"""Device test: send one ESP-NOW packet on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
CHANNEL = 5
DEST_HEX = "ffffffffffff"
PAYLOAD_HEX = "4c0501000000"
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def _add_peer(esp, dest, channel):
try:
esp.add_peer(dest, channel=channel)
except TypeError:
esp.add_peer(dest)
except OSError:
pass
def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
dest = ubinascii.unhexlify(dest_hex)
pkt = ubinascii.unhexlify(payload_hex)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
_add_peer(e, dest, channel)
wdt.feed()
ok = e.send(dest, pkt, True)
print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt))
return ok
if __name__ == "__main__":
send_once()