feat(led-driver): wifi default transport, lazy espnow import, dynamic patterns

Made-with: Cursor
This commit is contained in:
pi
2026-04-11 15:10:12 +12:00
parent aaaf660e9d
commit fea4e69140
3 changed files with 198 additions and 11 deletions

View File

@@ -1,6 +1,5 @@
from settings import Settings from settings import Settings
from machine import WDT from machine import WDT
from espnow import ESPNow
import utime import utime
import network import network
from presets import Presets from presets import Presets
@@ -11,9 +10,14 @@ import select
import socket import socket
import ubinascii import ubinascii
from hello import discover_controller_udp from hello import discover_controller_udp
try:
import uos as os
except ImportError:
import os
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff" BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
CONTROLLER_TCP_PORT = 8765 CONTROLLER_TCP_PORT = 8765
controller_ip = None
settings = Settings() settings = Settings()
print(settings) print(settings)
@@ -50,6 +54,8 @@ def process_data(payload):
apply_select(data) apply_select(data)
if "default" in data: if "default" in data:
apply_default(data) apply_default(data)
if "manifest" in data:
apply_patterns_ota(data)
if "save" in data and ("presets" in data or "default" in data): if "save" in data and ("presets" in data or "default" in data):
presets.save() presets.save()
@@ -101,6 +107,144 @@ def apply_default(data):
settings["default"] = default_name settings["default"] = default_name
def _parse_http_url(url):
"""Parse http://host[:port]/path into (host, port, path)."""
if not isinstance(url, str):
raise ValueError("url must be a string")
if not url.startswith("http://"):
raise ValueError("only http:// URLs are supported")
remainder = url[7:]
slash_idx = remainder.find("/")
if slash_idx == -1:
host_port = remainder
path = "/"
else:
host_port = remainder[:slash_idx]
path = remainder[slash_idx:]
if ":" in host_port:
host, port_s = host_port.rsplit(":", 1)
port = int(port_s)
else:
host = host_port
port = 80
if not host:
raise ValueError("missing host")
return host, port, path
def _http_get_raw(url, timeout_s=10.0):
host, port, path = _parse_http_url(url)
req = (
"GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host)
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host, int(port)))
sock.send(req)
data = b""
while True:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
finally:
try:
sock.close()
except Exception:
pass
sep = b"\r\n\r\n"
if sep not in data:
raise OSError("invalid HTTP response")
head, body = data.split(sep, 1)
status_line = head.split(b"\r\n", 1)[0]
if b" 200 " not in status_line:
raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8"))
return body
def _http_get_json(url, timeout_s=10.0):
body = _http_get_raw(url, timeout_s=timeout_s)
return json.loads(body.decode("utf-8"))
def _http_get_text(url, timeout_s=10.0):
global controller_ip
# Support relative URLs from controller messages.
if isinstance(url, str) and url.startswith("/"):
if not controller_ip:
raise OSError("controller IP unavailable for relative URL")
url = "http://%s%s" % (controller_ip, url)
try:
body = _http_get_raw(url, timeout_s=timeout_s)
return body.decode("utf-8")
except Exception:
# Fallback for mDNS/unresolvable host: retry against current controller IP.
if not controller_ip or not isinstance(url, str) or not url.startswith("http://"):
raise
_host, _port, path = _parse_http_url(url)
fallback = "http://%s:%d%s" % (controller_ip, _port, path)
body = _http_get_raw(fallback, timeout_s=timeout_s)
return body.decode("utf-8")
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def apply_patterns_ota(data):
manifest_payload = data.get("manifest")
if not manifest_payload:
return
try:
if isinstance(manifest_payload, dict):
manifest = manifest_payload
elif isinstance(manifest_payload, str):
manifest = _http_get_json(manifest_payload, timeout_s=20.0)
else:
print("patterns_ota: invalid manifest payload type")
return
files = manifest.get("files", [])
if not isinstance(files, list) or not files:
print("patterns_ota: no files in manifest")
return
try:
os.mkdir("patterns")
except OSError:
pass
updated = 0
for item in files:
if not isinstance(item, dict):
continue
name = item.get("name")
url = item.get("url")
inline_code = item.get("code")
if not _safe_pattern_filename(name):
continue
if isinstance(inline_code, str):
code = inline_code
elif isinstance(url, str):
code = _http_get_text(url, timeout_s=20.0)
else:
continue
with open("patterns/" + name, "w") as f:
f.write(code)
updated += 1
if updated > 0:
presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e:
print("patterns_ota failed:", e)
# --- TCP framing (bytes) → process_data ------------------------------------------- # --- TCP framing (bytes) → process_data -------------------------------------------
@@ -132,6 +276,8 @@ hello_payload = {
hello_bytes = json.dumps(hello_payload).encode("utf-8") hello_bytes = json.dumps(hello_payload).encode("utf-8")
if settings["transport_type"] == "espnow": if settings["transport_type"] == "espnow":
from espnow import ESPNow # import only in this branch (avoids load when using Wi-Fi)
sta_if.disconnect() sta_if.disconnect()
sta_if.config(channel=settings.get("wifi_channel", 1)) sta_if.config(channel=settings.get("wifi_channel", 1))
e = ESPNow() e = ESPNow()

View File

@@ -1,9 +1,13 @@
from machine import Pin from machine import Pin
from neopixel import NeoPixel from neopixel import NeoPixel
from preset import Preset from preset import Preset
from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
import json import json
import sys
try:
import uos as os
except ImportError:
import os
class Presets: class Presets:
@@ -18,17 +22,53 @@ class Presets:
self.presets = {} self.presets = {}
self.selected = None self.selected = None
# Register all pattern methods self.reload_patterns()
def reload_patterns(self):
# Register built-in methods first, then discovered pattern classes
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on": self.on, "on": self.on,
"blink": Blink(self).run,
"rainbow": Rainbow(self).run,
"pulse": Pulse(self).run,
"transition": Transition(self).run,
"chase": Chase(self).run,
"circle": Circle(self).run,
} }
self.patterns.update(self._load_dynamic_patterns())
def _load_dynamic_patterns(self):
loaded = {}
try:
files = os.listdir("patterns")
except OSError:
return loaded
for filename in files:
if not filename.endswith(".py") or filename == "__init__.py":
continue
module_basename = filename[:-3]
module_name = "patterns." + module_basename
try:
if module_name in sys.modules:
del sys.modules[module_name]
module = __import__(module_name, None, None, ["*"])
except Exception as e:
print("Pattern import failed:", module_name, e)
continue
pattern_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
# Pick the first class in the module that exposes run()
if isinstance(attr, type) and hasattr(attr, "run"):
pattern_class = attr
break
if pattern_class is None:
continue
try:
loaded[module_basename] = pattern_class(self).run
except Exception as e:
print("Pattern init failed:", module_name, e)
return loaded
def save(self): def save(self):
"""Save the presets to a file.""" """Save the presets to a file."""

View File

@@ -21,9 +21,10 @@ class Settings(dict):
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow" self["transport_type"] = "wifi"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
# Wi-Fi + TCP to Pi: leave ssid empty for ESP-NOW-only. # Wi-Fi + TCP to controller: set ssid and password. Use transport_type "espnow"
# for ESP-NOW (requires espnow firmware).
self["ssid"] = "" self["ssid"] = ""
self["password"] = "" self["password"] = ""