Compare commits

1 Commits

Author SHA1 Message Date
428ed8b884 feat(led-driver): add preset clear command and runtime debug 2026-04-21 00:44:28 +12:00
7 changed files with 119 additions and 227 deletions

View File

@@ -24,6 +24,8 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_brightness(data, settings, presets) apply_brightness(data, settings, presets)
if "presets" in data: if "presets" in data:
apply_presets(data, settings, presets) apply_presets(data, settings, presets)
if "clear_presets" in data:
apply_clear_presets(data, presets)
if "select" in data: if "select" in data:
apply_select(data, settings, presets) apply_select(data, settings, presets)
if "default" in data: if "default" in data:
@@ -32,6 +34,8 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_patterns_ota(data, presets, controller_ip=controller_ip) apply_patterns_ota(data, presets, controller_ip=controller_ip)
if "save" in data and ("presets" in data or "default" in data): if "save" in data and ("presets" in data or "default" in data):
presets.save() presets.save()
if "save" in data and "clear_presets" in data:
presets.save()
def apply_brightness(data, settings, presets): def apply_brightness(data, settings, presets):
@@ -70,6 +74,22 @@ def apply_select(data, settings, presets):
presets.select(preset_name, step=step) presets.select(preset_name, step=step)
def apply_clear_presets(data, presets):
clear_value = data.get("clear_presets")
if isinstance(clear_value, bool):
should_clear = clear_value
elif isinstance(clear_value, int):
should_clear = bool(clear_value)
elif isinstance(clear_value, str):
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
else:
should_clear = False
if not should_clear:
return
presets.delete_all()
print("Cleared all presets.")
def apply_default(data, settings, presets): def apply_default(data, settings, presets):
targets = data.get("targets") or [] targets = data.get("targets") or []
default_name = data["default"] default_name = data["default"]

View File

@@ -1,9 +1,10 @@
from settings import Settings from settings import Settings
from machine import WDT import machine
import network import network
import utime import utime
import asyncio import asyncio
import json import json
import gc
from microdot import Microdot from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
@@ -14,12 +15,25 @@ try:
except ImportError: except ImportError:
import os import os
machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings) print(settings)
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "") default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets: if default_preset and default_preset in presets.presets:
if presets.select(default_preset): if presets.select(default_preset):
@@ -27,10 +41,15 @@ if default_preset and default_preset in presets.presets:
else: else:
print("Startup preset failed (invalid pattern?):", default_preset) print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000) # On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
wdt.feed() # Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True) sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"]) sta_if.connect(settings["ssid"], settings["password"])
@@ -149,9 +168,16 @@ async def upload_pattern(request):
async def presets_loop(): async def presets_loop():
last_mem_log = utime.ticks_ms()
while True: while True:
presets.tick() presets.tick()
wdt.feed() wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run. # tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)

View File

@@ -1,5 +1,7 @@
import utime import utime
_RADIATE_DBG_INTERVAL_MS = 1000
class Radiate: class Radiate:
def __init__(self, driver): def __init__(self, driver):
@@ -28,6 +30,8 @@ class Radiate:
now = utime.ticks_ms() now = utime.ticks_ms()
last_trigger = now last_trigger = now
active_pulses = [now] active_pulses = [now]
last_dbg = now
dbg_banner = False
if not preset.a: if not preset.a:
# Single-step render uses only the first instant pulse. # Single-step render uses only the first instant pulse.
@@ -44,17 +48,26 @@ class Radiate:
off_color = self.driver.apply_brightness(base_off, preset.b) off_color = self.driver.apply_brightness(base_off, preset.b)
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms: if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
active_pulses.append(now) # Keep one pulse train at a time; replacing instead of appending
# prevents overlap from keeping color[0] continuously visible.
active_pulses = [now]
last_trigger = utime.ticks_add(last_trigger, delay_ms) last_trigger = utime.ticks_add(last_trigger, delay_ms)
if bool(getattr(self.driver, "debug", False)):
print(
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
% (spacing, outward_ms, return_ms, delay_ms)
)
# Drop pulses once their out-and-back lifetime ends. # Drop pulses once their out-and-back lifetime ends.
pulse_lifetime = outward_ms + return_ms pulse_lifetime = outward_ms + return_ms
kept = [] kept = []
for start in active_pulses: for start in active_pulses:
age = utime.ticks_diff(now, start) age = utime.ticks_diff(now, start)
if age <= pulse_lifetime: if age < pulse_lifetime:
kept.append(start) kept.append(start)
active_pulses = kept active_pulses = kept
debug_front = -1
lit_count = 0
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
# Nearest node distance for a repeating node grid every `spacing` LEDs. # Nearest node distance for a repeating node grid every `spacing` LEDs.
@@ -64,24 +77,58 @@ class Radiate:
lit = False lit = False
for start in active_pulses: for start in active_pulses:
age = utime.ticks_diff(now, start) age = utime.ticks_diff(now, start)
if age < 0: # Do not render on the exact trigger tick; this avoids
# node LEDs appearing "stuck on" between cycles.
if age <= 0:
continue continue
if age <= outward_ms: if age <= outward_ms:
front = (age * max_dist) / outward_ms # Integer-ceiling progression so peak can be reached even
# when tick timing skips the exact outward_ms boundary.
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms: elif age <= outward_ms + return_ms:
back_age = age - outward_ms back_age = age - outward_ms
front = ((return_ms - back_age) * max_dist) / return_ms remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else: else:
continue continue
if dist <= front: if dist <= front:
lit = True lit = True
if front > debug_front:
debug_front = front
break break
self.driver.n[i] = lit_color if lit else off_color self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write() self.driver.n.write()
if bool(getattr(self.driver, "debug", False)):
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.d,
self.driver.num_leds,
)
)
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
print(
"[radiate] age=%d front=%d max=%d active=%d lit=%d"
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count)
)
if lit_count == 0:
print("[radiate] fully off")
last_dbg = now
if not preset.a: if not preset.a:
yield yield
return return

View File

@@ -1 +0,0 @@
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}

View File

@@ -9,6 +9,8 @@ try:
except ImportError: except ImportError:
import os import os
MAX_PRESETS = 32
class Presets: class Presets:
def __init__(self, pin, num_leds): def __init__(self, pin, num_leds):
@@ -95,6 +97,9 @@ class Presets:
order = settings if settings is not None else "rgb" order = settings if settings is not None else "rgb"
self.presets = {} self.presets = {}
for name, preset_data in data.items(): for name, preset_data in data.items():
if len(self.presets) >= MAX_PRESETS:
print("Preset limit reached on load:", MAX_PRESETS)
break
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None: if color_key is not None:
preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key] = convert_and_reorder_colors(
@@ -113,6 +118,9 @@ class Presets:
# Update existing preset # Update existing preset
self.presets[name].edit(data) self.presets[name].edit(data)
else: else:
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
print("Preset limit reached:", MAX_PRESETS)
return False
# Create new preset # Create new preset
self.presets[name] = Preset(data) self.presets[name] = Preset(data)
return True return True
@@ -123,6 +131,12 @@ class Presets:
return True return True
return False return False
def delete_all(self):
self.presets = {}
self.generator = None
self.selected = None
return True
def tick(self): def tick(self):
if self.generator is None: if self.generator is None:
return return
@@ -153,6 +167,9 @@ class Presets:
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
return True return True
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
return False
print("select failed: preset not found", preset_name)
# If preset doesn't exist or pattern not found, indicate failure # If preset doesn't exist or pattern not found, indicate failure
return False return False

View File

@@ -1,43 +0,0 @@
"""Minimal Presets-like stub for pattern smoke tests (no machine / NeoPixel)."""
class _FakeNeo:
def __init__(self, count):
self._count = count
self.pixels = [(0, 0, 0)] * count
def fill(self, color):
self.pixels = [tuple(color) for _ in range(self._count)]
def __setitem__(self, index, value):
self.pixels[index] = tuple(value)
def __getitem__(self, index):
return self.pixels[index]
def write(self):
pass
class FakePresets:
"""Subset of led-driver Presets API used by patterns/*.py."""
def __init__(self, num_leds=24):
self.num_leds = num_leds
self.n = _FakeNeo(num_leds)
self.b = 255
self.step = 0
def apply_brightness(self, color, brightness_override=None):
local = brightness_override if brightness_override is not None else 255
effective_brightness = int(local * self.b / 255)
return tuple(int(c * effective_brightness / 255) for c in color)
def fill(self, color=None):
fill_color = color if color is not None else (0, 0, 0)
for i in range(self.num_leds):
self.n[i] = fill_color
self.n.write()
def off(self):
self.fill((0, 0, 0))

View File

@@ -1,174 +0,0 @@
"""
Smoke-test pattern generators with a fake driver (no hardware).
Run from repo root (CPython or MicroPython):
python3 led-driver/tests/pattern_smoke.py
Requires only the stdlib; loads ``preset.py`` and ``patterns/*.py`` from
``led-driver/src`` via import paths (MicroPython: copy ``src`` tree to the
device and run the same command if ``importlib.util`` is available, or set
``PYTHONPATH`` to ``led-driver/src`` and use ``python3`` on the host).
Exit code 0 on success, 1 on any failure.
"""
import importlib.util
import sys
from pathlib import Path
# -----------------------------------------------------------------------------
# Host-only ``utime`` so patterns import on CPython.
# -----------------------------------------------------------------------------
_UTIME_MS = [0]
def utime_advance(ms):
_UTIME_MS[0] += int(ms)
def _install_utime_shim():
if "utime" in sys.modules:
return
import types
m = types.ModuleType("utime")
def ticks_ms():
return _UTIME_MS[0]
def ticks_diff(a, b):
return a - b
def ticks_add(a, b):
return a + b
m.ticks_ms = ticks_ms
m.ticks_diff = ticks_diff
m.ticks_add = ticks_add
sys.modules["utime"] = m
# -----------------------------------------------------------------------------
# Load ``preset`` and pattern modules from ``led-driver/src`` (no sys.path).
# -----------------------------------------------------------------------------
_SRC = Path(__file__).resolve().parent.parent / "src"
_TESTS = Path(__file__).resolve().parent
def _load_module(name, path):
spec = importlib.util.spec_from_file_location(name, path)
if spec is None or spec.loader is None:
raise RuntimeError("no spec for %s" % path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _pattern_class_from_module(mod):
for attr_name in dir(mod):
attr = getattr(mod, attr_name)
if isinstance(attr, type) and hasattr(attr, "run"):
return attr
return None
def _load_preset_class():
preset_mod = _load_module("preset", _SRC / "preset.py")
return preset_mod.Preset
def _list_pattern_basenames():
pat_dir = _SRC / "patterns"
out = []
for p in sorted(pat_dir.iterdir()):
if p.suffix != ".py":
continue
if p.name in ("__init__.py", "main.py"):
continue
out.append(p.stem)
return out
def _default_preset_dict(basename):
"""Reasonable defaults so each pattern's ``run()`` can start."""
base = {
"p": basename,
"d": 100,
"b": 200,
"a": True,
"n1": 5,
"n2": 8,
"n3": 5,
"n4": 4,
}
if basename in ("rainbow", "colour_cycle"):
base["c"] = []
base["n1"] = 2
elif basename == "transition":
base["c"] = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
base["d"] = 200
elif basename in ("chase", "circle"):
base["c"] = [(255, 0, 0), (0, 0, 255)]
elif basename == "pulse":
base["c"] = [(0, 200, 100)]
base["n1"] = 50
base["n2"] = 40
base["n3"] = 50
base["d"] = 30
elif basename in ("blink", "flicker"):
base["c"] = [(255, 100, 0)]
base["d"] = 80
elif basename == "flame":
base["c"] = []
base["d"] = 40
base["n1"] = 40
base["n2"] = 2000
base["n3"] = -1
base["n4"] = 0
else:
base["c"] = [(200, 200, 200)]
return base
def _run_pattern_ticks(Preset, driver, basename, steps, ms_per_tick):
_install_utime_shim()
mod = _load_module("patterns.%s" % basename, _SRC / "patterns" / (basename + ".py"))
cls = _pattern_class_from_module(mod)
if cls is None:
raise RuntimeError("no pattern class in %s" % basename)
preset = Preset(_default_preset_dict(basename))
gen = cls(driver).run(preset)
for _ in range(steps):
utime_advance(ms_per_tick)
next(gen)
def main():
failures = []
Preset = _load_preset_class()
fake_mod = _load_module("fake_driver", _TESTS / "fake_driver.py")
FakePresets = fake_mod.FakePresets
for basename in _list_pattern_basenames():
d = FakePresets(16)
try:
_run_pattern_ticks(Preset, d, basename, steps=80, ms_per_tick=50)
print("ok patterns.%s" % basename)
except Exception as exc:
print("FAIL patterns.%s: %r" % (basename, exc))
failures.append((basename, exc))
if failures:
print("%d pattern(s) failed" % len(failures))
return 1
print("all %d pattern smoke tests passed" % len(_list_pattern_basenames()))
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
sys.exit(130)