chore: add pattern samples, http driver helpers, OTA/UDP test tools
- patterns/: sample dynamic pattern modules for OTA - esp32/msg.json: example bridge message shape - models/http_driver.py, wifi_peer.py: Wi-Fi driver HTTP poll helpers - tests: pattern OTA send script and UDP discovery echo server - Submodule led-driver: http_poll and test utilities Made-with: Cursor
This commit is contained in:
89
tests/udp_server.py
Normal file
89
tests/udp_server.py
Normal file
@@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env python3
|
||||
"""UDP echo server for testing the led-driver UDP client (MicroPython ESP32).
|
||||
|
||||
Listens on UDP, prints each datagram (peer + payload), sends the same bytes back.
|
||||
|
||||
Run on the Pi (or any host on the LAN):
|
||||
|
||||
python3 tests/udp_server.py
|
||||
python3 tests/udp_server.py -p 8766 --bind 0.0.0.0
|
||||
|
||||
Pair with **`led-driver/tests/udp_client.py`**: the device broadcasts a hello; this server
|
||||
echoes so the client learns the controller's **unicast IP** from the reply (firmware uses that
|
||||
for HTTP to the web server only; it is not stored in settings). Some Wi‑Fi APs block broadcast between clients —
|
||||
prefer a wired listener.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import socket
|
||||
import sys
|
||||
|
||||
|
||||
DEFAULT_PORT = 8766
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="UDP echo server for led-driver tests")
|
||||
parser.add_argument(
|
||||
"--bind",
|
||||
default="0.0.0.0",
|
||||
metavar="ADDR",
|
||||
help="Address to bind (default: all interfaces)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--port",
|
||||
type=int,
|
||||
default=DEFAULT_PORT,
|
||||
help=f"UDP port (default: {DEFAULT_PORT})",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
try:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
except (AttributeError, OSError):
|
||||
pass
|
||||
try:
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
except (AttributeError, OSError):
|
||||
pass
|
||||
try:
|
||||
sock.bind((args.bind, args.port))
|
||||
except OSError as e:
|
||||
print(f"bind {args.bind!r}:{args.port} failed: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
print(f"UDP echo listening on {args.bind}:{args.port} (Ctrl+C to stop)")
|
||||
while True:
|
||||
try:
|
||||
data, addr = sock.recvfrom(2048)
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping.")
|
||||
return 0
|
||||
client_ip, client_port = addr[0], addr[1]
|
||||
text = data.decode("utf-8", errors="replace")
|
||||
print(f"client_ip={client_ip} client_udp_port={client_port} ({len(data)} bytes)")
|
||||
print(f" payload: {text!r}")
|
||||
line = data.split(b"\n", 1)[0].strip()
|
||||
if line:
|
||||
try:
|
||||
obj = json.loads(line.decode("utf-8"))
|
||||
if isinstance(obj, dict) and obj.get("type") == "led":
|
||||
print(
|
||||
" hello: device_name=%r mac=%r v=%r"
|
||||
% (obj.get("device_name"), obj.get("mac"), obj.get("v"))
|
||||
)
|
||||
except (UnicodeError, ValueError, TypeError):
|
||||
pass
|
||||
try:
|
||||
sock.sendto(data, addr)
|
||||
except OSError as e:
|
||||
print(f" sendto failed: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user