Compare commits

4 Commits

Author SHA1 Message Date
pi
ded6e3d360 docs: align readme and driver api with tcp/wifi
Made-with: Cursor
2026-04-12 00:13:49 +12:00
pi
a64457a0d5 chore(led-driver): add http_poll client and UDP/mDNS test helpers
Made-with: Cursor
2026-04-11 15:19:07 +12:00
pi
fea4e69140 feat(led-driver): wifi default transport, lazy espnow import, dynamic patterns
Made-with: Cursor
2026-04-11 15:10:12 +12:00
pi
aaaf660e9d feat(driver): discover controller via udp and rediscover on reconnect
Made-with: Cursor
2026-04-06 21:28:00 +12:00
9 changed files with 822 additions and 34 deletions

View File

@@ -1,36 +1,52 @@
# LED Driver - MicroPython # LED Driver MicroPython
MicroPython-based LED driver application for ESP32 microcontrollers. MicroPython LED driver for ESP32: presets, patterns, **Wi-Fi** (TCP + UDP discovery) or **ESP-NOW** transport, optional HTTP polling, and dynamic pattern modules under `src/patterns/`.
## Prerequisites ## Prerequisites
- MicroPython firmware installed on ESP32 - MicroPython firmware on the ESP32
- USB cable for programming - USB cable for programming
- Python 3 with pipenv - Python 3 with pipenv (on the host, for `dev.py` / tests)
## Setup ## Setup
1. Install dependencies: 1. Install dependencies:
```bash ```bash
pipenv install pipenv install
``` ```
2. Deploy to device: 2. Deploy to the device:
```bash ```bash
pipenv run dev pipenv run dev
``` ```
## Project Structure ## Project layout
``` ```
led-driver/ led-driver/
├── src/ ├── src/
│ ├── main.py # Main application code │ ├── main.py # Entry: Wi-Fi/TCP or ESP-NOW path, process_data(), manifest OTA
│ ├── presets.py # LED pattern implementations (includes Preset and Presets classes) │ ├── presets.py # Preset runtime + Presets class
│ ├── settings.py # Settings management │ ├── preset.py # Single preset helpers
── p2p.py # Peer-to-peer communication ── settings.py # settings.json
├── test/ # Pattern tests │ ├── hello.py # UDP discovery (port 8766) / hello payloads
├── web_app.py # Web interface │ ├── http_poll.py # Optional HTTP polling helper
├── dev.py # Development tools │ ├── utils.py # Colour conversion / ordering
└── Pipfile # Python dependencies │ ├── presets.json # Default preset file (on device)
│ └── patterns/ # Pattern modules (.py), loaded dynamically
├── tests/ # Host-side helpers (e.g. udp_client.py, test_mdns.py)
├── test/ # On-device style pattern tests (all.py, patterns/)
├── dev.py # Deploy / sync to serial device
├── docs/API.md # Wire format (long keys); Pi app docs short keys
├── msg.json # Sample message
├── Pipfile
└── LICENSE
``` ```
**Transport:** `settings.json` **`transport_type`** is typically **`wifi`** (TCP to the Pi on port **8765**, discovery on **8766**) or **`espnow`**. ESP-NOW code paths are loaded only when needed so a Wi-Fi-only image stays smaller.
## Further reading
- **`docs/API.md`** — JSON message fields as used in examples (`pattern`, `colors`, …). The Pi app may send **short keys** (`p`, `c`, …); behaviour matches once normalised on device.

View File

@@ -1,10 +1,10 @@
# LED Driver ESPNow API Documentation # LED Driver API (message format)
This document describes the ESPNow message format for controlling LED driver devices. This document describes the **JSON message format** for controlling LED driver devices. The same object is accepted from **ESP-NOW** (when that transport is enabled) and as **one JSON value per line** over **TCP** in **Wi-Fi** mode (see `src/main.py` on the device).
## Message Format ## Message Format
All messages are JSON objects sent via ESPNow with the following structure: All messages are JSON objects with the following structure:
```json ```json
{ {

162
src/hello.py Normal file
View File

@@ -0,0 +1,162 @@
"""LED hello payload and UDP broadcast discovery (controller IP via echo on port 8766).
Wi-Fi must already be connected; this module does not use Settings or call connect().
"""
import json
import socket
import ubinascii
import network
# Match led-controller/tests/udp_server.py
DISCOVERY_UDP_PORT = 8766
DEFAULT_RECV_TIMEOUT_S = 3
def pack_hello_dict(sta, device_name=""):
"""Same fields as main HTTP/ESP-NOW hello."""
mac = sta.config("mac")
return {
"v": "1",
"device_name": device_name,
"mac": ubinascii.hexlify(mac).decode().lower(),
"type": "led",
}
def pack_hello_bytes(sta, device_name=""):
return json.dumps(pack_hello_dict(sta, device_name)).encode("utf-8")
def pack_hello_line(sta, device_name=""):
"""JSON hello + newline (HTTP/UDP discovery payloads)."""
return pack_hello_bytes(sta, device_name) + b"\n"
def ipv4_broadcast(ip, netmask):
"""Directed broadcast (e.g. 192.168.1.0/24 -> 192.168.1.255)."""
ia = [int(x) for x in ip.split(".")]
im = [int(x) for x in netmask.split(".")]
if len(ia) != 4 or len(im) != 4:
return None
return ".".join(str(ia[i] | (255 - im[i])) for i in range(4))
def udp_discovery_targets(ip, mask):
"""(directed_broadcast, port) then limited broadcast."""
out = [("255.255.255.255", DISCOVERY_UDP_PORT)]
b = ipv4_broadcast(ip, mask)
if b:
out.insert(0, (b, DISCOVERY_UDP_PORT))
return out
def broadcast_hello_udp(
sta,
device_name="",
*,
wait_reply=True,
recv_timeout_s=DEFAULT_RECV_TIMEOUT_S,
wdt=None,
):
"""
Send pack_hello_line via directed then 255.255.255.255 on DISCOVERY_UDP_PORT.
STA must already be connected with a valid IPv4 (caller brings up Wi-Fi).
If wait_reply, wait for first UDP echo. Returns controller IP string or None.
"""
ip, mask, _gw, _dns = sta.ifconfig()
msg = pack_hello_line(sta, device_name)
print("hello:", msg)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
except (AttributeError, OSError) as e:
print("SO_BROADCAST not set:", e)
try:
sock.bind((ip, 0))
except (AttributeError, OSError, TypeError) as e:
try:
sock.bind(("0.0.0.0", 0))
except (AttributeError, OSError) as e2:
print("bind skipped:", e, e2)
if wait_reply:
try:
sock.settimeout(recv_timeout_s)
except (AttributeError, OSError):
pass
discovered = None
for dest_ip, dest_port in udp_discovery_targets(ip, mask):
if wdt is not None:
wdt.feed()
label = "%s:%s" % (dest_ip, dest_port)
target = (dest_ip, dest_port)
try:
sock.sendto(msg, target)
print("sent hello ->", target)
except OSError as e:
print("sendto failed:", e)
continue
if not wait_reply:
continue
if wdt is not None:
wdt.feed()
try:
data, addr = sock.recvfrom(2048)
print("reply from", addr, ":", data)
remote_ip = addr[0]
if data != msg:
print("(warning: reply payload differs from hello; still using source IP.)")
discovered = remote_ip
print("Discovered controller at", remote_ip)
break
except OSError as e:
print("recv (no reply):", e, "via", label)
if dest_ip == "255.255.255.255":
print(
"(hint: many APs drop Wi-Fi client broadcast; try wired server or AP without client isolation.)"
)
sock.close()
return discovered
def discover_controller_udp(device_name="", wdt=None):
"""
Broadcast hello; return controller IP from first UDP echo, or None.
STA must already be connected.
device_name: logical name in the JSON (caller supplies, e.g. from Settings elsewhere).
wdt: optional WDT to feed during waits.
"""
sta = network.WLAN(network.STA_IF)
if not sta.isconnected():
print("hello: STA not connected — connect Wi-Fi before discovery.")
raise SystemExit(1)
ip, mask, _g, _d = sta.ifconfig()
if ip == "0.0.0.0":
print("hello: STA has no IP address.")
raise SystemExit(1)
print("STA IP:", ip, "mask:", mask)
discovered = broadcast_hello_udp(
sta,
device_name,
wait_reply=True,
wdt=wdt,
)
if discovered:
print("discover done; controller =", repr(discovered))
else:
print("discover done; controller not found")
return discovered
if __name__ == "__main__":
if not discover_controller_udp():
raise SystemExit(1)

68
src/http_poll.py Normal file
View File

@@ -0,0 +1,68 @@
"""Minimal HTTP/1.1 POST JSON client for driver long-poll (MicroPython)."""
import json
import socket
def _send_all(sock, data):
n = 0
while n < len(data):
m = sock.send(data[n:])
if m <= 0:
raise OSError("socket send failed")
n += m
def _read_http_json_body(sock, max_headers=8192):
buf = b""
while b"\r\n\r\n" not in buf:
chunk = sock.recv(256)
if not chunk:
break
buf += chunk
if len(buf) > max_headers:
raise OSError("response headers too large")
if b"\r\n\r\n" not in buf:
raise OSError("incomplete response headers")
head, rest = buf.split(b"\r\n\r\n", 1)
cl = None
for line in head.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
try:
cl = int(line.split(b":", 1)[1].strip())
except (ValueError, IndexError):
cl = None
if cl is None:
body = rest
else:
body = rest
while len(body) < cl:
chunk = sock.recv(min(2048, cl - len(body)))
if not chunk:
break
body += chunk
return json.loads(body.decode("utf-8"))
def http_driver_poll(host, port, payload_dict, timeout_s=40.0):
"""
POST ``/driver/v1/poll`` with JSON body; return parsed JSON (expects ``{"lines": [...]}``).
"""
path = "/driver/v1/poll"
body_bytes = json.dumps(payload_dict).encode("utf-8")
host_s = str(host)
req_head = (
"POST %s HTTP/1.1\r\nHost: %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\nConnection: close\r\n\r\n"
% (path, host_s, len(body_bytes))
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host_s, int(port)))
_send_all(sock, req_head + body_bytes)
return _read_http_json_body(sock)
finally:
try:
sock.close()
except Exception:
pass

View File

@@ -1,6 +1,5 @@
from settings import Settings from settings import Settings
from machine import WDT from machine import WDT
from espnow import ESPNow
import utime import utime
import network import network
from presets import Presets from presets import Presets
@@ -10,9 +9,15 @@ import time
import select import select
import socket import socket
import ubinascii import ubinascii
from hello import discover_controller_udp
try:
import uos as os
except ImportError:
import os
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff" BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
CONTROLLER_TCP_PORT = 8765 CONTROLLER_TCP_PORT = 8765
controller_ip = None
settings = Settings() settings = Settings()
print(settings) print(settings)
@@ -49,6 +54,8 @@ def process_data(payload):
apply_select(data) apply_select(data)
if "default" in data: if "default" in data:
apply_default(data) apply_default(data)
if "manifest" in data:
apply_patterns_ota(data)
if "save" in data and ("presets" in data or "default" in data): if "save" in data and ("presets" in data or "default" in data):
presets.save() presets.save()
@@ -100,6 +107,144 @@ def apply_default(data):
settings["default"] = default_name settings["default"] = default_name
def _parse_http_url(url):
"""Parse http://host[:port]/path into (host, port, path)."""
if not isinstance(url, str):
raise ValueError("url must be a string")
if not url.startswith("http://"):
raise ValueError("only http:// URLs are supported")
remainder = url[7:]
slash_idx = remainder.find("/")
if slash_idx == -1:
host_port = remainder
path = "/"
else:
host_port = remainder[:slash_idx]
path = remainder[slash_idx:]
if ":" in host_port:
host, port_s = host_port.rsplit(":", 1)
port = int(port_s)
else:
host = host_port
port = 80
if not host:
raise ValueError("missing host")
return host, port, path
def _http_get_raw(url, timeout_s=10.0):
host, port, path = _parse_http_url(url)
req = (
"GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host)
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host, int(port)))
sock.send(req)
data = b""
while True:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
finally:
try:
sock.close()
except Exception:
pass
sep = b"\r\n\r\n"
if sep not in data:
raise OSError("invalid HTTP response")
head, body = data.split(sep, 1)
status_line = head.split(b"\r\n", 1)[0]
if b" 200 " not in status_line:
raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8"))
return body
def _http_get_json(url, timeout_s=10.0):
body = _http_get_raw(url, timeout_s=timeout_s)
return json.loads(body.decode("utf-8"))
def _http_get_text(url, timeout_s=10.0):
global controller_ip
# Support relative URLs from controller messages.
if isinstance(url, str) and url.startswith("/"):
if not controller_ip:
raise OSError("controller IP unavailable for relative URL")
url = "http://%s%s" % (controller_ip, url)
try:
body = _http_get_raw(url, timeout_s=timeout_s)
return body.decode("utf-8")
except Exception:
# Fallback for mDNS/unresolvable host: retry against current controller IP.
if not controller_ip or not isinstance(url, str) or not url.startswith("http://"):
raise
_host, _port, path = _parse_http_url(url)
fallback = "http://%s:%d%s" % (controller_ip, _port, path)
body = _http_get_raw(fallback, timeout_s=timeout_s)
return body.decode("utf-8")
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def apply_patterns_ota(data):
manifest_payload = data.get("manifest")
if not manifest_payload:
return
try:
if isinstance(manifest_payload, dict):
manifest = manifest_payload
elif isinstance(manifest_payload, str):
manifest = _http_get_json(manifest_payload, timeout_s=20.0)
else:
print("patterns_ota: invalid manifest payload type")
return
files = manifest.get("files", [])
if not isinstance(files, list) or not files:
print("patterns_ota: no files in manifest")
return
try:
os.mkdir("patterns")
except OSError:
pass
updated = 0
for item in files:
if not isinstance(item, dict):
continue
name = item.get("name")
url = item.get("url")
inline_code = item.get("code")
if not _safe_pattern_filename(name):
continue
if isinstance(inline_code, str):
code = inline_code
elif isinstance(url, str):
code = _http_get_text(url, timeout_s=20.0)
else:
continue
with open("patterns/" + name, "w") as f:
f.write(code)
updated += 1
if updated > 0:
presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e:
print("patterns_ota failed:", e)
# --- TCP framing (bytes) → process_data ------------------------------------------- # --- TCP framing (bytes) → process_data -------------------------------------------
@@ -122,16 +267,17 @@ sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
mac = sta_if.config("mac") mac = sta_if.config("mac")
hello = { hello_payload = {
"v": "1", "v": "1",
"device_name": settings.get("name", ""), "device_name": settings.get("name", ""),
"mac": ubinascii.hexlify(mac).decode().lower(), "mac": ubinascii.hexlify(mac).decode().lower(),
"type": "led", "type": "led",
} }
hello_bytes = json.dumps(hello).encode("utf-8") hello_bytes = json.dumps(hello_payload).encode("utf-8")
hello_line = hello_bytes + b"\n"
if settings["transport_type"] == "espnow": if settings["transport_type"] == "espnow":
from espnow import ESPNow # import only in this branch (avoids load when using Wi-Fi)
sta_if.disconnect() sta_if.disconnect()
sta_if.config(channel=settings.get("wifi_channel", 1)) sta_if.config(channel=settings.get("wifi_channel", 1))
e = ESPNow() e = ESPNow()
@@ -152,6 +298,22 @@ elif settings["transport_type"] == "wifi":
while not sta_if.isconnected(): while not sta_if.isconnected():
time.sleep(1) time.sleep(1)
print(f"WiFi connected {sta_if.ifconfig()[0]}") print(f"WiFi connected {sta_if.ifconfig()[0]}")
controller_ip = discover_controller_udp(
device_name=settings.get("name", ""),
wdt=wdt,
)
if not controller_ip:
raise SystemExit("No controller IP discovered for Wi-Fi transport")
def pick_controller_ip(current):
ip = discover_controller_udp(
device_name=settings.get("name", ""),
wdt=wdt,
)
if ip and ip != current:
print("Controller IP updated to", ip)
return ip if ip else current
reconnect_ms = 1000 reconnect_ms = 1000
next_connect_at = 0 next_connect_at = 0
client = None client = None
@@ -165,8 +327,7 @@ elif settings["transport_type"] == "wifi":
c = None c = None
try: try:
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM) c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect((settings["server_ip"], CONTROLLER_TCP_PORT)) c.connect((controller_ip, CONTROLLER_TCP_PORT))
c.send(hello_line)
c.setblocking(False) c.setblocking(False)
p = select.poll() p = select.poll()
p.register(c, select.POLLIN) p.register(c, select.POLLIN)
@@ -180,6 +341,7 @@ elif settings["transport_type"] == "wifi":
c.close() c.close()
except Exception: except Exception:
pass pass
controller_ip = pick_controller_ip(controller_ip)
next_connect_at = utime.ticks_add(now, reconnect_ms) next_connect_at = utime.ticks_add(now, reconnect_ms)
if client is not None and poller is not None: if client is not None and poller is not None:
@@ -221,6 +383,7 @@ elif settings["transport_type"] == "wifi":
client = None client = None
poller = None poller = None
buf = b"" buf = b""
controller_ip = pick_controller_ip(controller_ip)
next_connect_at = utime.ticks_add(now, reconnect_ms) next_connect_at = utime.ticks_add(now, reconnect_ms)
presets.tick() presets.tick()

View File

@@ -1,9 +1,13 @@
from machine import Pin from machine import Pin
from neopixel import NeoPixel from neopixel import NeoPixel
from preset import Preset from preset import Preset
from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys
try:
import uos as os
except ImportError:
import os
class Presets: class Presets:
@@ -18,17 +22,53 @@ class Presets:
self.presets = {} self.presets = {}
self.selected = None self.selected = None
# Register all pattern methods self.reload_patterns()
def reload_patterns(self):
# Register built-in methods first, then discovered pattern classes
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": Blink(self).run,
"rainbow": Rainbow(self).run,
"pulse": Pulse(self).run,
"transition": Transition(self).run,
"chase": Chase(self).run,
"circle": Circle(self).run,
} }
self.patterns.update(self._load_dynamic_patterns())
def _load_dynamic_patterns(self):
loaded = {}
try:
files = os.listdir("patterns")
except OSError:
return loaded
for filename in files:
if not filename.endswith(".py") or filename == "__init__.py":
continue
module_basename = filename[:-3]
module_name = "patterns." + module_basename
try:
if module_name in sys.modules:
del sys.modules[module_name]
module = __import__(module_name, None, None, ["*"])
except Exception as e:
print("Pattern import failed:", module_name, e)
continue
pattern_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
# Pick the first class in the module that exposes run()
if isinstance(attr, type) and hasattr(attr, "run"):
pattern_class = attr
break
if pattern_class is None:
continue
try:
loaded[module_basename] = pattern_class(self).run
except Exception as e:
print("Pattern init failed:", module_name, e)
return loaded
def save(self): def save(self):
"""Save the presets to a file.""" """Save the presets to a file."""

View File

@@ -21,12 +21,12 @@ class Settings(dict):
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow" self["transport_type"] = "wifi"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
# Wi-Fi + TCP to Pi: leave ssid empty for ESP-NOW-only. # Wi-Fi + TCP to controller: set ssid and password. Use transport_type "espnow"
# for ESP-NOW (requires espnow firmware).
self["ssid"] = "" self["ssid"] = ""
self["password"] = "" self["password"] = ""
self["server_ip"] = ""
def save(self): def save(self):
try: try:

268
tests/test_mdns.py Normal file
View File

@@ -0,0 +1,268 @@
#!/usr/bin/env python3
"""mDNS smoke test — runs on the MicroPython device (ESP32).
Loads Wi-Fi credentials from /settings.json via Settings (same as main firmware).
Sets the network hostname before connect so the chip can advertise as <hostname>.local
on builds where mDNS is enabled (see network.hostname() in MicroPython docs).
By default the script stays connected until you stop it (reset or mpremote Ctrl+C) so you
can ping the mDNS name from another machine (e.g. name "a" -> leda.local; hyphens are omitted
in the hostname because ESP32 mDNS often breaks on '-').
After flashing, do a full hardware reset once so the first DHCP sees the new hostname.
Deploy src to the device (including utils.py with mdns_hostname), then from the host:
mpremote connect PORT run tests/test_mdns.py
If ImportError: copy utils.py from src/ to the device, or rely on the built-in fallback below.
Or with cwd led-driver:
mpremote connect /dev/ttyUSB0 run tests/test_mdns.py
"""
import time
import network
import socket
import utime
from machine import WDT
from settings import Settings
try:
from utils import mdns_hostname
except ImportError:
def mdns_hostname(settings):
"""Same as utils.mdns_hostname (fallback if device utils.py is older than host repo)."""
raw = settings.get("name") or "led"
suffix = []
for c in str(raw).lower():
o = ord(c)
if (48 <= o <= 57) or (97 <= o <= 122):
suffix.append(c)
s = "".join(suffix)
if not s:
s = "device"
h = "led" + s
if len(h) > 32:
h = h[:32]
return h
CONNECT_TIMEOUT_S = 45
# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work
# (PHY calibration) runs with no WDT; WDT is only used in HOLD_S.
WDT_TIMEOUT_MS = 10000
# socket.getaddrinfo("<self>.local", …) often hangs a long time or indefinitely on ESP32; off by default.
SELF_LOCAL_GETADDRINFO = False
# After checks: 0 = exit immediately; >0 = stay up that many seconds; -1 = until reset/Ctrl+C (for remote ping).
HOLD_S = -1
# Set False to silence [mdns-test] timing lines (phase labels + elapsed ms since test start).
DEBUG = True
def _dbg(t0, msg):
if not DEBUG:
return
ms = utime.ticks_diff(utime.ticks_ms(), t0)
print("[mdns-test +%dms] %s" % (ms, msg))
def _set_hostname(h, sta):
"""Apply hostname on this STA object before active(True) / connect (DHCP + mDNS)."""
try:
network.hostname(h)
how = "network.hostname"
except (AttributeError, ValueError, OSError) as e:
how = None
last = e
try:
sta.config(hostname=h)
how = how or "WLAN.config(hostname=)"
except (AttributeError, ValueError, OSError) as e:
if how is None:
last = e
if how:
return how
print("Warning: could not set hostname (%s); mDNS name may be default." % last)
return None
def _sta_ip(sta):
try:
pair = sta.ipconfig("addr4")
if isinstance(pair, tuple) and pair:
return pair[0].split("/")[0] if isinstance(pair[0], str) else str(pair[0])
except (AttributeError, OSError, TypeError, ValueError):
pass
return sta.ifconfig()[0]
def _wait_wifi(sta, timeout_s, wdt, t0):
"""Wait for connection. If wdt is set, feed each iteration (keep gap < WDT_TIMEOUT_MS)."""
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
n = 0
while not sta.isconnected():
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
_dbg(t0, "WiFi wait TIMEOUT after %d iterations, status=%s" % (n, sta.status()))
return False
st = sta.status()
n += 1
if DEBUG:
_dbg(t0, "WiFi wait iter #%d status=%s" % (n, st))
else:
print("WiFi status:", st, "(waiting)")
if wdt is not None:
wdt.feed()
time.sleep(1)
if wdt is not None:
wdt.feed()
_dbg(t0, "WiFi connected after %d wait iterations" % n)
return True
def _try_resolve_local(hostname, t0):
"""Best-effort: resolve our own *.local via getaddrinfo (often blocks a very long time on ESP32)."""
fqdn = hostname + ".local"
_dbg(t0, "getaddrinfo(%r) starting (may block a long time)" % fqdn)
t_gai = utime.ticks_ms()
try:
ai = socket.getaddrinfo(fqdn, 80)
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
print("getaddrinfo(%r) -> %s" % (fqdn, ai))
_dbg(t0, "getaddrinfo OK (call took %dms)" % dt)
return True
except OSError as e:
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
print("getaddrinfo(%r) failed: %s" % (fqdn, e))
_dbg(t0, "getaddrinfo OSError after %dms" % dt)
return False
def main():
t0 = utime.ticks_ms()
_dbg(t0, "start")
settings = Settings()
_dbg(t0, "Settings() loaded")
ssid = settings.get("ssid") or ""
password = settings.get("password") or ""
if not ssid:
print("mDNS test skipped: ssid empty in settings.json (configure Wi-Fi to run this test).")
raise SystemExit(0)
hostname = mdns_hostname(settings)
_dbg(t0, "mdns_hostname -> %r" % hostname)
sta = network.WLAN(network.STA_IF)
how = _set_hostname(hostname, sta)
if how:
print("Hostname set via %s: %r" % (how, hostname))
_dbg(t0, "set_hostname done (%s)" % (how or "failed"))
_dbg(t0, "before sta.active(True) (often slow: RF calibration)")
print("WiFi active(True) (can take a while for calibration)...")
sta.active(True)
_dbg(t0, "after sta.active(True)")
try:
sta.config(pm=network.WLAN.PM_NONE)
_dbg(t0, "sta.config(pm=PM_NONE) OK")
except (AttributeError, ValueError, TypeError) as e:
_dbg(t0, "sta.config(pm=PM_NONE) skipped: %s" % e)
print("Connecting to SSID %r ..." % ssid)
_dbg(t0, "before sta.connect()")
sta.connect(ssid, password)
_dbg(t0, "after sta.connect() (returned; association may still be in progress)")
# No WDT during calibration/wait/getaddrinfo — they can block longer than WDT_TIMEOUT_MS.
if not _wait_wifi(sta, CONNECT_TIMEOUT_S, None, t0):
print("Timeout: not connected. status=", sta.status())
raise SystemExit(1)
ip = _sta_ip(sta)
print("WiFi OK, IP:", ip)
try:
stack_host = network.hostname()
except (AttributeError, ValueError, TypeError, OSError):
stack_host = None
if stack_host:
print(
"mDNS: use what the stack reports — ping %s.local (avahi-resolve -n %s.local)"
% (stack_host, stack_host)
)
if str(stack_host) != str(hostname):
print(
"(We asked for %r but stack reports %r — ping the stack name; cold boot may help.)"
% (hostname, stack_host)
)
else:
print("From another machine: ping %s.local" % hostname)
print("(or: avahi-resolve -n %s.local)" % hostname)
if SELF_LOCAL_GETADDRINFO:
_try_resolve_local(hostname, t0)
else:
_dbg(
t0,
"skip getaddrinfo(%s.local): SELF_LOCAL_GETADDRINFO=False (on-device self-.local lookup often hangs)"
% hostname,
)
print(
"Skipped on-device getaddrinfo(*.local); verify mDNS from a PC (ping above). "
"Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)."
)
# Optional: built-in mdns module (not present on all ESP32 builds)
_dbg(t0, "checking for optional 'mdns' module")
try:
import mdns # noqa: F401
print("Note: 'mdns' module is present; check your port's docs for Server/API.")
except ImportError:
print("No top-level 'mdns' module; relying on stack mDNS from hostname.")
_dbg(t0, "mdns import check done")
if HOLD_S != 0:
forever = HOLD_S < 0
_dbg(
t0,
"starting WDT(%dms) + hold %s"
% (WDT_TIMEOUT_MS, "forever" if forever else ("%ds" % HOLD_S)),
)
wdt = WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
if forever:
ping_target = stack_host or hostname
print(
"Staying online until you stop (reset device or mpremote Ctrl+C). "
"From another host: ping %s.local" % ping_target
)
else:
print("Keeping connection up for %d s (Ctrl+C or reset to stop) ..." % HOLD_S)
end = None if forever else utime.ticks_add(utime.ticks_ms(), HOLD_S * 1000)
hold_i = 0
while True:
wdt.feed()
time.sleep(2)
hold_i += 1
if not sta.isconnected():
print("lost WiFi connection")
break
if forever:
if DEBUG and hold_i % 15 == 0:
_dbg(t0, "hold alive #%d IP %s" % (hold_i, _sta_ip(sta)))
else:
_dbg(t0, "hold tick #%d" % hold_i)
print("still connected, IP", _sta_ip(sta))
if utime.ticks_diff(end, utime.ticks_ms()) <= 0:
break
_dbg(t0, "hold loop finished")
_dbg(t0, "Done.")
print("Done.")
if __name__ == "__main__":
main()

71
tests/udp_client.py Normal file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""UDP discovery test — runs on MicroPython (ESP32).
Brings up Wi-Fi from settings (test harness only), then **`hello.discover_controller_udp(device_name, wdt)`**.
`hello` does not use Settings or connect WiFi.
In firmware, **`main.py`** discovers the controller IP in RAM for HTTP; it is not written to settings.
Deploy `src` (including `hello.py`), then from host with cwd `led-driver`:
mpremote connect PORT run tests/udp_client.py
"""
import time
import network
import utime
from machine import WDT
from hello import discover_controller_udp
from settings import Settings
CONNECT_WAIT_S = 45
WDT_MS = 10000
def _wait_wifi(sta, timeout_s, wdt):
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
while not sta.isconnected():
wdt.feed()
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
return False
print("WiFi status:", sta.status())
wdt.feed()
time.sleep(1)
wdt.feed()
return True
def main():
settings = Settings()
ssid = settings.get("ssid") or ""
password = settings.get("password") or ""
if not ssid:
print("udp_client: set ssid/password in settings.json (test harness Wi-Fi).")
raise SystemExit(1)
sta = network.WLAN(network.STA_IF)
sta.active(True)
try:
sta.config(pm=network.WLAN.PM_NONE)
except (AttributeError, ValueError, TypeError):
pass
wdt = WDT(timeout=WDT_MS)
wdt.feed()
print("udp_client: connecting to", repr(ssid))
sta.connect(ssid, password)
wdt.feed()
if not _wait_wifi(sta, CONNECT_WAIT_S, wdt):
print("WiFi timeout, status=", sta.status())
raise SystemExit(1)
ip = discover_controller_udp(settings.get("name", ""), wdt=wdt)
if not ip:
raise SystemExit(1)
if __name__ == "__main__":
main()