#!/usr/bin/env python3 """Self-contained led-driver test runner for MicroPython/mpremote. Run on device (from led-driver repo root):: mpremote connect run tests/all.py Or via dev helper:: python dev.py test """ import json import sys import utime from machine import WDT def _bootstrap_import_path(): """Find ``settings`` / ``presets`` whether this file lives in ``tests/`` or ``:/``.""" try: import uos as os except ImportError: import os candidates = [] try: here = __file__.rsplit("/", 1)[0] if here: candidates.append(here) parent = here.rsplit("/", 1)[0] if parent: candidates.append(parent) except NameError: pass candidates.extend([".", "..", "/"]) for p in candidates: if p and p not in sys.path: sys.path.insert(0, p) _bootstrap_import_path() from settings import Settings # noqa: E402 from presets import Presets, run_tick # noqa: E402 from preset import Preset # noqa: E402 from utils import convert_and_reorder_colors # noqa: E402 class _TestContext: def __init__(self): self.settings = Settings() self.settings["name"] = self.settings.get("name", "test_device") self.presets = Presets(self.settings["led_pin"], self.settings["num_leds"]) self.presets.b = self.settings.get("brightness", 255) self.wdt = WDT(timeout=10000) def tick_for_ms(self, duration_ms, sleep_ms=5): start = utime.ticks_ms() while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: self.wdt.feed() run_tick(self.presets) utime.sleep_ms(sleep_ms) def _pattern_loaded(ctx, pattern_id): return pattern_id in ctx.presets.patterns def _smoke_preset(ctx, name, data, ms=80): pattern_id = data.get("p") or data.get("pattern") if not _pattern_loaded(ctx, pattern_id): raise AssertionError("pattern not loaded: %s" % pattern_id) ctx.presets.edit(name, data) if not ctx.presets.select(name): raise AssertionError("select failed: %s" % name) ctx.tick_for_ms(ms) def _process_message(ctx, payload): """Small test helper that mirrors the main message handling logic.""" try: if isinstance(payload, (bytes, bytearray)): data = json.loads(payload) elif isinstance(payload, str): data = json.loads(payload) else: data = payload except (TypeError, ValueError): return "invalid_json" if not isinstance(data, dict): return "invalid_shape" if data.get("v") != "1": return "wrong_version" if "b" in data: try: ctx.presets.b = max(0, min(255, int(data["b"]))) except (TypeError, ValueError): pass if isinstance(data.get("presets"), dict): for name, preset_data in data["presets"].items(): if not isinstance(preset_data, dict): continue color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) if color_key is not None: try: preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key], ctx.settings ) except (TypeError, ValueError): continue ctx.presets.edit(name, preset_data) if isinstance(data.get("select"), dict) and ctx.settings.get("name") in data["select"]: select_list = data["select"][ctx.settings.get("name")] if isinstance(select_list, list) and select_list: preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None if isinstance(preset_name, str): ctx.presets.select(preset_name, step=step) if "default" in data: default_name = data["default"] this_device_name = ctx.settings.get("name") this_device_name_norm = ( this_device_name.strip().lower() if isinstance(this_device_name, str) else None ) should_apply_default = True if "targets" in data: should_apply_default = False targets = data.get("targets") if isinstance(targets, list) and this_device_name_norm: normalized_targets = [ target.strip().lower() for target in targets if isinstance(target, str) and target.strip() ] should_apply_default = this_device_name_norm in normalized_targets if ( should_apply_default and isinstance(default_name, str) and default_name and default_name in ctx.presets.presets ): ctx.settings["default"] = default_name if "save" in data: ctx.presets.save() return "ok" def test_invalid_messages_do_not_crash(): ctx = _TestContext() cases = [ b"{not-json", "[]", json.dumps({"v": "2"}), json.dumps({"v": "1", "presets": ["bad"]}), json.dumps({"v": "1", "select": {"test_device": "not-list"}}), json.dumps({"v": "1", "presets": {"x": {"c": ["#GG0000"]}}}), ] for payload in cases: _process_message(ctx, payload) ctx.wdt.feed() def test_preset_edit_sanitization(): ctx = _TestContext() ctx.presets.edit( "sanitize", { "pattern": "blink", "delay": "120", "brightness": "999", "auto": "false", "n1": "-5", "n2": "7", "unknown_field": "ignored", }, ) p = ctx.presets.presets["sanitize"] assert p.p == "blink" assert p.d == 120 assert p.b == 255 assert p.a is False assert p.n1 == 0 assert p.n2 == 7 assert not hasattr(p, "unknown_field") def test_preset_mode_alias_maps_to_n6(): ctx = _TestContext() ctx.presets.edit( "rainbow_mode", {"pattern": "colour_cycle", "mode": 1, "d": 50, "n1": 2, "a": True}, ) p = ctx.presets.presets["rainbow_mode"] assert p.p == "colour_cycle" assert p.n6 == 1 def test_style_mode_and_legacy_aliases(): from patterns.pattern_modes import style_mode p = Preset({"p": "colour_cycle", "mode": 0, "d": 50, "c": [(255, 0, 0)]}) assert style_mode(p, 0, {"rainbow": 1}) == 0 legacy = Preset({"p": "rainbow", "d": 50, "c": [(255, 0, 0)]}) assert style_mode(legacy, 0, {"rainbow": 1}) == 1 ctx = _TestContext() legacy_ids = ( "rainbow", "meteor_rain", "snowfall", "sparkle_trail", "marquee", "northern_wave", ) for lid in legacy_ids: if not _pattern_loaded(ctx, lid): raise AssertionError("legacy alias not registered: %s" % lid) def test_colour_conversion_and_transition(): ctx = _TestContext() msg = { "v": "1", "presets": { "fade": { "p": "transition", "c": ["#ff0000", "#00ff00"], "d": 80, "a": True, } }, "select": {ctx.settings["name"]: ["fade"]}, } result = _process_message(ctx, msg) assert result == "ok" assert ctx.presets.selected == "fade" ctx.tick_for_ms(250) def test_pattern_smoke(): ctx = _TestContext() cases = { "t_on": {"p": "on", "c": [(16, 8, 4)]}, "t_off": {"p": "off"}, "t_blink": {"p": "blink", "c": [(255, 0, 0)], "d": 20}, "t_colour_cycle": {"p": "colour_cycle", "n6": 0, "d": 5, "n1": 2, "c": [(255, 0, 0), (0, 255, 0)]}, "t_chase": {"p": "chase", "c": [(255, 0, 0), (0, 0, 255)], "n1": 3, "n2": 2, "n3": 1, "n4": 1, "d": 20}, } for name, data in cases.items(): _smoke_preset(ctx, name, data, ms=100) def test_merged_pattern_modes(): """Smoke each style (``n6`` / ``mode``) for merged multi-mode patterns.""" ctx = _TestContext() colors = [(200, 220, 255), (255, 180, 80)] cases = ( ("mc_grad", "colour_cycle", {"p": "colour_cycle", "n6": 0, "n1": 2, "d": 8, "c": colors}), ("mc_wheel", "colour_cycle", {"p": "colour_cycle", "mode": 1, "n1": 2, "d": 8}), ("chase_std", "chase", {"p": "chase", "n6": 0, "n1": 2, "n2": 2, "n3": 1, "n4": 1, "d": 15, "c": colors}), ("chase_marq", "chase", {"p": "chase", "n6": 1, "n1": 3, "n2": 2, "n3": 1, "d": 15, "c": colors}), ("meteor_0", "meteor", {"p": "meteor", "n6": 0, "n1": 4, "n2": 2, "n3": 8, "d": 10, "c": colors}), ("meteor_1", "meteor", {"p": "meteor", "n6": 1, "n1": 3, "n2": 2, "n3": 4, "d": 10, "c": colors}), ("part_0", "particles", {"p": "particles", "n6": 0, "n1": 4, "n2": 1, "d": 10, "c": colors}), ("part_1", "particles", {"p": "particles", "mode": 1, "n1": 3, "n2": 1, "n3": 4, "d": 10, "c": colors}), ("spark_0", "sparkle", {"p": "sparkle", "n6": 0, "n1": 4, "n2": 6, "d": 10, "c": colors}), ("spark_1", "sparkle", {"p": "sparkle", "n6": 1, "n1": 3, "n2": 4, "n3": 2, "d": 10, "c": colors}), ("aurora_0", "aurora", {"p": "aurora", "n6": 0, "n1": 3, "n2": 2, "n3": 0, "d": 12, "c": colors}), ("aurora_1", "aurora", {"p": "aurora", "mode": 1, "n1": 8, "n2": 2, "n3": 1, "d": 12, "c": colors}), ) for name, pattern_id, data in cases: if not _pattern_loaded(ctx, pattern_id): continue _smoke_preset(ctx, name, data, ms=60) legacy_smoke = ( ("leg_rainbow", "rainbow", {"p": "rainbow", "d": 8, "n1": 2}), ("leg_ice", "ice_sparkle", {"p": "ice_sparkle", "n1": 3, "n2": 2, "n3": 2, "d": 10, "c": colors}), ("leg_wave", "northern_wave", {"p": "northern_wave", "n1": 6, "n2": 2, "n3": 1, "d": 12, "c": colors}), ("leg_star", "starfall", {"p": "starfall", "n1": 3, "n2": 1, "n3": 3, "d": 10, "c": colors}), ) for name, pattern_id, data in legacy_smoke: if not _pattern_loaded(ctx, pattern_id): continue _smoke_preset(ctx, name, data, ms=60) def test_patterns_do_not_use_blocking_sleep(): try: import uos as os except ImportError: import os pattern_dir = "patterns" offenders = [] try: files = os.listdir(pattern_dir) except OSError: raise AssertionError("patterns directory is missing") skip = frozenset(("__init__.py", "main.py", "pattern_modes.py")) for filename in files: if not filename.endswith(".py") or filename in skip: continue path = pattern_dir + "/" + filename try: with open(path, "r") as f: src = f.read() except OSError: offenders.append(filename + " (unreadable)") continue if ( "utime.sleep(" in src or "utime.sleep_ms(" in src or "time.sleep(" in src or "time.sleep_ms(" in src ): offenders.append(filename) assert not offenders, "blocking sleep found in patterns: %s" % ", ".join(offenders) def test_default_requires_existing_preset(): ctx = _TestContext() _process_message(ctx, {"v": "1", "default": "missing"}) assert ctx.settings.get("default") != "missing" ctx.presets.edit("exists", {"p": "on"}) _process_message(ctx, {"v": "1", "default": "exists"}) assert ctx.settings.get("default") == "exists" def test_default_targets_gate_by_device_name(): ctx = _TestContext() ctx.settings["name"] = "a" ctx.presets.edit("targeted", {"p": "on"}) ctx.settings["default"] = "baseline" _process_message( ctx, {"v": "1", "default": "targeted", "targets": ["11"]}, ) assert ctx.settings.get("default") == "baseline" _process_message( ctx, {"v": "1", "default": "targeted", "targets": [" A "]}, ) assert ctx.settings.get("default") == "targeted" def test_save_and_load_roundtrip(): try: import uos as os except ImportError: import os ctx = _TestContext() ctx.presets.edit( "persist", {"p": "blink", "c": [(1, 2, 3), (4, 5, 6)], "d": 77, "b": 123, "a": False}, ) assert ctx.presets.save() reloaded = Presets(ctx.settings["led_pin"], ctx.settings["num_leds"]) assert reloaded.load(ctx.settings) p = reloaded.presets.get("persist") assert p is not None assert p.p == "blink" assert p.d == 77 assert p.b == 123 assert p.a is False assert p.c == [(1, 2, 3), (4, 5, 6)] try: os.remove("presets.json") except OSError: pass def run_all(): tests = [ test_invalid_messages_do_not_crash, test_preset_edit_sanitization, test_preset_mode_alias_maps_to_n6, test_style_mode_and_legacy_aliases, test_colour_conversion_and_transition, test_pattern_smoke, test_merged_pattern_modes, test_patterns_do_not_use_blocking_sleep, test_default_requires_existing_preset, test_default_targets_gate_by_device_name, test_save_and_load_roundtrip, ] print("=" * 56) print("led-driver self-contained tests") print("=" * 56) for test_func in tests: print("Running %s ..." % test_func.__name__) test_func() print(" PASS") print("-" * 56) print("All tests passed") if __name__ == "__main__": run_all()