Compare commits
13 Commits
55a97ac51c
...
p2p
| Author | SHA1 | Date | |
|---|---|---|---|
| 3286c4002d | |||
| 68eb547ec4 | |||
| 8403df531d | |||
| 088fe161a8 | |||
| c9895df512 | |||
| 39a84696c3 | |||
| c7560b2e87 | |||
| ea21563900 | |||
| a97f6c7c2c | |||
| 1fdb2c9441 | |||
| 3e718f7432 | |||
| 85490a3bd0 | |||
| 94266d5a7c |
8
bulk.sh
Executable file
8
bulk.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
PORT="${1:-/dev/ttyACM0}"
|
||||
|
||||
while true; do
|
||||
ls "$PORT" && led-cli -p "$PORT" --erase --src --patterns && led-cli -p "$PORT" --reset -f
|
||||
sleep 0.5
|
||||
done
|
||||
@@ -1,11 +1,7 @@
|
||||
import asyncio
|
||||
import utime
|
||||
|
||||
from hello import broadcast_hello_udp
|
||||
from mem_stats import print_mem
|
||||
from wifi_sta import try_reconnect
|
||||
|
||||
_UDP_HELLO_ATTEMPT = 0
|
||||
|
||||
|
||||
async def presets_loop(presets, wdt):
|
||||
@@ -18,41 +14,4 @@ async def presets_loop(presets, wdt):
|
||||
if utime.ticks_diff(now, last_mem_log) >= 5000:
|
||||
print_mem("runtime")
|
||||
last_mem_log = now
|
||||
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
|
||||
await asyncio.sleep(0)
|
||||
|
||||
|
||||
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
|
||||
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
|
||||
global _UDP_HELLO_ATTEMPT
|
||||
await asyncio.sleep(1)
|
||||
started_ms = utime.ticks_ms()
|
||||
while True:
|
||||
try:
|
||||
wifi_ok = sta_if.isconnected()
|
||||
except Exception:
|
||||
wifi_ok = False
|
||||
if not wifi_ok:
|
||||
ssid = settings.get("ssid") or ""
|
||||
if ssid:
|
||||
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
|
||||
try:
|
||||
wifi_ok = sta_if.isconnected()
|
||||
except Exception:
|
||||
wifi_ok = False
|
||||
if wifi_ok and runtime_state.hello:
|
||||
_UDP_HELLO_ATTEMPT += 1
|
||||
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
|
||||
try:
|
||||
broadcast_hello_udp(
|
||||
sta_if,
|
||||
settings.get("name", ""),
|
||||
wait_reply=False,
|
||||
wdt=wdt,
|
||||
dual_destinations=True,
|
||||
)
|
||||
except Exception as ex:
|
||||
print("UDP hello broadcast failed:", ex)
|
||||
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
|
||||
interval_s = 10 if elapsed_ms < 120000 else 30
|
||||
await asyncio.sleep(interval_s)
|
||||
|
||||
@@ -2,7 +2,11 @@
|
||||
|
||||
import json
|
||||
import socket
|
||||
import network
|
||||
import ubinascii
|
||||
|
||||
import device_groups as dg
|
||||
from v1_wire import expand_v1
|
||||
from binary_envelope import parse_binary_envelope
|
||||
from utils import convert_and_reorder_colors
|
||||
|
||||
@@ -40,8 +44,8 @@ def _log_rx(payload) -> None:
|
||||
print("rx (logging failed)")
|
||||
|
||||
|
||||
def process_data(payload, settings, presets, controller_ip=None):
|
||||
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields."""
|
||||
def process_data(payload, settings, presets, controller_ip=None, save=False):
|
||||
"""Read one controller message; binary v2 envelope or JSON v1, then apply fields."""
|
||||
_log_rx(payload)
|
||||
data = None
|
||||
if isinstance(payload, (bytes, bytearray)):
|
||||
@@ -58,6 +62,18 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
return
|
||||
if data.get("v", "") != "1":
|
||||
return
|
||||
data = expand_v1(data)
|
||||
if save:
|
||||
data["save"] = True
|
||||
set_groups = bool(data.get("set_groups"))
|
||||
groups = data.get("groups")
|
||||
if set_groups and isinstance(groups, list):
|
||||
dg.groups_replace(groups, settings)
|
||||
print("groups set", dg.list_groups())
|
||||
elif isinstance(groups, list) and groups:
|
||||
if not any(dg.in_group(str(g)) for g in groups):
|
||||
print("ignored: not in groups", groups)
|
||||
return
|
||||
if "device_config" in data:
|
||||
apply_device_config(data, settings, presets)
|
||||
if "b" in data:
|
||||
@@ -66,7 +82,7 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
apply_presets(data, settings, presets)
|
||||
if "clear_presets" in data:
|
||||
apply_clear_presets(data, presets)
|
||||
if "select" in data:
|
||||
if ("select" in data or "s" in data) and "presets" not in data:
|
||||
apply_select(data, settings, presets)
|
||||
if "default" in data:
|
||||
apply_default(data, settings, presets)
|
||||
@@ -80,6 +96,7 @@ def process_data(payload, settings, presets, controller_ip=None):
|
||||
settings.save()
|
||||
if "save" in data and "device_config" in data:
|
||||
settings.save()
|
||||
_flush_pending_select(settings, presets)
|
||||
|
||||
|
||||
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
|
||||
@@ -170,7 +187,30 @@ def apply_brightness(data, settings, presets):
|
||||
pass
|
||||
|
||||
|
||||
_pending_select = None
|
||||
|
||||
|
||||
def _run_select(presets, settings, preset_name, step=None):
|
||||
if presets.select(preset_name, step=step):
|
||||
record_last_preset(settings, preset_name)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _flush_pending_select(settings, presets):
|
||||
global _pending_select
|
||||
if _pending_select is None:
|
||||
return
|
||||
preset_name, step = _pending_select
|
||||
if preset_name not in presets.presets and preset_name not in ("on", "off"):
|
||||
return
|
||||
_pending_select = None
|
||||
if not _run_select(presets, settings, preset_name, step):
|
||||
print("select failed (pending):", preset_name)
|
||||
|
||||
|
||||
def apply_presets(data, settings, presets):
|
||||
global _pending_select
|
||||
presets_map = data["presets"]
|
||||
for id, preset_data in presets_map.items():
|
||||
if not preset_data:
|
||||
@@ -181,8 +221,8 @@ def apply_presets(data, settings, presets):
|
||||
preset_data[color_key] = convert_and_reorder_colors(
|
||||
preset_data[color_key], settings
|
||||
)
|
||||
except (TypeError, ValueError, KeyError):
|
||||
continue
|
||||
except (TypeError, ValueError, KeyError) as err:
|
||||
print("preset color convert failed:", id, err)
|
||||
if "bg" in preset_data:
|
||||
try:
|
||||
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
|
||||
@@ -191,18 +231,74 @@ def apply_presets(data, settings, presets):
|
||||
except (TypeError, ValueError, KeyError):
|
||||
pass
|
||||
presets.edit(id, preset_data)
|
||||
# Same message often carries select; apply now while presets are loaded.
|
||||
if "select" in data or "s" in data:
|
||||
apply_select(data, settings, presets)
|
||||
else:
|
||||
_flush_pending_select(settings, presets)
|
||||
|
||||
|
||||
def _select_list_for_this_device(select_val, settings):
|
||||
"""Resolve select to ``[preset_id, step?]`` (wire list or legacy name map)."""
|
||||
if isinstance(select_val, list) and select_val:
|
||||
return select_val
|
||||
if isinstance(select_val, str) and str(select_val).strip():
|
||||
return [str(select_val).strip()]
|
||||
if not isinstance(select_val, dict) or not select_val:
|
||||
return None
|
||||
if "preset" in select_val:
|
||||
preset_name = select_val.get("preset")
|
||||
if preset_name is None:
|
||||
return None
|
||||
out = [str(preset_name)]
|
||||
if "step" in select_val:
|
||||
out.append(select_val["step"])
|
||||
return out
|
||||
device_name = str(settings.get("name") or "").strip()
|
||||
select_list = select_val.get(device_name)
|
||||
if select_list:
|
||||
return select_list
|
||||
try:
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
mac_hex = ubinascii.hexlify(sta.config("mac")).decode().lower()
|
||||
except Exception:
|
||||
mac_hex = ""
|
||||
if mac_hex:
|
||||
for key in select_val:
|
||||
k = str(key).lower().replace(":", "").replace("-", "")
|
||||
if mac_hex in k:
|
||||
return select_val[key]
|
||||
if len(select_val) == 1:
|
||||
return next(iter(select_val.values()))
|
||||
return None
|
||||
|
||||
|
||||
def apply_select(data, settings, presets):
|
||||
select_map = data["select"]
|
||||
device_name = settings["name"]
|
||||
select_list = select_map.get(device_name, [])
|
||||
global _pending_select
|
||||
select_val = data.get("select")
|
||||
if select_val is None:
|
||||
select_val = data.get("s")
|
||||
select_list = _select_list_for_this_device(select_val, settings)
|
||||
if not select_list:
|
||||
print("select ignored:", repr(select_val))
|
||||
return
|
||||
preset_name = str(select_list[0]).strip()
|
||||
if not preset_name:
|
||||
return
|
||||
preset_name = select_list[0]
|
||||
step = select_list[1] if len(select_list) > 1 else None
|
||||
if presets.select(preset_name, step=step):
|
||||
record_last_preset(settings, preset_name)
|
||||
if preset_name not in presets.presets and preset_name not in ("on", "off"):
|
||||
try:
|
||||
presets.load(settings)
|
||||
except Exception:
|
||||
pass
|
||||
if preset_name not in presets.presets and preset_name not in ("on", "off"):
|
||||
_pending_select = (preset_name, step)
|
||||
print("select deferred (preset not loaded yet):", preset_name)
|
||||
return
|
||||
if _run_select(presets, settings, preset_name, step):
|
||||
_pending_select = None
|
||||
else:
|
||||
print("select failed:", preset_name)
|
||||
|
||||
|
||||
def apply_clear_presets(data, presets):
|
||||
|
||||
28
src/device_groups.py
Normal file
28
src/device_groups.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""Group membership for GROUP_CMD filtering; persisted in settings.json."""
|
||||
|
||||
_groups = []
|
||||
|
||||
|
||||
def load_from_settings(settings):
|
||||
global _groups
|
||||
g = settings.get("groups") if settings is not None else None
|
||||
if isinstance(g, list):
|
||||
_groups = [str(x) for x in g if str(x).strip()]
|
||||
else:
|
||||
_groups = []
|
||||
|
||||
|
||||
def groups_replace(group_ids, settings=None, *, persist=True):
|
||||
global _groups
|
||||
_groups = [str(g) for g in group_ids]
|
||||
if persist and settings is not None:
|
||||
settings["groups"] = list(_groups)
|
||||
settings.save()
|
||||
|
||||
|
||||
def in_group(group_id):
|
||||
return str(group_id) in _groups
|
||||
|
||||
|
||||
def list_groups():
|
||||
return list(_groups)
|
||||
187
src/espnow_transport.py
Normal file
187
src/espnow_transport.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""ESP-NOW receive loop and boot announce."""
|
||||
|
||||
import asyncio
|
||||
import urandom
|
||||
import ubinascii
|
||||
|
||||
import espnow
|
||||
import network
|
||||
|
||||
import device_groups as dg
|
||||
from espnow_wire import (
|
||||
BROADCAST_MAC,
|
||||
MSG_ANNOUNCE,
|
||||
MSG_CMD,
|
||||
MSG_GROUP_CMD,
|
||||
MSG_GROUPS,
|
||||
MSG_PING_REQ,
|
||||
cmd_envelope,
|
||||
pack_announce,
|
||||
pack_ping_rsp,
|
||||
parse_group_cmd,
|
||||
parse_groups,
|
||||
parse_ping_req,
|
||||
wire_msg_type,
|
||||
)
|
||||
|
||||
from controller_messages import process_data
|
||||
from settings import WIFI_CHANNEL_DEFAULT
|
||||
|
||||
_PING_DELAY_MS_MIN = 50
|
||||
_PING_DELAY_MS_MAX = 500
|
||||
|
||||
_esp = None
|
||||
_groups_received = False
|
||||
_debug = False
|
||||
|
||||
|
||||
def _dlog(*parts):
|
||||
if _debug:
|
||||
print(*parts)
|
||||
|
||||
|
||||
def init_espnow(settings):
|
||||
global _esp, _debug
|
||||
_debug = bool(settings.get("debug", False))
|
||||
try:
|
||||
ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT))
|
||||
except (TypeError, ValueError):
|
||||
ch = WIFI_CHANNEL_DEFAULT
|
||||
ch = max(1, min(11, ch))
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
try:
|
||||
sta.config(channel=ch)
|
||||
except Exception as e:
|
||||
print("espnow sta channel set failed:", e)
|
||||
_esp = espnow.ESPNow()
|
||||
_esp.active(True)
|
||||
try:
|
||||
_esp.add_peer(BROADCAST_MAC)
|
||||
_dlog("espnow add bcast ok")
|
||||
except Exception as e:
|
||||
print("espnow add bcast failed:", e)
|
||||
try:
|
||||
actual_ch = sta.config("channel")
|
||||
except Exception:
|
||||
actual_ch = "?"
|
||||
print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug)
|
||||
return _esp
|
||||
|
||||
|
||||
def _send_ping_rsp(host, settings, ping_id, delay_ms):
|
||||
import utime
|
||||
|
||||
utime.sleep_ms(delay_ms)
|
||||
if _esp is None or not host or len(host) != 6:
|
||||
return
|
||||
pkt = pack_ping_rsp(ping_id, settings.get("name", "led"))
|
||||
try:
|
||||
try:
|
||||
_esp.add_peer(host)
|
||||
_dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode())
|
||||
except Exception as e:
|
||||
_dlog("espnow ping add_peer skip", repr(e))
|
||||
_esp.send(host, pkt)
|
||||
print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode())
|
||||
except Exception as e:
|
||||
print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt))
|
||||
|
||||
|
||||
async def _send_ping_rsp_delayed(host, settings, ping_id):
|
||||
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
|
||||
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
|
||||
await asyncio.sleep(delay_ms / 1000)
|
||||
_send_ping_rsp(host, settings, ping_id, delay_ms)
|
||||
|
||||
|
||||
def _schedule_ping_rsp(host, settings, ping_id):
|
||||
span = _PING_DELAY_MS_MAX - _PING_DELAY_MS_MIN
|
||||
delay_ms = _PING_DELAY_MS_MIN + (urandom.getrandbits(10) % (span + 1))
|
||||
try:
|
||||
import _thread
|
||||
|
||||
_thread.start_new_thread(_send_ping_rsp, (host, settings, ping_id, delay_ms))
|
||||
except ImportError:
|
||||
asyncio.create_task(_send_ping_rsp_delayed(host, settings, ping_id))
|
||||
|
||||
|
||||
def send_boot_announce(settings):
|
||||
if _esp is None:
|
||||
return
|
||||
pkt = pack_announce(
|
||||
settings.get("name", "led"),
|
||||
settings.get("num_leds", 1),
|
||||
color_order=settings.get("color_order", "rgb"),
|
||||
startup_mode=settings.get("startup_mode", "default"),
|
||||
brightness=settings.get("brightness", 32),
|
||||
)
|
||||
try:
|
||||
_esp.send(BROADCAST_MAC, pkt)
|
||||
print("espnow announce", len(pkt), "B")
|
||||
except Exception as e:
|
||||
print("espnow announce failed:", e)
|
||||
|
||||
|
||||
def _handle_packet(host, pkt, settings, presets):
|
||||
global _groups_received
|
||||
mt = wire_msg_type(pkt)
|
||||
if mt == MSG_GROUPS:
|
||||
ids = parse_groups(pkt)
|
||||
if ids is not None:
|
||||
dg.groups_replace(ids, settings)
|
||||
_groups_received = True
|
||||
print("groups", ids)
|
||||
return
|
||||
if mt == MSG_GROUP_CMD:
|
||||
parsed = parse_group_cmd(pkt)
|
||||
if parsed is None:
|
||||
return
|
||||
gid, env = parsed
|
||||
if not dg.in_group(gid):
|
||||
return
|
||||
from espnow_wire import _envelope_size
|
||||
|
||||
need = _envelope_size(env)
|
||||
save = len(env) > need and env[need] == 1
|
||||
body = env[:need] if save else env
|
||||
if body:
|
||||
process_data(body, settings, presets, save=save)
|
||||
return
|
||||
if mt == MSG_CMD:
|
||||
env, save = cmd_envelope(pkt)
|
||||
if env:
|
||||
process_data(env, settings, presets, save=save)
|
||||
return
|
||||
if mt == MSG_PING_REQ:
|
||||
ping_id = parse_ping_req(pkt)
|
||||
if ping_id is not None and host and len(host) == 6:
|
||||
_schedule_ping_rsp(host, settings, ping_id)
|
||||
return
|
||||
if mt == MSG_ANNOUNCE:
|
||||
return
|
||||
|
||||
|
||||
async def espnow_receive_loop(settings, presets, wdt=None):
|
||||
global _groups_received
|
||||
while True:
|
||||
if _esp is None:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
host, msg = _esp.recv(0)
|
||||
if not host:
|
||||
if not _groups_received:
|
||||
await asyncio.sleep(5)
|
||||
send_boot_announce(settings)
|
||||
else:
|
||||
await asyncio.sleep(0.02)
|
||||
if wdt:
|
||||
wdt.feed()
|
||||
continue
|
||||
try:
|
||||
_handle_packet(host, msg, settings, presets)
|
||||
except Exception as e:
|
||||
print("espnow rx error:", e)
|
||||
if wdt:
|
||||
wdt.feed()
|
||||
135
src/espnow_wire.py
Normal file
135
src/espnow_wire.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
|
||||
|
||||
import struct
|
||||
|
||||
WIRE_MAGIC = 0x4C
|
||||
MAX_ESPNOW_PAYLOAD = 250
|
||||
|
||||
MSG_ANNOUNCE = 0x01
|
||||
MSG_GROUPS = 0x02
|
||||
MSG_CMD = 0x03
|
||||
MSG_GROUP_CMD = 0x04
|
||||
MSG_PING_REQ = 0x05
|
||||
MSG_PING_RSP = 0x06
|
||||
|
||||
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
|
||||
|
||||
COLOR_ORDER_TO_ENUM = {
|
||||
"rgb": 0,
|
||||
"rbg": 1,
|
||||
"grb": 2,
|
||||
"gbr": 3,
|
||||
"brg": 4,
|
||||
"bgr": 5,
|
||||
}
|
||||
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
|
||||
|
||||
|
||||
def _pack_header(msg_type, body):
|
||||
pkt = bytes([WIRE_MAGIC, msg_type]) + body
|
||||
if len(pkt) > MAX_ESPNOW_PAYLOAD:
|
||||
raise ValueError("packet too large")
|
||||
return pkt
|
||||
|
||||
|
||||
def pack_announce(
|
||||
name,
|
||||
num_leds,
|
||||
color_order="rgb",
|
||||
startup_mode="default",
|
||||
brightness=32,
|
||||
device_type=0,
|
||||
):
|
||||
name_b = name.encode("utf-8")
|
||||
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
|
||||
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
|
||||
body = (
|
||||
bytes([len(name_b)])
|
||||
+ name_b
|
||||
+ struct.pack("<H", int(num_leds))
|
||||
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
|
||||
)
|
||||
return _pack_header(MSG_ANNOUNCE, body)
|
||||
|
||||
|
||||
def parse_groups(payload):
|
||||
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
|
||||
if payload[1] != MSG_GROUPS:
|
||||
return None
|
||||
body = payload[2:]
|
||||
else:
|
||||
body = payload
|
||||
if not body:
|
||||
return []
|
||||
off = 0
|
||||
count = body[off]
|
||||
off += 1
|
||||
out = []
|
||||
for _ in range(count):
|
||||
gl = body[off]
|
||||
off += 1
|
||||
out.append(body[off : off + gl].decode("utf-8"))
|
||||
off += gl
|
||||
return out
|
||||
|
||||
|
||||
def parse_group_cmd(payload):
|
||||
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
|
||||
return None
|
||||
body = payload[2:]
|
||||
gl = body[0]
|
||||
gid = body[1 : 1 + gl].decode("utf-8")
|
||||
env = body[1 + gl :]
|
||||
return gid, env
|
||||
|
||||
|
||||
HEADER_LEN = 5
|
||||
|
||||
|
||||
def _envelope_size(env):
|
||||
if len(env) < HEADER_LEN:
|
||||
return len(env)
|
||||
lp, ls, ld = env[2], env[3], env[4]
|
||||
return HEADER_LEN + lp + ls + ld
|
||||
|
||||
|
||||
def cmd_envelope(payload):
|
||||
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
|
||||
return None, False
|
||||
env = payload[2:]
|
||||
if not env:
|
||||
return None, False
|
||||
need = _envelope_size(env)
|
||||
if need > len(env):
|
||||
return None, False
|
||||
save = len(env) > need and env[need] == 1
|
||||
return env[:need], save
|
||||
|
||||
|
||||
def pack_ping_req(ping_id):
|
||||
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF)
|
||||
return _pack_header(MSG_PING_REQ, body)
|
||||
|
||||
|
||||
def parse_ping_req(payload):
|
||||
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
|
||||
if payload[1] != MSG_PING_REQ:
|
||||
return None
|
||||
body = payload[2:]
|
||||
else:
|
||||
body = payload
|
||||
if len(body) < 4:
|
||||
return None
|
||||
return struct.unpack("<I", body[:4])[0]
|
||||
|
||||
|
||||
def pack_ping_rsp(ping_id, name):
|
||||
name_b = name.encode("utf-8")
|
||||
body = struct.pack("<I", int(ping_id) & 0xFFFFFFFF) + bytes([len(name_b)]) + name_b
|
||||
return _pack_header(MSG_PING_RSP, body)
|
||||
|
||||
|
||||
def wire_msg_type(payload):
|
||||
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
|
||||
return payload[1]
|
||||
return None
|
||||
101
src/file_hashes.py
Normal file
101
src/file_hashes.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""
|
||||
Deploy hash manifest at flash root (file_hashes.json).
|
||||
|
||||
Updated by led-cli after directory uploads; used to skip unchanged files on
|
||||
the next deploy. Format: {"version": 1, "algorithm": "sha256", "files": {...}}
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
MANIFEST_VERSION = 1
|
||||
MANIFEST_FILENAME = "file_hashes.json"
|
||||
HASH_ALGO = "sha256"
|
||||
|
||||
_SKIP_NAMES = frozenset({MANIFEST_FILENAME, "__pycache__"})
|
||||
_SKIP_SUFFIXES = (".pyc", ".pyo")
|
||||
|
||||
|
||||
def _normalize_path(path):
|
||||
return path.replace("\\", "/").lstrip("/")
|
||||
|
||||
|
||||
def load():
|
||||
"""Return path -> sha256 hex map, or {} if missing or invalid."""
|
||||
try:
|
||||
with open(MANIFEST_FILENAME, "r") as f:
|
||||
doc = json.load(f)
|
||||
except OSError:
|
||||
return {}
|
||||
if not isinstance(doc, dict):
|
||||
return {}
|
||||
files = doc.get("files")
|
||||
return files if isinstance(files, dict) else {}
|
||||
|
||||
|
||||
def save(files):
|
||||
"""Write manifest (path keys use forward slashes, no leading slash)."""
|
||||
if not isinstance(files, dict):
|
||||
files = {}
|
||||
doc = {
|
||||
"version": MANIFEST_VERSION,
|
||||
"algorithm": HASH_ALGO,
|
||||
"files": files,
|
||||
}
|
||||
with open(MANIFEST_FILENAME, "w") as f:
|
||||
json.dump(doc, f)
|
||||
|
||||
|
||||
def _hash_file(path):
|
||||
import hashlib
|
||||
|
||||
h = hashlib.sha256()
|
||||
with open(path, "rb") as f:
|
||||
while True:
|
||||
chunk = f.read(256)
|
||||
if not chunk:
|
||||
break
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def _walk_dir(base, prefix, out):
|
||||
try:
|
||||
names = os.listdir(base)
|
||||
except OSError:
|
||||
return
|
||||
for name in names:
|
||||
if name in _SKIP_NAMES or name.endswith(_SKIP_SUFFIXES):
|
||||
continue
|
||||
full = base + "/" + name if base else name
|
||||
key = _normalize_path((prefix + "/" + name) if prefix else name)
|
||||
try:
|
||||
mode = os.stat(full)[0]
|
||||
except OSError:
|
||||
continue
|
||||
if mode & 0x4000:
|
||||
_walk_dir(full, key, out)
|
||||
else:
|
||||
out[key] = _hash_file(full)
|
||||
|
||||
|
||||
def rebuild():
|
||||
"""Rebuild manifest from root .py files plus patterns/ and lib/ trees."""
|
||||
files = {}
|
||||
try:
|
||||
for name in os.listdir("."):
|
||||
if name in _SKIP_NAMES or name.endswith(_SKIP_SUFFIXES):
|
||||
continue
|
||||
try:
|
||||
mode = os.stat(name)[0]
|
||||
except OSError:
|
||||
continue
|
||||
if mode & 0x4000:
|
||||
if name in ("patterns", "lib"):
|
||||
_walk_dir(name, name, files)
|
||||
else:
|
||||
files[_normalize_path(name)] = _hash_file(name)
|
||||
except OSError:
|
||||
pass
|
||||
save(files)
|
||||
return files
|
||||
178
src/main.py
178
src/main.py
@@ -1,33 +1,26 @@
|
||||
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms]
|
||||
import print_timestamp # noqa: F401
|
||||
from settings import Settings
|
||||
import machine
|
||||
import utime
|
||||
import asyncio
|
||||
import json
|
||||
import gc
|
||||
from microdot import Microdot
|
||||
from microdot.websocket import WebSocketError, with_websocket
|
||||
import json
|
||||
import network
|
||||
import espnow
|
||||
import device_groups as dg
|
||||
from presets import Presets
|
||||
from controller_messages import apply_startup_pattern, process_data
|
||||
from runtime_state import RuntimeState
|
||||
from background_tasks import presets_loop, udp_hello_loop_after_http_ready
|
||||
from mem_stats import print_mem
|
||||
from wifi_sta import boot_sta
|
||||
try:
|
||||
import uos as os
|
||||
except ImportError:
|
||||
import os
|
||||
from espnow_transport import _handle_packet, init_espnow
|
||||
from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
|
||||
|
||||
wdt = machine.WDT(timeout=10000)
|
||||
wdt.feed()
|
||||
|
||||
machine.freq(160000000)
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
dg.load_from_settings(settings)
|
||||
print(settings)
|
||||
gc.collect()
|
||||
sta_if = boot_sta(settings, wdt)
|
||||
|
||||
presets = Presets(settings["led_pin"], settings["num_leds"])
|
||||
presets.load(settings)
|
||||
@@ -37,130 +30,41 @@ gc.collect()
|
||||
|
||||
apply_startup_pattern(settings, presets)
|
||||
|
||||
esp = init_espnow(settings)
|
||||
print(network.WLAN(network.STA_IF).config("channel"))
|
||||
|
||||
def _print_network_ips(controller_ip=None):
|
||||
"""Always log STA address and led-controller (WS client) address when known."""
|
||||
try:
|
||||
led_ip = sta_if.ifconfig()[0]
|
||||
except Exception:
|
||||
led_ip = "?"
|
||||
ctrl = controller_ip if controller_ip else "(not connected)"
|
||||
print("led-driver IP:", led_ip, " led-controller IP:", ctrl)
|
||||
hello = json.dumps({
|
||||
"v": "1",
|
||||
"name": settings.get("name", "led"),
|
||||
"type": "led",
|
||||
})
|
||||
print(hello)
|
||||
|
||||
esp.send(BROADCAST_MAC, hello)
|
||||
print("espnow hello", len(hello), "B")
|
||||
|
||||
|
||||
_print_network_ips()
|
||||
print_mem("startup")
|
||||
|
||||
runtime_state = RuntimeState()
|
||||
|
||||
app = Microdot()
|
||||
def _on_espnow_message(host, msg):
|
||||
if not msg:
|
||||
return
|
||||
if msg[0] == WIRE_MAGIC:
|
||||
_handle_packet(host, msg, settings, presets)
|
||||
return
|
||||
if msg[0:1] == b"{":
|
||||
process_data(msg, settings, presets)
|
||||
|
||||
|
||||
def _safe_pattern_filename(name):
|
||||
if not isinstance(name, str):
|
||||
return False
|
||||
if not name.endswith(".py"):
|
||||
return False
|
||||
if "/" in name or "\\" in name or ".." in name:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@app.route("/ws")
|
||||
@with_websocket
|
||||
async def ws_handler(request, ws):
|
||||
runtime_state.ws_connected()
|
||||
controller_ip = None
|
||||
try:
|
||||
client_addr = getattr(request, "client_addr", None)
|
||||
if isinstance(client_addr, (tuple, list)) and client_addr:
|
||||
controller_ip = client_addr[0]
|
||||
elif isinstance(client_addr, str):
|
||||
controller_ip = client_addr
|
||||
except Exception:
|
||||
controller_ip = None
|
||||
_print_network_ips(controller_ip)
|
||||
print_mem("ws connect")
|
||||
try:
|
||||
while True:
|
||||
data = await ws.receive()
|
||||
if not data:
|
||||
break
|
||||
process_data(data, settings, presets, controller_ip=controller_ip)
|
||||
except WebSocketError as e:
|
||||
print("WS client disconnected:", e)
|
||||
except OSError as e:
|
||||
print("WS client dropped (OSError):", e)
|
||||
finally:
|
||||
runtime_state.ws_disconnected()
|
||||
|
||||
|
||||
@app.post("/patterns/upload")
|
||||
async def upload_pattern(request):
|
||||
"""Receive one pattern file body from led-controller and reload patterns."""
|
||||
raw_name = request.args.get("name")
|
||||
reload_raw = request.args.get("reload", "1")
|
||||
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
|
||||
|
||||
if not isinstance(raw_name, str) or not raw_name.strip():
|
||||
return json.dumps({"error": "name is required"}), 400, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
body = request.body
|
||||
if not isinstance(body, (bytes, bytearray)) or not body:
|
||||
return json.dumps({"error": "code is required"}), 400, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
try:
|
||||
code = body.decode("utf-8")
|
||||
except UnicodeError:
|
||||
return json.dumps({"error": "body must be utf-8 text"}), 400, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
if not code.strip():
|
||||
return json.dumps({"error": "code is required"}), 400, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
name = raw_name.strip()
|
||||
if not name.endswith(".py"):
|
||||
name += ".py"
|
||||
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
|
||||
return json.dumps({"error": "invalid pattern filename"}), 400, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
os.mkdir("patterns")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
path = "patterns/" + name
|
||||
try:
|
||||
with open(path, "w") as f:
|
||||
f.write(code)
|
||||
if reload_patterns:
|
||||
presets.reload_patterns()
|
||||
except OSError as e:
|
||||
print("patterns/upload failed:", e)
|
||||
return json.dumps({"error": str(e)}), 500, {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
return json.dumps({
|
||||
"message": "pattern uploaded",
|
||||
"name": name,
|
||||
"reloaded": reload_patterns,
|
||||
}), 201, {"Content-Type": "application/json"}
|
||||
|
||||
|
||||
async def main(port=80):
|
||||
asyncio.create_task(presets_loop(presets, wdt))
|
||||
asyncio.create_task(
|
||||
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
|
||||
)
|
||||
await app.start_server(host="0.0.0.0", port=port)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(port=80))
|
||||
while True:
|
||||
wdt.feed()
|
||||
while esp.any():
|
||||
host, msg = esp.recv(0)
|
||||
if not host or not msg:
|
||||
continue
|
||||
print(host, len(msg), "B")
|
||||
try:
|
||||
_on_espnow_message(host, msg)
|
||||
print(msg)
|
||||
except Exception as e:
|
||||
print("espnow rx error:", e)
|
||||
presets.tick()
|
||||
|
||||
@@ -26,13 +26,13 @@ class Aurora:
|
||||
c = self.driver.apply_brightness(colors[idx], preset.b)
|
||||
w = 255 - abs(128 - ((i * 8 + phase) & 255)) * 2
|
||||
w = max(0, min(255, w + shimmer))
|
||||
self.driver.n[i] = (
|
||||
self.driver.n[self.driver.led_i(preset, i)] = (
|
||||
(c[0] * w) // 255,
|
||||
(c[1] * w) // 255,
|
||||
(c[2] * w) // 255,
|
||||
)
|
||||
self.driver.n.write()
|
||||
phase = (phase + 1) & 255
|
||||
phase = (phase + self.driver.signed(preset, 1)) & 255
|
||||
self.driver.step = phase
|
||||
last = utime.ticks_add(last, d)
|
||||
if not preset.a:
|
||||
@@ -76,9 +76,9 @@ class Aurora:
|
||||
peak = lerp3(colors[fi], colors[fi + 1], frac)
|
||||
peak = self.driver.apply_brightness(peak, preset.b)
|
||||
mixf = min(255, int(w * contrast * 2) >> 1)
|
||||
self.driver.n[i] = lerp3(bg, peak, mixf)
|
||||
self.driver.n[self.driver.led_i(preset, i)] = lerp3(bg, peak, mixf)
|
||||
self.driver.n.write()
|
||||
phase = (phase + drift) % 256
|
||||
phase = (phase + self.driver.signed(preset, drift)) % 256
|
||||
last = utime.ticks_add(last, d)
|
||||
if not preset.a:
|
||||
yield
|
||||
|
||||
@@ -46,12 +46,14 @@ class Blizzard:
|
||||
for pos, ci, wj in flakes:
|
||||
p = pos
|
||||
lateral = wind + (wj if wj else 0)
|
||||
p -= speed
|
||||
p += lateral
|
||||
p -= self.driver.signed(preset, speed)
|
||||
p += self.driver.signed(preset, lateral)
|
||||
if p < -2 or p >= nled + 2:
|
||||
continue
|
||||
pi = max(0, min(nled - 1, int(p)))
|
||||
self.driver.n[pi] = self.driver.apply_brightness(colors[ci], preset.b)
|
||||
self.driver.n[self.driver.led_i(preset, pi)] = self.driver.apply_brightness(
|
||||
colors[ci], preset.b
|
||||
)
|
||||
nf.append([p, ci, wj])
|
||||
flakes = nf
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ class Chase:
|
||||
def _run_marquee(self, preset, colors):
|
||||
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
|
||||
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
|
||||
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
|
||||
step = max(1, abs(self.driver.signed(preset, int(preset.n3) if int(preset.n3) > 0 else 1)))
|
||||
phase = self.driver.step % (on_len + off_len)
|
||||
last = utime.ticks_ms()
|
||||
while True:
|
||||
@@ -23,9 +23,9 @@ class Chase:
|
||||
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
|
||||
for i in range(self.driver.num_leds):
|
||||
m = (i + phase) % (on_len + off_len)
|
||||
self.driver.n[i] = c if m < on_len else bg_color
|
||||
self.driver.n[self.driver.led_i(preset, i)] = c if m < on_len else bg_color
|
||||
self.driver.n.write()
|
||||
phase = (phase + step) % (on_len + off_len)
|
||||
phase = (phase + self.driver.signed(preset, step)) % (on_len + off_len)
|
||||
self.driver.step = phase
|
||||
last = utime.ticks_add(last, d)
|
||||
if not preset.a:
|
||||
@@ -66,8 +66,8 @@ class Chase:
|
||||
|
||||
n1 = max(1, int(preset.n1)) # LEDs of color 0
|
||||
n2 = max(1, int(preset.n2)) # LEDs of color 1
|
||||
n3 = int(preset.n3) # Step movement on even steps (can be negative)
|
||||
n4 = int(preset.n4) # Step movement on odd steps (can be negative)
|
||||
n3 = self.driver.signed(preset, int(preset.n3)) # Step movement on even steps
|
||||
n4 = self.driver.signed(preset, int(preset.n4)) # Step movement on odd steps
|
||||
|
||||
segment_length = n1 + n2
|
||||
|
||||
@@ -101,9 +101,9 @@ class Chase:
|
||||
|
||||
# Determine which color based on position in segment
|
||||
if relative_pos < n1:
|
||||
self.driver.n[i] = color0
|
||||
self.driver.n[self.driver.led_i(preset, i)] = color0
|
||||
else:
|
||||
self.driver.n[i] = color1
|
||||
self.driver.n[self.driver.led_i(preset, i)] = color1
|
||||
|
||||
self.driver.n.write()
|
||||
print("[chase] step", step_count)
|
||||
@@ -147,9 +147,9 @@ class Chase:
|
||||
|
||||
# Determine which color based on position in segment
|
||||
if relative_pos < n1:
|
||||
self.driver.n[i] = color0
|
||||
self.driver.n[self.driver.led_i(preset, i)] = color0
|
||||
else:
|
||||
self.driver.n[i] = color1
|
||||
self.driver.n[self.driver.led_i(preset, i)] = color1
|
||||
|
||||
self.driver.n.write()
|
||||
print("[chase] step", step_count)
|
||||
|
||||
@@ -18,7 +18,7 @@ class ColourCycle:
|
||||
pos -= 170
|
||||
return (0, pos * 3, 255 - pos * 3)
|
||||
|
||||
def _render_gradient(self, colors, phase, brightness):
|
||||
def _render_gradient(self, preset, colors, phase, brightness):
|
||||
num_leds = self.driver.num_leds
|
||||
color_count = len(colors)
|
||||
if num_leds <= 0 or color_count <= 0:
|
||||
@@ -40,14 +40,16 @@ class ColourCycle:
|
||||
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
|
||||
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
|
||||
)
|
||||
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
|
||||
self.driver.n[self.driver.led_i(preset, i)] = self.driver.apply_brightness(
|
||||
blended, brightness
|
||||
)
|
||||
self.driver.n.write()
|
||||
|
||||
def _render_rainbow(self, phase, brightness):
|
||||
def _render_rainbow(self, preset, phase, brightness):
|
||||
num_leds = self.driver.num_leds
|
||||
for i in range(num_leds):
|
||||
rc_index = (i * 256 // max(1, num_leds)) + phase
|
||||
self.driver.n[i] = self.driver.apply_brightness(
|
||||
self.driver.n[self.driver.led_i(preset, i)] = self.driver.apply_brightness(
|
||||
self._wheel(rc_index & 255), brightness
|
||||
)
|
||||
self.driver.n.write()
|
||||
@@ -64,8 +66,8 @@ class ColourCycle:
|
||||
|
||||
if mode == 1:
|
||||
if not preset.a:
|
||||
self._render_rainbow(phase, preset.b)
|
||||
self.driver.step = (phase + step_amount) % 256
|
||||
self._render_rainbow(preset, phase, preset.b)
|
||||
self.driver.step = (phase + self.driver.signed(preset, step_amount)) % 256
|
||||
yield
|
||||
return
|
||||
last_update = utime.ticks_ms()
|
||||
@@ -73,16 +75,16 @@ class ColourCycle:
|
||||
delay_ms = max(1, int(preset.d))
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last_update) >= delay_ms:
|
||||
self._render_rainbow(phase, preset.b)
|
||||
phase = (phase + step_amount) % 256
|
||||
self._render_rainbow(preset, phase, preset.b)
|
||||
phase = (phase + self.driver.signed(preset, step_amount)) % 256
|
||||
self.driver.step = phase
|
||||
last_update = utime.ticks_add(last_update, delay_ms)
|
||||
yield
|
||||
|
||||
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
|
||||
if not preset.a:
|
||||
self._render_gradient(colors, phase, preset.b)
|
||||
self.driver.step = (phase + step_amount) % 256
|
||||
self._render_gradient(preset, colors, phase, preset.b)
|
||||
self.driver.step = (phase + self.driver.signed(preset, step_amount)) % 256
|
||||
yield
|
||||
return
|
||||
|
||||
@@ -91,8 +93,8 @@ class ColourCycle:
|
||||
delay_ms = max(1, int(preset.d))
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last_update) >= delay_ms:
|
||||
self._render_gradient(colors, phase, preset.b)
|
||||
phase = (phase + step_amount) % 256
|
||||
self._render_gradient(preset, colors, phase, preset.b)
|
||||
phase = (phase + self.driver.signed(preset, step_amount)) % 256
|
||||
self.driver.step = phase
|
||||
last_update = utime.ticks_add(last_update, delay_ms)
|
||||
yield
|
||||
|
||||
@@ -44,7 +44,7 @@ class Icicles:
|
||||
if idx >= nled:
|
||||
break
|
||||
br = ((k + 1) * 255) // max(1, ic_len)
|
||||
self.driver.n[idx] = (
|
||||
self.driver.n[self.driver.led_i(preset, idx)] = (
|
||||
(tip[0] * br + bg[0] * (255 - br)) // 255,
|
||||
(tip[1] * br + bg[1] * (255 - br)) // 255,
|
||||
(tip[2] * br + bg[2] * (255 - br)) // 255,
|
||||
@@ -52,7 +52,7 @@ class Icicles:
|
||||
aidx += 1
|
||||
|
||||
self.driver.n.write()
|
||||
phase = (phase + phase_step) % span
|
||||
phase = (phase + self.driver.signed(preset, phase_step)) % span
|
||||
last = utime.ticks_add(last, d_ms)
|
||||
|
||||
if not preset.a:
|
||||
|
||||
@@ -30,9 +30,9 @@ class Meteor:
|
||||
base = colors[color_index % len(colors)]
|
||||
lit = self.driver.apply_brightness(base, preset.b)
|
||||
if 0 <= head < self.driver.num_leds:
|
||||
self.driver.n[head] = lit
|
||||
self.driver.n[self.driver.led_i(preset, head)] = lit
|
||||
self.driver.n.write()
|
||||
head += direction * speed
|
||||
head += self.driver.signed(preset, direction * speed)
|
||||
if head >= self.driver.num_leds + tail_len:
|
||||
head = self.driver.num_leds - 1
|
||||
direction = -1
|
||||
@@ -62,14 +62,22 @@ class Meteor:
|
||||
i1 = p1 - t
|
||||
if 0 <= i1 < self.driver.num_leds:
|
||||
s = (255 * (tail - t)) // max(1, tail)
|
||||
self.driver.n[i1] = ((c1[0] * s) // 255, (c1[1] * s) // 255, (c1[2] * s) // 255)
|
||||
self.driver.n[self.driver.led_i(preset, i1)] = (
|
||||
(c1[0] * s) // 255,
|
||||
(c1[1] * s) // 255,
|
||||
(c1[2] * s) // 255,
|
||||
)
|
||||
i2 = p2 + t
|
||||
if 0 <= i2 < self.driver.num_leds:
|
||||
s = (255 * (tail - t)) // max(1, tail)
|
||||
self.driver.n[i2] = ((c2[0] * s) // 255, (c2[1] * s) // 255, (c2[2] * s) // 255)
|
||||
self.driver.n[self.driver.led_i(preset, i2)] = (
|
||||
(c2[0] * s) // 255,
|
||||
(c2[1] * s) // 255,
|
||||
(c2[2] * s) // 255,
|
||||
)
|
||||
self.driver.n.write()
|
||||
p1 += speed
|
||||
p2 -= speed
|
||||
p1 += self.driver.signed(preset, speed)
|
||||
p2 -= self.driver.signed(preset, speed)
|
||||
if p1 - tail > self.driver.num_leds and p2 + tail < 0:
|
||||
p1 = 0
|
||||
p2 = self.driver.num_leds - 1 - gap
|
||||
@@ -89,10 +97,10 @@ class Meteor:
|
||||
if dist < 0:
|
||||
dist = -dist
|
||||
if dist > width:
|
||||
self.driver.n[i] = bg_color
|
||||
self.driver.n[self.driver.led_i(preset, i)] = bg_color
|
||||
else:
|
||||
scale = ((width - dist) * 255) // max(1, width)
|
||||
self.driver.n[i] = (
|
||||
self.driver.n[self.driver.led_i(preset, i)] = (
|
||||
(base[0] * scale) // 255,
|
||||
(base[1] * scale) // 255,
|
||||
(base[2] * scale) // 255,
|
||||
@@ -101,7 +109,7 @@ class Meteor:
|
||||
if pause_frames > 0:
|
||||
pause_frames -= 1
|
||||
else:
|
||||
center += direction
|
||||
center += self.driver.signed(preset, direction)
|
||||
if center >= self.driver.num_leds - 1:
|
||||
center = self.driver.num_leds - 1
|
||||
direction = -1
|
||||
@@ -121,7 +129,11 @@ class Meteor:
|
||||
|
||||
if mode == 1:
|
||||
gap = max(0, int(preset.n3))
|
||||
p1, p2 = 0, self.driver.num_leds - 1 - gap
|
||||
nled = self.driver.num_leds
|
||||
if self.driver.is_reversed(preset):
|
||||
p1, p2 = nled - 1, gap
|
||||
else:
|
||||
p1, p2 = 0, nled - 1 - gap
|
||||
last = utime.ticks_ms()
|
||||
while True:
|
||||
p1, p2, last, stepped = self._run_comet_dual(preset, colors, p1, p2, last)
|
||||
@@ -131,7 +143,11 @@ class Meteor:
|
||||
yield
|
||||
|
||||
if mode == 2:
|
||||
color_index, center, direction, pause_frames = 0, 0, 1, 0
|
||||
nled = self.driver.num_leds
|
||||
if self.driver.is_reversed(preset):
|
||||
color_index, center, direction, pause_frames = 0, max(0, nled - 1), -1, 0
|
||||
else:
|
||||
color_index, center, direction, pause_frames = 0, 0, 1, 0
|
||||
last_update = utime.ticks_ms()
|
||||
while True:
|
||||
color_index, center, direction, pause_frames, last_update, stepped = (
|
||||
@@ -144,7 +160,11 @@ class Meteor:
|
||||
return
|
||||
yield
|
||||
|
||||
color_index, head, direction = 0, 0, 1
|
||||
nled = self.driver.num_leds
|
||||
if self.driver.is_reversed(preset):
|
||||
color_index, head, direction = 0, max(0, nled - 1), -1
|
||||
else:
|
||||
color_index, head, direction = 0, 0, 1
|
||||
last_update = utime.ticks_ms()
|
||||
while True:
|
||||
color_index, head, direction, last_update, stepped = self._run_meteor(
|
||||
|
||||
@@ -12,7 +12,7 @@ class Particles:
|
||||
|
||||
def _run_snowfall(self, preset, colors, flakes, last):
|
||||
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
|
||||
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
|
||||
speed = max(1, abs(self.driver.signed(preset, int(preset.n2) if int(preset.n2) > 0 else 1)))
|
||||
d = max(1, int(preset.d))
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last) < d:
|
||||
@@ -25,8 +25,10 @@ class Particles:
|
||||
nf = []
|
||||
for pos, ci in flakes:
|
||||
if 0 <= pos < self.driver.num_leds:
|
||||
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b)
|
||||
pos -= speed
|
||||
self.driver.n[self.driver.led_i(preset, pos)] = self.driver.apply_brightness(
|
||||
colors[ci], preset.b
|
||||
)
|
||||
pos -= self.driver.signed(preset, speed)
|
||||
if pos >= -1:
|
||||
nf.append([pos, ci])
|
||||
self.driver.n.write()
|
||||
@@ -34,7 +36,7 @@ class Particles:
|
||||
|
||||
def _run_starfall(self, preset, colors, stars, last):
|
||||
rate = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 14))
|
||||
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
|
||||
speed = max(1, abs(self.driver.signed(preset, int(preset.n2) if int(preset.n2) > 0 else 2)))
|
||||
tail = max(2, int(preset.n3) if int(preset.n3) > 0 else 10)
|
||||
max_stars = 4
|
||||
d = max(1, int(preset.d))
|
||||
@@ -65,13 +67,14 @@ class Particles:
|
||||
(base[2] * fade) // 255,
|
||||
)
|
||||
lit = self.driver.apply_brightness(lit, preset.b)
|
||||
o = self.driver.n[idx]
|
||||
self.driver.n[idx] = (
|
||||
pix = self.driver.led_i(preset, idx)
|
||||
o = self.driver.n[pix]
|
||||
self.driver.n[pix] = (
|
||||
max(o[0], lit[0]),
|
||||
max(o[1], lit[1]),
|
||||
max(o[2], lit[2]),
|
||||
)
|
||||
h -= speed
|
||||
h -= self.driver.signed(preset, speed)
|
||||
if h >= -tail:
|
||||
s["h"] = h
|
||||
ns.append(s)
|
||||
|
||||
19
src/patterns/pattern_direction.py
Normal file
19
src/patterns/pattern_direction.py
Normal file
@@ -0,0 +1,19 @@
|
||||
"""Strip install direction: n5 bit 0 reverses along-strip motion (upside-down wiring)."""
|
||||
|
||||
|
||||
def is_reversed(preset):
|
||||
return bool(int(getattr(preset, "n5", 0) or 0) & 1)
|
||||
|
||||
|
||||
def led_i(driver, preset, logical_index):
|
||||
"""Map a logical strip index (0 = pattern start) to a physical pixel index."""
|
||||
n = int(driver.num_leds)
|
||||
i = int(logical_index)
|
||||
if 0 <= i < n and is_reversed(preset):
|
||||
return n - 1 - i
|
||||
return i
|
||||
|
||||
|
||||
def signed(preset, value):
|
||||
v = int(value)
|
||||
return -v if is_reversed(preset) else v
|
||||
@@ -13,13 +13,8 @@ class Pulse:
|
||||
bg_base = preset.background_or(colors)
|
||||
self.driver.fill(self.driver.apply_brightness(bg_base, preset.b))
|
||||
|
||||
manual = not preset.a
|
||||
color_index = self.driver.step % max(1, len(colors))
|
||||
if not preset.a:
|
||||
# Manual / beat trigger: each select restarts this generator and resets
|
||||
# cycle_start below. Advancing step here makes each beat the next colour
|
||||
# without requiring a full wall-clock cycle between beats.
|
||||
nclr = max(1, len(colors))
|
||||
self.driver.step = (color_index + 1) % nclr
|
||||
cycle_start = utime.ticks_ms()
|
||||
|
||||
# State machine based pulse using a single generator loop
|
||||
@@ -29,7 +24,7 @@ class Pulse:
|
||||
attack_ms = max(0, int(preset.n1)) # Attack time in ms
|
||||
hold_ms = max(0, int(preset.n2)) # Hold time in ms
|
||||
decay_ms = max(0, int(preset.n3)) # Decay time in ms
|
||||
delay_ms = max(0, int(preset.d))
|
||||
delay_ms = 0 if manual else max(0, int(preset.d))
|
||||
|
||||
total_ms = attack_ms + hold_ms + decay_ms + delay_ms
|
||||
if total_ms <= 0:
|
||||
@@ -58,12 +53,13 @@ class Pulse:
|
||||
# Delay phase: LEDs off between pulses
|
||||
self.driver.fill(bg_color)
|
||||
else:
|
||||
# End of cycle: auto advances colour and loops; manual already
|
||||
# advanced step at run start for the next beat.
|
||||
if not preset.a:
|
||||
break
|
||||
color_index = (color_index + 1) % max(1, len(colors))
|
||||
# End of cycle: advance colour for the next run, then loop or stop.
|
||||
nclr = max(1, len(colors))
|
||||
color_index = (color_index + 1) % nclr
|
||||
self.driver.step = color_index
|
||||
if manual:
|
||||
self.driver.fill(bg_color)
|
||||
break
|
||||
cycle_start = now
|
||||
yield
|
||||
continue
|
||||
|
||||
@@ -32,6 +32,18 @@ class Preset:
|
||||
int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"}
|
||||
allowed_fields = {"p", "c", "d", "b", "a", "bg", "n1", "n2", "n3", "n4", "n5", "n6"}
|
||||
for key, value in data.items():
|
||||
if key == "reverse":
|
||||
try:
|
||||
if isinstance(value, bool):
|
||||
self.n5 = 1 if value else 0
|
||||
elif isinstance(value, (int, float)):
|
||||
self.n5 = 1 if int(value) else 0
|
||||
elif isinstance(value, str):
|
||||
lowered = value.lower()
|
||||
self.n5 = 1 if lowered in ("true", "1", "yes", "on") else 0
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
continue
|
||||
key = aliases.get(key, key)
|
||||
if key not in allowed_fields:
|
||||
continue
|
||||
|
||||
@@ -4,6 +4,7 @@ from preset import Preset
|
||||
from utils import convert_and_reorder_colors
|
||||
import json
|
||||
import sys
|
||||
import utime
|
||||
try:
|
||||
import uos as os
|
||||
except ImportError:
|
||||
@@ -31,6 +32,7 @@ class Presets:
|
||||
self.patterns = {
|
||||
"off": self.off,
|
||||
"on": self.on,
|
||||
"blink": self.blink,
|
||||
}
|
||||
self.patterns.update(self._load_dynamic_patterns())
|
||||
|
||||
@@ -193,20 +195,23 @@ class Presets:
|
||||
if preset_name in self.presets:
|
||||
preset = self.presets[preset_name]
|
||||
if preset.p in self.patterns:
|
||||
# Manual single-shot patterns: if this select arrives before the main loop has
|
||||
# tick()'d the previous frame, completing it first keeps step in sync with beats.
|
||||
if preset.p == "off":
|
||||
self.generator = None
|
||||
self.step = 0
|
||||
self.fill((0, 0, 0))
|
||||
self.selected = preset_name
|
||||
return True
|
||||
# If re-selecting the same preset before the main loop has tick()'d the
|
||||
# previous frame, run one pending tick so step stays in sync.
|
||||
if (
|
||||
preset_name == self.selected
|
||||
and not preset.a
|
||||
and preset.p == "chase"
|
||||
and self.generator is not None
|
||||
):
|
||||
while self.generator is not None:
|
||||
self.tick()
|
||||
self.tick()
|
||||
# Set step value if explicitly provided
|
||||
if step is not None:
|
||||
self.step = step
|
||||
elif preset.p == "off" or self.selected != preset_name:
|
||||
elif self.selected != preset_name:
|
||||
self.step = 0
|
||||
self.generator = self.patterns[preset.p](preset)
|
||||
self.selected = preset_name # Store the preset name, not the object
|
||||
@@ -222,6 +227,21 @@ class Presets:
|
||||
self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
|
||||
self.num_leds = num_leds
|
||||
|
||||
def is_reversed(self, preset):
|
||||
from patterns.pattern_direction import is_reversed as _is_reversed
|
||||
|
||||
return _is_reversed(preset)
|
||||
|
||||
def led_i(self, preset, logical_index):
|
||||
from patterns.pattern_direction import led_i as _led_i
|
||||
|
||||
return _led_i(self, preset, logical_index)
|
||||
|
||||
def signed(self, preset, value):
|
||||
from patterns.pattern_direction import signed as _signed
|
||||
|
||||
return _signed(preset, value)
|
||||
|
||||
def apply_brightness(self, color, brightness_override=None):
|
||||
# Combine per-preset brightness (override) with global brightness self.b
|
||||
local = brightness_override if brightness_override is not None else 255
|
||||
@@ -241,4 +261,44 @@ class Presets:
|
||||
def on(self, preset):
|
||||
colors = preset.c
|
||||
color = colors[0] if colors else (255, 255, 255)
|
||||
self.fill(self.apply_brightness(color, preset.b))
|
||||
lit = self.apply_brightness(color, preset.b)
|
||||
while True:
|
||||
self.fill(lit)
|
||||
yield
|
||||
|
||||
def blink(self, preset):
|
||||
"""Built-in blink (used by controller identify); no patterns/ deploy required."""
|
||||
colors = preset.c if preset.c else [(255, 255, 255)]
|
||||
bg_color = self.apply_brightness(preset.background_or(colors), preset.b)
|
||||
color_index = 0
|
||||
delay_ms = max(1, int(preset.d))
|
||||
period = delay_ms * 2
|
||||
now = utime.ticks_ms()
|
||||
# Phase-lock to wall time so group identify (broadcast select) stays in sync even
|
||||
# when devices process the packet on different main-loop iterations.
|
||||
phase = now % period if period else 0
|
||||
state = phase < delay_ms
|
||||
last_update = utime.ticks_add(now, -phase)
|
||||
if state:
|
||||
base = colors[color_index % len(colors)]
|
||||
self.fill(self.apply_brightness(base, preset.b))
|
||||
color_index += 1
|
||||
else:
|
||||
self.fill(bg_color)
|
||||
while True:
|
||||
now = utime.ticks_ms()
|
||||
if utime.ticks_diff(now, last_update) >= delay_ms:
|
||||
if state:
|
||||
base = colors[color_index % len(colors)]
|
||||
self.fill(self.apply_brightness(base, preset.b))
|
||||
color_index += 1
|
||||
else:
|
||||
self.fill(bg_color)
|
||||
state = not state
|
||||
last_update = utime.ticks_add(last_update, delay_ms)
|
||||
yield
|
||||
|
||||
|
||||
def run_tick(presets):
|
||||
"""Advance one animation frame (standalone tests / mpremote demos)."""
|
||||
presets.tick()
|
||||
|
||||
@@ -3,6 +3,8 @@ import ubinascii
|
||||
import machine
|
||||
import network
|
||||
|
||||
WIFI_CHANNEL_DEFAULT = 5
|
||||
|
||||
class Settings(dict):
|
||||
SETTINGS_FILE = "/settings.json"
|
||||
|
||||
@@ -14,9 +16,9 @@ class Settings(dict):
|
||||
def set_defaults(self):
|
||||
|
||||
self["led_pin"] = 10
|
||||
self["num_leds"] = 119
|
||||
self["num_leds"] = 200
|
||||
|
||||
self["color_order"] = "rgb"
|
||||
self["color_order"] = "grb"
|
||||
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
@@ -31,11 +33,10 @@ class Settings(dict):
|
||||
# Power-on: "default" | "last" | "off"
|
||||
self["startup_mode"] = "default"
|
||||
self["brightness"] = 32
|
||||
self["transport_type"] = "espnow"
|
||||
self["wifi_channel"] = 1
|
||||
# ESP-NOW transport (requires espnow firmware; uses wifi_channel).
|
||||
self["ssid"] = ""
|
||||
self["password"] = ""
|
||||
|
||||
self["wifi_channel"] = WIFI_CHANNEL_DEFAULT
|
||||
self["groups"] = []
|
||||
|
||||
|
||||
def save(self):
|
||||
try:
|
||||
|
||||
36
src/v1_wire.py
Normal file
36
src/v1_wire.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Expand short v1 wire keys to long names (MicroPython)."""
|
||||
|
||||
|
||||
K_PRESETS = "p"
|
||||
K_SELECT = "s"
|
||||
K_GROUPS = "g"
|
||||
K_SET_GROUPS = "sg"
|
||||
K_SAVE = "sv"
|
||||
K_DEFAULT = "df"
|
||||
K_DEVICE_CONFIG = "dc"
|
||||
K_CLEAR_PRESETS = "cp"
|
||||
K_MANIFEST = "mf"
|
||||
|
||||
_SHORT_TO_LONG = {
|
||||
K_PRESETS: "presets",
|
||||
K_SELECT: "select",
|
||||
K_GROUPS: "groups",
|
||||
K_SET_GROUPS: "set_groups",
|
||||
K_SAVE: "save",
|
||||
K_DEFAULT: "default",
|
||||
K_DEVICE_CONFIG: "device_config",
|
||||
K_CLEAR_PRESETS: "clear_presets",
|
||||
K_MANIFEST: "manifest",
|
||||
}
|
||||
|
||||
|
||||
def expand_v1(data):
|
||||
if not isinstance(data, dict):
|
||||
return data
|
||||
out = dict(data)
|
||||
for short_key, long_key in _SHORT_TO_LONG.items():
|
||||
if short_key in data and long_key not in out:
|
||||
out[long_key] = data[short_key]
|
||||
if short_key in out:
|
||||
del out[short_key]
|
||||
return out
|
||||
169
tests/bridge_ws_blink.py
Normal file
169
tests/bridge_ws_blink.py
Normal file
@@ -0,0 +1,169 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Send blink preset + select to a driver via the ESP-NOW bridge WebSocket.
|
||||
|
||||
Pairs with the on-device demo ``tests/patterns/blink.py``: same preset slot,
|
||||
pattern, and colours; this script reaches the driver over ESP-NOW through
|
||||
``espnow-sender`` (devices envelope, not legacy broadcast JSON).
|
||||
|
||||
Run from the **led-controller** repo (needs ``websockets`` in Pipenv)::
|
||||
|
||||
pipenv run python led-driver/tests/bridge_ws_blink.py
|
||||
|
||||
pipenv run python led-driver/tests/bridge_ws_blink.py \\
|
||||
--url ws://192.168.4.1/ws --mac 18:8b:0e:15:60:a8
|
||||
|
||||
From **led-driver** (if Pipenv/env is the parent project)::
|
||||
|
||||
pipenv run python tests/bridge_ws_blink.py --dry-run
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
# led-driver/tests -> led-driver -> led-controller
|
||||
LED_DRIVER_ROOT = Path(__file__).resolve().parents[1]
|
||||
PROJECT_ROOT = LED_DRIVER_ROOT.parent
|
||||
|
||||
|
||||
def _load_bridge_url(explicit: Optional[str]) -> str:
|
||||
if explicit and explicit.strip():
|
||||
return explicit.strip()
|
||||
for path in (PROJECT_ROOT / "settings.json", LED_DRIVER_ROOT / "settings.json"):
|
||||
if not path.is_file():
|
||||
continue
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
url = str(data.get("bridge_ws_url") or "").strip()
|
||||
if url:
|
||||
return url
|
||||
except (OSError, json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
return "ws://192.168.4.1/ws"
|
||||
|
||||
|
||||
def _format_mac(mac: str) -> str:
|
||||
s = re.sub(r"[^0-9a-fA-F]", "", str(mac or "").strip().lower())
|
||||
if len(s) != 12 or not re.fullmatch(r"[0-9a-f]{12}", s):
|
||||
raise ValueError("MAC must be 12 hex digits (e.g. 188b0e1560a8)")
|
||||
return ":".join(s[i : i + 2] for i in range(0, 12, 2))
|
||||
|
||||
|
||||
def build_blink_envelope(
|
||||
mac: str,
|
||||
*,
|
||||
preset_id: str = "2",
|
||||
delay_ms: int = 200,
|
||||
brightness: int = 64,
|
||||
) -> Dict[str, Any]:
|
||||
"""v1 devices envelope: preset body + list select (same shape as the Pi)."""
|
||||
body = {
|
||||
"p": {
|
||||
preset_id: {
|
||||
"p": "blink",
|
||||
"b": max(0, min(255, int(brightness))),
|
||||
"d": max(1, int(delay_ms)),
|
||||
"c": ["#FF0000", "#0000FF"],
|
||||
"a": True,
|
||||
}
|
||||
},
|
||||
"s": [str(preset_id)],
|
||||
}
|
||||
return {"v": "1", "dv": {_format_mac(mac): body}}
|
||||
|
||||
|
||||
async def _send(url: str, envelope: Dict[str, Any], hold_s: float) -> None:
|
||||
import websockets
|
||||
|
||||
packet = json.dumps(envelope, separators=(",", ":")).encode("utf-8")
|
||||
print(f"connecting to {url}")
|
||||
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
|
||||
print(f"connected, sending {len(packet)} B")
|
||||
print(packet.decode("utf-8"))
|
||||
await ws.send(packet)
|
||||
if hold_s > 0:
|
||||
print(f"holding connection {hold_s}s …")
|
||||
await asyncio.sleep(hold_s)
|
||||
print("done")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Send blink preset+select to one driver via bridge WebSocket.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--url",
|
||||
default=None,
|
||||
help="Bridge WebSocket URL (default: settings.json bridge_ws_url or ws://192.168.4.1/ws)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mac",
|
||||
default="188b0e1560a8",
|
||||
help="Driver MAC (12 hex, colons optional). Default: registry example id.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--preset-id",
|
||||
default="2",
|
||||
help="Wire preset slot id (default: 2, matches zone push)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--delay-ms",
|
||||
type=int,
|
||||
default=200,
|
||||
help="Blink delay in ms (default: 200)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--brightness",
|
||||
type=int,
|
||||
default=64,
|
||||
help="Preset brightness 0–255 (default: 64)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--hold",
|
||||
type=float,
|
||||
default=2.0,
|
||||
help="Seconds to keep WebSocket open after send (default: 2)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Print envelope only; do not connect",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
url = _load_bridge_url(args.url)
|
||||
try:
|
||||
envelope = build_blink_envelope(
|
||||
args.mac,
|
||||
preset_id=args.preset_id,
|
||||
delay_ms=args.delay_ms,
|
||||
brightness=args.brightness,
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"error: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
print(f"url={url!r} mac={_format_mac(args.mac)!r}")
|
||||
if args.dry_run:
|
||||
print(json.dumps(envelope, indent=2))
|
||||
return 0
|
||||
|
||||
try:
|
||||
asyncio.run(_send(url, envelope, args.hold))
|
||||
except KeyboardInterrupt:
|
||||
print("interrupted")
|
||||
return 130
|
||||
except Exception as e:
|
||||
print(f"failed: {e!r}", file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
116
tests/device_test_radio_diag.py
Normal file
116
tests/device_test_radio_diag.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Device-side radio diagnostic test (MicroPython).
|
||||
|
||||
Checks:
|
||||
1) STA/AP bring-up on channel 5
|
||||
2) ESP-NOW init and broadcast peer add
|
||||
3) Broadcast TX test packet send
|
||||
4) RX wait window to see any incoming ESP-NOW frames
|
||||
"""
|
||||
|
||||
import espnow
|
||||
import machine
|
||||
import network
|
||||
import time
|
||||
import ubinascii
|
||||
|
||||
|
||||
CHANNEL = 5
|
||||
RX_WINDOW_MS = 3000
|
||||
WDT_TIMEOUT_MS = 10000
|
||||
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
|
||||
TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00"
|
||||
|
||||
|
||||
def _mac_hex(mac_bytes):
|
||||
try:
|
||||
return ubinascii.hexlify(mac_bytes).decode()
|
||||
except Exception:
|
||||
return "?"
|
||||
|
||||
|
||||
def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS):
|
||||
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
|
||||
wdt.feed()
|
||||
|
||||
print("diag start")
|
||||
print("cpu freq", machine.freq())
|
||||
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
|
||||
# Clean start
|
||||
try:
|
||||
sta.active(False)
|
||||
ap.active(False)
|
||||
time.sleep_ms(100)
|
||||
except Exception as e:
|
||||
print("wifi reset failed", repr(e))
|
||||
|
||||
# STA setup
|
||||
try:
|
||||
sta.active(True)
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
sta.config(channel=channel)
|
||||
print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac")))
|
||||
except Exception as e:
|
||||
print("sta setup failed", repr(e))
|
||||
|
||||
# AP setup
|
||||
try:
|
||||
ap.active(True)
|
||||
try:
|
||||
ap.config(essid="diag-ap", channel=channel, hidden=True)
|
||||
except TypeError:
|
||||
ap.config(essid="diag-ap", channel=channel)
|
||||
print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac")))
|
||||
except Exception as e:
|
||||
print("ap setup failed", repr(e))
|
||||
|
||||
wdt.feed()
|
||||
|
||||
# ESP-NOW setup
|
||||
try:
|
||||
e = espnow.ESPNow()
|
||||
e.active(True)
|
||||
print("espnow active ok")
|
||||
except Exception as e_err:
|
||||
print("espnow init failed", repr(e_err))
|
||||
return
|
||||
|
||||
# Add broadcast peer
|
||||
try:
|
||||
e.add_peer(BROADCAST_MAC, channel=channel)
|
||||
print("add bcast peer ok")
|
||||
except TypeError:
|
||||
try:
|
||||
e.add_peer(BROADCAST_MAC)
|
||||
print("add bcast peer ok (no channel arg)")
|
||||
except Exception as e_err:
|
||||
print("add bcast peer failed", repr(e_err))
|
||||
except Exception as e_err:
|
||||
print("add bcast peer failed", repr(e_err))
|
||||
|
||||
# TX test
|
||||
try:
|
||||
ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True)
|
||||
print("tx bcast", ok, "len", len(TEST_PAYLOAD))
|
||||
except Exception as e_err:
|
||||
print("tx bcast failed", repr(e_err))
|
||||
|
||||
# RX window
|
||||
print("rx window ms", rx_window_ms)
|
||||
t_end = time.ticks_add(time.ticks_ms(), rx_window_ms)
|
||||
rx_count = 0
|
||||
while time.ticks_diff(t_end, time.ticks_ms()) > 0:
|
||||
wdt.feed()
|
||||
host, msg = e.recv(100)
|
||||
if host:
|
||||
rx_count += 1
|
||||
print("rx", rx_count, _mac_hex(host), "len", len(msg))
|
||||
time.sleep_ms(5)
|
||||
|
||||
print("diag done rx_count", rx_count)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_diag()
|
||||
40
tests/device_test_recv_ch5.py
Normal file
40
tests/device_test_recv_ch5.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Device test: receive ESP-NOW packets on channel 5 (MicroPython)."""
|
||||
|
||||
import espnow
|
||||
import machine
|
||||
import network
|
||||
import ubinascii
|
||||
import time
|
||||
|
||||
|
||||
CHANNEL = 5
|
||||
TIMEOUT_MS = 1000
|
||||
WDT_TIMEOUT_MS = 10000
|
||||
|
||||
|
||||
def _set_channel(channel):
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
sta.config(channel=channel)
|
||||
|
||||
|
||||
def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS):
|
||||
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
|
||||
_set_channel(channel)
|
||||
e = espnow.ESPNow()
|
||||
e.active(True)
|
||||
print("recv ready ch", channel)
|
||||
while True:
|
||||
wdt.feed()
|
||||
host, msg = e.recv(timeout_ms)
|
||||
if host:
|
||||
mac_hex = ubinascii.hexlify(host).decode()
|
||||
print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode())
|
||||
else:
|
||||
print("rx timeout")
|
||||
time.sleep_ms(10)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
recv_loop()
|
||||
47
tests/device_test_send_ch5.py
Normal file
47
tests/device_test_send_ch5.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Device test: send one ESP-NOW packet on channel 5 (MicroPython)."""
|
||||
|
||||
import espnow
|
||||
import machine
|
||||
import network
|
||||
import ubinascii
|
||||
|
||||
|
||||
CHANNEL = 5
|
||||
DEST_HEX = "ffffffffffff"
|
||||
PAYLOAD_HEX = "4c0501000000"
|
||||
WDT_TIMEOUT_MS = 10000
|
||||
|
||||
|
||||
def _set_channel(channel):
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
sta.active(True)
|
||||
sta.config(pm=network.WLAN.PM_NONE)
|
||||
sta.config(channel=channel)
|
||||
|
||||
|
||||
def _add_peer(esp, dest, channel):
|
||||
try:
|
||||
esp.add_peer(dest, channel=channel)
|
||||
except TypeError:
|
||||
esp.add_peer(dest)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL):
|
||||
wdt = machine.WDT(timeout=WDT_TIMEOUT_MS)
|
||||
wdt.feed()
|
||||
dest = ubinascii.unhexlify(dest_hex)
|
||||
pkt = ubinascii.unhexlify(payload_hex)
|
||||
_set_channel(channel)
|
||||
e = espnow.ESPNow()
|
||||
e.active(True)
|
||||
_add_peer(e, dest, channel)
|
||||
wdt.feed()
|
||||
ok = e.send(dest, pkt, True)
|
||||
print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt))
|
||||
return ok
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
send_once()
|
||||
@@ -1,35 +1,74 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Standalone blink pattern demo (WDT-fed tick loop).
|
||||
|
||||
Run on device::
|
||||
|
||||
mpremote connect <port> run tests/patterns/blink.py
|
||||
"""
|
||||
import sys
|
||||
import utime
|
||||
from machine import WDT
|
||||
from settings import Settings
|
||||
from presets import Presets, run_tick
|
||||
|
||||
|
||||
def _bootstrap_import_path():
|
||||
"""Find ``settings`` / ``presets`` on device or when run via mpremote."""
|
||||
try:
|
||||
import uos as os
|
||||
except ImportError:
|
||||
import os
|
||||
|
||||
candidates = []
|
||||
try:
|
||||
here = __file__.rsplit("/", 1)[0]
|
||||
if here:
|
||||
candidates.append(here)
|
||||
tests = here.rsplit("/", 1)[0]
|
||||
if tests:
|
||||
candidates.append(tests)
|
||||
root = tests.rsplit("/", 1)[0]
|
||||
if root:
|
||||
candidates.append(root)
|
||||
candidates.append(root + "/src")
|
||||
except NameError:
|
||||
pass
|
||||
for p in (".", "..", "/", "src", "/src"):
|
||||
candidates.append(p)
|
||||
for p in candidates:
|
||||
if p and p not in sys.path:
|
||||
sys.path.insert(0, p)
|
||||
|
||||
|
||||
_bootstrap_import_path()
|
||||
|
||||
from machine import WDT # noqa: E402
|
||||
from settings import Settings # noqa: E402
|
||||
from presets import Presets # noqa: E402
|
||||
|
||||
|
||||
def _run_ms(presets, wdt, duration_ms, sleep_ms=10):
|
||||
start = utime.ticks_ms()
|
||||
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
|
||||
wdt.feed()
|
||||
presets.tick()
|
||||
utime.sleep_ms(sleep_ms)
|
||||
|
||||
|
||||
def main():
|
||||
s = Settings()
|
||||
pin = s.get("led_pin", 10)
|
||||
num = s.get("num_leds", 30)
|
||||
|
||||
p = Presets(pin=pin, num_leds=num)
|
||||
settings = Settings()
|
||||
presets = Presets(settings.get("led_pin", 10), settings.get("num_leds", 30))
|
||||
wdt = WDT(timeout=10000)
|
||||
|
||||
# Create blink preset (use short-key fields: p=pattern, b=brightness, d=delay, c=colors)
|
||||
p.edit("test_blink", {
|
||||
"p": "blink",
|
||||
"b": 64,
|
||||
"d": 200,
|
||||
"c": [(255, 0, 0), (0, 0, 255)],
|
||||
})
|
||||
p.select("test_blink")
|
||||
|
||||
start = utime.ticks_ms()
|
||||
while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
|
||||
wdt.feed()
|
||||
run_tick(p)
|
||||
utime.sleep_ms(10)
|
||||
presets.edit(
|
||||
"test_blink",
|
||||
{
|
||||
"p": "blink",
|
||||
"b": 64,
|
||||
"d": 200,
|
||||
"c": [(255, 0, 0), (0, 0, 255)],
|
||||
},
|
||||
)
|
||||
presets.select("test_blink")
|
||||
_run_ms(presets, wdt, 1500)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user