#!/usr/bin/env python3 """Discover a Wi‑Fi LED driver via UDP hello, then drive it over WebSocket. 1. Listens on UDP (default port 8766) for the same JSON line the firmware sends (``v``, ``device_name``, ``mac``, ``type``: ``led``). 2. Opens ``ws://:/ws``. 3. Pushes a few test presets (``v``: ``"1"``) and cycles ``select`` for the reported ``device_name``. The firmware sends UDP hello about one second **after** HTTP is listening, so this script retries the WebSocket handshake by default. The device ``settings.json`` ``name`` must match ``device_name`` in the hello (and in each ``select`` map). Examples:: pipenv install --dev pipenv run python tests/device_ws_cycle.py pipenv run python tests/device_ws_cycle.py --timeout 60 --cycle-s 4 # Skip UDP; connect directly (set ``--device-name`` to the device's ``name``):: pipenv run python tests/device_ws_cycle.py --host 192.168.1.42 --device-name a """ from __future__ import annotations import argparse import asyncio import json import socket import sys def _parse_hello_line(data: bytes) -> tuple[dict | None, bytes]: line = data.split(b"\n", 1)[0].strip() if not line: return None, line try: obj = json.loads(line.decode("utf-8")) except (UnicodeError, ValueError, TypeError): return None, line if not isinstance(obj, dict): return None, line return obj, line def wait_for_udp_hello( bind: str, port: int, timeout_s: float, echo: bool, ) -> tuple[str, str, dict]: """Block until a valid hello arrives. Returns (device_ip, device_name, hello_dict).""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except OSError: pass try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) except OSError: pass sock.bind((bind, port)) sock.settimeout(timeout_s) print( f"UDP listening on {bind}:{port} (timeout {timeout_s}s) — " "power the device or wait for hello…", ) while True: try: data, addr = sock.recvfrom(2048) except socket.timeout as e: raise SystemExit(f"No UDP hello before timeout: {e}") from e peer_ip = addr[0] parsed, raw_line = _parse_hello_line(data) if parsed is None: print(f"Ignored datagram from {peer_ip!r}: {raw_line!r}") continue if str(parsed.get("v") or "") != "1": print(f"Ignored v={parsed.get('v')!r} from {peer_ip!r}") continue dev_type = parsed.get("type") or parsed.get("device_type") if dev_type is not None and dev_type != "led": print(f"Ignored type={dev_type!r} from {peer_ip!r}") continue name = str(parsed.get("device_name") or "").strip() mac = parsed.get("mac") if not name or not mac: print( f"Ignored hello without device_name/mac from {peer_ip!r}: {parsed!r}", ) continue print( f"Heard hello: ip={peer_ip!r} device_name={name!r} mac={mac!r}", ) if echo: try: sock.sendto(data, addr) except OSError as e: print(f"UDP echo to {addr} failed: {e!r}") return peer_ip, name, parsed finally: try: sock.close() except OSError: pass PRESETS = { "_test_on": {"p": "on", "c": [(0, 80, 200)]}, "_test_blink": {"p": "blink", "d": 120, "b": 200, "c": [(255, 40, 0), (0, 40, 255)]}, "_test_rainbow": {"p": "rainbow", "d": 12, "n1": 2, "a": True}, } PRESET_ORDER = ["_test_on", "_test_blink", "_test_rainbow"] async def cycle_presets( host: str, device_name: str, ws_port: int, ws_path: str, cycle_s: float, passes: int, *, ws_open_timeout_s: float, ws_connect_retries: int, ws_connect_retry_delay_s: float, ) -> None: try: import websockets except ImportError as e: raise SystemExit( "Install websockets: pipenv install websockets (or: pip install websockets)" ) from e path = ws_path if ws_path.startswith("/") else "/" + ws_path uri = f"ws://{host}:{ws_port}{path}" print(f"WebSocket connect {uri!r} …") n = max(1, ws_connect_retries) last_err: BaseException | None = None for attempt in range(n): try: async with websockets.connect( uri, open_timeout=ws_open_timeout_s, ) as ws: print("Connected.") push = json.dumps({"v": "1", "presets": PRESETS}) await ws.send(push) print(f"Sent presets: {list(PRESETS.keys())}") await asyncio.sleep(0.25) for p in range(passes): print(f"--- pass {p + 1}/{passes} ---") for pname in PRESET_ORDER: sel = json.dumps({"v": "1", "select": {device_name: [pname]}}) await ws.send(sel) print(f" select {pname!r}") await asyncio.sleep(cycle_s) print("Done.") return except (TimeoutError, OSError, ConnectionError) as e: last_err = e if attempt + 1 < n: print( f" connect failed ({e!r}), retry {attempt + 2}/{n} in " f"{ws_connect_retry_delay_s}s …", ) await asyncio.sleep(ws_connect_retry_delay_s) raise TimeoutError( f"WebSocket handshake failed after {n} attempts: {last_err!r}", ) from last_err def main() -> int: parser = argparse.ArgumentParser( description="UDP hello discovery + WebSocket preset cycle (led-driver)", ) parser.add_argument( "--bind", default="0.0.0.0", help="UDP bind address (default 0.0.0.0)", ) parser.add_argument( "-p", "--udp-port", type=int, default=8766, help="UDP listen port (default 8766)", ) parser.add_argument( "--timeout", type=float, default=120.0, help="Seconds to wait for first hello (default 120)", ) parser.add_argument( "--no-echo", action="store_true", help="Do not echo the datagram back (firmware often uses wait_reply=False)", ) parser.add_argument( "--host", default="", metavar="IP", help="Skip UDP and use this device IP", ) parser.add_argument( "--device-name", default="", metavar="NAME", help="Device settings name for select map (required with --host if not default)", ) parser.add_argument( "--ws-port", type=int, default=80, help="Device WebSocket port (default 80)", ) parser.add_argument( "--ws-path", default="/ws", help="WebSocket path (default /ws)", ) parser.add_argument( "--cycle-s", type=float, default=3.0, help="Seconds between select commands (default 3)", ) parser.add_argument( "--passes", type=int, default=2, help="How many full cycles through all test presets (default 2)", ) parser.add_argument( "--ws-open-timeout", type=float, default=30.0, help="Per-attempt WebSocket handshake timeout in seconds (default 30)", ) parser.add_argument( "--ws-retries", type=int, default=15, help="WebSocket connect attempts (default 15; use with device hello after HTTP)", ) parser.add_argument( "--ws-retry-delay", type=float, default=1.0, help="Seconds between WebSocket retries (default 1)", ) args = parser.parse_args() if args.host: host = args.host.strip() device_name = (args.device_name or "a").strip() if not device_name: print("--device-name is required when using a generic --host", file=sys.stderr) return 1 print(f"Using host {host!r} device_name {device_name!r} (no UDP)") else: host, device_name, _hello = wait_for_udp_hello( args.bind, args.udp_port, args.timeout, echo=not args.no_echo, ) try: asyncio.run( cycle_presets( host=host, device_name=device_name, ws_port=args.ws_port, ws_path=args.ws_path, cycle_s=args.cycle_s, passes=max(1, args.passes), ws_open_timeout_s=args.ws_open_timeout, ws_connect_retries=args.ws_retries, ws_connect_retry_delay_s=args.ws_retry_delay, ) ) except KeyboardInterrupt: print("\nInterrupted.") return 130 return 0 if __name__ == "__main__": raise SystemExit(main())