10 Commits

Author SHA1 Message Date
3286c4002d chore(settings): update default num_leds and color_order
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:56 +12:00
68eb547ec4 feat(espnow): add debug logging and channel diagnostics
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:54 +12:00
8403df531d feat(espnow): improve bridge transport and driver sync
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-28 00:38:09 +12:00
088fe161a8 fix(main): blocking espnow rx loop and pass peer host
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
c9895df512 fix(presets): phase-lock blink and one tick on re-select
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
39a84696c3 feat(espnow): ping request/response with jittered delay
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:39 +12:00
c7560b2e87 fix(settings): default wifi channel to 5
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:36 +12:00
ea21563900 fix(controller): apply select when presets not in message
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:30 +12:00
a97f6c7c2c feat(espnow): groups filter and v1 select list on driver
Apply group membership on RX, accept select as [preset_id, step?],
and fix identify/off plus presets layout for manual beat stepping.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 01:44:21 +12:00
1fdb2c9441 fix(espnow): handle binary and JSON RX in simplified main
Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON
v1 payloads to process_data from the poll loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:45:13 +12:00
14 changed files with 795 additions and 82 deletions

8
bulk.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
PORT="${1:-/dev/ttyACM0}"
while true; do
ls "$PORT" && led-cli -p "$PORT" --erase --src --patterns && led-cli -p "$PORT" --reset -f
sleep 0.5
done

View File

@@ -2,7 +2,11 @@
import json import json
import socket import socket
import network
import ubinascii
import device_groups as dg
from v1_wire import expand_v1
from binary_envelope import parse_binary_envelope from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -58,8 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
return return
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
data = expand_v1(data)
if save: if save:
data["save"] = True data["save"] = True
set_groups = bool(data.get("set_groups"))
groups = data.get("groups")
if set_groups and isinstance(groups, list):
dg.groups_replace(groups, settings)
print("groups set", dg.list_groups())
elif isinstance(groups, list) and groups:
if not any(dg.in_group(str(g)) for g in groups):
print("ignored: not in groups", groups)
return
if "device_config" in data: if "device_config" in data:
apply_device_config(data, settings, presets) apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
@@ -68,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data: if "clear_presets" in data:
apply_clear_presets(data, presets) apply_clear_presets(data, presets)
if "select" in data: if ("select" in data or "s" in data) and "presets" not in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
apply_default(data, settings, presets) apply_default(data, settings, presets)
@@ -82,6 +96,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
settings.save() settings.save()
if "save" in data and "device_config" in data: if "save" in data and "device_config" in data:
settings.save() settings.save()
_flush_pending_select(settings, presets)
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"}) _VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
@@ -172,7 +187,30 @@ def apply_brightness(data, settings, presets):
pass pass
_pending_select = None
def _run_select(presets, settings, preset_name, step=None):
if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
return True
return False
def _flush_pending_select(settings, presets):
global _pending_select
if _pending_select is None:
return
preset_name, step = _pending_select
if preset_name not in presets.presets and preset_name not in ("on", "off"):
return
_pending_select = None
if not _run_select(presets, settings, preset_name, step):
print("select failed (pending):", preset_name)
def apply_presets(data, settings, presets): def apply_presets(data, settings, presets):
global _pending_select
presets_map = data["presets"] presets_map = data["presets"]
for id, preset_data in presets_map.items(): for id, preset_data in presets_map.items():
if not preset_data: if not preset_data:
@@ -183,8 +221,8 @@ def apply_presets(data, settings, presets):
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings preset_data[color_key], settings
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError) as err:
continue print("preset color convert failed:", id, err)
if "bg" in preset_data: if "bg" in preset_data:
try: try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
@@ -193,18 +231,74 @@ def apply_presets(data, settings, presets):
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
pass pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
# Same message often carries select; apply now while presets are loaded.
if "select" in data or "s" in data:
apply_select(data, settings, presets)
else:
_flush_pending_select(settings, presets)
def _select_list_for_this_device(select_val, settings):
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
if isinstance(select_val, list) and select_val:
return select_val
if isinstance(select_val, str) and str(select_val).strip():
return [str(select_val).strip()]
if not isinstance(select_val, dict) or not select_val:
return None
if "preset" in select_val:
preset_name = select_val.get("preset")
if preset_name is None:
return None
out = [str(preset_name)]
if "step" in select_val:
out.append(select_val["step"])
return out
device_name = str(settings.get("name") or "").strip()
select_list = select_val.get(device_name)
if select_list:
return select_list
try:
sta = network.WLAN(network.STA_IF)
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
except Exception:
mac_hex = ""
if mac_hex:
for key in select_val:
k = str(key).lower().replace(":", "").replace("-", "")
if mac_hex in k:
return select_val[key]
if len(select_val) == 1:
return next(iter(select_val.values()))
return None
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
select_map = data["select"] global _pending_select
device_name = settings["name"] select_val = data.get("select")
select_list = select_map.get(device_name, []) if select_val is None:
select_val = data.get("s")
select_list = _select_list_for_this_device(select_val, settings)
if not select_list: if not select_list:
print("select ignored:", repr(select_val))
return
preset_name = str(select_list[0]).strip()
if not preset_name:
return return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
if presets.select(preset_name, step=step): if preset_name not in presets.presets and preset_name not in ("on", "off"):
record_last_preset(settings, preset_name) try:
presets.load(settings)
except Exception:
pass
if preset_name not in presets.presets and preset_name not in ("on", "off"):
_pending_select = (preset_name, step)
print("select deferred (preset not loaded yet):", preset_name)
return
if _run_select(presets, settings, preset_name, step):
_pending_select = None
else:
print("select failed:", preset_name)
def apply_clear_presets(data, presets): def apply_clear_presets(data, presets):

View File

@@ -1,11 +1,23 @@
"""In-memory group membership for GROUP_CMD filtering.""" """Group membership for GROUP_CMD filtering; persisted in settings.json."""
_groups = [] _groups = []
def groups_replace(group_ids): def load_from_settings(settings):
global _groups
g = settings.get("groups") if settings is not None else None
if isinstance(g, list):
_groups = [str(x) for x in g if str(x).strip()]
else:
_groups = []
def groups_replace(group_ids, settings=None, *, persist=True):
global _groups global _groups
_groups = [str(g) for g in group_ids] _groups = [str(g) for g in group_ids]
if persist and settings is not None:
settings["groups"] = list(_groups)
settings.save()
def in_group(group_id): def in_group(group_id):

View File

@@ -1,6 +1,8 @@
"""ESP-NOW receive loop and boot announce.""" """ESP-NOW receive loop and boot announce."""
import asyncio import asyncio
import urandom
import ubinascii
import espnow import espnow
import network import network
@@ -12,38 +14,99 @@ from espnow_wire import (
MSG_CMD, MSG_CMD,
MSG_GROUP_CMD, MSG_GROUP_CMD,
MSG_GROUPS, MSG_GROUPS,
MSG_PING_REQ,
cmd_envelope, cmd_envelope,
pack_announce, pack_announce,
pack_ping_rsp,
parse_group_cmd, parse_group_cmd,
parse_groups, parse_groups,
parse_ping_req,
wire_msg_type, wire_msg_type,
) )
from controller_messages import process_data from controller_messages import process_data
from settings import WIFI_CHANNEL_DEFAULT
_PING_DELAY_MS_MIN = 50
_PING_DELAY_MS_MAX = 500
_esp = None _esp = None
_groups_received = False _groups_received = False
_debug = False
def _dlog(*parts):
if _debug:
print(*parts)
def init_espnow(settings): def init_espnow(settings):
global _esp global _esp, _debug
ch = 6 _debug = bool(settings.get("debug", False))
try: try:
ch = int(settings.get("wifi_channel", 6)) ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT))
except (TypeError, ValueError): except (TypeError, ValueError):
pass ch = WIFI_CHANNEL_DEFAULT
ch = max(1, min(11, ch)) ch = max(1, min(11, ch))
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
sta.config(channel=ch) sta.config(pm=network.WLAN.PM_NONE)
try:
sta.config(channel=ch)
except Exception as e:
print("espnow sta channel set failed:", e)
_esp = espnow.ESPNow() _esp = espnow.ESPNow()
_esp.active(True) _esp.active(True)
try: try:
_esp.add_peer(BROADCAST_MAC) _esp.add_peer(BROADCAST_MAC)
_dlog("espnow add bcast ok")
except Exception as e:
print("espnow add bcast failed:", e)
try:
actual_ch = sta.config("channel")
except Exception: except Exception:
pass actual_ch = "?"
print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug)
return _esp return _esp
def _send_ping_rsp(host, settings, ping_id, delay_ms):
import utime
utime.sleep_ms(delay_ms)
if _esp is None or not host or len(host) != 6:
return
pkt = pack_ping_rsp(ping_id, settings.get("name", "led"))
try:
try:
_esp.add_peer(host)
_dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode())
except Exception as e:
_dlog("espnow ping add_peer skip", repr(e))
_esp.send(host, pkt)
print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode())
except Exception as e:
print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt))
async def _send_ping_rsp_delayed(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
await asyncio.sleep(delay_ms / 1000)
_send_ping_rsp(host, settings, ping_id, delay_ms)
def _schedule_ping_rsp(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
try:
import _thread
_thread.start_new_thread(_send_ping_rsp, (host, settings, ping_id, delay_ms))
except ImportError:
asyncio.create_task(_send_ping_rsp_delayed(host, settings, ping_id))
def send_boot_announce(settings): def send_boot_announce(settings):
if _esp is None: if _esp is None:
return return
@@ -61,13 +124,13 @@ def send_boot_announce(settings):
print("espnow announce failed:", e) print("espnow announce failed:", e)
def _handle_packet(pkt, settings, presets): def _handle_packet(host, pkt, settings, presets):
global _groups_received global _groups_received
mt = wire_msg_type(pkt) mt = wire_msg_type(pkt)
if mt == MSG_GROUPS: if mt == MSG_GROUPS:
ids = parse_groups(pkt) ids = parse_groups(pkt)
if ids is not None: if ids is not None:
dg.groups_replace(ids) dg.groups_replace(ids, settings)
_groups_received = True _groups_received = True
print("groups", ids) print("groups", ids)
return return
@@ -91,6 +154,11 @@ def _handle_packet(pkt, settings, presets):
if env: if env:
process_data(env, settings, presets, save=save) process_data(env, settings, presets, save=save)
return return
if mt == MSG_PING_REQ:
ping_id = parse_ping_req(pkt)
if ping_id is not None and host and len(host) == 6:
_schedule_ping_rsp(host, settings, ping_id)
return
if mt == MSG_ANNOUNCE: if mt == MSG_ANNOUNCE:
return return
@@ -112,7 +180,7 @@ async def espnow_receive_loop(settings, presets, wdt=None):
wdt.feed() wdt.feed()
continue continue
try: try:
_handle_packet(msg, settings, presets) _handle_packet(host, msg, settings, presets)
except Exception as e: except Exception as e:
print("espnow rx error:", e) print("espnow rx error:", e)
if wdt: if wdt:

View File

@@ -9,6 +9,8 @@ MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02 MSG_GROUPS = 0x02
MSG_CMD = 0x03 MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04 MSG_GROUP_CMD = 0x04
MSG_PING_REQ = 0x05
MSG_PING_RSP = 0x06
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff" BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
@@ -104,6 +106,29 @@ def cmd_envelope(payload):
return env[:need], save return env[:need], save
def pack_ping_req(ping_id):
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF)
return _pack_header(MSG_PING_REQ, body)
def parse_ping_req(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_PING_REQ:
return None
body = payload[2:]
else:
body = payload
if len(body) < 4:
return None
return struct.unpack("<I", body[:4])[0]
def pack_ping_rsp(ping_id, name):
name_b = name.encode("utf-8")
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF) + bytes([len(name_b)]) + name_b
return _pack_header(MSG_PING_RSP, body)
def wire_msg_type(payload): def wire_msg_type(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC: if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
return payload[1] return payload[1]

View File

@@ -3,14 +3,14 @@ from settings import Settings
import machine import machine
import asyncio import asyncio
import gc import gc
import json
import network import network
import espnow import espnow
import device_groups as dg
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern from controller_messages import apply_startup_pattern, process_data
from background_tasks import presets_loop from espnow_transport import _handle_packet, init_espnow
from espnow_transport import espnow_receive_loop, init_espnow, send_boot_announce from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
from mem_stats import print_mem
import json
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
@@ -18,6 +18,8 @@ wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
dg.load_from_settings(settings)
print(settings)
gc.collect() gc.collect()
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
@@ -28,32 +30,41 @@ gc.collect()
apply_startup_pattern(settings, presets) apply_startup_pattern(settings, presets)
sta_if = network.WLAN(network.STA_IF) esp = init_espnow(settings)
sta_if.active(True) print(network.WLAN(network.STA_IF).config("channel"))
print(sta_if.ifconfig())
print(sta_if.config("channel"))
esp = espnow.ESPNow()
esp.active(True)
esp.add_peer(b"\xff\xff\xff\xff\xff\xff")
hello = json.dumps({ hello = json.dumps({
"v": "1", "v": "1",
"settings": settings, "name": settings.get("name", "led"),
"type": "led", "type": "led",
}) })
esp.send(b"\xff\xff\xff\xff\xff\xff", hello)
print(hello) print(hello)
esp.send(BROADCAST_MAC, hello)
print("espnow hello", len(hello), "B")
async def main():
while True: def _on_espnow_message(host, msg):
presets.tick() if not msg:
wdt.feed() return
if esp.any(): if msg[0] == WIRE_MAGIC:
host, msg = esp.recv(0) _handle_packet(host, msg, settings, presets)
print(host, msg) return
await asyncio.sleep(0) if msg[0:1] == b"{":
process_data(msg, settings, presets)
if __name__ == "__main__":
asyncio.run(main())
while True:
wdt.feed()
while esp.any():
host, msg = esp.recv(0)
if not host or not msg:
continue
print(host, len(msg), "B")
try:
_on_espnow_message(host, msg)
print(msg)
except Exception as e:
print("espnow rx error:", e)
presets.tick()

View File

@@ -4,6 +4,7 @@ from preset import Preset
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys import sys
import utime
try: try:
import uos as os import uos as os
except ImportError: except ImportError:
@@ -31,6 +32,7 @@ class Presets:
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": self.blink,
} }
self.patterns.update(self._load_dynamic_patterns()) self.patterns.update(self._load_dynamic_patterns())
@@ -193,20 +195,23 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
# Manual single-shot patterns: if this select arrives before the main loop has if preset.p == "off":
# tick()'d the previous frame, completing it first keeps step in sync with beats. self.generator = None
self.step = 0
self.fill((0, 0, 0))
self.selected = preset_name
return True
# If re-selecting the same preset before the main loop has tick()'d the
# previous frame, run one pending tick so step stays in sync.
if ( if (
preset_name == self.selected preset_name == self.selected
and not preset.a
and preset.p in ("chase", "pulse")
and self.generator is not None and self.generator is not None
): ):
while self.generator is not None: self.tick()
self.tick()
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
elif preset.p == "off" or self.selected != preset_name: elif self.selected != preset_name:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
@@ -256,4 +261,44 @@ class Presets:
def on(self, preset): def on(self, preset):
colors = preset.c colors = preset.c
color = colors[0] if colors else (255, 255, 255) color = colors[0] if colors else (255, 255, 255)
self.fill(self.apply_brightness(color, preset.b)) lit = self.apply_brightness(color, preset.b)
while True:
self.fill(lit)
yield
def blink(self, preset):
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0
delay_ms = max(1, int(preset.d))
period = delay_ms * 2
now = utime.ticks_ms()
# Phase-lock to wall time so group identify (broadcast select) stays in sync even
# when devices process the packet on different main-loop iterations.
phase = now % period if period else 0
state = phase < delay_ms
last_update = utime.ticks_add(now, -phase)
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
state = not state
last_update = utime.ticks_add(last_update, delay_ms)
yield
def run_tick(presets):
"""Advance one animation frame (standalone tests / mpremote demos)."""
presets.tick()

View File

@@ -3,6 +3,8 @@ import ubinascii
import machine import machine
import network import network
WIFI_CHANNEL_DEFAULT = 5
class Settings(dict): class Settings(dict):
SETTINGS_FILE = "/settings.json" SETTINGS_FILE = "/settings.json"
@@ -14,9 +16,9 @@ class Settings(dict):
def set_defaults(self): def set_defaults(self):
self["led_pin"] = 10 self["led_pin"] = 10
self["num_leds"] = 119 self["num_leds"] = 200
self["color_order"] = "rgb" self["color_order"] = "grb"
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
@@ -32,8 +34,9 @@ class Settings(dict):
self["startup_mode"] = "default" self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["wifi_channel"] = 1 self["wifi_channel"] = WIFI_CHANNEL_DEFAULT
self["groups"] = []
def save(self): def save(self):
try: try:

36
src/v1_wire.py Normal file
View File

@@ -0,0 +1,36 @@
"""Expand short v1 wire keys to long names (MicroPython)."""
K_PRESETS = "p"
K_SELECT = "s"
K_GROUPS = "g"
K_SET_GROUPS = "sg"
K_SAVE = "sv"
K_DEFAULT = "df"
K_DEVICE_CONFIG = "dc"
K_CLEAR_PRESETS = "cp"
K_MANIFEST = "mf"
_SHORT_TO_LONG = {
K_PRESETS: "presets",
K_SELECT: "select",
K_GROUPS: "groups",
K_SET_GROUPS: "set_groups",
K_SAVE: "save",
K_DEFAULT: "default",
K_DEVICE_CONFIG: "device_config",
K_CLEAR_PRESETS: "clear_presets",
K_MANIFEST: "manifest",
}
def expand_v1(data):
if not isinstance(data, dict):
return data
out = dict(data)
for short_key, long_key in _SHORT_TO_LONG.items():
if short_key in data and long_key not in out:
out[long_key] = data[short_key]
if short_key in out:
del out[short_key]
return out

169
tests/bridge_ws_blink.py Normal file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
pipenv run python led-driver/tests/bridge_ws_blink.py
pipenv run python led-driver/tests/bridge_ws_blink.py \\
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
From **led-driver** (if Pipenv/env is the parent project)::
pipenv run python tests/bridge_ws_blink.py --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# led-driver/tests -> led-driver -> led-controller
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
PROJECT_ROOT = LED_DRIVER_ROOT.parent
def _load_bridge_url(explicit: Optional[str]) -> str:
if explicit and explicit.strip():
return explicit.strip()
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
if not path.is_file():
continue
try:
data = json.loads(path.read_text(encoding="utf-8"))
url = str(data.get("bridge_ws_url") or "").strip()
if url:
return url
except (OSError, json.JSONDecodeError, TypeError):
pass
return "ws://192.168.4.1/ws"
def _format_mac(mac: str) -> str:
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
def build_blink_envelope(
mac: str,
*,
preset_id: str = "2",
delay_ms: int = 200,
brightness: int = 64,
) -> Dict[str, Any]:
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
body = {
"p": {
preset_id: {
"p": "blink",
"b": max(0, min(255, int(brightness))),
"d": max(1, int(delay_ms)),
"c": ["#FF0000", "#0000FF"],
"a": True,
}
},
"s": [str(preset_id)],
}
return {"v": "1", "dv": {_format_mac(mac): body}}
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
import websockets
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
print(f"connecting to {url}")
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
print(f"connected, sending {len(packet)} B")
print(packet.decode("utf-8"))
await ws.send(packet)
if hold_s > 0:
print(f"holding connection {hold_s}s …")
await asyncio.sleep(hold_s)
print("done")
def main() -> int:
parser = argparse.ArgumentParser(
description="Send blink preset+select to one driver via bridge WebSocket.",
)
parser.add_argument(
"--url",
default=None,
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
)
parser.add_argument(
"--mac",
default="188b0e1560a8",
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
)
parser.add_argument(
"--preset-id",
default="2",
help="Wire preset slot id (default: 2, matches zone push)",
)
parser.add_argument(
"--delay-ms",
type=int,
default=200,
help="Blink delay in ms (default: 200)",
)
parser.add_argument(
"--brightness",
type=int,
default=64,
help="Preset brightness 0255 (default: 64)",
)
parser.add_argument(
"--hold",
type=float,
default=2.0,
help="Seconds to keep WebSocket open after send (default: 2)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print envelope only; do not connect",
)
args = parser.parse_args()
url = _load_bridge_url(args.url)
try:
envelope = build_blink_envelope(
args.mac,
preset_id=args.preset_id,
delay_ms=args.delay_ms,
brightness=args.brightness,
)
except ValueError as e:
print(f"error: {e}", file=sys.stderr)
return 1
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
if args.dry_run:
print(json.dumps(envelope, indent=2))
return 0
try:
asyncio.run(_send(url, envelope, args.hold))
except KeyboardInterrupt:
print("interrupted")
return 130
except Exception as e:
print(f"failed: {e!r}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,116 @@
"""Device-side radio diagnostic test (MicroPython).
Checks:
1) STA/AP bring-up on channel 5
2) ESP-NOW init and broadcast peer add
3) Broadcast TX test packet send
4) RX wait window to see any incoming ESP-NOW frames
"""
import espnow
import machine
import network
import time
import ubinascii
CHANNEL = 5
RX_WINDOW_MS = 3000
WDT_TIMEOUT_MS = 10000
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00"
def _mac_hex(mac_bytes):
try:
return ubinascii.hexlify(mac_bytes).decode()
except Exception:
return "?"
def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
print("diag start")
print("cpu freq", machine.freq())
sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
# Clean start
try:
sta.active(False)
ap.active(False)
time.sleep_ms(100)
except Exception as e:
print("wifi reset failed", repr(e))
# STA setup
try:
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac")))
except Exception as e:
print("sta setup failed", repr(e))
# AP setup
try:
ap.active(True)
try:
ap.config(essid="diag-ap", channel=channel, hidden=True)
except TypeError:
ap.config(essid="diag-ap", channel=channel)
print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac")))
except Exception as e:
print("ap setup failed", repr(e))
wdt.feed()
# ESP-NOW setup
try:
e = espnow.ESPNow()
e.active(True)
print("espnow active ok")
except Exception as e_err:
print("espnow init failed", repr(e_err))
return
# Add broadcast peer
try:
e.add_peer(BROADCAST_MAC, channel=channel)
print("add bcast peer ok")
except TypeError:
try:
e.add_peer(BROADCAST_MAC)
print("add bcast peer ok (no channel arg)")
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
# TX test
try:
ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True)
print("tx bcast", ok, "len", len(TEST_PAYLOAD))
except Exception as e_err:
print("tx bcast failed", repr(e_err))
# RX window
print("rx window ms", rx_window_ms)
t_end = time.ticks_add(time.ticks_ms(), rx_window_ms)
rx_count = 0
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
wdt.feed()
host, msg = e.recv(100)
if host:
rx_count += 1
print("rx", rx_count, _mac_hex(host), "len", len(msg))
time.sleep_ms(5)
print("diag done rx_count", rx_count)
if __name__ == "__main__":
run_diag()

View File

@@ -0,0 +1,40 @@
"""Device test: receive ESP-NOW packets on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
import time
CHANNEL = 5
TIMEOUT_MS = 1000
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
print("recv ready ch", channel)
while True:
wdt.feed()
host, msg = e.recv(timeout_ms)
if host:
mac_hex = ubinascii.hexlify(host).decode()
print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode())
else:
print("rx timeout")
time.sleep_ms(10)
if __name__ == "__main__":
recv_loop()

View File

@@ -0,0 +1,47 @@
"""Device test: send one ESP-NOW packet on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
CHANNEL = 5
DEST_HEX = "ffffffffffff"
PAYLOAD_HEX = "4c0501000000"
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def _add_peer(esp, dest, channel):
try:
esp.add_peer(dest, channel=channel)
except TypeError:
esp.add_peer(dest)
except OSError:
pass
def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
dest = ubinascii.unhexlify(dest_hex)
pkt = ubinascii.unhexlify(payload_hex)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
_add_peer(e, dest, channel)
wdt.feed()
ok = e.send(dest, pkt, True)
print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt))
return ok
if __name__ == "__main__":
send_once()

View File

@@ -1,35 +1,74 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Standalone blink pattern demo (WDT-fed tick loop).
Run on device::
mpremote connect <port> run tests/patterns/blink.py
"""
import sys
import utime import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick def _bootstrap_import_path():
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
try:
import uos as os
except ImportError:
import os
candidates = []
try:
here = __file__.rsplit("/", 1)[0]
if here:
candidates.append(here)
tests = here.rsplit("/", 1)[0]
if tests:
candidates.append(tests)
root = tests.rsplit("/", 1)[0]
if root:
candidates.append(root)
candidates.append(root + "/src")
except NameError:
pass
for p in (".", "..", "/", "src", "/src"):
candidates.append(p)
for p in candidates:
if p and p not in sys.path:
sys.path.insert(0, p)
_bootstrap_import_path()
from machine import WDT # noqa: E402
from settings import Settings # noqa: E402
from presets import Presets # noqa: E402
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
presets.tick()
utime.sleep_ms(sleep_ms)
def main(): def main():
s = Settings() settings = Settings()
pin = s.get("led_pin", 10) presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000) wdt = WDT(timeout=10000)
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors)
p.edit("test_blink", {
"p": "blink",
"b": 64,
"d": 200,
"c": [(255, 0, 0), (0, 0, 255)],
})
p.select("test_blink")
start = utime.ticks_ms() presets.edit(
while utime.ticks_diff(utime.ticks_ms(), start) < 1500: "test_blink",
wdt.feed() {
run_tick(p) "p": "blink",
utime.sleep_ms(10) "b": 64,
"d": 200,
"c": [(255, 0, 0), (0, 0, 255)],
},
)
presets.select("test_blink")
_run_ms(presets, wdt, 1500)
if __name__ == "__main__": if __name__ == "__main__":
main() main()