Files
led-driver/test/test_espnow_receive.py
jimmy 43957adb28 Rename patterns module to presets
Rename the driver module and update imports so tests and main entry use the new presets naming, while moving Preset to its own file.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-07 11:40:04 +13:00

654 lines
23 KiB
Python

#!/usr/bin/env python3
"""Test ESPNow receive functionality - runs on MicroPython device."""
import json
import utime
from settings import Settings
from presets import Presets
from utils import convert_and_reorder_colors
class MockESPNow:
"""Mock ESPNow for testing that can send messages."""
def __init__(self):
self.messages = []
self.active_state = False
def active(self, state):
self.active_state = state
def any(self):
"""Return True if there are messages."""
return len(self.messages) > 0
def recv(self):
"""Receive a message (removes it from queue)."""
if self.messages:
return self.messages.pop(0)
return None, None
def send_message(self, host, msg_data):
"""Send a message by adding it to the queue (testing helper)."""
if isinstance(msg_data, dict):
msg = json.dumps(msg_data)
else:
msg = msg_data
self.messages.append((host, msg))
def clear(self):
"""Clear all messages (testing helper)."""
self.messages = []
from machine import WDT
def get_wdt():
"""Get a real WDT instance for tests."""
return WDT(timeout=10000) # 10 second timeout for tests
def run_main_loop_iterations(espnow, patterns, settings, wdt, max_iterations=10):
"""Run main loop iterations until no messages or max reached."""
iterations = 0
results = []
while iterations < max_iterations:
wdt.feed()
patterns.tick()
if espnow.any():
host, msg = espnow.recv()
data = json.loads(msg)
if data.get("v") != "1":
results.append(("version_rejected", data))
continue
if "presets" in data:
for name, preset_data in data["presets"].items():
# Convert hex color strings to RGB tuples and reorder based on device color order
if "colors" in preset_data:
preset_data["colors"] = convert_and_reorder_colors(preset_data["colors"], settings)
patterns.edit(name, preset_data)
results.append(("presets_processed", list(data["presets"].keys())))
if settings.get("name") in data.get("select", {}):
select_list = data["select"][settings.get("name")]
# Select value is always a list: ["preset_name"] or ["preset_name", step]
if select_list:
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
if patterns.select(preset_name, step=step):
results.append(("selected", preset_name))
iterations += 1
# Stop if no more messages
if not espnow.any():
break
return results
def test_version_check():
"""Test that messages with wrong version are rejected."""
print("Test 1: Version check")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Send message with wrong version
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "2", "presets": {"test": {"pattern": "on"}}})
results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert len([r for r in results if r[0] == "version_rejected"]) > 0, "Should reject wrong version"
assert "test" not in patterns.presets, "Preset should not be created"
print(" ✓ Version check passed")
# Send message with correct version
mock_espnow.clear()
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", {"v": "1", "presets": {"test": {"pattern": "on"}}})
results = run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert len([r for r in results if r[0] == "presets_processed"]) > 0, "Should process correct version"
assert "test" in patterns.presets, "Preset should be created"
print(" ✓ Correct version accepted")
def test_preset_creation():
"""Test preset creation from ESPNow messages."""
print("\nTest 2: Preset creation")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"test_blink": {
"pattern": "blink",
"colors": ["#FF0000", "#00FF00"],
"delay": 200,
"brightness": 128
},
"test_rainbow": {
"pattern": "rainbow",
"delay": 100,
"n1": 2
}
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert "test_blink" in patterns.presets, "test_blink preset should exist"
assert "test_rainbow" in patterns.presets, "test_rainbow preset should exist"
# Check preset values
blink_preset = patterns.presets["test_blink"]
assert blink_preset.pattern == "blink", "Pattern should be blink"
assert blink_preset.delay == 200, "Delay should be 200"
assert blink_preset.brightness == 128, "Brightness should be 128"
rainbow_preset = patterns.presets["test_rainbow"]
assert rainbow_preset.pattern == "rainbow", "Pattern should be rainbow"
assert rainbow_preset.n1 == 2, "n1 should be 2"
print(" ✓ Presets created correctly")
def test_color_conversion():
"""Test hex color string conversion and reordering."""
print("\nTest 3: Color conversion")
settings = Settings()
settings["color_order"] = "rgb" # Default RGB order
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"test_colors": {
"pattern": "on",
"colors": ["#FF0000", "#00FF00", "#0000FF"] # Red, Green, Blue
}
}
}
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
preset = patterns.presets["test_colors"]
assert len(preset.colors) == 3, "Should have 3 colors"
assert preset.colors[0] == (255, 0, 0), "First color should be red (255,0,0)"
assert preset.colors[1] == (0, 255, 0), "Second color should be green (0,255,0)"
assert preset.colors[2] == (0, 0, 255), "Third color should be blue (0,0,255)"
print(" ✓ Colors converted correctly (RGB order)")
# Test GRB order
settings["color_order"] = "grb"
patterns2 = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow2 = MockESPNow()
msg2 = {
"v": "1",
"presets": {
"test_grb": {
"pattern": "on",
"colors": ["#FF0000"] # Red in RGB, should become (0, 255, 0) in GRB
}
}
}
mock_espnow2.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2)
wdt2 = get_wdt()
run_main_loop_iterations(mock_espnow2, patterns2, settings, wdt2)
preset2 = patterns2.presets["test_grb"]
assert preset2.colors[0] == (0, 255, 0), "GRB: Red should become green (0,255,0)"
print(" ✓ Colors reordered correctly (GRB order)")
def test_preset_update():
"""Test that editing an existing preset updates it."""
print("\nTest 4: Preset update")
settings = Settings()
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create initial preset
msg1 = {
"v": "1",
"presets": {
"test_update": {
"pattern": "blink",
"delay": 100,
"brightness": 64
}
}
}
mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.presets["test_update"].delay == 100, "Initial delay should be 100"
# Update preset
mock_espnow.clear()
msg2 = {
"v": "1",
"presets": {
"test_update": {
"pattern": "blink",
"delay": 200,
"brightness": 128
}
}
}
mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.presets["test_update"].delay == 200, "Updated delay should be 200"
assert patterns.presets["test_update"].brightness == 128, "Updated brightness should be 128"
print(" ✓ Preset updated correctly")
def test_select():
"""Test preset selection."""
print("\nTest 5: Preset selection")
settings = Settings()
settings["name"] = "device1"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create presets
msg1 = {
"v": "1",
"presets": {
"preset1": {"pattern": "on", "colors": [(255, 0, 0)]},
"preset2": {"pattern": "rainbow", "delay": 50}
}
}
mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select preset
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"device1": ["preset1"],
"device2": ["preset2"]
}
}
mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset1", "Should select preset1"
print(" ✓ Preset selected correctly")
def test_full_message():
"""Test a full message with presets and select."""
print("\nTest 6: Full message (presets + select)")
settings = Settings()
settings["name"] = "test_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
msg = {
"v": "1",
"presets": {
"my_preset": {
"pattern": "pulse",
"colors": ["#FF0000", "#00FF00"],
"delay": 150,
"n1": 500,
"n2": 200,
"n3": 500
}
},
"select": {
"test_device": ["my_preset"],
"other_device": ["other_preset"]
}
}
mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert "my_preset" in patterns.presets, "Preset should be created"
assert patterns.selected == "my_preset", "Preset should be selected"
preset = patterns.presets["my_preset"]
assert preset.pattern == "pulse", "Pattern should be pulse"
assert preset.delay == 150, "Delay should be 150"
assert preset.n1 == 500, "n1 should be 500"
print(" ✓ Full message processed correctly")
def test_switch_presets():
"""Test switching between different presets."""
print("\nTest 7: Switch between presets")
settings = Settings()
settings["name"] = "switch_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create multiple presets
msg1 = {
"v": "1",
"presets": {
"preset_blink": {"pattern": "blink", "delay": 200, "colors": [(255, 0, 0)]},
"preset_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 2},
"preset_pulse": {"pattern": "pulse", "delay": 150, "n1": 500, "n2": 200, "n3": 500}
}
}
mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select and run first preset for 2 seconds
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"switch_device": ["preset_blink"]
}
}
mock_espnow.send_message(b"\x66\x66\x66\x66\x66\x66", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_blink", "Should select preset_blink"
print(" ✓ Selected preset_blink, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch to second preset and run for 2 seconds
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"switch_device": ["preset_rainbow"]
}
}
mock_espnow.send_message(b"\x77\x77\x77\x77\x77\x77", msg3)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_rainbow", "Should switch to preset_rainbow"
print(" ✓ Switched to preset_rainbow, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch to third preset and run for 2 seconds
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"switch_device": ["preset_pulse"]
}
}
mock_espnow.send_message(b"\x88\x88\x88\x88\x88\x88", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_pulse", "Should switch to preset_pulse"
print(" ✓ Switched to preset_pulse, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
# Switch back to first preset and run for 2 seconds
mock_espnow.clear()
msg5 = {
"v": "1",
"select": {
"switch_device": ["preset_blink"]
}
}
mock_espnow.send_message(b"\x99\x99\x99\x99\x99\x99", msg5)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "preset_blink", "Should switch back to preset_blink"
print(" ✓ Switched back to preset_blink, running for 2 seconds...")
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
patterns.tick()
utime.sleep_ms(10)
print(" ✓ Preset switching works correctly")
def test_beat_functionality():
"""Test beat functionality - calling select() again with same preset restarts pattern."""
print("\nTest 8: Beat functionality")
settings = Settings()
settings["name"] = "beat_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create presets with manual mode
msg1 = {
"v": "1",
"presets": {
"beat_rainbow": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False},
"beat_chase": {"pattern": "chase", "delay": 200, "n1": 4, "n2": 4, "n3": 2, "n4": 1, "auto": False},
"beat_pulse": {"pattern": "pulse", "delay": 150, "n1": 300, "n2": 100, "n3": 300, "auto": False}
}
}
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Test 1: Beat with rainbow (manual mode) - should advance one step per beat
print(" Test 8.1: Beat with rainbow (manual mode)")
patterns.step = 0
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"beat_device": ["beat_rainbow"]
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_rainbow", "Should select beat_rainbow"
initial_step = patterns.step
# First beat - advance one step
mock_espnow.clear()
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg2) # Same select message = beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations, so step should be incremented
assert patterns.step == (initial_step + 1) % 256, f"Step should increment from {initial_step} to {(initial_step + 1) % 256}, got {patterns.step}"
# Second beat - advance another step
mock_espnow.clear()
mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg2) # Beat again
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
assert patterns.step == (initial_step + 2) % 256, f"Step should increment to {(initial_step + 2) % 256}, got {patterns.step}"
print(" ✓ Rainbow beat advances one step per beat")
# Test 2: Beat with chase (manual mode) - should advance one step per beat
print(" Test 8.2: Beat with chase (manual mode)")
patterns.step = 0
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"beat_device": ["beat_chase"]
}
}
mock_espnow.send_message(b"\xee\xee\xee\xee\xee\xee", msg3)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_chase", "Should select beat_chase"
initial_step = patterns.step
# First beat
mock_espnow.clear()
mock_espnow.send_message(b"\xff\xff\xff\xff\xff\xff", msg3) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations
assert patterns.step == initial_step + 1, f"Chase step should increment from {initial_step} to {initial_step + 1}, got {patterns.step}"
# Second beat
mock_espnow.clear()
mock_espnow.send_message(b"\x11\x11\x11\x11\x11\x11", msg3) # Beat again
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
assert patterns.step == initial_step + 2, f"Chase step should increment to {initial_step + 2}, got {patterns.step}"
print(" ✓ Chase beat advances one step per beat")
# Test 3: Beat with pulse (manual mode) - should restart full cycle
print(" Test 8.3: Beat with pulse (manual mode)")
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"beat_device": ["beat_pulse"]
}
}
mock_espnow.send_message(b"\x22\x22\x22\x22\x22\x22", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.selected == "beat_pulse", "Should select beat_pulse"
assert patterns.generator is not None, "Generator should be active"
# First beat - should restart generator
initial_generator = patterns.generator
mock_espnow.clear()
mock_espnow.send_message(b"\x33\x33\x33\x33\x33\x33", msg4) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
assert patterns.generator is not None, "Generator should still be active after beat"
assert patterns.generator != initial_generator, "Generator should be restarted (new instance)"
print(" ✓ Pulse beat restarts generator for full cycle")
# Test 4: Multiple beats in sequence
print(" Test 8.4: Multiple beats in sequence")
patterns.step = 0
mock_espnow.clear()
mock_espnow.send_message(b"\x44\x44\x44\x44\x44\x44", msg2) # Select rainbow
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Send 5 beats
for i in range(5):
mock_espnow.clear()
mock_espnow.send_message(b"\x55\x55\x55\x55\x55\x55", msg2) # Beat
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=5)
# tick() is already called in run_main_loop_iterations
wdt.feed()
utime.sleep_ms(50)
assert patterns.step == 5, f"After 5 beats, step should be 5, got {patterns.step}"
print(" ✓ Multiple beats work correctly")
print(" ✓ Beat functionality works correctly")
def test_select_with_step():
"""Test selecting a preset with an explicit step value."""
print("\nTest 9: Select with step value")
settings = Settings()
settings["name"] = "step_device"
patterns = Presets(settings["led_pin"], settings["num_leds"])
mock_espnow = MockESPNow()
wdt = get_wdt()
# Create preset
msg1 = {
"v": "1",
"presets": {
"step_preset": {"pattern": "rainbow", "delay": 100, "n1": 1, "auto": False}
}
}
mock_espnow.send_message(b"\xaa\xaa\xaa\xaa\xaa\xaa", msg1)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt)
# Select with explicit step value
mock_espnow.clear()
msg2 = {
"v": "1",
"select": {
"step_device": ["step_preset", 10]
}
}
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
assert patterns.selected == "step_preset", "Should select step_preset"
# Step is set to 10, then tick() advances it, so it should be 11
assert patterns.step == 11, f"Step should be set to 10 then advanced to 11 by tick(), got {patterns.step}"
print(" ✓ Step value set correctly")
# Select without step (should use default behavior)
mock_espnow.clear()
msg3 = {
"v": "1",
"select": {
"step_device": ["step_preset"]
}
}
mock_espnow.send_message(b"\xcc\xcc\xcc\xcc\xcc\xcc", msg3)
initial_step = patterns.step # Should be 11
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
# Since it's the same preset, step should not be reset, but tick() will advance it
# So step should be initial_step + 1 (one tick call)
assert patterns.step == initial_step + 1, f"Step should advance from {initial_step} to {initial_step + 1} (not reset), got {patterns.step}"
print(" ✓ Step preserved when selecting same preset without step (tick advances it)")
# Select different preset with step
patterns.edit("other_preset", {"p": "rainbow", "a": False})
mock_espnow.clear()
msg4 = {
"v": "1",
"select": {
"step_device": ["other_preset", 5]
}
}
mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step
patterns.tick()
assert patterns.selected == "other_preset", "Should select other_preset"
# Step is set to 5, then tick() advances it, so it should be 6
assert patterns.step == 6, f"Step should be set to 5 then advanced to 6 by tick(), got {patterns.step}"
print(" ✓ Step set correctly when switching presets")
def main():
"""Run all tests."""
print("=" * 60)
print("ESPNow Receive Functionality Tests")
print("=" * 60)
try:
test_version_check()
test_preset_creation()
test_color_conversion()
test_preset_update()
test_select()
test_full_message()
test_switch_presets()
test_beat_functionality()
test_select_with_step()
print("\n" + "=" * 60)
print("All tests passed! ✓")
print("=" * 60)
except AssertionError as e:
print("\n✗ Test failed:", e)
raise
except Exception as e:
print("\n✗ Unexpected error:", e)
raise
if __name__ == "__main__":
main()