#!/usr/bin/env python3
"""Send binary ESP-NOW packets via the bridge (broadcast passthrough).

The simplified ``espnow-sender`` forwards each WebSocket **binary** message
unchanged to ESP-NOW ``ff:ff:ff:ff:ff:ff``. No ``pack_ws_downlink`` wrapper and
no 1-byte ack — raw wire packets only (see ``docs/espnow-binary-protocol.md``).

Group membership is expected to be configured on each **led-driver**; this
script only broadcasts **CMD** (and optional **GROUPS** / **GROUP_CMD** for
manual testing).

Examples::

    pipenv run python tests/bridge_broadcast_test.py
    pipenv run python tests/bridge_broadcast_test.py --url ws://192.168.4.1/ws
    pipenv run python tests/bridge_broadcast_test.py --brightness 200
    pipenv run python tests/bridge_broadcast_test.py --select led-abc --state on
    pipenv run python tests/bridge_broadcast_test.py --groups 5,18 --group-cmd 18 --brightness 64
"""

from __future__ import annotations

import argparse
import asyncio
import json
import sys
from pathlib import Path

# Make ``<project root>/src`` importable so ``util.espnow_wire`` resolves when
# this script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(PROJECT_ROOT / "src"))

from util.espnow_wire import (  # noqa: E402
    pack_cmd_from_kwargs,
    pack_group_cmd_from_kwargs,
    pack_groups,
    wire_msg_type,
)

# Display names for the message-type byte (read from index 1 of a packet by
# ``_describe_packet``).
MSG_TYPE_NAMES = {
    0x01: "ANNOUNCE",
    0x02: "GROUPS",
    0x03: "CMD",
    0x04: "GROUP_CMD",
    0x10: "BRIDGE_CH",
}


def _load_bridge_url(explicit: str | None) -> str:
    """Resolve the bridge WebSocket URL.

    Precedence:

    1. ``explicit`` when it is non-blank (returned stripped).
    2. ``bridge_ws_url`` from ``settings.json`` in ``PROJECT_ROOT``.
    3. The built-in default ``ws://192.168.4.1/ws``.

    Reading ``settings.json`` is best-effort: an unreadable file, invalid
    UTF-8, invalid JSON, or a top-level value that is not a JSON object all
    fall through to the default instead of raising.
    """
    if explicit and explicit.strip():
        return explicit.strip()

    path = PROJECT_ROOT / "settings.json"
    if path.is_file():
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised by read_text on non-UTF-8 content.
            data = None
        # Only a JSON object has keys to look up; a list, string or number
        # at the top level is treated as "no setting".
        if isinstance(data, dict):
            url = str(data.get("bridge_ws_url") or "").strip()
            if url:
                return url

    return "ws://192.168.4.1/ws"


def _describe_packet(pkt: bytes) -> str:
    """Return a short label for ``pkt``: message-type name and byte length.

    The type is taken from the byte at index 1 and looked up in
    ``MSG_TYPE_NAMES``; unknown values are shown as two-digit hex. Packets
    shorter than two bytes are described by their length only.
    """
    if len(pkt) < 2:
        return f"{len(pkt)} B"
    name = MSG_TYPE_NAMES.get(pkt[1], f"0x{pkt[1]:02x}")
    return f"{name} {len(pkt)} B"


async def _send_packets(url: str, packets: list[bytes], delay_s: float) -> None:
    """Connect to ``url`` and send each packet as one WebSocket message.

    Sleeps ``delay_s`` seconds between consecutive packets (not after the
    last one); a zero or negative delay disables the pause.
    """
    # Imported lazily so that --dry-run works without the package installed.
    import websockets

    print(f"connecting to {url}")
    async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
        print("connected (broadcast passthrough)")
        for i, pkt in enumerate(packets):
            print(f" send [{i + 1}/{len(packets)}] {_describe_packet(pkt)}")
            await ws.send(pkt)
            if delay_s > 0 and i + 1 < len(packets):
                await asyncio.sleep(delay_s)
    print("done")


def _build_packets(args: argparse.Namespace) -> list[bytes]:
    """Build the ordered list of wire packets requested on the command line.

    Order: GROUPS, GROUP_CMD, brightness CMD, select CMD, off CMD. When no
    option produces a packet, a default three-packet sequence is used
    (brightness 128, all on, all off).

    Raises:
        ValueError: if ``wire_msg_type`` returns ``None`` for a built packet.
    """
    packets: list[bytes] = []

    if args.groups:
        gids = [g.strip() for g in args.groups.split(",") if g.strip()]
        if gids:
            packets.append(pack_groups(gids))

    if args.group_cmd:
        packets.append(
            pack_group_cmd_from_kwargs(
                args.group_cmd,
                brightness_0_255=args.brightness,
                select={args.select: [args.state]} if args.select else None,
                save=args.save,
            )
        )

    # With --group-cmd the brightness travels inside the GROUP_CMD above, so
    # no separate broadcast CMD is emitted for it.
    if args.brightness is not None and not args.group_cmd:
        packets.append(
            pack_cmd_from_kwargs(brightness_0_255=args.brightness, save=args.save)
        )

    # NOTE(review): unlike brightness, the select map is sent as a broadcast
    # CMD even when --group-cmd already carried it — confirm this is intended.
    if args.select:
        packets.append(
            pack_cmd_from_kwargs(
                select={args.select: [args.state]},
                save=args.save,
            )
        )

    if args.off:
        if args.select:
            packets.append(
                pack_cmd_from_kwargs(select={args.select: ["off"]}, save=args.save)
            )
        else:
            packets.append(
                pack_cmd_from_kwargs(select={"all": ["off"]}, save=args.save)
            )

    if not packets:
        # Nothing requested: fall back to a default test sequence.
        packets.append(pack_cmd_from_kwargs(brightness_0_255=128))
        packets.append(pack_cmd_from_kwargs(select={"all": ["on"]}))
        packets.append(pack_cmd_from_kwargs(select={"all": ["off"]}))

    for pkt in packets:
        if wire_msg_type(pkt) is None:
            raise ValueError("built packet is not valid wire format")
    return packets


def main() -> int:
    """Parse arguments, build the packets, print them, and optionally send.

    Returns:
        0 on success (including ``--dry-run``), 1 when packet building or
        sending fails, 130 when interrupted with Ctrl-C while sending.
    """
    parser = argparse.ArgumentParser(
        description="Broadcast binary ESP-NOW packets through the bridge WebSocket.",
    )
    parser.add_argument(
        "--url",
        default=None,
        help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
    )
    parser.add_argument(
        "--delay",
        type=float,
        default=0.5,
        help="Seconds between packets (default: 0.5)",
    )
    parser.add_argument(
        "--brightness",
        "-b",
        type=int,
        default=None,
        metavar="0-255",
        help="Broadcast CMD: global brightness",
    )
    parser.add_argument(
        "--select",
        metavar="DEVICE_NAME",
        help="Broadcast CMD: device name in select map (must match driver settings name)",
    )
    parser.add_argument(
        "--state",
        default="on",
        help="Pattern/state for --select (default: on)",
    )
    parser.add_argument(
        "--off",
        action="store_true",
        help="After other commands, send select off (all devices if --select omitted)",
    )
    parser.add_argument(
        "--groups",
        metavar="ID,ID",
        help="Optional GROUPS broadcast (normally configured on device instead)",
    )
    parser.add_argument(
        "--group-cmd",
        metavar="GROUP_ID",
        help="Optional GROUP_CMD broadcast (driver must list this group locally)",
    )
    parser.add_argument(
        "--save",
        action="store_true",
        help="Set save flag on CMD / GROUP_CMD envelopes",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print packets only; do not connect",
    )
    args = parser.parse_args()

    url = _load_bridge_url(args.url)
    try:
        packets = _build_packets(args)
    except ValueError as e:
        print(f"error: {e}", file=sys.stderr)
        return 1

    # Always show what would be sent, even when actually sending.
    print(f"url={url!r} packets={len(packets)}")
    for pkt in packets:
        print(f" {_describe_packet(pkt)} hex={pkt.hex()}")

    if args.dry_run:
        return 0

    try:
        asyncio.run(_send_packets(url, packets, args.delay))
    except KeyboardInterrupt:
        print("interrupted")
        return 130
    except Exception as e:
        # Top-level boundary: report any connection/send failure and exit 1.
        print(f"failed: {e!r}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())