- Track Wi-Fi TCP clients, liveness pings, disconnect broadcast, bind errors via gather\n- Device list/get include connected; POST identify with __identify preset\n- Presets push/send delivery helpers; bump led-driver hello type Made-with: Cursor
69 lines
1.9 KiB
Python
69 lines
1.9 KiB
Python
import asyncio
|
|
import json
|
|
|
|
|
|
# Destination used when no explicit address is given: the all-ones broadcast
# MAC. Every frame the Pi sends to the ESP32 is a 6-byte address followed by
# the payload.
BROADCAST_MAC = b"\xff" * 6
|
|
|
|
|
|
def _encode_payload(data):
    """Return *data* in the form that is appended to the address on the wire.

    A dict is serialized to JSON text first; any text (given directly or
    produced from a dict) is then encoded with ``str.encode()`` defaults.
    Every other value is handed back untouched.
    """
    if isinstance(data, dict):
        data = json.dumps(data)
    return data.encode() if isinstance(data, str) else data
|
|
|
|
|
|
def _parse_mac(addr):
    """Convert an address to a 6-byte MAC, falling back to broadcast.

    Accepted forms:
      * 6-byte ``bytes`` / ``bytearray``
      * 12 hex digits as ``str``, optionally written with ``:`` or ``-``
        separators (``"aabbccddeeff"``, ``"aa:bb:cc:dd:ee:ff"``)

    Anything else (``None``, empty input, wrong length, non-hex text) yields
    ``BROADCAST_MAC``. The function never raises for a malformed address and
    always returns exactly 6 bytes, so the address prefix of a frame keeps a
    fixed width.
    """
    if isinstance(addr, (bytes, bytearray)):
        return bytes(addr) if len(addr) == 6 else BROADCAST_MAC

    if isinstance(addr, str):
        digits = addr.strip().replace(":", "").replace("-", "")
        if len(digits) == 12:
            try:
                mac = bytes.fromhex(digits)
            except ValueError:
                # Non-hex characters: treat like any other malformed address.
                return BROADCAST_MAC
            # fromhex() skips whitespace, so 12 characters can decode to
            # fewer than 6 bytes; only accept a full-width address.
            if len(mac) == 6:
                return mac

    return BROADCAST_MAC
|
|
|
|
|
|
async def _to_thread(func, *args):
    """Await ``func(*args)`` executed outside the event loop thread.

    Uses ``asyncio.to_thread`` when this Python provides it; otherwise the
    call is submitted to the event loop's default executor.
    """
    native = getattr(asyncio, "to_thread", None)
    if not native:
        executor_loop = asyncio.get_event_loop()
        return await executor_loop.run_in_executor(None, func, *args)
    return await native(func, *args)
|
|
|
|
|
|
class SerialSender:
    """Write address-prefixed frames to a serial port.

    Each frame is the 6-byte destination MAC immediately followed by the
    encoded payload. Concurrent ``send`` calls are serialized with an asyncio
    lock, and the blocking serial write is awaited through ``_to_thread`` so
    it does not run on the event loop thread.
    """

    def __init__(self, port, baudrate, default_addr=None):
        """Open the serial port.

        Args:
            port: Device name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
            default_addr: Destination used when ``send`` is called without an
                address; converted with ``_parse_mac``.
        """
        # Local import: pyserial is only needed once a sender is constructed.
        import serial

        self._serial = serial.Serial(port, baudrate=baudrate, timeout=1)
        self._default_addr = _parse_mac(default_addr)
        # The lock is created on the first send() rather than here. On
        # Python < 3.10 an asyncio.Lock binds to the event loop current at
        # construction time; a sender built before the serving loop starts
        # would then fail under contention with "attached to a different
        # loop".
        self._write_lock = None

    async def send(self, data, addr=None):
        """Send *data* to *addr* (or the default address when ``addr`` is None).

        Args:
            data: Payload; converted with ``_encode_payload``.
            addr: Optional destination; converted with ``_parse_mac``.

        Returns:
            True once the write call has completed.
        """
        mac = _parse_mac(addr) if addr is not None else self._default_addr
        payload = _encode_payload(data)
        if self._write_lock is None:
            # No await between the check and the assignment, so concurrent
            # tasks on the same loop cannot create two locks.
            self._write_lock = asyncio.Lock()
        async with self._write_lock:
            # Address and payload go out in a single write so frames from
            # concurrent senders cannot interleave.
            await _to_thread(self._serial.write, mac + payload)
        return True
|
|
|
|
|
|
# Module-wide sender instance; None until set_sender() stores one.
_current_sender = None


def set_sender(sender):
    """Store *sender* as the module-wide current sender."""
    global _current_sender
    _current_sender = sender


def get_current_sender():
    """Return the sender last stored with set_sender(), or None if unset."""
    return _current_sender
|
|
|
|
|
|
def get_sender(settings):
    """Build a SerialSender from a settings mapping.

    Keys read (with the default used when a key is absent):
      * ``serial_port`` -> ``"/dev/ttyS0"``
      * ``serial_baudrate`` -> ``912000``
      * ``serial_destination_mac`` -> ``"ffffffffffff"``
    """
    device = settings.get("serial_port", "/dev/ttyS0")
    # NOTE(review): 912000 is not one of the usual UART rates (921600 is);
    # confirm it matches the receiving firmware before changing it.
    speed = settings.get("serial_baudrate", 912000)
    destination = settings.get("serial_destination_mac", "ffffffffffff")
    return SerialSender(device, speed, default_addr=destination)
|