13 Commits

Author SHA1 Message Date
3286c4002d chore(settings): update default num_leds and color_order
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:56 +12:00
68eb547ec4 feat(espnow): add debug logging and channel diagnostics
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-29 16:00:54 +12:00
8403df531d feat(espnow): improve bridge transport and driver sync
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-28 00:38:09 +12:00
088fe161a8 fix(main): blocking espnow rx loop and pass peer host
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
c9895df512 fix(presets): phase-lock blink and one tick on re-select
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:46 +12:00
39a84696c3 feat(espnow): ping request/response with jittered delay
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:39 +12:00
c7560b2e87 fix(settings): default wifi channel to 5
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:36 +12:00
ea21563900 fix(controller): apply select when presets not in message
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 21:55:30 +12:00
a97f6c7c2c feat(espnow): groups filter and v1 select list on driver
Apply group membership on RX, accept select as [preset_id, step?],
and fix identify/off plus presets layout for manual beat stepping.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 01:44:21 +12:00
1fdb2c9441 fix(espnow): handle binary and JSON RX in simplified main
Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON
v1 payloads to process_data from the poll loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:45:13 +12:00
3e718f7432 feat(espnow): add wire transport and simplify broadcast main
Binary espnow_wire/espnow_transport modules plus a minimal main that
broadcasts a JSON hello and polls ESP-NOW while running presets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:44:39 +12:00
85490a3bd0 feat(deploy): add file_hashes.json manifest on device
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 19:14:51 +12:00
94266d5a7c feat(patterns): reverse animation direction via preset n8
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 18:32:01 +12:00
26 changed files with 1269 additions and 289 deletions

8
bulk.sh Executable file
View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
PORT="${1:-/dev/ttyACM0}"
while true; do
ls "$PORT" && led-cli -p "$PORT" --erase --src --patterns && led-cli -p "$PORT" --reset -f
sleep 0.5
done

View File

@@ -1,11 +1,7 @@
import asyncio import asyncio
import utime import utime
from hello import broadcast_hello_udp
from mem_stats import print_mem from mem_stats import print_mem
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt): async def presets_loop(presets, wdt):
@@ -18,41 +14,4 @@ async def presets_loop(presets, wdt):
if utime.ticks_diff(now, last_mem_log) >= 5000: if utime.ticks_diff(now, last_mem_log) >= 5000:
print_mem("runtime") print_mem("runtime")
last_mem_log = now last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s)

View File

@@ -2,7 +2,11 @@
import json import json
import socket import socket
import network
import ubinascii
import device_groups as dg
from v1_wire import expand_v1
from binary_envelope import parse_binary_envelope from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -40,8 +44,8 @@ def _log_rx(payload) -> None:
print("rx (logging failed)") print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None, save=False):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" """Read one controller message; binary v2 envelope or JSON v1, then apply fields."""
_log_rx(payload) _log_rx(payload)
data = None data = None
if isinstance(payload, (bytes, bytearray)): if isinstance(payload, (bytes, bytearray)):
@@ -58,6 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None):
return return
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
data = expand_v1(data)
if save:
data["save"] = True
set_groups = bool(data.get("set_groups"))
groups = data.get("groups")
if set_groups and isinstance(groups, list):
dg.groups_replace(groups, settings)
print("groups set", dg.list_groups())
elif isinstance(groups, list) and groups:
if not any(dg.in_group(str(g)) for g in groups):
print("ignored: not in groups", groups)
return
if "device_config" in data: if "device_config" in data:
apply_device_config(data, settings, presets) apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
@@ -66,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data: if "clear_presets" in data:
apply_clear_presets(data, presets) apply_clear_presets(data, presets)
if "select" in data: if ("select" in data or "s" in data) and "presets" not in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
apply_default(data, settings, presets) apply_default(data, settings, presets)
@@ -80,6 +96,7 @@ def process_data(payload, settings, presets, controller_ip=None):
settings.save() settings.save()
if "save" in data and "device_config" in data: if "save" in data and "device_config" in data:
settings.save() settings.save()
_flush_pending_select(settings, presets)
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"}) _VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
@@ -170,7 +187,30 @@ def apply_brightness(data, settings, presets):
pass pass
_pending_select = None
def _run_select(presets, settings, preset_name, step=None):
if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
return True
return False
def _flush_pending_select(settings, presets):
global _pending_select
if _pending_select is None:
return
preset_name, step = _pending_select
if preset_name not in presets.presets and preset_name not in ("on", "off"):
return
_pending_select = None
if not _run_select(presets, settings, preset_name, step):
print("select failed (pending):", preset_name)
def apply_presets(data, settings, presets): def apply_presets(data, settings, presets):
global _pending_select
presets_map = data["presets"] presets_map = data["presets"]
for id, preset_data in presets_map.items(): for id, preset_data in presets_map.items():
if not preset_data: if not preset_data:
@@ -181,8 +221,8 @@ def apply_presets(data, settings, presets):
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings preset_data[color_key], settings
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError) as err:
continue print("preset color convert failed:", id, err)
if "bg" in preset_data: if "bg" in preset_data:
try: try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
@@ -191,18 +231,74 @@ def apply_presets(data, settings, presets):
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
pass pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
# Same message often carries select; apply now while presets are loaded.
if "select" in data or "s" in data:
apply_select(data, settings, presets)
else:
_flush_pending_select(settings, presets)
def _select_list_for_this_device(select_val, settings):
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
if isinstance(select_val, list) and select_val:
return select_val
if isinstance(select_val, str) and str(select_val).strip():
return [str(select_val).strip()]
if not isinstance(select_val, dict) or not select_val:
return None
if "preset" in select_val:
preset_name = select_val.get("preset")
if preset_name is None:
return None
out = [str(preset_name)]
if "step" in select_val:
out.append(select_val["step"])
return out
device_name = str(settings.get("name") or "").strip()
select_list = select_val.get(device_name)
if select_list:
return select_list
try:
sta = network.WLAN(network.STA_IF)
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
except Exception:
mac_hex = ""
if mac_hex:
for key in select_val:
k = str(key).lower().replace(":", "").replace("-", "")
if mac_hex in k:
return select_val[key]
if len(select_val) == 1:
return next(iter(select_val.values()))
return None
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
select_map = data["select"] global _pending_select
device_name = settings["name"] select_val = data.get("select")
select_list = select_map.get(device_name, []) if select_val is None:
select_val = data.get("s")
select_list = _select_list_for_this_device(select_val, settings)
if not select_list: if not select_list:
print("select ignored:", repr(select_val))
return
preset_name = str(select_list[0]).strip()
if not preset_name:
return return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
if presets.select(preset_name, step=step): if preset_name not in presets.presets and preset_name not in ("on", "off"):
record_last_preset(settings, preset_name) try:
presets.load(settings)
except Exception:
pass
if preset_name not in presets.presets and preset_name not in ("on", "off"):
_pending_select = (preset_name, step)
print("select deferred (preset not loaded yet):", preset_name)
return
if _run_select(presets, settings, preset_name, step):
_pending_select = None
else:
print("select failed:", preset_name)
def apply_clear_presets(data, presets): def apply_clear_presets(data, presets):

28
src/device_groups.py Normal file
View File

@@ -0,0 +1,28 @@
"""Group membership for GROUP_CMD filtering; persisted in settings.json."""
_groups = []
def load_from_settings(settings):
global _groups
g = settings.get("groups") if settings is not None else None
if isinstance(g, list):
_groups = [str(x) for x in g if str(x).strip()]
else:
_groups = []
def groups_replace(group_ids, settings=None, *, persist=True):
global _groups
_groups = [str(g) for g in group_ids]
if persist and settings is not None:
settings["groups"] = list(_groups)
settings.save()
def in_group(group_id):
return str(group_id) in _groups
def list_groups():
return list(_groups)

187
src/espnow_transport.py Normal file
View File

@@ -0,0 +1,187 @@
"""ESP-NOW receive loop and boot announce."""
import asyncio
import urandom
import ubinascii
import espnow
import network
import device_groups as dg
from espnow_wire import (
BROADCAST_MAC,
MSG_ANNOUNCE,
MSG_CMD,
MSG_GROUP_CMD,
MSG_GROUPS,
MSG_PING_REQ,
cmd_envelope,
pack_announce,
pack_ping_rsp,
parse_group_cmd,
parse_groups,
parse_ping_req,
wire_msg_type,
)
from controller_messages import process_data
from settings import WIFI_CHANNEL_DEFAULT
_PING_DELAY_MS_MIN = 50
_PING_DELAY_MS_MAX = 500
_esp = None
_groups_received = False
_debug = False
def _dlog(*parts):
if _debug:
print(*parts)
def init_espnow(settings):
global _esp, _debug
_debug = bool(settings.get("debug", False))
try:
ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT))
except (TypeError, ValueError):
ch = WIFI_CHANNEL_DEFAULT
ch = max(1, min(11, ch))
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
try:
sta.config(channel=ch)
except Exception as e:
print("espnow sta channel set failed:", e)
_esp = espnow.ESPNow()
_esp.active(True)
try:
_esp.add_peer(BROADCAST_MAC)
_dlog("espnow add bcast ok")
except Exception as e:
print("espnow add bcast failed:", e)
try:
actual_ch = sta.config("channel")
except Exception:
actual_ch = "?"
print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug)
return _esp
def _send_ping_rsp(host, settings, ping_id, delay_ms):
import utime
utime.sleep_ms(delay_ms)
if _esp is None or not host or len(host) != 6:
return
pkt = pack_ping_rsp(ping_id, settings.get("name", "led"))
try:
try:
_esp.add_peer(host)
_dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode())
except Exception as e:
_dlog("espnow ping add_peer skip", repr(e))
_esp.send(host, pkt)
print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode())
except Exception as e:
print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt))
async def _send_ping_rsp_delayed(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
await asyncio.sleep(delay_ms / 1000)
_send_ping_rsp(host, settings, ping_id, delay_ms)
def _schedule_ping_rsp(host, settings, ping_id):
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
try:
import _thread
_thread.start_new_thread(_send_ping_rsp, (host, settings, ping_id, delay_ms))
except ImportError:
asyncio.create_task(_send_ping_rsp_delayed(host, settings, ping_id))
def send_boot_announce(settings):
if _esp is None:
return
pkt = pack_announce(
settings.get("name", "led"),
settings.get("num_leds", 1),
color_order=settings.get("color_order", "rgb"),
startup_mode=settings.get("startup_mode", "default"),
brightness=settings.get("brightness", 32),
)
try:
_esp.send(BROADCAST_MAC, pkt)
print("espnow announce", len(pkt), "B")
except Exception as e:
print("espnow announce failed:", e)
def _handle_packet(host, pkt, settings, presets):
global _groups_received
mt = wire_msg_type(pkt)
if mt == MSG_GROUPS:
ids = parse_groups(pkt)
if ids is not None:
dg.groups_replace(ids, settings)
_groups_received = True
print("groups", ids)
return
if mt == MSG_GROUP_CMD:
parsed = parse_group_cmd(pkt)
if parsed is None:
return
gid, env = parsed
if not dg.in_group(gid):
return
from espnow_wire import _envelope_size
need = _envelope_size(env)
save = len(env) > need and env[need] == 1
body = env[:need] if save else env
if body:
process_data(body, settings, presets, save=save)
return
if mt == MSG_CMD:
env, save = cmd_envelope(pkt)
if env:
process_data(env, settings, presets, save=save)
return
if mt == MSG_PING_REQ:
ping_id = parse_ping_req(pkt)
if ping_id is not None and host and len(host) == 6:
_schedule_ping_rsp(host, settings, ping_id)
return
if mt == MSG_ANNOUNCE:
return
async def espnow_receive_loop(settings, presets, wdt=None):
global _groups_received
while True:
if _esp is None:
await asyncio.sleep(0.1)
continue
host, msg = _esp.recv(0)
if not host:
if not _groups_received:
await asyncio.sleep(5)
send_boot_announce(settings)
else:
await asyncio.sleep(0.02)
if wdt:
wdt.feed()
continue
try:
_handle_packet(host, msg, settings, presets)
except Exception as e:
print("espnow rx error:", e)
if wdt:
wdt.feed()

135
src/espnow_wire.py Normal file
View File

@@ -0,0 +1,135 @@
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
import struct
WIRE_MAGIC = 0x4C
MAX_ESPNOW_PAYLOAD = 250
MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02
MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04
MSG_PING_REQ = 0x05
MSG_PING_RSP = 0x06
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
COLOR_ORDER_TO_ENUM = {
"rgb": 0,
"rbg": 1,
"grb": 2,
"gbr": 3,
"brg": 4,
"bgr": 5,
}
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
def _pack_header(msg_type, body):
pkt = bytes([WIRE_MAGIC, msg_type]) + body
if len(pkt) > MAX_ESPNOW_PAYLOAD:
raise ValueError("packet too large")
return pkt
def pack_announce(
name,
num_leds,
color_order="rgb",
startup_mode="default",
brightness=32,
device_type=0,
):
name_b = name.encode("utf-8")
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
body = (
bytes([len(name_b)])
+ name_b
+ struct.pack("<H", int(num_leds))
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
)
return _pack_header(MSG_ANNOUNCE, body)
def parse_groups(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_GROUPS:
return None
body = payload[2:]
else:
body = payload
if not body:
return []
off = 0
count = body[off]
off += 1
out = []
for _ in range(count):
gl = body[off]
off += 1
out.append(body[off : off + gl].decode("utf-8"))
off += gl
return out
def parse_group_cmd(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
return None
body = payload[2:]
gl = body[0]
gid = body[1 : 1 + gl].decode("utf-8")
env = body[1 + gl :]
return gid, env
HEADER_LEN = 5
def _envelope_size(env):
if len(env) < HEADER_LEN:
return len(env)
lp, ls, ld = env[2], env[3], env[4]
return HEADER_LEN + lp + ls + ld
def cmd_envelope(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
return None, False
env = payload[2:]
if not env:
return None, False
need = _envelope_size(env)
if need > len(env):
return None, False
save = len(env) > need and env[need] == 1
return env[:need], save
def pack_ping_req(ping_id):
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF)
return _pack_header(MSG_PING_REQ, body)
def parse_ping_req(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_PING_REQ:
return None
body = payload[2:]
else:
body = payload
if len(body) < 4:
return None
return struct.unpack("<I", body[:4])[0]
def pack_ping_rsp(ping_id, name):
name_b = name.encode("utf-8")
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF) + bytes([len(name_b)]) + name_b
return _pack_header(MSG_PING_RSP, body)
def wire_msg_type(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
return payload[1]
return None

101
src/file_hashes.py Normal file
View File

@@ -0,0 +1,101 @@
"""
Deploy hash manifest at flash root (file_hashes.json).
Updated by led-cli after directory uploads; used to skip unchanged files on
the next deploy. Format: {"version": 1, "algorithm": "sha256", "files": {...}}
"""
import json
import os
MANIFEST_VERSION = 1
MANIFEST_FILENAME = "file_hashes.json"
HASH_ALGO = "sha256"
_SKIP_NAMES = frozenset({MANIFEST_FILENAME, "__pycache__"})
_SKIP_SUFFIXES = (".pyc", ".pyo")
def _normalize_path(path):
return path.replace("\\", "/").lstrip("/")
def load():
"""Return path -> sha256 hex map, or {} if missing or invalid."""
try:
with open(MANIFEST_FILENAME, "r") as f:
doc = json.load(f)
except OSError:
return {}
if not isinstance(doc, dict):
return {}
files = doc.get("files")
return files if isinstance(files, dict) else {}
def save(files):
"""Write manifest (path keys use forward slashes, no leading slash)."""
if not isinstance(files, dict):
files = {}
doc = {
"version": MANIFEST_VERSION,
"algorithm": HASH_ALGO,
"files": files,
}
with open(MANIFEST_FILENAME, "w") as f:
json.dump(doc, f)
def _hash_file(path):
import hashlib
h = hashlib.sha256()
with open(path, "rb") as f:
while True:
chunk = f.read(256)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
def _walk_dir(base, prefix, out):
try:
names = os.listdir(base)
except OSError:
return
for name in names:
if name in _SKIP_NAMES or name.endswith(_SKIP_SUFFIXES):
continue
full = base + "/" + name if base else name
key = _normalize_path((prefix + "/" + name) if prefix else name)
try:
mode = os.stat(full)[0]
except OSError:
continue
if mode & 0x4000:
_walk_dir(full, key, out)
else:
out[key] = _hash_file(full)
def rebuild():
"""Rebuild manifest from root .py files plus patterns/ and lib/ trees."""
files = {}
try:
for name in os.listdir("."):
if name in _SKIP_NAMES or name.endswith(_SKIP_SUFFIXES):
continue
try:
mode = os.stat(name)[0]
except OSError:
continue
if mode & 0x4000:
if name in ("patterns", "lib"):
_walk_dir(name, name, files)
else:
files[_normalize_path(name)] = _hash_file(name)
except OSError:
pass
save(files)
return files

View File

@@ -1,33 +1,26 @@
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms] import print_timestamp # noqa: F401
from settings import Settings from settings import Settings
import machine import machine
import utime
import asyncio import asyncio
import json
import gc import gc
from microdot import Microdot import json
from microdot.websocket import WebSocketError, with_websocket import network
import espnow
import device_groups as dg
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern, process_data from controller_messages import apply_startup_pattern, process_data
from runtime_state import RuntimeState from espnow_transport import _handle_packet, init_espnow
from background_tasks import presets_loop, udp_hello_loop_after_http_ready from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
from mem_stats import print_mem
from wifi_sta import boot_sta
try:
import uos as os
except ImportError:
import os
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
dg.load_from_settings(settings)
print(settings)
gc.collect() gc.collect()
sta_if = boot_sta(settings, wdt)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
@@ -37,130 +30,41 @@ gc.collect()
apply_startup_pattern(settings, presets) apply_startup_pattern(settings, presets)
esp = init_espnow(settings)
print(network.WLAN(network.STA_IF).config("channel"))
def _print_network_ips(controller_ip=None): hello = json.dumps({
"""Always log STA address and led-controller (WS client) address when known.""" "v": "1",
try: "name": settings.get("name", "led"),
led_ip = sta_if.ifconfig()[0] "type": "led",
except Exception: })
led_ip = "?" print(hello)
ctrl = controller_ip if controller_ip else "(not connected)"
print("led-driver IP:", led_ip, " led-controller IP:", ctrl) esp.send(BROADCAST_MAC, hello)
print("espnow hello", len(hello), "B")
_print_network_ips() def _on_espnow_message(host, msg):
print_mem("startup") if not msg:
return
runtime_state = RuntimeState() if msg[0] == WIRE_MAGIC:
_handle_packet(host, msg, settings, presets)
app = Microdot() return
if msg[0:1] == b"{":
process_data(msg, settings, presets)
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
while True:
@app.route("/ws") wdt.feed()
@with_websocket while esp.any():
async def ws_handler(request, ws): host, msg = esp.recv(0)
runtime_state.ws_connected() if not host or not msg:
controller_ip = None continue
try: print(host, len(msg), "B")
client_addr = getattr(request, "client_addr", None) try:
if isinstance(client_addr, (tuple, list)) and client_addr: _on_espnow_message(host, msg)
controller_ip = client_addr[0] print(msg)
elif isinstance(client_addr, str): except Exception as e:
controller_ip = client_addr print("espnow rx error:", e)
except Exception: presets.tick()
controller_ip = None
_print_network_ips(controller_ip)
print_mem("ws connect")
try:
while True:
data = await ws.receive()
if not data:
break
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def main(port=80):
asyncio.create_task(presets_loop(presets, wdt))
asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port)
if __name__ == "__main__":
asyncio.run(main(port=80))

View File

@@ -26,13 +26,13 @@ class Aurora:
c = self.driver.apply_brightness(colors[idx], preset.b) c = self.driver.apply_brightness(colors[idx], preset.b)
w = 255 - abs(128 - ((i * 8 + phase) & 255)) * 2 w = 255 - abs(128 - ((i * 8 + phase) & 255)) * 2
w = max(0, min(255, w + shimmer)) w = max(0, min(255, w + shimmer))
self.driver.n[i] = ( self.driver.n[self.driver.led_i(preset, i)] = (
(c[0] * w) // 255, (c[0] * w) // 255,
(c[1] * w) // 255, (c[1] * w) // 255,
(c[2] * w) // 255, (c[2] * w) // 255,
) )
self.driver.n.write() self.driver.n.write()
phase = (phase + 1) & 255 phase = (phase + self.driver.signed(preset, 1)) & 255
self.driver.step = phase self.driver.step = phase
last = utime.ticks_add(last, d) last = utime.ticks_add(last, d)
if not preset.a: if not preset.a:
@@ -76,9 +76,9 @@ class Aurora:
peak = lerp3(colors[fi], colors[fi + 1], frac) peak = lerp3(colors[fi], colors[fi + 1], frac)
peak = self.driver.apply_brightness(peak, preset.b) peak = self.driver.apply_brightness(peak, preset.b)
mixf = min(255, int(w * contrast * 2) >> 1) mixf = min(255, int(w * contrast * 2) >> 1)
self.driver.n[i] = lerp3(bg, peak, mixf) self.driver.n[self.driver.led_i(preset, i)] = lerp3(bg, peak, mixf)
self.driver.n.write() self.driver.n.write()
phase = (phase + drift) % 256 phase = (phase + self.driver.signed(preset, drift)) % 256
last = utime.ticks_add(last, d) last = utime.ticks_add(last, d)
if not preset.a: if not preset.a:
yield yield

View File

@@ -46,12 +46,14 @@ class Blizzard:
for pos, ci, wj in flakes: for pos, ci, wj in flakes:
p = pos p = pos
lateral = wind + (wj if wj else 0) lateral = wind + (wj if wj else 0)
p -= speed p -= self.driver.signed(preset, speed)
p += lateral p += self.driver.signed(preset, lateral)
if p < -2 or p >= nled + 2: if p < -2 or p >= nled + 2:
continue continue
pi = max(0, min(nled - 1, int(p))) pi = max(0, min(nled - 1, int(p)))
self.driver.n[pi] = self.driver.apply_brightness(colors[ci], preset.b) self.driver.n[self.driver.led_i(preset, pi)] = self.driver.apply_brightness(
colors[ci], preset.b
)
nf.append([p, ci, wj]) nf.append([p, ci, wj])
flakes = nf flakes = nf

View File

@@ -12,7 +12,7 @@ class Chase:
def _run_marquee(self, preset, colors): def _run_marquee(self, preset, colors):
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3) on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2) off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1) step = max(1, abs(self.driver.signed(preset, int(preset.n3) if int(preset.n3) > 0 else 1)))
phase = self.driver.step % (on_len + off_len) phase = self.driver.step % (on_len + off_len)
last = utime.ticks_ms() last = utime.ticks_ms()
while True: while True:
@@ -23,9 +23,9 @@ class Chase:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b) bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
m = (i + phase) % (on_len + off_len) m = (i + phase) % (on_len + off_len)
self.driver.n[i] = c if m < on_len else bg_color self.driver.n[self.driver.led_i(preset, i)] = c if m < on_len else bg_color
self.driver.n.write() self.driver.n.write()
phase = (phase + step) % (on_len + off_len) phase = (phase + self.driver.signed(preset, step)) % (on_len + off_len)
self.driver.step = phase self.driver.step = phase
last = utime.ticks_add(last, d) last = utime.ticks_add(last, d)
if not preset.a: if not preset.a:
@@ -66,8 +66,8 @@ class Chase:
n1 = max(1, int(preset.n1)) # LEDs of color 0 n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1 n2 = max(1, int(preset.n2)) # LEDs of color 1
n3 = int(preset.n3) # Step movement on even steps (can be negative) n3 = self.driver.signed(preset, int(preset.n3)) # Step movement on even steps
n4 = int(preset.n4) # Step movement on odd steps (can be negative) n4 = self.driver.signed(preset, int(preset.n4)) # Step movement on odd steps
segment_length = n1 + n2 segment_length = n1 + n2
@@ -101,9 +101,9 @@ class Chase:
# Determine which color based on position in segment # Determine which color based on position in segment
if relative_pos < n1: if relative_pos < n1:
self.driver.n[i] = color0 self.driver.n[self.driver.led_i(preset, i)] = color0
else: else:
self.driver.n[i] = color1 self.driver.n[self.driver.led_i(preset, i)] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count) print("[chase] step", step_count)
@@ -147,9 +147,9 @@ class Chase:
# Determine which color based on position in segment # Determine which color based on position in segment
if relative_pos < n1: if relative_pos < n1:
self.driver.n[i] = color0 self.driver.n[self.driver.led_i(preset, i)] = color0
else: else:
self.driver.n[i] = color1 self.driver.n[self.driver.led_i(preset, i)] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count) print("[chase] step", step_count)

View File

@@ -18,7 +18,7 @@ class ColourCycle:
pos -= 170 pos -= 170
return (0, pos * 3, 255 - pos * 3) return (0, pos * 3, 255 - pos * 3)
def _render_gradient(self, colors, phase, brightness): def _render_gradient(self, preset, colors, phase, brightness):
num_leds = self.driver.num_leds num_leds = self.driver.num_leds
color_count = len(colors) color_count = len(colors)
if num_leds <= 0 or color_count <= 0: if num_leds <= 0 or color_count <= 0:
@@ -40,14 +40,16 @@ class ColourCycle:
c1[1] + ((c2[1] - c1[1]) * frac) // 256, c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256, c1[2] + ((c2[2] - c1[2]) * frac) // 256,
) )
self.driver.n[i] = self.driver.apply_brightness(blended, brightness) self.driver.n[self.driver.led_i(preset, i)] = self.driver.apply_brightness(
blended, brightness
)
self.driver.n.write() self.driver.n.write()
def _render_rainbow(self, phase, brightness): def _render_rainbow(self, preset, phase, brightness):
num_leds = self.driver.num_leds num_leds = self.driver.num_leds
for i in range(num_leds): for i in range(num_leds):
rc_index = (i * 256 // max(1, num_leds)) + phase rc_index = (i * 256 // max(1, num_leds)) + phase
self.driver.n[i] = self.driver.apply_brightness( self.driver.n[self.driver.led_i(preset, i)] = self.driver.apply_brightness(
self._wheel(rc_index & 255), brightness self._wheel(rc_index & 255), brightness
) )
self.driver.n.write() self.driver.n.write()
@@ -64,8 +66,8 @@ class ColourCycle:
if mode == 1: if mode == 1:
if not preset.a: if not preset.a:
self._render_rainbow(phase, preset.b) self._render_rainbow(preset, phase, preset.b)
self.driver.step = (phase + step_amount) % 256 self.driver.step = (phase + self.driver.signed(preset, step_amount)) % 256
yield yield
return return
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
@@ -73,16 +75,16 @@ class ColourCycle:
delay_ms = max(1, int(preset.d)) delay_ms = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms: if utime.ticks_diff(now, last_update) >= delay_ms:
self._render_rainbow(phase, preset.b) self._render_rainbow(preset, phase, preset.b)
phase = (phase + step_amount) % 256 phase = (phase + self.driver.signed(preset, step_amount)) % 256
self.driver.step = phase self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)
yield yield
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)] colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
if not preset.a: if not preset.a:
self._render_gradient(colors, phase, preset.b) self._render_gradient(preset, colors, phase, preset.b)
self.driver.step = (phase + step_amount) % 256 self.driver.step = (phase + self.driver.signed(preset, step_amount)) % 256
yield yield
return return
@@ -91,8 +93,8 @@ class ColourCycle:
delay_ms = max(1, int(preset.d)) delay_ms = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms: if utime.ticks_diff(now, last_update) >= delay_ms:
self._render_gradient(colors, phase, preset.b) self._render_gradient(preset, colors, phase, preset.b)
phase = (phase + step_amount) % 256 phase = (phase + self.driver.signed(preset, step_amount)) % 256
self.driver.step = phase self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)
yield yield

View File

@@ -44,7 +44,7 @@ class Icicles:
if idx >= nled: if idx >= nled:
break break
br = ((k + 1) * 255) // max(1, ic_len) br = ((k + 1) * 255) // max(1, ic_len)
self.driver.n[idx] = ( self.driver.n[self.driver.led_i(preset, idx)] = (
(tip[0] * br + bg[0] * (255 - br)) // 255, (tip[0] * br + bg[0] * (255 - br)) // 255,
(tip[1] * br + bg[1] * (255 - br)) // 255, (tip[1] * br + bg[1] * (255 - br)) // 255,
(tip[2] * br + bg[2] * (255 - br)) // 255, (tip[2] * br + bg[2] * (255 - br)) // 255,
@@ -52,7 +52,7 @@ class Icicles:
aidx += 1 aidx += 1
self.driver.n.write() self.driver.n.write()
phase = (phase + phase_step) % span phase = (phase + self.driver.signed(preset, phase_step)) % span
last = utime.ticks_add(last, d_ms) last = utime.ticks_add(last, d_ms)
if not preset.a: if not preset.a:

View File

@@ -30,9 +30,9 @@ class Meteor:
base = colors[color_index % len(colors)] base = colors[color_index % len(colors)]
lit = self.driver.apply_brightness(base, preset.b) lit = self.driver.apply_brightness(base, preset.b)
if 0 <= head < self.driver.num_leds: if 0 <= head < self.driver.num_leds:
self.driver.n[head] = lit self.driver.n[self.driver.led_i(preset, head)] = lit
self.driver.n.write() self.driver.n.write()
head += direction * speed head += self.driver.signed(preset, direction * speed)
if head >= self.driver.num_leds + tail_len: if head >= self.driver.num_leds + tail_len:
head = self.driver.num_leds - 1 head = self.driver.num_leds - 1
direction = -1 direction = -1
@@ -62,14 +62,22 @@ class Meteor:
i1 = p1 - t i1 = p1 - t
if 0 <= i1 < self.driver.num_leds: if 0 <= i1 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail) s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i1] = ((c1[0] * s) // 255, (c1[1] * s) // 255, (c1[2] * s) // 255) self.driver.n[self.driver.led_i(preset, i1)] = (
(c1[0] * s) // 255,
(c1[1] * s) // 255,
(c1[2] * s) // 255,
)
i2 = p2 + t i2 = p2 + t
if 0 <= i2 < self.driver.num_leds: if 0 <= i2 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail) s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i2] = ((c2[0] * s) // 255, (c2[1] * s) // 255, (c2[2] * s) // 255) self.driver.n[self.driver.led_i(preset, i2)] = (
(c2[0] * s) // 255,
(c2[1] * s) // 255,
(c2[2] * s) // 255,
)
self.driver.n.write() self.driver.n.write()
p1 += speed p1 += self.driver.signed(preset, speed)
p2 -= speed p2 -= self.driver.signed(preset, speed)
if p1 - tail > self.driver.num_leds and p2 + tail < 0: if p1 - tail > self.driver.num_leds and p2 + tail < 0:
p1 = 0 p1 = 0
p2 = self.driver.num_leds - 1 - gap p2 = self.driver.num_leds - 1 - gap
@@ -89,10 +97,10 @@ class Meteor:
if dist < 0: if dist < 0:
dist = -dist dist = -dist
if dist > width: if dist > width:
self.driver.n[i] = bg_color self.driver.n[self.driver.led_i(preset, i)] = bg_color
else: else:
scale = ((width - dist) * 255) // max(1, width) scale = ((width - dist) * 255) // max(1, width)
self.driver.n[i] = ( self.driver.n[self.driver.led_i(preset, i)] = (
(base[0] * scale) // 255, (base[0] * scale) // 255,
(base[1] * scale) // 255, (base[1] * scale) // 255,
(base[2] * scale) // 255, (base[2] * scale) // 255,
@@ -101,7 +109,7 @@ class Meteor:
if pause_frames > 0: if pause_frames > 0:
pause_frames -= 1 pause_frames -= 1
else: else:
center += direction center += self.driver.signed(preset, direction)
if center >= self.driver.num_leds - 1: if center >= self.driver.num_leds - 1:
center = self.driver.num_leds - 1 center = self.driver.num_leds - 1
direction = -1 direction = -1
@@ -121,7 +129,11 @@ class Meteor:
if mode == 1: if mode == 1:
gap = max(0, int(preset.n3)) gap = max(0, int(preset.n3))
p1, p2 = 0, self.driver.num_leds - 1 - gap nled = self.driver.num_leds
if self.driver.is_reversed(preset):
p1, p2 = nled - 1, gap
else:
p1, p2 = 0, nled - 1 - gap
last = utime.ticks_ms() last = utime.ticks_ms()
while True: while True:
p1, p2, last, stepped = self._run_comet_dual(preset, colors, p1, p2, last) p1, p2, last, stepped = self._run_comet_dual(preset, colors, p1, p2, last)
@@ -131,7 +143,11 @@ class Meteor:
yield yield
if mode == 2: if mode == 2:
color_index, center, direction, pause_frames = 0, 0, 1, 0 nled = self.driver.num_leds
if self.driver.is_reversed(preset):
color_index, center, direction, pause_frames = 0, max(0, nled - 1), -1, 0
else:
color_index, center, direction, pause_frames = 0, 0, 1, 0
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
while True: while True:
color_index, center, direction, pause_frames, last_update, stepped = ( color_index, center, direction, pause_frames, last_update, stepped = (
@@ -144,7 +160,11 @@ class Meteor:
return return
yield yield
color_index, head, direction = 0, 0, 1 nled = self.driver.num_leds
if self.driver.is_reversed(preset):
color_index, head, direction = 0, max(0, nled - 1), -1
else:
color_index, head, direction = 0, 0, 1
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
while True: while True:
color_index, head, direction, last_update, stepped = self._run_meteor( color_index, head, direction, last_update, stepped = self._run_meteor(

View File

@@ -12,7 +12,7 @@ class Particles:
def _run_snowfall(self, preset, colors, flakes, last): def _run_snowfall(self, preset, colors, flakes, last):
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20) density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1) speed = max(1, abs(self.driver.signed(preset, int(preset.n2) if int(preset.n2) > 0 else 1)))
d = max(1, int(preset.d)) d = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d: if utime.ticks_diff(now, last) < d:
@@ -25,8 +25,10 @@ class Particles:
nf = [] nf = []
for pos, ci in flakes: for pos, ci in flakes:
if 0 <= pos < self.driver.num_leds: if 0 <= pos < self.driver.num_leds:
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b) self.driver.n[self.driver.led_i(preset, pos)] = self.driver.apply_brightness(
pos -= speed colors[ci], preset.b
)
pos -= self.driver.signed(preset, speed)
if pos >= -1: if pos >= -1:
nf.append([pos, ci]) nf.append([pos, ci])
self.driver.n.write() self.driver.n.write()
@@ -34,7 +36,7 @@ class Particles:
def _run_starfall(self, preset, colors, stars, last): def _run_starfall(self, preset, colors, stars, last):
rate = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 14)) rate = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 14))
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2) speed = max(1, abs(self.driver.signed(preset, int(preset.n2) if int(preset.n2) > 0 else 2)))
tail = max(2, int(preset.n3) if int(preset.n3) > 0 else 10) tail = max(2, int(preset.n3) if int(preset.n3) > 0 else 10)
max_stars = 4 max_stars = 4
d = max(1, int(preset.d)) d = max(1, int(preset.d))
@@ -65,13 +67,14 @@ class Particles:
(base[2] * fade) // 255, (base[2] * fade) // 255,
) )
lit = self.driver.apply_brightness(lit, preset.b) lit = self.driver.apply_brightness(lit, preset.b)
o = self.driver.n[idx] pix = self.driver.led_i(preset, idx)
self.driver.n[idx] = ( o = self.driver.n[pix]
self.driver.n[pix] = (
max(o[0], lit[0]), max(o[0], lit[0]),
max(o[1], lit[1]), max(o[1], lit[1]),
max(o[2], lit[2]), max(o[2], lit[2]),
) )
h -= speed h -= self.driver.signed(preset, speed)
if h >= -tail: if h >= -tail:
s["h"] = h s["h"] = h
ns.append(s) ns.append(s)

View File

@@ -0,0 +1,19 @@
"""Strip install direction: n5 bit 0 reverses along-strip motion (upside-down wiring)."""
def is_reversed(preset):
return bool(int(getattr(preset, "n5", 0) or 0) & 1)
def led_i(driver, preset, logical_index):
"""Map a logical strip index (0 = pattern start) to a physical pixel index."""
n = int(driver.num_leds)
i = int(logical_index)
if 0 <= i < n and is_reversed(preset):
return n - 1 - i
return i
def signed(preset, value):
v = int(value)
return -v if is_reversed(preset) else v

View File

@@ -13,13 +13,8 @@ class Pulse:
bg_base = preset.background_or(colors) bg_base = preset.background_or(colors)
self.driver.fill(self.driver.apply_brightness(bg_base, preset.b)) self.driver.fill(self.driver.apply_brightness(bg_base, preset.b))
manual = not preset.a
color_index = self.driver.step % max(1, len(colors)) color_index = self.driver.step % max(1, len(colors))
if not preset.a:
# Manual / beat trigger: each select restarts this generator and resets
# cycle_start below. Advancing step here makes each beat the next colour
# without requiring a full wall-clock cycle between beats.
nclr = max(1, len(colors))
self.driver.step = (color_index + 1) % nclr
cycle_start = utime.ticks_ms() cycle_start = utime.ticks_ms()
# State machine based pulse using a single generator loop # State machine based pulse using a single generator loop
@@ -29,7 +24,7 @@ class Pulse:
attack_ms = max(0, int(preset.n1)) # Attack time in ms attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms hold_ms = max(0, int(preset.n2)) # Hold time in ms
decay_ms = max(0, int(preset.n3)) # Decay time in ms decay_ms = max(0, int(preset.n3)) # Decay time in ms
delay_ms = max(0, int(preset.d)) delay_ms = 0 if manual else max(0, int(preset.d))
total_ms = attack_ms + hold_ms + decay_ms + delay_ms total_ms = attack_ms + hold_ms + decay_ms + delay_ms
if total_ms <= 0: if total_ms <= 0:
@@ -58,12 +53,13 @@ class Pulse:
# Delay phase: LEDs off between pulses # Delay phase: LEDs off between pulses
self.driver.fill(bg_color) self.driver.fill(bg_color)
else: else:
# End of cycle: auto advances colour and loops; manual already # End of cycle: advance colour for the next run, then loop or stop.
# advanced step at run start for the next beat. nclr = max(1, len(colors))
if not preset.a: color_index = (color_index + 1) % nclr
break
color_index = (color_index + 1) % max(1, len(colors))
self.driver.step = color_index self.driver.step = color_index
if manual:
self.driver.fill(bg_color)
break
cycle_start = now cycle_start = now
yield yield
continue continue

View File

@@ -32,6 +32,18 @@ class Preset:
int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"} int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"}
allowed_fields = {"p", "c", "d", "b", "a", "bg", "n1", "n2", "n3", "n4", "n5", "n6"} allowed_fields = {"p", "c", "d", "b", "a", "bg", "n1", "n2", "n3", "n4", "n5", "n6"}
for key, value in data.items(): for key, value in data.items():
if key == "reverse":
try:
if isinstance(value, bool):
self.n5 = 1 if value else 0
elif isinstance(value, (int, float)):
self.n5 = 1 if int(value) else 0
elif isinstance(value, str):
lowered = value.lower()
self.n5 = 1 if lowered in ("true", "1", "yes", "on") else 0
except (TypeError, ValueError):
pass
continue
key = aliases.get(key, key) key = aliases.get(key, key)
if key not in allowed_fields: if key not in allowed_fields:
continue continue

View File

@@ -4,6 +4,7 @@ from preset import Preset
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys import sys
import utime
try: try:
import uos as os import uos as os
except ImportError: except ImportError:
@@ -31,6 +32,7 @@ class Presets:
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": self.blink,
} }
self.patterns.update(self._load_dynamic_patterns()) self.patterns.update(self._load_dynamic_patterns())
@@ -193,20 +195,23 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
# Manual single-shot patterns: if this select arrives before the main loop has if preset.p == "off":
# tick()'d the previous frame, completing it first keeps step in sync with beats. self.generator = None
self.step = 0
self.fill((0, 0, 0))
self.selected = preset_name
return True
# If re-selecting the same preset before the main loop has tick()'d the
# previous frame, run one pending tick so step stays in sync.
if ( if (
preset_name == self.selected preset_name == self.selected
and not preset.a
and preset.p == "chase"
and self.generator is not None and self.generator is not None
): ):
while self.generator is not None: self.tick()
self.tick()
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
elif preset.p == "off" or self.selected != preset_name: elif self.selected != preset_name:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
@@ -222,6 +227,21 @@ class Presets:
self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
self.num_leds = num_leds self.num_leds = num_leds
def is_reversed(self, preset):
from patterns.pattern_direction import is_reversed as _is_reversed
return _is_reversed(preset)
def led_i(self, preset, logical_index):
from patterns.pattern_direction import led_i as _led_i
return _led_i(self, preset, logical_index)
def signed(self, preset, value):
from patterns.pattern_direction import signed as _signed
return _signed(preset, value)
def apply_brightness(self, color, brightness_override=None): def apply_brightness(self, color, brightness_override=None):
# Combine per-preset brightness (override) with global brightness self.b # Combine per-preset brightness (override) with global brightness self.b
local = brightness_override if brightness_override is not None else 255 local = brightness_override if brightness_override is not None else 255
@@ -241,4 +261,44 @@ class Presets:
def on(self, preset): def on(self, preset):
colors = preset.c colors = preset.c
color = colors[0] if colors else (255, 255, 255) color = colors[0] if colors else (255, 255, 255)
self.fill(self.apply_brightness(color, preset.b)) lit = self.apply_brightness(color, preset.b)
while True:
self.fill(lit)
yield
def blink(self, preset):
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0
delay_ms = max(1, int(preset.d))
period = delay_ms * 2
now = utime.ticks_ms()
# Phase-lock to wall time so group identify (broadcast select) stays in sync even
# when devices process the packet on different main-loop iterations.
phase = now % period if period else 0
state = phase < delay_ms
last_update = utime.ticks_add(now, -phase)
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
if state:
base = colors[color_index % len(colors)]
self.fill(self.apply_brightness(base, preset.b))
color_index += 1
else:
self.fill(bg_color)
state = not state
last_update = utime.ticks_add(last_update, delay_ms)
yield
def run_tick(presets):
"""Advance one animation frame (standalone tests / mpremote demos)."""
presets.tick()

View File

@@ -3,6 +3,8 @@ import ubinascii
import machine import machine
import network import network
WIFI_CHANNEL_DEFAULT = 5
class Settings(dict): class Settings(dict):
SETTINGS_FILE = "/settings.json" SETTINGS_FILE = "/settings.json"
@@ -14,9 +16,9 @@ class Settings(dict):
def set_defaults(self): def set_defaults(self):
self["led_pin"] = 10 self["led_pin"] = 10
self["num_leds"] = 119 self["num_leds"] = 200
self["color_order"] = "rgb" self["color_order"] = "grb"
sta = network.WLAN(network.STA_IF) sta = network.WLAN(network.STA_IF)
sta.active(True) sta.active(True)
@@ -31,11 +33,10 @@ class Settings(dict):
# Power-on: "default" | "last" | "off" # Power-on: "default" | "last" | "off"
self["startup_mode"] = "default" self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = WIFI_CHANNEL_DEFAULT
# ESP-NOW transport (requires espnow firmware; uses wifi_channel). self["groups"] = []
self["ssid"] = ""
self["password"] = ""
def save(self): def save(self):
try: try:

36
src/v1_wire.py Normal file
View File

@@ -0,0 +1,36 @@
"""Expand short v1 wire keys to long names (MicroPython)."""
K_PRESETS = "p"
K_SELECT = "s"
K_GROUPS = "g"
K_SET_GROUPS = "sg"
K_SAVE = "sv"
K_DEFAULT = "df"
K_DEVICE_CONFIG = "dc"
K_CLEAR_PRESETS = "cp"
K_MANIFEST = "mf"
_SHORT_TO_LONG = {
K_PRESETS: "presets",
K_SELECT: "select",
K_GROUPS: "groups",
K_SET_GROUPS: "set_groups",
K_SAVE: "save",
K_DEFAULT: "default",
K_DEVICE_CONFIG: "device_config",
K_CLEAR_PRESETS: "clear_presets",
K_MANIFEST: "manifest",
}
def expand_v1(data):
if not isinstance(data, dict):
return data
out = dict(data)
for short_key, long_key in _SHORT_TO_LONG.items():
if short_key in data and long_key not in out:
out[long_key] = data[short_key]
if short_key in out:
del out[short_key]
return out

169
tests/bridge_ws_blink.py Normal file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
pipenv run python led-driver/tests/bridge_ws_blink.py
pipenv run python led-driver/tests/bridge_ws_blink.py \\
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
From **led-driver** (if Pipenv/env is the parent project)::
pipenv run python tests/bridge_ws_blink.py --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# led-driver/tests -> led-driver -> led-controller
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
PROJECT_ROOT = LED_DRIVER_ROOT.parent
def _load_bridge_url(explicit: Optional[str]) -> str:
if explicit and explicit.strip():
return explicit.strip()
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
if not path.is_file():
continue
try:
data = json.loads(path.read_text(encoding="utf-8"))
url = str(data.get("bridge_ws_url") or "").strip()
if url:
return url
except (OSError, json.JSONDecodeError, TypeError):
pass
return "ws://192.168.4.1/ws"
def _format_mac(mac: str) -> str:
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
def build_blink_envelope(
mac: str,
*,
preset_id: str = "2",
delay_ms: int = 200,
brightness: int = 64,
) -> Dict[str, Any]:
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
body = {
"p": {
preset_id: {
"p": "blink",
"b": max(0, min(255, int(brightness))),
"d": max(1, int(delay_ms)),
"c": ["#FF0000", "#0000FF"],
"a": True,
}
},
"s": [str(preset_id)],
}
return {"v": "1", "dv": {_format_mac(mac): body}}
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
import websockets
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
print(f"connecting to {url}")
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
print(f"connected, sending {len(packet)} B")
print(packet.decode("utf-8"))
await ws.send(packet)
if hold_s > 0:
print(f"holding connection {hold_s}s …")
await asyncio.sleep(hold_s)
print("done")
def main() -> int:
parser = argparse.ArgumentParser(
description="Send blink preset+select to one driver via bridge WebSocket.",
)
parser.add_argument(
"--url",
default=None,
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
)
parser.add_argument(
"--mac",
default="188b0e1560a8",
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
)
parser.add_argument(
"--preset-id",
default="2",
help="Wire preset slot id (default: 2, matches zone push)",
)
parser.add_argument(
"--delay-ms",
type=int,
default=200,
help="Blink delay in ms (default: 200)",
)
parser.add_argument(
"--brightness",
type=int,
default=64,
help="Preset brightness 0255 (default: 64)",
)
parser.add_argument(
"--hold",
type=float,
default=2.0,
help="Seconds to keep WebSocket open after send (default: 2)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print envelope only; do not connect",
)
args = parser.parse_args()
url = _load_bridge_url(args.url)
try:
envelope = build_blink_envelope(
args.mac,
preset_id=args.preset_id,
delay_ms=args.delay_ms,
brightness=args.brightness,
)
except ValueError as e:
print(f"error: {e}", file=sys.stderr)
return 1
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
if args.dry_run:
print(json.dumps(envelope, indent=2))
return 0
try:
asyncio.run(_send(url, envelope, args.hold))
except KeyboardInterrupt:
print("interrupted")
return 130
except Exception as e:
print(f"failed: {e!r}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,116 @@
"""Device-side radio diagnostic test (MicroPython).
Checks:
1) STA/AP bring-up on channel 5
2) ESP-NOW init and broadcast peer add
3) Broadcast TX test packet send
4) RX wait window to see any incoming ESP-NOW frames
"""
import espnow
import machine
import network
import time
import ubinascii
CHANNEL = 5
RX_WINDOW_MS = 3000
WDT_TIMEOUT_MS = 10000
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00"
def _mac_hex(mac_bytes):
try:
return ubinascii.hexlify(mac_bytes).decode()
except Exception:
return "?"
def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
print("diag start")
print("cpu freq", machine.freq())
sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
# Clean start
try:
sta.active(False)
ap.active(False)
time.sleep_ms(100)
except Exception as e:
print("wifi reset failed", repr(e))
# STA setup
try:
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac")))
except Exception as e:
print("sta setup failed", repr(e))
# AP setup
try:
ap.active(True)
try:
ap.config(essid="diag-ap", channel=channel, hidden=True)
except TypeError:
ap.config(essid="diag-ap", channel=channel)
print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac")))
except Exception as e:
print("ap setup failed", repr(e))
wdt.feed()
# ESP-NOW setup
try:
e = espnow.ESPNow()
e.active(True)
print("espnow active ok")
except Exception as e_err:
print("espnow init failed", repr(e_err))
return
# Add broadcast peer
try:
e.add_peer(BROADCAST_MAC, channel=channel)
print("add bcast peer ok")
except TypeError:
try:
e.add_peer(BROADCAST_MAC)
print("add bcast peer ok (no channel arg)")
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
except Exception as e_err:
print("add bcast peer failed", repr(e_err))
# TX test
try:
ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True)
print("tx bcast", ok, "len", len(TEST_PAYLOAD))
except Exception as e_err:
print("tx bcast failed", repr(e_err))
# RX window
print("rx window ms", rx_window_ms)
t_end = time.ticks_add(time.ticks_ms(), rx_window_ms)
rx_count = 0
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
wdt.feed()
host, msg = e.recv(100)
if host:
rx_count += 1
print("rx", rx_count, _mac_hex(host), "len", len(msg))
time.sleep_ms(5)
print("diag done rx_count", rx_count)
if __name__ == "__main__":
run_diag()

View File

@@ -0,0 +1,40 @@
"""Device test: receive ESP-NOW packets on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
import time
CHANNEL = 5
TIMEOUT_MS = 1000
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
print("recv ready ch", channel)
while True:
wdt.feed()
host, msg = e.recv(timeout_ms)
if host:
mac_hex = ubinascii.hexlify(host).decode()
print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode())
else:
print("rx timeout")
time.sleep_ms(10)
if __name__ == "__main__":
recv_loop()

View File

@@ -0,0 +1,47 @@
"""Device test: send one ESP-NOW packet on channel 5 (MicroPython)."""
import espnow
import machine
import network
import ubinascii
CHANNEL = 5
DEST_HEX = "ffffffffffff"
PAYLOAD_HEX = "4c0501000000"
WDT_TIMEOUT_MS = 10000
def _set_channel(channel):
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE)
sta.config(channel=channel)
def _add_peer(esp, dest, channel):
try:
esp.add_peer(dest, channel=channel)
except TypeError:
esp.add_peer(dest)
except OSError:
pass
def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL):
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
dest = ubinascii.unhexlify(dest_hex)
pkt = ubinascii.unhexlify(payload_hex)
_set_channel(channel)
e = espnow.ESPNow()
e.active(True)
_add_peer(e, dest, channel)
wdt.feed()
ok = e.send(dest, pkt, True)
print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt))
return ok
if __name__ == "__main__":
send_once()

View File

@@ -1,35 +1,74 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Standalone blink pattern demo (WDT-fed tick loop).
Run on device::
mpremote connect <port> run tests/patterns/blink.py
"""
import sys
import utime import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick def _bootstrap_import_path():
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
try:
import uos as os
except ImportError:
import os
candidates = []
try:
here = __file__.rsplit("/", 1)[0]
if here:
candidates.append(here)
tests = here.rsplit("/", 1)[0]
if tests:
candidates.append(tests)
root = tests.rsplit("/", 1)[0]
if root:
candidates.append(root)
candidates.append(root + "/src")
except NameError:
pass
for p in (".", "..", "/", "src", "/src"):
candidates.append(p)
for p in candidates:
if p and p not in sys.path:
sys.path.insert(0, p)
_bootstrap_import_path()
from machine import WDT # noqa: E402
from settings import Settings # noqa: E402
from presets import Presets # noqa: E402
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
presets.tick()
utime.sleep_ms(sleep_ms)
def main(): def main():
s = Settings() settings = Settings()
pin = s.get("led_pin", 10) presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000) wdt = WDT(timeout=10000)
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors) presets.edit(
p.edit("test_blink", { "test_blink",
"p": "blink", {
"b": 64, "p": "blink",
"d": 200, "b": 64,
"c": [(255, 0, 0), (0, 0, 255)], "d": 200,
}) "c": [(255, 0, 0), (0, 0, 255)],
p.select("test_blink") },
)
start = utime.ticks_ms() presets.select("test_blink")
while utime.ticks_diff(utime.ticks_ms(), start) < 1500: _run_ms(presets, wdt, 1500)
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
if __name__ == "__main__": if __name__ == "__main__":
main() main()