fix(wifi): limit outbound driver WS to hello-triggered attempts
Remove periodic UDP hello loop; dial each driver at most wifi_driver_initial_connect_attempts times per discovery hello. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
73
src/main.py
73
src/main.py
@@ -100,11 +100,7 @@ async def _handle_udp_discovery(sock, udp_holder=None) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def _prime_wifi_outbound_driver_connections() -> None:
|
def _prime_wifi_outbound_driver_connections() -> None:
|
||||||
"""
|
"""On boot, dial each registered Wi-Fi driver (same 4-attempt limit as UDP hello)."""
|
||||||
For each Wi‑Fi device in the registry with a usable IPv4, start (or keep) the
|
|
||||||
outbound WebSocket task. The client loop reconnects automatically if the link
|
|
||||||
drops. Presets are not pushed automatically; use Send Presets / profile apply.
|
|
||||||
"""
|
|
||||||
n = 0
|
n = 0
|
||||||
try:
|
try:
|
||||||
dev = Device()
|
dev = Device()
|
||||||
@@ -143,69 +139,6 @@ def _ipv4_address(addr: str) -> str | None:
|
|||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
async def _periodic_wifi_driver_hello_loop(settings, udp_holder) -> None:
|
|
||||||
"""
|
|
||||||
While a registered Wi-Fi driver has no outbound WebSocket, send a short JSON hello on
|
|
||||||
UDP discovery port so the device can announce itself and we can reconnect.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
interval = float(settings.get("wifi_driver_hello_interval_s", 10.0))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
interval = 10.0
|
|
||||||
if interval <= 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
sock.setblocking(False)
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
try:
|
|
||||||
while not udp_holder.get("closing"):
|
|
||||||
slept = 0.0
|
|
||||||
while slept < interval and not udp_holder.get("closing"):
|
|
||||||
chunk = min(1.0, interval - slept)
|
|
||||||
await asyncio.sleep(chunk)
|
|
||||||
slept += chunk
|
|
||||||
if udp_holder.get("closing"):
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
dev = Device()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[hello] device list failed: {e!r}")
|
|
||||||
continue
|
|
||||||
for _mac_key, doc in list(dev.items()):
|
|
||||||
if not isinstance(doc, dict):
|
|
||||||
continue
|
|
||||||
if doc.get("transport") != "wifi":
|
|
||||||
continue
|
|
||||||
ip = _ipv4_address(str(doc.get("address") or ""))
|
|
||||||
if not ip:
|
|
||||||
continue
|
|
||||||
if tcp_client_registry.tcp_client_connected(ip):
|
|
||||||
continue
|
|
||||||
name = (doc.get("name") or "").strip()
|
|
||||||
mac = normalize_mac(doc.get("id") or _mac_key)
|
|
||||||
if not name or not mac:
|
|
||||||
continue
|
|
||||||
line = (
|
|
||||||
json.dumps(
|
|
||||||
{"m": "hello", "device_name": name, "mac": mac},
|
|
||||||
separators=(",", ":"),
|
|
||||||
)
|
|
||||||
+ "\n"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
await loop.sock_sendto(
|
|
||||||
sock, line.encode("utf-8"), (ip, DISCOVERY_UDP_PORT)
|
|
||||||
)
|
|
||||||
except OSError as e:
|
|
||||||
print(f"[hello] UDP to {ip!r} failed: {e!r}")
|
|
||||||
finally:
|
|
||||||
try:
|
|
||||||
sock.close()
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
async def _run_udp_discovery_server(udp_holder=None) -> None:
|
async def _run_udp_discovery_server(udp_holder=None) -> None:
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
sock.setblocking(False)
|
sock.setblocking(False)
|
||||||
@@ -573,10 +506,6 @@ async def main(port=80):
|
|||||||
asyncio.create_task(
|
asyncio.create_task(
|
||||||
_run_udp_discovery_server(udp_holder), name="udp"
|
_run_udp_discovery_server(udp_holder), name="udp"
|
||||||
),
|
),
|
||||||
asyncio.create_task(
|
|
||||||
_periodic_wifi_driver_hello_loop(settings, udp_holder),
|
|
||||||
name="hello",
|
|
||||||
),
|
|
||||||
]
|
]
|
||||||
await asyncio.gather(*server_tasks)
|
await asyncio.gather(*server_tasks)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ from websockets.exceptions import ConnectionClosed
|
|||||||
_connections: dict[str, object] = {}
|
_connections: dict[str, object] = {}
|
||||||
_send_locks: dict[str, asyncio.Lock] = {}
|
_send_locks: dict[str, asyncio.Lock] = {}
|
||||||
_tasks: dict[str, asyncio.Task] = {}
|
_tasks: dict[str, asyncio.Task] = {}
|
||||||
_unreachable_counts: dict[str, int] = {}
|
|
||||||
_settings = None
|
_settings = None
|
||||||
|
|
||||||
_tcp_status_broadcast = None
|
_tcp_status_broadcast = None
|
||||||
@@ -119,7 +118,6 @@ def _register_ws(ip: str, ws) -> None:
|
|||||||
if not key:
|
if not key:
|
||||||
return
|
return
|
||||||
_connections[key] = ws
|
_connections[key] = ws
|
||||||
_unreachable_counts.pop(key, None)
|
|
||||||
if key not in _send_locks:
|
if key not in _send_locks:
|
||||||
_send_locks[key] = asyncio.Lock()
|
_send_locks[key] = asyncio.Lock()
|
||||||
_schedule_status_broadcast(key, True)
|
_schedule_status_broadcast(key, True)
|
||||||
@@ -275,52 +273,43 @@ async def _driver_connection_loop(ip: str) -> None:
|
|||||||
if stagger > 0:
|
if stagger > 0:
|
||||||
await asyncio.sleep(stagger)
|
await asyncio.sleep(stagger)
|
||||||
|
|
||||||
# Only bound boot-time: after we have connected once, keep retrying (Wi-Fi drops, reboots).
|
|
||||||
connected_once = False
|
|
||||||
boot_attempts = 0
|
|
||||||
try:
|
try:
|
||||||
while True:
|
for attempt in range(1, max_boot_attempts + 1):
|
||||||
if not connected_once:
|
|
||||||
if boot_attempts >= max_boot_attempts:
|
|
||||||
print(
|
|
||||||
f"[WS] driver {ip} still unreachable after {max_boot_attempts} "
|
|
||||||
f"initial dial attempt(s); stopping until next UDP hello / registry prime"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
boot_attempts += 1
|
|
||||||
try:
|
try:
|
||||||
print(f"[WS] connecting to {uri!r}")
|
print(f"[WS] connecting to {uri!r} (attempt {attempt}/{max_boot_attempts})")
|
||||||
async with websockets.connect(
|
async with websockets.connect(
|
||||||
uri,
|
uri,
|
||||||
ping_interval=20,
|
ping_interval=20,
|
||||||
ping_timeout=15,
|
ping_timeout=15,
|
||||||
open_timeout=open_timeout,
|
open_timeout=open_timeout,
|
||||||
) as ws:
|
) as ws:
|
||||||
connected_once = True
|
|
||||||
_register_ws(ip, ws)
|
_register_ws(ip, ws)
|
||||||
try:
|
try:
|
||||||
await _recv_forward_loop(ip, ws)
|
await _recv_forward_loop(ip, ws)
|
||||||
finally:
|
finally:
|
||||||
unregister_tcp_writer(ip, ws)
|
unregister_tcp_writer(ip, ws)
|
||||||
|
return
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
raise
|
raise
|
||||||
except ConnectionClosed as e:
|
except ConnectionClosed as e:
|
||||||
print(f"[WS] driver {ip} closed: {e}")
|
print(f"[WS] driver {ip} closed: {e}")
|
||||||
unregister_tcp_writer(ip, None)
|
unregister_tcp_writer(ip, None)
|
||||||
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if _benign_ws_connect_failure(e):
|
if _benign_ws_connect_failure(e):
|
||||||
n = _unreachable_counts.get(ip, 0) + 1
|
|
||||||
_unreachable_counts[ip] = n
|
|
||||||
if n == 1 or (n % 30) == 0:
|
|
||||||
print(
|
print(
|
||||||
f"[WS] driver {ip} unreachable, retry in {retry_interval_s}s: {e} (x{n})"
|
f"[WS] driver {ip} unreachable (attempt {attempt}/{max_boot_attempts}): {e}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
print(f"[WS] driver {ip} session error: {e!r}")
|
print(f"[WS] driver {ip} session error: {e!r}")
|
||||||
traceback.print_exception(type(e), e, e.__traceback__)
|
traceback.print_exception(type(e), e, e.__traceback__)
|
||||||
_unreachable_counts.pop(ip, None)
|
|
||||||
unregister_tcp_writer(ip, None)
|
unregister_tcp_writer(ip, None)
|
||||||
|
if attempt < max_boot_attempts:
|
||||||
await asyncio.sleep(retry_interval_s)
|
await asyncio.sleep(retry_interval_s)
|
||||||
|
print(
|
||||||
|
f"[WS] driver {ip} still unreachable after {max_boot_attempts} attempt(s); "
|
||||||
|
"waiting for next UDP hello"
|
||||||
|
)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
unregister_tcp_writer(ip, None)
|
unregister_tcp_writer(ip, None)
|
||||||
raise
|
raise
|
||||||
@@ -329,10 +318,12 @@ async def _driver_connection_loop(ip: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def ensure_driver_connection(peer_ip: str) -> None:
|
def ensure_driver_connection(peer_ip: str) -> None:
|
||||||
"""Start (or keep) a background task that maintains ``ws://<ip>:port/ws``."""
|
"""Dial ``ws://<ip>:port/ws`` up to wifi_driver_initial_connect_attempts times (UDP hello only)."""
|
||||||
key = normalize_tcp_peer_ip(peer_ip)
|
key = normalize_tcp_peer_ip(peer_ip)
|
||||||
if not key:
|
if not key:
|
||||||
return
|
return
|
||||||
|
if tcp_client_connected(key):
|
||||||
|
return
|
||||||
t = _tasks.get(key)
|
t = _tasks.get(key)
|
||||||
if t is not None and not t.done():
|
if t is not None and not t.done():
|
||||||
return
|
return
|
||||||
@@ -353,4 +344,3 @@ def cancel_all_driver_tasks() -> None:
|
|||||||
_schedule_status_broadcast(ip, False)
|
_schedule_status_broadcast(ip, False)
|
||||||
_connections.clear()
|
_connections.clear()
|
||||||
_send_locks.clear()
|
_send_locks.clear()
|
||||||
_unreachable_counts.clear()
|
|
||||||
|
|||||||
@@ -57,12 +57,9 @@ class Settings(dict):
|
|||||||
self['wifi_driver_ws_port'] = 80
|
self['wifi_driver_ws_port'] = 80
|
||||||
if 'wifi_driver_ws_path' not in self:
|
if 'wifi_driver_ws_path' not in self:
|
||||||
self['wifi_driver_ws_path'] = '/ws'
|
self['wifi_driver_ws_path'] = '/ws'
|
||||||
# Seconds between UDP discovery nudges when a Wi-Fi driver WebSocket is
|
# Legacy (unused): periodic UDP nudges removed; connect only on driver hello.
|
||||||
# down (0 disables). Helps drivers that reconnect after seeing traffic on 8766.
|
|
||||||
if 'wifi_driver_hello_interval_s' not in self:
|
if 'wifi_driver_hello_interval_s' not in self:
|
||||||
self['wifi_driver_hello_interval_s'] = 10.0
|
self['wifi_driver_hello_interval_s'] = 0
|
||||||
# Legacy key (no longer read): initial outbound dial limit uses
|
|
||||||
# wifi_driver_initial_connect_attempts instead.
|
|
||||||
if 'wifi_driver_connect_retry_window_s' not in self:
|
if 'wifi_driver_connect_retry_window_s' not in self:
|
||||||
self['wifi_driver_connect_retry_window_s'] = 120.0
|
self['wifi_driver_connect_retry_window_s'] = 120.0
|
||||||
# Spread outbound dials 0..N s by device IP so six+ drivers do not all hit the AP at once.
|
# Spread outbound dials 0..N s by device IP so six+ drivers do not all hit the AP at once.
|
||||||
@@ -74,7 +71,7 @@ class Settings(dict):
|
|||||||
# Pause between outbound WebSocket dial attempts (seconds).
|
# Pause between outbound WebSocket dial attempts (seconds).
|
||||||
if 'wifi_driver_connect_retry_interval_s' not in self:
|
if 'wifi_driver_connect_retry_interval_s' not in self:
|
||||||
self['wifi_driver_connect_retry_interval_s'] = 2.0
|
self['wifi_driver_connect_retry_interval_s'] = 2.0
|
||||||
# Outbound dial attempts to the saved driver IP before first success; then wait for UDP discovery.
|
# Outbound WebSocket dial attempts per driver UDP hello (then wait for next hello).
|
||||||
if 'wifi_driver_initial_connect_attempts' not in self:
|
if 'wifi_driver_initial_connect_attempts' not in self:
|
||||||
self['wifi_driver_initial_connect_attempts'] = 4
|
self['wifi_driver_initial_connect_attempts'] = 4
|
||||||
# UART to ESP32 ESP-NOW bridge; default off (Wi-Fi drivers need no serial).
|
# UART to ESP32 ESP-NOW bridge; default off (Wi-Fi drivers need no serial).
|
||||||
|
|||||||
Reference in New Issue
Block a user