#!/usr/bin/env python3 """Self-contained led-driver test runner for MicroPython/mpremote.""" import json import os import utime from machine import WDT from settings import Settings from presets import Presets from utils import convert_and_reorder_colors class _TestContext: def __init__(self): self.settings = Settings() self.settings["name"] = self.settings.get("name", "test_device") self.presets = Presets(self.settings["led_pin"], self.settings["num_leds"]) self.presets.b = self.settings.get("brightness", 255) self.wdt = WDT(timeout=10000) def tick_for_ms(self, duration_ms, sleep_ms=5): start = utime.ticks_ms() while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: self.wdt.feed() self.presets.tick() utime.sleep_ms(sleep_ms) def _process_message(ctx, payload): """Small test helper that mirrors the main message handling logic.""" try: if isinstance(payload, (bytes, bytearray)): data = json.loads(payload) elif isinstance(payload, str): data = json.loads(payload) else: data = payload except (TypeError, ValueError): return "invalid_json" if not isinstance(data, dict): return "invalid_shape" if data.get("v") != "1": return "wrong_version" if "b" in data: try: ctx.presets.b = max(0, min(255, int(data["b"]))) except (TypeError, ValueError): pass if isinstance(data.get("presets"), dict): for name, preset_data in data["presets"].items(): if not isinstance(preset_data, dict): continue color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) if color_key is not None: try: preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key], ctx.settings ) except (TypeError, ValueError): continue ctx.presets.edit(name, preset_data) if isinstance(data.get("select"), dict) and ctx.settings.get("name") in data["select"]: select_list = data["select"][ctx.settings.get("name")] if isinstance(select_list, list) and select_list: preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None if isinstance(preset_name, str): ctx.presets.select(preset_name, step=step) if "default" in data: default_name = data["default"] this_device_name = ctx.settings.get("name") this_device_name_norm = ( this_device_name.strip().lower() if isinstance(this_device_name, str) else None ) should_apply_default = True if "targets" in data: should_apply_default = False targets = data.get("targets") if isinstance(targets, list) and this_device_name_norm: normalized_targets = [ target.strip().lower() for target in targets if isinstance(target, str) and target.strip() ] should_apply_default = this_device_name_norm in normalized_targets if ( should_apply_default and isinstance(default_name, str) and default_name and default_name in ctx.presets.presets ): ctx.settings["default"] = default_name if "save" in data: ctx.presets.save() return "ok" def test_invalid_messages_do_not_crash(): ctx = _TestContext() cases = [ b"{not-json", "[]", json.dumps({"v": "2"}), json.dumps({"v": "1", "presets": ["bad"]}), json.dumps({"v": "1", "select": {"test_device": "not-list"}}), json.dumps({"v": "1", "presets": {"x": {"c": ["#GG0000"]}}}), ] for payload in cases: _process_message(ctx, payload) ctx.wdt.feed() def test_preset_edit_sanitization(): ctx = _TestContext() ctx.presets.edit( "sanitize", { "pattern": "blink", "delay": "120", "brightness": "999", "auto": "false", "n1": "-5", "n2": "7", "unknown_field": "ignored", }, ) p = ctx.presets.presets["sanitize"] assert p.p == "blink" assert p.d == 120 assert p.b == 255 assert p.a is False assert p.n1 == 0 assert p.n2 == 7 assert not hasattr(p, "unknown_field") def test_colour_conversion_and_transition(): ctx = _TestContext() msg = { "v": "1", "presets": { "fade": { "p": "transition", "c": ["#ff0000", "#00ff00"], "d": 80, "a": True, } }, "select": {ctx.settings["name"]: ["fade"]}, } result = _process_message(ctx, msg) assert result == "ok" assert ctx.presets.selected == "fade" # Smoke-run the generator to ensure math runs without type errors. ctx.tick_for_ms(250) def test_pattern_smoke(): ctx = _TestContext() cases = { "t_on": {"p": "on", "c": [(16, 8, 4)]}, "t_off": {"p": "off"}, "t_blink": {"p": "blink", "c": [(255, 0, 0)], "d": 20}, "t_rainbow": {"p": "rainbow", "d": 5, "n1": 2}, "t_pulse": {"p": "pulse", "c": [(255, 0, 0)], "n1": 20, "n2": 10, "n3": 20, "d": 10}, "t_transition": {"p": "transition", "c": [(255, 0, 0), (0, 0, 255)], "d": 30}, "t_chase": {"p": "chase", "c": [(255, 0, 0), (0, 0, 255)], "n1": 3, "n2": 2, "n3": 1, "n4": 1, "d": 20}, "t_circle": {"p": "circle", "c": [(255, 255, 0), (0, 0, 8)], "n1": 5, "n2": 10, "n3": 5, "n4": 2}, } for name, data in cases.items(): ctx.presets.edit(name, data) assert ctx.presets.select(name), "select failed: %s" % name ctx.tick_for_ms(120) def test_default_requires_existing_preset(): ctx = _TestContext() _process_message(ctx, {"v": "1", "default": "missing"}) assert ctx.settings.get("default") != "missing" ctx.presets.edit("exists", {"p": "on"}) _process_message(ctx, {"v": "1", "default": "exists"}) assert ctx.settings.get("default") == "exists" def test_default_targets_gate_by_device_name(): ctx = _TestContext() ctx.settings["name"] = "a" ctx.presets.edit("targeted", {"p": "on"}) ctx.settings["default"] = "baseline" _process_message( ctx, {"v": "1", "default": "targeted", "targets": ["11"]}, ) assert ctx.settings.get("default") == "baseline" _process_message( ctx, {"v": "1", "default": "targeted", "targets": [" A "]}, ) assert ctx.settings.get("default") == "targeted" def test_save_and_load_roundtrip(): ctx = _TestContext() ctx.presets.edit( "persist", {"p": "blink", "c": [(1, 2, 3), (4, 5, 6)], "d": 77, "b": 123, "a": False}, ) assert ctx.presets.save() reloaded = Presets(ctx.settings["led_pin"], ctx.settings["num_leds"]) assert reloaded.load(ctx.settings) p = reloaded.presets.get("persist") assert p is not None assert p.p == "blink" assert p.d == 77 assert p.b == 123 assert p.a is False assert p.c == [(1, 2, 3), (4, 5, 6)] try: os.remove("presets.json") except OSError: pass def run_all(): tests = [ test_invalid_messages_do_not_crash, test_preset_edit_sanitization, test_colour_conversion_and_transition, test_pattern_smoke, test_default_requires_existing_preset, test_default_targets_gate_by_device_name, test_save_and_load_roundtrip, ] print("=" * 56) print("led-driver self-contained tests") print("=" * 56) for test_func in tests: print("Running %s ..." % test_func.__name__) test_func() print(" PASS") print("-" * 56) print("All tests passed") if __name__ == "__main__": run_all()