262 lines
8.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Self-contained led-driver test runner for MicroPython/mpremote."""
|
|
|
|
import json
|
|
import os
|
|
import utime
|
|
from machine import WDT
|
|
|
|
from settings import Settings
|
|
from presets import Presets
|
|
from utils import convert_and_reorder_colors
|
|
|
|
|
|
class _TestContext:
    """Shared per-test fixture: settings, preset engine, and watchdog.

    Each test builds a fresh context so state never leaks between tests.
    """

    def __init__(self):
        self.settings = Settings()
        # Guarantee a device name exists so "select"/"targets" lookups work.
        self.settings["name"] = self.settings.get("name", "test_device")
        self.presets = Presets(self.settings["led_pin"], self.settings["num_leds"])
        self.presets.b = self.settings.get("brightness", 255)
        # 10 s hardware watchdog; tests must keep feeding it while ticking.
        self.wdt = WDT(timeout=10000)

    def tick_for_ms(self, duration_ms, sleep_ms=5):
        """Drive the preset engine for roughly *duration_ms* milliseconds.

        Feeds the watchdog on every iteration so long smoke-runs do not
        trigger a reset.
        """
        started = utime.ticks_ms()
        while True:
            if utime.ticks_diff(utime.ticks_ms(), started) >= duration_ms:
                break
            self.wdt.feed()
            self.presets.tick()
            utime.sleep_ms(sleep_ms)
|
|
|
|
|
|
_INVALID = object()  # sentinel: payload could not be parsed as JSON


def _decode_payload(payload):
    """Decode *payload* into a Python object.

    bytes/bytearray/str are parsed as JSON; anything else is assumed to be
    an already-decoded object and returned as-is. Returns the module-level
    ``_INVALID`` sentinel when JSON parsing fails.
    """
    if not isinstance(payload, (bytes, bytearray, str)):
        return payload
    try:
        return json.loads(payload)
    except (TypeError, ValueError):
        return _INVALID


def _apply_brightness(ctx, data):
    """Apply a global brightness ("b"), clamped to 0..255; ignore non-numbers."""
    if "b" not in data:
        return
    try:
        ctx.presets.b = max(0, min(255, int(data["b"])))
    except (TypeError, ValueError):
        pass  # deliberately best-effort: a bad value leaves brightness alone


def _apply_preset_edits(ctx, data):
    """Create/update presets, converting colour lists for the device.

    Accepts colours under either "c" or "colors". Works on a copy so the
    caller's payload dict is never mutated in place.
    """
    preset_map = data.get("presets")
    if not isinstance(preset_map, dict):
        return
    for name, preset_data in preset_map.items():
        if not isinstance(preset_data, dict):
            continue
        preset_data = dict(preset_data)  # don't mutate the caller's payload
        color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
        if color_key is not None:
            try:
                preset_data[color_key] = convert_and_reorder_colors(
                    preset_data[color_key], ctx.settings
                )
            except (TypeError, ValueError):
                continue  # malformed colours: skip the whole preset
        ctx.presets.edit(name, preset_data)


def _apply_selection(ctx, data):
    """Select a preset if this device is addressed in the "select" map.

    The map value is ``[preset_name]`` or ``[preset_name, step]``.
    """
    select_map = data.get("select")
    if not isinstance(select_map, dict):
        return
    device_name = ctx.settings.get("name")
    if device_name not in select_map:
        return
    select_list = select_map[device_name]
    if not (isinstance(select_list, list) and select_list):
        return
    preset_name = select_list[0]
    step = select_list[1] if len(select_list) > 1 else None
    if isinstance(preset_name, str):
        ctx.presets.select(preset_name, step=step)


def _apply_default(ctx, data):
    """Persist a new default preset, honouring an optional "targets" filter.

    When "targets" is present, the default only applies if the (trimmed,
    lower-cased) device name appears in the list. The default name must
    refer to an existing preset.
    """
    if "default" not in data:
        return
    default_name = data["default"]
    device_name = ctx.settings.get("name")
    device_name_norm = (
        device_name.strip().lower() if isinstance(device_name, str) else None
    )

    should_apply = True
    if "targets" in data:
        should_apply = False  # a targets list restricts who may apply it
        targets = data.get("targets")
        if isinstance(targets, list) and device_name_norm:
            normalized_targets = [
                target.strip().lower()
                for target in targets
                if isinstance(target, str) and target.strip()
            ]
            should_apply = device_name_norm in normalized_targets

    if (
        should_apply
        and isinstance(default_name, str)
        and default_name
        and default_name in ctx.presets.presets
    ):
        ctx.settings["default"] = default_name


def _process_message(ctx, payload):
    """Small test helper that mirrors the main message handling logic.

    Returns a short status string: "invalid_json" (unparseable),
    "invalid_shape" (not a dict), "wrong_version" (``v`` != "1"),
    or "ok" once all sections have been applied.
    """
    data = _decode_payload(payload)
    if data is _INVALID:
        return "invalid_json"
    if not isinstance(data, dict):
        return "invalid_shape"
    if data.get("v") != "1":
        return "wrong_version"

    _apply_brightness(ctx, data)
    _apply_preset_edits(ctx, data)
    _apply_selection(ctx, data)
    _apply_default(ctx, data)

    if "save" in data:
        ctx.presets.save()

    return "ok"
|
|
|
|
|
|
def test_invalid_messages_do_not_crash():
    """Malformed or mis-versioned payloads must be absorbed without raising."""
    ctx = _TestContext()
    bad_payloads = (
        b"{not-json",
        "[]",
        json.dumps({"v": "2"}),
        json.dumps({"v": "1", "presets": ["bad"]}),
        json.dumps({"v": "1", "select": {"test_device": "not-list"}}),
        json.dumps({"v": "1", "presets": {"x": {"c": ["#GG0000"]}}}),
    )
    for bad in bad_payloads:
        _process_message(ctx, bad)
        ctx.wdt.feed()
|
|
|
|
|
|
def test_preset_edit_sanitization():
    """String-typed preset fields must be coerced, clamped, and unknowns dropped."""
    ctx = _TestContext()
    raw_fields = {
        "pattern": "blink",
        "delay": "120",
        "brightness": "999",
        "auto": "false",
        "n1": "-5",
        "n2": "7",
        "unknown_field": "ignored",
    }
    ctx.presets.edit("sanitize", raw_fields)

    preset = ctx.presets.presets["sanitize"]
    assert preset.p == "blink"
    assert preset.d == 120
    assert preset.b == 255
    assert preset.a is False
    assert preset.n1 == 0
    assert preset.n2 == 7
    assert not hasattr(preset, "unknown_field")
|
|
|
|
|
|
def test_colour_conversion_and_transition():
    """Hex colour strings convert cleanly and a transition preset runs."""
    ctx = _TestContext()
    device = ctx.settings["name"]
    payload = {
        "v": "1",
        "presets": {
            "fade": {
                "p": "transition",
                "c": ["#ff0000", "#00ff00"],
                "d": 80,
                "a": True,
            }
        },
        "select": {device: ["fade"]},
    }

    assert _process_message(ctx, payload) == "ok"
    assert ctx.presets.selected == "fade"
    # Smoke-run the generator to ensure math runs without type errors.
    ctx.tick_for_ms(250)
|
|
|
|
|
|
def test_pattern_smoke():
    """Every supported pattern must select successfully and tick without errors."""
    ctx = _TestContext()
    pattern_cases = (
        ("t_on", {"p": "on", "c": [(16, 8, 4)]}),
        ("t_off", {"p": "off"}),
        ("t_blink", {"p": "blink", "c": [(255, 0, 0)], "d": 20}),
        ("t_rainbow", {"p": "rainbow", "d": 5, "n1": 2}),
        ("t_pulse", {"p": "pulse", "c": [(255, 0, 0)], "n1": 20, "n2": 10, "n3": 20, "d": 10}),
        ("t_transition", {"p": "transition", "c": [(255, 0, 0), (0, 0, 255)], "d": 30}),
        ("t_chase", {"p": "chase", "c": [(255, 0, 0), (0, 0, 255)], "n1": 3, "n2": 2, "n3": 1, "n4": 1, "d": 20}),
        ("t_circle", {"p": "circle", "c": [(255, 255, 0), (0, 0, 8)], "n1": 5, "n2": 10, "n3": 5, "n4": 2}),
    )
    for name, preset_fields in pattern_cases:
        ctx.presets.edit(name, preset_fields)
        assert ctx.presets.select(name), "select failed: %s" % name
        ctx.tick_for_ms(120)
|
|
|
|
|
|
def test_default_requires_existing_preset():
    """A "default" update only sticks when it names an existing preset."""
    ctx = _TestContext()

    # Unknown preset name: must not be stored.
    _process_message(ctx, {"v": "1", "default": "missing"})
    assert ctx.settings.get("default") != "missing"

    # Known preset name: must be stored.
    ctx.presets.edit("exists", {"p": "on"})
    _process_message(ctx, {"v": "1", "default": "exists"})
    assert ctx.settings.get("default") == "exists"
|
|
|
|
def test_default_targets_gate_by_device_name():
    """A "targets" list gates the default update by normalized device name."""
    ctx = _TestContext()
    ctx.settings["name"] = "a"
    ctx.presets.edit("targeted", {"p": "on"})
    ctx.settings["default"] = "baseline"

    # Target list that excludes this device: default must stay unchanged.
    miss_msg = {"v": "1", "default": "targeted", "targets": ["11"]}
    _process_message(ctx, miss_msg)
    assert ctx.settings.get("default") == "baseline"

    # Target matches after trimming/lower-casing: default must update.
    hit_msg = {"v": "1", "default": "targeted", "targets": [" A "]}
    _process_message(ctx, hit_msg)
    assert ctx.settings.get("default") == "targeted"
|
|
|
|
|
|
def test_save_and_load_roundtrip():
    """Presets saved to flash must survive a reload with identical fields.

    Cleanup is in a ``finally`` block: the original version only removed
    presets.json when every assertion passed, so one failing run could
    leave stale state behind and corrupt subsequent runs.
    """
    ctx = _TestContext()
    try:
        ctx.presets.edit(
            "persist",
            {"p": "blink", "c": [(1, 2, 3), (4, 5, 6)], "d": 77, "b": 123, "a": False},
        )
        assert ctx.presets.save()

        # A fresh Presets instance must reconstruct the saved preset exactly.
        reloaded = Presets(ctx.settings["led_pin"], ctx.settings["num_leds"])
        assert reloaded.load(ctx.settings)
        p = reloaded.presets.get("persist")
        assert p is not None
        assert p.p == "blink"
        assert p.d == 77
        assert p.b == 123
        assert p.a is False
        assert p.c == [(1, 2, 3), (4, 5, 6)]
    finally:
        # Always remove the on-device file, even when an assertion failed.
        try:
            os.remove("presets.json")
        except OSError:
            pass
|
|
|
|
|
|
def run_all():
    """Run every test in order, stopping at the first failure."""
    tests = (
        test_invalid_messages_do_not_crash,
        test_preset_edit_sanitization,
        test_colour_conversion_and_transition,
        test_pattern_smoke,
        test_default_requires_existing_preset,
        test_default_targets_gate_by_device_name,
        test_save_and_load_roundtrip,
    )
    banner = "=" * 56
    print(banner)
    print("led-driver self-contained tests")
    print(banner)
    for test in tests:
        print("Running %s ..." % test.__name__)
        test()
        print(" PASS")
    print("-" * 56)
    print("All tests passed")
|
|
|
|
|
|
# Run the full suite when executed directly (e.g. on-device via mpremote).
if __name__ == "__main__":
    run_all()
|