3 Commits

Author SHA1 Message Date
a97f6c7c2c feat(espnow): groups filter and v1 select list on driver
Apply group membership on RX, accept select as [preset_id, step?],
and fix identify/off plus presets layout for manual beat stepping.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 01:44:21 +12:00
1fdb2c9441 fix(espnow): handle binary and JSON RX in simplified main
Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON
v1 payloads to process_data from the poll loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:45:13 +12:00
3e718f7432 feat(espnow): add wire transport and simplify broadcast main
Binary espnow_wire/espnow_transport modules plus a minimal main that
broadcasts a JSON hello and polls ESP-NOW while running presets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:44:39 +12:00
11 changed files with 687 additions and 217 deletions

View File

@@ -1,11 +1,7 @@
import asyncio import asyncio
import utime import utime
from hello import broadcast_hello_udp
from mem_stats import print_mem from mem_stats import print_mem
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt): async def presets_loop(presets, wdt):
@@ -18,41 +14,4 @@ async def presets_loop(presets, wdt):
if utime.ticks_diff(now, last_mem_log) >= 5000: if utime.ticks_diff(now, last_mem_log) >= 5000:
print_mem("runtime") print_mem("runtime")
last_mem_log = now last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s)

View File

@@ -2,7 +2,11 @@
import json import json
import socket import socket
import network
import ubinascii
import device_groups as dg
from v1_wire import expand_v1
from binary_envelope import parse_binary_envelope from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -40,8 +44,8 @@ def _log_rx(payload) -> None:
print("rx (logging failed)") print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None, save=False):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" """Read one controller message; binary v2 envelope or JSON v1, then apply fields."""
_log_rx(payload) _log_rx(payload)
data = None data = None
if isinstance(payload, (bytes, bytearray)): if isinstance(payload, (bytes, bytearray)):
@@ -58,6 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None):
return return
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
data = expand_v1(data)
if save:
data["save"] = True
set_groups = bool(data.get("set_groups"))
groups = data.get("groups")
if set_groups and isinstance(groups, list):
dg.groups_replace(groups)
print("groups set", dg.list_groups())
elif isinstance(groups, list) and groups:
if not any(dg.in_group(str(g)) for g in groups):
print("ignored: not in groups", groups)
return
if "device_config" in data: if "device_config" in data:
apply_device_config(data, settings, presets) apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
@@ -66,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data: if "clear_presets" in data:
apply_clear_presets(data, presets) apply_clear_presets(data, presets)
if "select" in data: elif "select" in data or "s" in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
apply_default(data, settings, presets) apply_default(data, settings, presets)
@@ -170,7 +186,18 @@ def apply_brightness(data, settings, presets):
pass pass
_pending_select = None
def _run_select(presets, settings, preset_name, step=None):
if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
return True
return False
def apply_presets(data, settings, presets): def apply_presets(data, settings, presets):
global _pending_select
presets_map = data["presets"] presets_map = data["presets"]
for id, preset_data in presets_map.items(): for id, preset_data in presets_map.items():
if not preset_data: if not preset_data:
@@ -181,8 +208,8 @@ def apply_presets(data, settings, presets):
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings preset_data[color_key], settings
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError) as err:
continue print("preset color convert failed:", id, err)
if "bg" in preset_data: if "bg" in preset_data:
try: try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
@@ -191,18 +218,72 @@ def apply_presets(data, settings, presets):
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
pass pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
# Same message often carries select; apply now while presets are loaded.
if "select" in data:
apply_select(data, settings, presets)
elif _pending_select is not None:
preset_name, step = _pending_select
_pending_select = None
if preset_name in presets.presets or preset_name in ("on", "off"):
_run_select(presets, settings, preset_name, step)
def _select_list_for_this_device(select_val, settings):
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
if isinstance(select_val, list) and select_val:
return select_val
if isinstance(select_val, str) and str(select_val).strip():
return [str(select_val).strip()]
if not isinstance(select_val, dict) or not select_val:
return None
if "preset" in select_val:
preset_name = select_val.get("preset")
if preset_name is None:
return None
out = [str(preset_name)]
if "step" in select_val:
out.append(select_val["step"])
return out
device_name = str(settings.get("name") or "").strip()
select_list = select_val.get(device_name)
if select_list:
return select_list
try:
sta = network.WLAN(network.STA_IF)
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
except Exception:
mac_hex = ""
if mac_hex:
for key in select_val:
k = str(key).lower().replace(":", "").replace("-", "")
if mac_hex in k:
return select_val[key]
if len(select_val) == 1:
return next(iter(select_val.values()))
return None
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
select_map = data["select"] global _pending_select
device_name = settings["name"] select_val = data.get("select")
select_list = select_map.get(device_name, []) if select_val is None:
select_val = data.get("s")
select_list = _select_list_for_this_device(select_val, settings)
if not select_list: if not select_list:
print("select ignored:", repr(select_val))
return
preset_name = str(select_list[0]).strip()
if not preset_name:
return return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
if presets.select(preset_name, step=step): if preset_name not in presets.presets and preset_name not in ("on", "off"):
record_last_preset(settings, preset_name) _pending_select = (preset_name, step)
print("select deferred (preset not loaded yet):", preset_name)
return
if _run_select(presets, settings, preset_name, step):
_pending_select = None
else:
print("select failed:", preset_name)
def apply_clear_presets(data, presets): def apply_clear_presets(data, presets):

16
src/device_groups.py Normal file
View File

@@ -0,0 +1,16 @@
"""In-memory group membership for GROUP_CMD filtering."""
_groups = []
def groups_replace(group_ids):
global _groups
_groups = [str(g) for g in group_ids]
def in_group(group_id):
return str(group_id) in _groups
def list_groups():
return list(_groups)

119
src/espnow_transport.py Normal file
View File

@@ -0,0 +1,119 @@
"""ESP-NOW receive loop and boot announce."""
import asyncio
import espnow
import network
import device_groups as dg
from espnow_wire import (
BROADCAST_MAC,
MSG_ANNOUNCE,
MSG_CMD,
MSG_GROUP_CMD,
MSG_GROUPS,
cmd_envelope,
pack_announce,
parse_group_cmd,
parse_groups,
wire_msg_type,
)
from controller_messages import process_data
_esp = None
_groups_received = False
def init_espnow(settings):
global _esp
ch = 6
try:
ch = int(settings.get("wifi_channel", 1))
except (TypeError, ValueError):
pass
ch = max(1, min(11, ch))
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=ch)
_esp = espnow.ESPNow()
_esp.active(True)
try:
_esp.add_peer(BROADCAST_MAC)
except Exception:
pass
return _esp
def send_boot_announce(settings):
if _esp is None:
return
pkt = pack_announce(
settings.get("name", "led"),
settings.get("num_leds", 1),
color_order=settings.get("color_order", "rgb"),
startup_mode=settings.get("startup_mode", "default"),
brightness=settings.get("brightness", 32),
)
try:
_esp.send(BROADCAST_MAC, pkt)
print("espnow announce", len(pkt), "B")
except Exception as e:
print("espnow announce failed:", e)
def _handle_packet(pkt, settings, presets):
global _groups_received
mt = wire_msg_type(pkt)
if mt == MSG_GROUPS:
ids = parse_groups(pkt)
if ids is not None:
dg.groups_replace(ids)
_groups_received = True
print("groups", ids)
return
if mt == MSG_GROUP_CMD:
parsed = parse_group_cmd(pkt)
if parsed is None:
return
gid, env = parsed
if not dg.in_group(gid):
return
from espnow_wire import _envelope_size
need = _envelope_size(env)
save = len(env) > need and env[need] == 1
body = env[:need] if save else env
if body:
process_data(body, settings, presets, save=save)
return
if mt == MSG_CMD:
env, save = cmd_envelope(pkt)
if env:
process_data(env, settings, presets, save=save)
return
if mt == MSG_ANNOUNCE:
return
async def espnow_receive_loop(settings, presets, wdt=None):
global _groups_received
while True:
if _esp is None:
await asyncio.sleep(0.1)
continue
host, msg = _esp.recv(0)
if not host:
if not _groups_received:
await asyncio.sleep(5)
send_boot_announce(settings)
else:
await asyncio.sleep(0.02)
if wdt:
wdt.feed()
continue
try:
_handle_packet(msg, settings, presets)
except Exception as e:
print("espnow rx error:", e)
if wdt:
wdt.feed()

110
src/espnow_wire.py Normal file
View File

@@ -0,0 +1,110 @@
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
import struct
WIRE_MAGIC = 0x4C
MAX_ESPNOW_PAYLOAD = 250
MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02
MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
COLOR_ORDER_TO_ENUM = {
"rgb": 0,
"rbg": 1,
"grb": 2,
"gbr": 3,
"brg": 4,
"bgr": 5,
}
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
def _pack_header(msg_type, body):
pkt = bytes([WIRE_MAGIC, msg_type]) + body
if len(pkt) > MAX_ESPNOW_PAYLOAD:
raise ValueError("packet too large")
return pkt
def pack_announce(
name,
num_leds,
color_order="rgb",
startup_mode="default",
brightness=32,
device_type=0,
):
name_b = name.encode("utf-8")
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
body = (
bytes([len(name_b)])
+ name_b
+ struct.pack("<H", int(num_leds))
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
)
return _pack_header(MSG_ANNOUNCE, body)
def parse_groups(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_GROUPS:
return None
body = payload[2:]
else:
body = payload
if not body:
return []
off = 0
count = body[off]
off += 1
out = []
for _ in range(count):
gl = body[off]
off += 1
out.append(body[off : off + gl].decode("utf-8"))
off += gl
return out
def parse_group_cmd(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
return None
body = payload[2:]
gl = body[0]
gid = body[1 : 1 + gl].decode("utf-8")
env = body[1 + gl :]
return gid, env
HEADER_LEN = 5
def _envelope_size(env):
if len(env) < HEADER_LEN:
return len(env)
lp, ls, ld = env[2], env[3], env[4]
return HEADER_LEN + lp + ls + ld
def cmd_envelope(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
return None, False
env = payload[2:]
if not env:
return None, False
need = _envelope_size(env)
if need > len(env):
return None, False
save = len(env) > need and env[need] == 1
return env[:need], save
def wire_msg_type(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
return payload[1]
return None

View File

@@ -1,33 +1,24 @@
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms] import print_timestamp # noqa: F401
from settings import Settings from settings import Settings
import machine import machine
import utime
import asyncio import asyncio
import json
import gc import gc
from microdot import Microdot import json
from microdot.websocket import WebSocketError, with_websocket import network
import espnow
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern, process_data from controller_messages import apply_startup_pattern, process_data
from runtime_state import RuntimeState from espnow_transport import _handle_packet, init_espnow
from background_tasks import presets_loop, udp_hello_loop_after_http_ready from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
from mem_stats import print_mem
from wifi_sta import boot_sta
try:
import uos as os
except ImportError:
import os
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings)
gc.collect() gc.collect()
sta_if = boot_sta(settings, wdt)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
@@ -37,130 +28,45 @@ gc.collect()
apply_startup_pattern(settings, presets) apply_startup_pattern(settings, presets)
esp = init_espnow(settings)
print(network.WLAN(network.STA_IF).config("channel"))
def _print_network_ips(controller_ip=None): hello = json.dumps({
"""Always log STA address and led-controller (WS client) address when known.""" "v": "1",
try: "name": settings.get("name", "led"),
led_ip = sta_if.ifconfig()[0] "type": "led",
except Exception: })
led_ip = "?" print(hello)
ctrl = controller_ip if controller_ip else "(not connected)"
print("led-driver IP:", led_ip, " led-controller IP:", ctrl) esp.send(BROADCAST_MAC, hello)
print("espnow hello", len(hello), "B")
_print_network_ips() def _on_espnow_message(msg):
print_mem("startup") if not msg:
return
runtime_state = RuntimeState() if msg[0] == WIRE_MAGIC:
_handle_packet(msg, settings, presets)
app = Microdot() return
if msg[0:1] == b"{":
process_data(msg, settings, presets)
def _safe_pattern_filename(name): async def main():
if not isinstance(name, str): while True:
return False presets.tick()
if not name.endswith(".py"): wdt.feed()
return False if esp.any():
if "/" in name or "\\" in name or ".." in name: host, msg = esp.recv(0)
return False if host and msg:
return True print(host, len(msg), "B")
try:
_on_espnow_message(msg)
@app.route("/ws") print(msg)
@with_websocket except Exception as e:
async def ws_handler(request, ws): print("espnow rx error:", e)
runtime_state.ws_connected() await asyncio.sleep(0)
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
_print_network_ips(controller_ip)
print_mem("ws connect")
try:
while True:
data = await ws.receive()
if not data:
break
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def main(port=80):
asyncio.create_task(presets_loop(presets, wdt))
asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main(port=80)) asyncio.run(main())

View File

@@ -4,6 +4,7 @@ from preset import Preset
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys import sys
import utime
try: try:
import uos as os import uos as os
except ImportError: except ImportError:
@@ -31,6 +32,7 @@ class Presets:
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": self.blink,
} }
self.patterns.update(self._load_dynamic_patterns()) self.patterns.update(self._load_dynamic_patterns())
@@ -193,6 +195,12 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
if preset.p == "off":
self.generator = None
self.step = 0
self.fill((0, 0, 0))
self.selected = preset_name
return True
# Manual single-shot patterns: if this select arrives before the main loop has # Manual single-shot patterns: if this select arrives before the main loop has
# tick()'d the previous frame, completing it first keeps step in sync with beats. # tick()'d the previous frame, completing it first keeps step in sync with beats.
if ( if (
@@ -206,7 +214,7 @@ class Presets:
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
elif preset.p == "off" or self.selected != preset_name: elif self.selected != preset_name:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
@@ -256,4 +264,33 @@ class Presets:
def on(self, preset): def on(self, preset):
colors = preset.c colors = preset.c
color = colors[0] if colors else (255, 255, 255) color = colors[0] if colors else (255, 255, 255)
self.fill(self.apply_brightness(color, preset.b)) lit = self.apply_brightness(color, preset.b)
while True:
self.fill(lit)
yield
def blink(self, preset):
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0
state = True
last_update = utime.ticks_ms()
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) >= delay_ms:
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
state = not state
last_update = utime.ticks_add(last_update, delay_ms)
yield
def run_tick(presets):
"""Advance one animation frame (standalone tests / mpremote demos)."""
presets.tick()

View File

@@ -31,11 +31,9 @@ class Settings(dict):
# Power-on: "default" | "last" | "off" # Power-on: "default" | "last" | "off"
self["startup_mode"] = "default" self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
# ESP-NOW transport (requires espnow firmware; uses wifi_channel).
self["ssid"] = ""
self["password"] = ""
def save(self): def save(self):
try: try:

36
src/v1_wire.py Normal file
View File

@@ -0,0 +1,36 @@
"""Expand short v1 wire keys to long names (MicroPython)."""
K_PRESETS = "p"
K_SELECT = "s"
K_GROUPS = "g"
K_SET_GROUPS = "sg"
K_SAVE = "sv"
K_DEFAULT = "df"
K_DEVICE_CONFIG = "dc"
K_CLEAR_PRESETS = "cp"
K_MANIFEST = "mf"
_SHORT_TO_LONG = {
K_PRESETS: "presets",
K_SELECT: "select",
K_GROUPS: "groups",
K_SET_GROUPS: "set_groups",
K_SAVE: "save",
K_DEFAULT: "default",
K_DEVICE_CONFIG: "device_config",
K_CLEAR_PRESETS: "clear_presets",
K_MANIFEST: "manifest",
}
def expand_v1(data):
if not isinstance(data, dict):
return data
out = dict(data)
for short_key, long_key in _SHORT_TO_LONG.items():
if short_key in data and long_key not in out:
out[long_key] = data[short_key]
if short_key in out:
del out[short_key]
return out

169
tests/bridge_ws_blink.py Normal file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
pipenv run python led-driver/tests/bridge_ws_blink.py
pipenv run python led-driver/tests/bridge_ws_blink.py \\
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
From **led-driver** (if Pipenv/env is the parent project)::
pipenv run python tests/bridge_ws_blink.py --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# led-driver/tests -> led-driver -> led-controller
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
PROJECT_ROOT = LED_DRIVER_ROOT.parent
def _load_bridge_url(explicit: Optional[str]) -> str:
if explicit and explicit.strip():
return explicit.strip()
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
if not path.is_file():
continue
try:
data = json.loads(path.read_text(encoding="utf-8"))
url = str(data.get("bridge_ws_url") or "").strip()
if url:
return url
except (OSError, json.JSONDecodeError, TypeError):
pass
return "ws://192.168.4.1/ws"
def _format_mac(mac: str) -> str:
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
def build_blink_envelope(
mac: str,
*,
preset_id: str = "2",
delay_ms: int = 200,
brightness: int = 64,
) -> Dict[str, Any]:
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
body = {
"p": {
preset_id: {
"p": "blink",
"b": max(0, min(255, int(brightness))),
"d": max(1, int(delay_ms)),
"c": ["#FF0000", "#0000FF"],
"a": True,
}
},
"s": [str(preset_id)],
}
return {"v": "1", "dv": {_format_mac(mac): body}}
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
import websockets
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
print(f"connecting to {url}")
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
print(f"connected, sending {len(packet)} B")
print(packet.decode("utf-8"))
await ws.send(packet)
if hold_s > 0:
print(f"holding connection {hold_s}s …")
await asyncio.sleep(hold_s)
print("done")
def main() -> int:
parser = argparse.ArgumentParser(
description="Send blink preset+select to one driver via bridge WebSocket.",
)
parser.add_argument(
"--url",
default=None,
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
)
parser.add_argument(
"--mac",
default="188b0e1560a8",
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
)
parser.add_argument(
"--preset-id",
default="2",
help="Wire preset slot id (default: 2, matches zone push)",
)
parser.add_argument(
"--delay-ms",
type=int,
default=200,
help="Blink delay in ms (default: 200)",
)
parser.add_argument(
"--brightness",
type=int,
default=64,
help="Preset brightness 0255 (default: 64)",
)
parser.add_argument(
"--hold",
type=float,
default=2.0,
help="Seconds to keep WebSocket open after send (default: 2)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print envelope only; do not connect",
)
args = parser.parse_args()
url = _load_bridge_url(args.url)
try:
envelope = build_blink_envelope(
args.mac,
preset_id=args.preset_id,
delay_ms=args.delay_ms,
brightness=args.brightness,
)
except ValueError as e:
print(f"error: {e}", file=sys.stderr)
return 1
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
if args.dry_run:
print(json.dumps(envelope, indent=2))
return 0
try:
asyncio.run(_send(url, envelope, args.hold))
except KeyboardInterrupt:
print("interrupted")
return 130
except Exception as e:
print(f"failed: {e!r}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,35 +1,74 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Standalone blink pattern demo (WDT-fed tick loop).
Run on device::
mpremote connect <port> run tests/patterns/blink.py
"""
import sys
import utime import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick def _bootstrap_import_path():
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
try:
import uos as os
except ImportError:
import os
candidates = []
try:
here = __file__.rsplit("/", 1)[0]
if here:
candidates.append(here)
tests = here.rsplit("/", 1)[0]
if tests:
candidates.append(tests)
root = tests.rsplit("/", 1)[0]
if root:
candidates.append(root)
candidates.append(root + "/src")
except NameError:
pass
for p in (".", "..", "/", "src", "/src"):
candidates.append(p)
for p in candidates:
if p and p not in sys.path:
sys.path.insert(0, p)
_bootstrap_import_path()
from machine import WDT # noqa: E402
from settings import Settings # noqa: E402
from presets import Presets # noqa: E402
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
presets.tick()
utime.sleep_ms(sleep_ms)
def main(): def main():
s = Settings() settings = Settings()
pin = s.get("led_pin", 10) presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000) wdt = WDT(timeout=10000)
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors)
p.edit("test_blink", {
"p": "blink",
"b": 64,
"d": 200,
"c": [(255, 0, 0), (0, 0, 255)],
})
p.select("test_blink")
start = utime.ticks_ms() presets.edit(
while utime.ticks_diff(utime.ticks_ms(), start) < 1500: "test_blink",
wdt.feed() {
run_tick(p) "p": "blink",
utime.sleep_ms(10) "b": 64,
"d": 200,
"c": [(255, 0, 0), (0, 0, 255)],
},
)
presets.select("test_blink")
_run_ms(presets, wdt, 1500)
if __name__ == "__main__": if __name__ == "__main__":
main() main()