feat(patterns): driver_patterns helper, on/off ota guard, drop duplicate py tree

Made-with: Cursor
This commit is contained in:
pi
2026-04-12 00:13:56 +12:00
parent 28b19b5219
commit 7bdb324ebc
10 changed files with 114 additions and 453 deletions

View File

@@ -1,6 +0,0 @@
from .blink import Blink
from .rainbow import Rainbow
from .pulse import Pulse
from .transition import Transition
from .chase import Chase
from .circle import Circle

View File

@@ -1,33 +0,0 @@
import utime
class Blink:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Blink pattern: toggles LEDs on/off using preset delay, cycling through colors."""
# Use provided colors, or default to white if none
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
state = True # True = on, False = off
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
# Re-read delay each loop so live updates to preset.d take effect
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms:
if state:
base_color = colors[color_index % len(colors)]
color = self.driver.apply_brightness(base_color, preset.b)
self.driver.fill(color)
# Advance to next color for the next "on" phase
color_index += 1
else:
# "Off" phase: turn all LEDs off
self.driver.fill((0, 0, 0))
state = not state
last_update = current_time
# Yield once per tick so other logic can run
yield

View File

@@ -1,124 +0,0 @@
import utime
class Chase:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating.
Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)"""
colors = preset.c
if len(colors) < 1:
# Need at least 1 color
return
# Access colors, delay, and n values from preset
if not colors:
return
# If only one color provided, use it for both colors
if len(colors) < 2:
color0 = colors[0]
color1 = colors[0]
else:
color0 = colors[0]
color1 = colors[1]
color0 = self.driver.apply_brightness(color0, preset.b)
color1 = self.driver.apply_brightness(color1, preset.b)
n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1
n3 = int(preset.n3) # Step movement on even steps (can be negative)
n4 = int(preset.n4) # Step movement on odd steps (can be negative)
segment_length = n1 + n2
# Calculate position from step_count
step_count = self.driver.step
# Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc.
if step_count % 2 == 0:
# Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3
position = (step_count // 2) * (n3 + n4) + n3
else:
# Odd steps: ((step_count+1)//2) pairs of (n3+n4)
position = ((step_count + 1) // 2) * (n3 + n4)
# Wrap position to keep it reasonable
max_pos = self.driver.num_leds + segment_length
position = position % max_pos
if position < 0:
position += max_pos
# If auto is False, run a single step and then stop
if not preset.a:
# Clear all LEDs
self.driver.n.fill((0, 0, 0))
# Draw repeating pattern starting at position
for i in range(self.driver.num_leds):
# Calculate position in the repeating segment
relative_pos = (i - position) % segment_length
if relative_pos < 0:
relative_pos = (relative_pos + segment_length) % segment_length
# Determine which color based on position in segment
if relative_pos < n1:
self.driver.n[i] = color0
else:
self.driver.n[i] = color1
self.driver.n.write()
# Increment step for next beat
self.driver.step = step_count + 1
# Allow tick() to advance the generator once
yield
return
# Auto mode: continuous loop
# Use transition_duration for timing and force the first update to happen immediately
transition_duration = max(10, int(preset.d))
last_update = utime.ticks_ms() - transition_duration
while True:
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, last_update) >= transition_duration:
# Calculate current position from step_count
if step_count % 2 == 0:
position = (step_count // 2) * (n3 + n4) + n3
else:
position = ((step_count + 1) // 2) * (n3 + n4)
# Wrap position
max_pos = self.driver.num_leds + segment_length
position = position % max_pos
if position < 0:
position += max_pos
# Clear all LEDs
self.driver.n.fill((0, 0, 0))
# Draw repeating pattern starting at position
for i in range(self.driver.num_leds):
# Calculate position in the repeating segment
relative_pos = (i - position) % segment_length
if relative_pos < 0:
relative_pos = (relative_pos + segment_length) % segment_length
# Determine which color based on position in segment
if relative_pos < n1:
self.driver.n[i] = color0
else:
self.driver.n[i] = color1
self.driver.n.write()
# Increment step
step_count += 1
self.driver.step = step_count
last_update = current_time
# Yield once per tick so other logic can run
yield

View File

@@ -1,96 +0,0 @@
import utime
class Circle:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4"""
head = 0
tail = 0
# Calculate timing from preset
head_rate = max(1, int(preset.n1)) # n1 = head moves per second
tail_rate = max(1, int(preset.n3)) # n3 = tail moves per second
max_length = max(1, int(preset.n2)) # n2 = max length
min_length = max(0, int(preset.n4)) # n4 = min length
head_delay = 1000 // head_rate # ms between head movements
tail_delay = 1000 // tail_rate # ms between tail movements
last_head_move = utime.ticks_ms()
last_tail_move = utime.ticks_ms()
phase = "growing" # "growing", "shrinking", or "off"
# Support up to two colors (like chase). If only one color is provided,
# use black for the second; if none, default to white.
colors = preset.c
if not colors:
base0 = base1 = (255, 255, 255)
elif len(colors) == 1:
base0 = colors[0]
base1 = (0, 0, 0)
else:
base0 = colors[0]
base1 = colors[1]
color0 = self.driver.apply_brightness(base0, preset.b)
color1 = self.driver.apply_brightness(base1, preset.b)
while True:
current_time = utime.ticks_ms()
# Background: use second color during the "off" phase, otherwise clear to black
if phase == "off":
self.driver.n.fill(color1)
else:
self.driver.n.fill((0, 0, 0))
# Calculate segment length
segment_length = (head - tail) % self.driver.num_leds
if segment_length == 0 and head != tail:
segment_length = self.driver.num_leds
# Draw segment from tail to head as a solid color (no per-LED alternation)
current_color = color0
for i in range(segment_length + 1):
led_pos = (tail + i) % self.driver.num_leds
self.driver.n[led_pos] = current_color
# Move head continuously at n1 LEDs per second
if utime.ticks_diff(current_time, last_head_move) >= head_delay:
head = (head + 1) % self.driver.num_leds
last_head_move = current_time
# Tail behavior based on phase
if phase == "growing":
# Growing phase: tail stays at 0 until max length reached
if segment_length >= max_length:
phase = "shrinking"
elif phase == "shrinking":
# Shrinking phase: move tail forward at n3 LEDs per second
if utime.ticks_diff(current_time, last_tail_move) >= tail_delay:
tail = (tail + 1) % self.driver.num_leds
last_tail_move = current_time
# Check if we've reached min length
current_length = (head - tail) % self.driver.num_leds
if current_length == 0 and head != tail:
current_length = self.driver.num_leds
# For min_length = 0, we need at least 1 LED (the head)
if min_length == 0 and current_length <= 1:
phase = "off" # All LEDs off for 1 step
elif min_length > 0 and current_length <= min_length:
phase = "growing" # Cycle repeats
else: # phase == "off"
# Off phase: second color fills the ring for 1 step, then restart
tail = head # Reset tail to head position to start fresh
phase = "growing"
self.driver.n.write()
# Yield once per tick so other logic can run
yield

View File

@@ -1,64 +0,0 @@
import utime
class Pulse:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
self.driver.off()
# Get colors from preset
colors = preset.c
if not colors:
colors = [(255, 255, 255)]
color_index = 0
cycle_start = utime.ticks_ms()
# State machine based pulse using a single generator loop
while True:
# Read current timing parameters from preset
attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms
decay_ms = max(0, int(preset.n3)) # Decay time in ms
delay_ms = max(0, int(preset.d))
total_ms = attack_ms + hold_ms + decay_ms + delay_ms
if total_ms <= 0:
total_ms = 1
now = utime.ticks_ms()
elapsed = utime.ticks_diff(now, cycle_start)
base_color = colors[color_index % len(colors)]
if elapsed < attack_ms and attack_ms > 0:
# Attack: fade 0 -> 1
factor = elapsed / attack_ms
color = tuple(int(c * factor) for c in base_color)
self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < attack_ms + hold_ms:
# Hold: full brightness
self.driver.fill(self.driver.apply_brightness(base_color, preset.b))
elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0:
# Decay: fade 1 -> 0
dec_elapsed = elapsed - attack_ms - hold_ms
factor = max(0.0, 1.0 - (dec_elapsed / decay_ms))
color = tuple(int(c * factor) for c in base_color)
self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < total_ms:
# Delay phase: LEDs off between pulses
self.driver.fill((0, 0, 0))
else:
# End of cycle, move to next color and restart timing
color_index += 1
cycle_start = now
if not preset.a:
break
# Skip drawing this tick, start next cycle
yield
continue
# Yield once per tick
yield

View File

@@ -1,51 +0,0 @@
import utime
class Rainbow:
def __init__(self, driver):
self.driver = driver
def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def run(self, preset):
step = self.driver.step % 256
step_amount = max(1, int(preset.n1)) # n1 controls step increment
# If auto is False, run a single step and then stop
if not preset.a:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b)
self.driver.n.write()
# Increment step by n1 for next manual call
self.driver.step = (step + step_amount) % 256
# Allow tick() to advance the generator once
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
sleep_ms = max(1, int(preset.d)) # Get delay from preset
if utime.ticks_diff(current_time, last_update) >= sleep_ms:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(
self._wheel(rc_index & 255),
preset.b,
)
self.driver.n.write()
step = (step + step_amount) % 256
self.driver.step = step
last_update = current_time
# Yield once per tick so other logic can run
yield

View File

@@ -1,57 +0,0 @@
import utime
class Transition:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Transition between colors, blending over `delay` ms."""
colors = preset.c
if not colors:
self.driver.off()
yield
return
# Only one color: just keep it on
if len(colors) == 1:
while True:
self.driver.fill(self.driver.apply_brightness(colors[0], preset.b))
yield
return
color_index = 0
start_time = utime.ticks_ms()
while True:
if not colors:
break
# Get current and next color based on live list
c1 = colors[color_index % len(colors)]
c2 = colors[(color_index + 1) % len(colors)]
duration = max(10, int(preset.d)) # At least 10ms
now = utime.ticks_ms()
elapsed = utime.ticks_diff(now, start_time)
if elapsed >= duration:
# End of this transition step
if not preset.a:
# One-shot: transition from first to second color only
self.driver.fill(self.driver.apply_brightness(c2, preset.b))
break
# Auto: move to next pair
color_index = (color_index + 1) % len(colors)
start_time = now
yield
continue
# Interpolate between c1 and c2
factor = elapsed / duration
interpolated = tuple(
int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3)
)
self.driver.fill(self.driver.apply_brightness(interpolated, preset.b))
yield

View File

@@ -11,6 +11,7 @@ from models.tcp_clients import (
send_json_line_to_ip, send_json_line_to_ip,
tcp_client_connected, tcp_client_connected,
) )
from util.driver_patterns import driver_patterns_dir
from util.espnow_message import build_message from util.espnow_message import build_message
import asyncio import asyncio
import json import json
@@ -73,11 +74,6 @@ def _device_json_with_live_status(dev_dict):
return row return row
def _driver_patterns_dir():
here = os.path.dirname(__file__)
return os.path.abspath(os.path.join(here, "../../led-driver/src/patterns"))
def _safe_pattern_filename(name): def _safe_pattern_filename(name):
if not isinstance(name, str): if not isinstance(name, str):
return False return False
@@ -89,7 +85,7 @@ def _safe_pattern_filename(name):
def _build_patterns_manifest(host): def _build_patterns_manifest(host):
base_dir = _driver_patterns_dir() base_dir = driver_patterns_dir()
names = sorted(os.listdir(base_dir)) names = sorted(os.listdir(base_dir))
files = [] files = []
for name in names: for name in names:

View File

@@ -2,6 +2,11 @@ from microdot import Microdot
from models.pattern import Pattern from models.pattern import Pattern
from models.device import Device from models.device import Device
from models.tcp_clients import send_json_line_to_ip from models.tcp_clients import send_json_line_to_ip
from util.driver_patterns import (
driver_patterns_dir,
is_firmware_builtin_pattern_module,
normalize_pattern_py_filename,
)
import json import json
import re import re
import sys import sys
@@ -17,11 +22,6 @@ def _project_root():
return os.path.abspath(os.path.join(here, "..", "..")) return os.path.abspath(os.path.join(here, "..", ".."))
def _driver_patterns_dir():
here = os.path.dirname(__file__)
return os.path.abspath(os.path.join(here, "../../led-driver/src/patterns"))
def _safe_pattern_filename(name): def _safe_pattern_filename(name):
if not isinstance(name, str): if not isinstance(name, str):
return False return False
@@ -75,7 +75,7 @@ def load_driver_pattern_names():
"""List available pattern module names from led-driver/src/patterns.""" """List available pattern module names from led-driver/src/patterns."""
try: try:
names = [] names = []
for filename in os.listdir(_driver_patterns_dir()): for filename in os.listdir(driver_patterns_dir()):
if not _safe_pattern_filename(filename) or filename == "__init__.py": if not _safe_pattern_filename(filename) or filename == "__init__.py":
continue continue
names.append(filename[:-3]) names.append(filename[:-3])
@@ -111,7 +111,7 @@ async def get_pattern_definitions(request):
@controller.get('/ota/manifest') @controller.get('/ota/manifest')
async def ota_manifest(request): async def ota_manifest(request):
"""Manifest of driver pattern source files for OTA pulls.""" """Manifest of driver pattern source files for OTA pulls."""
base_dir = _driver_patterns_dir() base_dir = driver_patterns_dir()
host = request.headers.get("Host", "") host = request.headers.get("Host", "")
if not host: if not host:
return json.dumps({"error": "Missing Host header"}), 400, { return json.dumps({"error": "Missing Host header"}), 400, {
@@ -137,16 +137,32 @@ async def ota_manifest(request):
@controller.get('/ota/file/<name>') @controller.get('/ota/file/<name>')
async def ota_pattern_file(request, name): async def ota_pattern_file(request, name):
"""Serve one driver pattern source file for OTA pulls.""" """Serve one driver pattern source file for OTA pulls."""
if not _safe_pattern_filename(name) or name == "__init__.py": fname = normalize_pattern_py_filename(name)
if not fname or not _safe_pattern_filename(fname) or fname == "__init__.py":
return json.dumps({"error": "Invalid filename"}), 400, { return json.dumps({"error": "Invalid filename"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
path = os.path.join(_driver_patterns_dir(), name) if is_firmware_builtin_pattern_module(fname):
return json.dumps(
{
"error": "on and off are built into the driver firmware; there is no module file to serve.",
}
), 400, {
"Content-Type": "application/json"
}
base = driver_patterns_dir()
path = os.path.join(base, fname)
try: try:
with open(path, "r") as f: with open(path, "r") as f:
content = f.read() content = f.read()
except OSError: except OSError:
return json.dumps({"error": "Pattern file not found"}), 404, { return json.dumps(
{
"error": "Pattern file not found",
"path": path,
"hint": "Ensure led-driver is present or set LED_CONTROLLER_PATTERNS_DIR.",
}
), 404, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
return content, 200, {"Content-Type": "text/plain; charset=utf-8"} return content, 200, {"Content-Type": "text/plain; charset=utf-8"}
@@ -159,19 +175,34 @@ async def send_pattern_to_device(request, name):
return json.dumps({"error": "Invalid pattern name"}), 400, { return json.dumps({"error": "Invalid pattern name"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
filename = name if name.endswith(".py") else (name + ".py") filename = normalize_pattern_py_filename(name)
if not _safe_pattern_filename(filename) or filename == "__init__.py": if not filename or not _safe_pattern_filename(filename) or filename == "__init__.py":
return json.dumps({"error": "Invalid pattern filename"}), 400, { return json.dumps({"error": "Invalid pattern filename"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
if is_firmware_builtin_pattern_module(filename):
return json.dumps(
{
"error": "on and off are built into the driver firmware; OTA send does not apply.",
}
), 400, {
"Content-Type": "application/json"
}
devices = Device() devices = Device()
body = request.json or {} body = request.json or {}
requested_device_id = str(body.get("device_id") or "").strip() requested_device_id = str(body.get("device_id") or "").strip()
path = os.path.join(_driver_patterns_dir(), filename) base = driver_patterns_dir()
path = os.path.join(base, filename)
if not os.path.exists(path): if not os.path.exists(path):
return json.dumps({"error": "Pattern file not found"}), 404, { return json.dumps(
{
"error": "Pattern file not found",
"path": path,
"hint": "Ensure led-driver is present or set LED_CONTROLLER_PATTERNS_DIR.",
}
), 404, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
@@ -261,12 +292,18 @@ async def upload_pattern_file(request):
return json.dumps({"error": "invalid pattern filename"}), 400, { return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
if is_firmware_builtin_pattern_module(filename):
return json.dumps(
{"error": "on and off are built into the driver firmware; use a different pattern name."}
), 400, {
"Content-Type": "application/json"
}
if not isinstance(code, str) or not code.strip(): if not isinstance(code, str) or not code.strip():
return json.dumps({"error": "code is required"}), 400, { return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
path = os.path.join(_driver_patterns_dir(), filename) path = os.path.join(driver_patterns_dir(), filename)
exists = os.path.exists(path) exists = os.path.exists(path)
if exists and not overwrite: if exists and not overwrite:
return json.dumps({"error": "pattern file already exists", "name": filename}), 409, { return json.dumps({"error": "pattern file already exists", "name": filename}), 409, {
@@ -304,6 +341,12 @@ async def create_driver_pattern(request):
return json.dumps({ return json.dumps({
"error": "name must be a valid Python identifier (e.g. sparkle, my_pattern)", "error": "name must be a valid Python identifier (e.g. sparkle, my_pattern)",
}), 400, {"Content-Type": "application/json"} }), 400, {"Content-Type": "application/json"}
if is_firmware_builtin_pattern_module(key):
return json.dumps(
{"error": "on and off are built into the driver firmware; use a different pattern name."}
), 400, {
"Content-Type": "application/json"
}
code = data.get("code") code = data.get("code")
if not isinstance(code, str) or not code.strip(): if not isinstance(code, str) or not code.strip():
@@ -314,7 +357,7 @@ async def create_driver_pattern(request):
overwrite = bool(data.get("overwrite", True)) overwrite = bool(data.get("overwrite", True))
filename = key + ".py" filename = key + ".py"
py_path = os.path.join(_driver_patterns_dir(), filename) py_path = os.path.join(driver_patterns_dir(), filename)
if os.path.exists(py_path) and not overwrite: if os.path.exists(py_path) and not overwrite:
return json.dumps({"error": "pattern file already exists", "name": filename}), 409, { return json.dumps({"error": "pattern file already exists", "name": filename}), 409, {
"Content-Type": "application/json" "Content-Type": "application/json"

View File

@@ -0,0 +1,53 @@
import os
_ENV_PATTERNS_DIR = "LED_CONTROLLER_PATTERNS_DIR"
def driver_patterns_dir():
"""Absolute path to driver pattern ``.py`` modules.
If ``LED_CONTROLLER_PATTERNS_DIR`` is set to an existing directory, that wins
(for installs where ``led-driver`` is not next to this repo). Otherwise uses
``<project-root>/led-driver/src/patterns``.
"""
env = (os.environ.get(_ENV_PATTERNS_DIR) or "").strip()
if env and os.path.isdir(env):
return os.path.abspath(env)
here = os.path.dirname(os.path.abspath(__file__))
root = os.path.abspath(os.path.join(here, "..", ".."))
return os.path.join(root, "led-driver", "src", "patterns")
def normalize_pattern_py_filename(name):
"""Return a single ``*.py`` basename (no paths), or ``\"\"`` if invalid.
Strips repeated ``.py`` suffixes so ``blink.py.py`` becomes ``blink.py``.
"""
if not isinstance(name, str):
return ""
s = name.strip()
if not s:
return ""
lower = s.lower()
while lower.endswith(".py"):
s = s[:-3]
s = s.strip()
lower = s.lower()
if not s:
return ""
if "/" in s or "\\" in s or ".." in s:
return ""
return s + ".py"
# Implemented in led-driver ``presets.py`` only — no separate ``patterns/*.py``.
FIRMWARE_BUILTIN_PATTERN_IDS = frozenset({"on", "off"})
def is_firmware_builtin_pattern_module(name):
"""True for ``on`` / ``off``, with or without a ``.py`` suffix."""
if not isinstance(name, str):
return False
s = name.strip().lower()
while s.endswith(".py"):
s = s[:-3].strip()
return s in FIRMWARE_BUILTIN_PATTERN_IDS