Files
led-driver/tests/bridge_ws_blink.py
Jimmy a97f6c7c2c feat(espnow): groups filter and v1 select list on driver
Apply group membership on RX, accept select as [preset_id, step?],
and fix identify/off plus presets layout for manual beat stepping.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-24 01:44:21 +12:00

170 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.

Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
pattern, and colours; this script reaches the driver over ESP-NOW through
``espnow-sender`` (devices envelope, not legacy broadcast JSON).

Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::

    pipenv run python led-driver/tests/bridge_ws_blink.py
    pipenv run python led-driver/tests/bridge_ws_blink.py \\
        --url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8

From **led-driver** (if Pipenv/env is the parent project)::

    pipenv run python tests/bridge_ws_blink.py --dry-run
"""
from __future__ import annotations
import argparse
import asyncio
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional
# Repository layout: led-driver/tests -> led-driver -> led-controller.
# parents[1] of this file is its grandparent directory, i.e. the led-driver
# root when the script lives in led-driver/tests/.
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
# Directory that contains led-driver (the led-controller checkout in that layout).
PROJECT_ROOT = LED_DRIVER_ROOT.parent
def _load_bridge_url(explicit: Optional[str]) -> str:
    """Resolve the bridge WebSocket URL to connect to.

    Resolution order:

    1. ``explicit`` (for example the ``--url`` flag) when it is non-blank.
    2. The ``bridge_ws_url`` key of the first ``settings.json`` that yields a
       non-empty value, checking ``PROJECT_ROOT`` and then ``LED_DRIVER_ROOT``.
    3. The built-in fallback ``ws://192.168.4.1/ws``.

    Settings files are read on a best-effort basis: a file that is missing,
    unreadable, not valid UTF-8, not valid JSON, or whose top-level value is
    not a JSON object is skipped instead of aborting the script.

    Args:
        explicit: URL supplied by the caller, or ``None``/blank to fall back
            to the settings files.

    Returns:
        The URL with surrounding whitespace removed.
    """
    if explicit and explicit.strip():
        return explicit.strip()
    for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
        if not path.is_file():
            continue
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a file that is not UTF-8.
            continue
        if not isinstance(data, dict):
            # Valid JSON, but a list/number/string has no "bridge_ws_url" key.
            continue
        url = str(data.get("bridge_ws_url") or "").strip()
        if url:
            return url
    return "ws://192.168.4.1/ws"
def _format_mac(mac: str) -> str:
    """Return ``mac`` as six lower-case, colon-separated hex octets.

    Every character that is not a hexadecimal digit (colons, dashes, spaces,
    and so on) is discarded before validation, so ``188B0E1560A8`` and
    ``18-8b-0e-15-60-a8`` both become ``18:8b:0e:15:60:a8``. A falsy value
    such as ``None`` is treated as an empty string.

    Raises:
        ValueError: if the input does not contain exactly 12 hex digits.
    """
    # Lower-casing first means only [0-9a-f] can survive the substitution, so
    # the length check is the only validation that is needed afterwards.
    digits = re.sub(r"[^0-9a-f]", "", str(mac or "").lower())
    if len(digits) != 12:
        raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
    return ":".join(digits[i : i + 2] for i in range(0, 12, 2))
def build_blink_envelope(
    mac: str,
    *,
    preset_id: str = "2",
    delay_ms: int = 200,
    brightness: int = 64,
) -> Dict[str, Any]:
    """Build the v1 devices envelope that defines and selects a blink preset.

    The returned dict has ``"v": "1"`` and a ``"dv"`` mapping from the
    formatted driver MAC to a body with two entries: ``"p"`` holds the preset
    definition keyed by ``preset_id`` and ``"s"`` is a one-element select list
    naming that preset.

    Args:
        mac: Driver MAC address; normalised by ``_format_mac``.
        preset_id: Preset slot used as the key under ``"p"`` and, as a
            string, as the entry of ``"s"``.
        delay_ms: Blink delay; values below 1 are raised to 1.
        brightness: Preset brightness; clamped to the range 0..255.

    Raises:
        ValueError: propagated from ``_format_mac`` when ``mac`` is invalid.
    """
    level = max(0, min(255, int(brightness)))
    period_ms = max(1, int(delay_ms))
    preset = {
        "p": "blink",
        "b": level,
        "d": period_ms,
        "c": ["#FF0000", "#0000FF"],
        "a": True,
    }
    device_body = {"p": {preset_id: preset}, "s": [str(preset_id)]}
    # The MAC is validated last, after the preset values have been coerced.
    target = _format_mac(mac)
    return {"v": "1", "dv": {target: device_body}}
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
    """Connect to the bridge WebSocket and send ``envelope`` as compact JSON.

    The envelope is serialised without whitespace, encoded as UTF-8 and
    passed to a single ``send`` call; progress is reported on stdout.

    Args:
        url: WebSocket URL of the bridge.
        envelope: JSON-serialisable payload to transmit.
        hold_s: Seconds to keep the connection open after sending; values
            that are zero or negative skip the wait.
    """
    # Imported here rather than at module level, so callers that never reach
    # this coroutine (e.g. --dry-run) do not need the package installed.
    import websockets

    payload = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
    print(f"connecting to {url}")
    connection = websockets.connect(url, ping_interval=20, ping_timeout=20)
    async with connection as sock:
        print(f"connected, sending {len(payload)} B")
        print(payload.decode("utf-8"))
        await sock.send(payload)
        if hold_s > 0:
            print(f"holding connection {hold_s}s …")
            await asyncio.sleep(hold_s)
    print("done")
def main() -> int:
    """Parse the command line, build the blink envelope and send it.

    With ``--dry-run`` the envelope is printed as indented JSON and nothing
    is sent; otherwise it is delivered through ``_send``.

    Returns:
        Process exit status: ``0`` on success (including a dry run), ``1``
        when the MAC is invalid or sending fails, and ``130`` when the send
        is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Send blink preset+select to one driver via bridge WebSocket.",
    )
    parser.add_argument(
        "--url",
        default=None,
        help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
    )
    parser.add_argument(
        "--mac",
        default="188b0e1560a8",
        help="Driver MAC (12 hex, colons optional). Default: registry example id.",
    )
    parser.add_argument(
        "--preset-id",
        default="2",
        help="Wire preset slot id (default: 2, matches zone push)",
    )
    parser.add_argument(
        "--delay-ms",
        type=int,
        default=200,
        help="Blink delay in ms (default: 200)",
    )
    parser.add_argument(
        "--brightness",
        type=int,
        default=64,
        help="Preset brightness 0-255 (default: 64)",
    )
    parser.add_argument(
        "--hold",
        type=float,
        default=2.0,
        help="Seconds to keep WebSocket open after send (default: 2)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print envelope only; do not connect",
    )
    args = parser.parse_args()
    url = _load_bridge_url(args.url)
    try:
        envelope = build_blink_envelope(
            args.mac,
            preset_id=args.preset_id,
            delay_ms=args.delay_ms,
            brightness=args.brightness,
        )
    except ValueError as e:
        # Raised for a MAC that does not contain exactly 12 hex digits.
        print(f"error: {e}", file=sys.stderr)
        return 1
    print(f"url={url!r} mac={_format_mac(args.mac)!r}")
    if args.dry_run:
        print(json.dumps(envelope, indent=2))
        return 0
    try:
        asyncio.run(_send(url, envelope, args.hold))
    except KeyboardInterrupt:
        print("interrupted")
        return 130
    except Exception as e:
        # Top-level boundary: report any connection/send failure and exit non-zero.
        print(f"failed: {e!r}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())