#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.

Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).

Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::

  pipenv run python led-driver/tests/bridge_ws_blink.py
  pipenv run python led-driver/tests/bridge_ws_blink.py \\
    --url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8

From **led-driver** (if Pipenv/env is the parent project)::

  pipenv run python tests/bridge_ws_blink.py --dry-run
"""

from __future__ import annotations

import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional, Sequence

# led-driver/tests -> led-driver -> led-controller
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
PROJECT_ROOT = LED_DRIVER_ROOT.parent

# Used when neither --url nor any settings.json provides a bridge URL.
DEFAULT_BRIDGE_WS_URL = "ws://192.168.4.1/ws"


def _load_bridge_url(explicit: Optional[str]) -> str:
    """Resolve the bridge WebSocket URL.

    Precedence: a non-blank *explicit* value, then ``bridge_ws_url`` from
    ``settings.json`` in PROJECT_ROOT, then from LED_DRIVER_ROOT, then the
    built-in default.

    The settings lookup is best-effort: an unreadable, undecodable, malformed,
    or non-object settings file is skipped rather than raising.
    """
    if explicit and explicit.strip():
        return explicit.strip()
    for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
        if not path.is_file():
            continue
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a file that is not valid UTF-8.
            continue
        if not isinstance(data, dict):
            # Valid JSON whose top level is a list/str/number has no keys to
            # read; treat it like a missing setting instead of crashing.
            continue
        url = str(data.get("bridge_ws_url") or "").strip()
        if url:
            return url
    return DEFAULT_BRIDGE_WS_URL


def _format_mac(mac: str) -> str:
    """Normalize *mac* to lowercase colon-separated form (``aa:bb:cc:dd:ee:ff``).

    Every non-hex character (colons, dashes, spaces, ...) is dropped before
    validation, so ``188b0e1560a8`` and ``18:8B:0E:15:60:A8`` are equivalent.

    Raises:
        ValueError: if exactly 12 hex digits do not remain.
    """
    digits = re.sub(r"[^0-9a-f]", "", str(mac or "").strip().lower())
    if len(digits) != 12:
        raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
    return ":".join(digits[i : i + 2] for i in range(0, 12, 2))


def build_blink_envelope(
    mac: str,
    *,
    preset_id: str = "2",
    delay_ms: int = 200,
    brightness: int = 64,
) -> Dict[str, Any]:
    """v1 devices envelope: preset body + list select (same shape as the Pi).

    Args:
        mac: Driver MAC; any form accepted by ``_format_mac``.
        preset_id: Preset slot id; stringified for both the preset key and
            the select list so the two always agree.
        delay_ms: Blink delay in milliseconds; values below 1 become 1.
        brightness: Preset brightness; clamped to 0..255.

    Returns:
        ``{"v": "1", "dv": {<formatted mac>: {"p": {...}, "s": [...]}}}``.

    Raises:
        ValueError: if *mac* is not 12 hex digits.
    """
    slot = str(preset_id)
    body = {
        "p": {
            slot: {
                "p": "blink",
                "b": max(0, min(255, int(brightness))),
                "d": max(1, int(delay_ms)),
                "c": ["#FF0000", "#0000FF"],
                "a": True,
            }
        },
        "s": [slot],
    }
    return {"v": "1", "dv": {_format_mac(mac): body}}


async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
    """Connect to *url*, send *envelope* as compact JSON, then hold the socket.

    The connection is kept open for *hold_s* seconds after the send when
    *hold_s* is positive, then closed by the context manager.
    """
    # Imported lazily so --dry-run works without the websockets package.
    import websockets

    packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
    print(f"connecting to {url}")
    async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
        print(f"connected, sending {len(packet)} B")
        print(packet.decode("utf-8"))
        # NOTE(review): a bytes payload is sent as a binary frame by the
        # websockets library; confirm the bridge expects binary, not text.
        await ws.send(packet)
        if hold_s > 0:
            print(f"holding connection {hold_s}s …")
            await asyncio.sleep(hold_s)
    print("done")


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) parses
            ``sys.argv[1:]``, so existing ``main()`` callers are unaffected.

    Returns:
        Process exit code: 0 on success, 1 on a bad MAC or a send failure,
        130 when interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Send blink preset+select to one driver via bridge WebSocket.",
    )
    parser.add_argument(
        "--url",
        default=None,
        help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
    )
    parser.add_argument(
        "--mac",
        default="188b0e1560a8",
        help="Driver MAC (12 hex, colons optional). Default: registry example id.",
    )
    parser.add_argument(
        "--preset-id",
        default="2",
        help="Wire preset slot id (default: 2, matches zone push)",
    )
    parser.add_argument(
        "--delay-ms",
        type=int,
        default=200,
        help="Blink delay in ms (default: 200)",
    )
    parser.add_argument(
        "--brightness",
        type=int,
        default=64,
        help="Preset brightness 0–255 (default: 64)",
    )
    parser.add_argument(
        "--hold",
        type=float,
        default=2.0,
        help="Seconds to keep WebSocket open after send (default: 2)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print envelope only; do not connect",
    )
    args = parser.parse_args(argv)

    url = _load_bridge_url(args.url)
    try:
        # Normalize once; the formatted MAC is reused for the summary line.
        mac = _format_mac(args.mac)
        envelope = build_blink_envelope(
            mac,
            preset_id=args.preset_id,
            delay_ms=args.delay_ms,
            brightness=args.brightness,
        )
    except ValueError as e:
        print(f"error: {e}", file=sys.stderr)
        return 1

    print(f"url={url!r} mac={mac!r}")
    if args.dry_run:
        print(json.dumps(envelope, indent=2))
        return 0

    try:
        asyncio.run(_send(url, envelope, args.hold))
    except KeyboardInterrupt:
        print("interrupted")
        return 130
    except Exception as e:
        # Top-level boundary: report any connection/send failure and exit non-zero.
        print(f"failed: {e!r}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())