2 Commits

Author SHA1 Message Date
1fdb2c9441 fix(espnow): handle binary and JSON RX in simplified main
Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON
v1 payloads to process_data from the poll loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:45:13 +12:00
3e718f7432 feat(espnow): add wire transport and simplify broadcast main
Binary espnow_wire/espnow_transport modules plus a minimal main that
broadcasts a JSON hello and polls ESP-NOW while running presets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 22:44:39 +12:00
7 changed files with 291 additions and 182 deletions

View File

@@ -1,11 +1,7 @@
import asyncio import asyncio
import utime import utime
from hello import broadcast_hello_udp
from mem_stats import print_mem from mem_stats import print_mem
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt): async def presets_loop(presets, wdt):
@@ -18,41 +14,4 @@ async def presets_loop(presets, wdt):
if utime.ticks_diff(now, last_mem_log) >= 5000: if utime.ticks_diff(now, last_mem_log) >= 5000:
print_mem("runtime") print_mem("runtime")
last_mem_log = now last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1)
started_ms = utime.ticks_ms()
while True:
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s)

View File

@@ -40,8 +40,8 @@ def _log_rx(payload) -> None:
print("rx (logging failed)") print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None, save=False):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" """Read one controller message; binary v2 envelope or JSON v1, then apply fields."""
_log_rx(payload) _log_rx(payload)
data = None data = None
if isinstance(payload, (bytes, bytearray)): if isinstance(payload, (bytes, bytearray)):
@@ -58,6 +58,8 @@ def process_data(payload, settings, presets, controller_ip=None):
return return
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
if save:
data["save"] = True
if "device_config" in data: if "device_config" in data:
apply_device_config(data, settings, presets) apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:

16
src/device_groups.py Normal file
View File

@@ -0,0 +1,16 @@
"""In-memory group membership for GROUP_CMD filtering."""
_groups = []
def groups_replace(group_ids):
global _groups
_groups = [str(g) for g in group_ids]
def in_group(group_id):
return str(group_id) in _groups
def list_groups():
return list(_groups)

119
src/espnow_transport.py Normal file
View File

@@ -0,0 +1,119 @@
"""ESP-NOW receive loop and boot announce."""
import asyncio
import espnow
import network
import device_groups as dg
from espnow_wire import (
BROADCAST_MAC,
MSG_ANNOUNCE,
MSG_CMD,
MSG_GROUP_CMD,
MSG_GROUPS,
cmd_envelope,
pack_announce,
parse_group_cmd,
parse_groups,
wire_msg_type,
)
from controller_messages import process_data
_esp = None
_groups_received = False
def init_espnow(settings):
global _esp
ch = 6
try:
ch = int(settings.get("wifi_channel", 6))
except (TypeError, ValueError):
pass
ch = max(1, min(11, ch))
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=ch)
_esp = espnow.ESPNow()
_esp.active(True)
try:
_esp.add_peer(BROADCAST_MAC)
except Exception:
pass
return _esp
def send_boot_announce(settings):
if _esp is None:
return
pkt = pack_announce(
settings.get("name", "led"),
settings.get("num_leds", 1),
color_order=settings.get("color_order", "rgb"),
startup_mode=settings.get("startup_mode", "default"),
brightness=settings.get("brightness", 32),
)
try:
_esp.send(BROADCAST_MAC, pkt)
print("espnow announce", len(pkt), "B")
except Exception as e:
print("espnow announce failed:", e)
def _handle_packet(pkt, settings, presets):
global _groups_received
mt = wire_msg_type(pkt)
if mt == MSG_GROUPS:
ids = parse_groups(pkt)
if ids is not None:
dg.groups_replace(ids)
_groups_received = True
print("groups", ids)
return
if mt == MSG_GROUP_CMD:
parsed = parse_group_cmd(pkt)
if parsed is None:
return
gid, env = parsed
if not dg.in_group(gid):
return
from espnow_wire import _envelope_size
need = _envelope_size(env)
save = len(env) > need and env[need] == 1
body = env[:need] if save else env
if body:
process_data(body, settings, presets, save=save)
return
if mt == MSG_CMD:
env, save = cmd_envelope(pkt)
if env:
process_data(env, settings, presets, save=save)
return
if mt == MSG_ANNOUNCE:
return
async def espnow_receive_loop(settings, presets, wdt=None):
global _groups_received
while True:
if _esp is None:
await asyncio.sleep(0.1)
continue
host, msg = _esp.recv(0)
if not host:
if not _groups_received:
await asyncio.sleep(5)
send_boot_announce(settings)
else:
await asyncio.sleep(0.02)
if wdt:
wdt.feed()
continue
try:
_handle_packet(msg, settings, presets)
except Exception as e:
print("espnow rx error:", e)
if wdt:
wdt.feed()

110
src/espnow_wire.py Normal file
View File

@@ -0,0 +1,110 @@
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
import struct
WIRE_MAGIC = 0x4C
MAX_ESPNOW_PAYLOAD = 250
MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02
MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
COLOR_ORDER_TO_ENUM = {
"rgb": 0,
"rbg": 1,
"grb": 2,
"gbr": 3,
"brg": 4,
"bgr": 5,
}
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
def _pack_header(msg_type, body):
pkt = bytes([WIRE_MAGIC, msg_type]) + body
if len(pkt) > MAX_ESPNOW_PAYLOAD:
raise ValueError("packet too large")
return pkt
def pack_announce(
name,
num_leds,
color_order="rgb",
startup_mode="default",
brightness=32,
device_type=0,
):
name_b = name.encode("utf-8")
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
body = (
bytes([len(name_b)])
+ name_b
+ struct.pack("<H", int(num_leds))
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
)
return _pack_header(MSG_ANNOUNCE, body)
def parse_groups(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
if payload[1] != MSG_GROUPS:
return None
body = payload[2:]
else:
body = payload
if not body:
return []
off = 0
count = body[off]
off += 1
out = []
for _ in range(count):
gl = body[off]
off += 1
out.append(body[off : off + gl].decode("utf-8"))
off += gl
return out
def parse_group_cmd(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
return None
body = payload[2:]
gl = body[0]
gid = body[1 : 1 + gl].decode("utf-8")
env = body[1 + gl :]
return gid, env
HEADER_LEN = 5
def _envelope_size(env):
if len(env) < HEADER_LEN:
return len(env)
lp, ls, ld = env[2], env[3], env[4]
return HEADER_LEN + lp + ls + ld
def cmd_envelope(payload):
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
return None, False
env = payload[2:]
if not env:
return None, False
need = _envelope_size(env)
if need > len(env):
return None, False
save = len(env) > need and env[need] == 1
return env[:need], save
def wire_msg_type(payload):
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
return payload[1]
return None

View File

@@ -1,33 +1,23 @@
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms] import print_timestamp # noqa: F401
from settings import Settings from settings import Settings
import machine import machine
import utime
import asyncio import asyncio
import json
import gc import gc
from microdot import Microdot import json
from microdot.websocket import WebSocketError, with_websocket import network
import espnow
from presets import Presets from presets import Presets
from controller_messages import apply_startup_pattern, process_data from controller_messages import apply_startup_pattern, process_data
from runtime_state import RuntimeState from espnow_transport import _handle_packet, init_espnow
from background_tasks import presets_loop, udp_hello_loop_after_http_ready from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
from mem_stats import print_mem
from wifi_sta import boot_sta
try:
import uos as os
except ImportError:
import os
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
gc.collect() gc.collect()
sta_if = boot_sta(settings, wdt)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
@@ -37,130 +27,45 @@ gc.collect()
apply_startup_pattern(settings, presets) apply_startup_pattern(settings, presets)
esp = init_espnow(settings)
print(network.WLAN(network.STA_IF).config("channel"))
def _print_network_ips(controller_ip=None): hello = json.dumps({
"""Always log STA address and led-controller (WS client) address when known.""" "v": "1",
try: "name": settings.get("name", "led"),
led_ip = sta_if.ifconfig()[0] "type": "led",
except Exception: })
led_ip = "?" try:
ctrl = controller_ip if controller_ip else "(not connected)" esp.send(BROADCAST_MAC, hello)
print("led-driver IP:", led_ip, " led-controller IP:", ctrl) print("espnow hello", len(hello), "B")
except Exception as e:
print("espnow hello failed:", e)
_print_network_ips() def _on_espnow_message(msg):
print_mem("startup") if not msg:
return
runtime_state = RuntimeState() if msg[0] == WIRE_MAGIC:
_handle_packet(msg, settings, presets)
app = Microdot() return
if msg[0:1] == b"{":
process_data(msg, settings, presets)
def _safe_pattern_filename(name): async def main():
if not isinstance(name, str): while True:
return False presets.tick()
if not name.endswith(".py"): wdt.feed()
return False if esp.any():
if "/" in name or "\\" in name or ".." in name: host, msg = esp.recv(0)
return False if host and msg:
return True print(host, len(msg), "B")
try:
_on_espnow_message(msg)
@app.route("/ws") except Exception as e:
@with_websocket print("espnow rx error:", e)
async def ws_handler(request, ws): await asyncio.sleep(0)
runtime_state.ws_connected()
controller_ip = None
try:
client_addr = getattr(request, "client_addr", None)
if isinstance(client_addr, (tuple, list)) and client_addr:
controller_ip = client_addr[0]
elif isinstance(client_addr, str):
controller_ip = client_addr
except Exception:
controller_ip = None
_print_network_ips(controller_ip)
print_mem("ws connect")
try:
while True:
data = await ws.receive()
if not data:
break
process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload")
async def upload_pattern(request):
"""Receive one pattern file body from led-controller and reload patterns."""
raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {
"Content-Type": "application/json"
}
body = request.body
if not isinstance(body, (bytes, bytearray)) or not body:
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
try:
code = body.decode("utf-8")
except UnicodeError:
return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json"
}
if not code.strip():
return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json"
}
name = raw_name.strip()
if not name.endswith(".py"):
name += ".py"
if not _safe_pattern_filename(name) or name in ("__init__.py", "main.py"):
return json.dumps({"error": "invalid pattern filename"}), 400, {
"Content-Type": "application/json"
}
try:
os.mkdir("patterns")
except OSError:
pass
path = "patterns/" + name
try:
with open(path, "w") as f:
f.write(code)
if reload_patterns:
presets.reload_patterns()
except OSError as e:
print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json"
}
return json.dumps({
"message": "pattern uploaded",
"name": name,
"reloaded": reload_patterns,
}), 201, {"Content-Type": "application/json"}
async def main(port=80):
asyncio.create_task(presets_loop(presets, wdt))
asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main(port=80)) asyncio.run(main())

View File

@@ -31,11 +31,9 @@ class Settings(dict):
# Power-on: "default" | "last" | "off" # Power-on: "default" | "last" | "off"
self["startup_mode"] = "default" self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
# ESP-NOW transport (requires espnow firmware; uses wifi_channel).
self["ssid"] = ""
self["password"] = ""
def save(self): def save(self):
try: try: