18 Commits

Author SHA1 Message Date
2a768376d0 chore(release): beta-1.03
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-10 16:13:59 +12:00
170a0e05ab feat(patterns): align manual and auto behaviour
Unify manual/auto timing semantics for key patterns, add preset background support, and improve runtime observability while keeping the driver responsive under beat-triggered selects.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 20:07:58 +12:00
4879fcfe90 fix(patterns): use preset background fallback across animations
Align pattern background rendering to use preset.background_or(...) and update pulse/radiate single-step behaviour to preserve visible frames and step progression.
2026-05-09 14:28:05 +12:00
fbebe9f4f9 fix(patterns): correct non-blocking timing and blink off phase
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-06 20:28:52 +12:00
a79c6f4dd3 fix(patterns): remove blocking sleeps from pattern loops
Replace sleep-based timing in pattern generators with non-blocking tick checks so long delays do not block the main loop and risk watchdog resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:37:33 +12:00
pi
2fcaf2f064 fix(driver): persist brightness when message includes save and b
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 22:15:23 +12:00
pi
3b38264b70 chore(wifi): log connecting while waiting for STA
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
3ee89ce3b4 feat(driver): add HTTP routes, startup split, and binary envelope support
Wire controller messages through new modules (background tasks, runtime state,
startup) and add binary envelope handling.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:12 +12:00
74b4b495f9 feat(patterns): add expanded animation pack with smoke tests
Add a broad set of new pattern modules and matching pattern smoke scripts so the new effects can be validated directly on-device.
2026-04-23 20:10:01 +12:00
4575ef16ad test(led-driver): add espnow peer and ap pm0 scripts
Made-with: Cursor
2026-04-21 21:48:42 +12:00
a342187635 feat(patterns): add twinkle pattern defaults
Made-with: Cursor
2026-04-21 21:48:42 +12:00
428ed8b884 feat(led-driver): add preset clear command and runtime debug 2026-04-21 00:44:28 +12:00
a22702df4d feat(patterns): add radiate animation 2026-04-20 23:37:43 +12:00
5a8866add7 feat(esp32): pattern upload route and ws controller ip
Made-with: Cursor
2026-04-19 23:27:33 +12:00
a2cd2f8dc2 test(led-driver): add pattern smoke harness
Made-with: Cursor
2026-04-19 23:27:29 +12:00
c47725e31a feat(patterns): add colour cycle, flicker, and flame
Made-with: Cursor
2026-04-19 23:27:19 +12:00
22b1a8a6d6 fix(led-driver): phase-lock pattern timers
Made-with: Cursor
2026-04-19 21:41:18 +12:00
45a38c05b7 fix(led-driver): persist default preset updates 2026-04-15 00:03:21 +12:00
70 changed files with 3437 additions and 107 deletions

1
presets.json Normal file
View File

@@ -0,0 +1 @@
{"15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "40": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 0]], "b": 255, "n2": 2600, "n1": 35, "p": "flame", "n3": 0, "d": 50}, "41": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[120, 200, 255], [80, 140, 255], [180, 120, 255], [100, 220, 232], [160, 200, 255]], "b": 255, "n2": 10, "n1": 72, "p": "twinkle", "n3": 5, "d": 500}, "42": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[166, 0, 255], [0, 10, 10]], "b": 255, "n2": 900, "n1": 30, "p": "radiate", "n3": 4000, "d": 5000}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "38": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 1, "p": "colour_cycle", "n3": 0, "d": 100}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}, "39": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 184, 77]], "b": 255, "n2": 0, "n1": 30, "p": "flicker", "n3": 0, "d": 80}, "14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 255], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 5000}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}}

59
src/background_tasks.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import gc
import utime
from hello import broadcast_hello_udp
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt):
last_mem_log = utime.ticks_ms()
while True:
presets.tick()
wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s)

209
src/binary_envelope.py Normal file
View File

@@ -0,0 +1,209 @@
"""Decode compact binary controller envelopes — v2 native binary, v1 legacy JSON blobs."""
import json
import struct
BINARY_ENVELOPE_VERSION_1 = 1
BINARY_ENVELOPE_VERSION_2 = 2
HEADER_LEN = 5
def _brightness_0_255_from_wire(wire):
w = max(0, min(127, int(wire)))
return min(255, (w * 255) // 127)
def _decode_preset_record(buf, off):
nl = buf[off]
off += 1
name = buf[off : off + nl].decode("utf-8")
off += nl
pl = buf[off]
off += 1
pattern = buf[off : off + pl].decode("utf-8")
off += pl
nc = buf[off]
off += 1
colors = []
for _ in range(nc):
r, g, b = buf[off], buf[off + 1], buf[off + 2]
off += 3
colors.append("#%02x%02x%02x" % (r, g, b))
if off + 16 > len(buf):
raise ValueError("truncated")
delay, br, auto, n1, n2, n3, n4, n5, n6 = struct.unpack_from(
"<HBBhhhhhh", buf, off
)
off += 16
preset = {
"p": pattern,
"c": colors,
"d": delay,
"b": br,
"a": bool(auto),
"n1": n1,
"n2": n2,
"n3": n3,
"n4": n4,
"n5": n5,
"n6": n6,
}
return name, preset, off
def _decode_presets_blob(chunk):
if not chunk:
return {}
off = 0
count = chunk[off]
off += 1
out = {}
for _ in range(count):
name, preset, off = _decode_preset_record(chunk, off)
out[name] = preset
if off != len(chunk):
raise ValueError("presets blob mismatch")
return out
def _decode_select_blob(chunk):
if not chunk:
return {}
off = 0
count = chunk[off]
off += 1
out = {}
for _ in range(count):
dl = chunk[off]
off += 1
device = chunk[off : off + dl].decode("utf-8")
off += dl
pl = chunk[off]
off += 1
pname = chunk[off : off + pl].decode("utf-8")
off += pl
has_step = chunk[off]
off += 1
if has_step:
step = struct.unpack_from("<H", chunk, off)[0]
off += 2
out[device] = [pname, step]
else:
out[device] = [pname]
if off != len(chunk):
raise ValueError("select blob mismatch")
return out
def _decode_default_blob(chunk):
if not chunk:
return "", []
off = 0
nl = chunk[off]
off += 1
default_name = chunk[off : off + nl].decode("utf-8") if nl else ""
off += nl
nt = chunk[off]
off += 1
targets = []
for _ in range(nt):
tl = chunk[off]
off += 1
targets.append(chunk[off : off + tl].decode("utf-8"))
off += tl
if off != len(chunk):
raise ValueError("default blob mismatch")
return default_name, targets
def parse_binary_envelope_v2(buf):
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
return None
if buf[0] != BINARY_ENVELOPE_VERSION_2:
return None
lp = buf[2]
ls = buf[3]
ld = buf[4]
need = HEADER_LEN + lp + ls + ld
if len(buf) != need:
return None
off = HEADER_LEN
presets_chunk = buf[off : off + lp]
off += lp
select_chunk = buf[off : off + ls]
off += ls
default_chunk = buf[off : off + ld]
data = {"v": "1"}
br = buf[1]
if br < 128:
data["b"] = _brightness_0_255_from_wire(br)
try:
if lp:
data["presets"] = _decode_presets_blob(presets_chunk)
if ls:
data["select"] = _decode_select_blob(select_chunk)
if ld:
dname, targets = _decode_default_blob(default_chunk)
data["default"] = dname
data["targets"] = targets
except (ValueError, UnicodeError, TypeError, struct.error):
return None
return data
def parse_binary_envelope_v1(buf):
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
return None
if buf[0] != BINARY_ENVELOPE_VERSION_1:
return None
lp = buf[2]
ls = buf[3]
ld = buf[4]
need = HEADER_LEN + lp + ls + ld
if len(buf) != need:
return None
off = HEADER_LEN
presets_chunk = buf[off : off + lp]
off += lp
select_chunk = buf[off : off + ls]
off += ls
default_chunk = buf[off : off + ld]
data = {"v": "1"}
br = buf[1]
if br < 128:
data["b"] = _brightness_0_255_from_wire(br)
if lp:
try:
data["presets"] = json.loads(presets_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if ls:
try:
data["select"] = json.loads(select_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if ld:
try:
extra = json.loads(default_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if isinstance(extra, dict):
for k, v in extra.items():
data[k] = v
return data
def parse_binary_envelope(buf):
d = parse_binary_envelope_v2(buf)
if d is not None:
return d
return parse_binary_envelope_v1(buf)

View File

@@ -3,6 +3,7 @@
import json import json
import socket import socket
from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
try: try:
@@ -11,19 +12,60 @@ except ImportError:
import os import os
def process_data(payload, settings, presets, controller_ip=None): def _log_rx(payload) -> None:
"""Read one controller message; json.loads (bytes or str), then apply fields.""" """Serial log when led-controller sends a message into ``process_data``."""
try: try:
data = json.loads(payload) if isinstance(payload, (bytes, bytearray)):
print(payload) n = len(payload)
if data.get("v", "") != "1": if n == 0:
print("rx 0 B")
return
cap = 160
chunk = payload if n <= cap else payload[:cap]
try:
txt = bytes(chunk).decode("utf-8")
except Exception:
txt = str(chunk)
if n > cap:
txt = txt + "..."
print("rx", n, "B", txt)
else:
s = str(payload)
cap = 200
if len(s) <= cap:
print("rx", len(s), "C", s)
else:
print("rx", len(s), "C", s[:cap] + "...")
except Exception:
print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields."""
_log_rx(payload)
data = None
if isinstance(payload, (bytes, bytearray)):
data = parse_binary_envelope(payload)
if data is None:
try:
data = json.loads(payload)
except (ValueError, TypeError):
return
else:
try:
data = json.loads(payload)
except (ValueError, TypeError):
return return
except (ValueError, TypeError): if data.get("v", "") != "1":
return return
if "device_config" in data:
apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
apply_brightness(data, settings, presets) apply_brightness(data, settings, presets)
if "presets" in data: if "presets" in data:
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data:
apply_clear_presets(data, presets)
if "select" in data: if "select" in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
@@ -32,6 +74,92 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_patterns_ota(data, presets, controller_ip=controller_ip) apply_patterns_ota(data, presets, controller_ip=controller_ip)
if "save" in data and ("presets" in data or "default" in data): if "save" in data and ("presets" in data or "default" in data):
presets.save() presets.save()
if "save" in data and "clear_presets" in data:
presets.save()
if "save" in data and "b" in data:
settings.save()
if "save" in data and "device_config" in data:
settings.save()
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
_STARTUP_MODES = frozenset({"default", "last", "off"})
_MAX_DEVICE_LEDS = 2048
def apply_startup_pattern(settings, presets):
"""Apply power-on behaviour from ``startup_mode`` (default / last / off)."""
mode = str(settings.get("startup_mode", "default")).lower().strip()
if mode not in _STARTUP_MODES:
mode = "default"
if mode == "off":
if presets.select("off"):
return
presets.fill((0, 0, 0))
return
if mode == "last":
lp = settings.get("last_preset") or ""
if isinstance(lp, str) and lp.strip() and lp.strip() in presets.presets:
if presets.select(lp.strip()):
return
dp = settings.get("default", "")
if dp and dp in presets.presets:
if not presets.select(dp):
print("Startup preset failed (invalid pattern?):", dp)
def apply_device_config(data, settings, presets):
"""Apply fields from v1 ``device_config``; reload presets when strip length or colour order changes."""
dc = data.get("device_config")
if not isinstance(dc, dict):
return
strip_changed = False
meta_changed = False
if "name" in dc:
n = dc["name"]
if isinstance(n, str) and n.strip():
settings["name"] = n.strip()
meta_changed = True
if "num_leds" in dc:
try:
n = int(dc["num_leds"])
if 1 <= n <= _MAX_DEVICE_LEDS:
settings["num_leds"] = n
presets.update_num_leds(settings["led_pin"], n)
strip_changed = True
except (TypeError, ValueError):
pass
if "color_order" in dc:
co = str(dc["color_order"]).lower().strip()
if co in _VALID_DEVICE_COLOR_ORDERS:
settings["color_order"] = co
settings.color_order = settings.get_color_order(co)
strip_changed = True
if "startup_mode" in dc:
sm = str(dc["startup_mode"]).lower().strip()
if sm in _STARTUP_MODES:
settings["startup_mode"] = sm
meta_changed = True
if not strip_changed and not meta_changed:
return
if strip_changed:
prev = presets.selected
try:
presets.load(settings)
except Exception as e:
print("device_config: presets.load failed:", e)
if prev and prev in presets.presets:
presets.select(prev)
elif settings.get("default") and settings["default"] in presets.presets:
presets.select(settings["default"])
def record_last_preset(settings, preset_name):
"""Persist the last selected preset id (single entry in flash)."""
if not isinstance(preset_name, str) or not preset_name:
return
settings["last_preset"] = preset_name.strip()
settings.save()
def apply_brightness(data, settings, presets): def apply_brightness(data, settings, presets):
@@ -55,8 +183,14 @@ def apply_presets(data, settings, presets):
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
continue continue
if "bg" in preset_data:
try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
if bg_color:
preset_data["bg"] = bg_color[0]
except (TypeError, ValueError, KeyError):
pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
@@ -67,7 +201,23 @@ def apply_select(data, settings, presets):
return return
preset_name = select_list[0] preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
presets.select(preset_name, step=step) if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
def apply_clear_presets(data, presets):
clear_value = data.get("clear_presets")
if isinstance(clear_value, bool):
should_clear = clear_value
elif isinstance(clear_value, int):
should_clear = bool(clear_value)
elif isinstance(clear_value, str):
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
else:
should_clear = False
if not should_clear:
return
presets.delete_all()
def apply_default(data, settings, presets): def apply_default(data, settings, presets):
@@ -79,6 +229,7 @@ def apply_default(data, settings, presets):
and default_name in presets.presets and default_name in presets.presets
): ):
settings["default"] = default_name settings["default"] = default_name
settings.save()
def _parse_http_url(url): def _parse_http_url(url):
@@ -211,8 +362,5 @@ def apply_patterns_ota(data, presets, controller_ip=None):
updated += 1 updated += 1
if updated > 0: if updated > 0:
presets.reload_patterns() presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e: except Exception as e:
print("patterns_ota failed:", e) print("patterns_ota failed:", e)

View File

@@ -92,7 +92,6 @@ def broadcast_hello_udp(
""" """
ip, mask, _gw, _dns = sta.ifconfig() ip, mask, _gw, _dns = sta.ifconfig()
msg = pack_hello_line(sta, device_name) msg = pack_hello_line(sta, device_name)
print("hello:", msg)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try: try:
@@ -121,11 +120,9 @@ def broadcast_hello_udp(
for dest_ip, dest_port in targets: for dest_ip, dest_port in targets:
if wdt is not None: if wdt is not None:
wdt.feed() wdt.feed()
label = "%s:%s" % (dest_ip, dest_port)
target = (dest_ip, dest_port) target = (dest_ip, dest_port)
try: try:
sock.sendto(msg, target) sock.sendto(msg, target)
print("sent hello ->", target)
except OSError as e: except OSError as e:
print("sendto failed:", e) print("sendto failed:", e)
continue continue
@@ -134,20 +131,12 @@ def broadcast_hello_udp(
if wdt is not None: if wdt is not None:
wdt.feed() wdt.feed()
try: try:
data, addr = sock.recvfrom(2048) _data, addr = sock.recvfrom(2048)
print("reply from", addr, ":", data)
remote_ip = addr[0] remote_ip = addr[0]
if data != msg:
print("(warning: reply payload differs from hello; still using source IP.)")
discovered = remote_ip discovered = remote_ip
print("Discovered controller at", remote_ip)
break break
except OSError as e: except OSError:
print("recv (no reply):", e, "via", label) pass
if dest_ip == "255.255.255.255":
print(
"(hint: many APs drop Wi-Fi client broadcast; try wired server or AP without client isolation.)"
)
sock.close() sock.close()
return discovered return discovered
@@ -171,18 +160,12 @@ def discover_controller_udp(device_name="", wdt=None):
print("hello: STA has no IP address.") print("hello: STA has no IP address.")
raise SystemExit(1) raise SystemExit(1)
print("STA IP:", ip, "mask:", mask)
discovered = broadcast_hello_udp( discovered = broadcast_hello_udp(
sta, sta,
device_name, device_name,
wait_reply=True, wait_reply=True,
wdt=wdt, wdt=wdt,
) )
if discovered:
print("discover done; controller =", repr(discovered))
else:
print("discover done; controller not found")
return discovered return discovered

107
src/http_routes.py Normal file
View File

@@ -0,0 +1,107 @@
import json
from controller_messages import process_data
from microdot.websocket import WebSocketError, with_websocket
try:
import uos as os
except ImportError:
import os
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def register_routes(app, settings, presets, runtime_state):
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
runtime_state.ws_connected()
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
try:
while True:
data = await ws.receive()
if not data:
break
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {"Content-Type": "application/json"}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {"Content-Type": "application/json"}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {"Content-Type": "application/json"}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}
return json.dumps(
{
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}
), 201, {"Content-Type": "application/json"}
@app.post("/presets/upload")
async def upload_presets(request):
"""Receive v1 JSON with ``presets`` and apply/save on the driver."""
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "body is required"}), 400, {"Content-Type": "application/json"}
try:
process_data(body, settings, presets)
except Exception as e:
return json.dumps({"error": str(e)}), 400, {"Content-Type": "application/json"}
return json.dumps({"message": "presets applied"}), 200, {"Content-Type": "application/json"}

View File

@@ -1,88 +1,185 @@
from settings import Settings from settings import Settings
from machine import WDT import machine
import network import network
import utime import utime
import asyncio import asyncio
import json
import gc
from microdot import Microdot from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
from controller_messages import process_data from controller_messages import apply_startup_pattern, process_data
from hello import broadcast_hello_udp from runtime_state import RuntimeState
from background_tasks import udp_hello_loop_after_http_ready
from wifi_sta import connect_until_up
try:
import uos as os
except ImportError:
import os
wdt = machine.WDT(timeout=10000)
wdt.feed()
machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings)
gc.collect()
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
default_preset = settings.get("default", "") presets.debug = bool(settings.get("debug", False))
if default_preset and default_preset in presets.presets: gc.collect()
if presets.select(default_preset):
print(f"Selected startup preset: {default_preset}")
else:
print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000) apply_startup_pattern(settings, presets)
wdt.feed()
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True) sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"]) _boot_ssid = settings.get("ssid") or ""
while not sta_if.isconnected(): if _boot_ssid:
utime.sleep(1) connect_until_up(sta_if, _boot_ssid, settings.get("password") or "", wdt)
wdt.feed()
print(sta_if.ifconfig())
def _print_network_ips(controller_ip=None):
"""Always log STA address and led-controller (WS client) address when known."""
try:
led_ip = sta_if.ifconfig()[0]
except Exception:
led_ip = "?"
ctrl = controller_ip if controller_ip else "(not connected)"
print("led-driver IP:", led_ip, " led-controller IP:", ctrl)
_print_network_ips()
runtime_state = RuntimeState()
app = Microdot() app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws") @app.route("/ws")
@with_websocket @with_websocket
async def ws_handler(request, ws): async def ws_handler(request, ws):
print("WS client connected") runtime_state.ws_connected()
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
_print_network_ips(controller_ip)
try: try:
while True: while True:
data = await ws.receive() data = await ws.receive()
if not data: if not data:
print("WS client disconnected (closed)")
break break
print(data) process_data(data, settings, presets, controller_ip=controller_ip)
process_data(data, settings, presets)
except WebSocketError as e: except WebSocketError as e:
print("WS client disconnected:", e) print("WS client disconnected:", e)
except OSError as e: except OSError as e:
print("WS client dropped (OSError):", e) print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def presets_loop(): async def presets_loop():
last_mem_log = utime.ticks_ms()
while True: while True:
presets.tick() presets.tick()
wdt.feed() wdt.feed()
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
"""Hello must run after the HTTP server binds, or discovery clients time out on /ws."""
await asyncio.sleep(1)
print("UDP hello: broadcasting…")
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
async def main(port=80): async def main(port=80):
asyncio.create_task(presets_loop()) asyncio.create_task(presets_loop())
asyncio.create_task(_udp_hello_after_http_ready()) asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port) await app.start_server(host="0.0.0.0", port=port)

View File

@@ -1,6 +1,5 @@
from .blink import Blink """Pattern modules are registered only via Presets._load_dynamic_patterns().
from .rainbow import Rainbow
from .pulse import Pulse This file is ignored as a pattern (see presets.py). Keep it free of imports so
from .transition import Transition adding a pattern does not require editing this package.
from .chase import Chase """
from .circle import Circle

31
src/patterns/aurora.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Aurora:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(40, 200, 140), (80, 120, 255), (160, 80, 220)]
bands = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
shimmer = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 40))
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
idx = ((i * bands) // max(1, self.driver.num_leds) + (phase // 32)) % len(colors)
c = self.driver.apply_brightness(colors[idx], preset.b)
w = (255 - abs(128 - ((i * 8 + phase) & 255)) * 2)
w = max(0, min(255, w + shimmer))
self.driver.n[i] = ((c[0]*w)//255, (c[1]*w)//255, (c[2]*w)//255)
self.driver.n.write()
phase = (phase + 1) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

29
src/patterns/bar_graph.py Normal file
View File

@@ -0,0 +1,29 @@
import utime
class BarGraph:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(0, 255, 0), (255, 80, 0)]
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
level = max(0, min(100, int(preset.n1) if int(preset.n1) >= 0 else 50))
target = (self.driver.num_leds * level) // 100
lit = self.driver.apply_brightness(colors[0], preset.b)
unlit = self.driver.apply_brightness(
preset.background_or(colors),
preset.b,
)
for i in range(self.driver.num_leds):
self.driver.n[i] = lit if i < target else unlit
self.driver.n.write()
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -9,6 +9,7 @@ class Blink:
"""Blink pattern: toggles LEDs on/off using preset delay, cycling through colors.""" """Blink pattern: toggles LEDs on/off using preset delay, cycling through colors."""
# Use provided colors, or default to white if none # Use provided colors, or default to white if none
colors = preset.c if preset.c else [(255, 255, 255)] colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0 color_index = 0
state = True # True = on, False = off state = True # True = on, False = off
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
@@ -25,9 +26,9 @@ class Blink:
# Advance to next color for the next "on" phase # Advance to next color for the next "on" phase
color_index += 1 color_index += 1
else: else:
# "Off" phase: turn all LEDs off # Inactive phase uses the preset background color.
self.driver.fill((0, 0, 0)) self.driver.fill(bg_color)
state = not state state = not state
last_update = current_time last_update = utime.ticks_add(last_update, delay_ms)
# Yield once per tick so other logic can run # Yield once per tick so other logic can run
yield yield

View File

@@ -0,0 +1,40 @@
import utime
class BreathingDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 140), (0, 120, 255)]
phase_offset = max(0, min(255, int(preset.n1)))
ease = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
p1 = phase
p2 = (phase + phase_offset) & 255
t1 = 255 - abs(128 - p1) * 2
t2 = 255 - abs(128 - p2) * 2
if ease > 1:
t1 = (t1 * t1) // 255
t2 = (t2 * t2) // 255
c1 = self.driver.apply_brightness(colors[0], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
half = self.driver.num_leds // 2
for i in range(self.driver.num_leds):
if i < half:
self.driver.n[i] = ((c1[0]*t1)//255, (c1[1]*t1)//255, (c1[2]*t1)//255)
else:
self.driver.n[i] = ((c2[0]*t2)//255, (c2[1]*t2)//255, (c2[2]*t2)//255)
self.driver.n.write()
phase = (phase + 2) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -26,6 +26,7 @@ class Chase:
color0 = self.driver.apply_brightness(color0, preset.b) color0 = self.driver.apply_brightness(color0, preset.b)
color1 = self.driver.apply_brightness(color1, preset.b) color1 = self.driver.apply_brightness(color1, preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
n1 = max(1, int(preset.n1)) # LEDs of color 0 n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1 n2 = max(1, int(preset.n2)) # LEDs of color 1
@@ -35,7 +36,7 @@ class Chase:
segment_length = n1 + n2 segment_length = n1 + n2
# Calculate position from step_count # Calculate position from step_count
step_count = self.driver.step step_count = int(self.driver.step) % 2
# Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc. # Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc.
if step_count % 2 == 0: if step_count % 2 == 0:
# Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3 # Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3
@@ -53,7 +54,7 @@ class Chase:
# If auto is False, run a single step and then stop # If auto is False, run a single step and then stop
if not preset.a: if not preset.a:
# Clear all LEDs # Clear all LEDs
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(bg_color)
# Draw repeating pattern starting at position # Draw repeating pattern starting at position
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
@@ -69,9 +70,10 @@ class Chase:
self.driver.n[i] = color1 self.driver.n[i] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count)
# Increment step for next beat # Increment step for next beat
self.driver.step = step_count + 1 self.driver.step = (step_count + 1) % 2
# Allow tick() to advance the generator once # Allow tick() to advance the generator once
yield yield
@@ -98,7 +100,7 @@ class Chase:
position += max_pos position += max_pos
# Clear all LEDs # Clear all LEDs
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(bg_color)
# Draw repeating pattern starting at position # Draw repeating pattern starting at position
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
@@ -114,11 +116,13 @@ class Chase:
self.driver.n[i] = color1 self.driver.n[i] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count)
# Increment step # Increment step
step_count += 1 step_count = (step_count + 1) % 2
self.driver.step = step_count self.driver.step = step_count
last_update = current_time last_update = utime.ticks_add(last_update, transition_duration)
transition_duration = max(10, int(preset.d))
# Yield once per tick so other logic can run # Yield once per tick so other logic can run
yield yield

View File

@@ -31,10 +31,10 @@ class Circle:
base0 = base1 = (255, 255, 255) base0 = base1 = (255, 255, 255)
elif len(colors) == 1: elif len(colors) == 1:
base0 = colors[0] base0 = colors[0]
base1 = (0, 0, 0) base1 = preset.background_or(colors)
else: else:
base0 = colors[0] base0 = colors[0]
base1 = colors[1] base1 = preset.background_or(colors)
color0 = self.driver.apply_brightness(base0, preset.b) color0 = self.driver.apply_brightness(base0, preset.b)
color1 = self.driver.apply_brightness(base1, preset.b) color1 = self.driver.apply_brightness(base1, preset.b)
@@ -46,7 +46,7 @@ class Circle:
if phase == "off": if phase == "off":
self.driver.n.fill(color1) self.driver.n.fill(color1)
else: else:
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(color1)
# Calculate segment length # Calculate segment length
segment_length = (head - tail) % self.driver.num_leds segment_length = (head - tail) % self.driver.num_leds
@@ -62,7 +62,9 @@ class Circle:
# Move head continuously at n1 LEDs per second # Move head continuously at n1 LEDs per second
if utime.ticks_diff(current_time, last_head_move) >= head_delay: if utime.ticks_diff(current_time, last_head_move) >= head_delay:
head = (head + 1) % self.driver.num_leds head = (head + 1) % self.driver.num_leds
last_head_move = current_time last_head_move = utime.ticks_add(last_head_move, head_delay)
head_rate = max(1, int(preset.n1))
head_delay = 1000 // head_rate
# Tail behavior based on phase # Tail behavior based on phase
if phase == "growing": if phase == "growing":
@@ -73,7 +75,9 @@ class Circle:
# Shrinking phase: move tail forward at n3 LEDs per second # Shrinking phase: move tail forward at n3 LEDs per second
if utime.ticks_diff(current_time, last_tail_move) >= tail_delay: if utime.ticks_diff(current_time, last_tail_move) >= tail_delay:
tail = (tail + 1) % self.driver.num_leds tail = (tail + 1) % self.driver.num_leds
last_tail_move = current_time last_tail_move = utime.ticks_add(last_tail_move, tail_delay)
tail_rate = max(1, int(preset.n3))
tail_delay = 1000 // tail_rate
# Check if we've reached min length # Check if we've reached min length
current_length = (head - tail) % self.driver.num_leds current_length = (head - tail) % self.driver.num_leds

View File

@@ -0,0 +1,33 @@
import utime
class ClockSweep:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (60, 60, 60)]
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
marker = max(0, int(preset.n2) if int(preset.n2) > 0 else 0)
pos = self.driver.step % max(1, self.driver.num_leds)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
fg = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg
if marker > 0 and i % marker == 0:
self.driver.n[i] = ((bg[0]*2)//3, (bg[1]*2)//3, (bg[2]*2)//3)
for w in range(width):
self.driver.n[(pos + w) % self.driver.num_leds] = fg
self.driver.n.write()
pos = (pos + 1) % max(1, self.driver.num_leds)
self.driver.step = pos
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,56 @@
import utime
class ColourCycle:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
# Match rainbow behaviour: phase is 0..255 and maps to one full-strip shift.
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
# Position around the colour loop, shifted by phase.
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1))
if not preset.a:
self._render(colors, phase, preset.b)
self.driver.step = (phase + step_amount) % 256
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
yield

View File

@@ -0,0 +1,44 @@
import utime
class CometDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
tail = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
gap = max(0, int(preset.n3))
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
c1 = self.driver.apply_brightness(colors[0 % len(colors)], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
for t in range(tail):
i1 = p1 - t
if 0 <= i1 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i1] = ((c1[0]*s)//255, (c1[1]*s)//255, (c1[2]*s)//255)
i2 = p2 + t
if 0 <= i2 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i2] = ((c2[0]*s)//255, (c2[1]*s)//255, (c2[2]*s)//255)
self.driver.n.write()
p1 += speed
p2 -= speed
if p1 - tail > self.driver.num_leds and p2 + tail < 0:
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

35
src/patterns/fireflies.py Normal file
View File

@@ -0,0 +1,35 @@
import random
import utime
class Fireflies:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 210, 80), (120, 255, 120)]
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 8)
bugs = [[random.randint(0, max(0, self.driver.num_leds - 1)), random.randint(0, 255)] for _ in range(count)]
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for b in bugs:
idx, ph = b
tri = 255 - abs(128 - ph) * 2
c = self.driver.apply_brightness(colors[idx % len(colors)], preset.b)
self.driver.n[idx] = ((c[0]*tri)//255, (c[1]*tri)//255, (c[2]*tri)//255)
b[1] = (ph + speed) & 255
if random.randint(0, 31) == 0:
b[0] = random.randint(0, max(0, self.driver.num_leds - 1))
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

210
src/patterns/flame.py Normal file
View File

@@ -0,0 +1,210 @@
import random
import utime
# Default warm palette: ember → orange → yellow → pale hot (RGB)
_DEFAULT_PALETTE = (
(90, 8, 8),
(200, 40, 12),
(255, 120, 30),
(255, 220, 140),
)
def _clamp(x, lo, hi):
if x < lo:
return lo
if x > hi:
return hi
return x
def _lerp_chan(a, b, t):
return a + ((b - a) * t >> 8)
def _lerp_rgb(c0, c1, t):
return (
_lerp_chan(c0[0], c1[0], t),
_lerp_chan(c0[1], c1[1], t),
_lerp_chan(c0[2], c1[2], t),
)
def _palette_sample(palette, pos256):
n = len(palette)
if n == 0:
return (255, 160, 60)
if n == 1:
return palette[0]
span = (n - 1) * pos256
seg = span >> 8
if seg >= n - 1:
return palette[n - 1]
frac = span & 0xFF
return _lerp_rgb(palette[seg], palette[seg + 1], frac)
def _triangle_255(elapsed_ms, period_ms):
period_ms = max(period_ms, 400)
p = elapsed_ms % period_ms
half = period_ms >> 1
if half <= 0:
return 128
if p < half:
return (p * 255) // half
return ((period_ms - p) * 255) // (period_ms - half)
class Flame:
def __init__(self, driver):
self.driver = driver
def _build_palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_PALETTE)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
_clamp(int(c[0]), 0, 255),
_clamp(int(c[1]), 0, 255),
_clamp(int(c[2]), 0, 255),
)
)
return out if out else list(_DEFAULT_PALETTE)
def _draw_frame(self, preset, palette, ticks_now, breath_el_ms, rise, cluster_jit, breath_ms, lo, hi, spark_state):
"""spark_state: (active: bool, start_ticks, duration_ms). ticks_now for sparks; breath_el_ms for slow wave."""
num = self.driver.num_leds
denom = num - 1 if num > 1 else 1
breathe = _triangle_255(breath_el_ms, breath_ms)
base_level = lo + (((hi - lo) * breathe) >> 8)
micro = 232 + random.randint(0, 35)
level = (base_level * micro) >> 8
level = _clamp(level, lo, hi)
spark_boost = 0
spark_white = (0, 0, 0)
active, s0, dur = spark_state
if active and dur > 0:
el = utime.ticks_diff(ticks_now, s0)
if el < 0:
el = 0
if el >= dur:
spark_boost = 0
else:
env = 255 - ((el * 255) // dur)
spark_boost = (env * 90) >> 8
spark_white = ((env * 55) >> 8, (env * 50) >> 8, (env * 40) >> 8)
for i in range(num):
h = (i * 256) // denom
flow = (h + rise + ((i // max(1, num >> 3)) * 17)) & 255
pos = (flow + cluster_jit[(i >> 2) & 7]) & 255
rgb = _palette_sample(palette, pos)
if spark_boost:
rgb = (
_clamp(rgb[0] + spark_white[0] + (spark_boost * 3 >> 2), 0, 255),
_clamp(rgb[1] + spark_white[1] + (spark_boost >> 1), 0, 255),
_clamp(rgb[2] + spark_white[2] + (spark_boost >> 2), 0, 255),
)
self.driver.n[i] = self.driver.apply_brightness(rgb, level)
self.driver.n.write()
def run(self, preset):
"""Salt-lamp / hearth-style flame: warm gradient, breathing, jitter, drift, rare sparks."""
palette = self._build_palette(preset)
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
bp = int(preset.n2)
breath_ms = max(800, bp if bp > 0 else 2500)
gap_lo = int(preset.n3)
gap_hi = int(preset.n4)
# n3 < 0 disables sparks; n3=n4=0 uses ~1030 s gaps (hearth pops).
if gap_lo < 0:
sparks_on = False
else:
sparks_on = True
if gap_lo == 0 and gap_hi == 0:
gap_lo, gap_hi = 10000, 30000
else:
gap_lo = max(gap_lo, 500)
if gap_hi < gap_lo:
gap_hi = gap_lo
delay_ms = max(16, int(preset.d))
rise = random.randint(0, 255)
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
last_draw = utime.ticks_ms()
breath_origin = last_draw
last_cluster = last_draw
spark_active = False
spark_start = 0
spark_dur = 0
next_spark = utime.ticks_add(last_draw, random.randint(gap_lo, gap_hi)) if sparks_on else 0
if not preset.a:
now = utime.ticks_ms()
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(False, 0, 0),
)
yield
return
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_draw) < delay_ms:
yield
continue
last_draw = utime.ticks_add(last_draw, delay_ms)
rise = (rise + random.randint(-10, 12)) & 255
if utime.ticks_diff(now, last_cluster) >= (delay_ms * 4):
last_cluster = now
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
spark_state = (spark_active, spark_start, spark_dur)
if sparks_on:
if spark_active:
if utime.ticks_diff(now, spark_start) >= spark_dur:
spark_active = False
next_spark = utime.ticks_add(
now,
random.randint(gap_lo, gap_hi),
)
elif utime.ticks_diff(now, next_spark) >= 0:
spark_active = True
spark_start = now
spark_dur = random.randint(180, 360)
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(spark_active, spark_start, spark_dur),
)
yield

40
src/patterns/flicker.py Normal file
View File

@@ -0,0 +1,40 @@
import random
import utime
class Flicker:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Random brightness between n1 (min) and b (max); delay d ms between updates."""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
last_update = utime.ticks_ms()
def brightness_bounds():
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
return lo, hi
if not preset.a:
lo, hi = brightness_bounds()
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
yield
return
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
lo, hi = brightness_bounds()
if utime.ticks_diff(current_time, last_update) >= delay_ms:
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
yield

View File

@@ -0,0 +1,57 @@
import utime
class GradientScroll:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
"""Scrolling blended gradient.
n1: phase step amount (default 1)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

36
src/patterns/heartbeat.py Normal file
View File

@@ -0,0 +1,36 @@
import utime
class Heartbeat:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 40)]
phase = 0
phase_start = utime.ticks_ms()
did_manual_pulse = False
while True:
p1 = max(20, int(preset.n1) if int(preset.n1) > 0 else 120)
p2 = max(20, int(preset.n2) if int(preset.n2) > 0 else 80)
pause = max(20, int(preset.n3) if int(preset.n3) > 0 else 500)
beat_gap = max(20, int(preset.d))
colors = preset.c if preset.c else [(255, 0, 40)]
lit_color = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
phase_durations = (p1, beat_gap, p2, pause)
phase_colors = (lit_color, bg_color, lit_color, bg_color)
now = utime.ticks_ms()
while utime.ticks_diff(now, phase_start) >= phase_durations[phase]:
phase_start = utime.ticks_add(phase_start, phase_durations[phase])
phase = (phase + 1) % 4
self.driver.fill(phase_colors[phase])
yield
if not preset.a:
if did_manual_pulse or phase == 0:
self.driver.fill(bg_color)
yield
return
did_manual_pulse = True

31
src/patterns/marquee.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Marquee:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % (on_len + off_len)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
m = (i + phase) % (on_len + off_len)
self.driver.n[i] = c if m < on_len else bg_color
self.driver.n.write()
phase = (phase + step) % (on_len + off_len)
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,62 @@
import utime
class MeteorRain:
def __init__(self, driver):
self.driver = driver
def _fade(self, color, fade_amount):
return (
(color[0] * fade_amount) // 255,
(color[1] * fade_amount) // 255,
(color[2] * fade_amount) // 255,
)
def run(self, preset):
"""Single meteor with a fading tail.
n1: tail length (default 8)
n2: speed in LEDs per frame (default 1)
n3: fade amount per frame, 1..255 (default 192)
"""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
head = 0
direction = 1
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
tail_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 8)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
fade_amount = int(preset.n3) if int(preset.n3) > 0 else 192
fade_amount = max(1, min(255, fade_amount))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
for i in range(self.driver.num_leds):
self.driver.n[i] = self._fade(self.driver.n[i], fade_amount)
base = colors[color_index % len(colors)]
lit = self.driver.apply_brightness(base, preset.b)
if 0 <= head < self.driver.num_leds:
self.driver.n[head] = lit
self.driver.n.write()
head += direction * speed
if head >= self.driver.num_leds + tail_len:
head = self.driver.num_leds - 1
direction = -1
color_index += 1
elif head < -tail_len:
head = 0
direction = 1
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

31
src/patterns/orbit.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Orbit:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (0, 180, 255), (255, 0, 120)]
orbits = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for k in range(orbits):
idx = ((phase * (k + 1)) // 8 + (k * self.driver.num_leds // max(1, orbits))) % max(1, self.driver.num_leds)
self.driver.n[idx] = self.driver.apply_brightness(colors[k % len(colors)], preset.b)
self.driver.n.write()
phase = (phase + speed) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,81 @@
import utime
class PaletteMorph:
def __init__(self, driver):
self.driver = driver
def _blend(self, c1, c2, t):
return (
c1[0] + ((c2[0] - c1[0]) * t) // 255,
c1[1] + ((c2[1] - c1[1]) * t) // 255,
c1[2] + ((c2[2] - c1[2]) * t) // 255,
)
def run(self, preset):
"""Living color field (non-scrolling palette warp).
Different from `colour_cycle`: this does not scroll a fixed gradient.
Instead, each LED breathes/warps through the palette with local phase
offsets so the strip looks alive.
n1: morph duration (ms)
n2: warp rate
n3: spatial turbulence amount
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
if len(colors) < 2:
while True:
self.driver.fill(self.driver.apply_brightness(colors[0], preset.b))
yield
morph = max(50, int(preset.n1) if int(preset.n1) > 0 else 1200)
warp_rate = max(1, int(preset.n2) if int(preset.n2) > 0 else 3)
turbulence = max(1, int(preset.n3) if int(preset.n3) > 0 else 24)
base_idx = 0
start = utime.ticks_ms()
phase = self.driver.step % 256
last_update = start
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) < delay_ms:
yield
continue
last_update = utime.ticks_add(last_update, delay_ms)
age = utime.ticks_diff(now, start)
if age < morph:
t = (age * 255) // morph
else:
t = 255
# Global morph anchor between neighboring palette colors.
a = colors[base_idx % len(colors)]
b = colors[(base_idx + 1) % len(colors)]
anchor = self._blend(a, b, t)
for i in range(self.driver.num_leds):
# Non-linear local warp per LED to create "living" motion.
pos = (i * 256) // max(1, self.driver.num_leds)
wobble = ((pos * turbulence) // 32 + phase + (t // 2)) & 255
breath = 255 - abs(128 - wobble) * 2
local = (pos + (breath // 3) + (t // 4)) % 256
idx = (base_idx + ((local * len(colors)) // 256)) % len(colors)
frac = (local * len(colors)) & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % len(colors)]
grad = self._blend(c1, c2, frac)
# Blend with anchor to keep coherent palette morphing.
out = self._blend(grad, anchor, 80)
self.driver.n[i] = self.driver.apply_brightness(out, preset.b)
self.driver.n.write()
if age >= morph:
base_idx = (base_idx + 1) % len(colors)
start = now
if not preset.a:
yield
return
phase = (phase + warp_rate) & 255
self.driver.step = phase
yield

39
src/patterns/plasma.py Normal file
View File

@@ -0,0 +1,39 @@
import utime
class Plasma:
def __init__(self, driver):
self.driver = driver
def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
if pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def run(self, preset):
scale = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
contrast = max(1, int(preset.n3) if int(preset.n3) > 0 else 2)
t = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
v = ((i * scale + t) & 255)
v2 = (((i * scale // max(1, contrast)) - (t * 2)) & 255)
c = self._wheel((v + v2) & 255)
self.driver.n[i] = self.driver.apply_brightness(c, preset.b)
self.driver.n.write()
t = (t + speed) % 256
self.driver.step = t
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -6,18 +6,25 @@ class Pulse:
self.driver = driver self.driver = driver
def run(self, preset): def run(self, preset):
self.driver.off()
# Get colors from preset # Get colors from preset
colors = preset.c colors = preset.c
if not colors: if not colors:
colors = [(255, 255, 255)] colors = [(255, 255, 255)]
bg_base = preset.background_or(colors)
self.driver.fill(self.driver.apply_brightness(bg_base, preset.b))
color_index = 0 color_index = self.driver.step % max(1, len(colors))
if not preset.a:
# Manual / beat trigger: each select restarts this generator and resets
# cycle_start below. Advancing step here makes each beat the next colour
# without requiring a full wall-clock cycle between beats.
nclr = max(1, len(colors))
self.driver.step = (color_index + 1) % nclr
cycle_start = utime.ticks_ms() cycle_start = utime.ticks_ms()
# State machine based pulse using a single generator loop # State machine based pulse using a single generator loop
while True: while True:
bg_color = self.driver.apply_brightness(bg_base, preset.b)
# Read current timing parameters from preset # Read current timing parameters from preset
attack_ms = max(0, int(preset.n1)) # Attack time in ms attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms hold_ms = max(0, int(preset.n2)) # Hold time in ms
@@ -49,14 +56,15 @@ class Pulse:
self.driver.fill(self.driver.apply_brightness(color, preset.b)) self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < total_ms: elif elapsed < total_ms:
# Delay phase: LEDs off between pulses # Delay phase: LEDs off between pulses
self.driver.fill((0, 0, 0)) self.driver.fill(bg_color)
else: else:
# End of cycle, move to next color and restart timing # End of cycle: auto advances colour and loops; manual already
color_index += 1 # advanced step at run start for the next beat.
cycle_start = now
if not preset.a: if not preset.a:
break break
# Skip drawing this tick, start next cycle color_index = (color_index + 1) % max(1, len(colors))
self.driver.step = color_index
cycle_start = now
yield yield
continue continue

169
src/patterns/radiate.py Normal file
View File

@@ -0,0 +1,169 @@
import utime
# When ``driver.debug`` is True (``settings["debug"]``), log at most this often (ms).
_RADIATE_DBG_INTERVAL_MS = 800
class Radiate:
def __init__(self, driver):
self.driver = driver
self._color_step = 0
def run(self, preset):
"""Radiate from nodes every n1 LEDs, retriggering every delay (d).
- n1: node spacing in LEDs
- n2: outbound travel time in ms
- n3: return travel time in ms
- d: retrigger interval in ms
"""
colors = preset.c if preset.c else [(255, 255, 255)]
base_off = preset.background_or(colors)
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
lit_color = self.driver.apply_brightness(colors[self._color_step % max(1, len(colors))], preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
now = utime.ticks_ms()
last_trigger = now
active_pulses = [now]
last_dbg = now
dbg_banner = False
if not preset.a:
# Manual mode: one-shot pulse using the same ms-based timing as auto.
cycle_start = utime.ticks_ms()
while True:
dbg = bool(getattr(self.driver, "debug", False))
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
on_color = colors[self._color_step % max(1, len(colors))]
lit_color = self.driver.apply_brightness(on_color, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
pulse_lifetime = outward_ms + return_ms
now = utime.ticks_ms()
age = utime.ticks_diff(now, cycle_start)
if age < 1:
age = 1
if age <= outward_ms:
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms:
back_age = age - outward_ms
remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else:
front = 0
lit_count = 0
for i in range(self.driver.num_leds):
offset = (i + (spacing // 2)) % spacing
dist = min(offset, spacing - offset)
lit = dist <= front
self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write()
if dbg:
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on n1=%s n2=%s n3=%s d=%s auto=%s num_leds=%d"
% (preset.n1, preset.n2, preset.n3, preset.d, preset.a, self.driver.num_leds)
)
print(
"[radiate] manual frame age=%d/%d front=%d lit=%d"
% (age, pulse_lifetime, front, lit_count)
)
yield
if age >= pulse_lifetime:
self._color_step += 1
return
while True:
now = utime.ticks_ms()
dbg = bool(getattr(self.driver, "debug", False))
delay_ms = max(1, int(preset.d))
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
pulse_lifetime = outward_ms + return_ms
max_dist = spacing // 2
on_color = colors[self._color_step % max(1, len(colors))]
lit_color = self.driver.apply_brightness(on_color, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
# Keep one pulse train at a time; replacing instead of appending
# prevents overlap from keeping color[0] continuously visible.
active_pulses = [now]
last_trigger = utime.ticks_add(last_trigger, delay_ms)
self._color_step += 1
# Drop pulses once their out-and-back lifetime ends.
kept = []
for start in active_pulses:
age = utime.ticks_diff(now, start)
if age < pulse_lifetime:
kept.append(start)
active_pulses = kept
lit_count = 0
for i in range(self.driver.num_leds):
# Nearest node distance for a repeating node grid every `spacing` LEDs.
offset = (i + (spacing // 2)) % spacing
dist = min(offset, spacing - offset)
lit = False
for start in active_pulses:
age = utime.ticks_diff(now, start)
# Auto: skip the exact trigger tick (age==0) so nodes are not stuck on.
if age <= 0:
continue
if age <= outward_ms:
# Integer-ceiling progression so peak can be reached even
# when tick timing skips the exact outward_ms boundary.
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms:
back_age = age - outward_ms
remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else:
continue
if dist <= front:
lit = True
break
self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write()
if dbg:
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on n1=%s n2=%s n3=%s d=%s auto=%s num_leds=%d"
% (preset.n1, preset.n2, preset.n3, preset.d, preset.a, self.driver.num_leds)
)
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
print(
"[radiate] pulses=%d first_age=%d lit=%d lifetime=%d"
% (len(active_pulses), pulse_age, lit_count, pulse_lifetime)
)
last_dbg = now
yield

View File

@@ -0,0 +1,41 @@
import random
import utime
class RainDrops:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(120, 180, 255)]
rate = max(1, int(preset.n1) if int(preset.n1) > 0 else 32)
width = max(1, int(preset.n2) if int(preset.n2) > 0 else 3)
drops = []
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
if random.randint(0, 255) < rate:
drops.append([random.randint(0, max(0, self.driver.num_leds - 1)), 0])
nd = []
for pos, age in drops:
for off in range(-width, width + 1):
idx = pos + off
if 0 <= idx < self.driver.num_leds:
s = 255 - min(255, abs(off) * 255 // max(1, width + 1) + age * 40)
base = self.driver.apply_brightness(colors[age % len(colors)], preset.b)
self.driver.n[idx] = ((base[0]*s)//255, (base[1]*s)//255, (base[2]*s)//255)
age += 1
if age < 8:
nd.append([pos, age])
drops = nd
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -46,6 +46,6 @@ class Rainbow:
self.driver.n.write() self.driver.n.write()
step = (step + step_amount) % 256 step = (step + step_amount) % 256
self.driver.step = step self.driver.step = step
last_update = current_time last_update = utime.ticks_add(last_update, sleep_ms)
# Yield once per tick so other logic can run # Yield once per tick so other logic can run
yield yield

67
src/patterns/scanner.py Normal file
View File

@@ -0,0 +1,67 @@
import utime
class Scanner:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Classic scanner eye with soft falloff.
n1: eye width (default 4)
n2: end pause in frames (default 0)
"""
colors = preset.c if preset.c else [(255, 0, 0)]
color_index = 0
center = 0
direction = 1
pause_frames = 0
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
end_pause = max(0, int(preset.n2))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
base = colors[color_index % len(colors)]
base = self.driver.apply_brightness(base, preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
dist = i - center
if dist < 0:
dist = -dist
if dist > width:
self.driver.n[i] = bg_color
else:
scale = ((width - dist) * 255) // max(1, width)
self.driver.n[i] = (
(base[0] * scale) // 255,
(base[1] * scale) // 255,
(base[2] * scale) // 255,
)
self.driver.n.write()
if pause_frames > 0:
pause_frames -= 1
else:
center += direction
if center >= self.driver.num_leds - 1:
center = self.driver.num_leds - 1
direction = -1
pause_frames = end_pause
color_index += 1
elif center <= 0:
center = 0
direction = 1
pause_frames = end_pause
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,45 @@
import utime
class SegmentChase:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Independent moving segments (distinct from classic two-color chase).
n1: segment size (LEDs per segment)
n2: step size (phase increment each frame)
n3: per-segment phase offset
n4: gap spacing inside segment (0 = solid segment)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
seg = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
phase_step = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
seg_offset = max(0, int(preset.n3))
gap = max(0, int(preset.n4))
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
seg_idx = i // seg
in_seg = i % seg
local_phase = (phase + seg_idx * seg_offset) % seg
lit_idx = (in_seg + local_phase) % seg
if gap > 0 and lit_idx >= max(1, seg - gap):
self.driver.n[i] = bg_color
else:
color_idx = seg_idx % len(colors)
self.driver.n[i] = self.driver.apply_brightness(colors[color_idx], preset.b)
self.driver.n.write()
phase = (phase + phase_step) % seg
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

37
src/patterns/snowfall.py Normal file
View File

@@ -0,0 +1,37 @@
import random
import utime
class Snowfall:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (180, 220, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
flakes = []
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
if random.randint(0, 255) < density:
flakes.append([self.driver.num_leds - 1, random.randint(0, len(colors)-1)])
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
nf = []
for pos, ci in flakes:
if 0 <= pos < self.driver.num_leds:
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b)
pos -= speed
if pos >= -1:
nf.append([pos, ci])
flakes = nf
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,31 @@
import random
import utime
class SparkleTrail:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(120, 120, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 24)
decay = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 210))
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
r,g,b = self.driver.n[i]
self.driver.n[i] = ((r*decay)//255, (g*decay)//255, (b*decay)//255)
sparks = max(1, self.driver.num_leds * density // 255)
for _ in range(sparks):
idx = random.randint(0, max(0, self.driver.num_leds - 1))
c = self.driver.apply_brightness(colors[random.randint(0, len(colors)-1)], preset.b)
self.driver.n[idx] = c
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,45 @@
import utime
class StrobeBurst:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
state = "flash_on"
flash_idx = 0
state_start = utime.ticks_ms()
while True:
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
gap = max(1, int(preset.n2) if int(preset.n2) > 0 else 60)
cooldown = max(1, int(preset.n3) if int(preset.n3) > 0 else 400)
on_ms = max(1, int(preset.d) // 2)
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
now = utime.ticks_ms()
if state == "flash_on":
self.driver.fill(c)
if utime.ticks_diff(now, state_start) >= on_ms:
state = "flash_off"
state_start = utime.ticks_add(state_start, on_ms)
elif state == "flash_off":
self.driver.fill(bg_color)
if utime.ticks_diff(now, state_start) >= gap:
flash_idx += 1
if flash_idx >= count:
if not preset.a:
return
state = "cooldown"
flash_idx = 0
state_start = utime.ticks_add(state_start, gap)
else:
state = "flash_on"
state_start = utime.ticks_add(state_start, gap)
else:
self.driver.fill(bg_color)
if utime.ticks_diff(now, state_start) >= cooldown:
state = "flash_on"
state_start = utime.ticks_add(state_start, cooldown)
yield

160
src/patterns/twinkle.py Normal file
View File

@@ -0,0 +1,160 @@
import random
import utime
# Default cool palette (icy blues, violet, mint) when preset has no colours.
_DEFAULT_COOL = (
(120, 200, 255),
(80, 140, 255),
(180, 120, 255),
(100, 220, 240),
(160, 200, 255),
(90, 180, 220),
)
class Twinkle:
def __init__(self, driver):
self.driver = driver
def _palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_COOL)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
max(0, min(255, int(c[0]))),
max(0, min(255, int(c[1]))),
max(0, min(255, int(c[2]))),
)
)
return out if out else list(_DEFAULT_COOL)
def run(self, preset):
"""Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs."""
palette = self._palette(preset)
num = self.driver.num_leds
bg_color = self.driver.apply_brightness(preset.background_or(palette), preset.b)
if num <= 0:
while True:
yield
return
def activity_rate():
r = int(preset.n1)
if r <= 0:
r = 48
return max(1, min(255, r))
def density255():
"""Higher → more LEDs lit on average when a twinkle step fires (0 = default mid)."""
d = int(preset.n2)
if d <= 0:
d = 128
return max(0, min(255, d))
def cluster_len_bounds():
"""n3 = min adjacent LEDs per twinkle, n4 = max (both 0 → 1..4)."""
lo = int(preset.n3)
hi = int(preset.n4)
if lo <= 0 and hi <= 0:
lo, hi = 1, min(4, num)
else:
if lo <= 0:
lo = 1
if hi <= 0:
hi = lo
if hi < lo:
lo, hi = hi, lo
lo = max(1, min(lo, num))
hi = max(lo, min(hi, num))
return lo, hi
def random_cluster_len():
lo, hi = cluster_len_bounds()
# When min and max match, every lit/dim run is exactly that many LEDs (still capped by strip length).
if lo == hi:
return lo
return random.randint(lo, hi)
def cluster_base_index(start, k):
"""Shift run left so a length-k segment fits; keeps full k when num >= k."""
k = min(max(0, int(k)), num)
if k <= 0:
return 0
return max(0, min(int(start), num - k))
dens = density255()
on = [random.randint(0, 255) < dens for _ in range(num)]
colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)]
last_update = utime.ticks_ms()
if not preset.a:
for i in range(num):
if on[i]:
base = palette[colour_i[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = bg_color
self.driver.n.write()
yield
return
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) >= delay_ms:
rate = activity_rate()
dens = density255()
# Snapshot for decisions; apply all darks then all lights so
# overlaps in the same tick favour lit runs (lights win).
prev_on = on[:]
prev_ci = colour_i[:]
next_on = list(prev_on)
next_ci = list(prev_ci)
light_i = []
dark_i = []
for i in range(num):
if random.randint(0, 255) < rate:
r = random.randint(0, 255)
if not prev_on[i]:
if r < dens:
light_i.append(i)
else:
if r < (255 - dens):
dark_i.append(i)
def light_adjacent(start):
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = True
next_ci[idx] = random.randint(0, len(palette) - 1)
def dark_adjacent(start):
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = False
for i in dark_i:
dark_adjacent(i)
for i in light_i:
light_adjacent(i)
for i in range(num):
if next_on[i]:
base = palette[next_ci[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = bg_color
self.driver.n.write()
on = next_on
colour_i = next_ci
last_update = utime.ticks_add(last_update, delay_ms)
yield

32
src/patterns/wave.py Normal file
View File

@@ -0,0 +1,32 @@
import utime
class Wave:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(0, 180, 255)]
wavelength = max(2, int(preset.n1) if int(preset.n1) > 0 else 12)
amp = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 180))
drift = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
base = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds):
x = (i * 256 // wavelength + phase) & 255
tri = 255 - abs(128 - x) * 2
s = (tri * amp) // 255
self.driver.n[i] = ((base[0]*s)//255, (base[1]*s)//255, (base[2]*s)//255)
self.driver.n.write()
phase = (phase + drift) % 256
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -12,6 +12,7 @@ class Preset:
self.n4 = 0 self.n4 = 0
self.n5 = 0 self.n5 = 0
self.n6 = 0 self.n6 = 0
self.bg = (0, 0, 0)
# Override defaults with provided data # Override defaults with provided data
self.edit(data) self.edit(data)
@@ -25,9 +26,10 @@ class Preset:
"delay": "d", "delay": "d",
"brightness": "b", "brightness": "b",
"auto": "a", "auto": "a",
"background": "bg",
} }
int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"} int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"}
allowed_fields = {"p", "c", "d", "b", "a", "n1", "n2", "n3", "n4", "n5", "n6"} allowed_fields = {"p", "c", "d", "b", "a", "bg", "n1", "n2", "n3", "n4", "n5", "n6"}
for key, value in data.items(): for key, value in data.items():
key = aliases.get(key, key) key = aliases.get(key, key)
if key not in allowed_fields: if key not in allowed_fields:
@@ -56,6 +58,21 @@ class Preset:
elif key == "c": elif key == "c":
if isinstance(value, (list, tuple)): if isinstance(value, (list, tuple)):
self.c = value self.c = value
elif key == "bg":
if isinstance(value, str) and value.startswith("#") and len(value) == 7:
try:
self.bg = (
int(value[1:3], 16),
int(value[3:5], 16),
int(value[5:7], 16),
)
except (TypeError, ValueError):
continue
elif isinstance(value, (list, tuple)) and len(value) == 3:
try:
self.bg = tuple(max(0, min(255, int(x))) for x in value)
except (TypeError, ValueError):
continue
else: else:
setattr(self, key, value) setattr(self, key, value)
return True return True
@@ -100,6 +117,15 @@ class Preset:
def auto(self, value): def auto(self, value):
self.a = value self.a = value
def background_or(self, colors=None, default=(0, 0, 0)):
bg = getattr(self, "bg", None)
if isinstance(bg, (list, tuple)) and len(bg) == 3:
try:
return tuple(max(0, min(255, int(x))) for x in bg)
except (TypeError, ValueError):
return default
return default
def to_dict(self): def to_dict(self):
return { return {
"p": self.p, "p": self.p,
@@ -107,6 +133,7 @@ class Preset:
"b": self.b, "b": self.b,
"c": self.c, "c": self.c,
"a": self.a, "a": self.a,
"bg": self.bg,
"n1": self.n1, "n1": self.n1,
"n2": self.n2, "n2": self.n2,
"n3": self.n3, "n3": self.n3,

View File

@@ -1 +0,0 @@
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}

View File

@@ -9,6 +9,8 @@ try:
except ImportError: except ImportError:
import os import os
MAX_PRESETS = 32
class Presets: class Presets:
def __init__(self, pin, num_leds): def __init__(self, pin, num_leds):
@@ -95,16 +97,15 @@ class Presets:
order = settings if settings is not None else "rgb" order = settings if settings is not None else "rgb"
self.presets = {} self.presets = {}
for name, preset_data in data.items(): for name, preset_data in data.items():
if len(self.presets) >= MAX_PRESETS:
print("Preset limit reached on load:", MAX_PRESETS)
break
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None: if color_key is not None:
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], order preset_data[color_key], order
) )
self.presets[name] = Preset(preset_data) self.presets[name] = Preset(preset_data)
if self.presets:
print("Loaded presets:")
#for name in sorted(self.presets.keys()):
# print(f" {name}: {self.presets[name].to_dict()}")
return True return True
def edit(self, name, data): def edit(self, name, data):
@@ -112,7 +113,19 @@ class Presets:
if name in self.presets: if name in self.presets:
# Update existing preset # Update existing preset
self.presets[name].edit(data) self.presets[name].edit(data)
# Editing the live preset (e.g. toggling auto/manual) must reset runtime
# state; re-select alone keeps step because preset name is unchanged.
if self.selected == name:
self.step = 0
self.generator = None
self.fill((0, 0, 0))
# Re-start pattern so manual/auto and other edits apply without a
# separate select message (controller usually sends both).
self.select(name)
else: else:
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
print("Preset limit reached:", MAX_PRESETS)
return False
# Create new preset # Create new preset
self.presets[name] = Preset(data) self.presets[name] = Preset(data)
return True return True
@@ -123,6 +136,12 @@ class Presets:
return True return True
return False return False
def delete_all(self):
self.presets = {}
self.generator = None
self.selected = None
return True
def tick(self): def tick(self):
if self.generator is None: if self.generator is None:
return return
@@ -145,6 +164,16 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
# Manual single-shot patterns: if this select arrives before the main loop has
# tick()'d the previous frame, completing it first keeps step in sync with beats.
if (
preset_name == self.selected
and not preset.a
and preset.p == "chase"
and self.generator is not None
):
while self.generator is not None:
self.tick()
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
@@ -152,7 +181,11 @@ class Presets:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
self.tick()
return True return True
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
return False
print("select failed: preset not found", preset_name)
# If preset doesn't exist or pattern not found, indicate failure # If preset doesn't exist or pattern not found, indicate failure
return False return False

12
src/runtime_state.py Normal file
View File

@@ -0,0 +1,12 @@
class RuntimeState:
def __init__(self):
self.hello = True
self.ws_client_count = 0
def ws_connected(self):
self.ws_client_count += 1
self.hello = False
def ws_disconnected(self):
self.ws_client_count = max(0, self.ws_client_count - 1)
self.hello = self.ws_client_count == 0

View File

@@ -27,6 +27,9 @@ class Settings(dict):
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["last_preset"] = ""
# Power-on: "default" | "last" | "off"
self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow" self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
@@ -39,7 +42,6 @@ class Settings(dict):
j = json.dumps(self) j = json.dumps(self)
with open(self.SETTINGS_FILE, 'w') as file: with open(self.SETTINGS_FILE, 'w') as file:
file.write(j) file.write(j)
print("Settings saved successfully.")
except Exception as e: except Exception as e:
print(f"Error saving settings: {e}") print(f"Error saving settings: {e}")
@@ -48,7 +50,17 @@ class Settings(dict):
with open(self.SETTINGS_FILE, 'r') as file: with open(self.SETTINGS_FILE, 'r') as file:
loaded_settings = json.load(file) loaded_settings = json.load(file)
self.update(loaded_settings) self.update(loaded_settings)
print("Settings loaded successfully.") old_recent = self.pop("recent_presets", None)
if isinstance(old_recent, list) and old_recent and not self.get("last_preset"):
for x in reversed(old_recent):
if isinstance(x, str) and x.strip():
self["last_preset"] = x.strip()
break
if x is not None:
s = str(x).strip()
if s:
self["last_preset"] = s
break
except Exception as e: except Exception as e:
print(f"Error loading settings") print(f"Error loading settings")
self.set_defaults() self.set_defaults()

51
src/startup.py Normal file
View File

@@ -0,0 +1,51 @@
import gc
import machine
import network
import utime
from presets import Presets
from settings import Settings
from controller_messages import apply_startup_pattern
def initialize_runtime():
machine.freq(160000000)
settings = Settings()
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect()
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
apply_startup_pattern(settings, presets)
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected():
utime.sleep(1)
wdt.feed()
try:
led_ip = sta_if.ifconfig()[0]
except Exception:
led_ip = "?"
print("led-driver IP:", led_ip, " led-controller IP:", "(not connected)")
return settings, presets, wdt, sta_if

93
src/wifi_sta.py Normal file
View File

@@ -0,0 +1,93 @@
"""STA connect helpers aligned with tests/test_wifi.py (status polling, fatal codes)."""
import utime
import network
_CONNECT_TIMEOUT_S = 45
_RETRY_DELAY_S = 2
def _wifi_status_label(code):
names = {
getattr(network, "STAT_IDLE", 0): "idle",
getattr(network, "STAT_CONNECTING", 1): "connecting",
getattr(network, "STAT_WRONG_PASSWORD", -3): "wrong_password",
getattr(network, "STAT_NO_AP_FOUND", -2): "no_ap_found",
getattr(network, "STAT_CONNECT_FAIL", -1): "connect_fail",
getattr(network, "STAT_GOT_IP", 3): "got_ip",
}
return names.get(code, str(code))
# Only abort the wait loop immediately on wrong password. NO_AP_FOUND / CONNECT_FAIL are often
# transient while the radio is still scanning (ESP32-C3 may report them before the AP appears).
_ABORT_WAIT_IMMEDIATE = (
getattr(network, "STAT_WRONG_PASSWORD", -3),
)
def _one_association_campaign(sta_if, ssid, password, wdt):
"""disconnect → connect → wait until connected, wrong password, or timeout. Returns True if connected."""
try:
sta_if.disconnect()
except Exception:
pass
utime.sleep_ms(200)
try:
sta_if.connect(ssid, password)
except Exception as ex:
print("wifi_sta: connect raised:", ex)
return False
start = utime.time()
last_status = None
while not sta_if.isconnected():
status = sta_if.status()
if status != last_status:
print("wifi_sta: status", status, _wifi_status_label(status))
last_status = status
if status in _ABORT_WAIT_IMMEDIATE:
return False
if utime.time() - start >= _CONNECT_TIMEOUT_S:
print("wifi_sta: association timeout")
return False
utime.sleep(1)
if wdt is not None:
wdt.feed()
return True
def connect_until_up(sta_if, ssid, password, wdt):
"""Boot: repeat campaigns until STA has a route (same strategy as tests/test_wifi.py)."""
if not ssid:
print("wifi_sta: no ssid in settings")
return False
attempt = 0
while True:
attempt += 1
print("wifi_sta: boot attempt", attempt, "ssid=", repr(ssid))
if _one_association_campaign(sta_if, ssid, password, wdt):
try:
print("wifi_sta: connected", sta_if.ifconfig()[0])
except Exception:
print("wifi_sta: connected")
return True
print("wifi_sta: retry in", _RETRY_DELAY_S, "s")
for _ in range(_RETRY_DELAY_S):
utime.sleep(1)
if wdt is not None:
wdt.feed()
def try_reconnect(sta_if, ssid, password, wdt):
"""Runtime: single association campaign after link loss; non-looping."""
if not ssid:
return False
print("wifi_sta: reconnect")
ok = _one_association_campaign(sta_if, ssid, password, wdt)
if ok:
try:
print("wifi_sta: connected", sta_if.ifconfig()[0])
except Exception:
print("wifi_sta: connected")
return ok

View File

@@ -184,6 +184,36 @@ def test_pattern_smoke():
ctx.tick_for_ms(120) ctx.tick_for_ms(120)
def test_patterns_do_not_use_blocking_sleep():
pattern_dir = "patterns"
offenders = []
try:
files = os.listdir(pattern_dir)
except OSError:
raise AssertionError("patterns directory is missing")
for filename in files:
if not filename.endswith(".py") or filename in ("__init__.py", "main.py"):
continue
path = pattern_dir + "/" + filename
try:
with open(path, "r") as f:
src = f.read()
except OSError:
offenders.append(filename + " (unreadable)")
continue
if (
"utime.sleep(" in src
or "utime.sleep_ms(" in src
or "time.sleep(" in src
or "time.sleep_ms(" in src
):
offenders.append(filename)
assert not offenders, "blocking sleep found in patterns: %s" % ", ".join(offenders)
def test_default_requires_existing_preset(): def test_default_requires_existing_preset():
ctx = _TestContext() ctx = _TestContext()
_process_message(ctx, {"v": "1", "default": "missing"}) _process_message(ctx, {"v": "1", "default": "missing"})
@@ -242,6 +272,7 @@ def run_all():
test_preset_edit_sanitization, test_preset_edit_sanitization,
test_colour_conversion_and_transition, test_colour_conversion_and_transition,
test_pattern_smoke, test_pattern_smoke,
test_patterns_do_not_use_blocking_sleep,
test_default_requires_existing_preset, test_default_requires_existing_preset,
test_default_targets_gate_by_device_name, test_default_targets_gate_by_device_name,
test_save_and_load_roundtrip, test_save_and_load_roundtrip,

40
tests/patterns/aurora.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_aurora", {
"p": "aurora",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_aurora")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_bar_graph", {
"p": "bar_graph",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_bar_graph")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_breathing_dual", {
"p": "breathing_dual",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_breathing_dual")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_clock_sweep", {
"p": "clock_sweep",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_clock_sweep")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_comet_dual", {
"p": "comet_dual",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_comet_dual")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_fireflies", {
"p": "fireflies",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_fireflies")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test gradient_scroll")
p.edit("gradient_test", {
"p": "gradient_scroll",
"b": 220,
"d": 60,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
"n1": 2,
"a": True,
})
p.select("gradient_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_heartbeat", {
"p": "heartbeat",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_heartbeat")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/marquee.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_marquee", {
"p": "marquee",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_marquee")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test meteor_rain")
p.edit("meteor_test", {
"p": "meteor_rain",
"b": 200,
"d": 40,
"c": [(255, 80, 0), (0, 120, 255)],
"n1": 10,
"n2": 1,
"n3": 200,
"a": True,
})
p.select("meteor_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/orbit.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_orbit", {
"p": "orbit",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_orbit")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_palette_morph", {
"p": "palette_morph",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_palette_morph")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/plasma.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_plasma", {
"p": "plasma",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_plasma")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

52
tests/patterns/radiate.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
print("[test] radiate: start")
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
p.debug = True
wdt = WDT(timeout=10000)
print("[test] radiate: auto phase begin")
p.edit("test_pattern", {"p": "radiate", "b": 64, "a": True, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern"):
raise Exception("radiate select failed in auto phase")
auto_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), auto_start) < 2500:
wdt.feed()
p.run_step()
utime.sleep_ms(20)
remaining_ms = utime.ticks_diff(p.next_tick_ms, utime.ticks_ms())
if p.next_tick_ms == 0 or remaining_ms <= 0:
raise Exception("radiate delay scheduling invalid")
print("[test] radiate: auto phase end")
print("[test] radiate: manual phase begin")
p.edit("test_pattern", {"p": "radiate", "b": 64, "a": False, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern", step=0):
raise Exception("radiate select failed in manual phase")
for _ in range(6):
current_step = int(p.step)
if not p.select("test_pattern", step=current_step):
raise Exception("radiate external select failed")
p.run_step()
wdt.feed()
if int(p.step) == current_step:
raise Exception("radiate external step did not advance")
if p.generator is not None:
raise Exception("radiate manual mode rescheduled generator")
hold_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), hold_start) < 700:
wdt.feed()
utime.sleep_ms(20)
print("[test] radiate: manual phase end")
print("[test] radiate: pass")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_rain_drops", {
"p": "rain_drops",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_rain_drops")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/scanner.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test scanner")
p.edit("scanner_test", {
"p": "scanner",
"b": 255,
"d": 30,
"c": [(255, 0, 0)],
"n1": 4,
"n2": 2,
"a": True,
})
p.select("scanner_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_segment_chase", {
"p": "segment_chase",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_segment_chase")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_snowfall", {
"p": "snowfall",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_snowfall")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_sparkle_trail", {
"p": "sparkle_trail",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_sparkle_trail")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_strobe_burst", {
"p": "strobe_burst",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_strobe_burst")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

52
tests/patterns/twinkle.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
print("[test] twinkle: start")
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
p.debug = True
wdt = WDT(timeout=10000)
print("[test] twinkle: auto phase begin")
p.edit("test_pattern", {"p": "twinkle", "b": 64, "a": True, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern"):
raise Exception("twinkle select failed in auto phase")
auto_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), auto_start) < 2500:
wdt.feed()
p.run_step()
utime.sleep_ms(20)
remaining_ms = utime.ticks_diff(p.next_tick_ms, utime.ticks_ms())
if p.next_tick_ms == 0 or remaining_ms <= 0:
raise Exception("twinkle delay scheduling invalid")
print("[test] twinkle: auto phase end")
print("[test] twinkle: manual phase begin")
p.edit("test_pattern", {"p": "twinkle", "b": 64, "a": False, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern", step=0):
raise Exception("twinkle select failed in manual phase")
for _ in range(6):
current_step = int(p.step)
if not p.select("test_pattern", step=current_step):
raise Exception("twinkle external select failed")
p.run_step()
wdt.feed()
if int(p.step) == current_step:
raise Exception("twinkle external step did not advance")
if p.generator is not None:
raise Exception("twinkle manual mode rescheduled generator")
hold_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), hold_start) < 700:
wdt.feed()
utime.sleep_ms(20)
print("[test] twinkle: manual phase end")
print("[test] twinkle: pass")
if __name__ == "__main__":
main()

40
tests/patterns/wave.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_wave", {
"p": "wave",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_wave")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

25
tests/peers.py Normal file
View File

@@ -0,0 +1,25 @@
from espnow import ESPNow
import network
sta = network.WLAN(network.STA_IF)
sta.active(True)
espnow = ESPNow()
espnow.active(True)
# add_peer() expects a 6-byte MAC (bytes/bytearray), not integers.
# Unicast placeholders (not broadcast/multicast) so get_peers() lists them.
# PEERS = aa:aa:aa:aa:aa:START … aa:aa:aa:aa:aa:END (inclusive last octet).
_PREFIX = b"\xaa\xaa\xaa\xaa\xaa"
_START_LAST_OCTET = 1
_END_LAST_OCTET = 40
PEERS = tuple(_PREFIX + bytes((i,)) for i in range(_START_LAST_OCTET, _END_LAST_OCTET + 1))
for peer in PEERS:
espnow.add_peer(peer)
print("peers:", PEERS)
for peer in PEERS:
espnow.send(peer, b"Hello, world!")
print(espnow.get_peers())

41
tests/test_ap_pm0.py Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""MicroPython AP example with power management disabled (pm=0).
Run on device:
mpremote connect /dev/ttyACM0 run tests/test_ap_pm0.py
"""
import network
import time
AP_SSID = "led-ap"
AP_PASSWORD = "ledpass123"
AP_CHANNEL = 6
def main():
ap = network.WLAN(network.AP_IF)
ap.active(True)
# Explicitly disable Wi-Fi power save for AP mode.
try:
ap.config(pm=0)
except (AttributeError, ValueError, TypeError):
try:
ap.config(pm=network.WLAN.PM_NONE)
except (AttributeError, ValueError, TypeError):
pass
ap.config(essid=AP_SSID, password=AP_PASSWORD, channel=AP_CHANNEL, authmode=3)
print("[ap-pm0] AP active:", ap.active())
print("[ap-pm0] SSID:", AP_SSID)
print("[ap-pm0] IFCONFIG:", ap.ifconfig())
print("[ap-pm0] Waiting for clients. Ctrl+C to stop.")
while True:
time.sleep(2)
if __name__ == "__main__":
main()