Import led-driver app: pico/ and esp32/ layout

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-02-19 18:14:17 +13:00
parent 86b28a1b9c
commit 0c73d56ab5
31 changed files with 2907 additions and 54 deletions

View File

@@ -1,2 +1,30 @@
# led-bar
## Recovery: when the board is stuck and you have to nuke the flash
**Option A clear startup files (no reflash)**
If the board runs but REPL/Thonny is blocked by `main.py` or `boot.py`, remove them so the next boot drops straight to REPL. From your PC (with the Pico connected via USB):
```bash
# Remove startup files (paths are on the device; try without colons if one form fails)
mpremote fs rm boot.py
mpremote fs rm main.py
mpremote reset
```
If that fails, try one of these:
```bash
mpremote rm boot.py
mpremote rm main.py
```
Or in **Thonny**: Stop the running program (Ctrl+C or Stop button), then **View → Files**, rightclick the device, delete `boot.py` and `main.py` on the device, then **Tools → Reset**.
If the board doesnt respond to serial at all, use Option B.
**Option B full flash erase (Pico 2)**
1. Unplug the Pico 2.
2. Hold **BOOTSEL**, plug USB in, then release BOOTSEL.
3. It should mount as a drive. Delete any existing UF2 if you want a clean state.
4. Copy the MicroPython UF2 for Pico 2 (RP2350) onto the drive. The board will reboot with a fresh install and empty filesystem.

108
dev.py
View File

@@ -1,33 +1,91 @@
#!/usr/bin/env python3
import os
import subprocess
import serial
import sys
print(sys.argv)
port = sys.argv[1]
cmd = sys.argv[1]
for cmd in sys.argv[1:]:
print(cmd)
match cmd:
case "src":
subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", ".", ":" ], cwd="src")
case "lib":
subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", "lib", ":" ])
case "ls":
subprocess.call(["mpremote", "connect", port, "fs", "ls", ":" ])
case "reset":
with serial.Serial(port, baudrate=115200) as ser:
ser.write(b'\x03\x03\x04')
case "follow":
with serial.Serial(port, baudrate=115200) as ser:
while True:
if ser.in_waiting > 0: # Check if there is data in the buffer
data = ser.readline().decode('utf-8').strip() # Read and decode the data
print(data)
import serial
def usage() -> None:
print("Usage:")
print(" dev.py <port> <device> [src] [lib] [reset] [follow]")
print(" e.g. dev.py /dev/ttyUSB0 pico src lib")
print(" e.g. dev.py /dev/ttyUSB0 esp32 src reset follow")
print(" device: pico | esp32. If no src/lib given, deploys both.")
def main() -> None:
if len(sys.argv) < 3:
usage()
return
port = sys.argv[1]
device = sys.argv[2].lower()
actions = [a.lower() for a in sys.argv[3:]]
if port.startswith("/") or (len(port) >= 3 and port.upper().startswith("COM")):
pass
else:
print("First argument must be serial port (e.g. /dev/ttyUSB0 or COM3).")
usage()
return
if device not in ("pico", "esp32"):
print("Device must be pico or esp32.")
usage()
return
if not actions:
actions = ["src", "lib"]
src_dir = f"{device}/src"
lib_dir = f"{device}/lib"
for a in actions:
print(a)
match a:
case "src":
if os.path.isdir(src_dir):
# Ensure remote directories exist before copying files
created_dirs: set[str] = set()
for dirpath, _, filenames in os.walk(src_dir):
for name in filenames:
path = os.path.join(dirpath, name)
rel = os.path.relpath(path, src_dir).replace(os.sep, "/")
remote_dir = ""
if "/" in rel:
remote_dir = rel.rsplit("/", 1)[0]
if remote_dir and remote_dir not in created_dirs:
subprocess.call(
["mpremote", "connect", port, "fs", "mkdir", ":" + remote_dir],
)
created_dirs.add(remote_dir)
subprocess.call(
["mpremote", "connect", port, "fs", "cp", path, ":" + rel],
)
else:
print(" (no src dir)")
case "lib":
if os.path.isdir(lib_dir):
subprocess.call(
["mpremote", "connect", port, "fs", "cp", "-r", lib_dir, ":"],
)
else:
print(" (no lib dir)")
case "reset":
with serial.Serial(port, baudrate=115200) as ser:
ser.write(b"\x03\x03\x04")
case "follow":
with serial.Serial(port, baudrate=115200) as ser:
while True:
if ser.in_waiting > 0:
data = ser.readline().decode("utf-8").strip()
print(data)
case _:
print("Unknown action:", a)
usage()
if __name__ == "__main__":
main()

34
esp32/src/main.py Normal file
View File

@@ -0,0 +1,34 @@
"""
XIAO ESP32-C6: ESPNOW -> UART passthrough to Pico.
Receives messages via ESPNOW, forwards them unchanged to UART (GPIO17).
UART at 921600 baud. LED on GPIO15 blinks on activity.
"""
import network
import espnow
import machine
import time
# UART: TX on GPIO17 -> Pico RX, max baud for throughput
UART_BAUD = 921600
uart = machine.UART(1, baudrate=UART_BAUD, tx=17)
led = machine.Pin(15, machine.Pin.OUT)
# WLAN must be active for ESPNOW (no need to connect)
sta = network.WLAN(network.WLAN.IF_STA)
sta.active(True)
sta.disconnect()
e = espnow.ESPNow()
e.active(True)
# No peers needed to receive; add_peer() only for send()
# Recv timeout 0 = non-blocking
print("ESP32: ESPNOW -> UART passthrough, %d baud" % UART_BAUD)
while True:
mac, msg = e.irecv(0)
if msg:
uart.write(msg)
led.value(1)
else:
led.value(0)
time.sleep_ms(1)

View File

@@ -0,0 +1,64 @@
"""
ESP32-C6 test: send JSON messages to Pico over UART (GPIO17).
Settings use strips = [[pin, num_leds], ...]. Run with Pico connected on RX.
Run with mpremote (from repo root):
./esp32/run_test_uart_json.sh
# or
mpremote run esp32/test/test_uart_send_json.py
# or with port
mpremote connect /dev/ttyUSB0 run esp32/test/test_uart_send_json.py
"""
import machine
import time
import json
UART_TX_PIN = 17
UART_BAUD = 921600
LED_PIN = 15
def send_json(uart, obj):
line = json.dumps(obj) + "\n"
uart.write(line)
print("TX:", line.strip())
def main():
uart = machine.UART(1, baudrate=UART_BAUD, tx=UART_TX_PIN)
led = machine.Pin(LED_PIN, machine.Pin.OUT)
# 1) Settings: one strip, pin 2, 10 LEDs (list of lists)
send_json(uart, {
"v": 1,
"settings": {
"strips": [[2, 10]],
"brightness": 30,
},
})
led.value(1)
time.sleep(0.2)
led.value(0)
time.sleep(0.3)
# 2) led-controller format: light + settings.color (hex)
send_json(uart, {"light": "strip1", "settings": {"color": "#FF0000"}, "save": False})
time.sleep(0.5)
# 3) led-controller format: light + settings.r,g,b
send_json(uart, {"light": "strip1", "settings": {"r": 0, "g": 255, "b": 0}, "save": False})
time.sleep(0.5)
# 4) led-controller format: blue (hex)
send_json(uart, {"light": "strip1", "settings": {"color": "#0000FF"}, "save": False})
time.sleep(0.5)
# 5) Off (existing format)
send_json(uart, {"v": 1, "off": True})
time.sleep(0.3)
print("Done. Pico: settings -> red (hex) -> green (r,g,b) -> blue (hex) -> off.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,32 @@
"""
ESP32-C6 UART TX + LED test. Sends a few commands on GPIO17, blinks LED on GPIO15.
Run on device: exec(open('test/test_uart_tx').read()) or import test.test_uart_tx
Does not require Pico connected.
"""
import machine
import time
UART_TX_PIN = 17
LED_PIN = 15
def main():
uart = machine.UART(1, baudrate=115200, tx=UART_TX_PIN)
led = machine.Pin(LED_PIN, machine.Pin.OUT)
def send(cmd):
uart.write(cmd + "\n")
print("TX:", cmd)
# Blink and send a short command sequence
commands = ["off", "fill 255 0 0", "fill 0 255 0", "fill 0 0 255", "off"]
for i, cmd in enumerate(commands):
led.value(1)
send(cmd)
time.sleep(0.3)
led.value(0)
time.sleep(0.2)
print("Done. Connect Pico to see strip follow commands.")
if __name__ == "__main__":
main()

102
main.py Normal file
View File

@@ -0,0 +1,102 @@
# """
# Pico: receive led-driver JSON from UART (one message per line). Runs Presets + patterns.
# UART RX on D7 (GPIO1). Non-blocking so presets.tick() runs every loop.
# """
# from settings import Settings
# from machine import UART, Pin
# import utime
# from presets import Presets
# from utils import convert_and_reorder_colors
# import json
# # UART (Pico XIAO: D7 = GPIO1)
# UART_RX_PIN = 1
# UART_BAUD = 115200
# UART_ID = 0
# settings = Settings()
# print(settings)
# presets = Presets(settings["led_pin"], settings["num_leds"])
# presets.load()
# presets.b = settings.get("brightness", 255)
# startup_preset = settings.get("startup_preset")
# if startup_preset:
# presets.select(startup_preset)
# print("Selected startup preset:", startup_preset)
# last_brightness_save = 0
# # Non-blocking UART
# uart = UART(UART_ID, baudrate=UART_BAUD, rx=Pin(UART_RX_PIN), rxbuf=512, timeout=0)
# uart_buf = bytearray()
# print("UART RX on pin %s, %s baud (one JSON object per line)" % (UART_RX_PIN, UART_BAUD))
# def process_message(data):
# """Handle one JSON message (led-driver protocol: v, b, presets, select, default, save)."""
# if data.get("v") != "1":
# return
# global last_brightness_save
# if "b" in data:
# try:
# presets.b = max(0, min(255, int(data["b"])))
# settings["brightness"] = presets.b
# now = utime.ticks_ms()
# if utime.ticks_diff(now, last_brightness_save) >= 500:
# settings.save()
# last_brightness_save = now
# except (TypeError, ValueError):
# pass
# if "presets" in data:
# for id, preset_data in data["presets"].items():
# if "c" in preset_data:
# preset_data["c"] = convert_and_reorder_colors(preset_data["c"], settings)
# presets.edit(id, preset_data)
# print("Edited preset", id, preset_data.get("name", ""))
# if settings.get("name") in data.get("select", {}):
# select_list = data["select"][settings.get("name")]
# if select_list:
# preset_name = select_list[0]
# step = select_list[1] if len(select_list) > 1 else None
# presets.select(preset_name, step=step)
# if "default" in data:
# settings["startup_preset"] = data["default"]
# print("Set startup preset to", data["default"])
# settings.save()
# if "save" in data:
# presets.save()
# while True:
# presets.tick()
# n = uart.any()
# if n:
# data_in = uart.read(n)
# if data_in:
# for b in data_in:
# if b in (0x0A, 0x0D): # LF or CR
# if uart_buf:
# try:
# msg = uart_buf.decode("utf-8").strip()
# if msg:
# data = json.loads(msg)
# process_message(data)
# except (ValueError, UnicodeError):
# pass
# uart_buf = bytearray()
# else:
# if len(uart_buf) < 1024:
# uart_buf.append(b)
# utime.sleep_ms(1)
from neopixel import NeoPixel
from machine import Pin
pins = ((2,270), (3,271), (4,272), (0,273), (7,274), (6,275), (29,276), (28,277))
for pin, num_leds in pins:
print(pin, num_leds)
np = NeoPixel(Pin(pin), num_leds)
np.fill((8, 0, 0))
np.write()

106
pico/src/main.py Normal file
View File

@@ -0,0 +1,106 @@
# """
# Pico: receive led-driver JSON from UART (one message per line). Runs Presets + patterns.
# UART RX on D7 (GPIO1). Non-blocking so presets.tick() runs every loop.
# """
# from settings import Settings
# from machine import UART, Pin
# import utime
# from presets import Presets
# from utils import convert_and_reorder_colors
# import json
# # UART (Pico XIAO: D7 = GPIO1)
# UART_RX_PIN = 1
# UART_BAUD = 115200
# UART_ID = 0
# settings = Settings()
# print(settings)
# presets = Presets(settings["led_pin"], settings["num_leds"])
# presets.load()
# presets.b = settings.get("brightness", 255)
# startup_preset = settings.get("startup_preset")
# if startup_preset:
# presets.select(startup_preset)
# print("Selected startup preset:", startup_preset)
# last_brightness_save = 0
# # Non-blocking UART
# uart = UART(UART_ID, baudrate=UART_BAUD, rx=Pin(UART_RX_PIN), rxbuf=512, timeout=0)
# uart_buf = bytearray()
# print("UART RX on pin %s, %s baud (one JSON object per line)" % (UART_RX_PIN, UART_BAUD))
# def process_message(data):
# """Handle one JSON message (led-driver protocol: v, b, presets, select, default, save)."""
# if data.get("v") != "1":
# return
# global last_brightness_save
# if "b" in data:
# try:
# presets.b = max(0, min(255, int(data["b"])))
# settings["brightness"] = presets.b
# now = utime.ticks_ms()
# if utime.ticks_diff(now, last_brightness_save) >= 500:
# settings.save()
# last_brightness_save = now
# except (TypeError, ValueError):
# pass
# if "presets" in data:
# for id, preset_data in data["presets"].items():
# if "c" in preset_data:
# preset_data["c"] = convert_and_reorder_colors(preset_data["c"], settings)
# presets.edit(id, preset_data)
# print("Edited preset", id, preset_data.get("name", ""))
# if settings.get("name") in data.get("select", {}):
# select_list = data["select"][settings.get("name")]
# if select_list:
# preset_name = select_list[0]
# step = select_list[1] if len(select_list) > 1 else None
# presets.select(preset_name, step=step)
# if "default" in data:
# settings["startup_preset"] = data["default"]
# print("Set startup preset to", data["default"])
# settings.save()
# if "save" in data:
# presets.save()
# while True:
# presets.tick()
# n = uart.any()
# if n:
# data_in = uart.read(n)
# if data_in:
# for b in data_in:
# if b in (0x0A, 0x0D): # LF or CR
# if uart_buf:
# try:
# msg = uart_buf.decode("utf-8").strip()
# if msg:
# data = json.loads(msg)
# process_message(data)
# except (ValueError, UnicodeError):
# pass
# uart_buf = bytearray()
# else:
# if len(uart_buf) < 1024:
# uart_buf.append(b)
# utime.sleep_ms(1)
from neopixel import NeoPixel
from machine import Pin
from ws2812 import WS2812B
sm = 0
pins = ((2,270), (3,271), (4,272), (0,273), (7,274), (6,275), (29,276), (28,277))
for pin, num_leds in pins:
print(pin, num_leds)
np = WS2812B(num_leds, pin, sm, 0.1)
sm += 1
np.fill((8, 0, 0))
np.show()

16
pico/src/p2p.py Normal file
View File

@@ -0,0 +1,16 @@
import asyncio
import aioespnow
import json
async def p2p(settings, patterns):
e = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support
e.active(True)
async for mac, msg in e:
try:
data = json.loads(msg)
except:
print(f"Failed to load espnow data {msg}")
continue
if "names" not in data or settings.get("name") in data.get("names", []):
await settings.set_settings(data.get("settings", {}), patterns, data.get("save", False))

View File

@@ -0,0 +1,6 @@
from .blink import Blink
from .rainbow import Rainbow
from .pulse import Pulse
from .transition import Transition
from .chase import Chase
from .circle import Circle

View File

@@ -0,0 +1,33 @@
import utime
class Blink:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Blink pattern: toggles LEDs on/off using preset delay, cycling through colors."""
# Use provided colors, or default to white if none
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
state = True # True = on, False = off
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
# Re-read delay each loop so live updates to preset.d take effect
delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms:
if state:
base_color = colors[color_index % len(colors)]
color = self.driver.apply_brightness(base_color, preset.b)
self.driver.fill(color)
# Advance to next color for the next "on" phase
color_index += 1
else:
# "Off" phase: turn all LEDs off
self.driver.fill((0, 0, 0))
state = not state
last_update = current_time
# Yield once per tick so other logic can run
yield

124
pico/src/patterns/chase.py Normal file
View File

@@ -0,0 +1,124 @@
import utime
class Chase:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating.
Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)"""
colors = preset.c
if len(colors) < 1:
# Need at least 1 color
return
# Access colors, delay, and n values from preset
if not colors:
return
# If only one color provided, use it for both colors
if len(colors) < 2:
color0 = colors[0]
color1 = colors[0]
else:
color0 = colors[0]
color1 = colors[1]
color0 = self.driver.apply_brightness(color0, preset.b)
color1 = self.driver.apply_brightness(color1, preset.b)
n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1
n3 = int(preset.n3) # Step movement on even steps (can be negative)
n4 = int(preset.n4) # Step movement on odd steps (can be negative)
segment_length = n1 + n2
# Calculate position from step_count
step_count = self.driver.step
# Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc.
if step_count % 2 == 0:
# Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3
position = (step_count // 2) * (n3 + n4) + n3
else:
# Odd steps: ((step_count+1)//2) pairs of (n3+n4)
position = ((step_count + 1) // 2) * (n3 + n4)
# Wrap position to keep it reasonable
max_pos = self.driver.num_leds + segment_length
position = position % max_pos
if position < 0:
position += max_pos
# If auto is False, run a single step and then stop
if not preset.a:
# Clear all LEDs
self.driver.n.fill((0, 0, 0))
# Draw repeating pattern starting at position
for i in range(self.driver.num_leds):
# Calculate position in the repeating segment
relative_pos = (i - position) % segment_length
if relative_pos < 0:
relative_pos = (relative_pos + segment_length) % segment_length
# Determine which color based on position in segment
if relative_pos < n1:
self.driver.n[i] = color0
else:
self.driver.n[i] = color1
self.driver.n.write()
# Increment step for next beat
self.driver.step = step_count + 1
# Allow tick() to advance the generator once
yield
return
# Auto mode: continuous loop
# Use transition_duration for timing and force the first update to happen immediately
transition_duration = max(10, int(preset.d))
last_update = utime.ticks_ms() - transition_duration
while True:
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, last_update) >= transition_duration:
# Calculate current position from step_count
if step_count % 2 == 0:
position = (step_count // 2) * (n3 + n4) + n3
else:
position = ((step_count + 1) // 2) * (n3 + n4)
# Wrap position
max_pos = self.driver.num_leds + segment_length
position = position % max_pos
if position < 0:
position += max_pos
# Clear all LEDs
self.driver.n.fill((0, 0, 0))
# Draw repeating pattern starting at position
for i in range(self.driver.num_leds):
# Calculate position in the repeating segment
relative_pos = (i - position) % segment_length
if relative_pos < 0:
relative_pos = (relative_pos + segment_length) % segment_length
# Determine which color based on position in segment
if relative_pos < n1:
self.driver.n[i] = color0
else:
self.driver.n[i] = color1
self.driver.n.write()
# Increment step
step_count += 1
self.driver.step = step_count
last_update = current_time
# Yield once per tick so other logic can run
yield

View File

@@ -0,0 +1,96 @@
import utime
class Circle:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4"""
head = 0
tail = 0
# Calculate timing from preset
head_rate = max(1, int(preset.n1)) # n1 = head moves per second
tail_rate = max(1, int(preset.n3)) # n3 = tail moves per second
max_length = max(1, int(preset.n2)) # n2 = max length
min_length = max(0, int(preset.n4)) # n4 = min length
head_delay = 1000 // head_rate # ms between head movements
tail_delay = 1000 // tail_rate # ms between tail movements
last_head_move = utime.ticks_ms()
last_tail_move = utime.ticks_ms()
phase = "growing" # "growing", "shrinking", or "off"
# Support up to two colors (like chase). If only one color is provided,
# use black for the second; if none, default to white.
colors = preset.c
if not colors:
base0 = base1 = (255, 255, 255)
elif len(colors) == 1:
base0 = colors[0]
base1 = (0, 0, 0)
else:
base0 = colors[0]
base1 = colors[1]
color0 = self.driver.apply_brightness(base0, preset.b)
color1 = self.driver.apply_brightness(base1, preset.b)
while True:
current_time = utime.ticks_ms()
# Background: use second color during the "off" phase, otherwise clear to black
if phase == "off":
self.driver.n.fill(color1)
else:
self.driver.n.fill((0, 0, 0))
# Calculate segment length
segment_length = (head - tail) % self.driver.num_leds
if segment_length == 0 and head != tail:
segment_length = self.driver.num_leds
# Draw segment from tail to head as a solid color (no per-LED alternation)
current_color = color0
for i in range(segment_length + 1):
led_pos = (tail + i) % self.driver.num_leds
self.driver.n[led_pos] = current_color
# Move head continuously at n1 LEDs per second
if utime.ticks_diff(current_time, last_head_move) >= head_delay:
head = (head + 1) % self.driver.num_leds
last_head_move = current_time
# Tail behavior based on phase
if phase == "growing":
# Growing phase: tail stays at 0 until max length reached
if segment_length >= max_length:
phase = "shrinking"
elif phase == "shrinking":
# Shrinking phase: move tail forward at n3 LEDs per second
if utime.ticks_diff(current_time, last_tail_move) >= tail_delay:
tail = (tail + 1) % self.driver.num_leds
last_tail_move = current_time
# Check if we've reached min length
current_length = (head - tail) % self.driver.num_leds
if current_length == 0 and head != tail:
current_length = self.driver.num_leds
# For min_length = 0, we need at least 1 LED (the head)
if min_length == 0 and current_length <= 1:
phase = "off" # All LEDs off for 1 step
elif min_length > 0 and current_length <= min_length:
phase = "growing" # Cycle repeats
else: # phase == "off"
# Off phase: second color fills the ring for 1 step, then restart
tail = head # Reset tail to head position to start fresh
phase = "growing"
self.driver.n.write()
# Yield once per tick so other logic can run
yield

View File

@@ -0,0 +1,64 @@
import utime
class Pulse:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
self.driver.off()
# Get colors from preset
colors = preset.c
if not colors:
colors = [(255, 255, 255)]
color_index = 0
cycle_start = utime.ticks_ms()
# State machine based pulse using a single generator loop
while True:
# Read current timing parameters from preset
attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms
decay_ms = max(0, int(preset.n3)) # Decay time in ms
delay_ms = max(0, int(preset.d))
total_ms = attack_ms + hold_ms + decay_ms + delay_ms
if total_ms <= 0:
total_ms = 1
now = utime.ticks_ms()
elapsed = utime.ticks_diff(now, cycle_start)
base_color = colors[color_index % len(colors)]
if elapsed < attack_ms and attack_ms > 0:
# Attack: fade 0 -> 1
factor = elapsed / attack_ms
color = tuple(int(c * factor) for c in base_color)
self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < attack_ms + hold_ms:
# Hold: full brightness
self.driver.fill(self.driver.apply_brightness(base_color, preset.b))
elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0:
# Decay: fade 1 -> 0
dec_elapsed = elapsed - attack_ms - hold_ms
factor = max(0.0, 1.0 - (dec_elapsed / decay_ms))
color = tuple(int(c * factor) for c in base_color)
self.driver.fill(self.driver.apply_brightness(color, preset.b))
elif elapsed < total_ms:
# Delay phase: LEDs off between pulses
self.driver.fill((0, 0, 0))
else:
# End of cycle, move to next color and restart timing
color_index += 1
cycle_start = now
if not preset.a:
break
# Skip drawing this tick, start next cycle
yield
continue
# Yield once per tick
yield

View File

@@ -0,0 +1,51 @@
import utime
class Rainbow:
def __init__(self, driver):
self.driver = driver
def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def run(self, preset):
step = self.driver.step % 256
step_amount = max(1, int(preset.n1)) # n1 controls step increment
# If auto is False, run a single step and then stop
if not preset.a:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b)
self.driver.n.write()
# Increment step by n1 for next manual call
self.driver.step = (step + step_amount) % 256
# Allow tick() to advance the generator once
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
sleep_ms = max(1, int(preset.d)) # Get delay from preset
if utime.ticks_diff(current_time, last_update) >= sleep_ms:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(
self._wheel(rc_index & 255),
preset.b,
)
self.driver.n.write()
step = (step + step_amount) % 256
self.driver.step = step
last_update = current_time
# Yield once per tick so other logic can run
yield

View File

@@ -0,0 +1,57 @@
import utime
class Transition:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Transition between colors, blending over `delay` ms."""
colors = preset.c
if not colors:
self.driver.off()
yield
return
# Only one color: just keep it on
if len(colors) == 1:
while True:
self.driver.fill(self.driver.apply_brightness(colors[0], preset.b))
yield
return
color_index = 0
start_time = utime.ticks_ms()
while True:
if not colors:
break
# Get current and next color based on live list
c1 = colors[color_index % len(colors)]
c2 = colors[(color_index + 1) % len(colors)]
duration = max(10, int(preset.d)) # At least 10ms
now = utime.ticks_ms()
elapsed = utime.ticks_diff(now, start_time)
if elapsed >= duration:
# End of this transition step
if not preset.a:
# One-shot: transition from first to second color only
self.driver.fill(self.driver.apply_brightness(c2, preset.b))
break
# Auto: move to next pair
color_index = (color_index + 1) % len(colors)
start_time = now
yield
continue
# Interpolate between c1 and c2
factor = elapsed / duration
interpolated = tuple(
int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3)
)
self.driver.fill(self.driver.apply_brightness(interpolated, preset.b))
yield

79
pico/src/preset.py Normal file
View File

@@ -0,0 +1,79 @@
class Preset:
def __init__(self, data):
# Set default values for all preset attributes
self.p = "off"
self.d = 100
self.b = 127
self.c = [(255, 255, 255)]
self.a = True
self.n1 = 0
self.n2 = 0
self.n3 = 0
self.n4 = 0
self.n5 = 0
self.n6 = 0
# Override defaults with provided data
self.edit(data)
def edit(self, data=None):
if not data:
return False
for key, value in data.items():
setattr(self, key, value)
return True
@property
def pattern(self):
return self.p
@pattern.setter
def pattern(self, value):
self.p = value
@property
def delay(self):
return self.d
@delay.setter
def delay(self, value):
self.d = value
@property
def brightness(self):
return self.b
@brightness.setter
def brightness(self, value):
self.b = value
@property
def colors(self):
return self.c
@colors.setter
def colors(self, value):
self.c = value
@property
def auto(self):
return self.a
@auto.setter
def auto(self, value):
self.a = value
def to_dict(self):
return {
"p": self.p,
"d": self.d,
"b": self.b,
"c": self.c,
"a": self.a,
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"n4": self.n4,
"n5": self.n5,
"n6": self.n6,
}

131
pico/src/presets.py Normal file
View File

@@ -0,0 +1,131 @@
from machine import Pin
from ws2812 import WS2812B
from preset import Preset
from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle
import json
class Presets:
def __init__(self, pin, num_leds, state_machine=0):
# WS2812B with brightness=1.0 so Presets.apply_brightness() does all scaling (NeoPixel drop-in)
num_leds = int(num_leds)
if isinstance(pin, Pin):
self.n = WS2812B(pin, num_leds) # NeoPixel-style (Pin, n)
else:
self.n = WS2812B(num_leds, int(pin), state_machine, brightness=1.0)
self.num_leds = num_leds
self.step = 0
# Global brightness (0255), controlled via UART/JSON {"b": <value>}
self.b = 255
self.generator = None
self.presets = {}
self.selected = None
# Register all pattern methods
self.patterns = {
"off": self.off,
"on": self.on,
"blink": Blink(self).run,
"rainbow": Rainbow(self).run,
"pulse": Pulse(self).run,
"transition": Transition(self).run,
"chase": Chase(self).run,
"circle": Circle(self).run,
}
def save(self):
"""Save the presets to a file."""
with open("presets.json", "w") as f:
json.dump({name: preset.to_dict() for name, preset in self.presets.items()}, f)
return True
def load(self):
"""Load presets from a file."""
try:
with open("presets.json", "r") as f:
data = json.load(f)
except OSError:
# Create an empty presets file if missing
self.presets = {}
self.save()
return True
self.presets = {}
for name, preset_data in data.items():
if "c" in preset_data:
preset_data["c"] = [tuple(color) for color in preset_data["c"]]
self.presets[name] = Preset(preset_data)
if self.presets:
print("Loaded presets:")
#for name in sorted(self.presets.keys()):
# print(f" {name}: {self.presets[name].to_dict()}")
return True
def edit(self, name, data):
"""Create or update a preset with the given name."""
if name in self.presets:
# Update existing preset
self.presets[name].edit(data)
else:
# Create new preset
self.presets[name] = Preset(data)
return True
def delete(self, name):
if name in self.presets:
del self.presets[name]
return True
return False
def tick(self):
if self.generator is None:
return
try:
next(self.generator)
except StopIteration:
self.generator = None
def select(self, preset_name, step=None):
if preset_name in self.presets:
preset = self.presets[preset_name]
if preset.p in self.patterns:
# Set step value if explicitly provided
if step is not None:
self.step = step
elif preset.p == "off" or self.selected != preset_name:
self.step = 0
self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object
return True
# If preset doesn't exist or pattern not found, default to "off"
return False
def update_num_leds(self, pin, num_leds):
num_leds = int(num_leds)
if isinstance(pin, Pin):
self.n = WS2812B(pin, num_leds)
else:
self.n = WS2812B(num_leds, int(pin), 0, brightness=1.0)
self.num_leds = num_leds
def apply_brightness(self, color, brightness_override=None):
# Combine per-preset brightness (override) with global brightness self.b
local = brightness_override if brightness_override is not None else 255
# Scale preset brightness by global brightness
effective_brightness = int(local * self.b / 255)
return tuple(int(c * effective_brightness / 255) for c in color)
def fill(self, color=None):
fill_color = color if color is not None else (0, 0, 0)
for i in range(self.num_leds):
self.n[i] = fill_color
self.n.write()
def off(self, preset=None):
self.fill((0, 0, 0))
def on(self, preset):
colors = preset.c
color = colors[0] if colors else (255, 255, 255)
self.fill(self.apply_brightness(color, preset.b))

94
pico/src/settings.py Normal file
View File

@@ -0,0 +1,94 @@
import json
import ubinascii
import machine
class Settings(dict):
SETTINGS_FILE = "/settings.json"
def __init__(self):
super().__init__()
self.load() # Load settings from file during initialization
self.color_order = self.get_color_order(self["color_order"])
def _default_name(self):
"""Device name: use unique_id on Pico (no WiFi); use AP MAC on ESP32."""
try:
import network
mac = network.WLAN(network.AP_IF).config("mac")
return "led-%s" % ubinascii.hexlify(mac).decode()
except Exception:
return "led-%s" % ubinascii.hexlify(machine.unique_id()).decode()
def set_defaults(self):
self["led_pin"] = 10
self["num_leds"] = 50
self["color_order"] = "rgb"
self["name"] = self._default_name()
self["debug"] = False
self["startup_preset"] = None
self["brightness"] = 255
def save(self):
try:
j = json.dumps(self)
with open(self.SETTINGS_FILE, 'w') as file:
file.write(j)
print("Settings saved successfully.")
except Exception as e:
print(f"Error saving settings: {e}")
def load(self):
try:
with open(self.SETTINGS_FILE, 'r') as file:
loaded_settings = json.load(file)
self.update(loaded_settings)
print("Settings loaded successfully.")
except Exception as e:
print(f"Error loading settings")
self.set_defaults()
self.save()
def get_color_order(self, color_order):
"""Convert color order string to tuple of hex string indices."""
color_orders = {
"rgb": (1, 3, 5),
"rbg": (1, 5, 3),
"grb": (3, 1, 5),
"gbr": (3, 5, 1),
"brg": (5, 1, 3),
"bgr": (5, 3, 1)
}
return color_orders.get(color_order.lower(), (1, 3, 5)) # Default to RGB
def get_rgb_channel_order(self, color_order=None):
"""Convert color order string to RGB channel indices for reordering tuples.
Returns tuple of channel indices: (r_channel, g_channel, b_channel)
Example: 'grb' -> (1, 0, 2) means (G, R, B)"""
if color_order is None:
color_order = self.get("color_order", "rgb")
color_order = color_order.lower()
# Map hex string positions to RGB channel indices
# Position 1 (R in hex) -> channel 0, Position 3 (G) -> channel 1, Position 5 (B) -> channel 2
hex_to_channel = {1: 0, 3: 1, 5: 2}
hex_indices = self.get_color_order(color_order)
return tuple(hex_to_channel[pos] for pos in hex_indices)
# Example usage
def main():
settings = Settings()
print(f"Number of LEDs: {settings['num_leds']}")
settings['num_leds'] = 100
print(f"Updated number of LEDs: {settings['num_leds']}")
settings.save()
# Create a new Settings object to test loading
new_settings = Settings()
print(f"Loaded number of LEDs: {new_settings['num_leds']}")
print(settings)
# Run the example
if __name__ == "__main__":
main()

53
pico/src/utils.py Normal file
View File

@@ -0,0 +1,53 @@
def convert_and_reorder_colors(colors, settings_or_color_order):
"""Convert hex color strings to RGB tuples and reorder based on device color order.
Args:
colors: List of colors, either hex strings like "#FF0000" or RGB tuples like (255, 0, 0)
settings_or_color_order: Either a Settings object or a color_order string (e.g., "rgb", "grb")
Returns:
List of RGB tuples reordered according to device color order
"""
# Get channel order from settings or color_order string
if hasattr(settings_or_color_order, 'get_rgb_channel_order'):
# It's a Settings object
channel_order = settings_or_color_order.get_rgb_channel_order()
elif isinstance(settings_or_color_order, str):
# It's a color_order string, convert to channel order
color_order = settings_or_color_order.lower()
color_orders = {
"rgb": (1, 3, 5),
"rbg": (1, 5, 3),
"grb": (3, 1, 5),
"gbr": (3, 5, 1),
"brg": (5, 1, 3),
"bgr": (5, 3, 1)
}
hex_indices = color_orders.get(color_order, (1, 3, 5))
# Map hex string positions to RGB channel indices
hex_to_channel = {1: 0, 3: 1, 5: 2}
channel_order = tuple(hex_to_channel[pos] for pos in hex_indices)
else:
# Assume it's already a channel order tuple
channel_order = settings_or_color_order
converted_colors = []
for color in colors:
# Convert "#RRGGBB" to (R, G, B)
if isinstance(color, str) and color.startswith("#"):
r = int(color[1:3], 16)
g = int(color[3:5], 16)
b = int(color[5:7], 16)
rgb = (r, g, b)
# Reorder based on device color order
reordered = (rgb[channel_order[0]], rgb[channel_order[1]], rgb[channel_order[2]])
converted_colors.append(reordered)
elif isinstance(color, (list, tuple)) and len(color) == 3:
# Already a tuple/list, just reorder
rgb = tuple(color)
reordered = (rgb[channel_order[0]], rgb[channel_order[1]], rgb[channel_order[2]])
converted_colors.append(reordered)
else:
# Keep as-is if not recognized format
converted_colors.append(color)
return converted_colors

View File

@@ -24,7 +24,7 @@ for pin, num_leds in pins:
ws = WS2812B(num_leds, pin, sm, brightness=1.0) # 1.0 so fill() is visible
strips.append(ws)
sm += 1
ws.fill((255,0,0))
ws.fill((8,0,0))
ws.show()
time.sleep(1)

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, duration_ms):
"""Run pattern for specified duration."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
print("=" * 50)
print("Testing Auto and Manual Modes")
print("=" * 50)
# Test 1: Rainbow in AUTO mode (continuous)
print("\nTest 1: Rainbow pattern in AUTO mode (should run continuously)")
p.edit("rainbow_auto", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": True,
})
p.select("rainbow_auto")
print("Running rainbow_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Pattern ran continuously")
# Test 2: Rainbow in MANUAL mode (one step per tick)
print("\nTest 2: Rainbow pattern in MANUAL mode (one step per tick)")
p.edit("rainbow_manual", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": False,
})
p.select("rainbow_manual")
print("Calling tick() 5 times (should advance 5 steps)...")
for i in range(5):
p.tick()
utime.sleep_ms(100) # Small delay to see changes
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
# Check if generator stopped after one cycle
if p.generator is None:
print("✓ Manual mode: Generator stopped after one step (as expected)")
else:
print("⚠ Manual mode: Generator still active (may need multiple ticks)")
# Test 3: Pulse in AUTO mode (continuous cycles)
print("\nTest 3: Pulse pattern in AUTO mode (should pulse continuously)")
p.edit("pulse_auto", {
"p": "pulse",
"b": 128,
"d": 100,
"n1": 500, # Attack
"n2": 200, # Hold
"n3": 500, # Decay
"c": [(255, 0, 0)],
"a": True,
})
p.select("pulse_auto")
print("Running pulse_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Pulse ran continuously")
# Test 4: Pulse in MANUAL mode (one cycle then stop)
print("\nTest 4: Pulse pattern in MANUAL mode (one cycle then stop)")
p.edit("pulse_manual", {
"p": "pulse",
"b": 128,
"d": 100,
"n1": 300, # Attack
"n2": 200, # Hold
"n3": 300, # Decay
"c": [(0, 255, 0)],
"a": False,
})
p.select("pulse_manual")
print("Running pulse_manual until generator stops...")
tick_count = 0
max_ticks = 200 # Safety limit
while p.generator is not None and tick_count < max_ticks:
p.tick()
tick_count += 1
utime.sleep_ms(10)
if p.generator is None:
print(f"✓ Manual mode: Pulse completed one cycle after {tick_count} ticks")
else:
print(f"⚠ Manual mode: Pulse still running after {tick_count} ticks")
# Test 5: Transition in AUTO mode (continuous transitions)
print("\nTest 5: Transition pattern in AUTO mode (continuous transitions)")
p.edit("transition_auto", {
"p": "transition",
"b": 128,
"d": 500,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
"a": True,
})
p.select("transition_auto")
print("Running transition_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Transition ran continuously")
# Test 6: Transition in MANUAL mode (one transition then stop)
print("\nTest 6: Transition pattern in MANUAL mode (one transition then stop)")
p.edit("transition_manual", {
"p": "transition",
"b": 128,
"d": 500,
"c": [(255, 0, 0), (0, 255, 0)],
"a": False,
})
p.select("transition_manual")
print("Running transition_manual until generator stops...")
tick_count = 0
max_ticks = 200
while p.generator is not None and tick_count < max_ticks:
p.tick()
tick_count += 1
utime.sleep_ms(10)
if p.generator is None:
print(f"✓ Manual mode: Transition completed after {tick_count} ticks")
else:
print(f"⚠ Manual mode: Transition still running after {tick_count} ticks")
# Test 7: Switching between auto and manual modes
print("\nTest 7: Switching between auto and manual modes")
p.edit("switch_test", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": True,
})
p.select("switch_test")
print("Running in auto mode for 1 second...")
run_for(p, wdt, 1000)
# Switch to manual mode by editing the preset
print("Switching to manual mode...")
p.edit("switch_test", {"a": False})
p.select("switch_test") # Re-select to apply changes
print("Calling tick() 3 times in manual mode...")
for i in range(3):
p.tick()
utime.sleep_ms(100)
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
# Switch back to auto mode
print("Switching back to auto mode...")
p.edit("switch_test", {"a": True})
p.select("switch_test")
print("Running in auto mode for 1 second...")
run_for(p, wdt, 1000)
print("✓ Successfully switched between auto and manual modes")
# Cleanup
print("\nCleaning up...")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
p.tick()
utime.sleep_ms(100)
print("\n" + "=" * 50)
print("All tests completed!")
print("=" * 50)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors)
p.edit("test_blink", {
"p": "blink",
"b": 64,
"d": 200,
"c": [(255, 0, 0), (0, 0, 255)],
})
p.select("test_blink")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
wdt.feed()
p.tick()
utime.sleep_ms(10)
if __name__ == "__main__":
main()

161
pico/test/patterns/chase.py Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1)
print("Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1)")
p.edit("chase1", {
"p": "chase",
"b": 255,
"d": 200,
"n1": 5,
"n2": 5,
"n3": 1,
"n4": 1,
"c": [(255, 0, 0), (0, 255, 0)],
})
p.select("chase1")
run_for(p, wdt, 3000)
# Test 2: Forward and backward (n3=2, n4=-1)
print("Test 2: Forward and backward (n3=2, n4=-1)")
p.edit("chase2", {
"p": "chase",
"n1": 3,
"n2": 3,
"n3": 2,
"n4": -1,
"d": 150,
"c": [(0, 0, 255), (255, 255, 0)],
})
p.select("chase2")
run_for(p, wdt, 3000)
# Test 3: Large segments (n1=10, n2=5)
print("Test 3: Large segments (n1=10, n2=5, n3=3, n4=3)")
p.edit("chase3", {
"p": "chase",
"n1": 10,
"n2": 5,
"n3": 3,
"n4": 3,
"d": 200,
"c": [(255, 128, 0), (128, 0, 255)],
})
p.select("chase3")
run_for(p, wdt, 3000)
# Test 4: Fast movement (n3=5, n4=5)
print("Test 4: Fast movement (n3=5, n4=5)")
p.edit("chase4", {
"p": "chase",
"n1": 4,
"n2": 4,
"n3": 5,
"n4": 5,
"d": 100,
"c": [(255, 0, 255), (0, 255, 255)],
})
p.select("chase4")
run_for(p, wdt, 2000)
# Test 5: Backward movement (n3=-2, n4=-2)
print("Test 5: Backward movement (n3=-2, n4=-2)")
p.edit("chase5", {
"p": "chase",
"n1": 6,
"n2": 4,
"n3": -2,
"n4": -2,
"d": 200,
"c": [(255, 255, 255), (0, 0, 0)],
})
p.select("chase5")
run_for(p, wdt, 3000)
# Test 6: Alternating forward/backward (n3=3, n4=-2)
print("Test 6: Alternating forward/backward (n3=3, n4=-2)")
p.edit("chase6", {
"p": "chase",
"n1": 5,
"n2": 5,
"n3": 3,
"n4": -2,
"d": 250,
"c": [(255, 0, 0), (0, 255, 0)],
})
p.select("chase6")
run_for(p, wdt, 4000)
# Test 7: Manual mode - advance one step per beat
print("Test 7: Manual mode chase (auto=False, n3=2, n4=1)")
p.edit("chase_manual", {
"p": "chase",
"n1": 4,
"n2": 4,
"n3": 2,
"n4": 1,
"d": 200,
"c": [(255, 255, 0), (0, 255, 255)],
"a": False,
})
p.step = 0 # Reset step counter
print(" Advancing pattern with 10 beats (select + tick)...")
for i in range(10):
p.select("chase_manual") # Simulate beat - restarts generator
p.tick() # Advance one step
utime.sleep_ms(500) # Pause to see the pattern
wdt.feed()
print(f" Beat {i+1}: step={p.step}")
# Test 8: Verify step increments correctly in manual mode
print("Test 8: Verify step increments (auto=False)")
p.edit("chase_manual2", {
"p": "chase",
"n1": 3,
"n2": 3,
"n3": 1,
"n4": 1,
"a": False,
})
p.step = 0
initial_step = p.step
p.select("chase_manual2")
p.tick()
final_step = p.step
print(f" Step updated from {initial_step} to {final_step} (expected: 1)")
if final_step == 1:
print(" ✓ Step increment working correctly")
else:
print(f" ✗ Step increment mismatch! Expected 1, got {final_step}")
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0)
print("Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0)")
p.edit("circle1", {
"p": "circle",
"b": 255,
"n1": 50, # Head moves 50 LEDs/second
"n2": 100, # Max length 100 LEDs
"n3": 200, # Tail moves 200 LEDs/second
"n4": 0, # Min length 0 LEDs
"c": [(255, 0, 0)], # Red
})
p.select("circle1")
run_for(p, wdt, 5000)
# Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0)
print("Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0)")
p.edit("circle2", {
"p": "circle",
"n1": 20,
"n2": 50,
"n3": 100,
"n4": 0,
"c": [(0, 255, 0)], # Green
})
p.select("circle2")
run_for(p, wdt, 5000)
# Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0)
print("Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0)")
p.edit("circle3", {
"p": "circle",
"n1": 100,
"n2": 30,
"n3": 20,
"n4": 0,
"c": [(0, 0, 255)], # Blue
})
p.select("circle3")
run_for(p, wdt, 5000)
# Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10)
print("Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10)")
p.edit("circle4", {
"p": "circle",
"n1": 50,
"n2": 40,
"n3": 100,
"n4": 10,
"c": [(255, 255, 0)], # Yellow
})
p.select("circle4")
run_for(p, wdt, 5000)
# Test 5: Very fast (n1=200, n2=20, n3=200, n4=0)
print("Test 5: Very fast (n1=200, n2=20, n3=200, n4=0)")
p.edit("circle5", {
"p": "circle",
"n1": 200,
"n2": 20,
"n3": 200,
"n4": 0,
"c": [(255, 0, 255)], # Magenta
})
p.select("circle5")
run_for(p, wdt, 3000)
# Test 6: Very slow (n1=10, n2=25, n3=10, n4=0)
print("Test 6: Very slow (n1=10, n2=25, n3=10, n4=0)")
p.edit("circle6", {
"p": "circle",
"n1": 10,
"n2": 25,
"n3": 10,
"n4": 0,
"c": [(0, 255, 255)], # Cyan
})
p.select("circle6")
run_for(p, wdt, 5000)
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

30
pico/test/patterns/off.py Normal file
View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Create an "off" preset (use short-key field `p` for pattern)
p.edit("test_off", {"p": "off"})
p.select("test_off")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 200:
wdt.feed()
p.tick()
utime.sleep_ms(10)
if __name__ == "__main__":
main()

47
pico/test/patterns/on.py Normal file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Create presets for on and off using the short-key fields that Presets expects
# Preset fields:
# p = pattern name, b = brightness, d = delay, c = list of (r,g,b) colors
p.edit("test_on", {
"p": "on",
"b": 64,
"d": 120,
"c": [(255, 0, 0), (0, 0, 255)],
})
p.edit("test_off", {"p": "off"})
# ON phase
p.select("test_on")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 800:
wdt.feed()
p.tick()
utime.sleep_ms(10)
# OFF phase
p.select("test_off")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 100:
wdt.feed()
p.tick()
utime.sleep_ms(10)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Simple single-color pulse
print("Test 1: Single-color pulse (attack=500, hold=500, decay=500, delay=500)")
p.edit("pulse1", {
"p": "pulse",
"b": 255,
"c": [(255, 0, 0)],
"n1": 500, # attack ms
"n2": 500, # hold ms
"n3": 500, # decay ms
"d": 500, # delay ms between pulses
"a": True,
})
p.select("pulse1")
run_for(p, wdt, 5000)
# Test 2: Faster pulse
print("Test 2: Fast pulse (attack=100, hold=100, decay=100, delay=100)")
p.edit("pulse2", {
"p": "pulse",
"n1": 100,
"n2": 100,
"n3": 100,
"d": 100,
"c": [(0, 255, 0)],
})
p.select("pulse2")
run_for(p, wdt, 4000)
# Test 3: Multi-color pulse cycle
print("Test 3: Multi-color pulse (red -> green -> blue)")
p.edit("pulse3", {
"p": "pulse",
"n1": 300,
"n2": 300,
"n3": 300,
"d": 200,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
"a": True,
})
p.select("pulse3")
run_for(p, wdt, 6000)
# Test 4: One-shot pulse (auto=False)
print("Test 4: Single pulse, auto=False")
p.edit("pulse4", {
"p": "pulse",
"n1": 400,
"n2": 0,
"n3": 400,
"d": 0,
"c": [(255, 255, 255)],
"a": False,
})
p.select("pulse4")
# Run long enough to allow one full pulse cycle
run_for(p, wdt, 1500)
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 200)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,151 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic rainbow with auto=True (continuous)
print("Test 1: Basic rainbow (auto=True, n1=1)")
p.edit("rainbow1", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 1,
"a": True,
})
p.select("rainbow1")
run_for(p, wdt, 3000)
# Test 2: Fast rainbow
print("Test 2: Fast rainbow (low delay, n1=1)")
p.edit("rainbow2", {
"p": "rainbow",
"d": 50,
"n1": 1,
"a": True,
})
p.select("rainbow2")
run_for(p, wdt, 2000)
# Test 3: Slow rainbow
print("Test 3: Slow rainbow (high delay, n1=1)")
p.edit("rainbow3", {
"p": "rainbow",
"d": 500,
"n1": 1,
"a": True,
})
p.select("rainbow3")
run_for(p, wdt, 3000)
# Test 4: Low brightness rainbow
print("Test 4: Low brightness rainbow (n1=1)")
p.edit("rainbow4", {
"p": "rainbow",
"b": 64,
"d": 100,
"n1": 1,
"a": True,
})
p.select("rainbow4")
run_for(p, wdt, 2000)
# Test 5: Single-step rainbow (auto=False)
print("Test 5: Single-step rainbow (auto=False, n1=1)")
p.edit("rainbow5", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 1,
"a": False,
})
p.step = 0
for i in range(10):
p.select("rainbow5")
# One tick advances the generator one frame when auto=False
p.tick()
utime.sleep_ms(100)
wdt.feed()
# Test 6: Verify step updates correctly
print("Test 6: Verify step updates (auto=False, n1=1)")
p.edit("rainbow6", {
"p": "rainbow",
"n1": 1,
"a": False,
})
initial_step = p.step
p.select("rainbow6")
p.tick()
final_step = p.step
print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)")
# Test 7: Fast step increment (n1=5)
print("Test 7: Fast rainbow (n1=5, auto=True)")
p.edit("rainbow7", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 5,
"a": True,
})
p.select("rainbow7")
run_for(p, wdt, 2000)
# Test 8: Very fast step increment (n1=10)
print("Test 8: Very fast rainbow (n1=10, auto=True)")
p.edit("rainbow8", {
"p": "rainbow",
"n1": 10,
"a": True,
})
p.select("rainbow8")
run_for(p, wdt, 2000)
# Test 9: Verify n1 controls step increment (auto=False)
print("Test 9: Verify n1 step increment (auto=False, n1=5)")
p.edit("rainbow9", {
"p": "rainbow",
"n1": 5,
"a": False,
})
p.step = 0
initial_step = p.step
p.select("rainbow9")
p.tick()
final_step = p.step
expected_step = (initial_step + 5) % 256
print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})")
if final_step == expected_step:
print("✓ n1 step increment working correctly")
else:
print(f"✗ Step increment mismatch! Expected {expected_step}, got {final_step}")
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
p.tick()
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Simple two-color transition
print("Test 1: Two-color transition (red <-> blue, delay=1000)")
p.edit("transition1", {
"p": "transition",
"b": 255,
"d": 1000, # transition duration
"c": [(255, 0, 0), (0, 0, 255)],
"a": True,
})
p.select("transition1")
run_for(p, wdt, 6000)
# Test 2: Multi-color transition
print("Test 2: Multi-color transition (red -> green -> blue -> white)")
p.edit("transition2", {
"p": "transition",
"d": 800,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 255)],
"a": True,
})
p.select("transition2")
run_for(p, wdt, 8000)
# Test 3: One-shot transition (auto=False)
print("Test 3: One-shot transition (auto=False)")
p.edit("transition3", {
"p": "transition",
"d": 1000,
"c": [(255, 0, 0), (0, 255, 0)],
"a": False,
})
p.select("transition3")
# Run long enough for a single transition step
run_for(p, wdt, 2000)
# Test 4: Single-color behavior (should just stay on)
print("Test 4: Single-color transition (should hold color)")
p.edit("transition4", {
"p": "transition",
"c": [(0, 0, 255)],
"d": 500,
"a": True,
})
p.select("transition4")
run_for(p, wdt, 3000)
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 200)
if __name__ == "__main__":
main()

View File

@@ -28,13 +28,13 @@ def hue_to_rgb(hue):
return (int(r * 255), int(g * 255), int(b * 255))
def make_rainbow_double(num_leds, brightness=1.0):
"""Build 2 full rainbow cycles (2*num_leds pixels, GRB). Returns (double_buf, strip_len).
head must be in 0..strip_len-1 so DMA reads double_buf[head:head+strip_len] with no copy."""
n = 2 * num_leds
def make_rainbow_ring(total_leds, brightness=1.0):
"""Build one rainbow over the whole ring: 2 full hue cycles over total_leds (GRB).
Returns (double_buf, ring_len_bytes). All strips sample from this so phase is continuous."""
n = 2 * total_leds
double_buf = bytearray(n * 3)
for i in range(n):
hue = (i / n) * 360 * 2
hue = ((i % total_leds) / total_leds) * 360 * 2
r, g, b = hue_to_rgb(hue)
g = int(g * brightness) & 0xFF
r = int(r * brightness) & 0xFF
@@ -43,52 +43,83 @@ def make_rainbow_double(num_leds, brightness=1.0):
double_buf[o] = g
double_buf[o + 1] = r
double_buf[o + 2] = b
strip_len = num_leds * 3
return (double_buf, strip_len)
ring_len_bytes = total_leds * 3
return (double_buf, ring_len_bytes)
def show_rainbow(strip, double_buf, strip_len, head):
"""DMA reads directly from double_buf at head; no copy. head in 0..strip_len-1."""
strip.show(double_buf, head)
def make_strip_rainbow(num_leds, cumulative_leds, total_ring_leds, brightness=1.0):
"""Per-strip double buffer: pixel j has hue at global position (cumulative_leds + j) % total_ring_leds.
Use same head for all strips: head = rainbow_head % (2*num_leds*3)."""
n = 2 * num_leds
buf = bytearray(n * 3)
for j in range(n):
global_pos = (cumulative_leds + j) % total_ring_leds
hue = (global_pos / total_ring_leds) * 360 * 2
r, g, b = hue_to_rgb(hue)
g = int(g * brightness) & 0xFF
r = int(r * brightness) & 0xFF
b = int(b * brightness) & 0xFF
o = j * 3
buf[o] = g
buf[o + 1] = r
buf[o + 2] = b
strip_len_bytes = num_leds * 3
return (buf, strip_len_bytes)
# --- Strips + rainbow buffers per strip ---
def show_rainbow_segment(strip, buf, strip_len_bytes, head):
"""DMA reads strip's segment from buf at head."""
strip.show(buf, head)
# --- Strips + one global ring rainbow (all strips in phase) ---
# Each strip can have a different length; one rainbow spans total_ring_leds so hue is continuous.
# (pin, num_leds) per strip — lengths differ per segment
STRIP_CONFIG = (
(2, 291),
(3, 290),
(4, 283),
(7, 278),
(0, 275),
(28, 278),
(29, 283),
(6, 290),
)
strips = []
pins = ((2, 291),
(3, 290),
(4, 283),
(7, 278),
(0, 275),
(28, 278),
(29, 283),
(6, 290))
sm = 0
for pin, num_leds in pins:
for pin, num_leds in STRIP_CONFIG:
print(pin, num_leds)
ws = WS2812B(num_leds, pin, sm, brightness=1.0) # 1.0 so fill() is visible
strips.append(ws)
sm += 1
# One rainbow double buffer per strip (num_leds can differ); no transfer buffer
now = time.ticks_ms()
rainbow_data = [make_rainbow_double(ws.num_leds, ws.brightness) for ws in strips]
# Cumulative LEDs before each strip so rainbow lines up around the ring
# Cumulative LED count before each strip; total ring size
cumulative_leds = [0]
for ws in strips[:-1]:
cumulative_leds.append(cumulative_leds[-1] + ws.num_leds)
# Global phase (bytes); each strip gets head = (phase + cumulative_leds * 3) % strip_len
total_ring_leds = cumulative_leds[-1] + strips[-1].num_leds
bytes_per_cycle = total_ring_leds * 3
# Per-strip rainbow buffers: each strip's segment of the ring (same phase, no shared-buffer DMA)
now = time.ticks_ms()
rainbow_data = [
make_strip_rainbow(ws.num_leds, cumulative_leds[i], total_ring_leds, ws.brightness)
for i, ws in enumerate(strips)
]
print(time.ticks_diff(time.ticks_ms(), now), "ms")
rainbow_head = 0
step = 3
while True:
now = time.ticks_ms()
for i, (strip, (double_buf, strip_len)) in enumerate(zip(strips, rainbow_data)):
head = (rainbow_head + cumulative_leds[i] * 3) % strip_len
show_rainbow(strip, double_buf, strip_len, head)
for i, (strip, (buf, strip_len_bytes)) in enumerate(zip(strips, rainbow_data)):
# Same head for all: each strip's buffer is already offset by cumulative_leds[i]
double_len_bytes = 2 * strip.num_leds * 3
head = rainbow_head % double_len_bytes
show_rainbow_segment(strip, buf, strip_len_bytes, head)
rainbow_head = (rainbow_head + step) % bytes_per_cycle
#print(time.ticks_diff(time.ticks_ms(), now), "ms")
time.sleep_ms(10)

View File

@@ -0,0 +1,694 @@
#!/usr/bin/env python3
"""Test ESPNow receive functionality - runs on MicroPython device."""
import json
import os
import utime
from settings import Settings
from presets import Presets
from utils import convert_and_reorder_colors
class MockESPNow:
"""Mock ESPNow for testing that can send messages."""
def __init__(self):
self.messages = []
self.active_state = False
def active(self, state):
self.active_state = state
def any(self):
"""Return True if there are messages."""
return len(self.messages) > 0
def recv(self):
"""Receive a message (removes it from queue)."""
if self.messages:
return self.messages.pop(0)
return None, None
def send_message(self, host, msg_data):
"""Send a message by adding it to the queue (testing helper)."""
if isinstance(msg_data, dict):
msg = json.dumps(msg_data)
else:
msg = msg_data
self.messages.append((host, msg))
def clear(self):
"""Clear all messages (testing helper)."""
self.messages = []
from machine import WDT
def get_wdt():
"""Get a real WDT instance for tests."""
return WDT(timeout=10000) # 10 second timeout for tests
def run_main_loop_iterations(espnow, patterns, settings, wdt, max_iterations=10):
"""Run main loop iterations until no messages or max reached."""
iterations = 0
results = []
while iterations < max_iterations:
wdt.feed()
patterns.tick()
if espnow.any():
host, msg = espnow.recv()
data = json.loads(msg)
if data.get("v") != "1":
results.append(("version_rejected", data))
continue
if "presets" in data:
for name, preset_data in data["presets"].items():
# Convert hex color strings to RGB tuples and reorder based on device color order
if "colors" in preset_data:
preset_data["colors"] = convert_and_reorder_colors(preset_data["colors"], settings)
patterns.edit(name, preset_data)
results.append(("presets_processed", list(data["presets"].keys())))
if settings.get("name") in data.get("select", {}):
select_list = data["select"][settings.get("name")]
# Select value is always a list: ["preset_name"] or ["preset_name", step]
if select_list:
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
if patterns.select(preset_name, step=step):
results.append(("selected", preset_name))
iterations += 1
# Stop if no more messages
if not espnow.any():
break
return results
def test_version_check():
"""Test that messages with wrong version are rejected."""
print("Test 1: Version check")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Send message with wrong version
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "2", "presets": {"test": {"pattern": "on"}}})
results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert len([r for r in results if r[0] == "version_rejected"]) > 0, "Should reject wrong version"
assert "test" not in patterns.presets, "Preset should not be created"
print(" ✓ Version check passed")
# Send message with correct version
mock_espnow.clear()
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "1", "presets": {"test": {"pattern": "on"}}})
results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert len([r for r in results if r[0] == "presets_processed"]) > 0, "Should process correct version"
assert "test" in patterns.presets, "Preset should be created"
print(" ✓ Correct version accepted")
def test_preset_creation():
"""Test preset creation from ESPNow messages."""
print("\nTest 2: Preset creation")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"test_blink": {
"pattern": "blink",
"colors": ["#FF0000", "#00FF00"],
"delay": 200,
"brightness": 128
},
"test_rainbow": {
"pattern": "rainbow",
"delay": 100,
"n1": 2
}
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert "test_blink" in patterns.presets, "test_blink preset should exist"
assert "test_rainbow" in patterns.presets, "test_rainbow preset should exist"
# Check preset values
blink_preset = patterns.presets["test_blink"]
assert blink_preset.pattern == "blink", "Pattern should be blink"
assert blink_preset.delay == 200, "Delay should be 200"
assert blink_preset.brightness == 128, "Brightness should be 128"
rainbow_preset = patterns.presets["test_rainbow"]
assert rainbow_preset.pattern == "rainbow", "Pattern should be rainbow"
assert rainbow_preset.n1 == 2, "n1 should be 2"
print(" ✓ Presets created correctly")
def test_color_conversion():
"""Test hex color string conversion and reordering."""
print("\nTest 3: Color conversion")
settings = Settings()
settings["color_order"] = "rgb" # Default RGB order
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"test_colors": {
"pattern": "on",
"colors": ["#FF0000", "#00FF00", "#0000FF"] # Red, Green, Blue
}
}
}
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
preset = patterns.presets["test_colors"]
assert len(preset.colors) == 3, "Should have 3 colors"
assert preset.colors[0] == (255, 0, 0), "First color should be red (255,0,0)"
assert preset.colors[1] == (0, 255, 0), "Second color should be green (0,255,0)"
assert preset.colors[2] == (0, 0, 255), "Third color should be blue (0,0,255)"
print(" ✓ Colors converted correctly (RGB order)")
# Test GRB order
settings["color_order"] = "grb"
patterns2 = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow2 = MockESPNow()
msg2 = {
"v": "1",
"presets": {
"test_grb": {
"pattern": "on",
"colors": ["#FF0000"] # Red in RGB, should become (0, 255, 0) in GRB
}
}
}
mock_espnow2.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2)
wdt2 = get_wdt()
run_main_loop_iterations(mock_espnow2, patterns2, settings, wdt2)
preset2 = patterns2.presets["test_grb"]
assert preset2.colors[0] == (0, 255, 0), "GRB: Red should become green (0,255,0)"
print(" ✓ Colors reordered correctly (GRB order)")
def test_preset_update():
"""Test that editing an existing preset updates it."""
print("\nTest 4: Preset update")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create initial preset
msg1 = {
"v": "1",
"presets": {
"test_update": {
"pattern": "blink",
"delay": 100,
"brightness": 64
}
}
}
mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.presets["test_update"].delay == 100, "Initial delay should be 100"
# Update preset
mock_espnow.clear()
msg2 = {
"v": "1",
"presets": {
"test_update": {
"pattern": "blink",
"delay": 200,
"brightness": 128
}
}
}
mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.presets["test_update"].delay == 200, "Updated delay should be 200"
assert patterns.presets["test_update"].brightness == 128, "Updated brightness should be 128"
print(" ✓ Preset updated correctly")
def test_select():
"""Test preset selection."""
print("\nTest 5: Preset selection")
settings = Settings()
settings["name"] = "device1"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create presets
msg1 = {
"v": "1",
"presets": {
"preset1": {"pattern": "on", "colors": [(255, 0, 0)]},
"preset2": {"pattern": "rainbow", "delay": 50}
}
}
mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select preset
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"device1": ["preset1"],
"device2": ["preset2"]
}
}
mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset1", "Should select preset1"
print(" ✓ Preset selected correctly")
def test_full_message():
"""Test a full message with presets and select."""
print("\nTest 6: Full message (presets + select)")
settings = Settings()
settings["name"] = "test_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"my_preset": {
"pattern": "pulse",
"colors": ["#FF0000", "#00FF00"],
"delay": 150,
"n1": 500,
"n2": 200,
"n3": 500
}
},
"select": {
"test_device": ["my_preset"],
"other_device": ["other_preset"]
}
}
mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert "my_preset" in patterns.presets, "Preset should be created"
assert patterns.selected == "my_preset", "Preset should be selected"
preset = patterns.presets["my_preset"]
assert preset.pattern == "pulse", "Pattern should be pulse"
assert preset.delay == 150, "Delay should be 150"
assert preset.n1 == 500, "n1 should be 500"
print(" ✓ Full message processed correctly")
def test_switch_presets():
"""Test switching between different presets."""
print("\nTest 7: Switch between presets")
settings = Settings()
settings["name"] = "switch_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create multiple presets
msg1 = {
"v": "1",
"presets": {
"preset_blink": {"pattern": "blink", "delay": 200, "colors": [(255, 0, 0)]},
"preset_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 2},
"preset_pulse": {"pattern": "pulse", "delay": 150, "n1": 500, "n2": 200, "n3": 500}
}
}
mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select and run first preset for 2 seconds
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"switch_device": ["preset_blink"]
}
}
mock_espnow.send_message(b"\x66\x66\x66\x66\x66\x66", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_blink", "Should select preset_blink"
print(" ✓ Selected preset_blink, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch to second preset and run for 2 seconds
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"switch_device": ["preset_rainbow"]
}
}
mock_espnow.send_message(b"\x77\x77\x77\x77\x77\x77", msg3)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_rainbow", "Should switch to preset_rainbow"
print(" ✓ Switched to preset_rainbow, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch to third preset and run for 2 seconds
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"switch_device": ["preset_pulse"]
}
}
mock_espnow.send_message(b"\x88\x88\x88\x88\x88\x88", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_pulse", "Should switch to preset_pulse"
print(" ✓ Switched to preset_pulse, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch back to first preset and run for 2 seconds
mock_espnow.clear()
msg5 = {
"v": "1",
"select": {
"switch_device": ["preset_blink"]
}
}
mock_espnow.send_message(b"\x99\x99\x99\x99\x99\x99", msg5)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_blink", "Should switch back to preset_blink"
print(" ✓ Switched back to preset_blink, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
print(" ✓ Preset switching works correctly")
def test_beat_functionality():
"""Test beat functionality - calling select() again with same preset restarts pattern."""
print("\nTest 8: Beat functionality")
settings = Settings()
settings["name"] = "beat_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create presets with manual mode
msg1 = {
"v": "1",
"presets": {
"beat_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False},
"beat_chase": {"pattern": "chase", "delay": 200, "n1": 4, "n2": 4, "n3": 2, "n4": 1, "auto": False},
"beat_pulse": {"pattern": "pulse", "delay": 150, "n1": 300, "n2": 100, "n3": 300, "auto": False}
}
}
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Test 1: Beat with rainbow (manual mode) - should advance one step per beat
print(" Test 8.1: Beat with rainbow (manual mode)")
patterns.step = 0
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"beat_device": ["beat_rainbow"]
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_rainbow", "Should select beat_rainbow"
initial_step = patterns.step
# First beat - advance one step
mock_espnow.clear()
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg2) # Same select message = beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations, so step should be incremented
assert patterns.step == (initial_step + 1) % 256, f"Step should increment from {initial_step} to {(initial_step + 1) % 256}, got {patterns.step}"
# Second beat - advance another step
mock_espnow.clear()
mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2) # Beat again
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
assert patterns.step == (initial_step + 2) % 256, f"Step should increment to {(initial_step + 2) % 256}, got {patterns.step}"
print(" ✓ Rainbow beat advances one step per beat")
# Test 2: Beat with chase (manual mode) - should advance one step per beat
print(" Test 8.2: Beat with chase (manual mode)")
patterns.step = 0
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"beat_device": ["beat_chase"]
}
}
mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg3)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_chase", "Should select beat_chase"
initial_step = patterns.step
# First beat
mock_espnow.clear()
mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg3) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations
assert patterns.step == initial_step + 1, f"Chase step should increment from {initial_step} to {initial_step + 1}, got {patterns.step}"
# Second beat
mock_espnow.clear()
mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg3) # Beat again
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
assert patterns.step == initial_step + 2, f"Chase step should increment to {initial_step + 2}, got {patterns.step}"
print(" ✓ Chase beat advances one step per beat")
# Test 3: Beat with pulse (manual mode) - should restart full cycle
print(" Test 8.3: Beat with pulse (manual mode)")
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"beat_device": ["beat_pulse"]
}
}
mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_pulse", "Should select beat_pulse"
assert patterns.generator is not None, "Generator should be active"
# First beat - should restart generator
initial_generator = patterns.generator
mock_espnow.clear()
mock_espnow.send_message(b"\x33\x33\x33\x33\x33\x33", msg4) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.generator is not None, "Generator should still be active after beat"
assert patterns.generator != initial_generator, "Generator should be restarted (new instance)"
print(" ✓ Pulse beat restarts generator for full cycle")
# Test 4: Multiple beats in sequence
print(" Test 8.4: Multiple beats in sequence")
patterns.step = 0
mock_espnow.clear()
mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg2) # Select rainbow
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Send 5 beats
for i in range(5):
mock_espnow.clear()
mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg2) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations
wdt.feed()
utime.sleep_ms(50)
assert patterns.step == 5, f"After 5 beats, step should be 5, got {patterns.step}"
print(" ✓ Multiple beats work correctly")
print(" ✓ Beat functionality works correctly")
def test_select_with_step():
"""Test selecting a preset with an explicit step value."""
print("\nTest 9: Select with step value")
settings = Settings()
settings["name"] = "step_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create preset
msg1 = {
"v": "1",
"presets": {
"step_preset": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False}
}
}
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select with explicit step value
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"step_device": ["step_preset", 10]
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
assert patterns.selected == "step_preset", "Should select step_preset"
# Step is set to 10, then tick() advances it, so it should be 11
assert patterns.step == 11, f"Step should be set to 10 then advanced to 11 by tick(), got {patterns.step}"
print(" ✓ Step value set correctly")
# Select without step (should use default behavior)
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"step_device": ["step_preset"]
}
}
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg3)
initial_step = patterns.step # Should be 11
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
# Since it's the same preset, step should not be reset, but tick() will advance it
# So step should be initial_step + 1 (one tick call)
assert patterns.step == initial_step + 1, f"Step should advance from {initial_step} to {initial_step + 1} (not reset), got {patterns.step}"
print(" ✓ Step preserved when selecting same preset without step (tick advances it)")
# Select different preset with step
patterns.edit("other_preset", {"p": "rainbow", "a": False})
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"step_device": ["other_preset", 5]
}
}
mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
assert patterns.selected == "other_preset", "Should select other_preset"
# Step is set to 5, then tick() advances it, so it should be 6
assert patterns.step == 6, f"Step should be set to 5 then advanced to 6 by tick(), got {patterns.step}"
print(" ✓ Step set correctly when switching presets")
def test_preset_save_load():
"""Test saving and loading presets to/from JSON."""
print("\nTest 10: Preset save/load")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
patterns.edit("saved_preset", {
"p": "blink",
"d": 150,
"b": 200,
"c": [(1, 2, 3), (4, 5, 6)],
"a": False,
"n1": 1,
"n2": 2,
"n3": 3,
"n4": 4,
"n5": 5,
"n6": 6,
})
assert patterns.save(), "Save should return True"
reloaded = Presets(settings["led_pin"], settings["num_leds"])
assert reloaded.load(), "Load should return True"
preset = reloaded.presets.get("saved_preset")
assert preset is not None, "Preset should be loaded"
assert preset.p == "blink", "Pattern should be blink"
assert preset.d == 150, "Delay should be 150"
assert preset.b == 200, "Brightness should be 200"
assert preset.c == [(1, 2, 3), (4, 5, 6)], "Colors should be restored as tuples"
assert preset.a is False, "Auto should be False"
assert (preset.n1, preset.n2, preset.n3, preset.n4, preset.n5, preset.n6) == (1, 2, 3, 4, 5, 6), "n1-n6 should match"
try:
os.remove("presets.json")
except OSError:
pass
print(" ✓ Preset save/load works correctly")
def main():
"""Run all tests."""
print("=" * 60)
print("ESPNow Receive Functionality Tests")
print("=" * 60)
try:
test_version_check()
test_preset_creation()
test_color_conversion()
test_preset_update()
test_select()
test_full_message()
test_switch_presets()
test_beat_functionality()
test_select_with_step()
test_preset_save_load()
print("\n" + "=" * 60)
print("All tests passed! ✓")
print("=" * 60)
except AssertionError as e:
print("\n✗ Test failed:", e)
raise
except Exception as e:
print("\n✗ Unexpected error:", e)
raise
if __name__ == "__main__":
main()