10 Commits

Author SHA1 Message Date
fbebe9f4f9 fix(patterns): correct non-blocking timing and blink off phase
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-06 20:28:52 +12:00
a79c6f4dd3 fix(patterns): remove blocking sleeps from pattern loops
Replace sleep-based timing in pattern generators with non-blocking tick checks so long delays do not block the main loop and risk watchdog resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:37:33 +12:00
pi
2fcaf2f064 fix(driver): persist brightness when message includes save and b
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 22:15:23 +12:00
pi
3b38264b70 chore(wifi): log connecting while waiting for STA
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
3ee89ce3b4 feat(driver): add HTTP routes, startup split, and binary envelope support
Wire controller messages through new modules (background tasks, runtime state,
startup) and add binary envelope handling.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:12 +12:00
74b4b495f9 feat(patterns): add expanded animation pack with smoke tests
Add a broad set of new pattern modules and matching pattern smoke scripts so the new effects can be validated directly on-device.
2026-04-23 20:10:01 +12:00
4575ef16ad test(led-driver): add espnow peer and ap pm0 scripts
Made-with: Cursor
2026-04-21 21:48:42 +12:00
a342187635 feat(patterns): add twinkle pattern defaults
Made-with: Cursor
2026-04-21 21:48:42 +12:00
428ed8b884 feat(led-driver): add preset clear command and runtime debug 2026-04-21 00:44:28 +12:00
a22702df4d feat(patterns): add radiate animation 2026-04-20 23:37:43 +12:00
61 changed files with 2645 additions and 235 deletions

1
presets.json Normal file
View File

@@ -0,0 +1 @@
{"15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "40": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 0]], "b": 255, "n2": 2600, "n1": 35, "p": "flame", "n3": 0, "d": 50}, "41": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[120, 200, 255], [80, 140, 255], [180, 120, 255], [100, 220, 232], [160, 200, 255]], "b": 255, "n2": 10, "n1": 72, "p": "twinkle", "n3": 5, "d": 500}, "42": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[166, 0, 255], [0, 10, 10]], "b": 255, "n2": 900, "n1": 30, "p": "radiate", "n3": 4000, "d": 5000}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "38": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 1, "p": "colour_cycle", "n3": 0, "d": 100}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}, "39": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 184, 77]], "b": 255, "n2": 0, "n1": 30, "p": "flicker", "n3": 0, "d": 80}, "14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 255], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 5000}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}}

42
src/background_tasks.py Normal file
View File

@@ -0,0 +1,42 @@
import asyncio
import gc
import utime
from hello import broadcast_hello_udp
async def presets_loop(presets, wdt):
last_mem_log = utime.ticks_ms()
while True:
presets.tick()
wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""Broadcast hello at startup-fast cadence, then slower cadence."""
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
if runtime_state.hello:
print("UDP hello: broadcasting...")
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 5 if elapsed_ms < 60000 else 60
await asyncio.sleep(interval_s)

209
src/binary_envelope.py Normal file
View File

@@ -0,0 +1,209 @@
"""Decode compact binary controller envelopes — v2 native binary, v1 legacy JSON blobs."""
import json
import struct
BINARY_ENVELOPE_VERSION_1 = 1
BINARY_ENVELOPE_VERSION_2 = 2
HEADER_LEN = 5
def _brightness_0_255_from_wire(wire):
w = max(0, min(127, int(wire)))
return min(255, (w * 255) // 127)
def _decode_preset_record(buf, off):
nl = buf[off]
off += 1
name = buf[off : off + nl].decode("utf-8")
off += nl
pl = buf[off]
off += 1
pattern = buf[off : off + pl].decode("utf-8")
off += pl
nc = buf[off]
off += 1
colors = []
for _ in range(nc):
r, g, b = buf[off], buf[off + 1], buf[off + 2]
off += 3
colors.append("#%02x%02x%02x" % (r, g, b))
if off + 16 > len(buf):
raise ValueError("truncated")
delay, br, auto, n1, n2, n3, n4, n5, n6 = struct.unpack_from(
"<HBBhhhhhh", buf, off
)
off += 16
preset = {
"p": pattern,
"c": colors,
"d": delay,
"b": br,
"a": bool(auto),
"n1": n1,
"n2": n2,
"n3": n3,
"n4": n4,
"n5": n5,
"n6": n6,
}
return name, preset, off
def _decode_presets_blob(chunk):
if not chunk:
return {}
off = 0
count = chunk[off]
off += 1
out = {}
for _ in range(count):
name, preset, off = _decode_preset_record(chunk, off)
out[name] = preset
if off != len(chunk):
raise ValueError("presets blob mismatch")
return out
def _decode_select_blob(chunk):
if not chunk:
return {}
off = 0
count = chunk[off]
off += 1
out = {}
for _ in range(count):
dl = chunk[off]
off += 1
device = chunk[off : off + dl].decode("utf-8")
off += dl
pl = chunk[off]
off += 1
pname = chunk[off : off + pl].decode("utf-8")
off += pl
has_step = chunk[off]
off += 1
if has_step:
step = struct.unpack_from("<H", chunk, off)[0]
off += 2
out[device] = [pname, step]
else:
out[device] = [pname]
if off != len(chunk):
raise ValueError("select blob mismatch")
return out
def _decode_default_blob(chunk):
if not chunk:
return "", []
off = 0
nl = chunk[off]
off += 1
default_name = chunk[off : off + nl].decode("utf-8") if nl else ""
off += nl
nt = chunk[off]
off += 1
targets = []
for _ in range(nt):
tl = chunk[off]
off += 1
targets.append(chunk[off : off + tl].decode("utf-8"))
off += tl
if off != len(chunk):
raise ValueError("default blob mismatch")
return default_name, targets
def parse_binary_envelope_v2(buf):
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
return None
if buf[0] != BINARY_ENVELOPE_VERSION_2:
return None
lp = buf[2]
ls = buf[3]
ld = buf[4]
need = HEADER_LEN + lp + ls + ld
if len(buf) != need:
return None
off = HEADER_LEN
presets_chunk = buf[off : off + lp]
off += lp
select_chunk = buf[off : off + ls]
off += ls
default_chunk = buf[off : off + ld]
data = {"v": "1"}
br = buf[1]
if br < 128:
data["b"] = _brightness_0_255_from_wire(br)
try:
if lp:
data["presets"] = _decode_presets_blob(presets_chunk)
if ls:
data["select"] = _decode_select_blob(select_chunk)
if ld:
dname, targets = _decode_default_blob(default_chunk)
data["default"] = dname
data["targets"] = targets
except (ValueError, UnicodeError, TypeError, struct.error):
return None
return data
def parse_binary_envelope_v1(buf):
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
return None
if buf[0] != BINARY_ENVELOPE_VERSION_1:
return None
lp = buf[2]
ls = buf[3]
ld = buf[4]
need = HEADER_LEN + lp + ls + ld
if len(buf) != need:
return None
off = HEADER_LEN
presets_chunk = buf[off : off + lp]
off += lp
select_chunk = buf[off : off + ls]
off += ls
default_chunk = buf[off : off + ld]
data = {"v": "1"}
br = buf[1]
if br < 128:
data["b"] = _brightness_0_255_from_wire(br)
if lp:
try:
data["presets"] = json.loads(presets_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if ls:
try:
data["select"] = json.loads(select_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if ld:
try:
extra = json.loads(default_chunk.decode("utf-8"))
except (ValueError, UnicodeError):
return None
if isinstance(extra, dict):
for k, v in extra.items():
data[k] = v
return data
def parse_binary_envelope(buf):
d = parse_binary_envelope_v2(buf)
if d is not None:
return d
return parse_binary_envelope_v1(buf)

View File

@@ -3,6 +3,7 @@
import json import json
import socket import socket
from binary_envelope import parse_binary_envelope
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
try: try:
@@ -12,18 +13,29 @@ except ImportError:
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None):
"""Read one controller message; json.loads (bytes or str), then apply fields.""" """Read one controller message; binary v1 envelope or JSON v1, then apply fields."""
try: data = None
data = json.loads(payload) if isinstance(payload, (bytes, bytearray)):
print(payload) data = parse_binary_envelope(payload)
if data.get("v", "") != "1": if data is None:
try:
data = json.loads(payload)
except (ValueError, TypeError):
return
else:
try:
data = json.loads(payload)
except (ValueError, TypeError):
return return
except (ValueError, TypeError): print(payload)
if data.get("v", "") != "1":
return return
if "b" in data: if "b" in data:
apply_brightness(data, settings, presets) apply_brightness(data, settings, presets)
if "presets" in data: if "presets" in data:
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data:
apply_clear_presets(data, presets)
if "select" in data: if "select" in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
@@ -32,6 +44,10 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_patterns_ota(data, presets, controller_ip=controller_ip) apply_patterns_ota(data, presets, controller_ip=controller_ip)
if "save" in data and ("presets" in data or "default" in data): if "save" in data and ("presets" in data or "default" in data):
presets.save() presets.save()
if "save" in data and "clear_presets" in data:
presets.save()
if "save" in data and "b" in data:
settings.save()
def apply_brightness(data, settings, presets): def apply_brightness(data, settings, presets):
@@ -70,6 +86,22 @@ def apply_select(data, settings, presets):
presets.select(preset_name, step=step) presets.select(preset_name, step=step)
def apply_clear_presets(data, presets):
clear_value = data.get("clear_presets")
if isinstance(clear_value, bool):
should_clear = clear_value
elif isinstance(clear_value, int):
should_clear = bool(clear_value)
elif isinstance(clear_value, str):
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
else:
should_clear = False
if not should_clear:
return
presets.delete_all()
print("Cleared all presets.")
def apply_default(data, settings, presets): def apply_default(data, settings, presets):
targets = data.get("targets") or [] targets = data.get("targets") or []
default_name = data["default"] default_name = data["default"]

125
src/http_routes.py Normal file
View File

@@ -0,0 +1,125 @@
import json
from controller_messages import process_data
from microdot.websocket import WebSocketError, with_websocket
try:
import uos as os
except ImportError:
import os
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def register_routes(app, settings, presets, runtime_state):
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
print("WS client connected")
runtime_state.ws_connected()
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
print("WS controller_ip:", controller_ip)
try:
while True:
data = await ws.receive()
if not data:
print("WS client disconnected (closed)")
break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data)
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
print(
"WS client disconnected: hello=",
runtime_state.hello,
"ws_client_count=",
runtime_state.ws_client_count,
)
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {"Content-Type": "application/json"}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
print("patterns/upload body_bytes:", len(body))
try:
code = body.decode("utf-8")
except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, {"Content-Type": "application/json"}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {"Content-Type": "application/json"}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
print("patterns/upload writing:", path)
with open(path, "w") as f:
f.write(code)
if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps(
{
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}
), 201, {"Content-Type": "application/json"}
@app.post("/presets/upload")
async def upload_presets(request):
"""Receive v1 JSON with ``presets`` and apply/save on the driver."""
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "body is required"}), 400, {"Content-Type": "application/json"}
try:
process_data(body, settings, presets)
except Exception as e:
return json.dumps({"error": str(e)}), 400, {"Content-Type": "application/json"}
return json.dumps({"message": "presets applied"}), 200, {"Content-Type": "application/json"}

View File

@@ -1,9 +1,10 @@
from settings import Settings from settings import Settings
from machine import WDT import machine
import network import network
import utime import utime
import asyncio import asyncio
import json import json
import gc
from microdot import Microdot from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
@@ -14,12 +15,25 @@ try:
except ImportError: except ImportError:
import os import os
machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings) print(settings)
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "") default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets: if default_preset and default_preset in presets.presets:
if presets.select(default_preset): if presets.select(default_preset):
@@ -27,14 +41,20 @@ if default_preset and default_preset in presets.presets:
else: else:
print("Startup preset failed (invalid pattern?):", default_preset) print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000) # On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
wdt.feed() # Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True) sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"]) sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected(): while not sta_if.isconnected():
print("Connecting")
utime.sleep(1) utime.sleep(1)
wdt.feed() wdt.feed()
@@ -149,9 +169,16 @@ async def upload_pattern(request):
async def presets_loop(): async def presets_loop():
last_mem_log = utime.ticks_ms()
while True: while True:
presets.tick() presets.tick()
wdt.feed() wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run. # tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)

31
src/patterns/aurora.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Aurora:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(40, 200, 140), (80, 120, 255), (160, 80, 220)]
bands = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
shimmer = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 40))
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
idx = ((i * bands) // max(1, self.driver.num_leds) + (phase // 32)) % len(colors)
c = self.driver.apply_brightness(colors[idx], preset.b)
w = (255 - abs(128 - ((i * 8 + phase) & 255)) * 2)
w = max(0, min(255, w + shimmer))
self.driver.n[i] = ((c[0]*w)//255, (c[1]*w)//255, (c[2]*w)//255)
self.driver.n.write()
phase = (phase + 1) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

29
src/patterns/bar_graph.py Normal file
View File

@@ -0,0 +1,29 @@
import utime
class BarGraph:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(0, 255, 0), (255, 80, 0)]
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
level = max(0, min(100, int(preset.n1) if int(preset.n1) >= 0 else 50))
target = (self.driver.num_leds * level) // 100
lit = self.driver.apply_brightness(colors[0], preset.b)
unlit = self.driver.apply_brightness(
colors[-1],
preset.b,
)
for i in range(self.driver.num_leds):
self.driver.n[i] = lit if i < target else unlit
self.driver.n.write()
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -25,7 +25,7 @@ class Blink:
# Advance to next color for the next "on" phase # Advance to next color for the next "on" phase
color_index += 1 color_index += 1
else: else:
# "Off" phase: turn all LEDs off # "Off" phase should actually be off.
self.driver.fill((0, 0, 0)) self.driver.fill((0, 0, 0))
state = not state state = not state
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)

View File

@@ -0,0 +1,40 @@
import utime
class BreathingDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 140), (0, 120, 255)]
phase_offset = max(0, min(255, int(preset.n1)))
ease = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
p1 = phase
p2 = (phase + phase_offset) & 255
t1 = 255 - abs(128 - p1) * 2
t2 = 255 - abs(128 - p2) * 2
if ease > 1:
t1 = (t1 * t1) // 255
t2 = (t2 * t2) // 255
c1 = self.driver.apply_brightness(colors[0], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
half = self.driver.num_leds // 2
for i in range(self.driver.num_leds):
if i < half:
self.driver.n[i] = ((c1[0]*t1)//255, (c1[1]*t1)//255, (c1[2]*t1)//255)
else:
self.driver.n[i] = ((c2[0]*t2)//255, (c2[1]*t2)//255, (c2[2]*t2)//255)
self.driver.n.write()
phase = (phase + 2) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -26,6 +26,7 @@ class Chase:
color0 = self.driver.apply_brightness(color0, preset.b) color0 = self.driver.apply_brightness(color0, preset.b)
color1 = self.driver.apply_brightness(color1, preset.b) color1 = self.driver.apply_brightness(color1, preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
n1 = max(1, int(preset.n1)) # LEDs of color 0 n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1 n2 = max(1, int(preset.n2)) # LEDs of color 1
@@ -53,7 +54,7 @@ class Chase:
# If auto is False, run a single step and then stop # If auto is False, run a single step and then stop
if not preset.a: if not preset.a:
# Clear all LEDs # Clear all LEDs
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(bg_color)
# Draw repeating pattern starting at position # Draw repeating pattern starting at position
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
@@ -98,7 +99,7 @@ class Chase:
position += max_pos position += max_pos
# Clear all LEDs # Clear all LEDs
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(bg_color)
# Draw repeating pattern starting at position # Draw repeating pattern starting at position
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):

View File

@@ -31,10 +31,10 @@ class Circle:
base0 = base1 = (255, 255, 255) base0 = base1 = (255, 255, 255)
elif len(colors) == 1: elif len(colors) == 1:
base0 = colors[0] base0 = colors[0]
base1 = (0, 0, 0) base1 = colors[-1]
else: else:
base0 = colors[0] base0 = colors[0]
base1 = colors[1] base1 = colors[-1]
color0 = self.driver.apply_brightness(base0, preset.b) color0 = self.driver.apply_brightness(base0, preset.b)
color1 = self.driver.apply_brightness(base1, preset.b) color1 = self.driver.apply_brightness(base1, preset.b)
@@ -46,7 +46,7 @@ class Circle:
if phase == "off": if phase == "off":
self.driver.n.fill(color1) self.driver.n.fill(color1)
else: else:
self.driver.n.fill((0, 0, 0)) self.driver.n.fill(color1)
# Calculate segment length # Calculate segment length
segment_length = (head - tail) % self.driver.num_leds segment_length = (head - tail) % self.driver.num_leds

View File

@@ -0,0 +1,33 @@
import utime
class ClockSweep:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (60, 60, 60)]
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
marker = max(0, int(preset.n2) if int(preset.n2) > 0 else 0)
pos = self.driver.step % max(1, self.driver.num_leds)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg = self.driver.apply_brightness(colors[-1], preset.b)
fg = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg
if marker > 0 and i % marker == 0:
self.driver.n[i] = ((bg[0]*2)//3, (bg[1]*2)//3, (bg[2]*2)//3)
for w in range(width):
self.driver.n[(pos + w) % self.driver.num_leds] = fg
self.driver.n.write()
pos = (pos + 1) % max(1, self.driver.num_leds)
self.driver.step = pos
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,44 @@
import utime
class CometDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
tail = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
gap = max(0, int(preset.n3))
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
c1 = self.driver.apply_brightness(colors[0 % len(colors)], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
for t in range(tail):
i1 = p1 - t
if 0 <= i1 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i1] = ((c1[0]*s)//255, (c1[1]*s)//255, (c1[2]*s)//255)
i2 = p2 + t
if 0 <= i2 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i2] = ((c2[0]*s)//255, (c2[1]*s)//255, (c2[2]*s)//255)
self.driver.n.write()
p1 += speed
p2 -= speed
if p1 - tail > self.driver.num_leds and p2 + tail < 0:
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

35
src/patterns/fireflies.py Normal file
View File

@@ -0,0 +1,35 @@
import random
import utime
class Fireflies:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 210, 80), (120, 255, 120)]
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 8)
bugs = [[random.randint(0, max(0, self.driver.num_leds - 1)), random.randint(0, 255)] for _ in range(count)]
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for b in bugs:
idx, ph = b
tri = 255 - abs(128 - ph) * 2
c = self.driver.apply_brightness(colors[idx % len(colors)], preset.b)
self.driver.n[idx] = ((c[0]*tri)//255, (c[1]*tri)//255, (c[2]*tri)//255)
b[1] = (ph + speed) & 255
if random.randint(0, 31) == 0:
b[0] = random.randint(0, max(0, self.driver.num_leds - 1))
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,57 @@
import utime
class GradientScroll:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
"""Scrolling blended gradient.
n1: phase step amount (default 1)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

36
src/patterns/heartbeat.py Normal file
View File

@@ -0,0 +1,36 @@
import utime
class Heartbeat:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 40)]
phase = 0
phase_start = utime.ticks_ms()
did_manual_pulse = False
while True:
p1 = max(20, int(preset.n1) if int(preset.n1) > 0 else 120)
p2 = max(20, int(preset.n2) if int(preset.n2) > 0 else 80)
pause = max(20, int(preset.n3) if int(preset.n3) > 0 else 500)
beat_gap = max(20, int(preset.d))
colors = preset.c if preset.c else [(255, 0, 40)]
lit_color = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
phase_durations = (p1, beat_gap, p2, pause)
phase_colors = (lit_color, bg_color, lit_color, bg_color)
now = utime.ticks_ms()
while utime.ticks_diff(now, phase_start) >= phase_durations[phase]:
phase_start = utime.ticks_add(phase_start, phase_durations[phase])
phase = (phase + 1) % 4
self.driver.fill(phase_colors[phase])
yield
if not preset.a:
if did_manual_pulse or phase == 0:
self.driver.fill(bg_color)
yield
return
did_manual_pulse = True

31
src/patterns/marquee.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Marquee:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % (on_len + off_len)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
m = (i + phase) % (on_len + off_len)
self.driver.n[i] = c if m < on_len else bg_color
self.driver.n.write()
phase = (phase + step) % (on_len + off_len)
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,62 @@
import utime
class MeteorRain:
def __init__(self, driver):
self.driver = driver
def _fade(self, color, fade_amount):
return (
(color[0] * fade_amount) // 255,
(color[1] * fade_amount) // 255,
(color[2] * fade_amount) // 255,
)
def run(self, preset):
"""Single meteor with a fading tail.
n1: tail length (default 8)
n2: speed in LEDs per frame (default 1)
n3: fade amount per frame, 1..255 (default 192)
"""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
head = 0
direction = 1
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
tail_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 8)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
fade_amount = int(preset.n3) if int(preset.n3) > 0 else 192
fade_amount = max(1, min(255, fade_amount))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
for i in range(self.driver.num_leds):
self.driver.n[i] = self._fade(self.driver.n[i], fade_amount)
base = colors[color_index % len(colors)]
lit = self.driver.apply_brightness(base, preset.b)
if 0 <= head < self.driver.num_leds:
self.driver.n[head] = lit
self.driver.n.write()
head += direction * speed
if head >= self.driver.num_leds + tail_len:
head = self.driver.num_leds - 1
direction = -1
color_index += 1
elif head < -tail_len:
head = 0
direction = 1
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

31
src/patterns/orbit.py Normal file
View File

@@ -0,0 +1,31 @@
import utime
class Orbit:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (0, 180, 255), (255, 0, 120)]
orbits = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for k in range(orbits):
idx = ((phase * (k + 1)) // 8 + (k * self.driver.num_leds // max(1, orbits))) % max(1, self.driver.num_leds)
self.driver.n[idx] = self.driver.apply_brightness(colors[k % len(colors)], preset.b)
self.driver.n.write()
phase = (phase + speed) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,81 @@
import utime
class PaletteMorph:
def __init__(self, driver):
self.driver = driver
def _blend(self, c1, c2, t):
return (
c1[0] + ((c2[0] - c1[0]) * t) // 255,
c1[1] + ((c2[1] - c1[1]) * t) // 255,
c1[2] + ((c2[2] - c1[2]) * t) // 255,
)
def run(self, preset):
"""Living color field (non-scrolling palette warp).
Different from `colour_cycle`: this does not scroll a fixed gradient.
Instead, each LED breathes/warps through the palette with local phase
offsets so the strip looks alive.
n1: morph duration (ms)
n2: warp rate
n3: spatial turbulence amount
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
if len(colors) < 2:
while True:
self.driver.fill(self.driver.apply_brightness(colors[0], preset.b))
yield
morph = max(50, int(preset.n1) if int(preset.n1) > 0 else 1200)
warp_rate = max(1, int(preset.n2) if int(preset.n2) > 0 else 3)
turbulence = max(1, int(preset.n3) if int(preset.n3) > 0 else 24)
base_idx = 0
start = utime.ticks_ms()
phase = self.driver.step % 256
last_update = start
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) < delay_ms:
yield
continue
last_update = utime.ticks_add(last_update, delay_ms)
age = utime.ticks_diff(now, start)
if age < morph:
t = (age * 255) // morph
else:
t = 255
# Global morph anchor between neighboring palette colors.
a = colors[base_idx % len(colors)]
b = colors[(base_idx + 1) % len(colors)]
anchor = self._blend(a, b, t)
for i in range(self.driver.num_leds):
# Non-linear local warp per LED to create "living" motion.
pos = (i * 256) // max(1, self.driver.num_leds)
wobble = ((pos * turbulence) // 32 + phase + (t // 2)) & 255
breath = 255 - abs(128 - wobble) * 2
local = (pos + (breath // 3) + (t // 4)) % 256
idx = (base_idx + ((local * len(colors)) // 256)) % len(colors)
frac = (local * len(colors)) & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % len(colors)]
grad = self._blend(c1, c2, frac)
# Blend with anchor to keep coherent palette morphing.
out = self._blend(grad, anchor, 80)
self.driver.n[i] = self.driver.apply_brightness(out, preset.b)
self.driver.n.write()
if age >= morph:
base_idx = (base_idx + 1) % len(colors)
start = now
if not preset.a:
yield
return
phase = (phase + warp_rate) & 255
self.driver.step = phase
yield

39
src/patterns/plasma.py Normal file
View File

@@ -0,0 +1,39 @@
import utime
class Plasma:
def __init__(self, driver):
self.driver = driver
def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
if pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def run(self, preset):
scale = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
contrast = max(1, int(preset.n3) if int(preset.n3) > 0 else 2)
t = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
v = ((i * scale + t) & 255)
v2 = (((i * scale // max(1, contrast)) - (t * 2)) & 255)
c = self._wheel((v + v2) & 255)
self.driver.n[i] = self.driver.apply_brightness(c, preset.b)
self.driver.n.write()
t = (t + speed) % 256
self.driver.step = t
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -18,6 +18,7 @@ class Pulse:
# State machine based pulse using a single generator loop # State machine based pulse using a single generator loop
while True: while True:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
# Read current timing parameters from preset # Read current timing parameters from preset
attack_ms = max(0, int(preset.n1)) # Attack time in ms attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms hold_ms = max(0, int(preset.n2)) # Hold time in ms
@@ -49,7 +50,7 @@ class Pulse:
self.driver.fill(self.driver.apply_brightness(color, preset.b)) self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < total_ms: elif elapsed < total_ms:
# Delay phase: LEDs off between pulses # Delay phase: LEDs off between pulses
self.driver.fill((0, 0, 0)) self.driver.fill(bg_color)
else: else:
# End of cycle, move to next color and restart timing # End of cycle, move to next color and restart timing
color_index += 1 color_index += 1

136
src/patterns/radiate.py Normal file
View File

@@ -0,0 +1,136 @@
import utime
_RADIATE_DBG_INTERVAL_MS = 1000
class Radiate:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Radiate from nodes every n1 LEDs, retriggering every delay (d).
- n1: node spacing in LEDs
- n2: outbound travel time in ms
- n3: return travel time in ms
- d: retrigger interval in ms
"""
colors = preset.c if preset.c else [(255, 255, 255)]
base_on = colors[0]
base_off = colors[-1]
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
now = utime.ticks_ms()
last_trigger = now
active_pulses = [now]
last_dbg = now
dbg_banner = False
if not preset.a:
# Single-step render uses only the first instant pulse.
active_pulses = [utime.ticks_ms()]
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
# Keep one pulse train at a time; replacing instead of appending
# prevents overlap from keeping color[0] continuously visible.
active_pulses = [now]
last_trigger = utime.ticks_add(last_trigger, delay_ms)
if bool(getattr(self.driver, "debug", False)):
print(
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
% (spacing, outward_ms, return_ms, delay_ms)
)
# Drop pulses once their out-and-back lifetime ends.
pulse_lifetime = outward_ms + return_ms
kept = []
for start in active_pulses:
age = utime.ticks_diff(now, start)
if age < pulse_lifetime:
kept.append(start)
active_pulses = kept
debug_front = -1
lit_count = 0
for i in range(self.driver.num_leds):
# Nearest node distance for a repeating node grid every `spacing` LEDs.
offset = i % spacing
dist = min(offset, spacing - offset)
lit = False
for start in active_pulses:
age = utime.ticks_diff(now, start)
# Do not render on the exact trigger tick; this avoids
# node LEDs appearing "stuck on" between cycles.
if age <= 0:
continue
if age <= outward_ms:
# Integer-ceiling progression so peak can be reached even
# when tick timing skips the exact outward_ms boundary.
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms:
back_age = age - outward_ms
remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else:
continue
if dist <= front:
lit = True
if front > debug_front:
debug_front = front
break
self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write()
if bool(getattr(self.driver, "debug", False)):
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.d,
self.driver.num_leds,
)
)
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
print(
"[radiate] age=%d front=%d max=%d active=%d lit=%d"
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count)
)
if lit_count == 0:
print("[radiate] fully off")
last_dbg = now
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,41 @@
import random
import utime
class RainDrops:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(120, 180, 255)]
rate = max(1, int(preset.n1) if int(preset.n1) > 0 else 32)
width = max(1, int(preset.n2) if int(preset.n2) > 0 else 3)
drops = []
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
if random.randint(0, 255) < rate:
drops.append([random.randint(0, max(0, self.driver.num_leds - 1)), 0])
nd = []
for pos, age in drops:
for off in range(-width, width + 1):
idx = pos + off
if 0 <= idx < self.driver.num_leds:
s = 255 - min(255, abs(off) * 255 // max(1, width + 1) + age * 40)
base = self.driver.apply_brightness(colors[age % len(colors)], preset.b)
self.driver.n[idx] = ((base[0]*s)//255, (base[1]*s)//255, (base[2]*s)//255)
age += 1
if age < 8:
nd.append([pos, age])
drops = nd
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

67
src/patterns/scanner.py Normal file
View File

@@ -0,0 +1,67 @@
import utime
class Scanner:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Classic scanner eye with soft falloff.
n1: eye width (default 4)
n2: end pause in frames (default 0)
"""
colors = preset.c if preset.c else [(255, 0, 0)]
color_index = 0
center = 0
direction = 1
pause_frames = 0
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
end_pause = max(0, int(preset.n2))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
base = colors[color_index % len(colors)]
base = self.driver.apply_brightness(base, preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
dist = i - center
if dist < 0:
dist = -dist
if dist > width:
self.driver.n[i] = bg_color
else:
scale = ((width - dist) * 255) // max(1, width)
self.driver.n[i] = (
(base[0] * scale) // 255,
(base[1] * scale) // 255,
(base[2] * scale) // 255,
)
self.driver.n.write()
if pause_frames > 0:
pause_frames -= 1
else:
center += direction
if center >= self.driver.num_leds - 1:
center = self.driver.num_leds - 1
direction = -1
pause_frames = end_pause
color_index += 1
elif center <= 0:
center = 0
direction = 1
pause_frames = end_pause
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,45 @@
import utime
class SegmentChase:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Independent moving segments (distinct from classic two-color chase).
n1: segment size (LEDs per segment)
n2: step size (phase increment each frame)
n3: per-segment phase offset
n4: gap spacing inside segment (0 = solid segment)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
seg = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
phase_step = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
seg_offset = max(0, int(preset.n3))
gap = max(0, int(preset.n4))
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
seg_idx = i // seg
in_seg = i % seg
local_phase = (phase + seg_idx * seg_offset) % seg
lit_idx = (in_seg + local_phase) % seg
if gap > 0 and lit_idx >= max(1, seg - gap):
self.driver.n[i] = bg_color
else:
color_idx = seg_idx % len(colors)
self.driver.n[i] = self.driver.apply_brightness(colors[color_idx], preset.b)
self.driver.n.write()
phase = (phase + phase_step) % seg
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

37
src/patterns/snowfall.py Normal file
View File

@@ -0,0 +1,37 @@
import random
import utime
class Snowfall:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (180, 220, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
flakes = []
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
if random.randint(0, 255) < density:
flakes.append([self.driver.num_leds - 1, random.randint(0, len(colors)-1)])
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
nf = []
for pos, ci in flakes:
if 0 <= pos < self.driver.num_leds:
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b)
pos -= speed
if pos >= -1:
nf.append([pos, ci])
flakes = nf
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,31 @@
import random
import utime
class SparkleTrail:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(120, 120, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 24)
decay = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 210))
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
r,g,b = self.driver.n[i]
self.driver.n[i] = ((r*decay)//255, (g*decay)//255, (b*decay)//255)
sparks = max(1, self.driver.num_leds * density // 255)
for _ in range(sparks):
idx = random.randint(0, max(0, self.driver.num_leds - 1))
c = self.driver.apply_brightness(colors[random.randint(0, len(colors)-1)], preset.b)
self.driver.n[idx] = c
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,45 @@
import utime
class StrobeBurst:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
state = "flash_on"
flash_idx = 0
state_start = utime.ticks_ms()
while True:
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
gap = max(1, int(preset.n2) if int(preset.n2) > 0 else 60)
cooldown = max(1, int(preset.n3) if int(preset.n3) > 0 else 400)
on_ms = max(1, int(preset.d) // 2)
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
now = utime.ticks_ms()
if state == "flash_on":
self.driver.fill(c)
if utime.ticks_diff(now, state_start) >= on_ms:
state = "flash_off"
state_start = utime.ticks_add(state_start, on_ms)
elif state == "flash_off":
self.driver.fill(bg_color)
if utime.ticks_diff(now, state_start) >= gap:
flash_idx += 1
if flash_idx >= count:
if not preset.a:
return
state = "cooldown"
flash_idx = 0
state_start = utime.ticks_add(state_start, gap)
else:
state = "flash_on"
state_start = utime.ticks_add(state_start, gap)
else:
self.driver.fill(bg_color)
if utime.ticks_diff(now, state_start) >= cooldown:
state = "flash_on"
state_start = utime.ticks_add(state_start, cooldown)
yield

228
src/patterns/twinkle.py Normal file
View File

@@ -0,0 +1,228 @@
import random
import utime
# Default cool palette (icy blues, violet, mint) when preset has no colours.
# When `driver.debug` is True, print stats every N twinkle ticks (serial can be slow).
_TWINKLE_DBG_INTERVAL = 40
_DEFAULT_COOL = (
(120, 200, 255),
(80, 140, 255),
(180, 120, 255),
(100, 220, 240),
(160, 200, 255),
(90, 180, 220),
)
class Twinkle:
def __init__(self, driver):
self.driver = driver
def _palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_COOL)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
max(0, min(255, int(c[0]))),
max(0, min(255, int(c[1]))),
max(0, min(255, int(c[2]))),
)
)
return out if out else list(_DEFAULT_COOL)
def run(self, preset):
"""Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs."""
palette = self._palette(preset)
num = self.driver.num_leds
bg_color = self.driver.apply_brightness(palette[-1], preset.b)
if num <= 0:
while True:
yield
return
def activity_rate():
r = int(preset.n1)
if r <= 0:
r = 48
return max(1, min(255, r))
def density255():
"""Higher → more LEDs lit on average when a twinkle step fires (0 = default mid)."""
d = int(preset.n2)
if d <= 0:
d = 128
return max(0, min(255, d))
def cluster_len_bounds():
"""n3 = min adjacent LEDs per twinkle, n4 = max (both 0 → 1..4)."""
lo = int(preset.n3)
hi = int(preset.n4)
if lo <= 0 and hi <= 0:
lo, hi = 1, min(4, num)
else:
if lo <= 0:
lo = 1
if hi <= 0:
hi = lo
if hi < lo:
lo, hi = hi, lo
lo = max(1, min(lo, num))
hi = max(lo, min(hi, num))
return lo, hi
def random_cluster_len():
lo, hi = cluster_len_bounds()
# When min and max match, every lit/dim run is exactly that many LEDs (still capped by strip length).
if lo == hi:
return lo
return random.randint(lo, hi)
def cluster_base_index(start, k):
"""Shift run left so a length-k segment fits; keeps full k when num >= k."""
k = min(max(0, int(k)), num)
if k <= 0:
return 0
return max(0, min(int(start), num - k))
dens = density255()
on = [random.randint(0, 255) < dens for _ in range(num)]
colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)]
last_update = utime.ticks_ms()
dbg_tick = 0
dbg_banner = False
def on_run_min_max(bits):
"""Smallest and largest contiguous run of True in bits (0,0 if all off)."""
best_min = num + 1
best_max = 0
cur = 0
for j in range(num):
if bits[j]:
cur += 1
else:
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
cur = 0
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
if best_min == num + 1:
return 0, 0
return best_min, best_max
if not preset.a:
for i in range(num):
if on[i]:
base = palette[colour_i[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = bg_color
self.driver.n.write()
yield
return
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) >= delay_ms:
rate = activity_rate()
dens = density255()
dbg = bool(getattr(self.driver, "debug", False))
dbg_tick += 1
# Snapshot for decisions; apply all darks then all lights so
# overlaps in the same tick favour lit runs (lights win).
prev_on = on[:]
prev_ci = colour_i[:]
next_on = list(prev_on)
next_ci = list(prev_ci)
dbg_ops = {"L": 0, "D": 0}
light_i = []
dark_i = []
for i in range(num):
if random.randint(0, 255) < rate:
r = random.randint(0, 255)
if not prev_on[i]:
if r < dens:
light_i.append(i)
else:
if r < (255 - dens):
dark_i.append(i)
def light_adjacent(start):
dbg_ops["L"] += 1
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = True
next_ci[idx] = random.randint(0, len(palette) - 1)
def dark_adjacent(start):
dbg_ops["D"] += 1
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = False
for i in dark_i:
dark_adjacent(i)
for i in light_i:
light_adjacent(i)
for i in range(num):
if next_on[i]:
base = palette[next_ci[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = bg_color
self.driver.n.write()
on = next_on
colour_i = next_ci
last_update = utime.ticks_add(last_update, delay_ms)
if dbg:
lo, hi = cluster_len_bounds()
if not dbg_banner:
dbg_banner = True
print(
"[twinkle] debug on: n1=%s n2=%s n3=%s n4=%s d=%s -> lo=%d hi=%d num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.n4,
preset.d,
lo,
hi,
num,
)
)
rmin, rmax = on_run_min_max(on)
bad = lo > 0 and rmin > 0 and rmin < lo and num >= lo
if bad or (dbg_tick % _TWINKLE_DBG_INTERVAL == 0):
print(
"[twinkle] tick=%d rate=%d dens=%d L=%d D=%d on_runs min=%d max=%d%s"
% (
dbg_tick,
rate,
dens,
dbg_ops["L"],
dbg_ops["D"],
rmin,
rmax,
" **run<lo**" if bad else "",
)
)
yield

32
src/patterns/wave.py Normal file
View File

@@ -0,0 +1,32 @@
import utime
class Wave:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(0, 180, 255)]
wavelength = max(2, int(preset.n1) if int(preset.n1) > 0 else 12)
amp = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 180))
drift = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
base = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds):
x = (i * 256 // wavelength + phase) & 255
tri = 255 - abs(128 - x) * 2
s = (tri * amp) // 255
self.driver.n[i] = ((base[0]*s)//255, (base[1]*s)//255, (base[2]*s)//255)
self.driver.n.write()
phase = (phase + drift) % 256
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -1 +0,0 @@
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}

View File

@@ -9,6 +9,8 @@ try:
except ImportError: except ImportError:
import os import os
MAX_PRESETS = 32
class Presets: class Presets:
def __init__(self, pin, num_leds): def __init__(self, pin, num_leds):
@@ -95,6 +97,9 @@ class Presets:
order = settings if settings is not None else "rgb" order = settings if settings is not None else "rgb"
self.presets = {} self.presets = {}
for name, preset_data in data.items(): for name, preset_data in data.items():
if len(self.presets) >= MAX_PRESETS:
print("Preset limit reached on load:", MAX_PRESETS)
break
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None: if color_key is not None:
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
@@ -113,6 +118,9 @@ class Presets:
# Update existing preset # Update existing preset
self.presets[name].edit(data) self.presets[name].edit(data)
else: else:
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
print("Preset limit reached:", MAX_PRESETS)
return False
# Create new preset # Create new preset
self.presets[name] = Preset(data) self.presets[name] = Preset(data)
return True return True
@@ -123,6 +131,12 @@ class Presets:
return True return True
return False return False
def delete_all(self):
self.presets = {}
self.generator = None
self.selected = None
return True
def tick(self): def tick(self):
if self.generator is None: if self.generator is None:
return return
@@ -153,6 +167,9 @@ class Presets:
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
return True return True
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
return False
print("select failed: preset not found", preset_name)
# If preset doesn't exist or pattern not found, indicate failure # If preset doesn't exist or pattern not found, indicate failure
return False return False

12
src/runtime_state.py Normal file
View File

@@ -0,0 +1,12 @@
class RuntimeState:
def __init__(self):
self.hello = True
self.ws_client_count = 0
def ws_connected(self):
self.ws_client_count += 1
self.hello = False
def ws_disconnected(self):
self.ws_client_count = max(0, self.ws_client_count - 1)
self.hello = self.ws_client_count == 0

53
src/startup.py Normal file
View File

@@ -0,0 +1,53 @@
import gc
import machine
import network
import utime
from presets import Presets
from settings import Settings
def initialize_runtime():
machine.freq(160000000)
settings = Settings()
print(settings)
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
if presets.select(default_preset):
print("Selected startup preset:", default_preset)
else:
print("Startup preset failed (invalid pattern?):", default_preset)
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected():
utime.sleep(1)
wdt.feed()
print(sta_if.ifconfig())
return settings, presets, wdt, sta_if

View File

@@ -184,6 +184,36 @@ def test_pattern_smoke():
ctx.tick_for_ms(120) ctx.tick_for_ms(120)
def test_patterns_do_not_use_blocking_sleep():
pattern_dir = "patterns"
offenders = []
try:
files = os.listdir(pattern_dir)
except OSError:
raise AssertionError("patterns directory is missing")
for filename in files:
if not filename.endswith(".py") or filename in ("__init__.py", "main.py"):
continue
path = pattern_dir + "/" + filename
try:
with open(path, "r") as f:
src = f.read()
except OSError:
offenders.append(filename + " (unreadable)")
continue
if (
"utime.sleep(" in src
or "utime.sleep_ms(" in src
or "time.sleep(" in src
or "time.sleep_ms(" in src
):
offenders.append(filename)
assert not offenders, "blocking sleep found in patterns: %s" % ", ".join(offenders)
def test_default_requires_existing_preset(): def test_default_requires_existing_preset():
ctx = _TestContext() ctx = _TestContext()
_process_message(ctx, {"v": "1", "default": "missing"}) _process_message(ctx, {"v": "1", "default": "missing"})
@@ -242,6 +272,7 @@ def run_all():
test_preset_edit_sanitization, test_preset_edit_sanitization,
test_colour_conversion_and_transition, test_colour_conversion_and_transition,
test_pattern_smoke, test_pattern_smoke,
test_patterns_do_not_use_blocking_sleep,
test_default_requires_existing_preset, test_default_requires_existing_preset,
test_default_targets_gate_by_device_name, test_default_targets_gate_by_device_name,
test_save_and_load_roundtrip, test_save_and_load_roundtrip,

View File

@@ -1,43 +0,0 @@
"""Minimal Presets-like stub for pattern smoke tests (no machine / NeoPixel)."""
class _FakeNeo:
def __init__(self, count):
self._count = count
self.pixels = [(0, 0, 0)] * count
def fill(self, color):
self.pixels = [tuple(color) for _ in range(self._count)]
def __setitem__(self, index, value):
self.pixels[index] = tuple(value)
def __getitem__(self, index):
return self.pixels[index]
def write(self):
pass
class FakePresets:
"""Subset of led-driver Presets API used by patterns/*.py."""
def __init__(self, num_leds=24):
self.num_leds = num_leds
self.n = _FakeNeo(num_leds)
self.b = 255
self.step = 0
def apply_brightness(self, color, brightness_override=None):
local = brightness_override if brightness_override is not None else 255
effective_brightness = int(local * self.b / 255)
return tuple(int(c * effective_brightness / 255) for c in color)
def fill(self, color=None):
fill_color = color if color is not None else (0, 0, 0)
for i in range(self.num_leds):
self.n[i] = fill_color
self.n.write()
def off(self):
self.fill((0, 0, 0))

View File

@@ -1,174 +0,0 @@
"""
Smoke-test pattern generators with a fake driver (no hardware).
Run from repo root (CPython or MicroPython):
python3 led-driver/tests/pattern_smoke.py
Requires only the stdlib; loads ``preset.py`` and ``patterns/*.py`` from
``led-driver/src`` via import paths (MicroPython: copy ``src`` tree to the
device and run the same command if ``importlib.util`` is available, or set
``PYTHONPATH`` to ``led-driver/src`` and use ``python3`` on the host).
Exit code 0 on success, 1 on any failure.
"""
import importlib.util
import sys
from pathlib import Path
# -----------------------------------------------------------------------------
# Host-only ``utime`` so patterns import on CPython.
# -----------------------------------------------------------------------------
_UTIME_MS = [0]
def utime_advance(ms):
_UTIME_MS[0] += int(ms)
def _install_utime_shim():
if "utime" in sys.modules:
return
import types
m = types.ModuleType("utime")
def ticks_ms():
return _UTIME_MS[0]
def ticks_diff(a, b):
return a - b
def ticks_add(a, b):
return a + b
m.ticks_ms = ticks_ms
m.ticks_diff = ticks_diff
m.ticks_add = ticks_add
sys.modules["utime"] = m
# -----------------------------------------------------------------------------
# Load ``preset`` and pattern modules from ``led-driver/src`` (no sys.path).
# -----------------------------------------------------------------------------
_SRC = Path(__file__).resolve().parent.parent / "src"
_TESTS = Path(__file__).resolve().parent
def _load_module(name, path):
spec = importlib.util.spec_from_file_location(name, path)
if spec is None or spec.loader is None:
raise RuntimeError("no spec for %s" % path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _pattern_class_from_module(mod):
for attr_name in dir(mod):
attr = getattr(mod, attr_name)
if isinstance(attr, type) and hasattr(attr, "run"):
return attr
return None
def _load_preset_class():
preset_mod = _load_module("preset", _SRC / "preset.py")
return preset_mod.Preset
def _list_pattern_basenames():
pat_dir = _SRC / "patterns"
out = []
for p in sorted(pat_dir.iterdir()):
if p.suffix != ".py":
continue
if p.name in ("__init__.py", "main.py"):
continue
out.append(p.stem)
return out
def _default_preset_dict(basename):
"""Reasonable defaults so each pattern's ``run()`` can start."""
base = {
"p": basename,
"d": 100,
"b": 200,
"a": True,
"n1": 5,
"n2": 8,
"n3": 5,
"n4": 4,
}
if basename in ("rainbow", "colour_cycle"):
base["c"] = []
base["n1"] = 2
elif basename == "transition":
base["c"] = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
base["d"] = 200
elif basename in ("chase", "circle"):
base["c"] = [(255, 0, 0), (0, 0, 255)]
elif basename == "pulse":
base["c"] = [(0, 200, 100)]
base["n1"] = 50
base["n2"] = 40
base["n3"] = 50
base["d"] = 30
elif basename in ("blink", "flicker"):
base["c"] = [(255, 100, 0)]
base["d"] = 80
elif basename == "flame":
base["c"] = []
base["d"] = 40
base["n1"] = 40
base["n2"] = 2000
base["n3"] = -1
base["n4"] = 0
else:
base["c"] = [(200, 200, 200)]
return base
def _run_pattern_ticks(Preset, driver, basename, steps, ms_per_tick):
_install_utime_shim()
mod = _load_module("patterns.%s" % basename, _SRC / "patterns" / (basename + ".py"))
cls = _pattern_class_from_module(mod)
if cls is None:
raise RuntimeError("no pattern class in %s" % basename)
preset = Preset(_default_preset_dict(basename))
gen = cls(driver).run(preset)
for _ in range(steps):
utime_advance(ms_per_tick)
next(gen)
def main():
failures = []
Preset = _load_preset_class()
fake_mod = _load_module("fake_driver", _TESTS / "fake_driver.py")
FakePresets = fake_mod.FakePresets
for basename in _list_pattern_basenames():
d = FakePresets(16)
try:
_run_pattern_ticks(Preset, d, basename, steps=80, ms_per_tick=50)
print("ok patterns.%s" % basename)
except Exception as exc:
print("FAIL patterns.%s: %r" % (basename, exc))
failures.append((basename, exc))
if failures:
print("%d pattern(s) failed" % len(failures))
return 1
print("all %d pattern smoke tests passed" % len(_list_pattern_basenames()))
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
sys.exit(130)

40
tests/patterns/aurora.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_aurora", {
"p": "aurora",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_aurora")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_bar_graph", {
"p": "bar_graph",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_bar_graph")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_breathing_dual", {
"p": "breathing_dual",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_breathing_dual")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_clock_sweep", {
"p": "clock_sweep",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_clock_sweep")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_comet_dual", {
"p": "comet_dual",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_comet_dual")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_fireflies", {
"p": "fireflies",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_fireflies")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test gradient_scroll")
p.edit("gradient_test", {
"p": "gradient_scroll",
"b": 220,
"d": 60,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
"n1": 2,
"a": True,
})
p.select("gradient_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_heartbeat", {
"p": "heartbeat",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_heartbeat")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/marquee.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_marquee", {
"p": "marquee",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_marquee")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test meteor_rain")
p.edit("meteor_test", {
"p": "meteor_rain",
"b": 200,
"d": 40,
"c": [(255, 80, 0), (0, 120, 255)],
"n1": 10,
"n2": 1,
"n3": 200,
"a": True,
})
p.select("meteor_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/orbit.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_orbit", {
"p": "orbit",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_orbit")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_palette_morph", {
"p": "palette_morph",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_palette_morph")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/plasma.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_plasma", {
"p": "plasma",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_plasma")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_rain_drops", {
"p": "rain_drops",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_rain_drops")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/scanner.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
print("Test scanner")
p.edit("scanner_test", {
"p": "scanner",
"b": 255,
"d": 30,
"c": [(255, 0, 0)],
"n1": 4,
"n2": 2,
"a": True,
})
p.select("scanner_test")
run_for(p, wdt, 4000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_segment_chase", {
"p": "segment_chase",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_segment_chase")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_snowfall", {
"p": "snowfall",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_snowfall")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_sparkle_trail", {
"p": "sparkle_trail",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_sparkle_trail")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_strobe_burst", {
"p": "strobe_burst",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_strobe_burst")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

40
tests/patterns/wave.py Normal file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
wdt = WDT(timeout=10000)
p.edit("test_wave", {
"p": "wave",
"b": 200,
"d": 60,
"c": [(255, 0, 0), (0, 0, 255), (0, 255, 0)],
"n1": 4,
"n2": 2,
"n3": 120,
"a": True,
})
p.select("test_wave")
run_for(p, wdt, 3000)
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

25
tests/peers.py Normal file
View File

@@ -0,0 +1,25 @@
from espnow import ESPNow
import network
sta = network.WLAN(network.STA_IF)
sta.active(True)
espnow = ESPNow()
espnow.active(True)
# add_peer() expects a 6-byte MAC (bytes/bytearray), not integers.
# Unicast placeholders (not broadcast/multicast) so get_peers() lists them.
# PEERS = aa:aa:aa:aa:aa:START … aa:aa:aa:aa:aa:END (inclusive last octet).
_PREFIX = b"\xaa\xaa\xaa\xaa\xaa"
_START_LAST_OCTET = 1
_END_LAST_OCTET = 40
PEERS = tuple(_PREFIX + bytes((i,)) for i in range(_START_LAST_OCTET, _END_LAST_OCTET + 1))
for peer in PEERS:
espnow.add_peer(peer)
print("peers:", PEERS)
for peer in PEERS:
espnow.send(peer, b"Hello, world!")
print(espnow.get_peers())

41
tests/test_ap_pm0.py Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""MicroPython AP example with power management disabled (pm=0).
Run on device:
mpremote connect /dev/ttyACM0 run tests/test_ap_pm0.py
"""
import network
import time
AP_SSID = "led-ap"
AP_PASSWORD = "ledpass123"
AP_CHANNEL = 6
def main():
ap = network.WLAN(network.AP_IF)
ap.active(True)
# Explicitly disable Wi-Fi power save for AP mode.
try:
ap.config(pm=0)
except (AttributeError, ValueError, TypeError):
try:
ap.config(pm=network.WLAN.PM_NONE)
except (AttributeError, ValueError, TypeError):
pass
ap.config(essid=AP_SSID, password=AP_PASSWORD, channel=AP_CHANNEL, authmode=3)
print("[ap-pm0] AP active:", ap.active())
print("[ap-pm0] SSID:", AP_SSID)
print("[ap-pm0] IFCONFIG:", ap.ifconfig())
print("[ap-pm0] Waiting for clients. Ctrl+C to stop.")
while True:
time.sleep(2)
if __name__ == "__main__":
main()