"""LED hello JSON line and UDP broadcast on port 8766. Used so led-controller can register the device (name, MAC, IP) when ``wait_reply`` is false; the controller may then connect to the device's WebSocket. With ``wait_reply`` true, blocks for an echo and returns the controller IP (legacy discovery). Wi-Fi must already be connected; this module does not use Settings or call connect(). """ import json import socket import ubinascii import network # Match led-controller/tests/udp_server.py DISCOVERY_UDP_PORT = 8766 DEFAULT_RECV_TIMEOUT_S = 3 def pack_hello_dict(sta, device_name=""): """Same fields as main HTTP/ESP-NOW hello.""" mac = sta.config("mac") return { "v": "1", "device_name": device_name, "mac": ubinascii.hexlify(mac).decode().lower(), "type": "led", } def pack_hello_bytes(sta, device_name=""): return json.dumps(pack_hello_dict(sta, device_name)).encode("utf-8") def pack_hello_line(sta, device_name=""): """JSON hello + newline (HTTP/UDP discovery payloads).""" return pack_hello_bytes(sta, device_name) + b"\n" def ipv4_broadcast(ip, netmask): """Directed broadcast (e.g. 192.168.1.0/24 -> 192.168.1.255).""" ia = [int(x) for x in ip.split(".")] im = [int(x) for x in netmask.split(".")] if len(ia) != 4 or len(im) != 4: return None # STA often reports 255.255.255.255; "broadcast" would equal the host IP — useless for LAN. if netmask == "255.255.255.255": return None bcast = ".".join(str(ia[i] | (255 - im[i])) for i in range(4)) if bcast == ip: return None return bcast def udp_discovery_targets(ip, mask): """(directed_broadcast, port) then limited broadcast.""" out = [("255.255.255.255", DISCOVERY_UDP_PORT)] b = ipv4_broadcast(ip, mask) if b: out.insert(0, (b, DISCOVERY_UDP_PORT)) return out def _udp_discovery_targets_single(ip, mask): """One destination: subnet broadcast if known, else limited broadcast.""" b = ipv4_broadcast(ip, mask) if b: return [(b, DISCOVERY_UDP_PORT)] return [("255.255.255.255", DISCOVERY_UDP_PORT)] def broadcast_hello_udp( sta, device_name="", *, wait_reply=True, recv_timeout_s=DEFAULT_RECV_TIMEOUT_S, wdt=None, dual_destinations=True, ): """ Send pack_hello_line on DISCOVERY_UDP_PORT. STA must already be connected with a valid IPv4 (caller brings up Wi-Fi). If dual_destinations (default), send subnet broadcast then 255.255.255.255 so discovery works on awkward APs — the controller may receive two packets. If dual_destinations is False, send only one (subnet broadcast or limited), e.g. after TCP connect so the Pi does not run duplicate resync handlers. If wait_reply, wait for first UDP echo. Returns controller IP string or None. """ ip, mask, _gw, _dns = sta.ifconfig() msg = pack_hello_line(sta, device_name) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) except (AttributeError, OSError) as e: print("SO_BROADCAST not set:", e) try: sock.bind((ip, 0)) except (AttributeError, OSError, TypeError) as e: try: sock.bind(("0.0.0.0", 0)) except (AttributeError, OSError) as e2: print("bind skipped:", e, e2) if wait_reply: try: sock.settimeout(recv_timeout_s) except (AttributeError, OSError): pass discovered = None targets = ( udp_discovery_targets(ip, mask) if dual_destinations else _udp_discovery_targets_single(ip, mask) ) for dest_ip, dest_port in targets: if wdt is not None: wdt.feed() target = (dest_ip, dest_port) try: sock.sendto(msg, target) except OSError as e: print("sendto failed:", e) continue if not wait_reply: continue if wdt is not None: wdt.feed() try: _data, addr = sock.recvfrom(2048) remote_ip = addr[0] discovered = remote_ip break except OSError: pass sock.close() return discovered def discover_controller_udp(device_name="", wdt=None): """ Broadcast hello; return controller IP from first UDP echo, or None. STA must already be connected. device_name: logical name in the JSON (caller supplies, e.g. from Settings elsewhere). wdt: optional WDT to feed during waits. """ sta = network.WLAN(network.STA_IF) if not sta.isconnected(): print("hello: STA not connected — connect Wi-Fi before discovery.") raise SystemExit(1) ip, mask, _g, _d = sta.ifconfig() if ip == "0.0.0.0": print("hello: STA has no IP address.") raise SystemExit(1) discovered = broadcast_hello_udp( sta, device_name, wait_reply=True, wdt=wdt, ) return discovered if __name__ == "__main__": if not discover_controller_udp(): raise SystemExit(1)