feat(espnow): add wire transport and simplify broadcast main
Binary espnow_wire/espnow_transport modules plus a minimal main that broadcasts a JSON hello and polls ESP-NOW while running presets. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
110
src/espnow_wire.py
Normal file
110
src/espnow_wire.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""ESP-NOW wire format (MicroPython). See docs/espnow-binary-protocol.md in led-controller."""
|
||||
|
||||
import struct
|
||||
|
||||
WIRE_MAGIC = 0x4C
|
||||
MAX_ESPNOW_PAYLOAD = 250
|
||||
|
||||
MSG_ANNOUNCE = 0x01
|
||||
MSG_GROUPS = 0x02
|
||||
MSG_CMD = 0x03
|
||||
MSG_GROUP_CMD = 0x04
|
||||
|
||||
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
|
||||
|
||||
COLOR_ORDER_TO_ENUM = {
|
||||
"rgb": 0,
|
||||
"rbg": 1,
|
||||
"grb": 2,
|
||||
"gbr": 3,
|
||||
"brg": 4,
|
||||
"bgr": 5,
|
||||
}
|
||||
STARTUP_MODE_TO_ENUM = {"default": 0, "last": 1, "off": 2}
|
||||
|
||||
|
||||
def _pack_header(msg_type, body):
|
||||
pkt = bytes([WIRE_MAGIC, msg_type]) + body
|
||||
if len(pkt) > MAX_ESPNOW_PAYLOAD:
|
||||
raise ValueError("packet too large")
|
||||
return pkt
|
||||
|
||||
|
||||
def pack_announce(
|
||||
name,
|
||||
num_leds,
|
||||
color_order="rgb",
|
||||
startup_mode="default",
|
||||
brightness=32,
|
||||
device_type=0,
|
||||
):
|
||||
name_b = name.encode("utf-8")
|
||||
co = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
|
||||
sm = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
|
||||
body = (
|
||||
bytes([len(name_b)])
|
||||
+ name_b
|
||||
+ struct.pack("<H", int(num_leds))
|
||||
+ bytes([co & 7, sm & 3, max(0, min(255, int(brightness))), device_type & 255])
|
||||
)
|
||||
return _pack_header(MSG_ANNOUNCE, body)
|
||||
|
||||
|
||||
def parse_groups(payload):
|
||||
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
|
||||
if payload[1] != MSG_GROUPS:
|
||||
return None
|
||||
body = payload[2:]
|
||||
else:
|
||||
body = payload
|
||||
if not body:
|
||||
return []
|
||||
off = 0
|
||||
count = body[off]
|
||||
off += 1
|
||||
out = []
|
||||
for _ in range(count):
|
||||
gl = body[off]
|
||||
off += 1
|
||||
out.append(body[off : off + gl].decode("utf-8"))
|
||||
off += gl
|
||||
return out
|
||||
|
||||
|
||||
def parse_group_cmd(payload):
|
||||
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
|
||||
return None
|
||||
body = payload[2:]
|
||||
gl = body[0]
|
||||
gid = body[1 : 1 + gl].decode("utf-8")
|
||||
env = body[1 + gl :]
|
||||
return gid, env
|
||||
|
||||
|
||||
HEADER_LEN = 5
|
||||
|
||||
|
||||
def _envelope_size(env):
|
||||
if len(env) < HEADER_LEN:
|
||||
return len(env)
|
||||
lp, ls, ld = env[2], env[3], env[4]
|
||||
return HEADER_LEN + lp + ls + ld
|
||||
|
||||
|
||||
def cmd_envelope(payload):
|
||||
if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_CMD:
|
||||
return None, False
|
||||
env = payload[2:]
|
||||
if not env:
|
||||
return None, False
|
||||
need = _envelope_size(env)
|
||||
if need > len(env):
|
||||
return None, False
|
||||
save = len(env) > need and env[need] == 1
|
||||
return env[:need], save
|
||||
|
||||
|
||||
def wire_msg_type(payload):
|
||||
if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
|
||||
return payload[1]
|
||||
return None
|
||||
Reference in New Issue
Block a user