From 7bdb324ebc8aa8780edf5c86049ce1f8566e07b9 Mon Sep 17 00:00:00 2001 From: pi Date: Sun, 12 Apr 2026 00:13:56 +1200 Subject: [PATCH] feat(patterns): driver_patterns helper, on/off ota guard, drop duplicate py tree Made-with: Cursor --- patterns/__init__.py | 6 -- patterns/blink.py | 33 ---------- patterns/chase.py | 124 ------------------------------------ patterns/circle.py | 96 ---------------------------- patterns/pulse.py | 64 ------------------- patterns/rainbow.py | 51 --------------- patterns/transition.py | 57 ----------------- src/controllers/device.py | 8 +-- src/controllers/pattern.py | 75 +++++++++++++++++----- src/util/driver_patterns.py | 53 +++++++++++++++ 10 files changed, 114 insertions(+), 453 deletions(-) delete mode 100644 patterns/__init__.py delete mode 100644 patterns/blink.py delete mode 100644 patterns/chase.py delete mode 100644 patterns/circle.py delete mode 100644 patterns/pulse.py delete mode 100644 patterns/rainbow.py delete mode 100644 patterns/transition.py create mode 100644 src/util/driver_patterns.py diff --git a/patterns/__init__.py b/patterns/__init__.py deleted file mode 100644 index 83b9dac..0000000 --- a/patterns/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from .blink import Blink -from .rainbow import Rainbow -from .pulse import Pulse -from .transition import Transition -from .chase import Chase -from .circle import Circle diff --git a/patterns/blink.py b/patterns/blink.py deleted file mode 100644 index 8a63fe5..0000000 --- a/patterns/blink.py +++ /dev/null @@ -1,33 +0,0 @@ -import utime - - -class Blink: - def __init__(self, driver): - self.driver = driver - - def run(self, preset): - """Blink pattern: toggles LEDs on/off using preset delay, cycling through colors.""" - # Use provided colors, or default to white if none - colors = preset.c if preset.c else [(255, 255, 255)] - color_index = 0 - state = True # True = on, False = off - last_update = utime.ticks_ms() - - while True: - current_time = utime.ticks_ms() - # Re-read delay each loop so live updates to preset.d take effect - delay_ms = max(1, int(preset.d)) - if utime.ticks_diff(current_time, last_update) >= delay_ms: - if state: - base_color = colors[color_index % len(colors)] - color = self.driver.apply_brightness(base_color, preset.b) - self.driver.fill(color) - # Advance to next color for the next "on" phase - color_index += 1 - else: - # "Off" phase: turn all LEDs off - self.driver.fill((0, 0, 0)) - state = not state - last_update = current_time - # Yield once per tick so other logic can run - yield diff --git a/patterns/chase.py b/patterns/chase.py deleted file mode 100644 index 837ac21..0000000 --- a/patterns/chase.py +++ /dev/null @@ -1,124 +0,0 @@ -import utime - - -class Chase: - def __init__(self, driver): - self.driver = driver - - def run(self, preset): - """Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating. - Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)""" - colors = preset.c - if len(colors) < 1: - # Need at least 1 color - return - - # Access colors, delay, and n values from preset - if not colors: - return - # If only one color provided, use it for both colors - if len(colors) < 2: - color0 = colors[0] - color1 = colors[0] - else: - color0 = colors[0] - color1 = colors[1] - - color0 = self.driver.apply_brightness(color0, preset.b) - color1 = self.driver.apply_brightness(color1, preset.b) - - n1 = max(1, int(preset.n1)) # LEDs of color 0 - n2 = max(1, int(preset.n2)) # LEDs of color 1 - n3 = int(preset.n3) # Step movement on even steps (can be negative) - n4 = int(preset.n4) # Step movement on odd steps (can be negative) - - segment_length = n1 + n2 - - # Calculate position from step_count - step_count = self.driver.step - # Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc. - if step_count % 2 == 0: - # Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3 - position = (step_count // 2) * (n3 + n4) + n3 - else: - # Odd steps: ((step_count+1)//2) pairs of (n3+n4) - position = ((step_count + 1) // 2) * (n3 + n4) - - # Wrap position to keep it reasonable - max_pos = self.driver.num_leds + segment_length - position = position % max_pos - if position < 0: - position += max_pos - - # If auto is False, run a single step and then stop - if not preset.a: - # Clear all LEDs - self.driver.n.fill((0, 0, 0)) - - # Draw repeating pattern starting at position - for i in range(self.driver.num_leds): - # Calculate position in the repeating segment - relative_pos = (i - position) % segment_length - if relative_pos < 0: - relative_pos = (relative_pos + segment_length) % segment_length - - # Determine which color based on position in segment - if relative_pos < n1: - self.driver.n[i] = color0 - else: - self.driver.n[i] = color1 - - self.driver.n.write() - - # Increment step for next beat - self.driver.step = step_count + 1 - - # Allow tick() to advance the generator once - yield - return - - # Auto mode: continuous loop - # Use transition_duration for timing and force the first update to happen immediately - transition_duration = max(10, int(preset.d)) - last_update = utime.ticks_ms() - transition_duration - - while True: - current_time = utime.ticks_ms() - if utime.ticks_diff(current_time, last_update) >= transition_duration: - # Calculate current position from step_count - if step_count % 2 == 0: - position = (step_count // 2) * (n3 + n4) + n3 - else: - position = ((step_count + 1) // 2) * (n3 + n4) - - # Wrap position - max_pos = self.driver.num_leds + segment_length - position = position % max_pos - if position < 0: - position += max_pos - - # Clear all LEDs - self.driver.n.fill((0, 0, 0)) - - # Draw repeating pattern starting at position - for i in range(self.driver.num_leds): - # Calculate position in the repeating segment - relative_pos = (i - position) % segment_length - if relative_pos < 0: - relative_pos = (relative_pos + segment_length) % segment_length - - # Determine which color based on position in segment - if relative_pos < n1: - self.driver.n[i] = color0 - else: - self.driver.n[i] = color1 - - self.driver.n.write() - - # Increment step - step_count += 1 - self.driver.step = step_count - last_update = current_time - - # Yield once per tick so other logic can run - yield diff --git a/patterns/circle.py b/patterns/circle.py deleted file mode 100644 index f063724..0000000 --- a/patterns/circle.py +++ /dev/null @@ -1,96 +0,0 @@ -import utime - - -class Circle: - def __init__(self, driver): - self.driver = driver - - def run(self, preset): - """Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4""" - head = 0 - tail = 0 - - # Calculate timing from preset - head_rate = max(1, int(preset.n1)) # n1 = head moves per second - tail_rate = max(1, int(preset.n3)) # n3 = tail moves per second - max_length = max(1, int(preset.n2)) # n2 = max length - min_length = max(0, int(preset.n4)) # n4 = min length - - head_delay = 1000 // head_rate # ms between head movements - tail_delay = 1000 // tail_rate # ms between tail movements - - last_head_move = utime.ticks_ms() - last_tail_move = utime.ticks_ms() - - phase = "growing" # "growing", "shrinking", or "off" - - # Support up to two colors (like chase). If only one color is provided, - # use black for the second; if none, default to white. - colors = preset.c - if not colors: - base0 = base1 = (255, 255, 255) - elif len(colors) == 1: - base0 = colors[0] - base1 = (0, 0, 0) - else: - base0 = colors[0] - base1 = colors[1] - - color0 = self.driver.apply_brightness(base0, preset.b) - color1 = self.driver.apply_brightness(base1, preset.b) - - while True: - current_time = utime.ticks_ms() - - # Background: use second color during the "off" phase, otherwise clear to black - if phase == "off": - self.driver.n.fill(color1) - else: - self.driver.n.fill((0, 0, 0)) - - # Calculate segment length - segment_length = (head - tail) % self.driver.num_leds - if segment_length == 0 and head != tail: - segment_length = self.driver.num_leds - - # Draw segment from tail to head as a solid color (no per-LED alternation) - current_color = color0 - for i in range(segment_length + 1): - led_pos = (tail + i) % self.driver.num_leds - self.driver.n[led_pos] = current_color - - # Move head continuously at n1 LEDs per second - if utime.ticks_diff(current_time, last_head_move) >= head_delay: - head = (head + 1) % self.driver.num_leds - last_head_move = current_time - - # Tail behavior based on phase - if phase == "growing": - # Growing phase: tail stays at 0 until max length reached - if segment_length >= max_length: - phase = "shrinking" - elif phase == "shrinking": - # Shrinking phase: move tail forward at n3 LEDs per second - if utime.ticks_diff(current_time, last_tail_move) >= tail_delay: - tail = (tail + 1) % self.driver.num_leds - last_tail_move = current_time - - # Check if we've reached min length - current_length = (head - tail) % self.driver.num_leds - if current_length == 0 and head != tail: - current_length = self.driver.num_leds - - # For min_length = 0, we need at least 1 LED (the head) - if min_length == 0 and current_length <= 1: - phase = "off" # All LEDs off for 1 step - elif min_length > 0 and current_length <= min_length: - phase = "growing" # Cycle repeats - else: # phase == "off" - # Off phase: second color fills the ring for 1 step, then restart - tail = head # Reset tail to head position to start fresh - phase = "growing" - - self.driver.n.write() - - # Yield once per tick so other logic can run - yield diff --git a/patterns/pulse.py b/patterns/pulse.py deleted file mode 100644 index 1faf020..0000000 --- a/patterns/pulse.py +++ /dev/null @@ -1,64 +0,0 @@ -import utime - - -class Pulse: - def __init__(self, driver): - self.driver = driver - - def run(self, preset): - self.driver.off() - - # Get colors from preset - colors = preset.c - if not colors: - colors = [(255, 255, 255)] - - color_index = 0 - cycle_start = utime.ticks_ms() - - # State machine based pulse using a single generator loop - while True: - # Read current timing parameters from preset - attack_ms = max(0, int(preset.n1)) # Attack time in ms - hold_ms = max(0, int(preset.n2)) # Hold time in ms - decay_ms = max(0, int(preset.n3)) # Decay time in ms - delay_ms = max(0, int(preset.d)) - - total_ms = attack_ms + hold_ms + decay_ms + delay_ms - if total_ms <= 0: - total_ms = 1 - - now = utime.ticks_ms() - elapsed = utime.ticks_diff(now, cycle_start) - - base_color = colors[color_index % len(colors)] - - if elapsed < attack_ms and attack_ms > 0: - # Attack: fade 0 -> 1 - factor = elapsed / attack_ms - color = tuple(int(c * factor) for c in base_color) - self.driver.fill(self.driver.apply_brightness(color, preset.b)) - elif elapsed < attack_ms + hold_ms: - # Hold: full brightness - self.driver.fill(self.driver.apply_brightness(base_color, preset.b)) - elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0: - # Decay: fade 1 -> 0 - dec_elapsed = elapsed - attack_ms - hold_ms - factor = max(0.0, 1.0 - (dec_elapsed / decay_ms)) - color = tuple(int(c * factor) for c in base_color) - self.driver.fill(self.driver.apply_brightness(color, preset.b)) - elif elapsed < total_ms: - # Delay phase: LEDs off between pulses - self.driver.fill((0, 0, 0)) - else: - # End of cycle, move to next color and restart timing - color_index += 1 - cycle_start = now - if not preset.a: - break - # Skip drawing this tick, start next cycle - yield - continue - - # Yield once per tick - yield diff --git a/patterns/rainbow.py b/patterns/rainbow.py deleted file mode 100644 index 64c54e9..0000000 --- a/patterns/rainbow.py +++ /dev/null @@ -1,51 +0,0 @@ -import utime - - -class Rainbow: - def __init__(self, driver): - self.driver = driver - - def _wheel(self, pos): - if pos < 85: - return (pos * 3, 255 - pos * 3, 0) - elif pos < 170: - pos -= 85 - return (255 - pos * 3, 0, pos * 3) - else: - pos -= 170 - return (0, pos * 3, 255 - pos * 3) - - def run(self, preset): - step = self.driver.step % 256 - step_amount = max(1, int(preset.n1)) # n1 controls step increment - - # If auto is False, run a single step and then stop - if not preset.a: - for i in range(self.driver.num_leds): - rc_index = (i * 256 // self.driver.num_leds) + step - self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b) - self.driver.n.write() - # Increment step by n1 for next manual call - self.driver.step = (step + step_amount) % 256 - # Allow tick() to advance the generator once - yield - return - - last_update = utime.ticks_ms() - - while True: - current_time = utime.ticks_ms() - sleep_ms = max(1, int(preset.d)) # Get delay from preset - if utime.ticks_diff(current_time, last_update) >= sleep_ms: - for i in range(self.driver.num_leds): - rc_index = (i * 256 // self.driver.num_leds) + step - self.driver.n[i] = self.driver.apply_brightness( - self._wheel(rc_index & 255), - preset.b, - ) - self.driver.n.write() - step = (step + step_amount) % 256 - self.driver.step = step - last_update = current_time - # Yield once per tick so other logic can run - yield diff --git a/patterns/transition.py b/patterns/transition.py deleted file mode 100644 index b29545a..0000000 --- a/patterns/transition.py +++ /dev/null @@ -1,57 +0,0 @@ -import utime - - -class Transition: - def __init__(self, driver): - self.driver = driver - - def run(self, preset): - """Transition between colors, blending over `delay` ms.""" - colors = preset.c - if not colors: - self.driver.off() - yield - return - - # Only one color: just keep it on - if len(colors) == 1: - while True: - self.driver.fill(self.driver.apply_brightness(colors[0], preset.b)) - yield - return - - color_index = 0 - start_time = utime.ticks_ms() - - while True: - if not colors: - break - - # Get current and next color based on live list - c1 = colors[color_index % len(colors)] - c2 = colors[(color_index + 1) % len(colors)] - - duration = max(10, int(preset.d)) # At least 10ms - now = utime.ticks_ms() - elapsed = utime.ticks_diff(now, start_time) - - if elapsed >= duration: - # End of this transition step - if not preset.a: - # One-shot: transition from first to second color only - self.driver.fill(self.driver.apply_brightness(c2, preset.b)) - break - # Auto: move to next pair - color_index = (color_index + 1) % len(colors) - start_time = now - yield - continue - - # Interpolate between c1 and c2 - factor = elapsed / duration - interpolated = tuple( - int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3) - ) - self.driver.fill(self.driver.apply_brightness(interpolated, preset.b)) - - yield diff --git a/src/controllers/device.py b/src/controllers/device.py index cd6cbf3..fca7f2e 100644 --- a/src/controllers/device.py +++ b/src/controllers/device.py @@ -11,6 +11,7 @@ from models.tcp_clients import ( send_json_line_to_ip, tcp_client_connected, ) +from util.driver_patterns import driver_patterns_dir from util.espnow_message import build_message import asyncio import json @@ -73,11 +74,6 @@ def _device_json_with_live_status(dev_dict): return row -def _driver_patterns_dir(): - here = os.path.dirname(__file__) - return os.path.abspath(os.path.join(here, "../../led-driver/src/patterns")) - - def _safe_pattern_filename(name): if not isinstance(name, str): return False @@ -89,7 +85,7 @@ def _safe_pattern_filename(name): def _build_patterns_manifest(host): - base_dir = _driver_patterns_dir() + base_dir = driver_patterns_dir() names = sorted(os.listdir(base_dir)) files = [] for name in names: diff --git a/src/controllers/pattern.py b/src/controllers/pattern.py index 88e1a89..ae98d78 100644 --- a/src/controllers/pattern.py +++ b/src/controllers/pattern.py @@ -2,6 +2,11 @@ from microdot import Microdot from models.pattern import Pattern from models.device import Device from models.tcp_clients import send_json_line_to_ip +from util.driver_patterns import ( + driver_patterns_dir, + is_firmware_builtin_pattern_module, + normalize_pattern_py_filename, +) import json import re import sys @@ -17,11 +22,6 @@ def _project_root(): return os.path.abspath(os.path.join(here, "..", "..")) -def _driver_patterns_dir(): - here = os.path.dirname(__file__) - return os.path.abspath(os.path.join(here, "../../led-driver/src/patterns")) - - def _safe_pattern_filename(name): if not isinstance(name, str): return False @@ -75,7 +75,7 @@ def load_driver_pattern_names(): """List available pattern module names from led-driver/src/patterns.""" try: names = [] - for filename in os.listdir(_driver_patterns_dir()): + for filename in os.listdir(driver_patterns_dir()): if not _safe_pattern_filename(filename) or filename == "__init__.py": continue names.append(filename[:-3]) @@ -111,7 +111,7 @@ async def get_pattern_definitions(request): @controller.get('/ota/manifest') async def ota_manifest(request): """Manifest of driver pattern source files for OTA pulls.""" - base_dir = _driver_patterns_dir() + base_dir = driver_patterns_dir() host = request.headers.get("Host", "") if not host: return json.dumps({"error": "Missing Host header"}), 400, { @@ -137,16 +137,32 @@ async def ota_manifest(request): @controller.get('/ota/file/') async def ota_pattern_file(request, name): """Serve one driver pattern source file for OTA pulls.""" - if not _safe_pattern_filename(name) or name == "__init__.py": + fname = normalize_pattern_py_filename(name) + if not fname or not _safe_pattern_filename(fname) or fname == "__init__.py": return json.dumps({"error": "Invalid filename"}), 400, { "Content-Type": "application/json" } - path = os.path.join(_driver_patterns_dir(), name) + if is_firmware_builtin_pattern_module(fname): + return json.dumps( + { + "error": "on and off are built into the driver firmware; there is no module file to serve.", + } + ), 400, { + "Content-Type": "application/json" + } + base = driver_patterns_dir() + path = os.path.join(base, fname) try: with open(path, "r") as f: content = f.read() except OSError: - return json.dumps({"error": "Pattern file not found"}), 404, { + return json.dumps( + { + "error": "Pattern file not found", + "path": path, + "hint": "Ensure led-driver is present or set LED_CONTROLLER_PATTERNS_DIR.", + } + ), 404, { "Content-Type": "application/json" } return content, 200, {"Content-Type": "text/plain; charset=utf-8"} @@ -159,19 +175,34 @@ async def send_pattern_to_device(request, name): return json.dumps({"error": "Invalid pattern name"}), 400, { "Content-Type": "application/json" } - filename = name if name.endswith(".py") else (name + ".py") - if not _safe_pattern_filename(filename) or filename == "__init__.py": + filename = normalize_pattern_py_filename(name) + if not filename or not _safe_pattern_filename(filename) or filename == "__init__.py": return json.dumps({"error": "Invalid pattern filename"}), 400, { "Content-Type": "application/json" } + if is_firmware_builtin_pattern_module(filename): + return json.dumps( + { + "error": "on and off are built into the driver firmware; OTA send does not apply.", + } + ), 400, { + "Content-Type": "application/json" + } devices = Device() body = request.json or {} requested_device_id = str(body.get("device_id") or "").strip() - path = os.path.join(_driver_patterns_dir(), filename) + base = driver_patterns_dir() + path = os.path.join(base, filename) if not os.path.exists(path): - return json.dumps({"error": "Pattern file not found"}), 404, { + return json.dumps( + { + "error": "Pattern file not found", + "path": path, + "hint": "Ensure led-driver is present or set LED_CONTROLLER_PATTERNS_DIR.", + } + ), 404, { "Content-Type": "application/json" } @@ -261,12 +292,18 @@ async def upload_pattern_file(request): return json.dumps({"error": "invalid pattern filename"}), 400, { "Content-Type": "application/json" } + if is_firmware_builtin_pattern_module(filename): + return json.dumps( + {"error": "on and off are built into the driver firmware; use a different pattern name."} + ), 400, { + "Content-Type": "application/json" + } if not isinstance(code, str) or not code.strip(): return json.dumps({"error": "code is required"}), 400, { "Content-Type": "application/json" } - path = os.path.join(_driver_patterns_dir(), filename) + path = os.path.join(driver_patterns_dir(), filename) exists = os.path.exists(path) if exists and not overwrite: return json.dumps({"error": "pattern file already exists", "name": filename}), 409, { @@ -304,6 +341,12 @@ async def create_driver_pattern(request): return json.dumps({ "error": "name must be a valid Python identifier (e.g. sparkle, my_pattern)", }), 400, {"Content-Type": "application/json"} + if is_firmware_builtin_pattern_module(key): + return json.dumps( + {"error": "on and off are built into the driver firmware; use a different pattern name."} + ), 400, { + "Content-Type": "application/json" + } code = data.get("code") if not isinstance(code, str) or not code.strip(): @@ -314,7 +357,7 @@ async def create_driver_pattern(request): overwrite = bool(data.get("overwrite", True)) filename = key + ".py" - py_path = os.path.join(_driver_patterns_dir(), filename) + py_path = os.path.join(driver_patterns_dir(), filename) if os.path.exists(py_path) and not overwrite: return json.dumps({"error": "pattern file already exists", "name": filename}), 409, { "Content-Type": "application/json" diff --git a/src/util/driver_patterns.py b/src/util/driver_patterns.py new file mode 100644 index 0000000..80f76a4 --- /dev/null +++ b/src/util/driver_patterns.py @@ -0,0 +1,53 @@ +import os + +_ENV_PATTERNS_DIR = "LED_CONTROLLER_PATTERNS_DIR" + +def driver_patterns_dir(): + """Absolute path to driver pattern ``.py`` modules. + + If ``LED_CONTROLLER_PATTERNS_DIR`` is set to an existing directory, that wins + (for installs where ``led-driver`` is not next to this repo). Otherwise uses + ``/led-driver/src/patterns``. + """ + env = (os.environ.get(_ENV_PATTERNS_DIR) or "").strip() + if env and os.path.isdir(env): + return os.path.abspath(env) + here = os.path.dirname(os.path.abspath(__file__)) + root = os.path.abspath(os.path.join(here, "..", "..")) + return os.path.join(root, "led-driver", "src", "patterns") + + +def normalize_pattern_py_filename(name): + """Return a single ``*.py`` basename (no paths), or ``\"\"`` if invalid. + + Strips repeated ``.py`` suffixes so ``blink.py.py`` becomes ``blink.py``. + """ + if not isinstance(name, str): + return "" + s = name.strip() + if not s: + return "" + lower = s.lower() + while lower.endswith(".py"): + s = s[:-3] + s = s.strip() + lower = s.lower() + if not s: + return "" + if "/" in s or "\\" in s or ".." in s: + return "" + return s + ".py" + + +# Implemented in led-driver ``presets.py`` only — no separate ``patterns/*.py``. +FIRMWARE_BUILTIN_PATTERN_IDS = frozenset({"on", "off"}) + + +def is_firmware_builtin_pattern_module(name): + """True for ``on`` / ``off``, with or without a ``.py`` suffix.""" + if not isinstance(name, str): + return False + s = name.strip().lower() + while s.endswith(".py"): + s = s[:-3].strip() + return s in FIRMWARE_BUILTIN_PATTERN_IDS