From fea4e69140b7142563159daced00972c5275acb5 Mon Sep 17 00:00:00 2001 From: pi Date: Sat, 11 Apr 2026 15:10:12 +1200 Subject: [PATCH] feat(led-driver): wifi default transport, lazy espnow import, dynamic patterns Made-with: Cursor --- src/main.py | 148 +++++++++++++++++++++++++++++++++++++++++++++++- src/presets.py | 56 +++++++++++++++--- src/settings.py | 5 +- 3 files changed, 198 insertions(+), 11 deletions(-) diff --git a/src/main.py b/src/main.py index e183a5e..1cde978 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,5 @@ from settings import Settings from machine import WDT -from espnow import ESPNow import utime import network from presets import Presets @@ -11,9 +10,14 @@ import select import socket import ubinascii from hello import discover_controller_udp +try: + import uos as os +except ImportError: + import os BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff" CONTROLLER_TCP_PORT = 8765 +controller_ip = None settings = Settings() print(settings) @@ -50,6 +54,8 @@ def process_data(payload): apply_select(data) if "default" in data: apply_default(data) + if "manifest" in data: + apply_patterns_ota(data) if "save" in data and ("presets" in data or "default" in data): presets.save() @@ -101,6 +107,144 @@ def apply_default(data): settings["default"] = default_name +def _parse_http_url(url): + """Parse http://host[:port]/path into (host, port, path).""" + if not isinstance(url, str): + raise ValueError("url must be a string") + if not url.startswith("http://"): + raise ValueError("only http:// URLs are supported") + remainder = url[7:] + slash_idx = remainder.find("/") + if slash_idx == -1: + host_port = remainder + path = "/" + else: + host_port = remainder[:slash_idx] + path = remainder[slash_idx:] + if ":" in host_port: + host, port_s = host_port.rsplit(":", 1) + port = int(port_s) + else: + host = host_port + port = 80 + if not host: + raise ValueError("missing host") + return host, port, path + + +def _http_get_raw(url, timeout_s=10.0): + host, port, path = _parse_http_url(url) + req = ( + "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host) + ).encode("utf-8") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.settimeout(timeout_s) + sock.connect((host, int(port))) + sock.send(req) + data = b"" + while True: + chunk = sock.recv(1024) + if not chunk: + break + data += chunk + finally: + try: + sock.close() + except Exception: + pass + sep = b"\r\n\r\n" + if sep not in data: + raise OSError("invalid HTTP response") + head, body = data.split(sep, 1) + status_line = head.split(b"\r\n", 1)[0] + if b" 200 " not in status_line: + raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8")) + return body + + +def _http_get_json(url, timeout_s=10.0): + body = _http_get_raw(url, timeout_s=timeout_s) + return json.loads(body.decode("utf-8")) + + +def _http_get_text(url, timeout_s=10.0): + global controller_ip + # Support relative URLs from controller messages. + if isinstance(url, str) and url.startswith("/"): + if not controller_ip: + raise OSError("controller IP unavailable for relative URL") + url = "http://%s%s" % (controller_ip, url) + try: + body = _http_get_raw(url, timeout_s=timeout_s) + return body.decode("utf-8") + except Exception: + # Fallback for mDNS/unresolvable host: retry against current controller IP. + if not controller_ip or not isinstance(url, str) or not url.startswith("http://"): + raise + _host, _port, path = _parse_http_url(url) + fallback = "http://%s:%d%s" % (controller_ip, _port, path) + body = _http_get_raw(fallback, timeout_s=timeout_s) + return body.decode("utf-8") + + +def _safe_pattern_filename(name): + if not isinstance(name, str): + return False + if not name.endswith(".py"): + return False + if "/" in name or "\\" in name or ".." in name: + return False + return True + + +def apply_patterns_ota(data): + manifest_payload = data.get("manifest") + if not manifest_payload: + return + try: + if isinstance(manifest_payload, dict): + manifest = manifest_payload + elif isinstance(manifest_payload, str): + manifest = _http_get_json(manifest_payload, timeout_s=20.0) + else: + print("patterns_ota: invalid manifest payload type") + return + files = manifest.get("files", []) + if not isinstance(files, list) or not files: + print("patterns_ota: no files in manifest") + return + try: + os.mkdir("patterns") + except OSError: + pass + updated = 0 + for item in files: + if not isinstance(item, dict): + continue + name = item.get("name") + url = item.get("url") + inline_code = item.get("code") + if not _safe_pattern_filename(name): + continue + if isinstance(inline_code, str): + code = inline_code + elif isinstance(url, str): + code = _http_get_text(url, timeout_s=20.0) + else: + continue + with open("patterns/" + name, "w") as f: + f.write(code) + updated += 1 + if updated > 0: + presets.reload_patterns() + print("patterns_ota: updated", updated, "pattern file(s)") + else: + print("patterns_ota: no valid files downloaded") + except Exception as e: + print("patterns_ota failed:", e) + + # --- TCP framing (bytes) → process_data ------------------------------------------- @@ -132,6 +276,8 @@ hello_payload = { hello_bytes = json.dumps(hello_payload).encode("utf-8") if settings["transport_type"] == "espnow": + from espnow import ESPNow # import only in this branch (avoids load when using Wi-Fi) + sta_if.disconnect() sta_if.config(channel=settings.get("wifi_channel", 1)) e = ESPNow() diff --git a/src/presets.py b/src/presets.py index c5163dd..63b7460 100644 --- a/src/presets.py +++ b/src/presets.py @@ -1,9 +1,13 @@ from machine import Pin from neopixel import NeoPixel from preset import Preset -from patterns import Blink, Rainbow, Pulse, Transition, Chase, Circle from utils import convert_and_reorder_colors import json +import sys +try: + import uos as os +except ImportError: + import os class Presets: @@ -18,17 +22,53 @@ class Presets: self.presets = {} self.selected = None - # Register all pattern methods + self.reload_patterns() + + def reload_patterns(self): + # Register built-in methods first, then discovered pattern classes self.patterns = { "off": self.off, "on": self.on, - "blink": Blink(self).run, - "rainbow": Rainbow(self).run, - "pulse": Pulse(self).run, - "transition": Transition(self).run, - "chase": Chase(self).run, - "circle": Circle(self).run, } + self.patterns.update(self._load_dynamic_patterns()) + + def _load_dynamic_patterns(self): + loaded = {} + try: + files = os.listdir("patterns") + except OSError: + return loaded + + for filename in files: + if not filename.endswith(".py") or filename == "__init__.py": + continue + module_basename = filename[:-3] + module_name = "patterns." + module_basename + try: + if module_name in sys.modules: + del sys.modules[module_name] + module = __import__(module_name, None, None, ["*"]) + except Exception as e: + print("Pattern import failed:", module_name, e) + continue + + pattern_class = None + for attr_name in dir(module): + attr = getattr(module, attr_name) + # Pick the first class in the module that exposes run() + if isinstance(attr, type) and hasattr(attr, "run"): + pattern_class = attr + break + + if pattern_class is None: + continue + + try: + loaded[module_basename] = pattern_class(self).run + except Exception as e: + print("Pattern init failed:", module_name, e) + + return loaded def save(self): """Save the presets to a file.""" diff --git a/src/settings.py b/src/settings.py index f28d6bc..b2f15ea 100644 --- a/src/settings.py +++ b/src/settings.py @@ -21,9 +21,10 @@ class Settings(dict): self["debug"] = False self["default"] = "on" self["brightness"] = 32 - self["transport_type"] = "espnow" + self["transport_type"] = "wifi" self["wifi_channel"] = 1 - # Wi-Fi + TCP to Pi: leave ssid empty for ESP-NOW-only. + # Wi-Fi + TCP to controller: set ssid and password. Use transport_type "espnow" + # for ESP-NOW (requires espnow firmware). self["ssid"] = "" self["password"] = ""