#!/usr/bin/env python3 """UDP echo server for testing the led-driver UDP client (MicroPython ESP32). Listens on UDP, prints each datagram (peer + payload), sends the same bytes back. Run on the Pi (or any host on the LAN): python3 tests/udp_server.py python3 tests/udp_server.py -p 8766 --bind 0.0.0.0 Pair with **`led-driver/tests/udp_client.py`**: the device broadcasts a hello; this server echoes so the client learns the controller's **unicast IP** from the reply (firmware uses that for HTTP to the web server only; it is not stored in settings). Some Wi‑Fi APs block broadcast between clients — prefer a wired listener. """ from __future__ import annotations import argparse import json import socket import sys DEFAULT_PORT = 8766 def main() -> int: parser = argparse.ArgumentParser(description="UDP echo server for led-driver tests") parser.add_argument( "--bind", default="0.0.0.0", metavar="ADDR", help="Address to bind (default: all interfaces)", ) parser.add_argument( "-p", "--port", type=int, default=DEFAULT_PORT, help=f"UDP port (default: {DEFAULT_PORT})", ) args = parser.parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except (AttributeError, OSError): pass try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) except (AttributeError, OSError): pass try: sock.bind((args.bind, args.port)) except OSError as e: print(f"bind {args.bind!r}:{args.port} failed: {e}", file=sys.stderr) return 1 print(f"UDP echo listening on {args.bind}:{args.port} (Ctrl+C to stop)") while True: try: data, addr = sock.recvfrom(2048) except KeyboardInterrupt: print("\nStopping.") return 0 client_ip, client_port = addr[0], addr[1] text = data.decode("utf-8", errors="replace") print(f"client_ip={client_ip} client_udp_port={client_port} ({len(data)} bytes)") print(f" payload: {text!r}") line = data.split(b"\n", 1)[0].strip() if line: try: obj = json.loads(line.decode("utf-8")) if isinstance(obj, dict) and obj.get("type") == "led": print( " hello: device_name=%r mac=%r v=%r" % (obj.get("device_name"), obj.get("mac"), obj.get("v")) ) except (UnicodeError, ValueError, TypeError): pass try: sock.sendto(data, addr) except OSError as e: print(f" sendto failed: {e}", file=sys.stderr) if __name__ == "__main__": raise SystemExit(main())