refactor(led-driver): simplify websocket runtime and test layout

This commit is contained in:
2026-04-14 22:08:23 +12:00
parent ded6e3d360
commit 0a33f399e1
30 changed files with 2850 additions and 503 deletions

View File

@@ -1,4 +1,8 @@
"""LED hello payload and UDP broadcast discovery (controller IP via echo on port 8766).
"""LED hello JSON line and UDP broadcast on port 8766.
Used so led-controller can register the device (name, MAC, IP) when ``wait_reply`` is
false; the controller may then connect to the device's WebSocket. With
``wait_reply`` true, blocks for an echo and returns the controller IP (legacy discovery).
Wi-Fi must already be connected; this module does not use Settings or call connect().
"""
@@ -40,7 +44,13 @@ def ipv4_broadcast(ip, netmask):
im = [int(x) for x in netmask.split(".")]
if len(ia) != 4 or len(im) != 4:
return None
return ".".join(str(ia[i] | (255 - im[i])) for i in range(4))
# STA often reports 255.255.255.255; "broadcast" would equal the host IP — useless for LAN.
if netmask == "255.255.255.255":
return None
bcast = ".".join(str(ia[i] | (255 - im[i])) for i in range(4))
if bcast == ip:
return None
return bcast
def udp_discovery_targets(ip, mask):
@@ -52,6 +62,14 @@ def udp_discovery_targets(ip, mask):
return out
def _udp_discovery_targets_single(ip, mask):
"""One destination: subnet broadcast if known, else limited broadcast."""
b = ipv4_broadcast(ip, mask)
if b:
return [(b, DISCOVERY_UDP_PORT)]
return [("255.255.255.255", DISCOVERY_UDP_PORT)]
def broadcast_hello_udp(
sta,
device_name="",
@@ -59,11 +77,17 @@ def broadcast_hello_udp(
wait_reply=True,
recv_timeout_s=DEFAULT_RECV_TIMEOUT_S,
wdt=None,
dual_destinations=True,
):
"""
Send pack_hello_line via directed then 255.255.255.255 on DISCOVERY_UDP_PORT.
Send pack_hello_line on DISCOVERY_UDP_PORT.
STA must already be connected with a valid IPv4 (caller brings up Wi-Fi).
If dual_destinations (default), send subnet broadcast then 255.255.255.255 so
discovery works on awkward APs — the controller may receive two packets.
If dual_destinations is False, send only one (subnet broadcast or limited),
e.g. after TCP connect so the Pi does not run duplicate resync handlers.
If wait_reply, wait for first UDP echo. Returns controller IP string or None.
"""
ip, mask, _gw, _dns = sta.ifconfig()
@@ -89,7 +113,12 @@ def broadcast_hello_udp(
pass
discovered = None
for dest_ip, dest_port in udp_discovery_targets(ip, mask):
targets = (
udp_discovery_targets(ip, mask)
if dual_destinations
else _udp_discovery_targets_single(ip, mask)
)
for dest_ip, dest_port in targets:
if wdt is not None:
wdt.feed()
label = "%s:%s" % (dest_ip, dest_port)