Compare commits
1 Commits
1fdb2c9441
...
a97f6c7c2c
| Author | SHA1 | Date | |
|---|---|---|---|
| a97f6c7c2c |
@@ -2,7 +2,11 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
import socket
|
import socket
|
||||||
|
import network
|
||||||
|
import ubinascii
|
||||||
|
|
||||||
|
import device_groups as dg
|
||||||
|
from v1_wire import expand_v1
|
||||||
from binary_envelope import parse_binary_envelope
|
from binary_envelope import parse_binary_envelope
|
||||||
from utils import convert_and_reorder_colors
|
from utils import convert_and_reorder_colors
|
||||||
|
|
||||||
@@ -58,8 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
|
|||||||
return
|
return
|
||||||
if data.get("v", "") != "1":
|
if data.get("v", "") != "1":
|
||||||
return
|
return
|
||||||
|
data = expand_v1(data)
|
||||||
if save:
|
if save:
|
||||||
data["save"] = True
|
data["save"] = True
|
||||||
|
set_groups = bool(data.get("set_groups"))
|
||||||
|
groups = data.get("groups")
|
||||||
|
if set_groups and isinstance(groups, list):
|
||||||
|
dg.groups_replace(groups)
|
||||||
|
print("groups set", dg.list_groups())
|
||||||
|
elif isinstance(groups, list) and groups:
|
||||||
|
if not any(dg.in_group(str(g)) for g in groups):
|
||||||
|
print("ignored: not in groups", groups)
|
||||||
|
return
|
||||||
if "device_config" in data:
|
if "device_config" in data:
|
||||||
apply_device_config(data, settings, presets)
|
apply_device_config(data, settings, presets)
|
||||||
if "b" in data:
|
if "b" in data:
|
||||||
@@ -68,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None, save=False):
|
|||||||
apply_presets(data, settings, presets)
|
apply_presets(data, settings, presets)
|
||||||
if "clear_presets" in data:
|
if "clear_presets" in data:
|
||||||
apply_clear_presets(data, presets)
|
apply_clear_presets(data, presets)
|
||||||
if "select" in data:
|
elif "select" in data or "s" in data:
|
||||||
apply_select(data, settings, presets)
|
apply_select(data, settings, presets)
|
||||||
if "default" in data:
|
if "default" in data:
|
||||||
apply_default(data, settings, presets)
|
apply_default(data, settings, presets)
|
||||||
@@ -172,7 +186,18 @@ def apply_brightness(data, settings, presets):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
_pending_select = None
|
||||||
|
|
||||||
|
|
||||||
|
def _run_select(presets, settings, preset_name, step=None):
|
||||||
|
if presets.select(preset_name, step=step):
|
||||||
|
record_last_preset(settings, preset_name)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def apply_presets(data, settings, presets):
|
def apply_presets(data, settings, presets):
|
||||||
|
global _pending_select
|
||||||
presets_map = data["presets"]
|
presets_map = data["presets"]
|
||||||
for id, preset_data in presets_map.items():
|
for id, preset_data in presets_map.items():
|
||||||
if not preset_data:
|
if not preset_data:
|
||||||
@@ -183,8 +208,8 @@ def apply_presets(data, settings, presets):
|
|||||||
preset_data[color_key] = convert_and_reorder_colors(
|
preset_data[color_key] = convert_and_reorder_colors(
|
||||||
preset_data[color_key], settings
|
preset_data[color_key], settings
|
||||||
)
|
)
|
||||||
except (TypeError, ValueError, KeyError):
|
except (TypeError, ValueError, KeyError) as err:
|
||||||
continue
|
print("preset color convert failed:", id, err)
|
||||||
if "bg" in preset_data:
|
if "bg" in preset_data:
|
||||||
try:
|
try:
|
||||||
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
|
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
|
||||||
@@ -193,18 +218,72 @@ def apply_presets(data, settings, presets):
|
|||||||
except (TypeError, ValueError, KeyError):
|
except (TypeError, ValueError, KeyError):
|
||||||
pass
|
pass
|
||||||
presets.edit(id, preset_data)
|
presets.edit(id, preset_data)
|
||||||
|
# Same message often carries select; apply now while presets are loaded.
|
||||||
|
if "select" in data:
|
||||||
|
apply_select(data, settings, presets)
|
||||||
|
elif _pending_select is not None:
|
||||||
|
preset_name, step = _pending_select
|
||||||
|
_pending_select = None
|
||||||
|
if preset_name in presets.presets or preset_name in ("on", "off"):
|
||||||
|
_run_select(presets, settings, preset_name, step)
|
||||||
|
|
||||||
|
|
||||||
|
def _select_list_for_this_device(select_val, settings):
|
||||||
|
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
|
||||||
|
if isinstance(select_val, list) and select_val:
|
||||||
|
return select_val
|
||||||
|
if isinstance(select_val, str) and str(select_val).strip():
|
||||||
|
return [str(select_val).strip()]
|
||||||
|
if not isinstance(select_val, dict) or not select_val:
|
||||||
|
return None
|
||||||
|
if "preset" in select_val:
|
||||||
|
preset_name = select_val.get("preset")
|
||||||
|
if preset_name is None:
|
||||||
|
return None
|
||||||
|
out = [str(preset_name)]
|
||||||
|
if "step" in select_val:
|
||||||
|
out.append(select_val["step"])
|
||||||
|
return out
|
||||||
|
device_name = str(settings.get("name") or "").strip()
|
||||||
|
select_list = select_val.get(device_name)
|
||||||
|
if select_list:
|
||||||
|
return select_list
|
||||||
|
try:
|
||||||
|
sta = network.WLAN(network.STA_IF)
|
||||||
|
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
|
||||||
|
except Exception:
|
||||||
|
mac_hex = ""
|
||||||
|
if mac_hex:
|
||||||
|
for key in select_val:
|
||||||
|
k = str(key).lower().replace(":", "").replace("-", "")
|
||||||
|
if mac_hex in k:
|
||||||
|
return select_val[key]
|
||||||
|
if len(select_val) == 1:
|
||||||
|
return next(iter(select_val.values()))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def apply_select(data, settings, presets):
|
def apply_select(data, settings, presets):
|
||||||
select_map = data["select"]
|
global _pending_select
|
||||||
device_name = settings["name"]
|
select_val = data.get("select")
|
||||||
select_list = select_map.get(device_name, [])
|
if select_val is None:
|
||||||
|
select_val = data.get("s")
|
||||||
|
select_list = _select_list_for_this_device(select_val, settings)
|
||||||
if not select_list:
|
if not select_list:
|
||||||
|
print("select ignored:", repr(select_val))
|
||||||
|
return
|
||||||
|
preset_name = str(select_list[0]).strip()
|
||||||
|
if not preset_name:
|
||||||
return
|
return
|
||||||
preset_name = select_list[0]
|
|
||||||
step = select_list[1] if len(select_list) > 1 else None
|
step = select_list[1] if len(select_list) > 1 else None
|
||||||
if presets.select(preset_name, step=step):
|
if preset_name not in presets.presets and preset_name not in ("on", "off"):
|
||||||
record_last_preset(settings, preset_name)
|
_pending_select = (preset_name, step)
|
||||||
|
print("select deferred (preset not loaded yet):", preset_name)
|
||||||
|
return
|
||||||
|
if _run_select(presets, settings, preset_name, step):
|
||||||
|
_pending_select = None
|
||||||
|
else:
|
||||||
|
print("select failed:", preset_name)
|
||||||
|
|
||||||
|
|
||||||
def apply_clear_presets(data, presets):
|
def apply_clear_presets(data, presets):
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ def init_espnow(settings):
|
|||||||
global _esp
|
global _esp
|
||||||
ch = 6
|
ch = 6
|
||||||
try:
|
try:
|
||||||
ch = int(settings.get("wifi_channel", 6))
|
ch = int(settings.get("wifi_channel", 1))
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
pass
|
pass
|
||||||
ch = max(1, min(11, ch))
|
ch = max(1, min(11, ch))
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ wdt.feed()
|
|||||||
machine.freq(160000000)
|
machine.freq(160000000)
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
|
print(settings)
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
presets = Presets(settings["led_pin"], settings["num_leds"])
|
presets = Presets(settings["led_pin"], settings["num_leds"])
|
||||||
@@ -35,11 +36,10 @@ hello = json.dumps({
|
|||||||
"name": settings.get("name", "led"),
|
"name": settings.get("name", "led"),
|
||||||
"type": "led",
|
"type": "led",
|
||||||
})
|
})
|
||||||
try:
|
print(hello)
|
||||||
|
|
||||||
esp.send(BROADCAST_MAC, hello)
|
esp.send(BROADCAST_MAC, hello)
|
||||||
print("espnow hello", len(hello), "B")
|
print("espnow hello", len(hello), "B")
|
||||||
except Exception as e:
|
|
||||||
print("espnow hello failed:", e)
|
|
||||||
|
|
||||||
|
|
||||||
def _on_espnow_message(msg):
|
def _on_espnow_message(msg):
|
||||||
@@ -62,6 +62,7 @@ async def main():
|
|||||||
print(host, len(msg), "B")
|
print(host, len(msg), "B")
|
||||||
try:
|
try:
|
||||||
_on_espnow_message(msg)
|
_on_espnow_message(msg)
|
||||||
|
print(msg)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("espnow rx error:", e)
|
print("espnow rx error:", e)
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from preset import Preset
|
|||||||
from utils import convert_and_reorder_colors
|
from utils import convert_and_reorder_colors
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
|
import utime
|
||||||
try:
|
try:
|
||||||
import uos as os
|
import uos as os
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -31,6 +32,7 @@ class Presets:
|
|||||||
self.patterns = {
|
self.patterns = {
|
||||||
"off": self.off,
|
"off": self.off,
|
||||||
"on": self.on,
|
"on": self.on,
|
||||||
|
"blink": self.blink,
|
||||||
}
|
}
|
||||||
self.patterns.update(self._load_dynamic_patterns())
|
self.patterns.update(self._load_dynamic_patterns())
|
||||||
|
|
||||||
@@ -193,6 +195,12 @@ class Presets:
|
|||||||
if preset_name in self.presets:
|
if preset_name in self.presets:
|
||||||
preset = self.presets[preset_name]
|
preset = self.presets[preset_name]
|
||||||
if preset.p in self.patterns:
|
if preset.p in self.patterns:
|
||||||
|
if preset.p == "off":
|
||||||
|
self.generator = None
|
||||||
|
self.step = 0
|
||||||
|
self.fill((0, 0, 0))
|
||||||
|
self.selected = preset_name
|
||||||
|
return True
|
||||||
# Manual single-shot patterns: if this select arrives before the main loop has
|
# Manual single-shot patterns: if this select arrives before the main loop has
|
||||||
# tick()'d the previous frame, completing it first keeps step in sync with beats.
|
# tick()'d the previous frame, completing it first keeps step in sync with beats.
|
||||||
if (
|
if (
|
||||||
@@ -206,7 +214,7 @@ class Presets:
|
|||||||
# Set step value if explicitly provided
|
# Set step value if explicitly provided
|
||||||
if step is not None:
|
if step is not None:
|
||||||
self.step = step
|
self.step = step
|
||||||
elif preset.p == "off" or self.selected != preset_name:
|
elif self.selected != preset_name:
|
||||||
self.step = 0
|
self.step = 0
|
||||||
self.generator = self.patterns[preset.p](preset)
|
self.generator = self.patterns[preset.p](preset)
|
||||||
self.selected = preset_name # Store the preset name, not the object
|
self.selected = preset_name # Store the preset name, not the object
|
||||||
@@ -256,4 +264,33 @@ class Presets:
|
|||||||
def on(self, preset):
|
def on(self, preset):
|
||||||
colors = preset.c
|
colors = preset.c
|
||||||
color = colors[0] if colors else (255, 255, 255)
|
color = colors[0] if colors else (255, 255, 255)
|
||||||
self.fill(self.apply_brightness(color, preset.b))
|
lit = self.apply_brightness(color, preset.b)
|
||||||
|
while True:
|
||||||
|
self.fill(lit)
|
||||||
|
yield
|
||||||
|
|
||||||
|
def blink(self, preset):
|
||||||
|
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
|
||||||
|
colors = preset.c if preset.c else [(255, 255, 255)]
|
||||||
|
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
|
||||||
|
color_index = 0
|
||||||
|
state = True
|
||||||
|
last_update = utime.ticks_ms()
|
||||||
|
while True:
|
||||||
|
now = utime.ticks_ms()
|
||||||
|
delay_ms = max(1, int(preset.d))
|
||||||
|
if utime.ticks_diff(now, last_update) >= delay_ms:
|
||||||
|
if state:
|
||||||
|
base = colors[color_index % len(colors)]
|
||||||
|
self.fill(self.apply_brightness(base, preset.b))
|
||||||
|
color_index += 1
|
||||||
|
else:
|
||||||
|
self.fill(bg_color)
|
||||||
|
state = not state
|
||||||
|
last_update = utime.ticks_add(last_update, delay_ms)
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
def run_tick(presets):
|
||||||
|
"""Advance one animation frame (standalone tests / mpremote demos)."""
|
||||||
|
presets.tick()
|
||||||
|
|||||||
36
src/v1_wire.py
Normal file
36
src/v1_wire.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""Expand short v1 wire keys to long names (MicroPython)."""
|
||||||
|
|
||||||
|
|
||||||
|
K_PRESETS = "p"
|
||||||
|
K_SELECT = "s"
|
||||||
|
K_GROUPS = "g"
|
||||||
|
K_SET_GROUPS = "sg"
|
||||||
|
K_SAVE = "sv"
|
||||||
|
K_DEFAULT = "df"
|
||||||
|
K_DEVICE_CONFIG = "dc"
|
||||||
|
K_CLEAR_PRESETS = "cp"
|
||||||
|
K_MANIFEST = "mf"
|
||||||
|
|
||||||
|
_SHORT_TO_LONG = {
|
||||||
|
K_PRESETS: "presets",
|
||||||
|
K_SELECT: "select",
|
||||||
|
K_GROUPS: "groups",
|
||||||
|
K_SET_GROUPS: "set_groups",
|
||||||
|
K_SAVE: "save",
|
||||||
|
K_DEFAULT: "default",
|
||||||
|
K_DEVICE_CONFIG: "device_config",
|
||||||
|
K_CLEAR_PRESETS: "clear_presets",
|
||||||
|
K_MANIFEST: "manifest",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def expand_v1(data):
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return data
|
||||||
|
out = dict(data)
|
||||||
|
for short_key, long_key in _SHORT_TO_LONG.items():
|
||||||
|
if short_key in data and long_key not in out:
|
||||||
|
out[long_key] = data[short_key]
|
||||||
|
if short_key in out:
|
||||||
|
del out[short_key]
|
||||||
|
return out
|
||||||
169
tests/bridge_ws_blink.py
Normal file
169
tests/bridge_ws_blink.py
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
|
||||||
|
|
||||||
|
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
|
||||||
|
pattern, and colours; this script reaches the driver over ESP-NOW through
|
||||||
|
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
|
||||||
|
|
||||||
|
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
|
||||||
|
|
||||||
|
pipenv run python led-driver/tests/bridge_ws_blink.py
|
||||||
|
|
||||||
|
pipenv run python led-driver/tests/bridge_ws_blink.py \\
|
||||||
|
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
|
||||||
|
|
||||||
|
From **led-driver** (if Pipenv/env is the parent project)::
|
||||||
|
|
||||||
|
pipenv run python tests/bridge_ws_blink.py --dry-run
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
# led-driver/tests -> led-driver -> led-controller
|
||||||
|
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
PROJECT_ROOT = LED_DRIVER_ROOT.parent
|
||||||
|
|
||||||
|
|
||||||
|
def _load_bridge_url(explicit: Optional[str]) -> str:
|
||||||
|
if explicit and explicit.strip():
|
||||||
|
return explicit.strip()
|
||||||
|
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
|
||||||
|
if not path.is_file():
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
data = json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
url = str(data.get("bridge_ws_url") or "").strip()
|
||||||
|
if url:
|
||||||
|
return url
|
||||||
|
except (OSError, json.JSONDecodeError, TypeError):
|
||||||
|
pass
|
||||||
|
return "ws://192.168.4.1/ws"
|
||||||
|
|
||||||
|
|
||||||
|
def _format_mac(mac: str) -> str:
|
||||||
|
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
|
||||||
|
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
|
||||||
|
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
|
||||||
|
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
|
||||||
|
|
||||||
|
|
||||||
|
def build_blink_envelope(
|
||||||
|
mac: str,
|
||||||
|
*,
|
||||||
|
preset_id: str = "2",
|
||||||
|
delay_ms: int = 200,
|
||||||
|
brightness: int = 64,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
|
||||||
|
body = {
|
||||||
|
"p": {
|
||||||
|
preset_id: {
|
||||||
|
"p": "blink",
|
||||||
|
"b": max(0, min(255, int(brightness))),
|
||||||
|
"d": max(1, int(delay_ms)),
|
||||||
|
"c": ["#FF0000", "#0000FF"],
|
||||||
|
"a": True,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"s": [str(preset_id)],
|
||||||
|
}
|
||||||
|
return {"v": "1", "dv": {_format_mac(mac): body}}
|
||||||
|
|
||||||
|
|
||||||
|
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
|
||||||
|
import websockets
|
||||||
|
|
||||||
|
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
|
||||||
|
print(f"connecting to {url}")
|
||||||
|
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
|
||||||
|
print(f"connected, sending {len(packet)} B")
|
||||||
|
print(packet.decode("utf-8"))
|
||||||
|
await ws.send(packet)
|
||||||
|
if hold_s > 0:
|
||||||
|
print(f"holding connection {hold_s}s …")
|
||||||
|
await asyncio.sleep(hold_s)
|
||||||
|
print("done")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Send blink preset+select to one driver via bridge WebSocket.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--url",
|
||||||
|
default=None,
|
||||||
|
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--mac",
|
||||||
|
default="188b0e1560a8",
|
||||||
|
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--preset-id",
|
||||||
|
default="2",
|
||||||
|
help="Wire preset slot id (default: 2, matches zone push)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--delay-ms",
|
||||||
|
type=int,
|
||||||
|
default=200,
|
||||||
|
help="Blink delay in ms (default: 200)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--brightness",
|
||||||
|
type=int,
|
||||||
|
default=64,
|
||||||
|
help="Preset brightness 0–255 (default: 64)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--hold",
|
||||||
|
type=float,
|
||||||
|
default=2.0,
|
||||||
|
help="Seconds to keep WebSocket open after send (default: 2)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help="Print envelope only; do not connect",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
url = _load_bridge_url(args.url)
|
||||||
|
try:
|
||||||
|
envelope = build_blink_envelope(
|
||||||
|
args.mac,
|
||||||
|
preset_id=args.preset_id,
|
||||||
|
delay_ms=args.delay_ms,
|
||||||
|
brightness=args.brightness,
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
print(f"error: {e}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
|
||||||
|
if args.dry_run:
|
||||||
|
print(json.dumps(envelope, indent=2))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(_send(url, envelope, args.hold))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("interrupted")
|
||||||
|
return 130
|
||||||
|
except Exception as e:
|
||||||
|
print(f"failed: {e!r}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
@@ -1,35 +1,74 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
"""Standalone blink pattern demo (WDT-fed tick loop).
|
||||||
|
|
||||||
|
Run on device::
|
||||||
|
|
||||||
|
mpremote connect <port> run tests/patterns/blink.py
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
import utime
|
import utime
|
||||||
from machine import WDT
|
|
||||||
from settings import Settings
|
|
||||||
from presets import Presets, run_tick
|
def _bootstrap_import_path():
|
||||||
|
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
|
||||||
|
try:
|
||||||
|
import uos as os
|
||||||
|
except ImportError:
|
||||||
|
import os
|
||||||
|
|
||||||
|
candidates = []
|
||||||
|
try:
|
||||||
|
here = __file__.rsplit("/", 1)[0]
|
||||||
|
if here:
|
||||||
|
candidates.append(here)
|
||||||
|
tests = here.rsplit("/", 1)[0]
|
||||||
|
if tests:
|
||||||
|
candidates.append(tests)
|
||||||
|
root = tests.rsplit("/", 1)[0]
|
||||||
|
if root:
|
||||||
|
candidates.append(root)
|
||||||
|
candidates.append(root + "/src")
|
||||||
|
except NameError:
|
||||||
|
pass
|
||||||
|
for p in (".", "..", "/", "src", "/src"):
|
||||||
|
candidates.append(p)
|
||||||
|
for p in candidates:
|
||||||
|
if p and p not in sys.path:
|
||||||
|
sys.path.insert(0, p)
|
||||||
|
|
||||||
|
|
||||||
|
_bootstrap_import_path()
|
||||||
|
|
||||||
|
from machine import WDT # noqa: E402
|
||||||
|
from settings import Settings # noqa: E402
|
||||||
|
from presets import Presets # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
|
||||||
|
start = utime.ticks_ms()
|
||||||
|
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
|
||||||
|
wdt.feed()
|
||||||
|
presets.tick()
|
||||||
|
utime.sleep_ms(sleep_ms)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
s = Settings()
|
settings = Settings()
|
||||||
pin = s.get("led_pin", 10)
|
presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
|
||||||
num = s.get("num_leds", 30)
|
|
||||||
|
|
||||||
p = Presets(pin=pin, num_leds=num)
|
|
||||||
wdt = WDT(timeout=10000)
|
wdt = WDT(timeout=10000)
|
||||||
|
|
||||||
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors)
|
presets.edit(
|
||||||
p.edit("test_blink", {
|
"test_blink",
|
||||||
|
{
|
||||||
"p": "blink",
|
"p": "blink",
|
||||||
"b": 64,
|
"b": 64,
|
||||||
"d": 200,
|
"d": 200,
|
||||||
"c": [(255, 0, 0), (0, 0, 255)],
|
"c": [(255, 0, 0), (0, 0, 255)],
|
||||||
})
|
},
|
||||||
p.select("test_blink")
|
)
|
||||||
|
presets.select("test_blink")
|
||||||
start = utime.ticks_ms()
|
_run_ms(presets, wdt, 1500)
|
||||||
while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
|
|
||||||
wdt.feed()
|
|
||||||
run_tick(p)
|
|
||||||
utime.sleep_ms(10)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user