"""Parse controller JSON (v1) and apply brightness, presets, OTA patterns, etc.""" import json import socket from binary_envelope import parse_binary_envelope from utils import convert_and_reorder_colors try: import uos as os except ImportError: import os def _log_rx(payload) -> None: """Serial log when led-controller sends a message into ``process_data``.""" try: if isinstance(payload, (bytes, bytearray)): n = len(payload) if n == 0: print("rx 0 B") return cap = 160 chunk = payload if n <= cap else payload[:cap] try: txt = bytes(chunk).decode("utf-8") except Exception: txt = str(chunk) if n > cap: txt = txt + "..." print("rx", n, "B", txt) else: s = str(payload) cap = 200 if len(s) <= cap: print("rx", len(s), "C", s) else: print("rx", len(s), "C", s[:cap] + "...") except Exception: print("rx (logging failed)") def process_data(payload, settings, presets, controller_ip=None): """Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" _log_rx(payload) data = None if isinstance(payload, (bytes, bytearray)): data = parse_binary_envelope(payload) if data is None: try: data = json.loads(payload) except (ValueError, TypeError): return else: try: data = json.loads(payload) except (ValueError, TypeError): return if data.get("v", "") != "1": return if "device_config" in data: apply_device_config(data, settings, presets) if "b" in data: apply_brightness(data, settings, presets) if "presets" in data: apply_presets(data, settings, presets) if "clear_presets" in data: apply_clear_presets(data, presets) if "select" in data: apply_select(data, settings, presets) if "default" in data: apply_default(data, settings, presets) if "manifest" in data: apply_patterns_ota(data, presets, controller_ip=controller_ip) if "save" in data and ("presets" in data or "default" in data): presets.save() if "save" in data and "clear_presets" in data: presets.save() if "save" in data and "b" in data: settings.save() if "save" in data and "device_config" in data: settings.save() _VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"}) _STARTUP_MODES = frozenset({"default", "last", "off"}) _MAX_DEVICE_LEDS = 2048 def apply_startup_pattern(settings, presets): """Apply power-on behaviour from ``startup_mode`` (default / last / off).""" mode = str(settings.get("startup_mode", "default")).lower().strip() if mode not in _STARTUP_MODES: mode = "default" if mode == "off": if presets.select("off"): return presets.fill((0, 0, 0)) return if mode == "last": lp = settings.get("last_preset") or "" if isinstance(lp, str) and lp.strip() and lp.strip() in presets.presets: if presets.select(lp.strip()): return dp = settings.get("default", "") if dp and dp in presets.presets: if not presets.select(dp): print("Startup preset failed (invalid pattern?):", dp) def apply_device_config(data, settings, presets): """Apply fields from v1 ``device_config``; reload presets when strip length or colour order changes.""" dc = data.get("device_config") if not isinstance(dc, dict): return strip_changed = False meta_changed = False if "name" in dc: n = dc["name"] if isinstance(n, str) and n.strip(): settings["name"] = n.strip() meta_changed = True if "num_leds" in dc: try: n = int(dc["num_leds"]) if 1 <= n <= _MAX_DEVICE_LEDS: settings["num_leds"] = n presets.update_num_leds(settings["led_pin"], n) strip_changed = True except (TypeError, ValueError): pass if "color_order" in dc: co = str(dc["color_order"]).lower().strip() if co in _VALID_DEVICE_COLOR_ORDERS: settings["color_order"] = co settings.color_order = settings.get_color_order(co) strip_changed = True if "startup_mode" in dc: sm = str(dc["startup_mode"]).lower().strip() if sm in _STARTUP_MODES: settings["startup_mode"] = sm meta_changed = True if not strip_changed and not meta_changed: return if strip_changed: prev = presets.selected try: presets.load(settings) except Exception as e: print("device_config: presets.load failed:", e) if prev and prev in presets.presets: presets.select(prev) elif settings.get("default") and settings["default"] in presets.presets: presets.select(settings["default"]) def record_last_preset(settings, preset_name): """Persist the last selected preset id (single entry in flash).""" if not isinstance(preset_name, str) or not preset_name: return settings["last_preset"] = preset_name.strip() settings.save() def apply_brightness(data, settings, presets): try: presets.b = max(0, min(255, int(data["b"]))) settings["brightness"] = presets.b except (TypeError, ValueError): pass def apply_presets(data, settings, presets): presets_map = data["presets"] for id, preset_data in presets_map.items(): if not preset_data: continue color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) if color_key is not None: try: preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key], settings ) except (TypeError, ValueError, KeyError): continue if "bg" in preset_data: try: bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) if bg_color: preset_data["bg"] = bg_color[0] except (TypeError, ValueError, KeyError): pass presets.edit(id, preset_data) def apply_select(data, settings, presets): select_map = data["select"] device_name = settings["name"] select_list = select_map.get(device_name, []) if not select_list: return preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None if presets.select(preset_name, step=step): record_last_preset(settings, preset_name) def apply_clear_presets(data, presets): clear_value = data.get("clear_presets") if isinstance(clear_value, bool): should_clear = clear_value elif isinstance(clear_value, int): should_clear = bool(clear_value) elif isinstance(clear_value, str): should_clear = clear_value.lower() in ("true", "1", "yes", "on") else: should_clear = False if not should_clear: return presets.delete_all() def apply_default(data, settings, presets): targets = data.get("targets") or [] default_name = data["default"] if ( settings["name"] in targets and isinstance(default_name, str) and default_name in presets.presets ): settings["default"] = default_name settings.save() def _parse_http_url(url): """Parse http://host[:port]/path into (host, port, path).""" if not isinstance(url, str): raise ValueError("url must be a string") if not url.startswith("http://"): raise ValueError("only http:// URLs are supported") remainder = url[7:] slash_idx = remainder.find("/") if slash_idx == -1: host_port = remainder path = "/" else: host_port = remainder[:slash_idx] path = remainder[slash_idx:] if ":" in host_port: host, port_s = host_port.rsplit(":", 1) port = int(port_s) else: host = host_port port = 80 if not host: raise ValueError("missing host") return host, port, path def _http_get_raw(url, timeout_s=10.0): host, port, path = _parse_http_url(url) req = ( "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host) ).encode("utf-8") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.settimeout(timeout_s) sock.connect((host, int(port))) sock.send(req) data = b"" while True: chunk = sock.recv(1024) if not chunk: break data += chunk finally: try: sock.close() except Exception: pass sep = b"\r\n\r\n" if sep not in data: raise OSError("invalid HTTP response") head, body = data.split(sep, 1) status_line = head.split(b"\r\n", 1)[0] if b" 200 " not in status_line: raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8")) return body def _http_get_json(url, timeout_s=10.0): body = _http_get_raw(url, timeout_s=timeout_s) return json.loads(body.decode("utf-8")) def _http_get_text(url, timeout_s=10.0, controller_ip=None): # Support relative URLs from controller messages. if isinstance(url, str) and url.startswith("/"): if not controller_ip: raise OSError("controller IP unavailable for relative URL") url = "http://%s%s" % (controller_ip, url) try: body = _http_get_raw(url, timeout_s=timeout_s) return body.decode("utf-8") except Exception: # Fallback for mDNS/unresolvable host: retry against current controller IP. if not controller_ip or not isinstance(url, str) or not url.startswith("http://"): raise _host, _port, path = _parse_http_url(url) fallback = "http://%s:%d%s" % (controller_ip, _port, path) body = _http_get_raw(fallback, timeout_s=timeout_s) return body.decode("utf-8") def _safe_pattern_filename(name): if not isinstance(name, str): return False if not name.endswith(".py"): return False if "/" in name or "\\" in name or ".." in name: return False return True def apply_patterns_ota(data, presets, controller_ip=None): manifest_payload = data.get("manifest") if not manifest_payload: return try: if isinstance(manifest_payload, dict): manifest = manifest_payload elif isinstance(manifest_payload, str): manifest = _http_get_json(manifest_payload, timeout_s=20.0) else: print("patterns_ota: invalid manifest payload type") return files = manifest.get("files", []) if not isinstance(files, list) or not files: print("patterns_ota: no files in manifest") return try: os.mkdir("patterns") except OSError: pass updated = 0 for item in files: if not isinstance(item, dict): continue name = item.get("name") url = item.get("url") inline_code = item.get("code") if not _safe_pattern_filename(name): continue if isinstance(inline_code, str): code = inline_code elif isinstance(url, str): code = _http_get_text(url, timeout_s=20.0, controller_ip=controller_ip) else: continue with open("patterns/" + name, "w") as f: f.write(code) updated += 1 if updated > 0: presets.reload_patterns() except Exception as e: print("patterns_ota failed:", e)