diff --git a/src/controller_messages.py b/src/controller_messages.py index 0e19993..d52dc4e 100644 --- a/src/controller_messages.py +++ b/src/controller_messages.py @@ -2,7 +2,11 @@ import json import socket +import network +import ubinascii +import device_groups as dg +from v1_wire import expand_v1 from binary_envelope import parse_binary_envelope from utils import convert_and_reorder_colors @@ -58,8 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None, save=False): return if data.get("v", "") != "1": return + data = expand_v1(data) if save: data["save"] = True + set_groups = bool(data.get("set_groups")) + groups = data.get("groups") + if set_groups and isinstance(groups, list): + dg.groups_replace(groups) + print("groups set", dg.list_groups()) + elif isinstance(groups, list) and groups: + if not any(dg.in_group(str(g)) for g in groups): + print("ignored: not in groups", groups) + return if "device_config" in data: apply_device_config(data, settings, presets) if "b" in data: @@ -68,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False): apply_presets(data, settings, presets) if "clear_presets" in data: apply_clear_presets(data, presets) - if "select" in data: + elif "select" in data or "s" in data: apply_select(data, settings, presets) if "default" in data: apply_default(data, settings, presets) @@ -172,7 +186,18 @@ def apply_brightness(data, settings, presets): pass +_pending_select = None + + +def _run_select(presets, settings, preset_name, step=None): + if presets.select(preset_name, step=step): + record_last_preset(settings, preset_name) + return True + return False + + def apply_presets(data, settings, presets): + global _pending_select presets_map = data["presets"] for id, preset_data in presets_map.items(): if not preset_data: @@ -183,8 +208,8 @@ def apply_presets(data, settings, presets): preset_data[color_key] = convert_and_reorder_colors( preset_data[color_key], settings ) - except (TypeError, ValueError, KeyError): - continue + except (TypeError, ValueError, KeyError) as err: + print("preset color convert failed:", id, err) if "bg" in preset_data: try: bg_color = convert_and_reorder_colors([preset_data["bg"]], settings) @@ -193,18 +218,72 @@ def apply_presets(data, settings, presets): except (TypeError, ValueError, KeyError): pass presets.edit(id, preset_data) + # Same message often carries select; apply now while presets are loaded. + if "select" in data: + apply_select(data, settings, presets) + elif _pending_select is not None: + preset_name, step = _pending_select + _pending_select = None + if preset_name in presets.presets or preset_name in ("on", "off"): + _run_select(presets, settings, preset_name, step) + + +def _select_list_for_this_device(select_val, settings): + """Resolve select to ``[preset_id, step?]`` (wire list or legacy name map).""" + if isinstance(select_val, list) and select_val: + return select_val + if isinstance(select_val, str) and str(select_val).strip(): + return [str(select_val).strip()] + if not isinstance(select_val, dict) or not select_val: + return None + if "preset" in select_val: + preset_name = select_val.get("preset") + if preset_name is None: + return None + out = [str(preset_name)] + if "step" in select_val: + out.append(select_val["step"]) + return out + device_name = str(settings.get("name") or "").strip() + select_list = select_val.get(device_name) + if select_list: + return select_list + try: + sta = network.WLAN(network.STA_IF) + mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower() + except Exception: + mac_hex = "" + if mac_hex: + for key in select_val: + k = str(key).lower().replace(":", "").replace("-", "") + if mac_hex in k: + return select_val[key] + if len(select_val) == 1: + return next(iter(select_val.values())) + return None def apply_select(data, settings, presets): - select_map = data["select"] - device_name = settings["name"] - select_list = select_map.get(device_name, []) + global _pending_select + select_val = data.get("select") + if select_val is None: + select_val = data.get("s") + select_list = _select_list_for_this_device(select_val, settings) if not select_list: + print("select ignored:", repr(select_val)) + return + preset_name = str(select_list[0]).strip() + if not preset_name: return - preset_name = select_list[0] step = select_list[1] if len(select_list) > 1 else None - if presets.select(preset_name, step=step): - record_last_preset(settings, preset_name) + if preset_name not in presets.presets and preset_name not in ("on", "off"): + _pending_select = (preset_name, step) + print("select deferred (preset not loaded yet):", preset_name) + return + if _run_select(presets, settings, preset_name, step): + _pending_select = None + else: + print("select failed:", preset_name) def apply_clear_presets(data, presets): diff --git a/src/espnow_transport.py b/src/espnow_transport.py index 297cbf8..8142785 100644 --- a/src/espnow_transport.py +++ b/src/espnow_transport.py @@ -28,7 +28,7 @@ def init_espnow(settings): global _esp ch = 6 try: - ch = int(settings.get("wifi_channel", 6)) + ch = int(settings.get("wifi_channel", 1)) except (TypeError, ValueError): pass ch = max(1, min(11, ch)) diff --git a/src/main.py b/src/main.py index c6a53af..b9c7f9b 100644 --- a/src/main.py +++ b/src/main.py @@ -17,6 +17,7 @@ wdt.feed() machine.freq(160000000) settings = Settings() +print(settings) gc.collect() presets = Presets(settings["led_pin"], settings["num_leds"]) @@ -35,11 +36,10 @@ hello = json.dumps({ "name": settings.get("name", "led"), "type": "led", }) -try: - esp.send(BROADCAST_MAC, hello) - print("espnow hello", len(hello), "B") -except Exception as e: - print("espnow hello failed:", e) +print(hello) + +esp.send(BROADCAST_MAC, hello) +print("espnow hello", len(hello), "B") def _on_espnow_message(msg): @@ -62,6 +62,7 @@ async def main(): print(host, len(msg), "B") try: _on_espnow_message(msg) + print(msg) except Exception as e: print("espnow rx error:", e) await asyncio.sleep(0) diff --git a/src/presets.py b/src/presets.py index fe709d9..839f3ec 100644 --- a/src/presets.py +++ b/src/presets.py @@ -4,6 +4,7 @@ from preset import Preset from utils import convert_and_reorder_colors import json import sys +import utime try: import uos as os except ImportError: @@ -31,6 +32,7 @@ class Presets: self.patterns = { "off": self.off, "on": self.on, + "blink": self.blink, } self.patterns.update(self._load_dynamic_patterns()) @@ -193,6 +195,12 @@ class Presets: if preset_name in self.presets: preset = self.presets[preset_name] if preset.p in self.patterns: + if preset.p == "off": + self.generator = None + self.step = 0 + self.fill((0, 0, 0)) + self.selected = preset_name + return True # Manual single-shot patterns: if this select arrives before the main loop has # tick()'d the previous frame, completing it first keeps step in sync with beats. if ( @@ -206,7 +214,7 @@ class Presets: # Set step value if explicitly provided if step is not None: self.step = step - elif preset.p == "off" or self.selected != preset_name: + elif self.selected != preset_name: self.step = 0 self.generator = self.patterns[preset.p](preset) self.selected = preset_name # Store the preset name, not the object @@ -256,4 +264,33 @@ class Presets: def on(self, preset): colors = preset.c color = colors[0] if colors else (255, 255, 255) - self.fill(self.apply_brightness(color, preset.b)) + lit = self.apply_brightness(color, preset.b) + while True: + self.fill(lit) + yield + + def blink(self, preset): + """Built-in blink (used by controller identify); no patterns/ deploy required.""" + colors = preset.c if preset.c else [(255, 255, 255)] + bg_color = self.apply_brightness(preset.background_or(colors), preset.b) + color_index = 0 + state = True + last_update = utime.ticks_ms() + while True: + now = utime.ticks_ms() + delay_ms = max(1, int(preset.d)) + if utime.ticks_diff(now, last_update) >= delay_ms: + if state: + base = colors[color_index % len(colors)] + self.fill(self.apply_brightness(base, preset.b)) + color_index += 1 + else: + self.fill(bg_color) + state = not state + last_update = utime.ticks_add(last_update, delay_ms) + yield + + +def run_tick(presets): + """Advance one animation frame (standalone tests / mpremote demos).""" + presets.tick() diff --git a/src/v1_wire.py b/src/v1_wire.py new file mode 100644 index 0000000..2a2aaf2 --- /dev/null +++ b/src/v1_wire.py @@ -0,0 +1,36 @@ +"""Expand short v1 wire keys to long names (MicroPython).""" + + +K_PRESETS = "p" +K_SELECT = "s" +K_GROUPS = "g" +K_SET_GROUPS = "sg" +K_SAVE = "sv" +K_DEFAULT = "df" +K_DEVICE_CONFIG = "dc" +K_CLEAR_PRESETS = "cp" +K_MANIFEST = "mf" + +_SHORT_TO_LONG = { + K_PRESETS: "presets", + K_SELECT: "select", + K_GROUPS: "groups", + K_SET_GROUPS: "set_groups", + K_SAVE: "save", + K_DEFAULT: "default", + K_DEVICE_CONFIG: "device_config", + K_CLEAR_PRESETS: "clear_presets", + K_MANIFEST: "manifest", +} + + +def expand_v1(data): + if not isinstance(data, dict): + return data + out = dict(data) + for short_key, long_key in _SHORT_TO_LONG.items(): + if short_key in data and long_key not in out: + out[long_key] = data[short_key] + if short_key in out: + del out[short_key] + return out diff --git a/tests/bridge_ws_blink.py b/tests/bridge_ws_blink.py new file mode 100644 index 0000000..9fe16ad --- /dev/null +++ b/tests/bridge_ws_blink.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket. + +Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot, +pattern, and colours; this script reaches the driver over ESP-NOW through +``espnow-sender`` (devices envelope, not legacy broadcast JSON). + +Run from the **led-controller** repo (needs ``websockets`` in Pipenv):: + + pipenv run python led-driver/tests/bridge_ws_blink.py + + pipenv run python led-driver/tests/bridge_ws_blink.py \\ + --url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8 + +From **led-driver** (if Pipenv/env is the parent project):: + + pipenv run python tests/bridge_ws_blink.py --dry-run +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import re +import sys +from pathlib import Path +from typing import Any, Dict, Optional + +# led-driver/tests -> led-driver -> led-controller +LED_DRIVER_ROOT = Path(__file__).resolve().parents[1] +PROJECT_ROOT = LED_DRIVER_ROOT.parent + + +def _load_bridge_url(explicit: Optional[str]) -> str: + if explicit and explicit.strip(): + return explicit.strip() + for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"): + if not path.is_file(): + continue + try: + data = json.loads(path.read_text(encoding="utf-8")) + url = str(data.get("bridge_ws_url") or "").strip() + if url: + return url + except (OSError, json.JSONDecodeError, TypeError): + pass + return "ws://192.168.4.1/ws" + + +def _format_mac(mac: str) -> str: + s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower()) + if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s): + raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)") + return ":".join(s[i : i + 2] for i in range(0, 12, 2)) + + +def build_blink_envelope( + mac: str, + *, + preset_id: str = "2", + delay_ms: int = 200, + brightness: int = 64, +) -> Dict[str, Any]: + """v1 devices envelope: preset body + list select (same shape as the Pi).""" + body = { + "p": { + preset_id: { + "p": "blink", + "b": max(0, min(255, int(brightness))), + "d": max(1, int(delay_ms)), + "c": ["#FF0000", "#0000FF"], + "a": True, + } + }, + "s": [str(preset_id)], + } + return {"v": "1", "dv": {_format_mac(mac): body}} + + +async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None: + import websockets + + packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8") + print(f"connecting to {url}") + async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws: + print(f"connected, sending {len(packet)} B") + print(packet.decode("utf-8")) + await ws.send(packet) + if hold_s > 0: + print(f"holding connection {hold_s}s …") + await asyncio.sleep(hold_s) + print("done") + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Send blink preset+select to one driver via bridge WebSocket.", + ) + parser.add_argument( + "--url", + default=None, + help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)", + ) + parser.add_argument( + "--mac", + default="188b0e1560a8", + help="Driver MAC (12 hex, colons optional). Default: registry example id.", + ) + parser.add_argument( + "--preset-id", + default="2", + help="Wire preset slot id (default: 2, matches zone push)", + ) + parser.add_argument( + "--delay-ms", + type=int, + default=200, + help="Blink delay in ms (default: 200)", + ) + parser.add_argument( + "--brightness", + type=int, + default=64, + help="Preset brightness 0–255 (default: 64)", + ) + parser.add_argument( + "--hold", + type=float, + default=2.0, + help="Seconds to keep WebSocket open after send (default: 2)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print envelope only; do not connect", + ) + args = parser.parse_args() + + url = _load_bridge_url(args.url) + try: + envelope = build_blink_envelope( + args.mac, + preset_id=args.preset_id, + delay_ms=args.delay_ms, + brightness=args.brightness, + ) + except ValueError as e: + print(f"error: {e}", file=sys.stderr) + return 1 + + print(f"url={url!r} mac={_format_mac(args.mac)!r}") + if args.dry_run: + print(json.dumps(envelope, indent=2)) + return 0 + + try: + asyncio.run(_send(url, envelope, args.hold)) + except KeyboardInterrupt: + print("interrupted") + return 130 + except Exception as e: + print(f"failed: {e!r}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/patterns/blink.py b/tests/patterns/blink.py index 124de8f..edfeba2 100644 --- a/tests/patterns/blink.py +++ b/tests/patterns/blink.py @@ -1,35 +1,74 @@ #!/usr/bin/env python3 +"""Standalone blink pattern demo (WDT-fed tick loop). + +Run on device:: + + mpremote connect run tests/patterns/blink.py +""" +import sys import utime -from machine import WDT -from settings import Settings -from presets import Presets, run_tick + + +def _bootstrap_import_path(): + """Find ``settings`` / ``presets`` on device or when run via mpremote.""" + try: + import uos as os + except ImportError: + import os + + candidates = [] + try: + here = __file__.rsplit("/", 1)[0] + if here: + candidates.append(here) + tests = here.rsplit("/", 1)[0] + if tests: + candidates.append(tests) + root = tests.rsplit("/", 1)[0] + if root: + candidates.append(root) + candidates.append(root + "/src") + except NameError: + pass + for p in (".", "..", "/", "src", "/src"): + candidates.append(p) + for p in candidates: + if p and p not in sys.path: + sys.path.insert(0, p) + + +_bootstrap_import_path() + +from machine import WDT # noqa: E402 +from settings import Settings # noqa: E402 +from presets import Presets # noqa: E402 + + +def _run_ms(presets, wdt, duration_ms, sleep_ms=10): + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: + wdt.feed() + presets.tick() + utime.sleep_ms(sleep_ms) def main(): - s = Settings() - pin = s.get("led_pin", 10) - num = s.get("num_leds", 30) - - p = Presets(pin=pin, num_leds=num) + settings = Settings() + presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30)) wdt = WDT(timeout=10000) - - # Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors) - p.edit("test_blink", { - "p": "blink", - "b": 64, - "d": 200, - "c": [(255, 0, 0), (0, 0, 255)], - }) - p.select("test_blink") - start = utime.ticks_ms() - while utime.ticks_diff(utime.ticks_ms(), start) < 1500: - wdt.feed() - run_tick(p) - utime.sleep_ms(10) + presets.edit( + "test_blink", + { + "p": "blink", + "b": 64, + "d": 200, + "c": [(255, 0, 0), (0, 0, 255)], + }, + ) + presets.select("test_blink") + _run_ms(presets, wdt, 1500) if __name__ == "__main__": main() - -