From a64457a0d5ee3f6c3281ab42a33d5316a5336bbb Mon Sep 17 00:00:00 2001 From: pi Date: Sat, 11 Apr 2026 15:19:07 +1200 Subject: [PATCH] chore(led-driver): add http_poll client and UDP/mDNS test helpers Made-with: Cursor --- src/http_poll.py | 68 +++++++++++ tests/test_mdns.py | 268 ++++++++++++++++++++++++++++++++++++++++++++ tests/udp_client.py | 71 ++++++++++++ 3 files changed, 407 insertions(+) create mode 100644 src/http_poll.py create mode 100644 tests/test_mdns.py create mode 100644 tests/udp_client.py diff --git a/src/http_poll.py b/src/http_poll.py new file mode 100644 index 0000000..035f1f1 --- /dev/null +++ b/src/http_poll.py @@ -0,0 +1,68 @@ +"""Minimal HTTP/1.1 POST JSON client for driver long-poll (MicroPython).""" + +import json +import socket + + +def _send_all(sock, data): + n = 0 + while n < len(data): + m = sock.send(data[n:]) + if m <= 0: + raise OSError("socket send failed") + n += m + + +def _read_http_json_body(sock, max_headers=8192): + buf = b"" + while b"\r\n\r\n" not in buf: + chunk = sock.recv(256) + if not chunk: + break + buf += chunk + if len(buf) > max_headers: + raise OSError("response headers too large") + if b"\r\n\r\n" not in buf: + raise OSError("incomplete response headers") + head, rest = buf.split(b"\r\n\r\n", 1) + cl = None + for line in head.split(b"\r\n"): + if line.lower().startswith(b"content-length:"): + try: + cl = int(line.split(b":", 1)[1].strip()) + except (ValueError, IndexError): + cl = None + if cl is None: + body = rest + else: + body = rest + while len(body) < cl: + chunk = sock.recv(min(2048, cl - len(body))) + if not chunk: + break + body += chunk + return json.loads(body.decode("utf-8")) + + +def http_driver_poll(host, port, payload_dict, timeout_s=40.0): + """ + POST ``/driver/v1/poll`` with JSON body; return parsed JSON (expects ``{"lines": [...]}``). + """ + path = "/driver/v1/poll" + body_bytes = json.dumps(payload_dict).encode("utf-8") + host_s = str(host) + req_head = ( + "POST %s HTTP/1.1\r\nHost: %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\nConnection: close\r\n\r\n" + % (path, host_s, len(body_bytes)) + ).encode("utf-8") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.settimeout(timeout_s) + sock.connect((host_s, int(port))) + _send_all(sock, req_head + body_bytes) + return _read_http_json_body(sock) + finally: + try: + sock.close() + except Exception: + pass diff --git a/tests/test_mdns.py b/tests/test_mdns.py new file mode 100644 index 0000000..701138c --- /dev/null +++ b/tests/test_mdns.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +"""mDNS smoke test — runs on the MicroPython device (ESP32). + +Loads Wi-Fi credentials from /settings.json via Settings (same as main firmware). +Sets the network hostname before connect so the chip can advertise as .local +on builds where mDNS is enabled (see network.hostname() in MicroPython docs). + +By default the script stays connected until you stop it (reset or mpremote Ctrl+C) so you +can ping the mDNS name from another machine (e.g. name "a" -> leda.local; hyphens are omitted +in the hostname because ESP32 mDNS often breaks on '-'). + +After flashing, do a full hardware reset once so the first DHCP sees the new hostname. + +Deploy src to the device (including utils.py with mdns_hostname), then from the host: + + mpremote connect PORT run tests/test_mdns.py + +If ImportError: copy utils.py from src/ to the device, or rely on the built-in fallback below. + +Or with cwd led-driver: + + mpremote connect /dev/ttyUSB0 run tests/test_mdns.py +""" + +import time +import network +import socket + +import utime +from machine import WDT + +from settings import Settings + +try: + from utils import mdns_hostname +except ImportError: + + def mdns_hostname(settings): + """Same as utils.mdns_hostname (fallback if device utils.py is older than host repo).""" + raw = settings.get("name") or "led" + suffix = [] + for c in str(raw).lower(): + o = ord(c) + if (48 <= o <= 57) or (97 <= o <= 122): + suffix.append(c) + s = "".join(suffix) + if not s: + s = "device" + h = "led" + s + if len(h) > 32: + h = h[:32] + return h + +CONNECT_TIMEOUT_S = 45 +# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work +# (PHY calibration) runs with no WDT; WDT is only used in HOLD_S. +WDT_TIMEOUT_MS = 10000 +# socket.getaddrinfo(".local", …) often hangs a long time or indefinitely on ESP32; off by default. +SELF_LOCAL_GETADDRINFO = False +# After checks: 0 = exit immediately; >0 = stay up that many seconds; -1 = until reset/Ctrl+C (for remote ping). +HOLD_S = -1 +# Set False to silence [mdns-test] timing lines (phase labels + elapsed ms since test start). +DEBUG = True + + +def _dbg(t0, msg): + if not DEBUG: + return + ms = utime.ticks_diff(utime.ticks_ms(), t0) + print("[mdns-test +%dms] %s" % (ms, msg)) + + +def _set_hostname(h, sta): + """Apply hostname on this STA object before active(True) / connect (DHCP + mDNS).""" + try: + network.hostname(h) + how = "network.hostname" + except (AttributeError, ValueError, OSError) as e: + how = None + last = e + try: + sta.config(hostname=h) + how = how or "WLAN.config(hostname=)" + except (AttributeError, ValueError, OSError) as e: + if how is None: + last = e + if how: + return how + print("Warning: could not set hostname (%s); mDNS name may be default." % last) + return None + + +def _sta_ip(sta): + try: + pair = sta.ipconfig("addr4") + if isinstance(pair, tuple) and pair: + return pair[0].split("/")[0] if isinstance(pair[0], str) else str(pair[0]) + except (AttributeError, OSError, TypeError, ValueError): + pass + return sta.ifconfig()[0] + + +def _wait_wifi(sta, timeout_s, wdt, t0): + """Wait for connection. If wdt is set, feed each iteration (keep gap < WDT_TIMEOUT_MS).""" + deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000)) + n = 0 + while not sta.isconnected(): + if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0: + _dbg(t0, "WiFi wait TIMEOUT after %d iterations, status=%s" % (n, sta.status())) + return False + st = sta.status() + n += 1 + if DEBUG: + _dbg(t0, "WiFi wait iter #%d status=%s" % (n, st)) + else: + print("WiFi status:", st, "(waiting)") + if wdt is not None: + wdt.feed() + time.sleep(1) + if wdt is not None: + wdt.feed() + _dbg(t0, "WiFi connected after %d wait iterations" % n) + return True + + +def _try_resolve_local(hostname, t0): + """Best-effort: resolve our own *.local via getaddrinfo (often blocks a very long time on ESP32).""" + fqdn = hostname + ".local" + _dbg(t0, "getaddrinfo(%r) starting (may block a long time)" % fqdn) + t_gai = utime.ticks_ms() + try: + ai = socket.getaddrinfo(fqdn, 80) + dt = utime.ticks_diff(utime.ticks_ms(), t_gai) + print("getaddrinfo(%r) -> %s" % (fqdn, ai)) + _dbg(t0, "getaddrinfo OK (call took %dms)" % dt) + return True + except OSError as e: + dt = utime.ticks_diff(utime.ticks_ms(), t_gai) + print("getaddrinfo(%r) failed: %s" % (fqdn, e)) + _dbg(t0, "getaddrinfo OSError after %dms" % dt) + return False + + +def main(): + t0 = utime.ticks_ms() + _dbg(t0, "start") + + settings = Settings() + _dbg(t0, "Settings() loaded") + ssid = settings.get("ssid") or "" + password = settings.get("password") or "" + if not ssid: + print("mDNS test skipped: ssid empty in settings.json (configure Wi-Fi to run this test).") + raise SystemExit(0) + + hostname = mdns_hostname(settings) + _dbg(t0, "mdns_hostname -> %r" % hostname) + sta = network.WLAN(network.STA_IF) + how = _set_hostname(hostname, sta) + if how: + print("Hostname set via %s: %r" % (how, hostname)) + _dbg(t0, "set_hostname done (%s)" % (how or "failed")) + + _dbg(t0, "before sta.active(True) (often slow: RF calibration)") + print("WiFi active(True) (can take a while for calibration)...") + sta.active(True) + _dbg(t0, "after sta.active(True)") + try: + sta.config(pm=network.WLAN.PM_NONE) + _dbg(t0, "sta.config(pm=PM_NONE) OK") + except (AttributeError, ValueError, TypeError) as e: + _dbg(t0, "sta.config(pm=PM_NONE) skipped: %s" % e) + + print("Connecting to SSID %r ..." % ssid) + _dbg(t0, "before sta.connect()") + sta.connect(ssid, password) + _dbg(t0, "after sta.connect() (returned; association may still be in progress)") + # No WDT during calibration/wait/getaddrinfo — they can block longer than WDT_TIMEOUT_MS. + if not _wait_wifi(sta, CONNECT_TIMEOUT_S, None, t0): + print("Timeout: not connected. status=", sta.status()) + raise SystemExit(1) + + ip = _sta_ip(sta) + print("WiFi OK, IP:", ip) + try: + stack_host = network.hostname() + except (AttributeError, ValueError, TypeError, OSError): + stack_host = None + if stack_host: + print( + "mDNS: use what the stack reports — ping %s.local (avahi-resolve -n %s.local)" + % (stack_host, stack_host) + ) + if str(stack_host) != str(hostname): + print( + "(We asked for %r but stack reports %r — ping the stack name; cold boot may help.)" + % (hostname, stack_host) + ) + else: + print("From another machine: ping %s.local" % hostname) + print("(or: avahi-resolve -n %s.local)" % hostname) + + if SELF_LOCAL_GETADDRINFO: + _try_resolve_local(hostname, t0) + else: + _dbg( + t0, + "skip getaddrinfo(%s.local): SELF_LOCAL_GETADDRINFO=False (on-device self-.local lookup often hangs)" + % hostname, + ) + print( + "Skipped on-device getaddrinfo(*.local); verify mDNS from a PC (ping above). " + "Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)." + ) + + # Optional: built-in mdns module (not present on all ESP32 builds) + _dbg(t0, "checking for optional 'mdns' module") + try: + import mdns # noqa: F401 + + print("Note: 'mdns' module is present; check your port's docs for Server/API.") + except ImportError: + print("No top-level 'mdns' module; relying on stack mDNS from hostname.") + _dbg(t0, "mdns import check done") + + if HOLD_S != 0: + forever = HOLD_S < 0 + _dbg( + t0, + "starting WDT(%dms) + hold %s" + % (WDT_TIMEOUT_MS, "forever" if forever else ("%ds" % HOLD_S)), + ) + wdt = WDT(timeout=WDT_TIMEOUT_MS) + wdt.feed() + if forever: + ping_target = stack_host or hostname + print( + "Staying online until you stop (reset device or mpremote Ctrl+C). " + "From another host: ping %s.local" % ping_target + ) + else: + print("Keeping connection up for %d s (Ctrl+C or reset to stop) ..." % HOLD_S) + end = None if forever else utime.ticks_add(utime.ticks_ms(), HOLD_S * 1000) + hold_i = 0 + while True: + wdt.feed() + time.sleep(2) + hold_i += 1 + if not sta.isconnected(): + print("lost WiFi connection") + break + if forever: + if DEBUG and hold_i % 15 == 0: + _dbg(t0, "hold alive #%d IP %s" % (hold_i, _sta_ip(sta))) + else: + _dbg(t0, "hold tick #%d" % hold_i) + print("still connected, IP", _sta_ip(sta)) + if utime.ticks_diff(end, utime.ticks_ms()) <= 0: + break + + _dbg(t0, "hold loop finished") + + _dbg(t0, "Done.") + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/tests/udp_client.py b/tests/udp_client.py new file mode 100644 index 0000000..61a1052 --- /dev/null +++ b/tests/udp_client.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +"""UDP discovery test — runs on MicroPython (ESP32). + +Brings up Wi-Fi from settings (test harness only), then **`hello.discover_controller_udp(device_name, wdt)`**. +`hello` does not use Settings or connect Wi‑Fi. + +In firmware, **`main.py`** discovers the controller IP in RAM for HTTP; it is not written to settings. + +Deploy `src` (including `hello.py`), then from host with cwd `led-driver`: + + mpremote connect PORT run tests/udp_client.py +""" + +import time + +import network +import utime +from machine import WDT + +from hello import discover_controller_udp +from settings import Settings + +CONNECT_WAIT_S = 45 +WDT_MS = 10000 + + +def _wait_wifi(sta, timeout_s, wdt): + deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000)) + while not sta.isconnected(): + wdt.feed() + if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0: + return False + print("WiFi status:", sta.status()) + wdt.feed() + time.sleep(1) + wdt.feed() + return True + + +def main(): + settings = Settings() + ssid = settings.get("ssid") or "" + password = settings.get("password") or "" + + if not ssid: + print("udp_client: set ssid/password in settings.json (test harness Wi-Fi).") + raise SystemExit(1) + + sta = network.WLAN(network.STA_IF) + sta.active(True) + try: + sta.config(pm=network.WLAN.PM_NONE) + except (AttributeError, ValueError, TypeError): + pass + + wdt = WDT(timeout=WDT_MS) + wdt.feed() + print("udp_client: connecting to", repr(ssid)) + sta.connect(ssid, password) + wdt.feed() + if not _wait_wifi(sta, CONNECT_WAIT_S, wdt): + print("WiFi timeout, status=", sta.status()) + raise SystemExit(1) + + ip = discover_controller_udp(settings.get("name", ""), wdt=wdt) + if not ip: + raise SystemExit(1) + + +if __name__ == "__main__": + main()