11 Commits

Author SHA1 Message Date
3286c4002d chore(settings): update default num_leds and color_order
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:56 +12:00
68eb547ec4 feat(espnow): add debug logging and channel diagnostics
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:54 +12:00
8403df531d feat(espnow): improve bridge transport and driver sync
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-28 00:38:09 +12:00
088fe161a8 fix(main): blocking espnow rx loop and pass peer host
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
c9895df512 fix(presets): phase-lock blink and one tick on re-select
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
39a84696c3 feat(espnow): ping request/response with jittered delay
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:39 +12:00
c7560b2e87 fix(settings): default wifi channel to 5
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:36 +12:00
ea21563900 fix(controller): apply select when presets not in message
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:30 +12:00
a97f6c7c2c feat(espnow): groups filter and v1 select list on driver
Apply group membership on RX, accept select as [preset_id, step?],
and fix identify/off plus presets layout for manual beat stepping.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 01:44:21 +12:00
1fdb2c9441 fix(espnow): handle binary and JSON RX in simplified main
Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON
v1 payloads to process_data from the poll loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:45:13 +12:00
3e718f7432 feat(espnow): add wire transport and simplify broadcast main
Binary espnow_wire/espnow_transport modules plus a minimal main that
broadcasts a JSON hello and polls ESP-NOW while running presets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:44:39 +12:00
15 changed files with 1038 additions and 228 deletions

8
bulk.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
PORT="${1:-/dev/ttyACM0}"
while true; do
ls "$PORT" && led-cli -p "$PORT" --erase --src --patterns && led-cli -p "$PORT" --reset -f
sleep 0.5
done

View File

@@ -1,11 +1,7 @@
import asyncio import asyncio
import utime import utime
from hello import broadcast_hello_udp
from mem_stats import print_mem from mem_stats import print_mem
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt): async def presets_loop(presets, wdt):
@@ -18,41 +14,4 @@ async def presets_loop(presets, wdt):
if utime.ticks_diff(now, last_mem_log) >= 5000: if utime.ticks_diff(now, last_mem_log) >= 5000:
print_mem("runtime") print_mem("runtime")
last_mem_log = now last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s)

View File

@@ -2,7 +2,11 @@
import json import json
import socket import socket
import network
import ubinascii
import device_groups as dg
from v1_wire import expand_v1
from binary_envelope import parse_binary_envelope from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -40,8 +44,8 @@ def _log_rx(payload) -> None:
print("rx (logging failed)") print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None, save=False):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" """Read one controller message; binary v2 envelope or JSON v1, then apply fields."""
_log_rx(payload) _log_rx(payload)
data = None data = None
if isinstance(payload, (bytes, bytearray)): if isinstance(payload, (bytes, bytearray)):
@@ -58,6 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None):
return return
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
data = expand_v1(data)
if save:
data["save"] = True
set_groups = bool(data.get("set_groups"))
groups = data.get("groups")
if set_groups and isinstance(groups, list):
dg.groups_replace(groups, settings)
print("groups set", dg.list_groups())
elif isinstance(groups, list) and groups:
if not any(dg.in_group(str(g)) for g in groups):
print("ignored: not in groups", groups)
return
if "device_config" in data: if "device_config" in data:
apply_device_config(data, settings, presets) apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
@@ -66,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data: if "clear_presets" in data:
apply_clear_presets(data, presets) apply_clear_presets(data, presets)
if "select" in data: if ("select" in data or "s" in data) and "presets" not in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
apply_default(data, settings, presets) apply_default(data, settings, presets)
@@ -80,6 +96,7 @@ def process_data(payload, settings, presets, controller_ip=None):
settings.save() settings.save()
if "save" in data and "device_config" in data: if "save" in data and "device_config" in data:
settings.save() settings.save()
_flush_pending_select(settings, presets)
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"}) _VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
@@ -170,7 +187,30 @@ def apply_brightness(data, settings, presets):
pass pass
_pending_select = None
def _run_select(presets, settings, preset_name, step=None):
if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
return True
return False
def _flush_pending_select(settings, presets):
global _pending_select
if _pending_select is None:
return
preset_name, step = _pending_select
if preset_name not in presets.presets and preset_name not in ("on", "off"):
return
_pending_select = None
if not _run_select(presets, settings, preset_name, step):
print("select failed (pending):", preset_name)
def apply_presets(data, settings, presets): def apply_presets(data, settings, presets):
global _pending_select
presets_map = data["presets"] presets_map = data["presets"]
for id, preset_data in presets_map.items(): for id, preset_data in presets_map.items():
if not preset_data: if not preset_data:
@@ -181,8 +221,8 @@ def apply_presets(data, settings, presets):
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings preset_data[color_key], settings
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError) as err:
continue print("preset color convert failed:", id, err)
if "bg" in preset_data: if "bg" in preset_data:
try: try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
@@ -191,18 +231,74 @@ def apply_presets(data, settings, presets):
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
pass pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
# Same message often carries select; apply now while presets are loaded.
if "select" in data or "s" in data:
apply_select(data, settings, presets)
else:
_flush_pending_select(settings, presets)
def _select_list_for_this_device(select_val, settings):
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
if isinstance(select_val, list) and select_val:
return select_val
if isinstance(select_val, str) and str(select_val).strip():
return [str(select_val).strip()]
if not isinstance(select_val, dict) or not select_val:
return None
if "preset" in select_val:
preset_name = select_val.get("preset")
if preset_name is None:
return None
out = [str(preset_name)]
if "step" in select_val:
out.append(select_val["step"])
return out
device_name = str(settings.get("name") or "").strip()
select_list = select_val.get(device_name)
if select_list:
return select_list
try:
sta = network.WLAN(network.STA_IF)
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
except Exception:
mac_hex = ""
if mac_hex:
for key in select_val:
k = str(key).lower().replace(":", "").replace("-", "")
if mac_hex in k:
return select_val[key]
if len(select_val) == 1:
return next(iter(select_val.values()))
return None
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
select_map = data["select"] global _pending_select
device_name = settings["name"] select_val = data.get("select")
select_list = select_map.get(device_name, []) if select_val is None:
select_val = data.get("s")
select_list = _select_list_for_this_device(select_val, settings)
if not select_list: if not select_list:
print("select ignored:", repr(select_val))
return
preset_name = str(select_list[0]).strip()
if not preset_name:
return return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
if presets.select(preset_name, step=step): if preset_name not in presets.presets and preset_name not in ("on", "off"):
record_last_preset(settings, preset_name) try:
presets.load(settings)
except Exception:
pass
if preset_name not in presets.presets and preset_name not in ("on", "off"):
_pending_select = (preset_name, step)
print("select deferred (preset not loaded yet):", preset_name)
return
if _run_select(presets, settings, preset_name, step):
_pending_select = None
else:
print("select failed:", preset_name)
def apply_clear_presets(data, presets): def apply_clear_presets(data, presets):

28
src/device_groups.py Normal file
View File

@@ -0,0 +1,28 @@
"""Group membership for GROUP_CMD filtering; persisted in settings.json."""
_groups = []
def load_from_settings(settings):
global _groups
g = settings.get("groups") if settings is not None else None
if isinstance(g, list):
_groups = [str(x) for x in g if str(x).strip()]
else:
_groups = []
def groups_replace(group_ids, settings=None, *, persist=True):
global _groups
_groups = [str(g) for g in group_ids]
if persist and settings is not None:
settings["groups"] = list(_groups)
settings.save()
def in_group(group_id):
return str(group_id) in _groups
def list_groups():
return list(_groups)

187
src/espnow_transport.py Normal file
View File

@@ -0,0 +1,187 @@
"""ESP-NOW receive loop and boot announce."""
import asyncio
import urandom
import ubinascii
import espnow
import network
import device_groups as dg
from espnow_wire import (
BROADCAST_MAC,
MSG_ANNOUNCE,
MSG_CMD,
MSG_GROUP_CMD,
MSG_GROUPS,
MSG_PING_REQ,
cmd_envelope,
pack_announce,
pack_ping_rsp,
parse_group_cmd,
parse_groups,
parse_ping_req,
wire_msg_type,
)
from controller_messages import process_data
from settings import WIFI_CHANNEL_DEFAULT
_PING_DELAY_MS_MIN = 50
_PING_DELAY_MS_MAX = 500
_esp = None
_groups_received = False
_debug = False
def _dlog(*parts):
if _debug:
print(*parts)
def init_espnow(settings):
global _esp, _debug
_debug = bool(settings.get("debug", False))
try:
ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT))
except (TypeError, ValueError):
ch = WIFI_CHANNEL_DEFAULT
ch = max(1, min(11, ch))
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
try:
sta.config(channel=ch)
except Exception as e:
print("espnow sta channel set failed:", e)
_esp = espnow.ESPNow()
_esp.active(True)
try:
_esp.add_peer(BROADCAST_MAC)
_dlog("espnow add bcast ok")
except Exception as e:
print("espnow add bcast failed:", e)
try:
actual_ch = sta.config("channel")
except Exception:
actual_ch = "?"
print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug)
return _esp
def _send_ping_rsp(host, settings, ping_id, delay_ms):
import utime
utime.sleep_ms(delay_ms)
if _esp is None or not host or len(host) != 6:
return
pkt = pack_ping_rsp(ping_id, settings.get("name", "led"))
try:
try:
_esp.add_peer(host)
_dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode())
except Exception as e:
_dlog("espnow ping add_peer skip", repr(e))
_esp.send(host, pkt)
print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode())
except Exception as e:
print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt))
async def _send_ping_rsp_delayed(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
await asyncio.sleep(delay_ms / 1000)
_send_ping_rsp(host, settings, ping_id, delay_ms)
def _schedule_ping_rsp(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
try:
import _thread
_thread.start_new_thread(_send_ping_rsp, (host, settings, ping_id, delay_ms))
except ImportError:
asyncio.create_task(_send_ping_rsp_delayed(host, settings, ping_id))
def send_boot_announce(settings):
if _esp is None:
return
pkt = pack_announce(
settings.get("name", "led"),
settings.get("num_leds", 1),
color_order=settings.get("color_order", "rgb"),
startup_mode=settings.get("startup_mode", "default"),
brightness=settings.get("brightness", 32),
)
try:
_esp.send(BROADCAST_MAC, pkt)
print("espnow announce", len(pkt), "B")
except Exception as e:
print("espnow announce failed:", e)
def _handle_packet(host, pkt, settings, presets):
global _groups_received
mt = wire_msg_type(pkt)
if mt == MSG_GROUPS:
ids = parse_groups(pkt)
if ids is not None:
dg.groups_replace(ids, settings)
_groups_received = True
print("groups", ids)
return
if mt == MSG_GROUP_CMD:
parsed = parse_group_cmd(pkt)
if parsed is None:
return
gid, env = parsed
if not dg.in_group(gid):
return
from espnow_wire import _envelope_size
need = _envelope_size(env)
save = len(env) > need and env[need] == 1
body = env[:need] if save else env
if body:
process_data(body, settings, presets, save=save)
return
if mt == MSG_CMD:
env, save = cmd_envelope(pkt)
if env:
process_data(env, settings, presets, save=save)
return
if mt == MSG_PING_REQ:
ping_id = parse_ping_req(pkt)
if ping_id is not None and host and len(host) == 6:
_schedule_ping_rsp(host, settings, ping_id)
return
if mt == MSG_ANNOUNCE:
return
async def espnow_receive_loop(settings, presets, wdt=None):
global _groups_received
while True:
if _esp is None:
await asyncio.sleep(0.1)
continue
host, msg = _esp.recv(0)
if not host:
if not _groups_received:
await asyncio.sleep(5)
send_boot_announce(settings)
else:
await asyncio.sleep(0.02)
if wdt:
wdt.feed()
continue
try:
_handle_packet(host, msg, settings, presets)
except Exception as e:
print("espnow rx error:", e)
if wdt:
wdt.feed()

135
src/espnow_wire.py Normal file
View File

@@ -0,0 +1,135 @@
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
import struct
WIRE_MAGIC = 0x4C
MAX_ESPNOW_PAYLOAD = 250
MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02
MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04
MSG_PING_REQ = 0x05
MSG_PING_RSP = 0x06
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
COLOR_ORDER_TO_ENUM = {
"rgb": 0,
"rbg": 1,
"grb": 2,
"gbr": 3,
"brg": 4,
"bgr": 5,
}
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
def _pack_header(msg_type, body):
pkt = bytes([WIRE_MAGIC, msg_type]) + body
if len(pkt) > MAX_ESPNOW_PAYLOAD:
raise ValueError("packet too large")
return pkt
def pack_announce(
name,
num_leds,
color_order="rgb",
startup_mode="default",
brightness=32,
device_type=0,
):
name_b = name.encode("utf-8")
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
body = (
bytes([len(name_b)])
+ name_b
+ struct.pack("<H", int(num_leds))
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
)
return _pack_header(MSG_ANNOUNCE, body)
def parse_groups(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_GROUPS:
return None
body = payload[2:]
else:
body = payload
if not body:
return []
off = 0
count = body[off]
off += 1
out = []
for _ in range(count):
gl = body[off]
off += 1
out.append(body[off : off + gl].decode("utf-8"))
off += gl
return out
def parse_group_cmd(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
return None
body = payload[2:]
gl = body[0]
gid = body[1 : 1 + gl].decode("utf-8")
env = body[1 + gl :]
return gid, env
HEADER_LEN = 5
def _envelope_size(env):
if len(env) < HEADER_LEN:
return len(env)
lp, ls, ld = env[2], env[3], env[4]
return HEADER_LEN + lp + ls + ld
def cmd_envelope(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
return None, False
env = payload[2:]
if not env:
return None, False
need = _envelope_size(env)
if need > len(env):
return None, False
save = len(env) > need and env[need] == 1
return env[:need], save
def pack_ping_req(ping_id):
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF)
return _pack_header(MSG_PING_REQ, body)
def parse_ping_req(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_PING_REQ:
return None
body = payload[2:]
else:
body = payload
if len(body) < 4:
return None
return struct.unpack("<I", body[:4])[0]
def pack_ping_rsp(ping_id, name):
name_b = name.encode("utf-8")
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF) + bytes([len(name_b)]) + name_b
return _pack_header(MSG_PING_RSP, body)
def wire_msg_type(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
return payload[1]
return None

View File

@@ -1,33 +1,26 @@
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms] import print_timestamp # noqa: F401
from settings import Settings from settings import Settings
import machine import machine
import utime
import asyncio import asyncio
import json
import gc import gc
from microdot import Microdot import json
from microdot.websocket import WebSocketError, with_websocket import network
import espnow
import device_groups as dg
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern, process_data from controller_messages import apply_startup_pattern, process_data
from runtime_state import RuntimeState from espnow_transport import _handle_packet, init_espnow
from background_tasks import presets_loop, udp_hello_loop_after_http_ready from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
from mem_stats import print_mem
from wifi_sta import boot_sta
try:
import uos as os
except ImportError:
import os
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
dg.load_from_settings(settings)
print(settings)
gc.collect() gc.collect()
sta_if = boot_sta(settings, wdt)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
@@ -37,130 +30,41 @@ gc.collect()
apply_startup_pattern(settings, presets) apply_startup_pattern(settings, presets)
esp = init_espnow(settings)
print(network.WLAN(network.STA_IF).config("channel"))
def _print_network_ips(controller_ip=None): hello = json.dumps({
"""Always log STA address and led-controller (WS client) address when known.""" "v": "1",
"name": settings.get("name", "led"),
"type": "led",
})
print(hello)
esp.send(BROADCAST_MAC, hello)
print("espnow hello", len(hello), "B")
def _on_espnow_message(host, msg):
if not msg:
return
if msg[0] == WIRE_MAGIC:
_handle_packet(host, msg, settings, presets)
return
if msg[0:1] == b"{":
process_data(msg, settings, presets)
while True:
wdt.feed()
while esp.any():
host, msg = esp.recv(0)
if not host or not msg:
continue
print(host, len(msg), "B")
try: try:
led_ip = sta_if.ifconfig()[0] _on_espnow_message(host, msg)
except Exception: print(msg)
led_ip = "?" except Exception as e:
ctrl = controller_ip if controller_ip else "(not connected)" print("espnow rx error:", e)
print("led-driver IP:", led_ip, " led-controller IP:", ctrl) presets.tick()
_print_network_ips()
print_mem("startup")
runtime_state = RuntimeState()
app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
runtime_state.ws_connected()
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
_print_network_ips(controller_ip)
print_mem("ws connect")
try:
while True:
data = await ws.receive()
if not data:
break
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def main(port=80):
asyncio.create_task(presets_loop(presets, wdt))
asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port)
if __name__ == "__main__":
asyncio.run(main(port=80))

View File

@@ -4,6 +4,7 @@ from preset import Preset
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys import sys
import utime
try: try:
import uos as os import uos as os
except ImportError: except ImportError:
@@ -31,6 +32,7 @@ class Presets:
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": self.blink,
} }
self.patterns.update(self._load_dynamic_patterns()) self.patterns.update(self._load_dynamic_patterns())
@@ -193,20 +195,23 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
# Manual single-shot patterns: if this select arrives before the main loop has if preset.p == "off":
# tick()'d the previous frame, completing it first keeps step in sync with beats. self.generator = None
self.step = 0
self.fill((0, 0, 0))
self.selected = preset_name
return True
# If re-selecting the same preset before the main loop has tick()'d the
# previous frame, run one pending tick so step stays in sync.
if ( if (
preset_name == self.selected preset_name == self.selected
and not preset.a
and preset.p in ("chase", "pulse")
and self.generator is not None and self.generator is not None
): ):
while self.generator is not None:
self.tick() self.tick()
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
elif preset.p == "off" or self.selected != preset_name: elif self.selected != preset_name:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
@@ -256,4 +261,44 @@ class Presets:
def on(self, preset): def on(self, preset):
colors = preset.c colors = preset.c
color = colors[0] if colors else (255, 255, 255) color = colors[0] if colors else (255, 255, 255)
self.fill(self.apply_brightness(color, preset.b)) lit = self.apply_brightness(color, preset.b)
while True:
self.fill(lit)
yield
def blink(self, preset):
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0
delay_ms = max(1, int(preset.d))
period = delay_ms * 2
now = utime.ticks_ms()
# Phase-lock to wall time so group identify (broadcast select) stays in sync even
# when devices process the packet on different main-loop iterations.
phase = now % period if period else 0
state = phase < delay_ms
last_update = utime.ticks_add(now, -phase)
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
state = not state
last_update = utime.ticks_add(last_update, delay_ms)
yield
def run_tick(presets):
"""Advance one animation frame (standalone tests / mpremote demos)."""
presets.tick()

View File

@@ -3,6 +3,8 @@ import ubinascii
import machine import machine
import network import network
WIFI_CHANNEL_DEFAULT = 5
class Settings(dict): class Settings(dict):
SETTINGS_FILE = "/settings.json" SETTINGS_FILE = "/settings.json"
@@ -14,9 +16,9 @@ class Settings(dict):
def set_defaults(self): def set_defaults(self):
self["led_pin"] = 10 self["led_pin"] = 10
self["num_leds"] = 119 self["num_leds"] = 200
self["color_order"] = "rgb" self["color_order"] = "grb"
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
@@ -31,11 +33,10 @@ class Settings(dict):
# Power-on: "default" | "last" | "off" # Power-on: "default" | "last" | "off"
self["startup_mode"] = "default" self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = WIFI_CHANNEL_DEFAULT
# ESP-NOW transport (requires espnow firmware; uses wifi_channel). self["groups"] = []
self["ssid"] = ""
self["password"] = ""
def save(self): def save(self):
try: try:

36
src/v1_wire.py Normal file
View File

@@ -0,0 +1,36 @@
"""Expand short v1 wire keys to long names (MicroPython)."""
K_PRESETS = "p"
K_SELECT = "s"
K_GROUPS = "g"
K_SET_GROUPS = "sg"
K_SAVE = "sv"
K_DEFAULT = "df"
K_DEVICE_CONFIG = "dc"
K_CLEAR_PRESETS = "cp"
K_MANIFEST = "mf"
_SHORT_TO_LONG = {
K_PRESETS: "presets",
K_SELECT: "select",
K_GROUPS: "groups",
K_SET_GROUPS: "set_groups",
K_SAVE: "save",
K_DEFAULT: "default",
K_DEVICE_CONFIG: "device_config",
K_CLEAR_PRESETS: "clear_presets",
K_MANIFEST: "manifest",
}
def expand_v1(data):
if not isinstance(data, dict):
return data
out = dict(data)
for short_key, long_key in _SHORT_TO_LONG.items():
if short_key in data and long_key not in out:
out[long_key] = data[short_key]
if short_key in out:
del out[short_key]
return out

169
tests/bridge_ws_blink.py Normal file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
pipenv run python led-driver/tests/bridge_ws_blink.py
pipenv run python led-driver/tests/bridge_ws_blink.py \\
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
From **led-driver** (if Pipenv/env is the parent project)::
pipenv run python tests/bridge_ws_blink.py --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# led-driver/tests -> led-driver -> led-controller
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
PROJECT_ROOT = LED_DRIVER_ROOT.parent
def _load_bridge_url(explicit: Optional[str]) -> str:
if explicit and explicit.strip():
return explicit.strip()
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
if not path.is_file():
continue
try:
data = json.loads(path.read_text(encoding="utf-8"))
url = str(data.get("bridge_ws_url") or "").strip()
if url:
return url
except (OSError, json.JSONDecodeError, TypeError):
pass
return "ws://192.168.4.1/ws"
def _format_mac(mac: str) -> str:
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
def build_blink_envelope(
mac: str,
*,
preset_id: str = "2",
delay_ms: int = 200,
brightness: int = 64,
) -> Dict[str, Any]:
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
body = {
"p": {
preset_id: {
"p": "blink",
"b": max(0, min(255, int(brightness))),
"d": max(1, int(delay_ms)),
"c": ["#FF0000", "#0000FF"],
"a": True,
}
},
"s": [str(preset_id)],
}
return {"v": "1", "dv": {_format_mac(mac): body}}
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
import websockets
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
print(f"connecting to {url}")
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
print(f"connected, sending {len(packet)} B")
print(packet.decode("utf-8"))
await ws.send(packet)
if hold_s > 0:
print(f"holding connection {hold_s}s …")
await asyncio.sleep(hold_s)
print("done")
def main() -> int:
parser = argparse.ArgumentParser(
description="Send blink preset+select to one driver via bridge WebSocket.",
)
parser.add_argument(
"--url",
default=None,
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
)
parser.add_argument(
"--mac",
default="188b0e1560a8",
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
)
parser.add_argument(
"--preset-id",
default="2",
help="Wire preset slot id (default: 2, matches zone push)",
)
parser.add_argument(
"--delay-ms",
type=int,
default=200,
help="Blink delay in ms (default: 200)",
)
parser.add_argument(
"--brightness",
type=int,
default=64,
help="Preset brightness 0255 (default: 64)",
)
parser.add_argument(
"--hold",
type=float,
default=2.0,
help="Seconds to keep WebSocket open after send (default: 2)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print envelope only; do not connect",
)
args = parser.parse_args()
url = _load_bridge_url(args.url)
try:
envelope = build_blink_envelope(
args.mac,
preset_id=args.preset_id,
delay_ms=args.delay_ms,
brightness=args.brightness,
)
except ValueError as e:
print(f"error: {e}", file=sys.stderr)
return 1
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
if args.dry_run:
print(json.dumps(envelope, indent=2))
return 0
try:
asyncio.run(_send(url, envelope, args.hold))
except KeyboardInterrupt:
print("interrupted")
return 130
except Exception as e:
print(f"failed: {e!r}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,116 @@
"""Device-side radio diagnostic test (MicroPython).
Checks:
1) STA/AP bring-up on channel 5
2) ESP-NOW init and broadcast peer add
3) Broadcast TX test packet send
4) RX wait window to see any incoming ESP-NOW frames
"""
import espnow
import machine
import network
import time
import ubinascii
CHANNEL = 5
RX_WINDOW_MS = 3000
WDT_TIMEOUT_MS = 10000
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00"
def _mac_hex(mac_bytes):
try:
return ubinascii.hexlify(mac_bytes).decode()
except Exception:
return "?"
def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
print("diag start")
print("cpu freq", machine.freq())
sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
# Clean start
try:
sta.active(False)
ap.active(False)
time.sleep_ms(100)
except Exception as e:
print("wifi reset failed", repr(e))
# STA setup
try:
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac")))
except Exception as e:
print("sta setup failed", repr(e))
# AP setup
try:
ap.active(True)
try:
ap.config(essid="diag-ap", channel=channel, hidden=True)
except TypeError:
ap.config(essid="diag-ap", channel=channel)
print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac")))
except Exception as e:
print("ap setup failed", repr(e))
wdt.feed()
# ESP-NOW setup
try:
e = espnow.ESPNow()
e.active(True)
print("espnow active ok")
except Exception as e_err:
print("espnow init failed", repr(e_err))
return
# Add broadcast peer
try:
e.add_peer(BROADCAST_MAC, channel=channel)
print("add bcast peer ok")
except TypeError:
try:
e.add_peer(BROADCAST_MAC)
print("add bcast peer ok (no channel arg)")
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
# TX test
try:
ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True)
print("tx bcast", ok, "len", len(TEST_PAYLOAD))
except Exception as e_err:
print("tx bcast failed", repr(e_err))
# RX window
print("rx window ms", rx_window_ms)
t_end = time.ticks_add(time.ticks_ms(), rx_window_ms)
rx_count = 0
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
wdt.feed()
host, msg = e.recv(100)
if host:
rx_count += 1
print("rx", rx_count, _mac_hex(host), "len", len(msg))
time.sleep_ms(5)
print("diag done rx_count", rx_count)
if __name__ == "__main__":
run_diag()

View File

@@ -0,0 +1,40 @@
"""Device test: receive ESP-NOW packets on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
import time
CHANNEL = 5
TIMEOUT_MS = 1000
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
print("recv ready ch", channel)
while True:
wdt.feed()
host, msg = e.recv(timeout_ms)
if host:
mac_hex = ubinascii.hexlify(host).decode()
print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode())
else:
print("rx timeout")
time.sleep_ms(10)
if __name__ == "__main__":
recv_loop()

View File

@@ -0,0 +1,47 @@
"""Device test: send one ESP-NOW packet on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
CHANNEL = 5
DEST_HEX = "ffffffffffff"
PAYLOAD_HEX = "4c0501000000"
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def _add_peer(esp, dest, channel):
try:
esp.add_peer(dest, channel=channel)
except TypeError:
esp.add_peer(dest)
except OSError:
pass
def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
dest = ubinascii.unhexlify(dest_hex)
pkt = ubinascii.unhexlify(payload_hex)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
_add_peer(e, dest, channel)
wdt.feed()
ok = e.send(dest, pkt, True)
print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt))
return ok
if __name__ == "__main__":
send_once()

View File

@@ -1,35 +1,74 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Standalone blink pattern demo (WDT-fed tick loop).
Run on device::
mpremote connect <port> run tests/patterns/blink.py
"""
import sys
import utime import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick def _bootstrap_import_path():
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
try:
import uos as os
except ImportError:
import os
candidates = []
try:
here = __file__.rsplit("/", 1)[0]
if here:
candidates.append(here)
tests = here.rsplit("/", 1)[0]
if tests:
candidates.append(tests)
root = tests.rsplit("/", 1)[0]
if root:
candidates.append(root)
candidates.append(root + "/src")
except NameError:
pass
for p in (".", "..", "/", "src", "/src"):
candidates.append(p)
for p in candidates:
if p and p not in sys.path:
sys.path.insert(0, p)
_bootstrap_import_path()
from machine import WDT # noqa: E402
from settings import Settings # noqa: E402
from presets import Presets # noqa: E402
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
presets.tick()
utime.sleep_ms(sleep_ms)
def main(): def main():
s = Settings() settings = Settings()
pin = s.get("led_pin", 10) presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000) wdt = WDT(timeout=10000)
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors) presets.edit(
p.edit("test_blink", { "test_blink",
{
"p": "blink", "p": "blink",
"b": 64, "b": 64,
"d": 200, "d": 200,
"c": [(255, 0, 0), (0, 0, 255)], "c": [(255, 0, 0), (0, 0, 255)],
}) },
p.select("test_blink") )
presets.select("test_blink")
start = utime.ticks_ms() _run_ms(presets, wdt, 1500)
while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
if __name__ == "__main__": if __name__ == "__main__":
main() main()