Add Pico presets engine, patterns, and tests.
Wire the Pico to UART-driven preset selection, add pattern modules and presets data, remove old p2p/settings code, and update tests and LED driver. Made-with: Cursor
This commit is contained in:
78
pico/test/chase.py
Normal file
78
pico/test/chase.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import sys
|
||||
if "lib" not in sys.path:
|
||||
sys.path.insert(0, "lib")
|
||||
if "../lib" not in sys.path:
|
||||
sys.path.insert(0, "../lib")
|
||||
from ws2812 import WS2812B
|
||||
import time
|
||||
|
||||
# --- Chase test: pregenerated double buffer per strip, show via head offset (same as rainbow) ---
|
||||
|
||||
# (pin, num_leds) per strip — same config as rainbow
|
||||
STRIP_CONFIG = (
|
||||
(2, 291),
|
||||
(3, 290),
|
||||
(4, 283),
|
||||
(7, 278),
|
||||
(0, 275),
|
||||
(28, 278),
|
||||
(29, 283),
|
||||
(6, 290),
|
||||
)
|
||||
|
||||
strips = []
|
||||
sm = 0
|
||||
for pin, num_leds in STRIP_CONFIG:
|
||||
print(pin, num_leds)
|
||||
ws = WS2812B(num_leds, pin, sm, brightness=1.0)
|
||||
strips.append(ws)
|
||||
sm += 1
|
||||
|
||||
cumulative_leds = [0]
|
||||
for ws in strips[:-1]:
|
||||
cumulative_leds.append(cumulative_leds[-1] + ws.num_leds)
|
||||
total_ring_leds = cumulative_leds[-1] + strips[-1].num_leds
|
||||
|
||||
# Chase: trail length (0 = single LED), color (R,G,B)
|
||||
TRAIL_LEN = 8
|
||||
CHASE_COLOR = (0, 255, 100) # cyan-green
|
||||
|
||||
|
||||
def make_chase_double(num_leds, cumulative_leds, total_ring_leds, color, trail_len=0):
|
||||
"""Pregenerate strip double buffer: when head shows index b first, that pixel is at
|
||||
distance (2*cumulative_leds - b) % total_ring_leds from chase head. GRB order."""
|
||||
n = 2 * num_leds
|
||||
buf = bytearray(n * 3)
|
||||
for b in range(n):
|
||||
dist = (2 * cumulative_leds - b) % total_ring_leds
|
||||
if dist == 0:
|
||||
r, grn, b_ = color[0], color[1], color[2]
|
||||
elif trail_len and 0 < dist <= trail_len:
|
||||
fade = 1.0 - (dist / (trail_len + 1))
|
||||
r = int(color[0] * fade)
|
||||
grn = int(color[1] * fade)
|
||||
b_ = int(color[2] * fade)
|
||||
else:
|
||||
r = grn = b_ = 0
|
||||
o = b * 3
|
||||
buf[o] = grn
|
||||
buf[o + 1] = r
|
||||
buf[o + 2] = b_
|
||||
return buf
|
||||
|
||||
|
||||
# Pregenerate one double buffer per strip
|
||||
chase_buffers = [
|
||||
make_chase_double(ws.num_leds, cumulative_leds[i], total_ring_leds, CHASE_COLOR, TRAIL_LEN)
|
||||
for i, ws in enumerate(strips)
|
||||
]
|
||||
|
||||
chase_pos = 0
|
||||
while True:
|
||||
for i, strip in enumerate(strips):
|
||||
# head in [0, strip_len) so DMA read head..head+num_leds*3 stays in double buffer (same as rainbow)
|
||||
strip_len = strip.num_leds * 3
|
||||
head = (chase_pos + cumulative_leds[i]) * 3 % strip_len
|
||||
strip.show(chase_buffers[i], head)
|
||||
chase_pos = (chase_pos + 1) % total_ring_leds
|
||||
time.sleep_ms(20)
|
||||
@@ -28,13 +28,13 @@ def hue_to_rgb(hue):
|
||||
return (int(r * 255), int(g * 255), int(b * 255))
|
||||
|
||||
|
||||
def make_rainbow_ring(total_leds, brightness=1.0):
|
||||
"""Build one rainbow over the whole ring: 2 full hue cycles over total_leds (GRB).
|
||||
Returns (double_buf, ring_len_bytes). All strips sample from this so phase is continuous."""
|
||||
n = 2 * total_leds
|
||||
def make_rainbow_double(num_leds, brightness=1.0):
|
||||
"""Build 2 full rainbow cycles (2*num_leds pixels, GRB). Returns (double_buf, strip_len).
|
||||
head must be in 0..strip_len-1 so DMA reads double_buf[head:head+strip_len] with no copy."""
|
||||
n = 2 * num_leds
|
||||
double_buf = bytearray(n * 3)
|
||||
for i in range(n):
|
||||
hue = ((i % total_leds) / total_leds) * 360 * 2
|
||||
hue = (i / n) * 360 * 2
|
||||
r, g, b = hue_to_rgb(hue)
|
||||
g = int(g * brightness) & 0xFF
|
||||
r = int(r * brightness) & 0xFF
|
||||
@@ -43,48 +43,27 @@ def make_rainbow_ring(total_leds, brightness=1.0):
|
||||
double_buf[o] = g
|
||||
double_buf[o + 1] = r
|
||||
double_buf[o + 2] = b
|
||||
ring_len_bytes = total_leds * 3
|
||||
return (double_buf, ring_len_bytes)
|
||||
strip_len = num_leds * 3
|
||||
return (double_buf, strip_len)
|
||||
|
||||
|
||||
def make_strip_rainbow(num_leds, cumulative_leds, total_ring_leds, brightness=1.0):
|
||||
"""Per-strip double buffer: pixel j has hue at global position (cumulative_leds + j) % total_ring_leds.
|
||||
Use same head for all strips: head = rainbow_head % (2*num_leds*3)."""
|
||||
n = 2 * num_leds
|
||||
buf = bytearray(n * 3)
|
||||
for j in range(n):
|
||||
global_pos = (cumulative_leds + j) % total_ring_leds
|
||||
hue = (global_pos / total_ring_leds) * 360 * 2
|
||||
r, g, b = hue_to_rgb(hue)
|
||||
g = int(g * brightness) & 0xFF
|
||||
r = int(r * brightness) & 0xFF
|
||||
b = int(b * brightness) & 0xFF
|
||||
o = j * 3
|
||||
buf[o] = g
|
||||
buf[o + 1] = r
|
||||
buf[o + 2] = b
|
||||
strip_len_bytes = num_leds * 3
|
||||
return (buf, strip_len_bytes)
|
||||
def show_rainbow(strip, double_buf, strip_len, head):
|
||||
"""DMA reads directly from double_buf at head; no copy. head in 0..strip_len-1."""
|
||||
strip.show(double_buf, head)
|
||||
|
||||
|
||||
def show_rainbow_segment(strip, buf, strip_len_bytes, head):
|
||||
"""DMA reads strip's segment from buf at head."""
|
||||
strip.show(buf, head)
|
||||
|
||||
|
||||
# --- Strips + one global ring rainbow (all strips in phase) ---
|
||||
# Each strip can have a different length; one rainbow spans total_ring_leds so hue is continuous.
|
||||
|
||||
# (pin, num_leds) per strip — lengths differ per segment
|
||||
# --- Strips + rainbow buffers per strip ---
|
||||
# Each strip can have a different length; buffers and phase are per-strip.
|
||||
# Strip config must match pico/src/main.py pins.
|
||||
STRIP_CONFIG = (
|
||||
(2, 291),
|
||||
(7, 291),
|
||||
(3, 290),
|
||||
(4, 283),
|
||||
(7, 278),
|
||||
(0, 275),
|
||||
(6, 283),
|
||||
(28, 278),
|
||||
(29, 283),
|
||||
(6, 290),
|
||||
(29, 275),
|
||||
(4, 278),
|
||||
(0, 283),
|
||||
(2, 290),
|
||||
)
|
||||
|
||||
strips = []
|
||||
@@ -102,24 +81,19 @@ for ws in strips[:-1]:
|
||||
total_ring_leds = cumulative_leds[-1] + strips[-1].num_leds
|
||||
bytes_per_cycle = total_ring_leds * 3
|
||||
|
||||
# Per-strip rainbow buffers: each strip's segment of the ring (same phase, no shared-buffer DMA)
|
||||
# One rainbow double buffer per strip (length = 2 * num_leds for that strip)
|
||||
now = time.ticks_ms()
|
||||
rainbow_data = [
|
||||
make_strip_rainbow(ws.num_leds, cumulative_leds[i], total_ring_leds, ws.brightness)
|
||||
for i, ws in enumerate(strips)
|
||||
]
|
||||
rainbow_data = [make_rainbow_double(ws.num_leds, ws.brightness) for ws in strips]
|
||||
# Global phase in bytes; each strip: head = (phase + cumulative_leds[i]*3) % strip_len[i]
|
||||
print(time.ticks_diff(time.ticks_ms(), now), "ms")
|
||||
|
||||
rainbow_head = 0
|
||||
step = 3
|
||||
|
||||
while True:
|
||||
now = time.ticks_ms()
|
||||
for i, (strip, (buf, strip_len_bytes)) in enumerate(zip(strips, rainbow_data)):
|
||||
# Same head for all: each strip's buffer is already offset by cumulative_leds[i]
|
||||
double_len_bytes = 2 * strip.num_leds * 3
|
||||
head = rainbow_head % double_len_bytes
|
||||
show_rainbow_segment(strip, buf, strip_len_bytes, head)
|
||||
for i, (strip, (double_buf, strip_len)) in enumerate(zip(strips, rainbow_data)):
|
||||
head = (rainbow_head + cumulative_leds[i] * 3) % strip_len
|
||||
show_rainbow(strip, double_buf, strip_len, head)
|
||||
rainbow_head = (rainbow_head + step) % bytes_per_cycle
|
||||
#print(time.ticks_diff(time.ticks_ms(), now), "ms")
|
||||
time.sleep_ms(10)
|
||||
|
||||
81
pico/test/roll_strips.py
Normal file
81
pico/test/roll_strips.py
Normal file
@@ -0,0 +1,81 @@
|
||||
import math
|
||||
import sys
|
||||
if "lib" not in sys.path:
|
||||
sys.path.insert(0, "lib")
|
||||
if "../lib" not in sys.path:
|
||||
sys.path.insert(0, "../lib")
|
||||
from ws2812 import WS2812B
|
||||
import time
|
||||
|
||||
# --- Roll: N buffers (length = max strip), gradient full -> off; sequence through strips ---
|
||||
N_BUFFERS = 32 # more buffers = smoother transition
|
||||
|
||||
STRIP_CONFIG = (
|
||||
(2, 291),
|
||||
(3, 290),
|
||||
(4, 283),
|
||||
(7, 278),
|
||||
(0, 275),
|
||||
(28, 278),
|
||||
(29, 283),
|
||||
(6, 290),
|
||||
)
|
||||
|
||||
strips = []
|
||||
sm = 0
|
||||
for pin, num_leds in STRIP_CONFIG:
|
||||
print(pin, num_leds)
|
||||
ws = WS2812B(num_leds, pin, sm, brightness=1.0)
|
||||
strips.append(ws)
|
||||
sm += 1
|
||||
|
||||
num_strips = len(strips)
|
||||
max_leds = max(ws.num_leds for ws in strips)
|
||||
# Color when "on" (R, G, B); GRB order in buffer
|
||||
ROLL_COLOR = (0, 255, 120) # cyan-green
|
||||
|
||||
|
||||
def make_gradient_buffers(n_buffers, max_leds, color):
|
||||
"""Create n_buffers buffers, each max_leds long. Buffer 0 = full brightness, last = off.
|
||||
Gradient is logarithmic (perceptually smoother: more steps near full, fewer near off). GRB order."""
|
||||
out = []
|
||||
for j in range(n_buffers):
|
||||
# log gradient: scale = 255 * log(1 + (n - 1 - j)) / log(n) so 255 at j=0, 0 at j=n-1
|
||||
if n_buffers <= 1:
|
||||
scale = 255
|
||||
elif j >= n_buffers - 1:
|
||||
scale = 0
|
||||
else:
|
||||
# 1 + (n_buffers - 1 - j) runs from n_buffers down to 1
|
||||
scale = int(255 * math.log(1 + (n_buffers - 1 - j)) / math.log(n_buffers))
|
||||
scale = min(255, scale)
|
||||
buf = bytearray(max_leds * 3)
|
||||
r = (color[0] * scale) // 255
|
||||
g = (color[1] * scale) // 255
|
||||
b = (color[2] * scale) // 255
|
||||
for i in range(max_leds):
|
||||
o = i * 3
|
||||
buf[o] = g & 0xFF
|
||||
buf[o + 1] = r & 0xFF
|
||||
buf[o + 2] = b & 0xFF
|
||||
out.append(buf)
|
||||
return out
|
||||
|
||||
|
||||
# N buffers: first full, last off, gradient between
|
||||
buffers = make_gradient_buffers(N_BUFFERS, max_leds, ROLL_COLOR)
|
||||
|
||||
step = 0
|
||||
delay_ms = 50
|
||||
# Deadline-based loop: no extra pause at rotation wrap, smooth continuous roll
|
||||
next_ms = time.ticks_ms()
|
||||
|
||||
while True:
|
||||
for i, strip in enumerate(strips):
|
||||
buf_index = (step + i) % N_BUFFERS
|
||||
strip.show(buffers[buf_index], 0)
|
||||
step += 1 # unbounded; wrap only in index so no hitch at cycle end
|
||||
next_ms += delay_ms
|
||||
# Sleep until next frame time (handles drift, no pause at wrap)
|
||||
while time.ticks_diff(next_ms, time.ticks_ms()) > 0:
|
||||
time.sleep_ms(1)
|
||||
45
pico/test/test_serial.py
Normal file
45
pico/test/test_serial.py
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Serial loopback test – single file, runs on Pico and ESP32.
|
||||
Wire TX to RX (Pico: GP0–GP1, ESP32: 17–18), then:
|
||||
mpremote run pico/test/test_serial.py
|
||||
|
||||
For ESP32→Pico: run test_serial_send.py on ESP32, test_serial_receive.py on Pico; wire ESP32 TX (17) to Pico RX (1).
|
||||
"""
|
||||
import time
|
||||
import sys
|
||||
from machine import UART, Pin
|
||||
|
||||
if "esp32" in sys.platform:
|
||||
UART_ID, TX_PIN, RX_PIN, BAUD = 1, 17, 18, 115200
|
||||
else:
|
||||
UART_ID, TX_PIN, RX_PIN, BAUD = 0, 0, 1, 115200
|
||||
|
||||
READ_TIMEOUT_MS = 100
|
||||
LINE_TERM = b"\n"
|
||||
|
||||
print("UART loopback: %s UART%d TX=%s RX=%s %d baud" % (sys.platform, UART_ID, TX_PIN, RX_PIN, BAUD))
|
||||
uart = UART(UART_ID, baudrate=BAUD, tx=Pin(TX_PIN, Pin.OUT), rx=Pin(RX_PIN, Pin.IN))
|
||||
uart.read()
|
||||
to_send = [b"hello", b"123", b"{\"v\":\"1\"}"]
|
||||
errors = []
|
||||
for msg in to_send:
|
||||
uart.write(msg + LINE_TERM)
|
||||
time.sleep_ms(20)
|
||||
buf = bytearray()
|
||||
deadline = time.ticks_add(time.ticks_ms(), READ_TIMEOUT_MS)
|
||||
while time.ticks_diff(deadline, time.ticks_ms()) > 0:
|
||||
n = uart.any()
|
||||
if n:
|
||||
buf.extend(uart.read(n))
|
||||
if LINE_TERM in buf:
|
||||
break
|
||||
time.sleep_ms(2)
|
||||
got = bytes(buf).strip()
|
||||
if got != msg:
|
||||
errors.append((msg, got))
|
||||
uart.deinit()
|
||||
if errors:
|
||||
print("FAIL loopback:", errors)
|
||||
else:
|
||||
print("PASS loopback: sent and received", to_send)
|
||||
32
pico/test/test_serial_receive.py
Normal file
32
pico/test/test_serial_receive.py
Normal file
@@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Serial receive test – single file. Run on Pico (RX side).
|
||||
Wire: ESP32 TX (GPIO17) → Pico RX (GPIO1); GND ↔ GND. Run send test on ESP32.
|
||||
mpremote run pico/test/test_serial_receive.py
|
||||
"""
|
||||
import time
|
||||
import sys
|
||||
from machine import UART, Pin
|
||||
|
||||
if "esp32" in sys.platform:
|
||||
UART_ID, TX_PIN, RX_PIN, BAUD = 1, 17, 18, 115200
|
||||
else:
|
||||
UART_ID, TX_PIN, RX_PIN, BAUD = 0, 0, 1, 115200
|
||||
|
||||
print("UART receive: %s UART%d TX=%s RX=%s %d baud (10 s)" % (sys.platform, UART_ID, TX_PIN, RX_PIN, BAUD))
|
||||
uart = UART(UART_ID, baudrate=BAUD, tx=Pin(TX_PIN, Pin.OUT), rx=Pin(RX_PIN, Pin.IN))
|
||||
buf = bytearray()
|
||||
deadline = time.ticks_add(time.ticks_ms(), 10000)
|
||||
while time.ticks_diff(deadline, time.ticks_ms()) > 0:
|
||||
n = uart.any()
|
||||
if n:
|
||||
buf.extend(uart.read(n))
|
||||
while b"\n" in buf:
|
||||
idx = buf.index(b"\n")
|
||||
line = bytes(buf[:idx]).strip()
|
||||
buf = buf[idx + 1:]
|
||||
if line:
|
||||
print("rx:", line.decode("utf-8", "replace"))
|
||||
time.sleep_ms(10)
|
||||
uart.deinit()
|
||||
print("Receive test done.")
|
||||
23
pico/test/test_serial_send.py
Normal file
23
pico/test/test_serial_send.py
Normal file
@@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Serial send test – single file. Run on ESP32 (TX side).
|
||||
Wire: ESP32 TX (GPIO17) → Pico RX (GPIO1); GND ↔ GND. Run receive test on Pico.
|
||||
mpremote run pico/test/test_serial_send.py
|
||||
"""
|
||||
import time
|
||||
import sys
|
||||
from machine import UART, Pin
|
||||
|
||||
if "esp32" in sys.platform:
|
||||
UART_ID, TX_PIN, BAUD = 1, 17, 115200
|
||||
else:
|
||||
UART_ID, TX_PIN, BAUD = 0, 0, 115200
|
||||
|
||||
print("UART send: %s UART%d TX=%s %d baud" % (sys.platform, UART_ID, TX_PIN, BAUD))
|
||||
uart = UART(UART_ID, baudrate=BAUD, tx=Pin(TX_PIN, Pin.OUT))
|
||||
for line in [b"serial send test 1", b"serial send test 2", b"{\"v\":\"1\",\"b\":128}"]:
|
||||
uart.write(line + b"\n")
|
||||
print("sent:", line.decode("utf-8"))
|
||||
time.sleep_ms(50)
|
||||
uart.deinit()
|
||||
print("Send test done.")
|
||||
Reference in New Issue
Block a user