From 0c73d56ab5e2e6f4ce3e390e1c1ae8cb7174d950 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Thu, 19 Feb 2026 18:14:17 +1300 Subject: [PATCH] Import led-driver app: pico/ and esp32/ layout Co-authored-by: Cursor --- README.md | 28 ++ dev.py | 108 +++-- esp32/src/main.py | 34 ++ esp32/test/test_uart_send_json.py | 64 +++ esp32/test/test_uart_tx.py | 32 ++ main.py | 102 +++++ pico/src/main.py | 106 +++++ pico/src/p2p.py | 16 + pico/src/patterns/__init__.py | 6 + pico/src/patterns/blink.py | 33 ++ pico/src/patterns/chase.py | 124 ++++++ pico/src/patterns/circle.py | 96 +++++ pico/src/patterns/pulse.py | 64 +++ pico/src/patterns/rainbow.py | 51 +++ pico/src/patterns/transition.py | 57 +++ pico/src/preset.py | 79 ++++ pico/src/presets.py | 131 ++++++ pico/src/settings.py | 94 ++++ pico/src/utils.py | 53 +++ pico/test/leds.py | 2 +- pico/test/patterns/auto_manual.py | 190 ++++++++ pico/test/patterns/blink.py | 35 ++ pico/test/patterns/chase.py | 161 +++++++ pico/test/patterns/circle.py | 113 +++++ pico/test/patterns/off.py | 30 ++ pico/test/patterns/on.py | 47 ++ pico/test/patterns/pulse.py | 92 ++++ pico/test/patterns/rainbow.py | 151 +++++++ pico/test/patterns/transition.py | 81 ++++ pico/test/rainbow.py | 87 ++-- pico/test/test_espnow_receive.py | 694 ++++++++++++++++++++++++++++++ 31 files changed, 2907 insertions(+), 54 deletions(-) create mode 100644 esp32/src/main.py create mode 100644 esp32/test/test_uart_send_json.py create mode 100644 esp32/test/test_uart_tx.py create mode 100644 main.py create mode 100644 pico/src/main.py create mode 100644 pico/src/p2p.py create mode 100644 pico/src/patterns/__init__.py create mode 100644 pico/src/patterns/blink.py create mode 100644 pico/src/patterns/chase.py create mode 100644 pico/src/patterns/circle.py create mode 100644 pico/src/patterns/pulse.py create mode 100644 pico/src/patterns/rainbow.py create mode 100644 pico/src/patterns/transition.py create mode 100644 pico/src/preset.py create mode 100644 pico/src/presets.py create mode 100644 pico/src/settings.py create mode 100644 pico/src/utils.py create mode 100644 pico/test/patterns/auto_manual.py create mode 100644 pico/test/patterns/blink.py create mode 100644 pico/test/patterns/chase.py create mode 100644 pico/test/patterns/circle.py create mode 100644 pico/test/patterns/off.py create mode 100644 pico/test/patterns/on.py create mode 100644 pico/test/patterns/pulse.py create mode 100644 pico/test/patterns/rainbow.py create mode 100644 pico/test/patterns/transition.py create mode 100644 pico/test/test_espnow_receive.py diff --git a/README.md b/README.md index 9b650b4..d5b1efb 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,30 @@ # led-bar +## Recovery: when the board is stuck and you have to nuke the flash + +**Option A – clear startup files (no reflash)** +If the board runs but REPL/Thonny is blocked by `main.py` or `boot.py`, remove them so the next boot drops straight to REPL. From your PC (with the Pico connected via USB): + +```bash +# Remove startup files (paths are on the device; try without colons if one form fails) +mpremote fs rm boot.py +mpremote fs rm main.py +mpremote reset +``` + +If that fails, try one of these: + +```bash +mpremote rm boot.py +mpremote rm main.py +``` + +Or in **Thonny**: Stop the running program (Ctrl+C or Stop button), then **View → Files**, right‑click the device, delete `boot.py` and `main.py` on the device, then **Tools → Reset**. + +If the board doesn’t respond to serial at all, use Option B. + +**Option B – full flash erase (Pico 2)** +1. Unplug the Pico 2. +2. Hold **BOOTSEL**, plug USB in, then release BOOTSEL. +3. It should mount as a drive. Delete any existing UF2 if you want a clean state. +4. Copy the MicroPython UF2 for Pico 2 (RP2350) onto the drive. The board will reboot with a fresh install and empty filesystem. diff --git a/dev.py b/dev.py index 7b3ab91..7c1c87d 100755 --- a/dev.py +++ b/dev.py @@ -1,33 +1,91 @@ #!/usr/bin/env python3 +import os import subprocess -import serial import sys -print(sys.argv) - -port = sys.argv[1] - -cmd = sys.argv[1] - -for cmd in sys.argv[1:]: - print(cmd) - match cmd: - case "src": - subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", ".", ":" ], cwd="src") - case "lib": - subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", "lib", ":" ]) - case "ls": - subprocess.call(["mpremote", "connect", port, "fs", "ls", ":" ]) - case "reset": - with serial.Serial(port, baudrate=115200) as ser: - ser.write(b'\x03\x03\x04') - case "follow": - with serial.Serial(port, baudrate=115200) as ser: - while True: - if ser.in_waiting > 0: # Check if there is data in the buffer - data = ser.readline().decode('utf-8').strip() # Read and decode the data - print(data) +import serial +def usage() -> None: + print("Usage:") + print(" dev.py [src] [lib] [reset] [follow]") + print(" e.g. dev.py /dev/ttyUSB0 pico src lib") + print(" e.g. dev.py /dev/ttyUSB0 esp32 src reset follow") + print(" device: pico | esp32. If no src/lib given, deploys both.") + +def main() -> None: + if len(sys.argv) < 3: + usage() + return + + port = sys.argv[1] + device = sys.argv[2].lower() + actions = [a.lower() for a in sys.argv[3:]] + + if port.startswith("/") or (len(port) >= 3 and port.upper().startswith("COM")): + pass + else: + print("First argument must be serial port (e.g. /dev/ttyUSB0 or COM3).") + usage() + return + + if device not in ("pico", "esp32"): + print("Device must be pico or esp32.") + usage() + return + + if not actions: + actions = ["src", "lib"] + + src_dir = f"{device}/src" + lib_dir = f"{device}/lib" + + for a in actions: + print(a) + match a: + case "src": + if os.path.isdir(src_dir): + # Ensure remote directories exist before copying files + created_dirs: set[str] = set() + for dirpath, _, filenames in os.walk(src_dir): + for name in filenames: + path = os.path.join(dirpath, name) + rel = os.path.relpath(path, src_dir).replace(os.sep, "/") + remote_dir = "" + if "/" in rel: + remote_dir = rel.rsplit("/", 1)[0] + if remote_dir and remote_dir not in created_dirs: + subprocess.call( + ["mpremote", "connect", port, "fs", "mkdir", ":" + remote_dir], + ) + created_dirs.add(remote_dir) + subprocess.call( + ["mpremote", "connect", port, "fs", "cp", path, ":" + rel], + ) + else: + print(" (no src dir)") + case "lib": + if os.path.isdir(lib_dir): + subprocess.call( + ["mpremote", "connect", port, "fs", "cp", "-r", lib_dir, ":"], + ) + else: + print(" (no lib dir)") + case "reset": + with serial.Serial(port, baudrate=115200) as ser: + ser.write(b"\x03\x03\x04") + case "follow": + with serial.Serial(port, baudrate=115200) as ser: + while True: + if ser.in_waiting > 0: + data = ser.readline().decode("utf-8").strip() + print(data) + case _: + print("Unknown action:", a) + usage() + + +if __name__ == "__main__": + main() diff --git a/esp32/src/main.py b/esp32/src/main.py new file mode 100644 index 0000000..0ce75d3 --- /dev/null +++ b/esp32/src/main.py @@ -0,0 +1,34 @@ +""" +XIAO ESP32-C6: ESPNOW -> UART passthrough to Pico. +Receives messages via ESPNOW, forwards them unchanged to UART (GPIO17). +UART at 921600 baud. LED on GPIO15 blinks on activity. +""" +import network +import espnow +import machine +import time + +# UART: TX on GPIO17 -> Pico RX, max baud for throughput +UART_BAUD = 921600 +uart = machine.UART(1, baudrate=UART_BAUD, tx=17) +led = machine.Pin(15, machine.Pin.OUT) + +# WLAN must be active for ESPNOW (no need to connect) +sta = network.WLAN(network.WLAN.IF_STA) +sta.active(True) +sta.disconnect() + +e = espnow.ESPNow() +e.active(True) +# No peers needed to receive; add_peer() only for send() + +# Recv timeout 0 = non-blocking +print("ESP32: ESPNOW -> UART passthrough, %d baud" % UART_BAUD) +while True: + mac, msg = e.irecv(0) + if msg: + uart.write(msg) + led.value(1) + else: + led.value(0) + time.sleep_ms(1) diff --git a/esp32/test/test_uart_send_json.py b/esp32/test/test_uart_send_json.py new file mode 100644 index 0000000..df58d02 --- /dev/null +++ b/esp32/test/test_uart_send_json.py @@ -0,0 +1,64 @@ +""" +ESP32-C6 test: send JSON messages to Pico over UART (GPIO17). +Settings use strips = [[pin, num_leds], ...]. Run with Pico connected on RX. + +Run with mpremote (from repo root): + ./esp32/run_test_uart_json.sh + # or + mpremote run esp32/test/test_uart_send_json.py + # or with port + mpremote connect /dev/ttyUSB0 run esp32/test/test_uart_send_json.py +""" +import machine +import time +import json + +UART_TX_PIN = 17 +UART_BAUD = 921600 +LED_PIN = 15 + + +def send_json(uart, obj): + line = json.dumps(obj) + "\n" + uart.write(line) + print("TX:", line.strip()) + + +def main(): + uart = machine.UART(1, baudrate=UART_BAUD, tx=UART_TX_PIN) + led = machine.Pin(LED_PIN, machine.Pin.OUT) + + # 1) Settings: one strip, pin 2, 10 LEDs (list of lists) + send_json(uart, { + "v": 1, + "settings": { + "strips": [[2, 10]], + "brightness": 30, + }, + }) + led.value(1) + time.sleep(0.2) + led.value(0) + time.sleep(0.3) + + # 2) led-controller format: light + settings.color (hex) + send_json(uart, {"light": "strip1", "settings": {"color": "#FF0000"}, "save": False}) + time.sleep(0.5) + + # 3) led-controller format: light + settings.r,g,b + send_json(uart, {"light": "strip1", "settings": {"r": 0, "g": 255, "b": 0}, "save": False}) + time.sleep(0.5) + + # 4) led-controller format: blue (hex) + send_json(uart, {"light": "strip1", "settings": {"color": "#0000FF"}, "save": False}) + time.sleep(0.5) + + # 5) Off (existing format) + send_json(uart, {"v": 1, "off": True}) + time.sleep(0.3) + + print("Done. Pico: settings -> red (hex) -> green (r,g,b) -> blue (hex) -> off.") + + +if __name__ == "__main__": + main() diff --git a/esp32/test/test_uart_tx.py b/esp32/test/test_uart_tx.py new file mode 100644 index 0000000..a59ebed --- /dev/null +++ b/esp32/test/test_uart_tx.py @@ -0,0 +1,32 @@ +""" +ESP32-C6 UART TX + LED test. Sends a few commands on GPIO17, blinks LED on GPIO15. +Run on device: exec(open('test/test_uart_tx').read()) or import test.test_uart_tx +Does not require Pico connected. +""" +import machine +import time + +UART_TX_PIN = 17 +LED_PIN = 15 + +def main(): + uart = machine.UART(1, baudrate=115200, tx=UART_TX_PIN) + led = machine.Pin(LED_PIN, machine.Pin.OUT) + + def send(cmd): + uart.write(cmd + "\n") + print("TX:", cmd) + + # Blink and send a short command sequence + commands = ["off", "fill 255 0 0", "fill 0 255 0", "fill 0 0 255", "off"] + for i, cmd in enumerate(commands): + led.value(1) + send(cmd) + time.sleep(0.3) + led.value(0) + time.sleep(0.2) + + print("Done. Connect Pico to see strip follow commands.") + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py new file mode 100644 index 0000000..382c68f --- /dev/null +++ b/main.py @@ -0,0 +1,102 @@ +# """ +# Pico: receive led-driver JSON from UART (one message per line). Runs Presets + patterns. +# UART RX on D7 (GPIO1). Non-blocking so presets.tick() runs every loop. +# """ +# from settings import Settings +# from machine import UART, Pin +# import utime +# from presets import Presets +# from utils import convert_and_reorder_colors +# import json + +# # UART (Pico XIAO: D7 = GPIO1) +# UART_RX_PIN = 1 +# UART_BAUD = 115200 +# UART_ID = 0 + +# settings = Settings() +# print(settings) + +# presets = Presets(settings["led_pin"], settings["num_leds"]) +# presets.load() +# presets.b = settings.get("brightness", 255) +# startup_preset = settings.get("startup_preset") +# if startup_preset: +# presets.select(startup_preset) +# print("Selected startup preset:", startup_preset) + +# last_brightness_save = 0 + +# # Non-blocking UART +# uart = UART(UART_ID, baudrate=UART_BAUD, rx=Pin(UART_RX_PIN), rxbuf=512, timeout=0) +# uart_buf = bytearray() + +# print("UART RX on pin %s, %s baud (one JSON object per line)" % (UART_RX_PIN, UART_BAUD)) + + +# def process_message(data): +# """Handle one JSON message (led-driver protocol: v, b, presets, select, default, save).""" +# if data.get("v") != "1": +# return +# global last_brightness_save +# if "b" in data: +# try: +# presets.b = max(0, min(255, int(data["b"]))) +# settings["brightness"] = presets.b +# now = utime.ticks_ms() +# if utime.ticks_diff(now, last_brightness_save) >= 500: +# settings.save() +# last_brightness_save = now +# except (TypeError, ValueError): +# pass +# if "presets" in data: +# for id, preset_data in data["presets"].items(): +# if "c" in preset_data: +# preset_data["c"] = convert_and_reorder_colors(preset_data["c"], settings) +# presets.edit(id, preset_data) +# print("Edited preset", id, preset_data.get("name", "")) +# if settings.get("name") in data.get("select", {}): +# select_list = data["select"][settings.get("name")] +# if select_list: +# preset_name = select_list[0] +# step = select_list[1] if len(select_list) > 1 else None +# presets.select(preset_name, step=step) +# if "default" in data: +# settings["startup_preset"] = data["default"] +# print("Set startup preset to", data["default"]) +# settings.save() +# if "save" in data: +# presets.save() + + +# while True: +# presets.tick() +# n = uart.any() +# if n: +# data_in = uart.read(n) +# if data_in: +# for b in data_in: +# if b in (0x0A, 0x0D): # LF or CR +# if uart_buf: +# try: +# msg = uart_buf.decode("utf-8").strip() +# if msg: +# data = json.loads(msg) +# process_message(data) +# except (ValueError, UnicodeError): +# pass +# uart_buf = bytearray() +# else: +# if len(uart_buf) < 1024: +# uart_buf.append(b) +# utime.sleep_ms(1) + +from neopixel import NeoPixel +from machine import Pin + +pins = ((2,270), (3,271), (4,272), (0,273), (7,274), (6,275), (29,276), (28,277)) +for pin, num_leds in pins: + print(pin, num_leds) + np = NeoPixel(Pin(pin), num_leds) + np.fill((8, 0, 0)) + np.write() diff --git a/pico/src/main.py b/pico/src/main.py new file mode 100644 index 0000000..7fddb6f --- /dev/null +++ b/pico/src/main.py @@ -0,0 +1,106 @@ +# """ +# Pico: receive led-driver JSON from UART (one message per line). Runs Presets + patterns. +# UART RX on D7 (GPIO1). Non-blocking so presets.tick() runs every loop. +# """ +# from settings import Settings +# from machine import UART, Pin +# import utime +# from presets import Presets +# from utils import convert_and_reorder_colors +# import json + +# # UART (Pico XIAO: D7 = GPIO1) +# UART_RX_PIN = 1 +# UART_BAUD = 115200 +# UART_ID = 0 + +# settings = Settings() +# print(settings) + +# presets = Presets(settings["led_pin"], settings["num_leds"]) +# presets.load() +# presets.b = settings.get("brightness", 255) +# startup_preset = settings.get("startup_preset") +# if startup_preset: +# presets.select(startup_preset) +# print("Selected startup preset:", startup_preset) + +# last_brightness_save = 0 + +# # Non-blocking UART +# uart = UART(UART_ID, baudrate=UART_BAUD, rx=Pin(UART_RX_PIN), rxbuf=512, timeout=0) +# uart_buf = bytearray() + +# print("UART RX on pin %s, %s baud (one JSON object per line)" % (UART_RX_PIN, UART_BAUD)) + + +# def process_message(data): +# """Handle one JSON message (led-driver protocol: v, b, presets, select, default, save).""" +# if data.get("v") != "1": +# return +# global last_brightness_save +# if "b" in data: +# try: +# presets.b = max(0, min(255, int(data["b"]))) +# settings["brightness"] = presets.b +# now = utime.ticks_ms() +# if utime.ticks_diff(now, last_brightness_save) >= 500: +# settings.save() +# last_brightness_save = now +# except (TypeError, ValueError): +# pass +# if "presets" in data: +# for id, preset_data in data["presets"].items(): +# if "c" in preset_data: +# preset_data["c"] = convert_and_reorder_colors(preset_data["c"], settings) +# presets.edit(id, preset_data) +# print("Edited preset", id, preset_data.get("name", "")) +# if settings.get("name") in data.get("select", {}): +# select_list = data["select"][settings.get("name")] +# if select_list: +# preset_name = select_list[0] +# step = select_list[1] if len(select_list) > 1 else None +# presets.select(preset_name, step=step) +# if "default" in data: +# settings["startup_preset"] = data["default"] +# print("Set startup preset to", data["default"]) +# settings.save() +# if "save" in data: +# presets.save() + + +# while True: +# presets.tick() +# n = uart.any() +# if n: +# data_in = uart.read(n) +# if data_in: +# for b in data_in: +# if b in (0x0A, 0x0D): # LF or CR +# if uart_buf: +# try: +# msg = uart_buf.decode("utf-8").strip() +# if msg: +# data = json.loads(msg) +# process_message(data) +# except (ValueError, UnicodeError): +# pass +# uart_buf = bytearray() +# else: +# if len(uart_buf) < 1024: +# uart_buf.append(b) +# utime.sleep_ms(1) + +from neopixel import NeoPixel +from machine import Pin + +from ws2812 import WS2812B + +sm = 0 +pins = ((2,270), (3,271), (4,272), (0,273), (7,274), (6,275), (29,276), (28,277)) +for pin, num_leds in pins: + print(pin, num_leds) + np = WS2812B(num_leds, pin, sm, 0.1) + sm += 1 + np.fill((8, 0, 0)) + np.show() diff --git a/pico/src/p2p.py b/pico/src/p2p.py new file mode 100644 index 0000000..a00f6a2 --- /dev/null +++ b/pico/src/p2p.py @@ -0,0 +1,16 @@ +import asyncio +import aioespnow +import json + +async def p2p(settings, patterns): + e = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support + e.active(True) + async for mac, msg in e: + try: + data = json.loads(msg) + except: + print(f"Failed to load espnow data {msg}") + continue + + if "names" not in data or settings.get("name") in data.get("names", []): + await settings.set_settings(data.get("settings", {}), patterns, data.get("save", False)) \ No newline at end of file diff --git a/pico/src/patterns/__init__.py b/pico/src/patterns/__init__.py new file mode 100644 index 0000000..83b9dac --- /dev/null +++ b/pico/src/patterns/__init__.py @@ -0,0 +1,6 @@ +from .blink import Blink +from .rainbow import Rainbow +from .pulse import Pulse +from .transition import Transition +from .chase import Chase +from .circle import Circle diff --git a/pico/src/patterns/blink.py b/pico/src/patterns/blink.py new file mode 100644 index 0000000..8a63fe5 --- /dev/null +++ b/pico/src/patterns/blink.py @@ -0,0 +1,33 @@ +import utime + + +class Blink: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Blink pattern: toggles LEDs on/off using preset delay, cycling through colors.""" + # Use provided colors, or default to white if none + colors = preset.c if preset.c else [(255, 255, 255)] + color_index = 0 + state = True # True = on, False = off + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + # Re-read delay each loop so live updates to preset.d take effect + delay_ms = max(1, int(preset.d)) + if utime.ticks_diff(current_time, last_update) >= delay_ms: + if state: + base_color = colors[color_index % len(colors)] + color = self.driver.apply_brightness(base_color, preset.b) + self.driver.fill(color) + # Advance to next color for the next "on" phase + color_index += 1 + else: + # "Off" phase: turn all LEDs off + self.driver.fill((0, 0, 0)) + state = not state + last_update = current_time + # Yield once per tick so other logic can run + yield diff --git a/pico/src/patterns/chase.py b/pico/src/patterns/chase.py new file mode 100644 index 0000000..837ac21 --- /dev/null +++ b/pico/src/patterns/chase.py @@ -0,0 +1,124 @@ +import utime + + +class Chase: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating. + Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)""" + colors = preset.c + if len(colors) < 1: + # Need at least 1 color + return + + # Access colors, delay, and n values from preset + if not colors: + return + # If only one color provided, use it for both colors + if len(colors) < 2: + color0 = colors[0] + color1 = colors[0] + else: + color0 = colors[0] + color1 = colors[1] + + color0 = self.driver.apply_brightness(color0, preset.b) + color1 = self.driver.apply_brightness(color1, preset.b) + + n1 = max(1, int(preset.n1)) # LEDs of color 0 + n2 = max(1, int(preset.n2)) # LEDs of color 1 + n3 = int(preset.n3) # Step movement on even steps (can be negative) + n4 = int(preset.n4) # Step movement on odd steps (can be negative) + + segment_length = n1 + n2 + + # Calculate position from step_count + step_count = self.driver.step + # Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc. + if step_count % 2 == 0: + # Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3 + position = (step_count // 2) * (n3 + n4) + n3 + else: + # Odd steps: ((step_count+1)//2) pairs of (n3+n4) + position = ((step_count + 1) // 2) * (n3 + n4) + + # Wrap position to keep it reasonable + max_pos = self.driver.num_leds + segment_length + position = position % max_pos + if position < 0: + position += max_pos + + # If auto is False, run a single step and then stop + if not preset.a: + # Clear all LEDs + self.driver.n.fill((0, 0, 0)) + + # Draw repeating pattern starting at position + for i in range(self.driver.num_leds): + # Calculate position in the repeating segment + relative_pos = (i - position) % segment_length + if relative_pos < 0: + relative_pos = (relative_pos + segment_length) % segment_length + + # Determine which color based on position in segment + if relative_pos < n1: + self.driver.n[i] = color0 + else: + self.driver.n[i] = color1 + + self.driver.n.write() + + # Increment step for next beat + self.driver.step = step_count + 1 + + # Allow tick() to advance the generator once + yield + return + + # Auto mode: continuous loop + # Use transition_duration for timing and force the first update to happen immediately + transition_duration = max(10, int(preset.d)) + last_update = utime.ticks_ms() - transition_duration + + while True: + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, last_update) >= transition_duration: + # Calculate current position from step_count + if step_count % 2 == 0: + position = (step_count // 2) * (n3 + n4) + n3 + else: + position = ((step_count + 1) // 2) * (n3 + n4) + + # Wrap position + max_pos = self.driver.num_leds + segment_length + position = position % max_pos + if position < 0: + position += max_pos + + # Clear all LEDs + self.driver.n.fill((0, 0, 0)) + + # Draw repeating pattern starting at position + for i in range(self.driver.num_leds): + # Calculate position in the repeating segment + relative_pos = (i - position) % segment_length + if relative_pos < 0: + relative_pos = (relative_pos + segment_length) % segment_length + + # Determine which color based on position in segment + if relative_pos < n1: + self.driver.n[i] = color0 + else: + self.driver.n[i] = color1 + + self.driver.n.write() + + # Increment step + step_count += 1 + self.driver.step = step_count + last_update = current_time + + # Yield once per tick so other logic can run + yield diff --git a/pico/src/patterns/circle.py b/pico/src/patterns/circle.py new file mode 100644 index 0000000..f063724 --- /dev/null +++ b/pico/src/patterns/circle.py @@ -0,0 +1,96 @@ +import utime + + +class Circle: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4""" + head = 0 + tail = 0 + + # Calculate timing from preset + head_rate = max(1, int(preset.n1)) # n1 = head moves per second + tail_rate = max(1, int(preset.n3)) # n3 = tail moves per second + max_length = max(1, int(preset.n2)) # n2 = max length + min_length = max(0, int(preset.n4)) # n4 = min length + + head_delay = 1000 // head_rate # ms between head movements + tail_delay = 1000 // tail_rate # ms between tail movements + + last_head_move = utime.ticks_ms() + last_tail_move = utime.ticks_ms() + + phase = "growing" # "growing", "shrinking", or "off" + + # Support up to two colors (like chase). If only one color is provided, + # use black for the second; if none, default to white. + colors = preset.c + if not colors: + base0 = base1 = (255, 255, 255) + elif len(colors) == 1: + base0 = colors[0] + base1 = (0, 0, 0) + else: + base0 = colors[0] + base1 = colors[1] + + color0 = self.driver.apply_brightness(base0, preset.b) + color1 = self.driver.apply_brightness(base1, preset.b) + + while True: + current_time = utime.ticks_ms() + + # Background: use second color during the "off" phase, otherwise clear to black + if phase == "off": + self.driver.n.fill(color1) + else: + self.driver.n.fill((0, 0, 0)) + + # Calculate segment length + segment_length = (head - tail) % self.driver.num_leds + if segment_length == 0 and head != tail: + segment_length = self.driver.num_leds + + # Draw segment from tail to head as a solid color (no per-LED alternation) + current_color = color0 + for i in range(segment_length + 1): + led_pos = (tail + i) % self.driver.num_leds + self.driver.n[led_pos] = current_color + + # Move head continuously at n1 LEDs per second + if utime.ticks_diff(current_time, last_head_move) >= head_delay: + head = (head + 1) % self.driver.num_leds + last_head_move = current_time + + # Tail behavior based on phase + if phase == "growing": + # Growing phase: tail stays at 0 until max length reached + if segment_length >= max_length: + phase = "shrinking" + elif phase == "shrinking": + # Shrinking phase: move tail forward at n3 LEDs per second + if utime.ticks_diff(current_time, last_tail_move) >= tail_delay: + tail = (tail + 1) % self.driver.num_leds + last_tail_move = current_time + + # Check if we've reached min length + current_length = (head - tail) % self.driver.num_leds + if current_length == 0 and head != tail: + current_length = self.driver.num_leds + + # For min_length = 0, we need at least 1 LED (the head) + if min_length == 0 and current_length <= 1: + phase = "off" # All LEDs off for 1 step + elif min_length > 0 and current_length <= min_length: + phase = "growing" # Cycle repeats + else: # phase == "off" + # Off phase: second color fills the ring for 1 step, then restart + tail = head # Reset tail to head position to start fresh + phase = "growing" + + self.driver.n.write() + + # Yield once per tick so other logic can run + yield diff --git a/pico/src/patterns/pulse.py b/pico/src/patterns/pulse.py new file mode 100644 index 0000000..1faf020 --- /dev/null +++ b/pico/src/patterns/pulse.py @@ -0,0 +1,64 @@ +import utime + + +class Pulse: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + self.driver.off() + + # Get colors from preset + colors = preset.c + if not colors: + colors = [(255, 255, 255)] + + color_index = 0 + cycle_start = utime.ticks_ms() + + # State machine based pulse using a single generator loop + while True: + # Read current timing parameters from preset + attack_ms = max(0, int(preset.n1)) # Attack time in ms + hold_ms = max(0, int(preset.n2)) # Hold time in ms + decay_ms = max(0, int(preset.n3)) # Decay time in ms + delay_ms = max(0, int(preset.d)) + + total_ms = attack_ms + hold_ms + decay_ms + delay_ms + if total_ms <= 0: + total_ms = 1 + + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, cycle_start) + + base_color = colors[color_index % len(colors)] + + if elapsed < attack_ms and attack_ms > 0: + # Attack: fade 0 -> 1 + factor = elapsed / attack_ms + color = tuple(int(c * factor) for c in base_color) + self.driver.fill(self.driver.apply_brightness(color, preset.b)) + elif elapsed < attack_ms + hold_ms: + # Hold: full brightness + self.driver.fill(self.driver.apply_brightness(base_color, preset.b)) + elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0: + # Decay: fade 1 -> 0 + dec_elapsed = elapsed - attack_ms - hold_ms + factor = max(0.0, 1.0 - (dec_elapsed / decay_ms)) + color = tuple(int(c * factor) for c in base_color) + self.driver.fill(self.driver.apply_brightness(color, preset.b)) + elif elapsed < total_ms: + # Delay phase: LEDs off between pulses + self.driver.fill((0, 0, 0)) + else: + # End of cycle, move to next color and restart timing + color_index += 1 + cycle_start = now + if not preset.a: + break + # Skip drawing this tick, start next cycle + yield + continue + + # Yield once per tick + yield diff --git a/pico/src/patterns/rainbow.py b/pico/src/patterns/rainbow.py new file mode 100644 index 0000000..64c54e9 --- /dev/null +++ b/pico/src/patterns/rainbow.py @@ -0,0 +1,51 @@ +import utime + + +class Rainbow: + def __init__(self, driver): + self.driver = driver + + def _wheel(self, pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + def run(self, preset): + step = self.driver.step % 256 + step_amount = max(1, int(preset.n1)) # n1 controls step increment + + # If auto is False, run a single step and then stop + if not preset.a: + for i in range(self.driver.num_leds): + rc_index = (i * 256 // self.driver.num_leds) + step + self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b) + self.driver.n.write() + # Increment step by n1 for next manual call + self.driver.step = (step + step_amount) % 256 + # Allow tick() to advance the generator once + yield + return + + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + sleep_ms = max(1, int(preset.d)) # Get delay from preset + if utime.ticks_diff(current_time, last_update) >= sleep_ms: + for i in range(self.driver.num_leds): + rc_index = (i * 256 // self.driver.num_leds) + step + self.driver.n[i] = self.driver.apply_brightness( + self._wheel(rc_index & 255), + preset.b, + ) + self.driver.n.write() + step = (step + step_amount) % 256 + self.driver.step = step + last_update = current_time + # Yield once per tick so other logic can run + yield diff --git a/pico/src/patterns/transition.py b/pico/src/patterns/transition.py new file mode 100644 index 0000000..b29545a --- /dev/null +++ b/pico/src/patterns/transition.py @@ -0,0 +1,57 @@ +import utime + + +class Transition: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Transition between colors, blending over `delay` ms.""" + colors = preset.c + if not colors: + self.driver.off() + yield + return + + # Only one color: just keep it on + if len(colors) == 1: + while True: + self.driver.fill(self.driver.apply_brightness(colors[0], preset.b)) + yield + return + + color_index = 0 + start_time = utime.ticks_ms() + + while True: + if not colors: + break + + # Get current and next color based on live list + c1 = colors[color_index % len(colors)] + c2 = colors[(color_index + 1) % len(colors)] + + duration = max(10, int(preset.d)) # At least 10ms + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, start_time) + + if elapsed >= duration: + # End of this transition step + if not preset.a: + # One-shot: transition from first to second color only + self.driver.fill(self.driver.apply_brightness(c2, preset.b)) + break + # Auto: move to next pair + color_index = (color_index + 1) % len(colors) + start_time = now + yield + continue + + # Interpolate between c1 and c2 + factor = elapsed / duration + interpolated = tuple( + int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3) + ) + self.driver.fill(self.driver.apply_brightness(interpolated, preset.b)) + + yield diff --git a/pico/src/preset.py b/pico/src/preset.py new file mode 100644 index 0000000..d00babc --- /dev/null +++ b/pico/src/preset.py @@ -0,0 +1,79 @@ +class Preset: + def __init__(self, data): + # Set default values for all preset attributes + self.p = "off" + self.d = 100 + self.b = 127 + self.c = [(255, 255, 255)] + self.a = True + self.n1 = 0 + self.n2 = 0 + self.n3 = 0 + self.n4 = 0 + self.n5 = 0 + self.n6 = 0 + + # Override defaults with provided data + self.edit(data) + + def edit(self, data=None): + if not data: + return False + for key, value in data.items(): + setattr(self, key, value) + return True + + @property + def pattern(self): + return self.p + + @pattern.setter + def pattern(self, value): + self.p = value + + @property + def delay(self): + return self.d + + @delay.setter + def delay(self, value): + self.d = value + + @property + def brightness(self): + return self.b + + @brightness.setter + def brightness(self, value): + self.b = value + + @property + def colors(self): + return self.c + + @colors.setter + def colors(self, value): + self.c = value + + @property + def auto(self): + return self.a + + @auto.setter + def auto(self, value): + self.a = value + + def to_dict(self): + return { + "p": self.p, + "d": self.d, + "b": self.b, + "c": self.c, + "a": self.a, + "n1": self.n1, + "n2": self.n2, + "n3": self.n3, + "n4": self.n4, + "n5": self.n5, + "n6": self.n6, + } diff --git a/pico/src/presets.py b/pico/src/presets.py new file mode 100644 index 0000000..9888e54 --- /dev/null +++ b/pico/src/presets.py @@ -0,0 +1,131 @@ +from machine import Pin +from ws2812 import WS2812B +from preset import Preset +from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle +import json + + +class Presets: + def __init__(self, pin, num_leds, state_machine=0): + # WS2812B with brightness=1.0 so Presets.apply_brightness() does all scaling (NeoPixel drop-in) + num_leds = int(num_leds) + if isinstance(pin, Pin): + self.n = WS2812B(pin, num_leds) # NeoPixel-style (Pin, n) + else: + self.n = WS2812B(num_leds, int(pin), state_machine, brightness=1.0) + self.num_leds = num_leds + self.step = 0 + # Global brightness (0–255), controlled via UART/JSON {"b": } + self.b = 255 + + self.generator = None + self.presets = {} + self.selected = None + + # Register all pattern methods + self.patterns = { + "off": self.off, + "on": self.on, + "blink": Blink(self).run, + "rainbow": Rainbow(self).run, + "pulse": Pulse(self).run, + "transition": Transition(self).run, + "chase": Chase(self).run, + "circle": Circle(self).run, + } + + def save(self): + """Save the presets to a file.""" + with open("presets.json", "w") as f: + json.dump({name: preset.to_dict() for name, preset in self.presets.items()}, f) + return True + + def load(self): + """Load presets from a file.""" + try: + with open("presets.json", "r") as f: + data = json.load(f) + except OSError: + # Create an empty presets file if missing + self.presets = {} + self.save() + return True + + self.presets = {} + for name, preset_data in data.items(): + if "c" in preset_data: + preset_data["c"] = [tuple(color) for color in preset_data["c"]] + self.presets[name] = Preset(preset_data) + if self.presets: + print("Loaded presets:") + #for name in sorted(self.presets.keys()): + # print(f" {name}: {self.presets[name].to_dict()}") + return True + + def edit(self, name, data): + """Create or update a preset with the given name.""" + if name in self.presets: + # Update existing preset + self.presets[name].edit(data) + else: + # Create new preset + self.presets[name] = Preset(data) + return True + + def delete(self, name): + if name in self.presets: + del self.presets[name] + return True + return False + + def tick(self): + if self.generator is None: + return + try: + next(self.generator) + except StopIteration: + self.generator = None + + def select(self, preset_name, step=None): + if preset_name in self.presets: + preset = self.presets[preset_name] + if preset.p in self.patterns: + # Set step value if explicitly provided + if step is not None: + self.step = step + elif preset.p == "off" or self.selected != preset_name: + self.step = 0 + self.generator = self.patterns[preset.p](preset) + self.selected = preset_name # Store the preset name, not the object + return True + # If preset doesn't exist or pattern not found, default to "off" + return False + + def update_num_leds(self, pin, num_leds): + num_leds = int(num_leds) + if isinstance(pin, Pin): + self.n = WS2812B(pin, num_leds) + else: + self.n = WS2812B(num_leds, int(pin), 0, brightness=1.0) + self.num_leds = num_leds + + def apply_brightness(self, color, brightness_override=None): + # Combine per-preset brightness (override) with global brightness self.b + local = brightness_override if brightness_override is not None else 255 + # Scale preset brightness by global brightness + effective_brightness = int(local * self.b / 255) + return tuple(int(c * effective_brightness / 255) for c in color) + + def fill(self, color=None): + fill_color = color if color is not None else (0, 0, 0) + for i in range(self.num_leds): + self.n[i] = fill_color + self.n.write() + + def off(self, preset=None): + self.fill((0, 0, 0)) + + def on(self, preset): + colors = preset.c + color = colors[0] if colors else (255, 255, 255) + self.fill(self.apply_brightness(color, preset.b)) diff --git a/pico/src/settings.py b/pico/src/settings.py new file mode 100644 index 0000000..4447da8 --- /dev/null +++ b/pico/src/settings.py @@ -0,0 +1,94 @@ +import json +import ubinascii +import machine + +class Settings(dict): + SETTINGS_FILE = "/settings.json" + + def __init__(self): + super().__init__() + self.load() # Load settings from file during initialization + self.color_order = self.get_color_order(self["color_order"]) + + def _default_name(self): + """Device name: use unique_id on Pico (no WiFi); use AP MAC on ESP32.""" + try: + import network + mac = network.WLAN(network.AP_IF).config("mac") + return "led-%s" % ubinascii.hexlify(mac).decode() + except Exception: + return "led-%s" % ubinascii.hexlify(machine.unique_id()).decode() + + def set_defaults(self): + self["led_pin"] = 10 + self["num_leds"] = 50 + self["color_order"] = "rgb" + self["name"] = self._default_name() + self["debug"] = False + self["startup_preset"] = None + self["brightness"] = 255 + + def save(self): + try: + j = json.dumps(self) + with open(self.SETTINGS_FILE, 'w') as file: + file.write(j) + print("Settings saved successfully.") + except Exception as e: + print(f"Error saving settings: {e}") + + def load(self): + try: + with open(self.SETTINGS_FILE, 'r') as file: + loaded_settings = json.load(file) + self.update(loaded_settings) + print("Settings loaded successfully.") + except Exception as e: + print(f"Error loading settings") + self.set_defaults() + self.save() + + + def get_color_order(self, color_order): + """Convert color order string to tuple of hex string indices.""" + color_orders = { + "rgb": (1, 3, 5), + "rbg": (1, 5, 3), + "grb": (3, 1, 5), + "gbr": (3, 5, 1), + "brg": (5, 1, 3), + "bgr": (5, 3, 1) + } + return color_orders.get(color_order.lower(), (1, 3, 5)) # Default to RGB + + def get_rgb_channel_order(self, color_order=None): + """Convert color order string to RGB channel indices for reordering tuples. + Returns tuple of channel indices: (r_channel, g_channel, b_channel) + Example: 'grb' -> (1, 0, 2) means (G, R, B)""" + if color_order is None: + color_order = self.get("color_order", "rgb") + color_order = color_order.lower() + # Map hex string positions to RGB channel indices + # Position 1 (R in hex) -> channel 0, Position 3 (G) -> channel 1, Position 5 (B) -> channel 2 + hex_to_channel = {1: 0, 3: 1, 5: 2} + hex_indices = self.get_color_order(color_order) + return tuple(hex_to_channel[pos] for pos in hex_indices) + +# Example usage +def main(): + settings = Settings() + print(f"Number of LEDs: {settings['num_leds']}") + settings['num_leds'] = 100 + print(f"Updated number of LEDs: {settings['num_leds']}") + settings.save() + + # Create a new Settings object to test loading + new_settings = Settings() + print(f"Loaded number of LEDs: {new_settings['num_leds']}") + print(settings) + + + +# Run the example +if __name__ == "__main__": + main() diff --git a/pico/src/utils.py b/pico/src/utils.py new file mode 100644 index 0000000..a7aa08d --- /dev/null +++ b/pico/src/utils.py @@ -0,0 +1,53 @@ +def convert_and_reorder_colors(colors, settings_or_color_order): + """Convert hex color strings to RGB tuples and reorder based on device color order. + + Args: + colors: List of colors, either hex strings like "#FF0000" or RGB tuples like (255, 0, 0) + settings_or_color_order: Either a Settings object or a color_order string (e.g., "rgb", "grb") + + Returns: + List of RGB tuples reordered according to device color order + """ + # Get channel order from settings or color_order string + if hasattr(settings_or_color_order, 'get_rgb_channel_order'): + # It's a Settings object + channel_order = settings_or_color_order.get_rgb_channel_order() + elif isinstance(settings_or_color_order, str): + # It's a color_order string, convert to channel order + color_order = settings_or_color_order.lower() + color_orders = { + "rgb": (1, 3, 5), + "rbg": (1, 5, 3), + "grb": (3, 1, 5), + "gbr": (3, 5, 1), + "brg": (5, 1, 3), + "bgr": (5, 3, 1) + } + hex_indices = color_orders.get(color_order, (1, 3, 5)) + # Map hex string positions to RGB channel indices + hex_to_channel = {1: 0, 3: 1, 5: 2} + channel_order = tuple(hex_to_channel[pos] for pos in hex_indices) + else: + # Assume it's already a channel order tuple + channel_order = settings_or_color_order + + converted_colors = [] + for color in colors: + # Convert "#RRGGBB" to (R, G, B) + if isinstance(color, str) and color.startswith("#"): + r = int(color[1:3], 16) + g = int(color[3:5], 16) + b = int(color[5:7], 16) + rgb = (r, g, b) + # Reorder based on device color order + reordered = (rgb[channel_order[0]], rgb[channel_order[1]], rgb[channel_order[2]]) + converted_colors.append(reordered) + elif isinstance(color, (list, tuple)) and len(color) == 3: + # Already a tuple/list, just reorder + rgb = tuple(color) + reordered = (rgb[channel_order[0]], rgb[channel_order[1]], rgb[channel_order[2]]) + converted_colors.append(reordered) + else: + # Keep as-is if not recognized format + converted_colors.append(color) + return converted_colors diff --git a/pico/test/leds.py b/pico/test/leds.py index fb574f7..7919f77 100644 --- a/pico/test/leds.py +++ b/pico/test/leds.py @@ -24,7 +24,7 @@ for pin, num_leds in pins: ws = WS2812B(num_leds, pin, sm, brightness=1.0) # 1.0 so fill() is visible strips.append(ws) sm += 1 - ws.fill((255,0,0)) + ws.fill((8,0,0)) ws.show() time.sleep(1) diff --git a/pico/test/patterns/auto_manual.py b/pico/test/patterns/auto_manual.py new file mode 100644 index 0000000..a5ec964 --- /dev/null +++ b/pico/test/patterns/auto_manual.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, duration_ms): + """Run pattern for specified duration.""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + print("=" * 50) + print("Testing Auto and Manual Modes") + print("=" * 50) + + # Test 1: Rainbow in AUTO mode (continuous) + print("\nTest 1: Rainbow pattern in AUTO mode (should run continuously)") + p.edit("rainbow_auto", { + "p": "rainbow", + "b": 128, + "d": 50, + "n1": 2, + "a": True, + }) + p.select("rainbow_auto") + print("Running rainbow_auto for 3 seconds...") + run_for(p, wdt, 3000) + print("✓ Auto mode: Pattern ran continuously") + + # Test 2: Rainbow in MANUAL mode (one step per tick) + print("\nTest 2: Rainbow pattern in MANUAL mode (one step per tick)") + p.edit("rainbow_manual", { + "p": "rainbow", + "b": 128, + "d": 50, + "n1": 2, + "a": False, + }) + p.select("rainbow_manual") + print("Calling tick() 5 times (should advance 5 steps)...") + for i in range(5): + p.tick() + utime.sleep_ms(100) # Small delay to see changes + print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}") + + # Check if generator stopped after one cycle + if p.generator is None: + print("✓ Manual mode: Generator stopped after one step (as expected)") + else: + print("⚠ Manual mode: Generator still active (may need multiple ticks)") + + # Test 3: Pulse in AUTO mode (continuous cycles) + print("\nTest 3: Pulse pattern in AUTO mode (should pulse continuously)") + p.edit("pulse_auto", { + "p": "pulse", + "b": 128, + "d": 100, + "n1": 500, # Attack + "n2": 200, # Hold + "n3": 500, # Decay + "c": [(255, 0, 0)], + "a": True, + }) + p.select("pulse_auto") + print("Running pulse_auto for 3 seconds...") + run_for(p, wdt, 3000) + print("✓ Auto mode: Pulse ran continuously") + + # Test 4: Pulse in MANUAL mode (one cycle then stop) + print("\nTest 4: Pulse pattern in MANUAL mode (one cycle then stop)") + p.edit("pulse_manual", { + "p": "pulse", + "b": 128, + "d": 100, + "n1": 300, # Attack + "n2": 200, # Hold + "n3": 300, # Decay + "c": [(0, 255, 0)], + "a": False, + }) + p.select("pulse_manual") + print("Running pulse_manual until generator stops...") + tick_count = 0 + max_ticks = 200 # Safety limit + while p.generator is not None and tick_count < max_ticks: + p.tick() + tick_count += 1 + utime.sleep_ms(10) + + if p.generator is None: + print(f"✓ Manual mode: Pulse completed one cycle after {tick_count} ticks") + else: + print(f"⚠ Manual mode: Pulse still running after {tick_count} ticks") + + # Test 5: Transition in AUTO mode (continuous transitions) + print("\nTest 5: Transition pattern in AUTO mode (continuous transitions)") + p.edit("transition_auto", { + "p": "transition", + "b": 128, + "d": 500, + "c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)], + "a": True, + }) + p.select("transition_auto") + print("Running transition_auto for 3 seconds...") + run_for(p, wdt, 3000) + print("✓ Auto mode: Transition ran continuously") + + # Test 6: Transition in MANUAL mode (one transition then stop) + print("\nTest 6: Transition pattern in MANUAL mode (one transition then stop)") + p.edit("transition_manual", { + "p": "transition", + "b": 128, + "d": 500, + "c": [(255, 0, 0), (0, 255, 0)], + "a": False, + }) + p.select("transition_manual") + print("Running transition_manual until generator stops...") + tick_count = 0 + max_ticks = 200 + while p.generator is not None and tick_count < max_ticks: + p.tick() + tick_count += 1 + utime.sleep_ms(10) + + if p.generator is None: + print(f"✓ Manual mode: Transition completed after {tick_count} ticks") + else: + print(f"⚠ Manual mode: Transition still running after {tick_count} ticks") + + # Test 7: Switching between auto and manual modes + print("\nTest 7: Switching between auto and manual modes") + p.edit("switch_test", { + "p": "rainbow", + "b": 128, + "d": 50, + "n1": 2, + "a": True, + }) + p.select("switch_test") + print("Running in auto mode for 1 second...") + run_for(p, wdt, 1000) + + # Switch to manual mode by editing the preset + print("Switching to manual mode...") + p.edit("switch_test", {"a": False}) + p.select("switch_test") # Re-select to apply changes + + print("Calling tick() 3 times in manual mode...") + for i in range(3): + p.tick() + utime.sleep_ms(100) + print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}") + + # Switch back to auto mode + print("Switching back to auto mode...") + p.edit("switch_test", {"a": True}) + p.select("switch_test") + print("Running in auto mode for 1 second...") + run_for(p, wdt, 1000) + print("✓ Successfully switched between auto and manual modes") + + # Cleanup + print("\nCleaning up...") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + p.tick() + utime.sleep_ms(100) + + print("\n" + "=" * 50) + print("All tests completed!") + print("=" * 50) + + +if __name__ == "__main__": + main() diff --git a/pico/test/patterns/blink.py b/pico/test/patterns/blink.py new file mode 100644 index 0000000..7291dde --- /dev/null +++ b/pico/test/patterns/blink.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors) + p.edit("test_blink", { + "p": "blink", + "b": 64, + "d": 200, + "c": [(255, 0, 0), (0, 0, 255)], + }) + p.select("test_blink") + + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 1500: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/pico/test/patterns/chase.py b/pico/test/patterns/chase.py new file mode 100644 index 0000000..39c2618 --- /dev/null +++ b/pico/test/patterns/chase.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1) + print("Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1)") + p.edit("chase1", { + "p": "chase", + "b": 255, + "d": 200, + "n1": 5, + "n2": 5, + "n3": 1, + "n4": 1, + "c": [(255, 0, 0), (0, 255, 0)], + }) + p.select("chase1") + run_for(p, wdt, 3000) + + # Test 2: Forward and backward (n3=2, n4=-1) + print("Test 2: Forward and backward (n3=2, n4=-1)") + p.edit("chase2", { + "p": "chase", + "n1": 3, + "n2": 3, + "n3": 2, + "n4": -1, + "d": 150, + "c": [(0, 0, 255), (255, 255, 0)], + }) + p.select("chase2") + run_for(p, wdt, 3000) + + # Test 3: Large segments (n1=10, n2=5) + print("Test 3: Large segments (n1=10, n2=5, n3=3, n4=3)") + p.edit("chase3", { + "p": "chase", + "n1": 10, + "n2": 5, + "n3": 3, + "n4": 3, + "d": 200, + "c": [(255, 128, 0), (128, 0, 255)], + }) + p.select("chase3") + run_for(p, wdt, 3000) + + # Test 4: Fast movement (n3=5, n4=5) + print("Test 4: Fast movement (n3=5, n4=5)") + p.edit("chase4", { + "p": "chase", + "n1": 4, + "n2": 4, + "n3": 5, + "n4": 5, + "d": 100, + "c": [(255, 0, 255), (0, 255, 255)], + }) + p.select("chase4") + run_for(p, wdt, 2000) + + # Test 5: Backward movement (n3=-2, n4=-2) + print("Test 5: Backward movement (n3=-2, n4=-2)") + p.edit("chase5", { + "p": "chase", + "n1": 6, + "n2": 4, + "n3": -2, + "n4": -2, + "d": 200, + "c": [(255, 255, 255), (0, 0, 0)], + }) + p.select("chase5") + run_for(p, wdt, 3000) + + # Test 6: Alternating forward/backward (n3=3, n4=-2) + print("Test 6: Alternating forward/backward (n3=3, n4=-2)") + p.edit("chase6", { + "p": "chase", + "n1": 5, + "n2": 5, + "n3": 3, + "n4": -2, + "d": 250, + "c": [(255, 0, 0), (0, 255, 0)], + }) + p.select("chase6") + run_for(p, wdt, 4000) + + # Test 7: Manual mode - advance one step per beat + print("Test 7: Manual mode chase (auto=False, n3=2, n4=1)") + p.edit("chase_manual", { + "p": "chase", + "n1": 4, + "n2": 4, + "n3": 2, + "n4": 1, + "d": 200, + "c": [(255, 255, 0), (0, 255, 255)], + "a": False, + }) + p.step = 0 # Reset step counter + print(" Advancing pattern with 10 beats (select + tick)...") + for i in range(10): + p.select("chase_manual") # Simulate beat - restarts generator + p.tick() # Advance one step + utime.sleep_ms(500) # Pause to see the pattern + wdt.feed() + print(f" Beat {i+1}: step={p.step}") + + # Test 8: Verify step increments correctly in manual mode + print("Test 8: Verify step increments (auto=False)") + p.edit("chase_manual2", { + "p": "chase", + "n1": 3, + "n2": 3, + "n3": 1, + "n4": 1, + "a": False, + }) + p.step = 0 + initial_step = p.step + p.select("chase_manual2") + p.tick() + final_step = p.step + print(f" Step updated from {initial_step} to {final_step} (expected: 1)") + if final_step == 1: + print(" ✓ Step increment working correctly") + else: + print(f" ✗ Step increment mismatch! Expected 1, got {final_step}") + + # Cleanup + print("Test complete, turning off") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/pico/test/patterns/circle.py b/pico/test/patterns/circle.py new file mode 100644 index 0000000..2de9d8d --- /dev/null +++ b/pico/test/patterns/circle.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0) + print("Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0)") + p.edit("circle1", { + "p": "circle", + "b": 255, + "n1": 50, # Head moves 50 LEDs/second + "n2": 100, # Max length 100 LEDs + "n3": 200, # Tail moves 200 LEDs/second + "n4": 0, # Min length 0 LEDs + "c": [(255, 0, 0)], # Red + }) + p.select("circle1") + run_for(p, wdt, 5000) + + # Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0) + print("Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0)") + p.edit("circle2", { + "p": "circle", + "n1": 20, + "n2": 50, + "n3": 100, + "n4": 0, + "c": [(0, 255, 0)], # Green + }) + p.select("circle2") + run_for(p, wdt, 5000) + + # Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0) + print("Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0)") + p.edit("circle3", { + "p": "circle", + "n1": 100, + "n2": 30, + "n3": 20, + "n4": 0, + "c": [(0, 0, 255)], # Blue + }) + p.select("circle3") + run_for(p, wdt, 5000) + + # Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10) + print("Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10)") + p.edit("circle4", { + "p": "circle", + "n1": 50, + "n2": 40, + "n3": 100, + "n4": 10, + "c": [(255, 255, 0)], # Yellow + }) + p.select("circle4") + run_for(p, wdt, 5000) + + # Test 5: Very fast (n1=200, n2=20, n3=200, n4=0) + print("Test 5: Very fast (n1=200, n2=20, n3=200, n4=0)") + p.edit("circle5", { + "p": "circle", + "n1": 200, + "n2": 20, + "n3": 200, + "n4": 0, + "c": [(255, 0, 255)], # Magenta + }) + p.select("circle5") + run_for(p, wdt, 3000) + + # Test 6: Very slow (n1=10, n2=25, n3=10, n4=0) + print("Test 6: Very slow (n1=10, n2=25, n3=10, n4=0)") + p.edit("circle6", { + "p": "circle", + "n1": 10, + "n2": 25, + "n3": 10, + "n4": 0, + "c": [(0, 255, 255)], # Cyan + }) + p.select("circle6") + run_for(p, wdt, 5000) + + # Cleanup + print("Test complete, turning off") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/pico/test/patterns/off.py b/pico/test/patterns/off.py new file mode 100644 index 0000000..e85a701 --- /dev/null +++ b/pico/test/patterns/off.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Create an "off" preset (use short-key field `p` for pattern) + p.edit("test_off", {"p": "off"}) + p.select("test_off") + + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 200: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/pico/test/patterns/on.py b/pico/test/patterns/on.py new file mode 100644 index 0000000..44c82c1 --- /dev/null +++ b/pico/test/patterns/on.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Create presets for on and off using the short-key fields that Presets expects + # Preset fields: + # p = pattern name, b = brightness, d = delay, c = list of (r,g,b) colors + p.edit("test_on", { + "p": "on", + "b": 64, + "d": 120, + "c": [(255, 0, 0), (0, 0, 255)], + }) + p.edit("test_off", {"p": "off"}) + + # ON phase + p.select("test_on") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 800: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + # OFF phase + p.select("test_off") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 100: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/pico/test/patterns/pulse.py b/pico/test/patterns/pulse.py new file mode 100644 index 0000000..708b112 --- /dev/null +++ b/pico/test/patterns/pulse.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Simple single-color pulse + print("Test 1: Single-color pulse (attack=500, hold=500, decay=500, delay=500)") + p.edit("pulse1", { + "p": "pulse", + "b": 255, + "c": [(255, 0, 0)], + "n1": 500, # attack ms + "n2": 500, # hold ms + "n3": 500, # decay ms + "d": 500, # delay ms between pulses + "a": True, + }) + p.select("pulse1") + run_for(p, wdt, 5000) + + # Test 2: Faster pulse + print("Test 2: Fast pulse (attack=100, hold=100, decay=100, delay=100)") + p.edit("pulse2", { + "p": "pulse", + "n1": 100, + "n2": 100, + "n3": 100, + "d": 100, + "c": [(0, 255, 0)], + }) + p.select("pulse2") + run_for(p, wdt, 4000) + + # Test 3: Multi-color pulse cycle + print("Test 3: Multi-color pulse (red -> green -> blue)") + p.edit("pulse3", { + "p": "pulse", + "n1": 300, + "n2": 300, + "n3": 300, + "d": 200, + "c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)], + "a": True, + }) + p.select("pulse3") + run_for(p, wdt, 6000) + + # Test 4: One-shot pulse (auto=False) + print("Test 4: Single pulse, auto=False") + p.edit("pulse4", { + "p": "pulse", + "n1": 400, + "n2": 0, + "n3": 400, + "d": 0, + "c": [(255, 255, 255)], + "a": False, + }) + p.select("pulse4") + # Run long enough to allow one full pulse cycle + run_for(p, wdt, 1500) + + # Cleanup + print("Test complete, turning off") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + run_for(p, wdt, 200) + + +if __name__ == "__main__": + main() + + diff --git a/pico/test/patterns/rainbow.py b/pico/test/patterns/rainbow.py new file mode 100644 index 0000000..7773371 --- /dev/null +++ b/pico/test/patterns/rainbow.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic rainbow with auto=True (continuous) + print("Test 1: Basic rainbow (auto=True, n1=1)") + p.edit("rainbow1", { + "p": "rainbow", + "b": 255, + "d": 100, + "n1": 1, + "a": True, + }) + p.select("rainbow1") + run_for(p, wdt, 3000) + + # Test 2: Fast rainbow + print("Test 2: Fast rainbow (low delay, n1=1)") + p.edit("rainbow2", { + "p": "rainbow", + "d": 50, + "n1": 1, + "a": True, + }) + p.select("rainbow2") + run_for(p, wdt, 2000) + + # Test 3: Slow rainbow + print("Test 3: Slow rainbow (high delay, n1=1)") + p.edit("rainbow3", { + "p": "rainbow", + "d": 500, + "n1": 1, + "a": True, + }) + p.select("rainbow3") + run_for(p, wdt, 3000) + + # Test 4: Low brightness rainbow + print("Test 4: Low brightness rainbow (n1=1)") + p.edit("rainbow4", { + "p": "rainbow", + "b": 64, + "d": 100, + "n1": 1, + "a": True, + }) + p.select("rainbow4") + run_for(p, wdt, 2000) + + # Test 5: Single-step rainbow (auto=False) + print("Test 5: Single-step rainbow (auto=False, n1=1)") + p.edit("rainbow5", { + "p": "rainbow", + "b": 255, + "d": 100, + "n1": 1, + "a": False, + }) + p.step = 0 + for i in range(10): + p.select("rainbow5") + # One tick advances the generator one frame when auto=False + p.tick() + utime.sleep_ms(100) + wdt.feed() + + # Test 6: Verify step updates correctly + print("Test 6: Verify step updates (auto=False, n1=1)") + p.edit("rainbow6", { + "p": "rainbow", + "n1": 1, + "a": False, + }) + initial_step = p.step + p.select("rainbow6") + p.tick() + final_step = p.step + print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)") + + # Test 7: Fast step increment (n1=5) + print("Test 7: Fast rainbow (n1=5, auto=True)") + p.edit("rainbow7", { + "p": "rainbow", + "b": 255, + "d": 100, + "n1": 5, + "a": True, + }) + p.select("rainbow7") + run_for(p, wdt, 2000) + + # Test 8: Very fast step increment (n1=10) + print("Test 8: Very fast rainbow (n1=10, auto=True)") + p.edit("rainbow8", { + "p": "rainbow", + "n1": 10, + "a": True, + }) + p.select("rainbow8") + run_for(p, wdt, 2000) + + # Test 9: Verify n1 controls step increment (auto=False) + print("Test 9: Verify n1 step increment (auto=False, n1=5)") + p.edit("rainbow9", { + "p": "rainbow", + "n1": 5, + "a": False, + }) + p.step = 0 + initial_step = p.step + p.select("rainbow9") + p.tick() + final_step = p.step + expected_step = (initial_step + 5) % 256 + print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})") + if final_step == expected_step: + print("✓ n1 step increment working correctly") + else: + print(f"✗ Step increment mismatch! Expected {expected_step}, got {final_step}") + + # Cleanup + print("Test complete, turning off") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/pico/test/patterns/transition.py b/pico/test/patterns/transition.py new file mode 100644 index 0000000..00149c0 --- /dev/null +++ b/pico/test/patterns/transition.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from presets import Presets + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Presets(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Simple two-color transition + print("Test 1: Two-color transition (red <-> blue, delay=1000)") + p.edit("transition1", { + "p": "transition", + "b": 255, + "d": 1000, # transition duration + "c": [(255, 0, 0), (0, 0, 255)], + "a": True, + }) + p.select("transition1") + run_for(p, wdt, 6000) + + # Test 2: Multi-color transition + print("Test 2: Multi-color transition (red -> green -> blue -> white)") + p.edit("transition2", { + "p": "transition", + "d": 800, + "c": [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 255)], + "a": True, + }) + p.select("transition2") + run_for(p, wdt, 8000) + + # Test 3: One-shot transition (auto=False) + print("Test 3: One-shot transition (auto=False)") + p.edit("transition3", { + "p": "transition", + "d": 1000, + "c": [(255, 0, 0), (0, 255, 0)], + "a": False, + }) + p.select("transition3") + # Run long enough for a single transition step + run_for(p, wdt, 2000) + + # Test 4: Single-color behavior (should just stay on) + print("Test 4: Single-color transition (should hold color)") + p.edit("transition4", { + "p": "transition", + "c": [(0, 0, 255)], + "d": 500, + "a": True, + }) + p.select("transition4") + run_for(p, wdt, 3000) + + # Cleanup + print("Test complete, turning off") + p.edit("cleanup_off", {"p": "off"}) + p.select("cleanup_off") + run_for(p, wdt, 200) + + +if __name__ == "__main__": + main() + + diff --git a/pico/test/rainbow.py b/pico/test/rainbow.py index a00b916..16164fc 100644 --- a/pico/test/rainbow.py +++ b/pico/test/rainbow.py @@ -28,13 +28,13 @@ def hue_to_rgb(hue): return (int(r * 255), int(g * 255), int(b * 255)) -def make_rainbow_double(num_leds, brightness=1.0): - """Build 2 full rainbow cycles (2*num_leds pixels, GRB). Returns (double_buf, strip_len). - head must be in 0..strip_len-1 so DMA reads double_buf[head:head+strip_len] with no copy.""" - n = 2 * num_leds +def make_rainbow_ring(total_leds, brightness=1.0): + """Build one rainbow over the whole ring: 2 full hue cycles over total_leds (GRB). + Returns (double_buf, ring_len_bytes). All strips sample from this so phase is continuous.""" + n = 2 * total_leds double_buf = bytearray(n * 3) for i in range(n): - hue = (i / n) * 360 * 2 + hue = ((i % total_leds) / total_leds) * 360 * 2 r, g, b = hue_to_rgb(hue) g = int(g * brightness) & 0xFF r = int(r * brightness) & 0xFF @@ -43,52 +43,83 @@ def make_rainbow_double(num_leds, brightness=1.0): double_buf[o] = g double_buf[o + 1] = r double_buf[o + 2] = b - strip_len = num_leds * 3 - return (double_buf, strip_len) + ring_len_bytes = total_leds * 3 + return (double_buf, ring_len_bytes) -def show_rainbow(strip, double_buf, strip_len, head): - """DMA reads directly from double_buf at head; no copy. head in 0..strip_len-1.""" - strip.show(double_buf, head) +def make_strip_rainbow(num_leds, cumulative_leds, total_ring_leds, brightness=1.0): + """Per-strip double buffer: pixel j has hue at global position (cumulative_leds + j) % total_ring_leds. + Use same head for all strips: head = rainbow_head % (2*num_leds*3).""" + n = 2 * num_leds + buf = bytearray(n * 3) + for j in range(n): + global_pos = (cumulative_leds + j) % total_ring_leds + hue = (global_pos / total_ring_leds) * 360 * 2 + r, g, b = hue_to_rgb(hue) + g = int(g * brightness) & 0xFF + r = int(r * brightness) & 0xFF + b = int(b * brightness) & 0xFF + o = j * 3 + buf[o] = g + buf[o + 1] = r + buf[o + 2] = b + strip_len_bytes = num_leds * 3 + return (buf, strip_len_bytes) -# --- Strips + rainbow buffers per strip --- +def show_rainbow_segment(strip, buf, strip_len_bytes, head): + """DMA reads strip's segment from buf at head.""" + strip.show(buf, head) + + +# --- Strips + one global ring rainbow (all strips in phase) --- +# Each strip can have a different length; one rainbow spans total_ring_leds so hue is continuous. + +# (pin, num_leds) per strip — lengths differ per segment +STRIP_CONFIG = ( + (2, 291), + (3, 290), + (4, 283), + (7, 278), + (0, 275), + (28, 278), + (29, 283), + (6, 290), +) strips = [] -pins = ((2, 291), - (3, 290), - (4, 283), - (7, 278), - (0, 275), - (28, 278), - (29, 283), - (6, 290)) sm = 0 -for pin, num_leds in pins: +for pin, num_leds in STRIP_CONFIG: print(pin, num_leds) ws = WS2812B(num_leds, pin, sm, brightness=1.0) # 1.0 so fill() is visible strips.append(ws) sm += 1 -# One rainbow double buffer per strip (num_leds can differ); no transfer buffer -now = time.ticks_ms() -rainbow_data = [make_rainbow_double(ws.num_leds, ws.brightness) for ws in strips] -# Cumulative LEDs before each strip so rainbow lines up around the ring +# Cumulative LED count before each strip; total ring size cumulative_leds = [0] for ws in strips[:-1]: cumulative_leds.append(cumulative_leds[-1] + ws.num_leds) -# Global phase (bytes); each strip gets head = (phase + cumulative_leds * 3) % strip_len total_ring_leds = cumulative_leds[-1] + strips[-1].num_leds bytes_per_cycle = total_ring_leds * 3 + +# Per-strip rainbow buffers: each strip's segment of the ring (same phase, no shared-buffer DMA) +now = time.ticks_ms() +rainbow_data = [ + make_strip_rainbow(ws.num_leds, cumulative_leds[i], total_ring_leds, ws.brightness) + for i, ws in enumerate(strips) +] print(time.ticks_diff(time.ticks_ms(), now), "ms") + rainbow_head = 0 step = 3 while True: now = time.ticks_ms() - for i, (strip, (double_buf, strip_len)) in enumerate(zip(strips, rainbow_data)): - head = (rainbow_head + cumulative_leds[i] * 3) % strip_len - show_rainbow(strip, double_buf, strip_len, head) + for i, (strip, (buf, strip_len_bytes)) in enumerate(zip(strips, rainbow_data)): + # Same head for all: each strip's buffer is already offset by cumulative_leds[i] + double_len_bytes = 2 * strip.num_leds * 3 + head = rainbow_head % double_len_bytes + show_rainbow_segment(strip, buf, strip_len_bytes, head) rainbow_head = (rainbow_head + step) % bytes_per_cycle #print(time.ticks_diff(time.ticks_ms(), now), "ms") time.sleep_ms(10) diff --git a/pico/test/test_espnow_receive.py b/pico/test/test_espnow_receive.py new file mode 100644 index 0000000..6e05e91 --- /dev/null +++ b/pico/test/test_espnow_receive.py @@ -0,0 +1,694 @@ +#!/usr/bin/env python3 +"""Test ESPNow receive functionality - runs on MicroPython device.""" +import json +import os +import utime +from settings import Settings +from presets import Presets +from utils import convert_and_reorder_colors + + +class MockESPNow: + """Mock ESPNow for testing that can send messages.""" + def __init__(self): + self.messages = [] + self.active_state = False + + def active(self, state): + self.active_state = state + + def any(self): + """Return True if there are messages.""" + return len(self.messages) > 0 + + def recv(self): + """Receive a message (removes it from queue).""" + if self.messages: + return self.messages.pop(0) + return None, None + + def send_message(self, host, msg_data): + """Send a message by adding it to the queue (testing helper).""" + if isinstance(msg_data, dict): + msg = json.dumps(msg_data) + else: + msg = msg_data + self.messages.append((host, msg)) + + def clear(self): + """Clear all messages (testing helper).""" + self.messages = [] + + +from machine import WDT + +def get_wdt(): + """Get a real WDT instance for tests.""" + return WDT(timeout=10000) # 10 second timeout for tests + + +def run_main_loop_iterations(espnow, patterns, settings, wdt, max_iterations=10): + """Run main loop iterations until no messages or max reached.""" + iterations = 0 + results = [] + + while iterations < max_iterations: + wdt.feed() + patterns.tick() + + if espnow.any(): + host, msg = espnow.recv() + data = json.loads(msg) + + if data.get("v") != "1": + results.append(("version_rejected", data)) + continue + + if "presets" in data: + for name, preset_data in data["presets"].items(): + # Convert hex color strings to RGB tuples and reorder based on device color order + if "colors" in preset_data: + preset_data["colors"] = convert_and_reorder_colors(preset_data["colors"], settings) + patterns.edit(name, preset_data) + results.append(("presets_processed", list(data["presets"].keys()))) + + if settings.get("name") in data.get("select", {}): + select_list = data["select"][settings.get("name")] + # Select value is always a list: ["preset_name"] or ["preset_name", step] + if select_list: + preset_name = select_list[0] + step = select_list[1] if len(select_list) > 1 else None + if patterns.select(preset_name, step=step): + results.append(("selected", preset_name)) + + iterations += 1 + + # Stop if no more messages + if not espnow.any(): + break + + return results + + +def test_version_check(): + """Test that messages with wrong version are rejected.""" + print("Test 1: Version check") + settings = Settings() + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Send message with wrong version + mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "2", "presets": {"test": {"pattern": "on"}}}) + results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + assert len([r for r in results if r[0] == "version_rejected"]) > 0, "Should reject wrong version" + assert "test" not in patterns.presets, "Preset should not be created" + print(" ✓ Version check passed") + + # Send message with correct version + mock_espnow.clear() + mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "1", "presets": {"test": {"pattern": "on"}}}) + results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + assert len([r for r in results if r[0] == "presets_processed"]) > 0, "Should process correct version" + assert "test" in patterns.presets, "Preset should be created" + print(" ✓ Correct version accepted") + + +def test_preset_creation(): + """Test preset creation from ESPNow messages.""" + print("\nTest 2: Preset creation") + settings = Settings() + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + msg = { + "v": "1", + "presets": { + "test_blink": { + "pattern": "blink", + "colors": ["#FF0000", "#00FF00"], + "delay": 200, + "brightness": 128 + }, + "test_rainbow": { + "pattern": "rainbow", + "delay": 100, + "n1": 2 + } + } + } + + mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + assert "test_blink" in patterns.presets, "test_blink preset should exist" + assert "test_rainbow" in patterns.presets, "test_rainbow preset should exist" + + # Check preset values + blink_preset = patterns.presets["test_blink"] + assert blink_preset.pattern == "blink", "Pattern should be blink" + assert blink_preset.delay == 200, "Delay should be 200" + assert blink_preset.brightness == 128, "Brightness should be 128" + + rainbow_preset = patterns.presets["test_rainbow"] + assert rainbow_preset.pattern == "rainbow", "Pattern should be rainbow" + assert rainbow_preset.n1 == 2, "n1 should be 2" + + print(" ✓ Presets created correctly") + + +def test_color_conversion(): + """Test hex color string conversion and reordering.""" + print("\nTest 3: Color conversion") + settings = Settings() + settings["color_order"] = "rgb" # Default RGB order + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + msg = { + "v": "1", + "presets": { + "test_colors": { + "pattern": "on", + "colors": ["#FF0000", "#00FF00", "#0000FF"] # Red, Green, Blue + } + } + } + + mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + preset = patterns.presets["test_colors"] + assert len(preset.colors) == 3, "Should have 3 colors" + assert preset.colors[0] == (255, 0, 0), "First color should be red (255,0,0)" + assert preset.colors[1] == (0, 255, 0), "Second color should be green (0,255,0)" + assert preset.colors[2] == (0, 0, 255), "Third color should be blue (0,0,255)" + print(" ✓ Colors converted correctly (RGB order)") + + # Test GRB order + settings["color_order"] = "grb" + patterns2 = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow2 = MockESPNow() + msg2 = { + "v": "1", + "presets": { + "test_grb": { + "pattern": "on", + "colors": ["#FF0000"] # Red in RGB, should become (0, 255, 0) in GRB + } + } + } + mock_espnow2.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2) + wdt2 = get_wdt() + run_main_loop_iterations(mock_espnow2, patterns2, settings, wdt2) + preset2 = patterns2.presets["test_grb"] + assert preset2.colors[0] == (0, 255, 0), "GRB: Red should become green (0,255,0)" + print(" ✓ Colors reordered correctly (GRB order)") + + +def test_preset_update(): + """Test that editing an existing preset updates it.""" + print("\nTest 4: Preset update") + settings = Settings() + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Create initial preset + msg1 = { + "v": "1", + "presets": { + "test_update": { + "pattern": "blink", + "delay": 100, + "brightness": 64 + } + } + } + mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg1) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.presets["test_update"].delay == 100, "Initial delay should be 100" + + # Update preset + mock_espnow.clear() + msg2 = { + "v": "1", + "presets": { + "test_update": { + "pattern": "blink", + "delay": 200, + "brightness": 128 + } + } + } + mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg2) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.presets["test_update"].delay == 200, "Updated delay should be 200" + assert patterns.presets["test_update"].brightness == 128, "Updated brightness should be 128" + print(" ✓ Preset updated correctly") + + +def test_select(): + """Test preset selection.""" + print("\nTest 5: Preset selection") + settings = Settings() + settings["name"] = "device1" + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Create presets + msg1 = { + "v": "1", + "presets": { + "preset1": {"pattern": "on", "colors": [(255, 0, 0)]}, + "preset2": {"pattern": "rainbow", "delay": 50} + } + } + mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg1) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + # Select preset + mock_espnow.clear() + msg2 = { + "v": "1", + "select": { + "device1": ["preset1"], + "device2": ["preset2"] + } + } + mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg2) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "preset1", "Should select preset1" + print(" ✓ Preset selected correctly") + + +def test_full_message(): + """Test a full message with presets and select.""" + print("\nTest 6: Full message (presets + select)") + settings = Settings() + settings["name"] = "test_device" + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + msg = { + "v": "1", + "presets": { + "my_preset": { + "pattern": "pulse", + "colors": ["#FF0000", "#00FF00"], + "delay": 150, + "n1": 500, + "n2": 200, + "n3": 500 + } + }, + "select": { + "test_device": ["my_preset"], + "other_device": ["other_preset"] + } + } + + mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + assert "my_preset" in patterns.presets, "Preset should be created" + assert patterns.selected == "my_preset", "Preset should be selected" + + preset = patterns.presets["my_preset"] + assert preset.pattern == "pulse", "Pattern should be pulse" + assert preset.delay == 150, "Delay should be 150" + assert preset.n1 == 500, "n1 should be 500" + print(" ✓ Full message processed correctly") + + +def test_switch_presets(): + """Test switching between different presets.""" + print("\nTest 7: Switch between presets") + settings = Settings() + settings["name"] = "switch_device" + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Create multiple presets + msg1 = { + "v": "1", + "presets": { + "preset_blink": {"pattern": "blink", "delay": 200, "colors": [(255, 0, 0)]}, + "preset_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 2}, + "preset_pulse": {"pattern": "pulse", "delay": 150, "n1": 500, "n2": 200, "n3": 500} + } + } + mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg1) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + # Select and run first preset for 2 seconds + mock_espnow.clear() + msg2 = { + "v": "1", + "select": { + "switch_device": ["preset_blink"] + } + } + mock_espnow.send_message(b"\x66\x66\x66\x66\x66\x66", msg2) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "preset_blink", "Should select preset_blink" + print(" ✓ Selected preset_blink, running for 2 seconds...") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 2000: + wdt.feed() + patterns.tick() + utime.sleep_ms(10) + + # Switch to second preset and run for 2 seconds + mock_espnow.clear() + msg3 = { + "v": "1", + "select": { + "switch_device": ["preset_rainbow"] + } + } + mock_espnow.send_message(b"\x77\x77\x77\x77\x77\x77", msg3) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "preset_rainbow", "Should switch to preset_rainbow" + print(" ✓ Switched to preset_rainbow, running for 2 seconds...") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 2000: + wdt.feed() + patterns.tick() + utime.sleep_ms(10) + + # Switch to third preset and run for 2 seconds + mock_espnow.clear() + msg4 = { + "v": "1", + "select": { + "switch_device": ["preset_pulse"] + } + } + mock_espnow.send_message(b"\x88\x88\x88\x88\x88\x88", msg4) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "preset_pulse", "Should switch to preset_pulse" + print(" ✓ Switched to preset_pulse, running for 2 seconds...") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 2000: + wdt.feed() + patterns.tick() + utime.sleep_ms(10) + + # Switch back to first preset and run for 2 seconds + mock_espnow.clear() + msg5 = { + "v": "1", + "select": { + "switch_device": ["preset_blink"] + } + } + mock_espnow.send_message(b"\x99\x99\x99\x99\x99\x99", msg5) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "preset_blink", "Should switch back to preset_blink" + print(" ✓ Switched back to preset_blink, running for 2 seconds...") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 2000: + wdt.feed() + patterns.tick() + utime.sleep_ms(10) + + print(" ✓ Preset switching works correctly") + + +def test_beat_functionality(): + """Test beat functionality - calling select() again with same preset restarts pattern.""" + print("\nTest 8: Beat functionality") + settings = Settings() + settings["name"] = "beat_device" + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Create presets with manual mode + msg1 = { + "v": "1", + "presets": { + "beat_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False}, + "beat_chase": {"pattern": "chase", "delay": 200, "n1": 4, "n2": 4, "n3": 2, "n4": 1, "auto": False}, + "beat_pulse": {"pattern": "pulse", "delay": 150, "n1": 300, "n2": 100, "n3": 300, "auto": False} + } + } + mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + # Test 1: Beat with rainbow (manual mode) - should advance one step per beat + print(" Test 8.1: Beat with rainbow (manual mode)") + patterns.step = 0 + mock_espnow.clear() + msg2 = { + "v": "1", + "select": { + "beat_device": ["beat_rainbow"] + } + } + mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "beat_rainbow", "Should select beat_rainbow" + initial_step = patterns.step + + # First beat - advance one step + mock_espnow.clear() + mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg2) # Same select message = beat + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5) + # tick() is already called in run_main_loop_iterations, so step should be incremented + assert patterns.step == (initial_step + 1) % 256, f"Step should increment from {initial_step} to {(initial_step + 1) % 256}, got {patterns.step}" + + # Second beat - advance another step + mock_espnow.clear() + mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2) # Beat again + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5) + assert patterns.step == (initial_step + 2) % 256, f"Step should increment to {(initial_step + 2) % 256}, got {patterns.step}" + print(" ✓ Rainbow beat advances one step per beat") + + # Test 2: Beat with chase (manual mode) - should advance one step per beat + print(" Test 8.2: Beat with chase (manual mode)") + patterns.step = 0 + mock_espnow.clear() + msg3 = { + "v": "1", + "select": { + "beat_device": ["beat_chase"] + } + } + mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg3) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "beat_chase", "Should select beat_chase" + initial_step = patterns.step + + # First beat + mock_espnow.clear() + mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg3) # Beat + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5) + # tick() is already called in run_main_loop_iterations + assert patterns.step == initial_step + 1, f"Chase step should increment from {initial_step} to {initial_step + 1}, got {patterns.step}" + + # Second beat + mock_espnow.clear() + mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg3) # Beat again + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5) + assert patterns.step == initial_step + 2, f"Chase step should increment to {initial_step + 2}, got {patterns.step}" + print(" ✓ Chase beat advances one step per beat") + + # Test 3: Beat with pulse (manual mode) - should restart full cycle + print(" Test 8.3: Beat with pulse (manual mode)") + mock_espnow.clear() + msg4 = { + "v": "1", + "select": { + "beat_device": ["beat_pulse"] + } + } + mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg4) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.selected == "beat_pulse", "Should select beat_pulse" + assert patterns.generator is not None, "Generator should be active" + + # First beat - should restart generator + initial_generator = patterns.generator + mock_espnow.clear() + mock_espnow.send_message(b"\x33\x33\x33\x33\x33\x33", msg4) # Beat + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + assert patterns.generator is not None, "Generator should still be active after beat" + assert patterns.generator != initial_generator, "Generator should be restarted (new instance)" + print(" ✓ Pulse beat restarts generator for full cycle") + + # Test 4: Multiple beats in sequence + print(" Test 8.4: Multiple beats in sequence") + patterns.step = 0 + mock_espnow.clear() + mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg2) # Select rainbow + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + # Send 5 beats + for i in range(5): + mock_espnow.clear() + mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg2) # Beat + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5) + # tick() is already called in run_main_loop_iterations + wdt.feed() + utime.sleep_ms(50) + + assert patterns.step == 5, f"After 5 beats, step should be 5, got {patterns.step}" + print(" ✓ Multiple beats work correctly") + + print(" ✓ Beat functionality works correctly") + + +def test_select_with_step(): + """Test selecting a preset with an explicit step value.""" + print("\nTest 9: Select with step value") + settings = Settings() + settings["name"] = "step_device" + patterns = Presets(settings["led_pin"], settings["num_leds"]) + mock_espnow = MockESPNow() + wdt = get_wdt() + + # Create preset + msg1 = { + "v": "1", + "presets": { + "step_preset": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False} + } + } + mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt) + + # Select with explicit step value + mock_espnow.clear() + msg2 = { + "v": "1", + "select": { + "step_device": ["step_preset", 10] + } + } + mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) + # Ensure tick() is called after select() to advance the step + patterns.tick() + + assert patterns.selected == "step_preset", "Should select step_preset" + # Step is set to 10, then tick() advances it, so it should be 11 + assert patterns.step == 11, f"Step should be set to 10 then advanced to 11 by tick(), got {patterns.step}" + print(" ✓ Step value set correctly") + + # Select without step (should use default behavior) + mock_espnow.clear() + msg3 = { + "v": "1", + "select": { + "step_device": ["step_preset"] + } + } + mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg3) + initial_step = patterns.step # Should be 11 + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) + # Ensure tick() is called after select() to advance the step + patterns.tick() + # Since it's the same preset, step should not be reset, but tick() will advance it + # So step should be initial_step + 1 (one tick call) + assert patterns.step == initial_step + 1, f"Step should advance from {initial_step} to {initial_step + 1} (not reset), got {patterns.step}" + print(" ✓ Step preserved when selecting same preset without step (tick advances it)") + + # Select different preset with step + patterns.edit("other_preset", {"p": "rainbow", "a": False}) + mock_espnow.clear() + msg4 = { + "v": "1", + "select": { + "step_device": ["other_preset", 5] + } + } + mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg4) + run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) + # Ensure tick() is called after select() to advance the step + patterns.tick() + + assert patterns.selected == "other_preset", "Should select other_preset" + # Step is set to 5, then tick() advances it, so it should be 6 + assert patterns.step == 6, f"Step should be set to 5 then advanced to 6 by tick(), got {patterns.step}" + print(" ✓ Step set correctly when switching presets") + + +def test_preset_save_load(): + """Test saving and loading presets to/from JSON.""" + print("\nTest 10: Preset save/load") + settings = Settings() + patterns = Presets(settings["led_pin"], settings["num_leds"]) + + patterns.edit("saved_preset", { + "p": "blink", + "d": 150, + "b": 200, + "c": [(1, 2, 3), (4, 5, 6)], + "a": False, + "n1": 1, + "n2": 2, + "n3": 3, + "n4": 4, + "n5": 5, + "n6": 6, + }) + assert patterns.save(), "Save should return True" + + reloaded = Presets(settings["led_pin"], settings["num_leds"]) + assert reloaded.load(), "Load should return True" + + preset = reloaded.presets.get("saved_preset") + assert preset is not None, "Preset should be loaded" + assert preset.p == "blink", "Pattern should be blink" + assert preset.d == 150, "Delay should be 150" + assert preset.b == 200, "Brightness should be 200" + assert preset.c == [(1, 2, 3), (4, 5, 6)], "Colors should be restored as tuples" + assert preset.a is False, "Auto should be False" + assert (preset.n1, preset.n2, preset.n3, preset.n4, preset.n5, preset.n6) == (1, 2, 3, 4, 5, 6), "n1-n6 should match" + try: + os.remove("presets.json") + except OSError: + pass + print(" ✓ Preset save/load works correctly") + + +def main(): + """Run all tests.""" + print("=" * 60) + print("ESPNow Receive Functionality Tests") + print("=" * 60) + + try: + test_version_check() + test_preset_creation() + test_color_conversion() + test_preset_update() + test_select() + test_full_message() + test_switch_presets() + test_beat_functionality() + test_select_with_step() + test_preset_save_load() + + print("\n" + "=" * 60) + print("All tests passed! ✓") + print("=" * 60) + except AssertionError as e: + print("\n✗ Test failed:", e) + raise + except Exception as e: + print("\n✗ Unexpected error:", e) + raise + + +if __name__ == "__main__": + main()