8 Commits

Author SHA1 Message Date
4575ef16ad test(led-driver): add espnow peer and ap pm0 scripts
Made-with: Cursor
2026-04-21 21:48:42 +12:00
a342187635 feat(patterns): add twinkle pattern defaults
Made-with: Cursor
2026-04-21 21:48:42 +12:00
428ed8b884 feat(led-driver): add preset clear command and runtime debug 2026-04-21 00:44:28 +12:00
a22702df4d feat(patterns): add radiate animation 2026-04-20 23:37:43 +12:00
5a8866add7 feat(esp32): pattern upload route and ws controller ip
Made-with: Cursor
2026-04-19 23:27:33 +12:00
a2cd2f8dc2 test(led-driver): add pattern smoke harness
Made-with: Cursor
2026-04-19 23:27:29 +12:00
c47725e31a feat(patterns): add colour cycle, flicker, and flame
Made-with: Cursor
2026-04-19 23:27:19 +12:00
22b1a8a6d6 fix(led-driver): phase-lock pattern timers
Made-with: Cursor
2026-04-19 21:41:18 +12:00
17 changed files with 911 additions and 17 deletions

1
presets.json Normal file
View File

@@ -0,0 +1 @@
{"15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "40": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 0]], "b": 255, "n2": 2600, "n1": 35, "p": "flame", "n3": 0, "d": 50}, "41": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[120, 200, 255], [80, 140, 255], [180, 120, 255], [100, 220, 232], [160, 200, 255]], "b": 255, "n2": 10, "n1": 72, "p": "twinkle", "n3": 5, "d": 500}, "42": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[166, 0, 255], [0, 10, 10]], "b": 255, "n2": 900, "n1": 30, "p": "radiate", "n3": 4000, "d": 5000}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "38": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 1, "p": "colour_cycle", "n3": 0, "d": 100}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}, "39": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 184, 77]], "b": 255, "n2": 0, "n1": 30, "p": "flicker", "n3": 0, "d": 80}, "14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 255], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 5000}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}}

View File

@@ -24,6 +24,8 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_brightness(data, settings, presets)
if "presets" in data:
apply_presets(data, settings, presets)
if "clear_presets" in data:
apply_clear_presets(data, presets)
if "select" in data:
apply_select(data, settings, presets)
if "default" in data:
@@ -32,6 +34,8 @@ def process_data(payload, settings, presets, controller_ip=None):
apply_patterns_ota(data, presets, controller_ip=controller_ip)
if "save" in data and ("presets" in data or "default" in data):
presets.save()
if "save" in data and "clear_presets" in data:
presets.save()
def apply_brightness(data, settings, presets):
@@ -70,6 +74,22 @@ def apply_select(data, settings, presets):
presets.select(preset_name, step=step)
def apply_clear_presets(data, presets):
clear_value = data.get("clear_presets")
if isinstance(clear_value, bool):
should_clear = clear_value
elif isinstance(clear_value, int):
should_clear = bool(clear_value)
elif isinstance(clear_value, str):
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
else:
should_clear = False
if not should_clear:
return
presets.delete_all()
print("Cleared all presets.")
def apply_default(data, settings, presets):
targets = data.get("targets") or []
default_name = data["default"]

View File

@@ -1,20 +1,39 @@
from settings import Settings
from machine import WDT
import machine
import network
import utime
import asyncio
import json
import gc
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets
from controller_messages import process_data
from hello import broadcast_hello_udp
try:
import uos as os
except ImportError:
import os
machine.freq(160000000)
settings = Settings()
print(settings)
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
if presets.select(default_preset):
@@ -22,10 +41,15 @@ if default_preset and default_preset in presets.presets:
else:
print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000)
wdt.feed()
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])
@@ -38,28 +62,122 @@ print(sta_if.ifconfig())
app = Microdot()
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
print("WS client connected")
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
print("WS controller_ip:", controller_ip)
try:
while True:
data = await ws.receive()
if not data:
print("WS client disconnected (closed)")
break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data)
process_data(data, settings, presets)
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
print("patterns/upload body_bytes:", len(body))
try:
code = body.decode("utf-8")
except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
print("patterns/upload writing:", path)
with open(path, "w") as f:
f.write(code)
if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def presets_loop():
last_mem_log = utime.ticks_ms()
while True:
presets.tick()
wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)

View File

@@ -1,6 +1,5 @@
from .blink import Blink
from .rainbow import Rainbow
from .pulse import Pulse
from .transition import Transition
from .chase import Chase
from .circle import Circle
"""Pattern modules are registered only via Presets._load_dynamic_patterns().
This file is ignored as a pattern (see presets.py). Keep it free of imports so
adding a pattern does not require editing this package.
"""

View File

@@ -28,6 +28,6 @@ class Blink:
# "Off" phase: turn all LEDs off
self.driver.fill((0, 0, 0))
state = not state
last_update = current_time
last_update = utime.ticks_add(last_update, delay_ms)
# Yield once per tick so other logic can run
yield

View File

@@ -118,7 +118,8 @@ class Chase:
# Increment step
step_count += 1
self.driver.step = step_count
last_update = current_time
last_update = utime.ticks_add(last_update, transition_duration)
transition_duration = max(10, int(preset.d))
# Yield once per tick so other logic can run
yield

View File

@@ -62,7 +62,9 @@ class Circle:
# Move head continuously at n1 LEDs per second
if utime.ticks_diff(current_time, last_head_move) >= head_delay:
head = (head + 1) % self.driver.num_leds
last_head_move = current_time
last_head_move = utime.ticks_add(last_head_move, head_delay)
head_rate = max(1, int(preset.n1))
head_delay = 1000 // head_rate
# Tail behavior based on phase
if phase == "growing":
@@ -73,7 +75,9 @@ class Circle:
# Shrinking phase: move tail forward at n3 LEDs per second
if utime.ticks_diff(current_time, last_tail_move) >= tail_delay:
tail = (tail + 1) % self.driver.num_leds
last_tail_move = current_time
last_tail_move = utime.ticks_add(last_tail_move, tail_delay)
tail_rate = max(1, int(preset.n3))
tail_delay = 1000 // tail_rate
# Check if we've reached min length
current_length = (head - tail) % self.driver.num_leds

View File

@@ -0,0 +1,56 @@
import utime
class ColourCycle:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
# Match rainbow behaviour: phase is 0..255 and maps to one full-strip shift.
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
# Position around the colour loop, shifted by phase.
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1))
if not preset.a:
self._render(colors, phase, preset.b)
self.driver.step = (phase + step_amount) % 256
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
yield

210
src/patterns/flame.py Normal file
View File

@@ -0,0 +1,210 @@
import random
import utime
# Default warm palette: ember → orange → yellow → pale hot (RGB)
_DEFAULT_PALETTE = (
(90, 8, 8),
(200, 40, 12),
(255, 120, 30),
(255, 220, 140),
)
def _clamp(x, lo, hi):
if x < lo:
return lo
if x > hi:
return hi
return x
def _lerp_chan(a, b, t):
return a + ((b - a) * t >> 8)
def _lerp_rgb(c0, c1, t):
return (
_lerp_chan(c0[0], c1[0], t),
_lerp_chan(c0[1], c1[1], t),
_lerp_chan(c0[2], c1[2], t),
)
def _palette_sample(palette, pos256):
n = len(palette)
if n == 0:
return (255, 160, 60)
if n == 1:
return palette[0]
span = (n - 1) * pos256
seg = span >> 8
if seg >= n - 1:
return palette[n - 1]
frac = span & 0xFF
return _lerp_rgb(palette[seg], palette[seg + 1], frac)
def _triangle_255(elapsed_ms, period_ms):
period_ms = max(period_ms, 400)
p = elapsed_ms % period_ms
half = period_ms >> 1
if half <= 0:
return 128
if p < half:
return (p * 255) // half
return ((period_ms - p) * 255) // (period_ms - half)
class Flame:
def __init__(self, driver):
self.driver = driver
def _build_palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_PALETTE)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
_clamp(int(c[0]), 0, 255),
_clamp(int(c[1]), 0, 255),
_clamp(int(c[2]), 0, 255),
)
)
return out if out else list(_DEFAULT_PALETTE)
def _draw_frame(self, preset, palette, ticks_now, breath_el_ms, rise, cluster_jit, breath_ms, lo, hi, spark_state):
"""spark_state: (active: bool, start_ticks, duration_ms). ticks_now for sparks; breath_el_ms for slow wave."""
num = self.driver.num_leds
denom = num - 1 if num > 1 else 1
breathe = _triangle_255(breath_el_ms, breath_ms)
base_level = lo + (((hi - lo) * breathe) >> 8)
micro = 232 + random.randint(0, 35)
level = (base_level * micro) >> 8
level = _clamp(level, lo, hi)
spark_boost = 0
spark_white = (0, 0, 0)
active, s0, dur = spark_state
if active and dur > 0:
el = utime.ticks_diff(ticks_now, s0)
if el < 0:
el = 0
if el >= dur:
spark_boost = 0
else:
env = 255 - ((el * 255) // dur)
spark_boost = (env * 90) >> 8
spark_white = ((env * 55) >> 8, (env * 50) >> 8, (env * 40) >> 8)
for i in range(num):
h = (i * 256) // denom
flow = (h + rise + ((i // max(1, num >> 3)) * 17)) & 255
pos = (flow + cluster_jit[(i >> 2) & 7]) & 255
rgb = _palette_sample(palette, pos)
if spark_boost:
rgb = (
_clamp(rgb[0] + spark_white[0] + (spark_boost * 3 >> 2), 0, 255),
_clamp(rgb[1] + spark_white[1] + (spark_boost >> 1), 0, 255),
_clamp(rgb[2] + spark_white[2] + (spark_boost >> 2), 0, 255),
)
self.driver.n[i] = self.driver.apply_brightness(rgb, level)
self.driver.n.write()
def run(self, preset):
"""Salt-lamp / hearth-style flame: warm gradient, breathing, jitter, drift, rare sparks."""
palette = self._build_palette(preset)
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
bp = int(preset.n2)
breath_ms = max(800, bp if bp > 0 else 2500)
gap_lo = int(preset.n3)
gap_hi = int(preset.n4)
# n3 < 0 disables sparks; n3=n4=0 uses ~1030 s gaps (hearth pops).
if gap_lo < 0:
sparks_on = False
else:
sparks_on = True
if gap_lo == 0 and gap_hi == 0:
gap_lo, gap_hi = 10000, 30000
else:
gap_lo = max(gap_lo, 500)
if gap_hi < gap_lo:
gap_hi = gap_lo
delay_ms = max(16, int(preset.d))
rise = random.randint(0, 255)
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
last_draw = utime.ticks_ms()
breath_origin = last_draw
last_cluster = last_draw
spark_active = False
spark_start = 0
spark_dur = 0
next_spark = utime.ticks_add(last_draw, random.randint(gap_lo, gap_hi)) if sparks_on else 0
if not preset.a:
now = utime.ticks_ms()
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(False, 0, 0),
)
yield
return
while True:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_draw) < delay_ms:
yield
continue
last_draw = utime.ticks_add(last_draw, delay_ms)
rise = (rise + random.randint(-10, 12)) & 255
if utime.ticks_diff(now, last_cluster) >= (delay_ms * 4):
last_cluster = now
cluster_jit = [random.randint(-18, 18) for _ in range(8)]
spark_state = (spark_active, spark_start, spark_dur)
if sparks_on:
if spark_active:
if utime.ticks_diff(now, spark_start) >= spark_dur:
spark_active = False
next_spark = utime.ticks_add(
now,
random.randint(gap_lo, gap_hi),
)
elif utime.ticks_diff(now, next_spark) >= 0:
spark_active = True
spark_start = now
spark_dur = random.randint(180, 360)
self._draw_frame(
preset,
palette,
now,
utime.ticks_diff(now, breath_origin),
rise,
cluster_jit,
breath_ms,
lo,
hi,
(spark_active, spark_start, spark_dur),
)
yield

40
src/patterns/flicker.py Normal file
View File

@@ -0,0 +1,40 @@
import random
import utime
class Flicker:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Random brightness between n1 (min) and b (max); delay d ms between updates."""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
last_update = utime.ticks_ms()
def brightness_bounds():
lo = max(0, min(255, int(preset.n1)))
hi = max(0, min(255, int(preset.b)))
if lo > hi:
lo, hi = hi, lo
return lo, hi
if not preset.a:
lo, hi = brightness_bounds()
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
yield
return
while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
lo, hi = brightness_bounds()
if utime.ticks_diff(current_time, last_update) >= delay_ms:
level = random.randint(lo, hi)
base = colors[color_index % len(colors)]
self.driver.fill(self.driver.apply_brightness(base, level))
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
yield

136
src/patterns/radiate.py Normal file
View File

@@ -0,0 +1,136 @@
import utime
_RADIATE_DBG_INTERVAL_MS = 1000
class Radiate:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Radiate from nodes every n1 LEDs, retriggering every delay (d).
- n1: node spacing in LEDs
- n2: outbound travel time in ms
- n3: return travel time in ms
- d: retrigger interval in ms
"""
colors = preset.c if preset.c else [(255, 255, 255)]
base_on = colors[0]
base_off = colors[1] if len(colors) > 1 else (0, 0, 0)
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
now = utime.ticks_ms()
last_trigger = now
active_pulses = [now]
last_dbg = now
dbg_banner = False
if not preset.a:
# Single-step render uses only the first instant pulse.
active_pulses = [utime.ticks_ms()]
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
# Keep one pulse train at a time; replacing instead of appending
# prevents overlap from keeping color[0] continuously visible.
active_pulses = [now]
last_trigger = utime.ticks_add(last_trigger, delay_ms)
if bool(getattr(self.driver, "debug", False)):
print(
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
% (spacing, outward_ms, return_ms, delay_ms)
)
# Drop pulses once their out-and-back lifetime ends.
pulse_lifetime = outward_ms + return_ms
kept = []
for start in active_pulses:
age = utime.ticks_diff(now, start)
if age < pulse_lifetime:
kept.append(start)
active_pulses = kept
debug_front = -1
lit_count = 0
for i in range(self.driver.num_leds):
# Nearest node distance for a repeating node grid every `spacing` LEDs.
offset = i % spacing
dist = min(offset, spacing - offset)
lit = False
for start in active_pulses:
age = utime.ticks_diff(now, start)
# Do not render on the exact trigger tick; this avoids
# node LEDs appearing "stuck on" between cycles.
if age <= 0:
continue
if age <= outward_ms:
# Integer-ceiling progression so peak can be reached even
# when tick timing skips the exact outward_ms boundary.
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms:
back_age = age - outward_ms
remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else:
continue
if dist <= front:
lit = True
if front > debug_front:
debug_front = front
break
self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write()
if bool(getattr(self.driver, "debug", False)):
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.d,
self.driver.num_leds,
)
)
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
print(
"[radiate] age=%d front=%d max=%d active=%d lit=%d"
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count)
)
if lit_count == 0:
print("[radiate] fully off")
last_dbg = now
if not preset.a:
yield
return
yield

View File

@@ -46,6 +46,6 @@ class Rainbow:
self.driver.n.write()
step = (step + step_amount) % 256
self.driver.step = step
last_update = current_time
last_update = utime.ticks_add(last_update, sleep_ms)
# Yield once per tick so other logic can run
yield

227
src/patterns/twinkle.py Normal file
View File

@@ -0,0 +1,227 @@
import random
import utime
# Default cool palette (icy blues, violet, mint) when preset has no colours.
# When `driver.debug` is True, print stats every N twinkle ticks (serial can be slow).
_TWINKLE_DBG_INTERVAL = 40
_DEFAULT_COOL = (
(120, 200, 255),
(80, 140, 255),
(180, 120, 255),
(100, 220, 240),
(160, 200, 255),
(90, 180, 220),
)
class Twinkle:
def __init__(self, driver):
self.driver = driver
def _palette(self, preset):
colors = preset.c
if not colors:
return list(_DEFAULT_COOL)
out = []
for c in colors:
if isinstance(c, (list, tuple)) and len(c) == 3:
out.append(
(
max(0, min(255, int(c[0]))),
max(0, min(255, int(c[1]))),
max(0, min(255, int(c[2]))),
)
)
return out if out else list(_DEFAULT_COOL)
def run(self, preset):
"""Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs."""
palette = self._palette(preset)
num = self.driver.num_leds
if num <= 0:
while True:
yield
return
def activity_rate():
r = int(preset.n1)
if r <= 0:
r = 48
return max(1, min(255, r))
def density255():
"""Higher → more LEDs lit on average when a twinkle step fires (0 = default mid)."""
d = int(preset.n2)
if d <= 0:
d = 128
return max(0, min(255, d))
def cluster_len_bounds():
"""n3 = min adjacent LEDs per twinkle, n4 = max (both 0 → 1..4)."""
lo = int(preset.n3)
hi = int(preset.n4)
if lo <= 0 and hi <= 0:
lo, hi = 1, min(4, num)
else:
if lo <= 0:
lo = 1
if hi <= 0:
hi = lo
if hi < lo:
lo, hi = hi, lo
lo = max(1, min(lo, num))
hi = max(lo, min(hi, num))
return lo, hi
def random_cluster_len():
lo, hi = cluster_len_bounds()
# When min and max match, every lit/dim run is exactly that many LEDs (still capped by strip length).
if lo == hi:
return lo
return random.randint(lo, hi)
def cluster_base_index(start, k):
"""Shift run left so a length-k segment fits; keeps full k when num >= k."""
k = min(max(0, int(k)), num)
if k <= 0:
return 0
return max(0, min(int(start), num - k))
dens = density255()
on = [random.randint(0, 255) < dens for _ in range(num)]
colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)]
last_update = utime.ticks_ms()
dbg_tick = 0
dbg_banner = False
def on_run_min_max(bits):
"""Smallest and largest contiguous run of True in bits (0,0 if all off)."""
best_min = num + 1
best_max = 0
cur = 0
for j in range(num):
if bits[j]:
cur += 1
else:
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
cur = 0
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
if best_min == num + 1:
return 0, 0
return best_min, best_max
if not preset.a:
for i in range(num):
if on[i]:
base = palette[colour_i[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = (0, 0, 0)
self.driver.n.write()
yield
return
while True:
now = utime.ticks_ms()
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(now, last_update) >= delay_ms:
rate = activity_rate()
dens = density255()
dbg = bool(getattr(self.driver, "debug", False))
dbg_tick += 1
# Snapshot for decisions; apply all darks then all lights so
# overlaps in the same tick favour lit runs (lights win).
prev_on = on[:]
prev_ci = colour_i[:]
next_on = list(prev_on)
next_ci = list(prev_ci)
dbg_ops = {"L": 0, "D": 0}
light_i = []
dark_i = []
for i in range(num):
if random.randint(0, 255) < rate:
r = random.randint(0, 255)
if not prev_on[i]:
if r < dens:
light_i.append(i)
else:
if r < (255 - dens):
dark_i.append(i)
def light_adjacent(start):
dbg_ops["L"] += 1
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = True
next_ci[idx] = random.randint(0, len(palette) - 1)
def dark_adjacent(start):
dbg_ops["D"] += 1
k = random_cluster_len()
b = cluster_base_index(start, k)
for dj in range(k):
idx = b + dj
next_on[idx] = False
for i in dark_i:
dark_adjacent(i)
for i in light_i:
light_adjacent(i)
for i in range(num):
if next_on[i]:
base = palette[next_ci[i] % len(palette)]
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
else:
self.driver.n[i] = (0, 0, 0)
self.driver.n.write()
on = next_on
colour_i = next_ci
last_update = utime.ticks_add(last_update, delay_ms)
if dbg:
lo, hi = cluster_len_bounds()
if not dbg_banner:
dbg_banner = True
print(
"[twinkle] debug on: n1=%s n2=%s n3=%s n4=%s d=%s -> lo=%d hi=%d num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.n4,
preset.d,
lo,
hi,
num,
)
)
rmin, rmax = on_run_min_max(on)
bad = lo > 0 and rmin > 0 and rmin < lo and num >= lo
if bad or (dbg_tick % _TWINKLE_DBG_INTERVAL == 0):
print(
"[twinkle] tick=%d rate=%d dens=%d L=%d D=%d on_runs min=%d max=%d%s"
% (
dbg_tick,
rate,
dens,
dbg_ops["L"],
dbg_ops["D"],
rmin,
rmax,
" **run<lo**" if bad else "",
)
)
yield

View File

@@ -1 +0,0 @@
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}

View File

@@ -9,6 +9,8 @@ try:
except ImportError:
import os
MAX_PRESETS = 32
class Presets:
def __init__(self, pin, num_leds):
@@ -95,6 +97,9 @@ class Presets:
order = settings if settings is not None else "rgb"
self.presets = {}
for name, preset_data in data.items():
if len(self.presets) >= MAX_PRESETS:
print("Preset limit reached on load:", MAX_PRESETS)
break
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None:
preset_data[color_key] = convert_and_reorder_colors(
@@ -113,6 +118,9 @@ class Presets:
# Update existing preset
self.presets[name].edit(data)
else:
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
print("Preset limit reached:", MAX_PRESETS)
return False
# Create new preset
self.presets[name] = Preset(data)
return True
@@ -123,6 +131,12 @@ class Presets:
return True
return False
def delete_all(self):
self.presets = {}
self.generator = None
self.selected = None
return True
def tick(self):
if self.generator is None:
return
@@ -153,6 +167,9 @@ class Presets:
self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object
return True
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
return False
print("select failed: preset not found", preset_name)
# If preset doesn't exist or pattern not found, indicate failure
return False

25
tests/peers.py Normal file
View File

@@ -0,0 +1,25 @@
from espnow import ESPNow
import network
sta = network.WLAN(network.STA_IF)
sta.active(True)
espnow = ESPNow()
espnow.active(True)
# add_peer() expects a 6-byte MAC (bytes/bytearray), not integers.
# Unicast placeholders (not broadcast/multicast) so get_peers() lists them.
# PEERS = aa:aa:aa:aa:aa:START … aa:aa:aa:aa:aa:END (inclusive last octet).
_PREFIX = b"\xaa\xaa\xaa\xaa\xaa"
_START_LAST_OCTET = 1
_END_LAST_OCTET = 40
PEERS = tuple(_PREFIX + bytes((i,)) for i in range(_START_LAST_OCTET, _END_LAST_OCTET + 1))
for peer in PEERS:
espnow.add_peer(peer)
print("peers:", PEERS)
for peer in PEERS:
espnow.send(peer, b"Hello, world!")
print(espnow.get_peers())

41
tests/test_ap_pm0.py Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""MicroPython AP example with power management disabled (pm=0).
Run on device:
mpremote connect /dev/ttyACM0 run tests/test_ap_pm0.py
"""
import network
import time
AP_SSID = "led-ap"
AP_PASSWORD = "ledpass123"
AP_CHANNEL = 6
def main():
ap = network.WLAN(network.AP_IF)
ap.active(True)
# Explicitly disable Wi-Fi power save for AP mode.
try:
ap.config(pm=0)
except (AttributeError, ValueError, TypeError):
try:
ap.config(pm=network.WLAN.PM_NONE)
except (AttributeError, ValueError, TypeError):
pass
ap.config(essid=AP_SSID, password=AP_PASSWORD, channel=AP_CHANNEL, authmode=3)
print("[ap-pm0] AP active:", ap.active())
print("[ap-pm0] SSID:", AP_SSID)
print("[ap-pm0] IFCONFIG:", ap.ifconfig())
print("[ap-pm0] Waiting for clients. Ctrl+C to stop.")
while True:
time.sleep(2)
if __name__ == "__main__":
main()