#!/usr/bin/env python3 """Send v1 JSON to drivers via the bridge (broadcast passthrough). The simplified ``espnow-sender`` forwards each WebSocket message unchanged to ESP-NOW ``ff:ff:ff:ff:ff:ff``. Drivers accept JSON when the payload starts with ``{`` (see ``led-driver/src/main.py``). Examples:: pipenv run python tests/bridge_broadcast_test.py pipenv run python tests/bridge_broadcast_test.py --url ws://192.168.4.1/ws pipenv run python tests/bridge_broadcast_test.py --brightness 200 pipenv run python tests/bridge_broadcast_test.py --select led-abc --state on """ from __future__ import annotations import argparse import asyncio import json import sys from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(PROJECT_ROOT / "src")) from util.espnow_message import build_message # noqa: E402 def _load_bridge_url(explicit: str | None) -> str: if explicit and explicit.strip(): return explicit.strip() path = PROJECT_ROOT / "settings.json" if path.is_file(): try: data = json.loads(path.read_text(encoding="utf-8")) url = str(data.get("bridge_ws_url") or "").strip() if url: return url except (OSError, json.JSONDecodeError, TypeError): pass return "ws://192.168.4.1/ws" async def _send_messages(url: str, messages: list[bytes], delay_s: float) -> None: import websockets print(f"connecting to {url}") async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws: print("connected (broadcast JSON passthrough)") for i, pkt in enumerate(messages): preview = pkt[:80].decode("utf-8", errors="replace") if len(pkt) > 80: preview += "…" print(f" send [{i + 1}/{len(messages)}] {len(pkt)} B {preview!r}") await ws.send(pkt) if delay_s > 0 and i + 1 < len(messages): await asyncio.sleep(delay_s) print("done") def _build_messages(args: argparse.Namespace) -> list[bytes]: messages: list[bytes] = [] if args.brightness is not None: body: dict = { "v": "1", "b": max(0, min(255, int(args.brightness))), } if args.save: body["save"] = True messages.append(json.dumps(body, separators=(",", ":")).encode("utf-8")) if args.select: messages.append( build_message( select={args.select: [args.state]}, save=args.save, ).encode("utf-8") ) if args.off: if args.select: messages.append( build_message(select={args.select: ["off"]}, save=args.save).encode("utf-8") ) else: messages.append( build_message(select={"all": ["off"]}, save=args.save).encode("utf-8") ) if not messages: messages.append( json.dumps({"v": "1", "b": 128}, separators=(",", ":")).encode("utf-8") ) messages.append( build_message(select={"all": ["on"]}).encode("utf-8") ) messages.append( build_message(select={"all": ["off"]}).encode("utf-8") ) for pkt in messages: if not pkt or pkt[0:1] != b"{": raise ValueError("built message is not v1 JSON") return messages def main() -> int: parser = argparse.ArgumentParser( description="Broadcast v1 JSON to LED drivers through the bridge WebSocket.", ) parser.add_argument( "--url", default=None, help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)", ) parser.add_argument( "--delay", type=float, default=0.5, help="Seconds between messages (default: 0.5)", ) parser.add_argument( "--brightness", "-b", type=int, default=None, metavar="0-255", help="Global brightness (b field)", ) parser.add_argument( "--select", metavar="DEVICE_NAME", help="Device name in select map (must match driver settings name)", ) parser.add_argument( "--state", default="on", help="Pattern/state for --select (default: on)", ) parser.add_argument( "--off", action="store_true", help="Send select off (all devices if --select omitted)", ) parser.add_argument( "--save", action="store_true", help="Set save flag on messages", ) parser.add_argument( "--dry-run", action="store_true", help="Print messages only; do not connect", ) args = parser.parse_args() url = _load_bridge_url(args.url) try: messages = _build_messages(args) except ValueError as e: print(f"error: {e}", file=sys.stderr) return 1 print(f"url={url!r} messages={len(messages)}") for pkt in messages: print(f" {pkt.decode('utf-8')}") if args.dry_run: return 0 try: asyncio.run(_send_messages(url, messages, args.delay)) except KeyboardInterrupt: print("interrupted") return 130 except Exception as e: print(f"failed: {e!r}", file=sys.stderr) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())