Compare commits

4 Commits

Author SHA1 Message Date
5a8866add7 feat(esp32): pattern upload route and ws controller ip
Made-with: Cursor
2026-04-19 23:27:33 +12:00
a2cd2f8dc2 test(led-driver): add pattern smoke harness
Made-with: Cursor
2026-04-19 23:27:29 +12:00
c47725e31a feat(patterns): add colour cycle, flicker, and flame
Made-with: Cursor
2026-04-19 23:27:19 +12:00
22b1a8a6d6 fix(led-driver): phase-lock pattern timers
Made-with: Cursor
2026-04-19 21:41:18 +12:00
11 changed files with 631 additions and 12 deletions

View File

@@ -3,11 +3,16 @@ from machine import WDT
import network
import utime
import asyncio
import json
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets
from controller_messages import process_data
from hello import broadcast_hello_udp
try:
import uos as os
except ImportError:
import os
settings = Settings()
print(settings)
@@ -38,24 +43,111 @@ print(sta_if.ifconfig())
app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
print("WS client connected")
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
print("WS controller_ip:", controller_ip)
try:
while True:
data = await ws.receive()
if not data:
print("WS client disconnected (closed)")
break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data)
process_data(data, settings, presets)
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
print("patterns/upload body_bytes:", len(body))
try:
code = body.decode("utf-8")
except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
print("patterns/upload writing:", path)
with open(path, "w") as f:
f.write(code)
if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def presets_loop():
while True:
presets.tick()

View File

@@ -1,6 +1,5 @@
from .blink import Blink
from .rainbow import Rainbow
from .pulse import Pulse
from .transition import Transition
from .chase import Chase
from .circle import Circle
"""Pattern modules are registered only via Presets._load_dynamic_patterns().
This file is ignored as a pattern (see presets.py). Keep it free of imports so
adding a pattern does not require editing this package.
"""

View File

@@ -28,6 +28,6 @@ class Blink:
# "Off" phase: turn all LEDs off
self.driver.fill((0, 0, 0))
state = not state
last_update = current_time
last_update = utime.ticks_add(last_update, delay_ms)
# Yield once per tick so other logic can run
yield

View File

@@ -118,7 +118,8 @@ class Chase:
# Increment step
step_count += 1
self.driver.step = step_count
last_update = current_time
last_update = utime.ticks_add(last_update, transition_duration)
transition_duration = max(10, int(preset.d))
# Yield once per tick so other logic can run
yield

View File

@@ -62,7 +62,9 @@ class Circle:
# Move head continuously at n1 LEDs per second
if utime.ticks_diff(current_time, last_head_move) >= head_delay:
head = (head + 1) % self.driver.num_leds
last_head_move = current_time
last_head_move = utime.ticks_add(last_head_move, head_delay)
head_rate = max(1, int(preset.n1))
head_delay = 1000 // head_rate
# Tail behavior based on phase
if phase == "growing":
@@ -73,7 +75,9 @@ class Circle:
# Shrinking phase: move tail forward at n3 LEDs per second
if utime.ticks_diff(current_time, last_tail_move) >= tail_delay:
tail = (tail + 1) % self.driver.num_leds
last_tail_move = current_time
last_tail_move = utime.ticks_add(last_tail_move, tail_delay)
tail_rate = max(1, int(preset.n3))
tail_delay = 1000 // tail_rate
# Check if we've reached min length
current_length = (head - tail) % self.driver.num_leds

View File

@@ -0,0 +1,56 @@
import utime
class ColourCycle:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
# Match rainbow behaviour: phase is 0..255 and maps to one full-strip shift.
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
# Position around the colour loop, shifted by phase.
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1))
if not preset.a:
self._render(colors, phase, preset.b)
self.driver.step = (phase + step_amount) % 256
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
yield

210
src/patterns/flame.py Normal file
View File

@@ -0,0 +1,210 @@
import random
import utime
# Default warm palette: ember → orange → yellow → pale hot (RGB)
_DEFAULT_PALETTE = (
(90, 8, 8),
(200, 40, 12),
(255, 120, 30),
(255, 220, 140),
)
def _clamp(x, lo, hi):
if x < lo:
return lo
if x > hi:
return hi
return x
def _lerp_chan(a, b, t):
return a + ((b - a) * t >> 8)
def _lerp_rgb(c0, c1, t):
return (
_lerp_chan(c0[0], c1[0], t),
_lerp_chan(c0[1], c1[1], t),
_lerp_chan(c0[2], c1[2], t),
)
def _palette_sample(palette, pos256):
n = len(palette)
if n == 0:
return (255, 160, 60)
if n == 1:
return palette[0]
span = (n - 1) * pos256
seg = span >> 8
if seg >= n - 1:
return palette[n - 1]
frac = span & 0xFF
return _lerp_rgb(palette[seg], palette[seg + 1], frac)
def _triangle_255(elapsed_ms, period_ms):
period_ms = max(period_ms, 400)
p = elapsed_ms % period_ms
half = period_ms >> 1
if half <= 0:
return 128
if p < half:
return (p * 255) // half
return ((period_ms - p) * 255) // (period_ms - half)
class Flame:
def __init__(self, driver):
self.driver = driver
def _build_palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_PALETTE)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
_clamp(int(c[0]), 0, 255),
_clamp(int(c[1]), 0, 255),
_clamp(int(c[2]), 0, 255),
)
)
return out if out else list(_DEFAULT_PALETTE)
def _draw_frame(self, preset, palette, ticks_now, breath_el_ms, rise, cluster_jit, breath_ms, lo, hi, spark_state):
"""spark_state: (active: bool, start_ticks, duration_ms). ticks_now for sparks; breath_el_ms for slow wave."""
num = self.driver.num_leds
denom = num - 1 if num > 1 else 1
breathe = _triangle_255(breath_el_ms, breath_ms)
base_level = lo + (((hi - lo) * breathe) >> 8)
micro = 232 + random.randint(0, 35)
level = (base_level * micro) >> 8
level = _clamp(level, lo, hi)
spark_boost = 0
spark_white = (0, 0, 0)
active, s0, dur = spark_state
if active and dur > 0:
el = utime.ticks_diff(ticks_now, s0)
if el < 0:
el = 0
if el >= dur:
spark_boost = 0
else:
env = 255 - ((el * 255) // dur)
spark_boost = (env * 90) >> 8
spark_white = ((env * 55) >> 8, (env * 50) >> 8, (env * 40) >> 8)
for i in range(num):
h = (i * 256) // denom
flow = (h + rise + ((i // max(1, num >> 3)) * 17)) & 255
pos = (flow + cluster_jit[(i >> 2) & 7]) & 255
rgb = _palette_sample(palette, pos)
if spark_boost:
rgb = (
_clamp(rgb[0] + spark_white[0] + (spark_boost * 3 >> 2), 0, 255),
_clamp(rgb[1] + spark_white[1] + (spark_boost >> 1), 0, 255),
_clamp(rgb[2] + spark_white[2] + (spark_boost >> 2), 0, 255),
)
self.driver.n[i] = self.driver.apply_brightness(rgb, level)
self.driver.n.write()
def run(self, preset):
"""Salt-lamp / hearth-style flame: warm gradient, breathing, jitter, drift, rare sparks."""
palette = self._build_palette(preset)
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
bp = int(preset.n2)
breath_ms = max(800, bp if bp > 0 else 2500)
gap_lo = int(preset.n3)
gap_hi = int(preset.n4)
# n3 < 0 disables sparks; n3=n4=0 uses ~1030 s gaps (hearth pops).
if gap_lo < 0:
sparks_on = False
else:
sparks_on = True
if gap_lo == 0 and gap_hi == 0:
gap_lo, gap_hi = 10000, 30000
else:
gap_lo = max(gap_lo, 500)
if gap_hi < gap_lo:
gap_hi = gap_lo
delay_ms = max(16, int(preset.d))
rise = random.randint(0, 255)
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
last_draw = utime.ticks_ms()
breath_origin = last_draw
last_cluster = last_draw
spark_active = False
spark_start = 0
spark_dur = 0
next_spark = utime.ticks_add(last_draw, random.randint(gap_lo, gap_hi)) if sparks_on else 0
if not preset.a:
now = utime.ticks_ms()
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(False, 0, 0),
)
yield
return
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_draw) < delay_ms:
yield
continue
last_draw = utime.ticks_add(last_draw, delay_ms)
rise = (rise + random.randint(-10, 12)) & 255
if utime.ticks_diff(now, last_cluster) >= (delay_ms * 4):
last_cluster = now
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
spark_state = (spark_active, spark_start, spark_dur)
if sparks_on:
if spark_active:
if utime.ticks_diff(now, spark_start) >= spark_dur:
spark_active = False
next_spark = utime.ticks_add(
now,
random.randint(gap_lo, gap_hi),
)
elif utime.ticks_diff(now, next_spark) >= 0:
spark_active = True
spark_start = now
spark_dur = random.randint(180, 360)
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(spark_active, spark_start, spark_dur),
)
yield

40
src/patterns/flicker.py Normal file
View File

@@ -0,0 +1,40 @@
import random
import utime
class Flicker:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Random brightness between n1 (min) and b (max); delay d ms between updates."""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
last_update = utime.ticks_ms()
def brightness_bounds():
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
return lo, hi
if not preset.a:
lo, hi = brightness_bounds()
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
yield
return
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
lo, hi = brightness_bounds()
if utime.ticks_diff(current_time, last_update) >= delay_ms:
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
yield

View File

@@ -46,6 +46,6 @@ class Rainbow:
self.driver.n.write()
step = (step + step_amount) % 256
self.driver.step = step
last_update = current_time
last_update = utime.ticks_add(last_update, sleep_ms)
# Yield once per tick so other logic can run
yield

43
tests/fake_driver.py Normal file
View File

@@ -0,0 +1,43 @@
"""Minimal Presets-like stub for pattern smoke tests (no machine / NeoPixel)."""
class _FakeNeo:
def __init__(self, count):
self._count = count
self.pixels = [(0, 0, 0)] * count
def fill(self, color):
self.pixels = [tuple(color) for _ in range(self._count)]
def __setitem__(self, index, value):
self.pixels[index] = tuple(value)
def __getitem__(self, index):
return self.pixels[index]
def write(self):
pass
class FakePresets:
"""Subset of led-driver Presets API used by patterns/*.py."""
def __init__(self, num_leds=24):
self.num_leds = num_leds
self.n = _FakeNeo(num_leds)
self.b = 255
self.step = 0
def apply_brightness(self, color, brightness_override=None):
local = brightness_override if brightness_override is not None else 255
effective_brightness = int(local * self.b / 255)
return tuple(int(c * effective_brightness / 255) for c in color)
def fill(self, color=None):
fill_color = color if color is not None else (0, 0, 0)
for i in range(self.num_leds):
self.n[i] = fill_color
self.n.write()
def off(self):
self.fill((0, 0, 0))

174
tests/pattern_smoke.py Normal file
View File

@@ -0,0 +1,174 @@
"""
Smoke-test pattern generators with a fake driver (no hardware).
Run from repo root (CPython or MicroPython):
python3 led-driver/tests/pattern_smoke.py
Requires only the stdlib; loads ``preset.py`` and ``patterns/*.py`` from
``led-driver/src`` via import paths (MicroPython: copy ``src`` tree to the
device and run the same command if ``importlib.util`` is available, or set
``PYTHONPATH`` to ``led-driver/src`` and use ``python3`` on the host).
Exit code 0 on success, 1 on any failure.
"""
import importlib.util
import sys
from pathlib import Path
# -----------------------------------------------------------------------------
# Host-only ``utime`` so patterns import on CPython.
# -----------------------------------------------------------------------------
_UTIME_MS = [0]
def utime_advance(ms):
_UTIME_MS[0] += int(ms)
def _install_utime_shim():
if "utime" in sys.modules:
return
import types
m = types.ModuleType("utime")
def ticks_ms():
return _UTIME_MS[0]
def ticks_diff(a, b):
return a - b
def ticks_add(a, b):
return a + b
m.ticks_ms = ticks_ms
m.ticks_diff = ticks_diff
m.ticks_add = ticks_add
sys.modules["utime"] = m
# -----------------------------------------------------------------------------
# Load ``preset`` and pattern modules from ``led-driver/src`` (no sys.path).
# -----------------------------------------------------------------------------
_SRC = Path(__file__).resolve().parent.parent / "src"
_TESTS = Path(__file__).resolve().parent
def _load_module(name, path):
spec = importlib.util.spec_from_file_location(name, path)
if spec is None or spec.loader is None:
raise RuntimeError("no spec for %s" % path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _pattern_class_from_module(mod):
for attr_name in dir(mod):
attr = getattr(mod, attr_name)
if isinstance(attr, type) and hasattr(attr, "run"):
return attr
return None
def _load_preset_class():
preset_mod = _load_module("preset", _SRC / "preset.py")
return preset_mod.Preset
def _list_pattern_basenames():
pat_dir = _SRC / "patterns"
out = []
for p in sorted(pat_dir.iterdir()):
if p.suffix != ".py":
continue
if p.name in ("__init__.py", "main.py"):
continue
out.append(p.stem)
return out
def _default_preset_dict(basename):
"""Reasonable defaults so each pattern's ``run()`` can start."""
base = {
"p": basename,
"d": 100,
"b": 200,
"a": True,
"n1": 5,
"n2": 8,
"n3": 5,
"n4": 4,
}
if basename in ("rainbow", "colour_cycle"):
base["c"] = []
base["n1"] = 2
elif basename == "transition":
base["c"] = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
base["d"] = 200
elif basename in ("chase", "circle"):
base["c"] = [(255, 0, 0), (0, 0, 255)]
elif basename == "pulse":
base["c"] = [(0, 200, 100)]
base["n1"] = 50
base["n2"] = 40
base["n3"] = 50
base["d"] = 30
elif basename in ("blink", "flicker"):
base["c"] = [(255, 100, 0)]
base["d"] = 80
elif basename == "flame":
base["c"] = []
base["d"] = 40
base["n1"] = 40
base["n2"] = 2000
base["n3"] = -1
base["n4"] = 0
else:
base["c"] = [(200, 200, 200)]
return base
def _run_pattern_ticks(Preset, driver, basename, steps, ms_per_tick):
_install_utime_shim()
mod = _load_module("patterns.%s" % basename, _SRC / "patterns" / (basename + ".py"))
cls = _pattern_class_from_module(mod)
if cls is None:
raise RuntimeError("no pattern class in %s" % basename)
preset = Preset(_default_preset_dict(basename))
gen = cls(driver).run(preset)
for _ in range(steps):
utime_advance(ms_per_tick)
next(gen)
def main():
failures = []
Preset = _load_preset_class()
fake_mod = _load_module("fake_driver", _TESTS / "fake_driver.py")
FakePresets = fake_mod.FakePresets
for basename in _list_pattern_basenames():
d = FakePresets(16)
try:
_run_pattern_ticks(Preset, d, basename, steps=80, ms_per_tick=50)
print("ok patterns.%s" % basename)
except Exception as exc:
print("FAIL patterns.%s: %r" % (basename, exc))
failures.append((basename, exc))
if failures:
print("%d pattern(s) failed" % len(failures))
return 1
print("all %d pattern smoke tests passed" % len(_list_pattern_basenames()))
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
sys.exit(130)