Files
led-driver/tests/test_mdns.py

240 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""mDNS smoke test — runs on the MicroPython device (ESP32).
Loads Wi-Fi credentials from /settings.json via Settings (same as main firmware).
Sets the network hostname before connect so the chip can advertise as <hostname>.local
on builds where mDNS is enabled (see network.hostname() in MicroPython docs).
By default the script stays connected until you stop it (reset or mpremote Ctrl+C) so you
can ping the mDNS name from another machine (e.g. name "a" -> leda.local; hyphens are omitted
in the hostname because ESP32 mDNS often breaks on '-').
After flashing, do a full hardware reset once so the first DHCP sees the new hostname.
Deploy src to the device (including utils.py with mdns_hostname), then from the host:

    mpremote connect PORT run tests/test_mdns.py

If imports fail, copy ``utils.py`` from ``src/`` onto the device.

Or, with cwd led-driver:

    mpremote connect /dev/ttyUSB0 run tests/test_mdns.py
"""
import time
import network
import socket
import utime
from machine import WDT
from settings import Settings
from utils import mdns_hostname
# Seconds main() allows _wait_wifi() to wait for the station to report connected.
CONNECT_TIMEOUT_S = 45
# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work
# (PHY calibration, Wi-Fi wait, getaddrinfo) runs with no WDT; the WDT is only started
# for the hold phase, i.e. when HOLD_S != 0.
WDT_TIMEOUT_MS = 10000
# socket.getaddrinfo("<self>.local", …) often hangs a long time or indefinitely on ESP32,
# so the on-device self-lookup is off by default.
SELF_LOCAL_GETADDRINFO = False
# Hold phase after the checks: 0 = exit immediately; >0 = stay up that many seconds;
# -1 (any negative value) = stay up until reset/Ctrl+C so the name can be pinged remotely.
HOLD_S = -1
# Set False to silence [mdns-test] timing lines (phase labels + elapsed ms since test start).
DEBUG = True
def _dbg(t0, msg):
    """Print a ``[mdns-test +<ms>ms]`` debug line when DEBUG is enabled.

    t0 is a ``utime.ticks_ms()`` reference; the prefix shows the milliseconds
    elapsed since it. Does nothing when DEBUG is false.
    """
    if DEBUG:
        elapsed_ms = utime.ticks_diff(utime.ticks_ms(), t0)
        print("[mdns-test +%dms] %s" % (elapsed_ms, msg))
def _set_hostname(h, sta):
    """Set hostname ``h`` before active(True) / connect so DHCP and mDNS pick it up.

    Both ``network.hostname(h)`` and ``sta.config(hostname=h)`` are always
    attempted. Returns a label for the first mechanism that succeeded, or
    None (after printing a warning) when both raised.
    """
    applied_via = None
    failure = None

    # Module-level setter.
    try:
        network.hostname(h)
    except (AttributeError, ValueError, OSError) as exc:
        failure = exc
    else:
        applied_via = "network.hostname"

    # Per-interface setter; tried even if the call above worked.
    try:
        sta.config(hostname=h)
    except (AttributeError, ValueError, OSError) as exc:
        # Only report this error when nothing has succeeded so far.
        if applied_via is None:
            failure = exc
    else:
        if not applied_via:
            applied_via = "WLAN.config(hostname=)"

    if applied_via:
        return applied_via
    print("Warning: could not set hostname (%s); mDNS name may be default." % failure)
    return None
def _sta_ip(sta):
try:
pair = sta.ipconfig("addr4")
if isinstance(pair, tuple) and pair:
return pair[0].split("/")[0] if isinstance(pair[0], str) else str(pair[0])
except (AttributeError, OSError, TypeError, ValueError):
pass
return sta.ifconfig()[0]
def _wait_wifi(sta, timeout_s, wdt, t0):
    """Poll ``sta`` about once per second until it reports connected.

    Returns True once connected, or False when ``timeout_s`` seconds have
    passed first. When ``wdt`` is not None it is fed on every poll (keep the
    gap below WDT_TIMEOUT_MS) and once more after the connection is up.
    ``t0`` is the tick reference handed to _dbg for timing lines.
    """
    give_up_at = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
    polls = 0
    while True:
        if sta.isconnected():
            break
        if utime.ticks_diff(give_up_at, utime.ticks_ms()) <= 0:
            _dbg(t0, "WiFi wait TIMEOUT after %d iterations, status=%s" % (polls, sta.status()))
            return False
        status = sta.status()
        polls += 1
        if DEBUG:
            _dbg(t0, "WiFi wait iter #%d status=%s" % (polls, status))
        else:
            print("WiFi status:", status, "(waiting)")
        if wdt is not None:
            wdt.feed()
        time.sleep(1)
    if wdt is not None:
        wdt.feed()
    _dbg(t0, "WiFi connected after %d wait iterations" % polls)
    return True
def _try_resolve_local(hostname, t0):
    """Best-effort self-lookup of ``<hostname>.local`` through getaddrinfo.

    Prints the result (or the OSError) and logs how long the call took.
    Returns True on success and False when getaddrinfo raised OSError.
    The call often blocks for a very long time on ESP32.
    """
    name = hostname + ".local"
    _dbg(t0, "getaddrinfo(%r) starting (may block a long time)" % name)
    started = utime.ticks_ms()

    def _took_ms():
        # Milliseconds spent since the lookup began.
        return utime.ticks_diff(utime.ticks_ms(), started)

    try:
        result = socket.getaddrinfo(name, 80)
        took = _took_ms()
        print("getaddrinfo(%r) -> %s" % (name, result))
        _dbg(t0, "getaddrinfo OK (call took %dms)" % took)
        return True
    except OSError as err:
        took = _took_ms()
        print("getaddrinfo(%r) failed: %s" % (name, err))
        _dbg(t0, "getaddrinfo OSError after %dms" % took)
        return False
def main():
    """Run the mDNS smoke test end to end.

    Steps: load Wi-Fi credentials from Settings, set the hostname, bring the
    station up and connect, print which ``<name>.local`` to ping, optionally
    try an on-device getaddrinfo, then keep the link up according to HOLD_S.

    Raises:
        SystemExit(0): when no ssid is configured (test skipped).
        SystemExit(1): when Wi-Fi did not connect within CONNECT_TIMEOUT_S.
    """
    t0 = utime.ticks_ms()
    _dbg(t0, "start")
    settings = Settings()
    _dbg(t0, "Settings() loaded")
    ssid = settings.get("ssid") or ""
    password = settings.get("password") or ""
    if not ssid:
        print("mDNS test skipped: ssid empty in settings.json (configure Wi-Fi to run this test).")
        raise SystemExit(0)

    # The hostname is applied before active(True)/connect so DHCP sees it.
    hostname = mdns_hostname(settings)
    _dbg(t0, "mdns_hostname -> %r" % hostname)
    sta = network.WLAN(network.STA_IF)
    how = _set_hostname(hostname, sta)
    if how:
        print("Hostname set via %s: %r" % (how, hostname))
    _dbg(t0, "set_hostname done (%s)" % (how or "failed"))

    _dbg(t0, "before sta.active(True) (often slow: RF calibration)")
    print("WiFi active(True) (can take a while for calibration)...")
    sta.active(True)
    _dbg(t0, "after sta.active(True)")
    try:
        sta.config(pm=network.WLAN.PM_NONE)
        _dbg(t0, "sta.config(pm=PM_NONE) OK")
    except (AttributeError, ValueError, TypeError, OSError) as e:
        # Best-effort tweak: a port without PM_NONE, or a driver error, must
        # not abort the test (OSError is handled like in _set_hostname).
        _dbg(t0, "sta.config(pm=PM_NONE) skipped: %s" % e)

    print("Connecting to SSID %r ..." % ssid)
    _dbg(t0, "before sta.connect()")
    sta.connect(ssid, password)
    _dbg(t0, "after sta.connect() (returned; association may still be in progress)")
    # No WDT during calibration/wait/getaddrinfo — they can block longer than WDT_TIMEOUT_MS.
    if not _wait_wifi(sta, CONNECT_TIMEOUT_S, None, t0):
        print("Timeout: not connected. status=", sta.status())
        raise SystemExit(1)
    ip = _sta_ip(sta)
    print("WiFi OK, IP:", ip)

    stack_host = _announce_mdns_name(hostname)

    if SELF_LOCAL_GETADDRINFO:
        _try_resolve_local(hostname, t0)
    else:
        _dbg(
            t0,
            "skip getaddrinfo(%s.local): SELF_LOCAL_GETADDRINFO=False (on-device self-.local lookup often hangs)"
            % hostname,
        )
        print(
            "Skipped on-device getaddrinfo(*.local); verify mDNS from a PC (ping above). "
            "Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)."
        )

    if HOLD_S != 0:
        _hold_connection(sta, t0, stack_host or hostname)
    _dbg(t0, "Done.")
    print("Done.")


def _announce_mdns_name(hostname):
    """Print which ``<name>.local`` to ping; return the stack-reported hostname (or None)."""
    try:
        stack_host = network.hostname()
    except (AttributeError, ValueError, TypeError, OSError):
        stack_host = None
    if not stack_host:
        print("From another machine: ping %s.local" % hostname)
        print("(or: avahi-resolve -n %s.local)" % hostname)
        return stack_host
    print(
        "mDNS: use what the stack reports — ping %s.local (avahi-resolve -n %s.local)"
        % (stack_host, stack_host)
    )
    if str(stack_host) != str(hostname):
        print(
            "(We asked for %r but stack reports %r — ping the stack name; cold boot may help.)"
            % (hostname, stack_host)
        )
    return stack_host


def _hold_connection(sta, t0, ping_target):
    """Keep Wi-Fi up under a watchdog for HOLD_S seconds, or forever when HOLD_S < 0."""
    forever = HOLD_S < 0
    _dbg(
        t0,
        "starting WDT(%dms) + hold %s"
        % (WDT_TIMEOUT_MS, "forever" if forever else ("%ds" % HOLD_S)),
    )
    wdt = WDT(timeout=WDT_TIMEOUT_MS)
    wdt.feed()
    if forever:
        print(
            "Staying online until you stop (reset device or mpremote Ctrl+C). "
            "From another host: ping %s.local" % ping_target
        )
        end = None
    else:
        print("Keeping connection up for %d s (Ctrl+C or reset to stop) ..." % HOLD_S)
        # int() mirrors _wait_wifi: ticks_add needs an integer delta, so a
        # fractional HOLD_S must be truncated rather than passed as a float.
        end = utime.ticks_add(utime.ticks_ms(), int(HOLD_S * 1000))
    hold_i = 0
    while True:
        # Feed before each 2 s sleep so the gap stays well under WDT_TIMEOUT_MS.
        wdt.feed()
        time.sleep(2)
        hold_i += 1
        if not sta.isconnected():
            print("lost WiFi connection")
            break
        if forever:
            # Only a periodic heartbeat in forever mode (every 15th tick).
            if DEBUG and hold_i % 15 == 0:
                _dbg(t0, "hold alive #%d IP %s" % (hold_i, _sta_ip(sta)))
            continue
        _dbg(t0, "hold tick #%d" % hold_i)
        print("still connected, IP", _sta_ip(sta))
        if utime.ticks_diff(end, utime.ticks_ms()) <= 0:
            break
    # NOTE(review): per the MicroPython machine.WDT docs the watchdog cannot be
    # stopped once started, so after this loop ends nothing feeds it and the
    # board will presumably reset within WDT_TIMEOUT_MS — confirm on target.
    _dbg(t0, "hold loop finished")
# Script entry point: run the smoke test only when executed directly, not on import.
if __name__ == "__main__":
    main()