feat(led-driver): add preset clear command and runtime debug
This commit is contained in:
@@ -24,6 +24,8 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
apply_brightness(data, settings, presets)
|
||||
if "presets" in data:
|
||||
apply_presets(data, settings, presets)
|
||||
if "clear_presets" in data:
|
||||
apply_clear_presets(data, presets)
|
||||
if "select" in data:
|
||||
apply_select(data, settings, presets)
|
||||
if "default" in data:
|
||||
@@ -32,6 +34,8 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
apply_patterns_ota(data, presets, controller_ip=controller_ip)
|
||||
if "save" in data and ("presets" in data or "default" in data):
|
||||
presets.save()
|
||||
if "save" in data and "clear_presets" in data:
|
||||
presets.save()
|
||||
|
||||
|
||||
def apply_brightness(data, settings, presets):
|
||||
@@ -70,6 +74,22 @@ def apply_select(data, settings, presets):
|
||||
presets.select(preset_name, step=step)
|
||||
|
||||
|
||||
def apply_clear_presets(data, presets):
|
||||
clear_value = data.get("clear_presets")
|
||||
if isinstance(clear_value, bool):
|
||||
should_clear = clear_value
|
||||
elif isinstance(clear_value, int):
|
||||
should_clear = bool(clear_value)
|
||||
elif isinstance(clear_value, str):
|
||||
should_clear = clear_value.lower() in ("true", "1", "yes", "on")
|
||||
else:
|
||||
should_clear = False
|
||||
if not should_clear:
|
||||
return
|
||||
presets.delete_all()
|
||||
print("Cleared all presets.")
|
||||
|
||||
|
||||
def apply_default(data, settings, presets):
|
||||
targets = data.get("targets") or []
|
||||
default_name = data["default"]
|
||||
|
||||
34
src/main.py
34
src/main.py
@@ -1,9 +1,10 @@
|
||||
from settings import Settings
|
||||
from machine import WDT
|
||||
import machine
|
||||
import network
|
||||
import utime
|
||||
import asyncio
|
||||
import json
|
||||
import gc
|
||||
from microdot import Microdot
|
||||
from microdot.websocket import WebSocketError, with_websocket
|
||||
from presets import Presets
|
||||
@@ -14,12 +15,25 @@ try:
|
||||
except ImportError:
|
||||
import os
|
||||
|
||||
machine.freq(160000000)
|
||||
|
||||
|
||||
settings = Settings()
|
||||
print(settings)
|
||||
|
||||
wdt = machine.WDT(timeout=10000)
|
||||
wdt.feed()
|
||||
|
||||
gc.collect()
|
||||
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
|
||||
presets = Presets(settings["led_pin"], settings["num_leds"])
|
||||
presets.load(settings)
|
||||
presets.b = settings.get("brightness", 255)
|
||||
presets.debug = bool(settings.get("debug", False))
|
||||
gc.collect()
|
||||
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
|
||||
default_preset = settings.get("default", "")
|
||||
if default_preset and default_preset in presets.presets:
|
||||
if presets.select(default_preset):
|
||||
@@ -27,10 +41,15 @@ if default_preset and default_preset in presets.presets:
|
||||
else:
|
||||
print("Startup preset failed (invalid pattern?):", default_preset)
|
||||
|
||||
wdt = WDT(timeout=10000)
|
||||
wdt.feed()
|
||||
|
||||
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
|
||||
# Reset both interfaces and collect before bringing STA up.
|
||||
ap_if = network.WLAN(network.AP_IF)
|
||||
ap_if.active(False)
|
||||
sta_if = network.WLAN(network.STA_IF)
|
||||
if sta_if.active():
|
||||
sta_if.active(False)
|
||||
utime.sleep_ms(100)
|
||||
gc.collect()
|
||||
sta_if.active(True)
|
||||
sta_if.config(pm=network.WLAN.PM_NONE)
|
||||
sta_if.connect(settings["ssid"], settings["password"])
|
||||
@@ -149,9 +168,16 @@ async def upload_pattern(request):
|
||||
|
||||
|
||||
async def presets_loop():
|
||||
last_mem_log = utime.ticks_ms()
|
||||
while True:
|
||||
presets.tick()
|
||||
wdt.feed()
|
||||
if bool(getattr(presets, "debug", False)):
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last_mem_log) >= 5000:
|
||||
gc.collect()
|
||||
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
|
||||
last_mem_log = now
|
||||
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
|
||||
await asyncio.sleep(0)
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import utime
|
||||
|
||||
_RADIATE_DBG_INTERVAL_MS = 1000
|
||||
|
||||
|
||||
class Radiate:
|
||||
def __init__(self, driver):
|
||||
@@ -28,6 +30,8 @@ class Radiate:
|
||||
now = utime.ticks_ms()
|
||||
last_trigger = now
|
||||
active_pulses = [now]
|
||||
last_dbg = now
|
||||
dbg_banner = False
|
||||
|
||||
if not preset.a:
|
||||
# Single-step render uses only the first instant pulse.
|
||||
@@ -44,17 +48,26 @@ class Radiate:
|
||||
off_color = self.driver.apply_brightness(base_off, preset.b)
|
||||
|
||||
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
|
||||
active_pulses.append(now)
|
||||
# Keep one pulse train at a time; replacing instead of appending
|
||||
# prevents overlap from keeping color[0] continuously visible.
|
||||
active_pulses = [now]
|
||||
last_trigger = utime.ticks_add(last_trigger, delay_ms)
|
||||
if bool(getattr(self.driver, "debug", False)):
|
||||
print(
|
||||
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
|
||||
% (spacing, outward_ms, return_ms, delay_ms)
|
||||
)
|
||||
|
||||
# Drop pulses once their out-and-back lifetime ends.
|
||||
pulse_lifetime = outward_ms + return_ms
|
||||
kept = []
|
||||
for start in active_pulses:
|
||||
age = utime.ticks_diff(now, start)
|
||||
if age <= pulse_lifetime:
|
||||
if age < pulse_lifetime:
|
||||
kept.append(start)
|
||||
active_pulses = kept
|
||||
debug_front = -1
|
||||
lit_count = 0
|
||||
|
||||
for i in range(self.driver.num_leds):
|
||||
# Nearest node distance for a repeating node grid every `spacing` LEDs.
|
||||
@@ -64,24 +77,58 @@ class Radiate:
|
||||
lit = False
|
||||
for start in active_pulses:
|
||||
age = utime.ticks_diff(now, start)
|
||||
if age < 0:
|
||||
# Do not render on the exact trigger tick; this avoids
|
||||
# node LEDs appearing "stuck on" between cycles.
|
||||
if age <= 0:
|
||||
continue
|
||||
if age <= outward_ms:
|
||||
front = (age * max_dist) / outward_ms
|
||||
# Integer-ceiling progression so peak can be reached even
|
||||
# when tick timing skips the exact outward_ms boundary.
|
||||
front = (age * max_dist + outward_ms - 1) // outward_ms
|
||||
elif age <= outward_ms + return_ms:
|
||||
back_age = age - outward_ms
|
||||
front = ((return_ms - back_age) * max_dist) / return_ms
|
||||
remaining = return_ms - back_age
|
||||
front = (remaining * max_dist + return_ms - 1) // return_ms
|
||||
else:
|
||||
continue
|
||||
|
||||
if dist <= front:
|
||||
lit = True
|
||||
if front > debug_front:
|
||||
debug_front = front
|
||||
break
|
||||
|
||||
self.driver.n[i] = lit_color if lit else off_color
|
||||
if lit:
|
||||
lit_count += 1
|
||||
|
||||
self.driver.n.write()
|
||||
|
||||
if bool(getattr(self.driver, "debug", False)):
|
||||
if not dbg_banner:
|
||||
dbg_banner = True
|
||||
print(
|
||||
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d"
|
||||
% (
|
||||
preset.n1,
|
||||
preset.n2,
|
||||
preset.n3,
|
||||
preset.d,
|
||||
self.driver.num_leds,
|
||||
)
|
||||
)
|
||||
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
|
||||
pulse_age = -1
|
||||
if active_pulses:
|
||||
pulse_age = utime.ticks_diff(now, active_pulses[0])
|
||||
print(
|
||||
"[radiate] age=%d front=%d max=%d active=%d lit=%d"
|
||||
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count)
|
||||
)
|
||||
if lit_count == 0:
|
||||
print("[radiate] fully off")
|
||||
last_dbg = now
|
||||
|
||||
if not preset.a:
|
||||
yield
|
||||
return
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
{"14": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 102, 0]], "b": 255, "n2": 1000, "n1": 2000, "p": "pulse", "n3": 2000, "d": 800}, "15": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 500}, "5": {"n5": 0, "n4": 1, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 0, 255]], "b": 255, "n2": 5, "n1": 5, "p": "chase", "n3": 1, "d": 200}, "4": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "transition", "n3": 0, "d": 500}, "7": {"n5": 0, "n4": 5, "a": true, "n6": 0, "c": [[255, 165, 0], [128, 0, 128]], "b": 255, "n2": 10, "n1": 2, "p": "circle", "n3": 2, "d": 200}, "11": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "12": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 0, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "6": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[0, 255, 0]], "b": 255, "n2": 500, "n1": 1000, "p": "pulse", "n3": 1000, "d": 500}, "3": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 2, "p": "rainbow", "n3": 0, "d": 100}, "2": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 0, "n2": 0, "n1": 0, "p": "off", "n3": 0, "d": 100}, "1": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "10": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[230, 242, 255]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "13": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 255, 255]], "b": 255, "n2": 0, "n1": 1, "p": "rainbow", "n3": 0, "d": 150}, "9": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 245, 230]], "b": 200, "n2": 0, "n1": 0, "p": "on", "n3": 0, "d": 100}, "8": {"n5": 0, "n4": 0, "a": true, "n6": 0, "c": [[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 0]], "b": 255, "n2": 0, "n1": 0, "p": "blink", "n3": 0, "d": 1000}}
|
||||
@@ -9,6 +9,8 @@ try:
|
||||
except ImportError:
|
||||
import os
|
||||
|
||||
MAX_PRESETS = 32
|
||||
|
||||
|
||||
class Presets:
|
||||
def __init__(self, pin, num_leds):
|
||||
@@ -95,6 +97,9 @@ class Presets:
|
||||
order = settings if settings is not None else "rgb"
|
||||
self.presets = {}
|
||||
for name, preset_data in data.items():
|
||||
if len(self.presets) >= MAX_PRESETS:
|
||||
print("Preset limit reached on load:", MAX_PRESETS)
|
||||
break
|
||||
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
|
||||
if color_key is not None:
|
||||
preset_data[color_key] = convert_and_reorder_colors(
|
||||
@@ -113,6 +118,9 @@ class Presets:
|
||||
# Update existing preset
|
||||
self.presets[name].edit(data)
|
||||
else:
|
||||
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
|
||||
print("Preset limit reached:", MAX_PRESETS)
|
||||
return False
|
||||
# Create new preset
|
||||
self.presets[name] = Preset(data)
|
||||
return True
|
||||
@@ -123,6 +131,12 @@ class Presets:
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_all(self):
|
||||
self.presets = {}
|
||||
self.generator = None
|
||||
self.selected = None
|
||||
return True
|
||||
|
||||
def tick(self):
|
||||
if self.generator is None:
|
||||
return
|
||||
@@ -153,6 +167,9 @@ class Presets:
|
||||
self.generator = self.patterns[preset.p](preset)
|
||||
self.selected = preset_name # Store the preset name, not the object
|
||||
return True
|
||||
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
|
||||
return False
|
||||
print("select failed: preset not found", preset_name)
|
||||
# If preset doesn't exist or pattern not found, indicate failure
|
||||
return False
|
||||
|
||||
|
||||
Reference in New Issue
Block a user