feat(controller): migrate wifi drivers from tcp to websocket clients

This commit is contained in:
2026-04-14 23:13:26 +12:00
parent f5a7b42e7c
commit 96712dda88
19 changed files with 1195 additions and 673 deletions

View File

@@ -0,0 +1,281 @@
"""Outbound WebSocket clients to Wi-Fi LED drivers (firmware serves ``/ws`` on device)."""
from __future__ import annotations
import asyncio
import errno
import json
import traceback
import websockets
from websockets.exceptions import ConnectionClosed
# Registered WebSocket connection per driver, keyed by normalized IP string
# (entries are added in _register_ws and removed in unregister_tcp_writer).
_connections: dict[str, object] = {}
# One asyncio.Lock per driver IP; held around each ws.send() call in
# send_json_line_to_ip.
_send_locks: dict[str, asyncio.Lock] = {}
# Background connection-maintenance task per driver IP
# (created in ensure_driver_connection).
_tasks: dict[str, asyncio.Task] = {}
# Running count of benign connect failures per IP, used to throttle the
# "unreachable" log line; cleared when the driver connects or when a
# non-benign session error is reported.
_unreachable_counts: dict[str, int] = {}
# Settings object installed via set_settings(); read with .get(key, default).
_settings = None
# Callable installed via set_tcp_status_broadcaster(); invoked as
# fn(ip, connected) and its result is scheduled as a task on the running loop.
_tcp_status_broadcast = None
def set_settings(settings) -> None:
    """Install the settings object used when dialling drivers.

    The object is stored as-is in the module-level ``_settings`` slot and is
    later queried with ``.get(key, default)`` by the connection loop.
    """
    global _settings
    _settings = settings
def set_tcp_status_broadcaster(coro) -> None:
    """Install the callable used to announce driver connect/disconnect events.

    ``coro`` is later called as ``coro(ip, connected)`` and the result is
    scheduled as a task on the running event loop. Pass a falsy value to
    disable broadcasts.
    """
    global _tcp_status_broadcast
    _tcp_status_broadcast = coro
# Strong references to in-flight broadcast tasks. The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_status_broadcast_tasks: set = set()


def _schedule_status_broadcast(ip: str, connected: bool) -> None:
    """Fire-and-forget a status broadcast for ``ip`` on the running loop.

    Does nothing when no broadcaster is installed or when called outside a
    running event loop. Scheduling failures are logged rather than raised,
    because this is a best-effort notification.

    Args:
        ip: Driver IP the status refers to.
        connected: ``True`` when the driver just connected, ``False`` when
            it went away.
    """
    fn = _tcp_status_broadcast
    if not fn:
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: nothing to schedule on.
        return
    try:
        task = loop.create_task(fn(ip, connected))
    except Exception as exc:
        print(f"[WS] status broadcast for {ip} not scheduled: {exc!r}")
        return
    # Hold the task until it completes, then drop the reference.
    _status_broadcast_tasks.add(task)
    task.add_done_callback(_status_broadcast_tasks.discard)
def _benign_ws_connect_failure(exc: BaseException) -> bool:
    """Tell whether ``exc`` is an ordinary "driver is down / no route" dial error.

    Timeouts and refused connections are always benign. Any other ``OSError``
    is benign only when its ``errno`` is one of the refused / timed-out /
    unreachable codes available on this platform.
    """
    if isinstance(exc, (asyncio.TimeoutError, TimeoutError, ConnectionRefusedError)):
        return True
    if not isinstance(exc, OSError):
        return False
    code = exc.errno
    if code is None:
        return False
    benign_codes = {errno.ECONNREFUSED, errno.ETIMEDOUT}
    # These constants are not defined on every platform, so add them by name.
    optional_names = ("EHOSTUNREACH", "ENETUNREACH", "ENETDOWN", "EADDRNOTAVAIL")
    benign_codes.update(
        getattr(errno, name) for name in optional_names if hasattr(errno, name)
    )
    return code in benign_codes
def normalize_tcp_peer_ip(ip: str) -> str:
    """Return ``ip`` in the form used as a registry key.

    Surrounding whitespace is removed and an IPv4-mapped IPv6 prefix
    (``::ffff:``, any letter case) is stripped so that ``::ffff:10.0.0.5``
    and ``10.0.0.5`` map to the same key.

    Args:
        ip: Peer address; non-string values are converted with ``str()``.

    Returns:
        The normalized address, or ``""`` when ``ip`` is ``None`` or blank.
    """
    if ip is None:
        # str(None) would produce the truthy key "None", which callers'
        # ``if not key`` guards would then treat as a real address.
        return ""
    prefix = "::ffff:"
    text = str(ip).strip()
    if text.lower().startswith(prefix):
        text = text[len(prefix):]
    return text
def _ws_open(ws) -> bool:
    """Report whether ``ws`` looks open, i.e. its ``close_code`` is ``None``.

    Any error while reading ``close_code`` (including a missing attribute)
    is treated as "not open".
    """
    try:
        code = ws.close_code
    except Exception:
        return False
    return code is None
def prune_stale_tcp_writers() -> None:
    """Forget registered WebSockets that are no longer open.

    The TCP-era name is kept for existing callers. Each removed IP gets a
    "disconnected" status broadcast.
    """
    # Collect first, then mutate, so the registry is not changed mid-scan.
    dead_ips = []
    for ip, ws in tuple(_connections.items()):
        if not _ws_open(ws):
            dead_ips.append(ip)
    for ip in dead_ips:
        _connections.pop(ip, None)
        _schedule_status_broadcast(ip, False)
def _register_ws(ip: str, ws) -> None:
    """Record ``ws`` as the live connection for ``ip`` and announce it.

    Resets the unreachable counter for the IP, makes sure a send lock
    exists, and schedules a "connected" status broadcast. An IP that
    normalizes to an empty string is ignored.
    """
    key = normalize_tcp_peer_ip(ip)
    if not key:
        return
    _connections[key] = ws
    _unreachable_counts.pop(key, None)
    # Keep an existing lock: a sender may currently be holding it.
    _send_locks.setdefault(key, asyncio.Lock())
    _schedule_status_broadcast(key, True)
    print(f"[WS] driver connected {key!r}")
def unregister_tcp_writer(peer_ip: str, ws=None) -> str:
    """Drop the registered WebSocket for ``peer_ip``.

    When ``ws`` is supplied, the entry is removed only if ``ws`` is still the
    registered instance, so a stale session cannot evict its replacement.

    Returns:
        ``"removed"`` when an entry was dropped (a "disconnected" broadcast
        is scheduled), ``"superseded"`` when a different instance is
        registered, and ``"noop"`` otherwise. This is the same contract as
        the former TCP registry.
    """
    key = normalize_tcp_peer_ip(peer_ip) if peer_ip else ""
    if not key:
        return "noop"

    registered = _connections.get(key)
    if ws is not None:
        if registered is None:
            return "noop"
        if registered is not ws:
            return "superseded"

    if key not in _connections:
        return "noop"

    del _connections[key]
    _schedule_status_broadcast(key, False)
    print(f"[WS] driver disconnected: {key}")
    return "removed"
def list_connected_ips():
    """Return the IPs that currently have an open outbound WebSocket.

    Closed entries are pruned before the snapshot is taken.
    """
    prune_stale_tcp_writers()
    return [*_connections]
def tcp_client_connected(ip: str) -> bool:
    """Tell whether an outbound WebSocket to driver ``ip`` is registered.

    Closed entries are pruned first. An address that normalizes to an empty
    string is never considered connected.
    """
    prune_stale_tcp_writers()
    key = normalize_tcp_peer_ip(ip)
    if not key:
        return False
    return key in _connections
async def send_json_line_to_ip(ip: str, json_str: str) -> bool:
    """Send ``json_str`` to driver ``ip`` as a single WebSocket text frame.

    Trailing newlines from the v1 line protocol are stripped. Sends to one
    IP are serialized through that IP's lock.

    Returns:
        ``True`` when the frame was handed to the socket. ``False`` when no
        open connection exists or the send raised; in the latter case the
        connection is also unregistered.
    """
    ip = normalize_tcp_peer_ip(ip)
    conn = _connections.get(ip)
    if conn is None:
        return False
    if not _ws_open(conn):
        return False

    frame = json_str.rstrip("\n")
    send_lock = _send_locks.setdefault(ip, asyncio.Lock())
    try:
        async with send_lock:
            await conn.send(frame)
    except Exception as exc:
        print(f"[WS] send to {ip} failed: {exc}")
        # Only evict this instance; a newer connection must stay registered.
        unregister_tcp_writer(ip, conn)
        return False
    return True
async def _forward_raw_to_bridge(sender, text: str) -> None:
    """Forward ``text`` unchanged via ``sender``, logging (not raising) failures."""
    try:
        await sender.send(text)
    except Exception as e:
        print(f"[WS] forward to bridge failed: {e}")


async def _recv_forward_loop(ip: str, ws) -> None:
    """Read frames from ``ws`` until it closes and relay them to the bridge.

    Each frame is decoded as UTF-8 when binary, stripped, logged, and then
    forwarded:

    * A JSON object has its ``"to"`` key removed and used as the ``addr``
      keyword; the remaining object is re-serialized as the payload.
    * Any other JSON value, and text that is not valid JSON, is forwarded
      verbatim.

    Forwarding is best-effort: a failure is logged and the loop continues.
    Frames are logged but not forwarded when no sender is available.

    Args:
        ip: Driver IP, used only in log lines.
        ws: Connected WebSocket supporting ``async for``.
    """
    from models.transport import get_current_sender

    # NOTE(review): the sender is resolved once per connection; if the
    # current sender can change while a driver stays connected, this
    # reference goes stale. Confirm against models.transport.
    sender = get_current_sender()
    async for message in ws:
        if isinstance(message, bytes):
            try:
                text = message.decode("utf-8")
            except UnicodeDecodeError:
                print(f"[WS] recv {ip} (non-UTF-8, {len(message)} bytes)")
                continue
        else:
            text = message
        text = text.strip()
        if not text:
            continue
        print(f"[WS] recv {ip}: {text}")
        if not sender:
            continue
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            await _forward_raw_to_bridge(sender, text)
            continue
        if not isinstance(parsed, dict):
            await _forward_raw_to_bridge(sender, text)
            continue
        addr = parsed.pop("to", None)
        # json.dumps({}) already yields "{}", so no special case is needed.
        payload = json.dumps(parsed)
        try:
            await sender.send(payload, addr=addr)
        except Exception as e:
            print(f"[WS] forward to bridge failed: {e}")
async def _driver_connection_loop(ip: str) -> None:
    """Maintain an outbound WebSocket to the driver at ``ip``.

    Dials ``ws://<ip>:<port><path>``, using the ``wifi_driver_ws_port``
    (default 80) and ``wifi_driver_ws_path`` (default ``/ws``) settings.
    While connected, incoming frames are relayed by ``_recv_forward_loop``.
    After a failure or disconnect, the loop waits ``retry_interval_s`` and
    dials again, until the driver has been unreachable for
    ``retry_window_s``; then the task ends.

    The retry window is restarted whenever a session ends. Otherwise a
    driver that had been connected for longer than the window would never
    be redialled after its first disconnect.

    Returns immediately when no settings object has been installed.

    Raises:
        asyncio.CancelledError: Re-raised after unregistering the connection.
    """
    if _settings is None:
        return
    port = int(_settings.get("wifi_driver_ws_port", 80))
    path = str(_settings.get("wifi_driver_ws_path", "/ws"))
    if not path.startswith("/"):
        path = "/" + path
    # An IPv6 literal must be bracketed in a URI authority (RFC 3986).
    host = f"[{ip}]" if ":" in ip else ip
    uri = f"ws://{host}:{port}{path}"
    retry_interval_s = 2.0
    retry_window_s = 30.0
    loop = asyncio.get_running_loop()
    deadline = loop.time() + retry_window_s
    try:
        while True:
            if loop.time() >= deadline:
                print(
                    f"[WS] driver {ip} still unreachable after {int(retry_window_s)}s; "
                    "stopping retries until next hello"
                )
                break
            try:
                print(f"[WS] connecting to {uri!r}")
                async with websockets.connect(
                    uri,
                    ping_interval=20,
                    ping_timeout=15,
                    open_timeout=30,
                ) as ws:
                    _register_ws(ip, ws)
                    try:
                        await _recv_forward_loop(ip, ws)
                    finally:
                        unregister_tcp_writer(ip, ws)
                        # The driver was reachable: restart the retry window
                        # so a drop after a long session is redialled.
                        deadline = loop.time() + retry_window_s
            except asyncio.CancelledError:
                raise
            except ConnectionClosed as e:
                print(f"[WS] driver {ip} closed: {e}")
                unregister_tcp_writer(ip, None)
            except Exception as e:
                if _benign_ws_connect_failure(e):
                    # Throttle the log: first failure, then every 30th.
                    n = _unreachable_counts.get(ip, 0) + 1
                    _unreachable_counts[ip] = n
                    if n == 1 or (n % 30) == 0:
                        print(f"[WS] driver {ip} unreachable, retry in 2s: {e} (x{n})")
                else:
                    print(f"[WS] driver {ip} session error: {e!r}")
                    traceback.print_exception(type(e), e, e.__traceback__)
                    _unreachable_counts.pop(ip, None)
                unregister_tcp_writer(ip, None)
            await asyncio.sleep(retry_interval_s)
    except asyncio.CancelledError:
        unregister_tcp_writer(ip, None)
        raise
    finally:
        # Remove only our own entry: after a cancel-and-restart, a newer task
        # may already be registered for this IP and must not be evicted.
        if _tasks.get(ip) is asyncio.current_task():
            _tasks.pop(ip, None)
def ensure_driver_connection(peer_ip: str) -> None:
    """Make sure a background task is maintaining the WebSocket to ``peer_ip``.

    Does nothing when the address normalizes to an empty string, when a
    task for that IP is still running, or when there is no running event
    loop. Otherwise a new ``_driver_connection_loop`` task is created and
    recorded.
    """
    key = normalize_tcp_peer_ip(peer_ip)
    if not key:
        return

    existing = _tasks.get(key)
    still_running = existing is not None and not existing.done()
    if still_running:
        return

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return
    else:
        _tasks[key] = loop.create_task(_driver_connection_loop(key))
def cancel_all_driver_tasks() -> None:
    """Shut down: cancel every driver connection task and reset all registries.

    A "disconnected" status broadcast is scheduled for each IP that was
    still registered before the connection table is cleared.
    """
    for task in list(_tasks.values()):
        if task.done():
            continue
        task.cancel()
    _tasks.clear()

    for ip in list(_connections):
        _schedule_status_broadcast(ip, False)

    for registry in (_connections, _send_locks, _unreachable_counts):
        registry.clear()