Compare commits
3 Commits
a22702df4d
...
preset
| Author | SHA1 | Date | |
|---|---|---|---|
| 4575ef16ad | |||
| a342187635 | |||
| 428ed8b884 |
1
presets.json
Normal file
1
presets.json
Normal file
@@ -0,0 +1 @@
|
||||
{"15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "40": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 0]], "b": 255, "n2": 2600, "n1": 35, "p": "flame", "n3": 0, "d": 50}, "41": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[120, 200, 255], [80, 140, 255], [180, 120, 255], [100, 220, 232], [160, 200, 255]], "b": 255, "n2": 10, "n1": 72, "p": "twinkle", "n3": 5, "d": 500}, "42": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[166, 0, 255], [0, 10, 10]], "b": 255, "n2": 900, "n1": 30, "p": "radiate", "n3": 4000, "d": 5000}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "38": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 1, "p": "colour_cycle", "n3": 0, "d": 100}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}, "39": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 184, 77]], "b": 255, "n2": 0, "n1": 30, "p": "flicker", "n3": 0, "d": 80}, "14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 255], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 5000}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}}
|
||||
@@ -24,6 +24,8 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
apply_brightness(data, settings, presets)
|
||||
if "presets" in data:
|
||||
apply_presets(data, settings, presets)
|
||||
if "clear_presets" in data:
|
||||
apply_clear_presets(data, presets)
|
||||
if "select" in data:
|
||||
apply_select(data, settings, presets)
|
||||
if "default" in data:
|
||||
@@ -32,6 +34,8 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
apply_patterns_ota(data, presets, controller_ip=controller_ip)
|
||||
if "save" in data and ("presets" in data or "default" in data):
|
||||
presets.save()
|
||||
if "save" in data and "clear_presets" in data:
|
||||
presets.save()
|
||||
|
||||
|
||||
def apply_brightness(data, settings, presets):
|
||||
@@ -70,6 +74,22 @@ def apply_select(data, settings, presets):
|
||||
presets.select(preset_name, step=step)
|
||||
|
||||
|
||||
def apply_clear_presets(data, presets):
|
||||
clear_value = data.get("clear_presets")
|
||||
if isinstance(clear_value, bool):
|
||||
should_clear = clear_value
|
||||
elif isinstance(clear_value, int):
|
||||
should_clear = bool(clear_value)
|
||||
elif isinstance(clear_value, str):
|
||||
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
|
||||
else:
|
||||
should_clear = False
|
||||
if not should_clear:
|
||||
return
|
||||
presets.delete_all()
|
||||
print("Cleared all presets.")
|
||||
|
||||
|
||||
def apply_default(data, settings, presets):
|
||||
targets = data.get("targets") or []
|
||||
default_name = data["default"]
|
||||
|
||||
34
src/main.py
34
src/main.py
@@ -1,9 +1,10 @@
|
||||
from settings import Settings
|
||||
from machine import WDT
|
||||
import machine
|
||||
import network
|
||||
import utime
|
||||
import asyncio
|
||||
import json
|
||||
import gc
|
||||
from microdot import Microdot
|
||||
from microdot.websocket import WebSocketError, with_websocket
|
||||
from presets import Presets
|
||||
@@ -14,12 +15,25 @@ try:
|
||||
except ImportError:
|
||||
import os
|
||||
|
||||
machine.freq(160000000)
|
||||
|
||||
|
||||
settings = Settings()
|
||||
print(settings)
|
||||
|
||||
wdt = machine.WDT(timeout=10000)
|
||||
wdt.feed()
|
||||
|
||||
gc.collect()
|
||||
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
|
||||
presets = Presets(settings["led_pin"], settings["num_leds"])
|
||||
presets.load(settings)
|
||||
presets.b = settings.get("brightness", 255)
|
||||
presets.debug = bool(settings.get("debug", False))
|
||||
gc.collect()
|
||||
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
|
||||
default_preset = settings.get("default", "")
|
||||
if default_preset and default_preset in presets.presets:
|
||||
if presets.select(default_preset):
|
||||
@@ -27,10 +41,15 @@ if default_preset and default_preset in presets.presets:
|
||||
else:
|
||||
print("Startup preset failed (invalid pattern?):", default_preset)
|
||||
|
||||
wdt = WDT(timeout=10000)
|
||||
wdt.feed()
|
||||
|
||||
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
|
||||
# Reset both interfaces and collect before bringing STA up.
|
||||
ap_if = network.WLAN(network.AP_IF)
|
||||
ap_if.active(False)
|
||||
sta_if = network.WLAN(network.STA_IF)
|
||||
if sta_if.active():
|
||||
sta_if.active(False)
|
||||
utime.sleep_ms(100)
|
||||
gc.collect()
|
||||
sta_if.active(True)
|
||||
sta_if.config(pm=network.WLAN.PM_NONE)
|
||||
sta_if.connect(settings["ssid"], settings["password"])
|
||||
@@ -149,9 +168,16 @@ async def upload_pattern(request):
|
||||
|
||||
|
||||
async def presets_loop():
|
||||
last_mem_log = utime.ticks_ms()
|
||||
while True:
|
||||
presets.tick()
|
||||
wdt.feed()
|
||||
if bool(getattr(presets, "debug", False)):
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last_mem_log) >= 5000:
|
||||
gc.collect()
|
||||
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
last_mem_log = now
|
||||
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
|
||||
await asyncio.sleep(0)
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import utime
|
||||
|
||||
_RADIATE_DBG_INTERVAL_MS = 1000
|
||||
|
||||
|
||||
class Radiate:
|
||||
def __init__(self, driver):
|
||||
@@ -28,6 +30,8 @@ class Radiate:
|
||||
now = utime.ticks_ms()
|
||||
last_trigger = now
|
||||
active_pulses = [now]
|
||||
last_dbg = now
|
||||
dbg_banner = False
|
||||
|
||||
if not preset.a:
|
||||
# Single-step render uses only the first instant pulse.
|
||||
@@ -44,17 +48,26 @@ class Radiate:
|
||||
off_color = self.driver.apply_brightness(base_off, preset.b)
|
||||
|
||||
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
|
||||
active_pulses.append(now)
|
||||
# Keep one pulse train at a time; replacing instead of appending
|
||||
# prevents overlap from keeping color[0] continuously visible.
|
||||
active_pulses = [now]
|
||||
last_trigger = utime.ticks_add(last_trigger, delay_ms)
|
||||
if bool(getattr(self.driver, "debug", False)):
|
||||
print(
|
||||
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
|
||||
% (spacing, outward_ms, return_ms, delay_ms)
|
||||
)
|
||||
|
||||
# Drop pulses once their out-and-back lifetime ends.
|
||||
pulse_lifetime = outward_ms + return_ms
|
||||
kept = []
|
||||
for start in active_pulses:
|
||||
age = utime.ticks_diff(now, start)
|
||||
if age <= pulse_lifetime:
|
||||
if age < pulse_lifetime:
|
||||
kept.append(start)
|
||||
active_pulses = kept
|
||||
debug_front = -1
|
||||
lit_count = 0
|
||||
|
||||
for i in range(self.driver.num_leds):
|
||||
# Nearest node distance for a repeating node grid every `spacing` LEDs.
|
||||
@@ -64,24 +77,58 @@ class Radiate:
|
||||
lit = False
|
||||
for start in active_pulses:
|
||||
age = utime.ticks_diff(now, start)
|
||||
if age < 0:
|
||||
# Do not render on the exact trigger tick; this avoids
|
||||
# node LEDs appearing "stuck on" between cycles.
|
||||
if age <= 0:
|
||||
continue
|
||||
if age <= outward_ms:
|
||||
front = (age * max_dist) / outward_ms
|
||||
# Integer-ceiling progression so peak can be reached even
|
||||
# when tick timing skips the exact outward_ms boundary.
|
||||
front = (age * max_dist + outward_ms - 1) // outward_ms
|
||||
elif age <= outward_ms + return_ms:
|
||||
back_age = age - outward_ms
|
||||
front = ((return_ms - back_age) * max_dist) / return_ms
|
||||
remaining = return_ms - back_age
|
||||
front = (remaining * max_dist + return_ms - 1) // return_ms
|
||||
else:
|
||||
continue
|
||||
|
||||
if dist <= front:
|
||||
lit = True
|
||||
if front > debug_front:
|
||||
debug_front = front
|
||||
break
|
||||
|
||||
self.driver.n[i] = lit_color if lit else off_color
|
||||
if lit:
|
||||
lit_count += 1
|
||||
|
||||
self.driver.n.write()
|
||||
|
||||
if bool(getattr(self.driver, "debug", False)):
|
||||
if not dbg_banner:
|
||||
dbg_banner = True
|
||||
print(
|
||||
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d"
|
||||
% (
|
||||
preset.n1,
|
||||
preset.n2,
|
||||
preset.n3,
|
||||
preset.d,
|
||||
self.driver.num_leds,
|
||||
)
|
||||
)
|
||||
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
|
||||
pulse_age = -1
|
||||
if active_pulses:
|
||||
pulse_age = utime.ticks_diff(now, active_pulses[0])
|
||||
print(
|
||||
"[radiate] age=%d front=%d max=%d active=%d lit=%d"
|
||||
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count)
|
||||
)
|
||||
if lit_count == 0:
|
||||
print("[radiate] fully off")
|
||||
last_dbg = now
|
||||
|
||||
if not preset.a:
|
||||
yield
|
||||
return
|
||||
|
||||
227
src/patterns/twinkle.py
Normal file
227
src/patterns/twinkle.py
Normal file
@@ -0,0 +1,227 @@
|
||||
import random
|
||||
import utime
|
||||
|
||||
# Default cool palette (icy blues, violet, mint) when preset has no colours.
|
||||
# When `driver.debug` is True, print stats every N twinkle ticks (serial can be slow).
|
||||
_TWINKLE_DBG_INTERVAL = 40
|
||||
|
||||
_DEFAULT_COOL = (
|
||||
(120, 200, 255),
|
||||
(80, 140, 255),
|
||||
(180, 120, 255),
|
||||
(100, 220, 240),
|
||||
(160, 200, 255),
|
||||
(90, 180, 220),
|
||||
)
|
||||
|
||||
|
||||
class Twinkle:
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
|
||||
def _palette(self, preset):
|
||||
colors = preset.c
|
||||
if not colors:
|
||||
return list(_DEFAULT_COOL)
|
||||
out = []
|
||||
for c in colors:
|
||||
if isinstance(c, (list, tuple)) and len(c) == 3:
|
||||
out.append(
|
||||
(
|
||||
max(0, min(255, int(c[0]))),
|
||||
max(0, min(255, int(c[1]))),
|
||||
max(0, min(255, int(c[2]))),
|
||||
)
|
||||
)
|
||||
return out if out else list(_DEFAULT_COOL)
|
||||
|
||||
def run(self, preset):
|
||||
"""Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs."""
|
||||
palette = self._palette(preset)
|
||||
num = self.driver.num_leds
|
||||
if num <= 0:
|
||||
while True:
|
||||
yield
|
||||
return
|
||||
|
||||
def activity_rate():
|
||||
r = int(preset.n1)
|
||||
if r <= 0:
|
||||
r = 48
|
||||
return max(1, min(255, r))
|
||||
|
||||
def density255():
|
||||
"""Higher → more LEDs lit on average when a twinkle step fires (0 = default mid)."""
|
||||
d = int(preset.n2)
|
||||
if d <= 0:
|
||||
d = 128
|
||||
return max(0, min(255, d))
|
||||
|
||||
def cluster_len_bounds():
|
||||
"""n3 = min adjacent LEDs per twinkle, n4 = max (both 0 → 1..4)."""
|
||||
lo = int(preset.n3)
|
||||
hi = int(preset.n4)
|
||||
if lo <= 0 and hi <= 0:
|
||||
lo, hi = 1, min(4, num)
|
||||
else:
|
||||
if lo <= 0:
|
||||
lo = 1
|
||||
if hi <= 0:
|
||||
hi = lo
|
||||
if hi < lo:
|
||||
lo, hi = hi, lo
|
||||
lo = max(1, min(lo, num))
|
||||
hi = max(lo, min(hi, num))
|
||||
return lo, hi
|
||||
|
||||
def random_cluster_len():
|
||||
lo, hi = cluster_len_bounds()
|
||||
# When min and max match, every lit/dim run is exactly that many LEDs (still capped by strip length).
|
||||
if lo == hi:
|
||||
return lo
|
||||
return random.randint(lo, hi)
|
||||
|
||||
def cluster_base_index(start, k):
|
||||
"""Shift run left so a length-k segment fits; keeps full k when num >= k."""
|
||||
k = min(max(0, int(k)), num)
|
||||
if k <= 0:
|
||||
return 0
|
||||
return max(0, min(int(start), num - k))
|
||||
|
||||
dens = density255()
|
||||
on = [random.randint(0, 255) < dens for _ in range(num)]
|
||||
colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)]
|
||||
last_update = utime.ticks_ms()
|
||||
dbg_tick = 0
|
||||
dbg_banner = False
|
||||
|
||||
def on_run_min_max(bits):
|
||||
"""Smallest and largest contiguous run of True in bits (0,0 if all off)."""
|
||||
best_min = num + 1
|
||||
best_max = 0
|
||||
cur = 0
|
||||
for j in range(num):
|
||||
if bits[j]:
|
||||
cur += 1
|
||||
else:
|
||||
if cur:
|
||||
if cur < best_min:
|
||||
best_min = cur
|
||||
if cur > best_max:
|
||||
best_max = cur
|
||||
cur = 0
|
||||
if cur:
|
||||
if cur < best_min:
|
||||
best_min = cur
|
||||
if cur > best_max:
|
||||
best_max = cur
|
||||
if best_min == num + 1:
|
||||
return 0, 0
|
||||
return best_min, best_max
|
||||
|
||||
if not preset.a:
|
||||
for i in range(num):
|
||||
if on[i]:
|
||||
base = palette[colour_i[i] % len(palette)]
|
||||
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
|
||||
else:
|
||||
self.driver.n[i] = (0, 0, 0)
|
||||
self.driver.n.write()
|
||||
yield
|
||||
return
|
||||
|
||||
while True:
|
||||
now = utime.ticks_ms()
|
||||
delay_ms = max(1, int(preset.d))
|
||||
if utime.ticks_diff(now, last_update) >= delay_ms:
|
||||
rate = activity_rate()
|
||||
dens = density255()
|
||||
dbg = bool(getattr(self.driver, "debug", False))
|
||||
dbg_tick += 1
|
||||
# Snapshot for decisions; apply all darks then all lights so
|
||||
# overlaps in the same tick favour lit runs (lights win).
|
||||
prev_on = on[:]
|
||||
prev_ci = colour_i[:]
|
||||
next_on = list(prev_on)
|
||||
next_ci = list(prev_ci)
|
||||
dbg_ops = {"L": 0, "D": 0}
|
||||
|
||||
light_i = []
|
||||
dark_i = []
|
||||
for i in range(num):
|
||||
if random.randint(0, 255) < rate:
|
||||
r = random.randint(0, 255)
|
||||
if not prev_on[i]:
|
||||
if r < dens:
|
||||
light_i.append(i)
|
||||
else:
|
||||
if r < (255 - dens):
|
||||
dark_i.append(i)
|
||||
|
||||
def light_adjacent(start):
|
||||
dbg_ops["L"] += 1
|
||||
k = random_cluster_len()
|
||||
b = cluster_base_index(start, k)
|
||||
for dj in range(k):
|
||||
idx = b + dj
|
||||
next_on[idx] = True
|
||||
next_ci[idx] = random.randint(0, len(palette) - 1)
|
||||
|
||||
def dark_adjacent(start):
|
||||
dbg_ops["D"] += 1
|
||||
k = random_cluster_len()
|
||||
b = cluster_base_index(start, k)
|
||||
for dj in range(k):
|
||||
idx = b + dj
|
||||
next_on[idx] = False
|
||||
|
||||
for i in dark_i:
|
||||
dark_adjacent(i)
|
||||
for i in light_i:
|
||||
light_adjacent(i)
|
||||
|
||||
for i in range(num):
|
||||
if next_on[i]:
|
||||
base = palette[next_ci[i] % len(palette)]
|
||||
self.driver.n[i] = self.driver.apply_brightness(base, preset.b)
|
||||
else:
|
||||
self.driver.n[i] = (0, 0, 0)
|
||||
self.driver.n.write()
|
||||
on = next_on
|
||||
colour_i = next_ci
|
||||
last_update = utime.ticks_add(last_update, delay_ms)
|
||||
|
||||
if dbg:
|
||||
lo, hi = cluster_len_bounds()
|
||||
if not dbg_banner:
|
||||
dbg_banner = True
|
||||
print(
|
||||
"[twinkle] debug on: n1=%s n2=%s n3=%s n4=%s d=%s -> lo=%d hi=%d num=%d"
|
||||
% (
|
||||
preset.n1,
|
||||
preset.n2,
|
||||
preset.n3,
|
||||
preset.n4,
|
||||
preset.d,
|
||||
lo,
|
||||
hi,
|
||||
num,
|
||||
)
|
||||
)
|
||||
rmin, rmax = on_run_min_max(on)
|
||||
bad = lo > 0 and rmin > 0 and rmin < lo and num >= lo
|
||||
if bad or (dbg_tick % _TWINKLE_DBG_INTERVAL == 0):
|
||||
print(
|
||||
"[twinkle] tick=%d rate=%d dens=%d L=%d D=%d on_runs min=%d max=%d%s"
|
||||
% (
|
||||
dbg_tick,
|
||||
rate,
|
||||
dens,
|
||||
dbg_ops["L"],
|
||||
dbg_ops["D"],
|
||||
rmin,
|
||||
rmax,
|
||||
" **run<lo**" if bad else "",
|
||||
)
|
||||
)
|
||||
yield
|
||||
@@ -1 +0,0 @@
|
||||
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}
|
||||
@@ -9,6 +9,8 @@ try:
|
||||
except ImportError:
|
||||
import os
|
||||
|
||||
MAX_PRESETS = 32
|
||||
|
||||
|
||||
class Presets:
|
||||
def __init__(self, pin, num_leds):
|
||||
@@ -95,6 +97,9 @@ class Presets:
|
||||
order = settings if settings is not None else "rgb"
|
||||
self.presets = {}
|
||||
for name, preset_data in data.items():
|
||||
if len(self.presets) >= MAX_PRESETS:
|
||||
print("Preset limit reached on load:", MAX_PRESETS)
|
||||
break
|
||||
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
|
||||
if color_key is not None:
|
||||
preset_data[color_key] = convert_and_reorder_colors(
|
||||
@@ -113,6 +118,9 @@ class Presets:
|
||||
# Update existing preset
|
||||
self.presets[name].edit(data)
|
||||
else:
|
||||
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
|
||||
print("Preset limit reached:", MAX_PRESETS)
|
||||
return False
|
||||
# Create new preset
|
||||
self.presets[name] = Preset(data)
|
||||
return True
|
||||
@@ -123,6 +131,12 @@ class Presets:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_all(self):
|
||||
self.presets = {}
|
||||
self.generator = None
|
||||
self.selected = None
|
||||
return True
|
||||
|
||||
def tick(self):
|
||||
if self.generator is None:
|
||||
return
|
||||
@@ -153,6 +167,9 @@ class Presets:
|
||||
self.generator = self.patterns[preset.p](preset)
|
||||
self.selected = preset_name # Store the preset name, not the object
|
||||
return True
|
||||
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
|
||||
return False
|
||||
print("select failed: preset not found", preset_name)
|
||||
# If preset doesn't exist or pattern not found, indicate failure
|
||||
return False
|
||||
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
"""Minimal Presets-like stub for pattern smoke tests (no machine / NeoPixel)."""
|
||||
|
||||
|
||||
class _FakeNeo:
|
||||
def __init__(self, count):
|
||||
self._count = count
|
||||
self.pixels = [(0, 0, 0)] * count
|
||||
|
||||
def fill(self, color):
|
||||
self.pixels = [tuple(color) for _ in range(self._count)]
|
||||
|
||||
def __setitem__(self, index, value):
|
||||
self.pixels[index] = tuple(value)
|
||||
|
||||
def __getitem__(self, index):
|
||||
return self.pixels[index]
|
||||
|
||||
def write(self):
|
||||
pass
|
||||
|
||||
|
||||
class FakePresets:
|
||||
"""Subset of led-driver Presets API used by patterns/*.py."""
|
||||
|
||||
def __init__(self, num_leds=24):
|
||||
self.num_leds = num_leds
|
||||
self.n = _FakeNeo(num_leds)
|
||||
self.b = 255
|
||||
self.step = 0
|
||||
|
||||
def apply_brightness(self, color, brightness_override=None):
|
||||
local = brightness_override if brightness_override is not None else 255
|
||||
effective_brightness = int(local * self.b / 255)
|
||||
return tuple(int(c * effective_brightness / 255) for c in color)
|
||||
|
||||
def fill(self, color=None):
|
||||
fill_color = color if color is not None else (0, 0, 0)
|
||||
for i in range(self.num_leds):
|
||||
self.n[i] = fill_color
|
||||
self.n.write()
|
||||
|
||||
def off(self):
|
||||
self.fill((0, 0, 0))
|
||||
@@ -1,174 +0,0 @@
|
||||
"""
|
||||
Smoke-test pattern generators with a fake driver (no hardware).
|
||||
|
||||
Run from repo root (CPython or MicroPython):
|
||||
|
||||
python3 led-driver/tests/pattern_smoke.py
|
||||
|
||||
Requires only the stdlib; loads ``preset.py`` and ``patterns/*.py`` from
|
||||
``led-driver/src`` via import paths (MicroPython: copy ``src`` tree to the
|
||||
device and run the same command if ``importlib.util`` is available, or set
|
||||
``PYTHONPATH`` to ``led-driver/src`` and use ``python3`` on the host).
|
||||
|
||||
Exit code 0 on success, 1 on any failure.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Host-only ``utime`` so patterns import on CPython.
|
||||
# -----------------------------------------------------------------------------
|
||||
_UTIME_MS = [0]
|
||||
|
||||
|
||||
def utime_advance(ms):
|
||||
_UTIME_MS[0] += int(ms)
|
||||
|
||||
|
||||
def _install_utime_shim():
|
||||
if "utime" in sys.modules:
|
||||
return
|
||||
import types
|
||||
|
||||
m = types.ModuleType("utime")
|
||||
|
||||
def ticks_ms():
|
||||
return _UTIME_MS[0]
|
||||
|
||||
def ticks_diff(a, b):
|
||||
return a - b
|
||||
|
||||
def ticks_add(a, b):
|
||||
return a + b
|
||||
|
||||
m.ticks_ms = ticks_ms
|
||||
m.ticks_diff = ticks_diff
|
||||
m.ticks_add = ticks_add
|
||||
sys.modules["utime"] = m
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Load ``preset`` and pattern modules from ``led-driver/src`` (no sys.path).
|
||||
# -----------------------------------------------------------------------------
|
||||
_SRC = Path(__file__).resolve().parent.parent / "src"
|
||||
_TESTS = Path(__file__).resolve().parent
|
||||
|
||||
|
||||
def _load_module(name, path):
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise RuntimeError("no spec for %s" % path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def _pattern_class_from_module(mod):
|
||||
for attr_name in dir(mod):
|
||||
attr = getattr(mod, attr_name)
|
||||
if isinstance(attr, type) and hasattr(attr, "run"):
|
||||
return attr
|
||||
return None
|
||||
|
||||
|
||||
def _load_preset_class():
|
||||
preset_mod = _load_module("preset", _SRC / "preset.py")
|
||||
return preset_mod.Preset
|
||||
|
||||
|
||||
def _list_pattern_basenames():
|
||||
pat_dir = _SRC / "patterns"
|
||||
out = []
|
||||
for p in sorted(pat_dir.iterdir()):
|
||||
if p.suffix != ".py":
|
||||
continue
|
||||
if p.name in ("__init__.py", "main.py"):
|
||||
continue
|
||||
out.append(p.stem)
|
||||
return out
|
||||
|
||||
|
||||
def _default_preset_dict(basename):
|
||||
"""Reasonable defaults so each pattern's ``run()`` can start."""
|
||||
base = {
|
||||
"p": basename,
|
||||
"d": 100,
|
||||
"b": 200,
|
||||
"a": True,
|
||||
"n1": 5,
|
||||
"n2": 8,
|
||||
"n3": 5,
|
||||
"n4": 4,
|
||||
}
|
||||
if basename in ("rainbow", "colour_cycle"):
|
||||
base["c"] = []
|
||||
base["n1"] = 2
|
||||
elif basename == "transition":
|
||||
base["c"] = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
|
||||
base["d"] = 200
|
||||
elif basename in ("chase", "circle"):
|
||||
base["c"] = [(255, 0, 0), (0, 0, 255)]
|
||||
elif basename == "pulse":
|
||||
base["c"] = [(0, 200, 100)]
|
||||
base["n1"] = 50
|
||||
base["n2"] = 40
|
||||
base["n3"] = 50
|
||||
base["d"] = 30
|
||||
elif basename in ("blink", "flicker"):
|
||||
base["c"] = [(255, 100, 0)]
|
||||
base["d"] = 80
|
||||
elif basename == "flame":
|
||||
base["c"] = []
|
||||
base["d"] = 40
|
||||
base["n1"] = 40
|
||||
base["n2"] = 2000
|
||||
base["n3"] = -1
|
||||
base["n4"] = 0
|
||||
else:
|
||||
base["c"] = [(200, 200, 200)]
|
||||
return base
|
||||
|
||||
|
||||
def _run_pattern_ticks(Preset, driver, basename, steps, ms_per_tick):
|
||||
_install_utime_shim()
|
||||
mod = _load_module("patterns.%s" % basename, _SRC / "patterns" / (basename + ".py"))
|
||||
cls = _pattern_class_from_module(mod)
|
||||
if cls is None:
|
||||
raise RuntimeError("no pattern class in %s" % basename)
|
||||
|
||||
preset = Preset(_default_preset_dict(basename))
|
||||
gen = cls(driver).run(preset)
|
||||
for _ in range(steps):
|
||||
utime_advance(ms_per_tick)
|
||||
next(gen)
|
||||
|
||||
|
||||
def main():
|
||||
failures = []
|
||||
Preset = _load_preset_class()
|
||||
fake_mod = _load_module("fake_driver", _TESTS / "fake_driver.py")
|
||||
FakePresets = fake_mod.FakePresets
|
||||
|
||||
for basename in _list_pattern_basenames():
|
||||
d = FakePresets(16)
|
||||
try:
|
||||
_run_pattern_ticks(Preset, d, basename, steps=80, ms_per_tick=50)
|
||||
print("ok patterns.%s" % basename)
|
||||
except Exception as exc:
|
||||
print("FAIL patterns.%s: %r" % (basename, exc))
|
||||
failures.append((basename, exc))
|
||||
|
||||
if failures:
|
||||
print("%d pattern(s) failed" % len(failures))
|
||||
return 1
|
||||
print("all %d pattern smoke tests passed" % len(_list_pattern_basenames()))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
sys.exit(main())
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(130)
|
||||
25
tests/peers.py
Normal file
25
tests/peers.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from espnow import ESPNow
|
||||
import network
|
||||
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
|
||||
espnow = ESPNow()
|
||||
espnow.active(True)
|
||||
|
||||
# add_peer() expects a 6-byte MAC (bytes/bytearray), not integers.
|
||||
# Unicast placeholders (not broadcast/multicast) so get_peers() lists them.
|
||||
# PEERS = aa:aa:aa:aa:aa:START … aa:aa:aa:aa:aa:END (inclusive last octet).
|
||||
_PREFIX = b"\xaa\xaa\xaa\xaa\xaa"
|
||||
_START_LAST_OCTET = 1
|
||||
_END_LAST_OCTET = 40
|
||||
PEERS = tuple(_PREFIX + bytes((i,)) for i in range(_START_LAST_OCTET, _END_LAST_OCTET + 1))
|
||||
for peer in PEERS:
|
||||
espnow.add_peer(peer)
|
||||
|
||||
print("peers:", PEERS)
|
||||
|
||||
for peer in PEERS:
|
||||
espnow.send(peer, b"Hello, world!")
|
||||
|
||||
print(espnow.get_peers())
|
||||
41
tests/test_ap_pm0.py
Normal file
41
tests/test_ap_pm0.py
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env python3
|
||||
"""MicroPython AP example with power management disabled (pm=0).
|
||||
|
||||
Run on device:
|
||||
mpremote connect /dev/ttyACM0 run tests/test_ap_pm0.py
|
||||
"""
|
||||
|
||||
import network
|
||||
import time
|
||||
|
||||
AP_SSID = "led-ap"
|
||||
AP_PASSWORD = "ledpass123"
|
||||
AP_CHANNEL = 6
|
||||
|
||||
|
||||
def main():
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
ap.active(True)
|
||||
|
||||
# Explicitly disable Wi-Fi power save for AP mode.
|
||||
try:
|
||||
ap.config(pm=0)
|
||||
except (AttributeError, ValueError, TypeError):
|
||||
try:
|
||||
ap.config(pm=network.WLAN.PM_NONE)
|
||||
except (AttributeError, ValueError, TypeError):
|
||||
pass
|
||||
|
||||
ap.config(essid=AP_SSID, password=AP_PASSWORD, channel=AP_CHANNEL, authmode=3)
|
||||
|
||||
print("[ap-pm0] AP active:", ap.active())
|
||||
print("[ap-pm0] SSID:", AP_SSID)
|
||||
print("[ap-pm0] IFCONFIG:", ap.ifconfig())
|
||||
print("[ap-pm0] Waiting for clients. Ctrl+C to stop.")
|
||||
|
||||
while True:
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user