"""Transport to LED drivers via ESP-NOW bridge WebSocket or USB serial.""" import json from typing import Any, Dict, List, Optional, Union from models.bridge_serial_client import get_bridge_serial_client from models.bridge_ws_client import get_bridge_client from util.bridge_envelope import ( BROADCAST_HEX, BROADCAST_MAC, build_devices_envelope, format_mac_key, is_broadcast_mac, normalize_mac_key, ) from util.espnow_wire import WIRE_MAGIC class NullBridge: """No bridge configured.""" async def send(self, data, addr=None): return False class BridgeWsTransport: """Send v1 JSON or devices envelope via bridge WebSocket.""" async def send(self, data: Union[bytes, str, Dict[str, Any]], addr=None) -> bool: client = get_bridge_client() if client is None: return False if isinstance(data, dict): if data.get("v") == "1" and ("devices" in data or "dv" in data): from util.v1_wire import compact_envelope return await client.send_packet(compact_envelope(data)) packet = json.dumps(data, separators=(",", ":")).encode("utf-8") elif isinstance(data, str): packet = data.encode("utf-8") elif isinstance(data, (bytes, bytearray)): packet = bytes(data) else: return False if not packet: return False if packet[0] == WIRE_MAGIC: return await client.send_packet(packet) if packet[0:1] != b"{": return False mac_key = _addr_to_envelope_key(addr) if mac_key is None: return await client.send_packet(packet) try: body = json.loads(packet.decode("utf-8")) except (UnicodeError, ValueError, TypeError): return False if not isinstance(body, dict) or body.get("v") != "1": return False envelope = build_devices_envelope({mac_key: body}) return await client.send_packet(envelope) async def send_envelope(self, envelope: Dict[str, Any]) -> bool: client = get_bridge_client() if client is None: return False return await client.send_packet(envelope) class BridgeSerialTransport: """Send v1 JSON or devices envelope via bridge USB/serial.""" async def send(self, data: Union[bytes, str, Dict[str, Any]], addr=None) -> bool: client = get_bridge_serial_client() if client is None: return False if isinstance(data, dict): if data.get("v") == "1" and ("devices" in data or "dv" in data): from util.v1_wire import compact_envelope return await client.send_packet(compact_envelope(data)) packet = json.dumps(data, separators=(",", ":")).encode("utf-8") elif isinstance(data, str): packet = data.encode("utf-8") elif isinstance(data, (bytes, bytearray)): packet = bytes(data) else: return False if not packet: return False if packet[0] == WIRE_MAGIC: return await client.send_packet(packet) if packet[0:1] != b"{": return False mac_key = _addr_to_envelope_key(addr) if mac_key is None: return await client.send_packet(packet) try: body = json.loads(packet.decode("utf-8")) except (UnicodeError, ValueError, TypeError): return False if not isinstance(body, dict) or body.get("v") != "1": return False envelope = build_devices_envelope({mac_key: body}) return await client.send_packet(envelope) async def send_envelope(self, envelope: Dict[str, Any]) -> bool: client = get_bridge_serial_client() if client is None: return False return await client.send_packet(envelope) def _addr_to_envelope_key(addr) -> Optional[str]: if addr is None: return BROADCAST_MAC s = str(addr).strip().lower() if is_broadcast_mac(s): return BROADCAST_MAC h = normalize_mac_key(s) if h: try: return format_mac_key(h) except ValueError: return None return None _current_bridge = None def set_bridge(bridge): global _current_bridge _current_bridge = bridge def get_current_bridge(): return _current_bridge def get_bridge(settings): mode = str(settings.get("bridge_transport") or "wifi").strip().lower() if mode == "wifi": url = str(settings.get("bridge_ws_url") or "").strip() if not url: print("[startup] bridge Wi‑Fi disabled (set bridge_ws_url in settings.json)") return NullBridge() print(f"[startup] ESP-NOW via bridge WebSocket {url!r}") return BridgeWsTransport() port = str(settings.get("bridge_serial_port") or "").strip() if not port: print( "[startup] bridge serial disabled (set bridge_serial_port in settings.json)" ) return NullBridge() print(f"[startup] ESP-NOW via bridge USB serial {port!r}") return BridgeSerialTransport()