import asyncio import json import argparse import signal try: import websockets # type: ignore except Exception as e: print("Please install websockets: pip install websockets") raise WS_URI = "ws://192.168.4.1/ws" # Default pattern suite aligned with current firmware patterns PATTERN_SUITE = [ {"pattern": "flicker", "delay": 80, "iterations": 30, "repeat_delay": 80, "colors": ["#ffaa00"]}, {"pattern": "fill_range", "n1": 10, "n2": 20, "delay": 400, "iterations": 1, "repeat_delay": 500, "colors": ["#888888"]}, {"pattern": "n_chase", "n1": 5, "n2": 5, "delay": 250, "iterations": 40, "repeat_delay": 120, "colors": ["#00ff88"]}, {"pattern": "alternating", "n1": 6, "n2": 6, "delay": 300, "iterations": 20, "repeat_delay": 300, "colors": ["#ff8800"]}, {"pattern": "pulse", "delay": 200, "iterations": 6, "repeat_delay": 300, "colors": ["#ffffff"]}, ] def build_message( pattern: str, n: int | None = None, delay: int | None = None, colors: list[str] | None = None, brightness: int | None = None, num_leds: int | None = None, n1: int | None = None, n2: int | None = None, name: str = "0", pattern_step: int | None = None, ): settings: dict[str, object] = { "pattern": pattern, } if n is not None: settings["n"] = n if delay is not None: settings["delay"] = delay if colors is not None: settings["colors"] = colors if brightness is not None: settings["brightness"] = brightness if num_leds is not None: settings["num_leds"] = num_leds if n1 is not None: settings["n1"] = n1 if n2 is not None: settings["n2"] = n2 if pattern_step is not None: settings["pattern_step"] = pattern_step # ESP-NOW-style nested payload keyed by name (e.g., "0") return {name: settings} async def send_once(uri: str, payload: dict, hold_ms: int | None = None): async with websockets.connect(uri) as ws: await ws.send(json.dumps(payload)) if hold_ms and hold_ms > 0: await asyncio.sleep(hold_ms / 1000) async def run_suite(uri: str): async with websockets.connect(uri) as ws: for cfg in PATTERN_SUITE: iterations = int(cfg.get("iterations", 10)) interval_ms = int(cfg.get("interval_ms", cfg.get("delay", 100) or 100)) repeat_ms = int(cfg.get("repeat_delay", interval_ms)) for i in range(iterations): msg = build_message( cfg.get("pattern", "off"), i, delay=cfg.get("delay"), colors=cfg.get("colors"), brightness=cfg.get("brightness", 127), num_leds=cfg.get("num_leds"), n1=cfg.get("n1"), n2=cfg.get("n2"), name=cfg.get("name", "0"), pattern_step=cfg.get("pattern_step"), ) print(msg) await ws.send(json.dumps(msg)) await asyncio.sleep(repeat_ms / 1000) def _parse_args(): p = argparse.ArgumentParser(description="WebSocket LED pattern tester") p.add_argument("--uri", default=WS_URI, help="WebSocket URI, default ws://192.168.4.1/ws") p.add_argument("--pattern", help="Single pattern to send (overrides suite)") p.add_argument("--delay", type=int, help="Delay ms") p.add_argument("--brightness", type=int, help="Brightness 0-255") p.add_argument("--num-leds", type=int, help="Number of LEDs") p.add_argument("--colors", nargs="*", help="Hex colors like #ff0000 #00ff00") p.add_argument("--on-width", type=int) p.add_argument("--off-width", type=int) p.add_argument("--n1", type=int) p.add_argument("--n2", type=int) p.add_argument("--name", default="0", help="Target name key for nested payload (default: 0)") p.add_argument("--iterations", type=int, help="How many cycles/messages to send") p.add_argument("--interval", type=int, help="Interval between messages in ms (default: delay or 100)") p.add_argument("--repeat-delay", dest="repeat_delay", type=int, help="Delay between repeats in ms (overrides --interval if set)") p.add_argument("--hold", type=int, default=1500, help="Hold ms for single send") return p.parse_args() def _setup_sigint(loop: asyncio.AbstractEventLoop): for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, loop.stop) except NotImplementedError: pass async def main_async(): args = _parse_args() if args.pattern: iterations = int(args.iterations or 1) interval_ms = int(args.interval or (args.delay if args.delay is not None else 100)) repeat_ms = int(args.repeat_delay or interval_ms) async with websockets.connect(args.uri) as ws: for i in range(iterations): msg = build_message( pattern=args.pattern, n=i, delay=args.delay, colors=args.colors, brightness=args.brightness, num_leds=args.num_leds, n1=args.n1, n2=args.n2, name=args.name, ) print(msg) await ws.send(json.dumps(msg)) await asyncio.sleep(repeat_ms / 1000) else: await run_suite(args.uri) def main(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) _setup_sigint(loop) try: loop.run_until_complete(main_async()) finally: try: loop.run_until_complete(asyncio.sleep(0)) except Exception: pass loop.close() if __name__ == "__main__": main()