Compare commits
4 Commits
cef9e00819
...
ded6e3d360
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ded6e3d360 | ||
|
|
a64457a0d5 | ||
|
|
fea4e69140 | ||
|
|
aaaf660e9d |
44
README.md
44
README.md
@@ -1,36 +1,52 @@
|
|||||||
# LED Driver - MicroPython
|
# LED Driver — MicroPython
|
||||||
|
|
||||||
MicroPython-based LED driver application for ESP32 microcontrollers.
|
MicroPython LED driver for ESP32: presets, patterns, **Wi-Fi** (TCP + UDP discovery) or **ESP-NOW** transport, optional HTTP polling, and dynamic pattern modules under `src/patterns/`.
|
||||||
|
|
||||||
## Prerequisites
|
## Prerequisites
|
||||||
|
|
||||||
- MicroPython firmware installed on ESP32
|
- MicroPython firmware on the ESP32
|
||||||
- USB cable for programming
|
- USB cable for programming
|
||||||
- Python 3 with pipenv
|
- Python 3 with pipenv (on the host, for `dev.py` / tests)
|
||||||
|
|
||||||
## Setup
|
## Setup
|
||||||
|
|
||||||
1. Install dependencies:
|
1. Install dependencies:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pipenv install
|
pipenv install
|
||||||
```
|
```
|
||||||
|
|
||||||
2. Deploy to device:
|
2. Deploy to the device:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pipenv run dev
|
pipenv run dev
|
||||||
```
|
```
|
||||||
|
|
||||||
## Project Structure
|
## Project layout
|
||||||
|
|
||||||
```
|
```
|
||||||
led-driver/
|
led-driver/
|
||||||
├── src/
|
├── src/
|
||||||
│ ├── main.py # Main application code
|
│ ├── main.py # Entry: Wi-Fi/TCP or ESP-NOW path, process_data(), manifest OTA
|
||||||
│ ├── presets.py # LED pattern implementations (includes Preset and Presets classes)
|
│ ├── presets.py # Preset runtime + Presets class
|
||||||
│ ├── settings.py # Settings management
|
│ ├── preset.py # Single preset helpers
|
||||||
│ └── p2p.py # Peer-to-peer communication
|
│ ├── settings.py # settings.json
|
||||||
├── test/ # Pattern tests
|
│ ├── hello.py # UDP discovery (port 8766) / hello payloads
|
||||||
├── web_app.py # Web interface
|
│ ├── http_poll.py # Optional HTTP polling helper
|
||||||
├── dev.py # Development tools
|
│ ├── utils.py # Colour conversion / ordering
|
||||||
└── Pipfile # Python dependencies
|
│ ├── presets.json # Default preset file (on device)
|
||||||
|
│ └── patterns/ # Pattern modules (.py), loaded dynamically
|
||||||
|
├── tests/ # Host-side helpers (e.g. udp_client.py, test_mdns.py)
|
||||||
|
├── test/ # On-device style pattern tests (all.py, patterns/)
|
||||||
|
├── dev.py # Deploy / sync to serial device
|
||||||
|
├── docs/API.md # Wire format (long keys); Pi app docs short keys
|
||||||
|
├── msg.json # Sample message
|
||||||
|
├── Pipfile
|
||||||
|
└── LICENSE
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Transport:** `settings.json` **`transport_type`** is typically **`wifi`** (TCP to the Pi on port **8765**, discovery on **8766**) or **`espnow`**. ESP-NOW code paths are loaded only when needed so a Wi-Fi-only image stays smaller.
|
||||||
|
|
||||||
|
## Further reading
|
||||||
|
|
||||||
|
- **`docs/API.md`** — JSON message fields as used in examples (`pattern`, `colors`, …). The Pi app may send **short keys** (`p`, `c`, …); behaviour matches once normalised on device.
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
# LED Driver ESPNow API Documentation
|
# LED Driver API (message format)
|
||||||
|
|
||||||
This document describes the ESPNow message format for controlling LED driver devices.
|
This document describes the **JSON message format** for controlling LED driver devices. The same object is accepted from **ESP-NOW** (when that transport is enabled) and as **one JSON value per line** over **TCP** in **Wi-Fi** mode (see `src/main.py` on the device).
|
||||||
|
|
||||||
## Message Format
|
## Message Format
|
||||||
|
|
||||||
All messages are JSON objects sent via ESPNow with the following structure:
|
All messages are JSON objects with the following structure:
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
|
|||||||
162
src/hello.py
Normal file
162
src/hello.py
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
"""LED hello payload and UDP broadcast discovery (controller IP via echo on port 8766).
|
||||||
|
|
||||||
|
Wi-Fi must already be connected; this module does not use Settings or call connect().
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import socket
|
||||||
|
import ubinascii
|
||||||
|
|
||||||
|
import network
|
||||||
|
|
||||||
|
# Match led-controller/tests/udp_server.py
|
||||||
|
DISCOVERY_UDP_PORT = 8766
|
||||||
|
DEFAULT_RECV_TIMEOUT_S = 3
|
||||||
|
|
||||||
|
|
||||||
|
def pack_hello_dict(sta, device_name=""):
|
||||||
|
"""Same fields as main HTTP/ESP-NOW hello."""
|
||||||
|
mac = sta.config("mac")
|
||||||
|
return {
|
||||||
|
"v": "1",
|
||||||
|
"device_name": device_name,
|
||||||
|
"mac": ubinascii.hexlify(mac).decode().lower(),
|
||||||
|
"type": "led",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def pack_hello_bytes(sta, device_name=""):
|
||||||
|
return json.dumps(pack_hello_dict(sta, device_name)).encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def pack_hello_line(sta, device_name=""):
|
||||||
|
"""JSON hello + newline (HTTP/UDP discovery payloads)."""
|
||||||
|
return pack_hello_bytes(sta, device_name) + b"\n"
|
||||||
|
|
||||||
|
|
||||||
|
def ipv4_broadcast(ip, netmask):
|
||||||
|
"""Directed broadcast (e.g. 192.168.1.0/24 -> 192.168.1.255)."""
|
||||||
|
ia = [int(x) for x in ip.split(".")]
|
||||||
|
im = [int(x) for x in netmask.split(".")]
|
||||||
|
if len(ia) != 4 or len(im) != 4:
|
||||||
|
return None
|
||||||
|
return ".".join(str(ia[i] | (255 - im[i])) for i in range(4))
|
||||||
|
|
||||||
|
|
||||||
|
def udp_discovery_targets(ip, mask):
|
||||||
|
"""(directed_broadcast, port) then limited broadcast."""
|
||||||
|
out = [("255.255.255.255", DISCOVERY_UDP_PORT)]
|
||||||
|
b = ipv4_broadcast(ip, mask)
|
||||||
|
if b:
|
||||||
|
out.insert(0, (b, DISCOVERY_UDP_PORT))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def broadcast_hello_udp(
|
||||||
|
sta,
|
||||||
|
device_name="",
|
||||||
|
*,
|
||||||
|
wait_reply=True,
|
||||||
|
recv_timeout_s=DEFAULT_RECV_TIMEOUT_S,
|
||||||
|
wdt=None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Send pack_hello_line via directed then 255.255.255.255 on DISCOVERY_UDP_PORT.
|
||||||
|
STA must already be connected with a valid IPv4 (caller brings up Wi-Fi).
|
||||||
|
|
||||||
|
If wait_reply, wait for first UDP echo. Returns controller IP string or None.
|
||||||
|
"""
|
||||||
|
ip, mask, _gw, _dns = sta.ifconfig()
|
||||||
|
msg = pack_hello_line(sta, device_name)
|
||||||
|
print("hello:", msg)
|
||||||
|
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
try:
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
except (AttributeError, OSError) as e:
|
||||||
|
print("SO_BROADCAST not set:", e)
|
||||||
|
try:
|
||||||
|
sock.bind((ip, 0))
|
||||||
|
except (AttributeError, OSError, TypeError) as e:
|
||||||
|
try:
|
||||||
|
sock.bind(("0.0.0.0", 0))
|
||||||
|
except (AttributeError, OSError) as e2:
|
||||||
|
print("bind skipped:", e, e2)
|
||||||
|
if wait_reply:
|
||||||
|
try:
|
||||||
|
sock.settimeout(recv_timeout_s)
|
||||||
|
except (AttributeError, OSError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
discovered = None
|
||||||
|
for dest_ip, dest_port in udp_discovery_targets(ip, mask):
|
||||||
|
if wdt is not None:
|
||||||
|
wdt.feed()
|
||||||
|
label = "%s:%s" % (dest_ip, dest_port)
|
||||||
|
target = (dest_ip, dest_port)
|
||||||
|
try:
|
||||||
|
sock.sendto(msg, target)
|
||||||
|
print("sent hello ->", target)
|
||||||
|
except OSError as e:
|
||||||
|
print("sendto failed:", e)
|
||||||
|
continue
|
||||||
|
if not wait_reply:
|
||||||
|
continue
|
||||||
|
if wdt is not None:
|
||||||
|
wdt.feed()
|
||||||
|
try:
|
||||||
|
data, addr = sock.recvfrom(2048)
|
||||||
|
print("reply from", addr, ":", data)
|
||||||
|
remote_ip = addr[0]
|
||||||
|
if data != msg:
|
||||||
|
print("(warning: reply payload differs from hello; still using source IP.)")
|
||||||
|
discovered = remote_ip
|
||||||
|
print("Discovered controller at", remote_ip)
|
||||||
|
break
|
||||||
|
except OSError as e:
|
||||||
|
print("recv (no reply):", e, "via", label)
|
||||||
|
if dest_ip == "255.255.255.255":
|
||||||
|
print(
|
||||||
|
"(hint: many APs drop Wi-Fi client broadcast; try wired server or AP without client isolation.)"
|
||||||
|
)
|
||||||
|
|
||||||
|
sock.close()
|
||||||
|
return discovered
|
||||||
|
|
||||||
|
|
||||||
|
def discover_controller_udp(device_name="", wdt=None):
|
||||||
|
"""
|
||||||
|
Broadcast hello; return controller IP from first UDP echo, or None.
|
||||||
|
STA must already be connected.
|
||||||
|
|
||||||
|
device_name: logical name in the JSON (caller supplies, e.g. from Settings elsewhere).
|
||||||
|
wdt: optional WDT to feed during waits.
|
||||||
|
"""
|
||||||
|
sta = network.WLAN(network.STA_IF)
|
||||||
|
if not sta.isconnected():
|
||||||
|
print("hello: STA not connected — connect Wi-Fi before discovery.")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
ip, mask, _g, _d = sta.ifconfig()
|
||||||
|
if ip == "0.0.0.0":
|
||||||
|
print("hello: STA has no IP address.")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
print("STA IP:", ip, "mask:", mask)
|
||||||
|
|
||||||
|
discovered = broadcast_hello_udp(
|
||||||
|
sta,
|
||||||
|
device_name,
|
||||||
|
wait_reply=True,
|
||||||
|
wdt=wdt,
|
||||||
|
)
|
||||||
|
if discovered:
|
||||||
|
print("discover done; controller =", repr(discovered))
|
||||||
|
else:
|
||||||
|
print("discover done; controller not found")
|
||||||
|
return discovered
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if not discover_controller_udp():
|
||||||
|
raise SystemExit(1)
|
||||||
68
src/http_poll.py
Normal file
68
src/http_poll.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
"""Minimal HTTP/1.1 POST JSON client for driver long-poll (MicroPython)."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import socket
|
||||||
|
|
||||||
|
|
||||||
|
def _send_all(sock, data):
|
||||||
|
n = 0
|
||||||
|
while n < len(data):
|
||||||
|
m = sock.send(data[n:])
|
||||||
|
if m <= 0:
|
||||||
|
raise OSError("socket send failed")
|
||||||
|
n += m
|
||||||
|
|
||||||
|
|
||||||
|
def _read_http_json_body(sock, max_headers=8192):
|
||||||
|
buf = b""
|
||||||
|
while b"\r\n\r\n" not in buf:
|
||||||
|
chunk = sock.recv(256)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
buf += chunk
|
||||||
|
if len(buf) > max_headers:
|
||||||
|
raise OSError("response headers too large")
|
||||||
|
if b"\r\n\r\n" not in buf:
|
||||||
|
raise OSError("incomplete response headers")
|
||||||
|
head, rest = buf.split(b"\r\n\r\n", 1)
|
||||||
|
cl = None
|
||||||
|
for line in head.split(b"\r\n"):
|
||||||
|
if line.lower().startswith(b"content-length:"):
|
||||||
|
try:
|
||||||
|
cl = int(line.split(b":", 1)[1].strip())
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
cl = None
|
||||||
|
if cl is None:
|
||||||
|
body = rest
|
||||||
|
else:
|
||||||
|
body = rest
|
||||||
|
while len(body) < cl:
|
||||||
|
chunk = sock.recv(min(2048, cl - len(body)))
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
body += chunk
|
||||||
|
return json.loads(body.decode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def http_driver_poll(host, port, payload_dict, timeout_s=40.0):
|
||||||
|
"""
|
||||||
|
POST ``/driver/v1/poll`` with JSON body; return parsed JSON (expects ``{"lines": [...]}``).
|
||||||
|
"""
|
||||||
|
path = "/driver/v1/poll"
|
||||||
|
body_bytes = json.dumps(payload_dict).encode("utf-8")
|
||||||
|
host_s = str(host)
|
||||||
|
req_head = (
|
||||||
|
"POST %s HTTP/1.1\r\nHost: %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\nConnection: close\r\n\r\n"
|
||||||
|
% (path, host_s, len(body_bytes))
|
||||||
|
).encode("utf-8")
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
|
sock.settimeout(timeout_s)
|
||||||
|
sock.connect((host_s, int(port)))
|
||||||
|
_send_all(sock, req_head + body_bytes)
|
||||||
|
return _read_http_json_body(sock)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
sock.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
175
src/main.py
175
src/main.py
@@ -1,6 +1,5 @@
|
|||||||
from settings import Settings
|
from settings import Settings
|
||||||
from machine import WDT
|
from machine import WDT
|
||||||
from espnow import ESPNow
|
|
||||||
import utime
|
import utime
|
||||||
import network
|
import network
|
||||||
from presets import Presets
|
from presets import Presets
|
||||||
@@ -10,9 +9,15 @@ import time
|
|||||||
import select
|
import select
|
||||||
import socket
|
import socket
|
||||||
import ubinascii
|
import ubinascii
|
||||||
|
from hello import discover_controller_udp
|
||||||
|
try:
|
||||||
|
import uos as os
|
||||||
|
except ImportError:
|
||||||
|
import os
|
||||||
|
|
||||||
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
|
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
|
||||||
CONTROLLER_TCP_PORT = 8765
|
CONTROLLER_TCP_PORT = 8765
|
||||||
|
controller_ip = None
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
print(settings)
|
print(settings)
|
||||||
@@ -49,6 +54,8 @@ def process_data(payload):
|
|||||||
apply_select(data)
|
apply_select(data)
|
||||||
if "default" in data:
|
if "default" in data:
|
||||||
apply_default(data)
|
apply_default(data)
|
||||||
|
if "manifest" in data:
|
||||||
|
apply_patterns_ota(data)
|
||||||
if "save" in data and ("presets" in data or "default" in data):
|
if "save" in data and ("presets" in data or "default" in data):
|
||||||
presets.save()
|
presets.save()
|
||||||
|
|
||||||
@@ -100,6 +107,144 @@ def apply_default(data):
|
|||||||
settings["default"] = default_name
|
settings["default"] = default_name
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_http_url(url):
|
||||||
|
"""Parse http://host[:port]/path into (host, port, path)."""
|
||||||
|
if not isinstance(url, str):
|
||||||
|
raise ValueError("url must be a string")
|
||||||
|
if not url.startswith("http://"):
|
||||||
|
raise ValueError("only http:// URLs are supported")
|
||||||
|
remainder = url[7:]
|
||||||
|
slash_idx = remainder.find("/")
|
||||||
|
if slash_idx == -1:
|
||||||
|
host_port = remainder
|
||||||
|
path = "/"
|
||||||
|
else:
|
||||||
|
host_port = remainder[:slash_idx]
|
||||||
|
path = remainder[slash_idx:]
|
||||||
|
if ":" in host_port:
|
||||||
|
host, port_s = host_port.rsplit(":", 1)
|
||||||
|
port = int(port_s)
|
||||||
|
else:
|
||||||
|
host = host_port
|
||||||
|
port = 80
|
||||||
|
if not host:
|
||||||
|
raise ValueError("missing host")
|
||||||
|
return host, port, path
|
||||||
|
|
||||||
|
|
||||||
|
def _http_get_raw(url, timeout_s=10.0):
|
||||||
|
host, port, path = _parse_http_url(url)
|
||||||
|
req = (
|
||||||
|
"GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host)
|
||||||
|
).encode("utf-8")
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
|
sock.settimeout(timeout_s)
|
||||||
|
sock.connect((host, int(port)))
|
||||||
|
sock.send(req)
|
||||||
|
data = b""
|
||||||
|
while True:
|
||||||
|
chunk = sock.recv(1024)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
data += chunk
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
sock.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
sep = b"\r\n\r\n"
|
||||||
|
if sep not in data:
|
||||||
|
raise OSError("invalid HTTP response")
|
||||||
|
head, body = data.split(sep, 1)
|
||||||
|
status_line = head.split(b"\r\n", 1)[0]
|
||||||
|
if b" 200 " not in status_line:
|
||||||
|
raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8"))
|
||||||
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
def _http_get_json(url, timeout_s=10.0):
|
||||||
|
body = _http_get_raw(url, timeout_s=timeout_s)
|
||||||
|
return json.loads(body.decode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def _http_get_text(url, timeout_s=10.0):
|
||||||
|
global controller_ip
|
||||||
|
# Support relative URLs from controller messages.
|
||||||
|
if isinstance(url, str) and url.startswith("/"):
|
||||||
|
if not controller_ip:
|
||||||
|
raise OSError("controller IP unavailable for relative URL")
|
||||||
|
url = "http://%s%s" % (controller_ip, url)
|
||||||
|
try:
|
||||||
|
body = _http_get_raw(url, timeout_s=timeout_s)
|
||||||
|
return body.decode("utf-8")
|
||||||
|
except Exception:
|
||||||
|
# Fallback for mDNS/unresolvable host: retry against current controller IP.
|
||||||
|
if not controller_ip or not isinstance(url, str) or not url.startswith("http://"):
|
||||||
|
raise
|
||||||
|
_host, _port, path = _parse_http_url(url)
|
||||||
|
fallback = "http://%s:%d%s" % (controller_ip, _port, path)
|
||||||
|
body = _http_get_raw(fallback, timeout_s=timeout_s)
|
||||||
|
return body.decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_pattern_filename(name):
|
||||||
|
if not isinstance(name, str):
|
||||||
|
return False
|
||||||
|
if not name.endswith(".py"):
|
||||||
|
return False
|
||||||
|
if "/" in name or "\\" in name or ".." in name:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def apply_patterns_ota(data):
|
||||||
|
manifest_payload = data.get("manifest")
|
||||||
|
if not manifest_payload:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
if isinstance(manifest_payload, dict):
|
||||||
|
manifest = manifest_payload
|
||||||
|
elif isinstance(manifest_payload, str):
|
||||||
|
manifest = _http_get_json(manifest_payload, timeout_s=20.0)
|
||||||
|
else:
|
||||||
|
print("patterns_ota: invalid manifest payload type")
|
||||||
|
return
|
||||||
|
files = manifest.get("files", [])
|
||||||
|
if not isinstance(files, list) or not files:
|
||||||
|
print("patterns_ota: no files in manifest")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.mkdir("patterns")
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
updated = 0
|
||||||
|
for item in files:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
name = item.get("name")
|
||||||
|
url = item.get("url")
|
||||||
|
inline_code = item.get("code")
|
||||||
|
if not _safe_pattern_filename(name):
|
||||||
|
continue
|
||||||
|
if isinstance(inline_code, str):
|
||||||
|
code = inline_code
|
||||||
|
elif isinstance(url, str):
|
||||||
|
code = _http_get_text(url, timeout_s=20.0)
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
with open("patterns/" + name, "w") as f:
|
||||||
|
f.write(code)
|
||||||
|
updated += 1
|
||||||
|
if updated > 0:
|
||||||
|
presets.reload_patterns()
|
||||||
|
print("patterns_ota: updated", updated, "pattern file(s)")
|
||||||
|
else:
|
||||||
|
print("patterns_ota: no valid files downloaded")
|
||||||
|
except Exception as e:
|
||||||
|
print("patterns_ota failed:", e)
|
||||||
|
|
||||||
|
|
||||||
# --- TCP framing (bytes) → process_data -------------------------------------------
|
# --- TCP framing (bytes) → process_data -------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
@@ -122,16 +267,17 @@ sta_if.active(True)
|
|||||||
sta_if.config(pm=network.WLAN.PM_NONE)
|
sta_if.config(pm=network.WLAN.PM_NONE)
|
||||||
|
|
||||||
mac = sta_if.config("mac")
|
mac = sta_if.config("mac")
|
||||||
hello = {
|
hello_payload = {
|
||||||
"v": "1",
|
"v": "1",
|
||||||
"device_name": settings.get("name", ""),
|
"device_name": settings.get("name", ""),
|
||||||
"mac": ubinascii.hexlify(mac).decode().lower(),
|
"mac": ubinascii.hexlify(mac).decode().lower(),
|
||||||
"type": "led",
|
"type": "led",
|
||||||
}
|
}
|
||||||
hello_bytes = json.dumps(hello).encode("utf-8")
|
hello_bytes = json.dumps(hello_payload).encode("utf-8")
|
||||||
hello_line = hello_bytes + b"\n"
|
|
||||||
|
|
||||||
if settings["transport_type"] == "espnow":
|
if settings["transport_type"] == "espnow":
|
||||||
|
from espnow import ESPNow # import only in this branch (avoids load when using Wi-Fi)
|
||||||
|
|
||||||
sta_if.disconnect()
|
sta_if.disconnect()
|
||||||
sta_if.config(channel=settings.get("wifi_channel", 1))
|
sta_if.config(channel=settings.get("wifi_channel", 1))
|
||||||
e = ESPNow()
|
e = ESPNow()
|
||||||
@@ -152,6 +298,22 @@ elif settings["transport_type"] == "wifi":
|
|||||||
while not sta_if.isconnected():
|
while not sta_if.isconnected():
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
print(f"WiFi connected {sta_if.ifconfig()[0]}")
|
print(f"WiFi connected {sta_if.ifconfig()[0]}")
|
||||||
|
controller_ip = discover_controller_udp(
|
||||||
|
device_name=settings.get("name", ""),
|
||||||
|
wdt=wdt,
|
||||||
|
)
|
||||||
|
if not controller_ip:
|
||||||
|
raise SystemExit("No controller IP discovered for Wi-Fi transport")
|
||||||
|
|
||||||
|
def pick_controller_ip(current):
|
||||||
|
ip = discover_controller_udp(
|
||||||
|
device_name=settings.get("name", ""),
|
||||||
|
wdt=wdt,
|
||||||
|
)
|
||||||
|
if ip and ip != current:
|
||||||
|
print("Controller IP updated to", ip)
|
||||||
|
return ip if ip else current
|
||||||
|
|
||||||
reconnect_ms = 1000
|
reconnect_ms = 1000
|
||||||
next_connect_at = 0
|
next_connect_at = 0
|
||||||
client = None
|
client = None
|
||||||
@@ -165,8 +327,7 @@ elif settings["transport_type"] == "wifi":
|
|||||||
c = None
|
c = None
|
||||||
try:
|
try:
|
||||||
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
c.connect((settings["server_ip"], CONTROLLER_TCP_PORT))
|
c.connect((controller_ip, CONTROLLER_TCP_PORT))
|
||||||
c.send(hello_line)
|
|
||||||
c.setblocking(False)
|
c.setblocking(False)
|
||||||
p = select.poll()
|
p = select.poll()
|
||||||
p.register(c, select.POLLIN)
|
p.register(c, select.POLLIN)
|
||||||
@@ -180,6 +341,7 @@ elif settings["transport_type"] == "wifi":
|
|||||||
c.close()
|
c.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
controller_ip = pick_controller_ip(controller_ip)
|
||||||
next_connect_at = utime.ticks_add(now, reconnect_ms)
|
next_connect_at = utime.ticks_add(now, reconnect_ms)
|
||||||
|
|
||||||
if client is not None and poller is not None:
|
if client is not None and poller is not None:
|
||||||
@@ -221,6 +383,7 @@ elif settings["transport_type"] == "wifi":
|
|||||||
client = None
|
client = None
|
||||||
poller = None
|
poller = None
|
||||||
buf = b""
|
buf = b""
|
||||||
|
controller_ip = pick_controller_ip(controller_ip)
|
||||||
next_connect_at = utime.ticks_add(now, reconnect_ms)
|
next_connect_at = utime.ticks_add(now, reconnect_ms)
|
||||||
|
|
||||||
presets.tick()
|
presets.tick()
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
from machine import Pin
|
from machine import Pin
|
||||||
from neopixel import NeoPixel
|
from neopixel import NeoPixel
|
||||||
from preset import Preset
|
from preset import Preset
|
||||||
from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle
|
|
||||||
from utils import convert_and_reorder_colors
|
from utils import convert_and_reorder_colors
|
||||||
import json
|
import json
|
||||||
|
import sys
|
||||||
|
try:
|
||||||
|
import uos as os
|
||||||
|
except ImportError:
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
class Presets:
|
class Presets:
|
||||||
@@ -18,17 +22,53 @@ class Presets:
|
|||||||
self.presets = {}
|
self.presets = {}
|
||||||
self.selected = None
|
self.selected = None
|
||||||
|
|
||||||
# Register all pattern methods
|
self.reload_patterns()
|
||||||
|
|
||||||
|
def reload_patterns(self):
|
||||||
|
# Register built-in methods first, then discovered pattern classes
|
||||||
self.patterns = {
|
self.patterns = {
|
||||||
"off": self.off,
|
"off": self.off,
|
||||||
"on": self.on,
|
"on": self.on,
|
||||||
"blink": Blink(self).run,
|
|
||||||
"rainbow": Rainbow(self).run,
|
|
||||||
"pulse": Pulse(self).run,
|
|
||||||
"transition": Transition(self).run,
|
|
||||||
"chase": Chase(self).run,
|
|
||||||
"circle": Circle(self).run,
|
|
||||||
}
|
}
|
||||||
|
self.patterns.update(self._load_dynamic_patterns())
|
||||||
|
|
||||||
|
def _load_dynamic_patterns(self):
|
||||||
|
loaded = {}
|
||||||
|
try:
|
||||||
|
files = os.listdir("patterns")
|
||||||
|
except OSError:
|
||||||
|
return loaded
|
||||||
|
|
||||||
|
for filename in files:
|
||||||
|
if not filename.endswith(".py") or filename == "__init__.py":
|
||||||
|
continue
|
||||||
|
module_basename = filename[:-3]
|
||||||
|
module_name = "patterns." + module_basename
|
||||||
|
try:
|
||||||
|
if module_name in sys.modules:
|
||||||
|
del sys.modules[module_name]
|
||||||
|
module = __import__(module_name, None, None, ["*"])
|
||||||
|
except Exception as e:
|
||||||
|
print("Pattern import failed:", module_name, e)
|
||||||
|
continue
|
||||||
|
|
||||||
|
pattern_class = None
|
||||||
|
for attr_name in dir(module):
|
||||||
|
attr = getattr(module, attr_name)
|
||||||
|
# Pick the first class in the module that exposes run()
|
||||||
|
if isinstance(attr, type) and hasattr(attr, "run"):
|
||||||
|
pattern_class = attr
|
||||||
|
break
|
||||||
|
|
||||||
|
if pattern_class is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
loaded[module_basename] = pattern_class(self).run
|
||||||
|
except Exception as e:
|
||||||
|
print("Pattern init failed:", module_name, e)
|
||||||
|
|
||||||
|
return loaded
|
||||||
|
|
||||||
def save(self):
|
def save(self):
|
||||||
"""Save the presets to a file."""
|
"""Save the presets to a file."""
|
||||||
|
|||||||
@@ -21,12 +21,12 @@ class Settings(dict):
|
|||||||
self["debug"] = False
|
self["debug"] = False
|
||||||
self["default"] = "on"
|
self["default"] = "on"
|
||||||
self["brightness"] = 32
|
self["brightness"] = 32
|
||||||
self["transport_type"] = "espnow"
|
self["transport_type"] = "wifi"
|
||||||
self["wifi_channel"] = 1
|
self["wifi_channel"] = 1
|
||||||
# Wi-Fi + TCP to Pi: leave ssid empty for ESP-NOW-only.
|
# Wi-Fi + TCP to controller: set ssid and password. Use transport_type "espnow"
|
||||||
|
# for ESP-NOW (requires espnow firmware).
|
||||||
self["ssid"] = ""
|
self["ssid"] = ""
|
||||||
self["password"] = ""
|
self["password"] = ""
|
||||||
self["server_ip"] = ""
|
|
||||||
|
|
||||||
def save(self):
|
def save(self):
|
||||||
try:
|
try:
|
||||||
|
|||||||
268
tests/test_mdns.py
Normal file
268
tests/test_mdns.py
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""mDNS smoke test — runs on the MicroPython device (ESP32).
|
||||||
|
|
||||||
|
Loads Wi-Fi credentials from /settings.json via Settings (same as main firmware).
|
||||||
|
Sets the network hostname before connect so the chip can advertise as <hostname>.local
|
||||||
|
on builds where mDNS is enabled (see network.hostname() in MicroPython docs).
|
||||||
|
|
||||||
|
By default the script stays connected until you stop it (reset or mpremote Ctrl+C) so you
|
||||||
|
can ping the mDNS name from another machine (e.g. name "a" -> leda.local; hyphens are omitted
|
||||||
|
in the hostname because ESP32 mDNS often breaks on '-').
|
||||||
|
|
||||||
|
After flashing, do a full hardware reset once so the first DHCP sees the new hostname.
|
||||||
|
|
||||||
|
Deploy src to the device (including utils.py with mdns_hostname), then from the host:
|
||||||
|
|
||||||
|
mpremote connect PORT run tests/test_mdns.py
|
||||||
|
|
||||||
|
If ImportError: copy utils.py from src/ to the device, or rely on the built-in fallback below.
|
||||||
|
|
||||||
|
Or with cwd led-driver:
|
||||||
|
|
||||||
|
mpremote connect /dev/ttyUSB0 run tests/test_mdns.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import network
|
||||||
|
import socket
|
||||||
|
|
||||||
|
import utime
|
||||||
|
from machine import WDT
|
||||||
|
|
||||||
|
from settings import Settings
|
||||||
|
|
||||||
|
try:
|
||||||
|
from utils import mdns_hostname
|
||||||
|
except ImportError:
|
||||||
|
|
||||||
|
def mdns_hostname(settings):
|
||||||
|
"""Same as utils.mdns_hostname (fallback if device utils.py is older than host repo)."""
|
||||||
|
raw = settings.get("name") or "led"
|
||||||
|
suffix = []
|
||||||
|
for c in str(raw).lower():
|
||||||
|
o = ord(c)
|
||||||
|
if (48 <= o <= 57) or (97 <= o <= 122):
|
||||||
|
suffix.append(c)
|
||||||
|
s = "".join(suffix)
|
||||||
|
if not s:
|
||||||
|
s = "device"
|
||||||
|
h = "led" + s
|
||||||
|
if len(h) > 32:
|
||||||
|
h = h[:32]
|
||||||
|
return h
|
||||||
|
|
||||||
|
CONNECT_TIMEOUT_S = 45
|
||||||
|
# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work
|
||||||
|
# (PHY calibration) runs with no WDT; WDT is only used in HOLD_S.
|
||||||
|
WDT_TIMEOUT_MS = 10000
|
||||||
|
# socket.getaddrinfo("<self>.local", …) often hangs a long time or indefinitely on ESP32; off by default.
|
||||||
|
SELF_LOCAL_GETADDRINFO = False
|
||||||
|
# After checks: 0 = exit immediately; >0 = stay up that many seconds; -1 = until reset/Ctrl+C (for remote ping).
|
||||||
|
HOLD_S = -1
|
||||||
|
# Set False to silence [mdns-test] timing lines (phase labels + elapsed ms since test start).
|
||||||
|
DEBUG = True
|
||||||
|
|
||||||
|
|
||||||
|
def _dbg(t0, msg):
|
||||||
|
if not DEBUG:
|
||||||
|
return
|
||||||
|
ms = utime.ticks_diff(utime.ticks_ms(), t0)
|
||||||
|
print("[mdns-test +%dms] %s" % (ms, msg))
|
||||||
|
|
||||||
|
|
||||||
|
def _set_hostname(h, sta):
|
||||||
|
"""Apply hostname on this STA object before active(True) / connect (DHCP + mDNS)."""
|
||||||
|
try:
|
||||||
|
network.hostname(h)
|
||||||
|
how = "network.hostname"
|
||||||
|
except (AttributeError, ValueError, OSError) as e:
|
||||||
|
how = None
|
||||||
|
last = e
|
||||||
|
try:
|
||||||
|
sta.config(hostname=h)
|
||||||
|
how = how or "WLAN.config(hostname=)"
|
||||||
|
except (AttributeError, ValueError, OSError) as e:
|
||||||
|
if how is None:
|
||||||
|
last = e
|
||||||
|
if how:
|
||||||
|
return how
|
||||||
|
print("Warning: could not set hostname (%s); mDNS name may be default." % last)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _sta_ip(sta):
|
||||||
|
try:
|
||||||
|
pair = sta.ipconfig("addr4")
|
||||||
|
if isinstance(pair, tuple) and pair:
|
||||||
|
return pair[0].split("/")[0] if isinstance(pair[0], str) else str(pair[0])
|
||||||
|
except (AttributeError, OSError, TypeError, ValueError):
|
||||||
|
pass
|
||||||
|
return sta.ifconfig()[0]
|
||||||
|
|
||||||
|
|
||||||
|
def _wait_wifi(sta, timeout_s, wdt, t0):
|
||||||
|
"""Wait for connection. If wdt is set, feed each iteration (keep gap < WDT_TIMEOUT_MS)."""
|
||||||
|
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
|
||||||
|
n = 0
|
||||||
|
while not sta.isconnected():
|
||||||
|
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
|
||||||
|
_dbg(t0, "WiFi wait TIMEOUT after %d iterations, status=%s" % (n, sta.status()))
|
||||||
|
return False
|
||||||
|
st = sta.status()
|
||||||
|
n += 1
|
||||||
|
if DEBUG:
|
||||||
|
_dbg(t0, "WiFi wait iter #%d status=%s" % (n, st))
|
||||||
|
else:
|
||||||
|
print("WiFi status:", st, "(waiting)")
|
||||||
|
if wdt is not None:
|
||||||
|
wdt.feed()
|
||||||
|
time.sleep(1)
|
||||||
|
if wdt is not None:
|
||||||
|
wdt.feed()
|
||||||
|
_dbg(t0, "WiFi connected after %d wait iterations" % n)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _try_resolve_local(hostname, t0):
|
||||||
|
"""Best-effort: resolve our own *.local via getaddrinfo (often blocks a very long time on ESP32)."""
|
||||||
|
fqdn = hostname + ".local"
|
||||||
|
_dbg(t0, "getaddrinfo(%r) starting (may block a long time)" % fqdn)
|
||||||
|
t_gai = utime.ticks_ms()
|
||||||
|
try:
|
||||||
|
ai = socket.getaddrinfo(fqdn, 80)
|
||||||
|
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
|
||||||
|
print("getaddrinfo(%r) -> %s" % (fqdn, ai))
|
||||||
|
_dbg(t0, "getaddrinfo OK (call took %dms)" % dt)
|
||||||
|
return True
|
||||||
|
except OSError as e:
|
||||||
|
dt = utime.ticks_diff(utime.ticks_ms(), t_gai)
|
||||||
|
print("getaddrinfo(%r) failed: %s" % (fqdn, e))
|
||||||
|
_dbg(t0, "getaddrinfo OSError after %dms" % dt)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
t0 = utime.ticks_ms()
|
||||||
|
_dbg(t0, "start")
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
_dbg(t0, "Settings() loaded")
|
||||||
|
ssid = settings.get("ssid") or ""
|
||||||
|
password = settings.get("password") or ""
|
||||||
|
if not ssid:
|
||||||
|
print("mDNS test skipped: ssid empty in settings.json (configure Wi-Fi to run this test).")
|
||||||
|
raise SystemExit(0)
|
||||||
|
|
||||||
|
hostname = mdns_hostname(settings)
|
||||||
|
_dbg(t0, "mdns_hostname -> %r" % hostname)
|
||||||
|
sta = network.WLAN(network.STA_IF)
|
||||||
|
how = _set_hostname(hostname, sta)
|
||||||
|
if how:
|
||||||
|
print("Hostname set via %s: %r" % (how, hostname))
|
||||||
|
_dbg(t0, "set_hostname done (%s)" % (how or "failed"))
|
||||||
|
|
||||||
|
_dbg(t0, "before sta.active(True) (often slow: RF calibration)")
|
||||||
|
print("WiFi active(True) (can take a while for calibration)...")
|
||||||
|
sta.active(True)
|
||||||
|
_dbg(t0, "after sta.active(True)")
|
||||||
|
try:
|
||||||
|
sta.config(pm=network.WLAN.PM_NONE)
|
||||||
|
_dbg(t0, "sta.config(pm=PM_NONE) OK")
|
||||||
|
except (AttributeError, ValueError, TypeError) as e:
|
||||||
|
_dbg(t0, "sta.config(pm=PM_NONE) skipped: %s" % e)
|
||||||
|
|
||||||
|
print("Connecting to SSID %r ..." % ssid)
|
||||||
|
_dbg(t0, "before sta.connect()")
|
||||||
|
sta.connect(ssid, password)
|
||||||
|
_dbg(t0, "after sta.connect() (returned; association may still be in progress)")
|
||||||
|
# No WDT during calibration/wait/getaddrinfo — they can block longer than WDT_TIMEOUT_MS.
|
||||||
|
if not _wait_wifi(sta, CONNECT_TIMEOUT_S, None, t0):
|
||||||
|
print("Timeout: not connected. status=", sta.status())
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
ip = _sta_ip(sta)
|
||||||
|
print("WiFi OK, IP:", ip)
|
||||||
|
try:
|
||||||
|
stack_host = network.hostname()
|
||||||
|
except (AttributeError, ValueError, TypeError, OSError):
|
||||||
|
stack_host = None
|
||||||
|
if stack_host:
|
||||||
|
print(
|
||||||
|
"mDNS: use what the stack reports — ping %s.local (avahi-resolve -n %s.local)"
|
||||||
|
% (stack_host, stack_host)
|
||||||
|
)
|
||||||
|
if str(stack_host) != str(hostname):
|
||||||
|
print(
|
||||||
|
"(We asked for %r but stack reports %r — ping the stack name; cold boot may help.)"
|
||||||
|
% (hostname, stack_host)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("From another machine: ping %s.local" % hostname)
|
||||||
|
print("(or: avahi-resolve -n %s.local)" % hostname)
|
||||||
|
|
||||||
|
if SELF_LOCAL_GETADDRINFO:
|
||||||
|
_try_resolve_local(hostname, t0)
|
||||||
|
else:
|
||||||
|
_dbg(
|
||||||
|
t0,
|
||||||
|
"skip getaddrinfo(%s.local): SELF_LOCAL_GETADDRINFO=False (on-device self-.local lookup often hangs)"
|
||||||
|
% hostname,
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
"Skipped on-device getaddrinfo(*.local); verify mDNS from a PC (ping above). "
|
||||||
|
"Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Optional: built-in mdns module (not present on all ESP32 builds)
|
||||||
|
_dbg(t0, "checking for optional 'mdns' module")
|
||||||
|
try:
|
||||||
|
import mdns # noqa: F401
|
||||||
|
|
||||||
|
print("Note: 'mdns' module is present; check your port's docs for Server/API.")
|
||||||
|
except ImportError:
|
||||||
|
print("No top-level 'mdns' module; relying on stack mDNS from hostname.")
|
||||||
|
_dbg(t0, "mdns import check done")
|
||||||
|
|
||||||
|
if HOLD_S != 0:
|
||||||
|
forever = HOLD_S < 0
|
||||||
|
_dbg(
|
||||||
|
t0,
|
||||||
|
"starting WDT(%dms) + hold %s"
|
||||||
|
% (WDT_TIMEOUT_MS, "forever" if forever else ("%ds" % HOLD_S)),
|
||||||
|
)
|
||||||
|
wdt = WDT(timeout=WDT_TIMEOUT_MS)
|
||||||
|
wdt.feed()
|
||||||
|
if forever:
|
||||||
|
ping_target = stack_host or hostname
|
||||||
|
print(
|
||||||
|
"Staying online until you stop (reset device or mpremote Ctrl+C). "
|
||||||
|
"From another host: ping %s.local" % ping_target
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("Keeping connection up for %d s (Ctrl+C or reset to stop) ..." % HOLD_S)
|
||||||
|
end = None if forever else utime.ticks_add(utime.ticks_ms(), HOLD_S * 1000)
|
||||||
|
hold_i = 0
|
||||||
|
while True:
|
||||||
|
wdt.feed()
|
||||||
|
time.sleep(2)
|
||||||
|
hold_i += 1
|
||||||
|
if not sta.isconnected():
|
||||||
|
print("lost WiFi connection")
|
||||||
|
break
|
||||||
|
if forever:
|
||||||
|
if DEBUG and hold_i % 15 == 0:
|
||||||
|
_dbg(t0, "hold alive #%d IP %s" % (hold_i, _sta_ip(sta)))
|
||||||
|
else:
|
||||||
|
_dbg(t0, "hold tick #%d" % hold_i)
|
||||||
|
print("still connected, IP", _sta_ip(sta))
|
||||||
|
if utime.ticks_diff(end, utime.ticks_ms()) <= 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
_dbg(t0, "hold loop finished")
|
||||||
|
|
||||||
|
_dbg(t0, "Done.")
|
||||||
|
print("Done.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
71
tests/udp_client.py
Normal file
71
tests/udp_client.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""UDP discovery test — runs on MicroPython (ESP32).
|
||||||
|
|
||||||
|
Brings up Wi-Fi from settings (test harness only), then **`hello.discover_controller_udp(device_name, wdt)`**.
|
||||||
|
`hello` does not use Settings or connect Wi‑Fi.
|
||||||
|
|
||||||
|
In firmware, **`main.py`** discovers the controller IP in RAM for HTTP; it is not written to settings.
|
||||||
|
|
||||||
|
Deploy `src` (including `hello.py`), then from host with cwd `led-driver`:
|
||||||
|
|
||||||
|
mpremote connect PORT run tests/udp_client.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import network
|
||||||
|
import utime
|
||||||
|
from machine import WDT
|
||||||
|
|
||||||
|
from hello import discover_controller_udp
|
||||||
|
from settings import Settings
|
||||||
|
|
||||||
|
CONNECT_WAIT_S = 45
|
||||||
|
WDT_MS = 10000
|
||||||
|
|
||||||
|
|
||||||
|
def _wait_wifi(sta, timeout_s, wdt):
|
||||||
|
deadline = utime.ticks_add(utime.ticks_ms(), int(timeout_s * 1000))
|
||||||
|
while not sta.isconnected():
|
||||||
|
wdt.feed()
|
||||||
|
if utime.ticks_diff(deadline, utime.ticks_ms()) <= 0:
|
||||||
|
return False
|
||||||
|
print("WiFi status:", sta.status())
|
||||||
|
wdt.feed()
|
||||||
|
time.sleep(1)
|
||||||
|
wdt.feed()
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
settings = Settings()
|
||||||
|
ssid = settings.get("ssid") or ""
|
||||||
|
password = settings.get("password") or ""
|
||||||
|
|
||||||
|
if not ssid:
|
||||||
|
print("udp_client: set ssid/password in settings.json (test harness Wi-Fi).")
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
sta = network.WLAN(network.STA_IF)
|
||||||
|
sta.active(True)
|
||||||
|
try:
|
||||||
|
sta.config(pm=network.WLAN.PM_NONE)
|
||||||
|
except (AttributeError, ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
wdt = WDT(timeout=WDT_MS)
|
||||||
|
wdt.feed()
|
||||||
|
print("udp_client: connecting to", repr(ssid))
|
||||||
|
sta.connect(ssid, password)
|
||||||
|
wdt.feed()
|
||||||
|
if not _wait_wifi(sta, CONNECT_WAIT_S, wdt):
|
||||||
|
print("WiFi timeout, status=", sta.status())
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
ip = discover_controller_udp(settings.get("name", ""), wdt=wdt)
|
||||||
|
if not ip:
|
||||||
|
raise SystemExit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user