Files
led-controller/tests/udp_server.py
pi 5a1067263a chore: add pattern samples, http driver helpers, OTA/UDP test tools
- patterns/: sample dynamic pattern modules for OTA
- esp32/msg.json: example bridge message shape
- models/http_driver.py, wifi_peer.py: Wi-Fi driver HTTP poll helpers
- tests: pattern OTA send script and UDP discovery echo server
- Submodule led-driver: http_poll and test utilities

Made-with: Cursor
2026-04-11 15:19:15 +12:00

90 lines
2.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""UDP echo server for testing the led-driver UDP client (MicroPython ESP32).
Listens on UDP, prints each datagram (peer + payload), sends the same bytes back.
Run on the Pi (or any host on the LAN):
python3 tests/udp_server.py
python3 tests/udp_server.py -p 8766 --bind 0.0.0.0
Pair with **`led-driver/tests/udp_client.py`**: the device broadcasts a hello; this server
echoes so the client learns the controller's **unicast IP** from the reply (firmware uses that
for HTTP to the web server only; it is not stored in settings). Some WiFi APs block broadcast between clients —
prefer a wired listener.
"""
from __future__ import annotations
import argparse
import json
import socket
import sys
# UDP port the echo server listens on unless overridden with -p/--port.
DEFAULT_PORT = 8766
def _format_hello(data: bytes) -> str | None:
    """Return a summary line if *data* starts with a JSON "led" hello, else None.

    Only the first newline-terminated line of the datagram is inspected. It is
    treated as a hello when it decodes as UTF-8 JSON, is an object, and its
    ``"type"`` key equals ``"led"``.
    """
    first_line = data.split(b"\n", 1)[0].strip()
    if not first_line:
        return None
    try:
        obj = json.loads(first_line.decode("utf-8"))
    except (UnicodeError, ValueError, TypeError, RecursionError):
        # Not JSON (or pathologically nested JSON): the datagram is still
        # echoed by the caller, it just gets no hello summary.
        return None
    if not isinstance(obj, dict) or obj.get("type") != "led":
        return None
    return " hello: device_name=%r mac=%r v=%r" % (
        obj.get("device_name"),
        obj.get("mac"),
        obj.get("v"),
    )


def main() -> int:
    """Run the UDP echo server until interrupted.

    Parses ``--bind`` and ``-p/--port`` from the command line, binds a UDP
    socket, then loops forever: each received datagram is printed (peer
    address, size, payload, and a hello summary when the first line is a JSON
    object with ``"type": "led"``) and the same bytes are sent back to the
    sender.

    Returns:
        0 after Ctrl+C, 1 if the socket could not be bound.
    """
    parser = argparse.ArgumentParser(description="UDP echo server for led-driver tests")
    parser.add_argument(
        "--bind",
        default="0.0.0.0",
        metavar="ADDR",
        help="Address to bind (default: all interfaces)",
    )
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        default=DEFAULT_PORT,
        help=f"UDP port (default: {DEFAULT_PORT})",
    )
    args = parser.parse_args()

    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # Both options are best effort: a platform that lacks the constant or
        # rejects the option should not prevent the server from starting.
        for option_name in ("SO_REUSEADDR", "SO_BROADCAST"):
            try:
                sock.setsockopt(socket.SOL_SOCKET, getattr(socket, option_name), 1)
            except (AttributeError, OSError):
                pass

        try:
            sock.bind((args.bind, args.port))
        except OSError as e:
            print(f"bind {args.bind!r}:{args.port} failed: {e}", file=sys.stderr)
            return 1

        print(f"UDP echo listening on {args.bind}:{args.port} (Ctrl+C to stop)")

        # Ctrl+C may arrive at any point in the loop (not only while blocked
        # in recvfrom), so the whole loop is guarded.
        try:
            while True:
                data, addr = sock.recvfrom(2048)
                client_ip, client_port = addr[0], addr[1]
                text = data.decode("utf-8", errors="replace")
                print(
                    f"client_ip={client_ip} client_udp_port={client_port} "
                    f"({len(data)} bytes)"
                )
                print(f" payload: {text!r}")

                hello = _format_hello(data)
                if hello is not None:
                    print(hello)

                # Echo the raw bytes back so the sender can learn this host's
                # unicast address from the reply.
                try:
                    sock.sendto(data, addr)
                except OSError as e:
                    print(f" sendto failed: {e}", file=sys.stderr)
        except KeyboardInterrupt:
            print("\nStopping.")
            return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    sys.exit(main())