chore(led-driver): add http_poll client and UDP/mDNS test helpers
Made-with: Cursor
This commit is contained in:
68
src/http_poll.py
Normal file
68
src/http_poll.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""Minimal HTTP/1.1 POST JSON client for driver long-poll (MicroPython)."""
|
||||
|
||||
import json
|
||||
import socket
|
||||
|
||||
|
||||
def _send_all(sock, data):
|
||||
n = 0
|
||||
while n < len(data):
|
||||
m = sock.send(data[n:])
|
||||
if m <= 0:
|
||||
raise OSError("socket send failed")
|
||||
n += m
|
||||
|
||||
|
||||
def _read_http_json_body(sock, max_headers=8192):
|
||||
buf = b""
|
||||
while b"\r\n\r\n" not in buf:
|
||||
chunk = sock.recv(256)
|
||||
if not chunk:
|
||||
break
|
||||
buf += chunk
|
||||
if len(buf) > max_headers:
|
||||
raise OSError("response headers too large")
|
||||
if b"\r\n\r\n" not in buf:
|
||||
raise OSError("incomplete response headers")
|
||||
head, rest = buf.split(b"\r\n\r\n", 1)
|
||||
cl = None
|
||||
for line in head.split(b"\r\n"):
|
||||
if line.lower().startswith(b"content-length:"):
|
||||
try:
|
||||
cl = int(line.split(b":", 1)[1].strip())
|
||||
except (ValueError, IndexError):
|
||||
cl = None
|
||||
if cl is None:
|
||||
body = rest
|
||||
else:
|
||||
body = rest
|
||||
while len(body) < cl:
|
||||
chunk = sock.recv(min(2048, cl - len(body)))
|
||||
if not chunk:
|
||||
break
|
||||
body += chunk
|
||||
return json.loads(body.decode("utf-8"))
|
||||
|
||||
|
||||
def http_driver_poll(host, port, payload_dict, timeout_s=40.0):
|
||||
"""
|
||||
POST ``/driver/v1/poll`` with JSON body; return parsed JSON (expects ``{"lines": [...]}``).
|
||||
"""
|
||||
path = "/driver/v1/poll"
|
||||
body_bytes = json.dumps(payload_dict).encode("utf-8")
|
||||
host_s = str(host)
|
||||
req_head = (
|
||||
"POST %s HTTP/1.1\r\nHost: %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\nConnection: close\r\n\r\n"
|
||||
% (path, host_s, len(body_bytes))
|
||||
).encode("utf-8")
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
sock.settimeout(timeout_s)
|
||||
sock.connect((host_s, int(port)))
|
||||
_send_all(sock, req_head + body_bytes)
|
||||
return _read_http_json_body(sock)
|
||||
finally:
|
||||
try:
|
||||
sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
268
tests/test_mdns.py
Normal file
268
tests/test_mdns.py
Normal file
@@ -0,0 +1,268 @@
|
||||
#!/usr/bin/env python3
|
||||
"""mDNS smoke test — runs on the MicroPython device (ESP32).
|
||||
|
||||
Loads Wi-Fi credentials from /settings.json via Settings (same as main firmware).
|
||||
Sets the network hostname before connect so the chip can advertise as <hostname>.local
|
||||
on builds where mDNS is enabled (see network.hostname() in MicroPython docs).
|
||||
|
||||
By default the script stays connected until you stop it (reset or mpremote Ctrl+C) so you
|
||||
can ping the mDNS name from another machine (e.g. name "a" -> leda.local; hyphens are omitted
|
||||
in the hostname because ESP32 mDNS often breaks on '-').
|
||||
|
||||
After flashing, do a full hardware reset once so the first DHCP sees the new hostname.
|
||||
|
||||
Deploy src to the device (including utils.py with mdns_hostname), then from the host:
|
||||
|
||||
mpremote connect PORT run tests/test_mdns.py
|
||||
|
||||
If ImportError: copy utils.py from src/ to the device, or rely on the built-in fallback below.
|
||||
|
||||
Or with cwd led-driver:
|
||||
|
||||
mpremote connect /dev/ttyUSB0 run tests/test_mdns.py
|
||||
"""
|
||||
|
||||
import time
|
||||
import network
|
||||
import socket
|
||||
|
||||
import utime
|
||||
from machine import WDT
|
||||
|
||||
from settings import Settings
|
||||
|
||||
try:
|
||||
from utils import mdns_hostname
|
||||
except ImportError:
|
||||
|
||||
def mdns_hostname(settings):
|
||||
"""Same as utils.mdns_hostname (fallback if device utils.py is older than host repo)."""
|
||||
raw = settings.get("name") or "led"
|
||||
suffix = []
|
||||
for c in str(raw).lower():
|
||||
o = ord(c)
|
||||
if (48 <= o <= 57) or (97 <= o <= 122):
|
||||
suffix.append(c)
|
||||
s = "".join(suffix)
|
||||
if not s:
|
||||
s = "device"
|
||||
h = "led" + s
|
||||
if len(h) > 32:
|
||||
h = h[:32]
|
||||
return h
|
||||
|
||||
CONNECT_TIMEOUT_S = 45
|
||||
# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work
|
||||
# (PHY calibration) runs with no WDT; WDT is only used in HOLD_S.
|
||||
WDT_TIMEOUT_MS = 10000
|
||||
# socket.getaddrinfo("<self>.local", …) often hangs a long time or indefinitely on ESP32; off by default.
|
||||
SELF_LOCAL_GETADDRINFO = False
|
||||
# After checks: 0 = exit immediately; >0 = stay up that many seconds; -1 = until reset/Ctrl+C (for remote ping).
|
||||
HOLD_S = -1
|
||||
# Set False to silence [mdns-test] timing lines (phase labels + elapsed ms since test start).
|
||||
DEBUG = True
|
||||
|
||||
|
||||
def _dbg(t0, msg):
|
||||
if not DEBUG:
|
||||
return
|
||||
ms = utime.ticks_diff(utime.ticks_ms(), t0)
|
||||
print("[mdns-test +%dms] %s" % (ms, msg))
|
||||
|
||||
|
||||
def _set_hostname(h, sta):
|
||||
"""Apply hostname on this STA object before active(True) / connect (DHCP + mDNS)."""
|
||||
try:
|
||||
network.hostname(h)
|
||||
how = "network.hostname"
|
||||
except (AttributeError, ValueError, OSError) as e:
|
||||
how = None
|
||||
last = e
|
||||
try:
|
||||
sta.config(hostname=h)
|
||||
how = how or "WLAN.config(hostname=)"
|
||||
except (AttributeError, ValueError, OSError) as e:
|
||||
if how is None:
|
||||
last = e
|
||||
if how:
|
||||
return how
|
||||
print("Warning: could not set hostname (%s); mDNS name may be default." % last)
|
||||
return None
|
||||
|
||||
|
||||
def _sta_ip(sta):
|
||||
try:
|
||||
pair = sta.ipconfig("addr4")
|
||||
if isinstance(pair, tuple) and pair:
|
||||
return pair[0].split("/")[0] if isinstance(pair[0], str) else str(pair[0])
|
||||
except (AttributeError, OSError, TypeError, ValueError):
|
||||
pass
|
||||
return sta.ifconfig()[0]
|
||||
|
||||
|
||||
def _wait_wifi(sta, timeout_s, wdt, t0):
|
||||
"""Wait for connection. If wdt is set, feed each iteration (keep gap < WDT_TIMEOUT_MS)."""
|
||||
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
|
||||
n = 0
|
||||
while not sta.isconnected():
|
||||
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
|
||||
_dbg(t0, "WiFi wait TIMEOUT after %d iterations, status=%s" % (n, sta.status()))
|
||||
return False
|
||||
st = sta.status()
|
||||
n += 1
|
||||
if DEBUG:
|
||||
_dbg(t0, "WiFi wait iter #%d status=%s" % (n, st))
|
||||
else:
|
||||
print("WiFi status:", st, "(waiting)")
|
||||
if wdt is not None:
|
||||
wdt.feed()
|
||||
time.sleep(1)
|
||||
if wdt is not None:
|
||||
wdt.feed()
|
||||
_dbg(t0, "WiFi connected after %d wait iterations" % n)
|
||||
return True
|
||||
|
||||
|
||||
def _try_resolve_local(hostname, t0):
|
||||
"""Best-effort: resolve our own *.local via getaddrinfo (often blocks a very long time on ESP32)."""
|
||||
fqdn = hostname + ".local"
|
||||
_dbg(t0, "getaddrinfo(%r) starting (may block a long time)" % fqdn)
|
||||
t_gai = utime.ticks_ms()
|
||||
try:
|
||||
ai = socket.getaddrinfo(fqdn, 80)
|
||||
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
|
||||
print("getaddrinfo(%r) -> %s" % (fqdn, ai))
|
||||
_dbg(t0, "getaddrinfo OK (call took %dms)" % dt)
|
||||
return True
|
||||
except OSError as e:
|
||||
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
|
||||
print("getaddrinfo(%r) failed: %s" % (fqdn, e))
|
||||
_dbg(t0, "getaddrinfo OSError after %dms" % dt)
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
t0 = utime.ticks_ms()
|
||||
_dbg(t0, "start")
|
||||
|
||||
settings = Settings()
|
||||
_dbg(t0, "Settings() loaded")
|
||||
ssid = settings.get("ssid") or ""
|
||||
password = settings.get("password") or ""
|
||||
if not ssid:
|
||||
print("mDNS test skipped: ssid empty in settings.json (configure Wi-Fi to run this test).")
|
||||
raise SystemExit(0)
|
||||
|
||||
hostname = mdns_hostname(settings)
|
||||
_dbg(t0, "mdns_hostname -> %r" % hostname)
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
how = _set_hostname(hostname, sta)
|
||||
if how:
|
||||
print("Hostname set via %s: %r" % (how, hostname))
|
||||
_dbg(t0, "set_hostname done (%s)" % (how or "failed"))
|
||||
|
||||
_dbg(t0, "before sta.active(True) (often slow: RF calibration)")
|
||||
print("WiFi active(True) (can take a while for calibration)...")
|
||||
sta.active(True)
|
||||
_dbg(t0, "after sta.active(True)")
|
||||
try:
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
_dbg(t0, "sta.config(pm=PM_NONE) OK")
|
||||
except (AttributeError, ValueError, TypeError) as e:
|
||||
_dbg(t0, "sta.config(pm=PM_NONE) skipped: %s" % e)
|
||||
|
||||
print("Connecting to SSID %r ..." % ssid)
|
||||
_dbg(t0, "before sta.connect()")
|
||||
sta.connect(ssid, password)
|
||||
_dbg(t0, "after sta.connect() (returned; association may still be in progress)")
|
||||
# No WDT during calibration/wait/getaddrinfo — they can block longer than WDT_TIMEOUT_MS.
|
||||
if not _wait_wifi(sta, CONNECT_TIMEOUT_S, None, t0):
|
||||
print("Timeout: not connected. status=", sta.status())
|
||||
raise SystemExit(1)
|
||||
|
||||
ip = _sta_ip(sta)
|
||||
print("WiFi OK, IP:", ip)
|
||||
try:
|
||||
stack_host = network.hostname()
|
||||
except (AttributeError, ValueError, TypeError, OSError):
|
||||
stack_host = None
|
||||
if stack_host:
|
||||
print(
|
||||
"mDNS: use what the stack reports — ping %s.local (avahi-resolve -n %s.local)"
|
||||
% (stack_host, stack_host)
|
||||
)
|
||||
if str(stack_host) != str(hostname):
|
||||
print(
|
||||
"(We asked for %r but stack reports %r — ping the stack name; cold boot may help.)"
|
||||
% (hostname, stack_host)
|
||||
)
|
||||
else:
|
||||
print("From another machine: ping %s.local" % hostname)
|
||||
print("(or: avahi-resolve -n %s.local)" % hostname)
|
||||
|
||||
if SELF_LOCAL_GETADDRINFO:
|
||||
_try_resolve_local(hostname, t0)
|
||||
else:
|
||||
_dbg(
|
||||
t0,
|
||||
"skip getaddrinfo(%s.local): SELF_LOCAL_GETADDRINFO=False (on-device self-.local lookup often hangs)"
|
||||
% hostname,
|
||||
)
|
||||
print(
|
||||
"Skipped on-device getaddrinfo(*.local); verify mDNS from a PC (ping above). "
|
||||
"Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)."
|
||||
)
|
||||
|
||||
# Optional: built-in mdns module (not present on all ESP32 builds)
|
||||
_dbg(t0, "checking for optional 'mdns' module")
|
||||
try:
|
||||
import mdns # noqa: F401
|
||||
|
||||
print("Note: 'mdns' module is present; check your port's docs for Server/API.")
|
||||
except ImportError:
|
||||
print("No top-level 'mdns' module; relying on stack mDNS from hostname.")
|
||||
_dbg(t0, "mdns import check done")
|
||||
|
||||
if HOLD_S != 0:
|
||||
forever = HOLD_S < 0
|
||||
_dbg(
|
||||
t0,
|
||||
"starting WDT(%dms) + hold %s"
|
||||
% (WDT_TIMEOUT_MS, "forever" if forever else ("%ds" % HOLD_S)),
|
||||
)
|
||||
wdt = WDT(timeout=WDT_TIMEOUT_MS)
|
||||
wdt.feed()
|
||||
if forever:
|
||||
ping_target = stack_host or hostname
|
||||
print(
|
||||
"Staying online until you stop (reset device or mpremote Ctrl+C). "
|
||||
"From another host: ping %s.local" % ping_target
|
||||
)
|
||||
else:
|
||||
print("Keeping connection up for %d s (Ctrl+C or reset to stop) ..." % HOLD_S)
|
||||
end = None if forever else utime.ticks_add(utime.ticks_ms(), HOLD_S * 1000)
|
||||
hold_i = 0
|
||||
while True:
|
||||
wdt.feed()
|
||||
time.sleep(2)
|
||||
hold_i += 1
|
||||
if not sta.isconnected():
|
||||
print("lost WiFi connection")
|
||||
break
|
||||
if forever:
|
||||
if DEBUG and hold_i % 15 == 0:
|
||||
_dbg(t0, "hold alive #%d IP %s" % (hold_i, _sta_ip(sta)))
|
||||
else:
|
||||
_dbg(t0, "hold tick #%d" % hold_i)
|
||||
print("still connected, IP", _sta_ip(sta))
|
||||
if utime.ticks_diff(end, utime.ticks_ms()) <= 0:
|
||||
break
|
||||
|
||||
_dbg(t0, "hold loop finished")
|
||||
|
||||
_dbg(t0, "Done.")
|
||||
print("Done.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
71
tests/udp_client.py
Normal file
71
tests/udp_client.py
Normal file
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
"""UDP discovery test — runs on MicroPython (ESP32).
|
||||
|
||||
Brings up Wi-Fi from settings (test harness only), then **`hello.discover_controller_udp(device_name, wdt)`**.
|
||||
`hello` does not use Settings or connect Wi‑Fi.
|
||||
|
||||
In firmware, **`main.py`** discovers the controller IP in RAM for HTTP; it is not written to settings.
|
||||
|
||||
Deploy `src` (including `hello.py`), then from host with cwd `led-driver`:
|
||||
|
||||
mpremote connect PORT run tests/udp_client.py
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import network
|
||||
import utime
|
||||
from machine import WDT
|
||||
|
||||
from hello import discover_controller_udp
|
||||
from settings import Settings
|
||||
|
||||
CONNECT_WAIT_S = 45
|
||||
WDT_MS = 10000
|
||||
|
||||
|
||||
def _wait_wifi(sta, timeout_s, wdt):
|
||||
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
|
||||
while not sta.isconnected():
|
||||
wdt.feed()
|
||||
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
|
||||
return False
|
||||
print("WiFi status:", sta.status())
|
||||
wdt.feed()
|
||||
time.sleep(1)
|
||||
wdt.feed()
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
settings = Settings()
|
||||
ssid = settings.get("ssid") or ""
|
||||
password = settings.get("password") or ""
|
||||
|
||||
if not ssid:
|
||||
print("udp_client: set ssid/password in settings.json (test harness Wi-Fi).")
|
||||
raise SystemExit(1)
|
||||
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
try:
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
except (AttributeError, ValueError, TypeError):
|
||||
pass
|
||||
|
||||
wdt = WDT(timeout=WDT_MS)
|
||||
wdt.feed()
|
||||
print("udp_client: connecting to", repr(ssid))
|
||||
sta.connect(ssid, password)
|
||||
wdt.feed()
|
||||
if not _wait_wifi(sta, CONNECT_WAIT_S, wdt):
|
||||
print("WiFi timeout, status=", sta.status())
|
||||
raise SystemExit(1)
|
||||
|
||||
ip = discover_controller_udp(settings.get("name", ""), wdt=wdt)
|
||||
if not ip:
|
||||
raise SystemExit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user