import asyncio
import json

# Default: broadcast (6 bytes). Pi always sends 6-byte address + payload to ESP32.
BROADCAST_MAC = bytes.fromhex("ffffffffffff")

# Every frame starts with an address of exactly this many bytes.
_MAC_LEN = len(BROADCAST_MAC)


def _encode_payload(data):
    """Return *data* in the form that is appended after the MAC address.

    ``str`` values are encoded with ``str.encode()`` (UTF-8 by default) and
    ``dict`` values are serialised with ``json.dumps`` and then encoded.
    Any other value is returned unchanged, so callers are expected to pass
    something that can be concatenated to ``bytes`` (e.g. ``bytes``).
    """
    if isinstance(data, str):
        return data.encode()
    if isinstance(data, dict):
        return json.dumps(data).encode()
    return data


def _parse_mac(addr):
    """Convert 12-char hex string or 6-byte bytes to 6-byte MAC.

    The result is always exactly 6 bytes long:

    * ``None`` or an empty bytes value -> ``BROADCAST_MAC``.
    * 6-byte ``bytes`` / ``bytearray`` -> the same address as ``bytes``.
    * 12-character ``str`` -> decoded with ``bytes.fromhex``.
    * anything else (wrong type or wrong length) -> ``BROADCAST_MAC``.

    Raises:
        ValueError: if a 12-character string contains non-hex characters
            (propagated from ``bytes.fromhex``).
    """
    if addr is None or addr == b"":
        return BROADCAST_MAC
    if isinstance(addr, (bytes, bytearray)) and len(addr) == _MAC_LEN:
        return bytes(addr)
    if isinstance(addr, str) and len(addr) == 2 * _MAC_LEN:
        mac = bytes.fromhex(addr)
        # bytes.fromhex() skips whitespace, so a 12-character string such as
        # "0102 0304 05" decodes to fewer than 6 bytes. Emitting that would
        # shift the payload into the address field, so treat it like any
        # other wrong-length address and fall through to broadcast.
        if len(mac) == _MAC_LEN:
            return mac
    return BROADCAST_MAC


async def _to_thread(func, *args):
    """Run the blocking call ``func(*args)`` off the event loop and return its result.

    Uses ``asyncio.to_thread`` when the running Python provides it; otherwise
    falls back to the loop's default executor.
    """
    to_thread = getattr(asyncio, "to_thread", None)
    if to_thread:
        return await to_thread(func, *args)
    # Fallback for interpreters without asyncio.to_thread.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, func, *args)


class SerialSender:
    """Send ``<6-byte MAC><payload>`` frames over a serial port."""

    def __init__(self, port, baudrate, default_addr=None):
        """Open the serial port and remember the default destination.

        Args:
            port: Serial device passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
            default_addr: Destination used when ``send`` is called without an
                address; parsed with ``_parse_mac`` (``None`` -> broadcast).
        """
        # Imported here rather than at module top, so importing this module
        # does not itself require the ``serial`` package.
        import serial

        self._serial = serial.Serial(port, baudrate=baudrate, timeout=1)
        self._default_addr = _parse_mac(default_addr)

    async def send(self, data, addr=None):
        """Write one frame (address followed by the encoded payload).

        Args:
            data: Payload; see ``_encode_payload`` for the accepted forms.
            addr: Destination MAC (see ``_parse_mac``). ``None`` selects the
                default address given to the constructor.

        Returns:
            ``True`` once the write call has returned.
        """
        mac = _parse_mac(addr) if addr is not None else self._default_addr
        payload = _encode_payload(data)
        # The serial write is blocking, so it is run in a worker thread.
        await _to_thread(self._serial.write, mac + payload)
        return True


# Module-wide sender registered via set_sender(); None until one is set.
_current_sender = None


def set_sender(sender):
    """Register *sender* as the module-wide current sender."""
    global _current_sender
    _current_sender = sender


def get_current_sender():
    """Return the sender registered with ``set_sender`` (``None`` if unset)."""
    return _current_sender


def get_sender(settings):
    """Build a ``SerialSender`` from a settings mapping.

    Reads ``serial_port``, ``serial_baudrate`` and ``serial_destination_mac``
    via ``settings.get``, using the defaults below for missing keys.
    """
    port = settings.get("serial_port", "/dev/ttyS0")
    # NOTE(review): 912000 is not a standard baud rate (921600 is the usual
    # one) - confirm against the ESP32 firmware before changing.
    baudrate = settings.get("serial_baudrate", 912000)
    default_addr = settings.get("serial_destination_mac", "ffffffffffff")
    return SerialSender(port, baudrate, default_addr=default_addr)