Files
led-controller/src/util/espnow_wire.py
2026-05-28 00:38:21 +12:00

337 lines
9.5 KiB
Python

"""
ESP-NOW wire format: magic header + message types, Pi↔bridge WebSocket framing.
See docs/espnow-binary-protocol.md.
"""
from __future__ import annotations
import struct
from typing import Any, Dict, List, Optional, Tuple
from util.binary_envelope import (
BINARY_ENVELOPE_VERSION_2,
pack_binary_envelope_v2,
parse_binary_envelope_v2,
)
# First byte of every packet; the second byte is one of the MSG_* codes below.
WIRE_MAGIC = 0x4C
# Largest whole packet (magic + type + body) that _pack_header will emit.
MAX_ESPNOW_PAYLOAD = 250

# Message type codes.
MSG_ANNOUNCE = 0x01
MSG_GROUPS = 0x02
MSG_CMD = 0x03
MSG_GROUP_CMD = 0x04
MSG_PING_REQ = 0x05
MSG_PING_RSP = 0x06
MSG_BRIDGE_CH = 0x10

# Peer MAC written into WebSocket downlink frames that target everyone.
BROADCAST_MAC = b"\xff" * 6
# Bit in the WebSocket frame's leading flags byte marking a broadcast.
WS_FLAG_BROADCAST = 0x01

# Colour-order names; a name's position in the tuple is its wire value.
COLOR_ORDER_TO_ENUM = {
    order: code
    for code, order in enumerate(("rgb", "rbg", "grb", "gbr", "brg", "bgr"))
}
ENUM_TO_COLOR_ORDER = {code: order for order, code in COLOR_ORDER_TO_ENUM.items()}

# Startup-mode names and their wire values.
STARTUP_MODE_TO_ENUM = dict(default=0, last=1, off=2)
ENUM_TO_STARTUP_MODE = {code: mode for mode, code in STARTUP_MODE_TO_ENUM.items()}
def normalize_mac_bytes(mac: Any) -> Optional[bytes]:
    """Coerce a MAC address into its 6 raw bytes.

    Accepts a 6-byte ``bytes``/``bytearray``, or any value whose ``str()`` is
    12 hex digits once ``:`` and ``-`` separators are removed (case and
    surrounding whitespace are ignored).

    Returns:
        The 6-byte MAC, or ``None`` when *mac* is ``None`` or not recognisable.
    """
    if mac is None:
        return None
    if isinstance(mac, (bytes, bytearray)) and len(mac) == 6:
        return bytes(mac)

    text = str(mac).strip().lower()
    for separator in (":", "-"):
        text = text.replace(separator, "")

    if len(text) != 12:
        return None
    # Explicit digit check: bytes.fromhex alone would tolerate embedded spaces.
    if any(ch not in "0123456789abcdef" for ch in text):
        return None
    return bytes.fromhex(text)
def mac_bytes_to_hex(mac: bytes) -> str:
    """Return *mac* rendered as hex digits with no separators."""
    hex_text = mac.hex()
    return hex_text
def _pack_header(msg_type: int, body: bytes) -> bytes:
    """Prefix *body* with the magic byte and *msg_type*.

    Raises:
        ValueError: if the finished packet is longer than MAX_ESPNOW_PAYLOAD.
    """
    packet = bytes((WIRE_MAGIC, msg_type)) + body
    if len(packet) <= MAX_ESPNOW_PAYLOAD:
        return packet
    raise ValueError(f"ESP-NOW packet {len(packet)} exceeds {MAX_ESPNOW_PAYLOAD}")
def pack_announce(
    *,
    name: str,
    num_leds: int,
    color_order: str = "rgb",
    startup_mode: str = "default",
    brightness: int = 32,
    device_type: int = 0,
) -> bytes:
    """Build an ANNOUNCE packet.

    Body layout: name length (1 byte), UTF-8 name, LED count (uint16
    little-endian), then one byte each for colour-order code, startup-mode
    code, brightness and device type.

    Unknown ``color_order`` / ``startup_mode`` names fall back to code 0;
    ``num_leds`` is clamped to 0..65535 and ``brightness`` to 0..255;
    ``device_type`` is reduced to its low byte.

    Raises:
        ValueError: if the encoded name exceeds 250 bytes or the finished
            packet is too large.
    """
    encoded_name = name.encode("utf-8")
    if len(encoded_name) > 250:
        raise ValueError("name too long")

    order_code = COLOR_ORDER_TO_ENUM.get(str(color_order).lower(), 0)
    mode_code = STARTUP_MODE_TO_ENUM.get(str(startup_mode).lower(), 0)
    led_count = min(65535, max(0, int(num_leds)))
    level = min(255, max(0, int(brightness)))

    body = b"".join(
        (
            bytes((len(encoded_name),)),
            encoded_name,
            struct.pack("<H", led_count),
            bytes((order_code & 7, mode_code & 3, level, device_type & 255)),
        )
    )
    return _pack_header(MSG_ANNOUNCE, body)
def parse_announce(payload: bytes) -> Optional[Dict[str, Any]]:
    """Parse full ESP-NOW packet or body-only after type byte.

    A payload of at least two bytes whose first byte is the wire magic is
    treated as a full packet and must carry the ANNOUNCE type; anything else
    is treated as a bare body.

    Returns:
        A dict with ``name``, ``num_leds``, ``color_order``, ``startup_mode``,
        ``brightness`` and ``device_type`` (``"led"`` for code 0, otherwise
        the code as a decimal string), or ``None`` if the payload has the
        wrong type, is truncated, or carries a name that is not valid UTF-8.
    """
    if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
        if payload[1] != MSG_ANNOUNCE:
            return None
        body = payload[2:]
    else:
        body = payload
    if not body:
        return None

    name_len = body[0]
    name_end = 1 + name_len
    # After the name come a uint16 LED count and four single-byte fields, so
    # six more bytes are required (the previous check only demanded five and
    # let a one-byte-short packet raise IndexError).
    fields_end = name_end + 2 + 4
    if fields_end > len(body):
        return None

    try:
        name = body[1:name_end].decode("utf-8")
    except UnicodeDecodeError:
        # Malformed name: report "unparseable" like any other bad packet.
        return None

    (num_leds,) = struct.unpack_from("<H", body, name_end)
    co, sm, br, dt = body[name_end + 2 : fields_end]
    return {
        "name": name,
        "num_leds": num_leds,
        "color_order": ENUM_TO_COLOR_ORDER.get(co, "rgb"),
        "startup_mode": ENUM_TO_STARTUP_MODE.get(sm, "default"),
        "brightness": br,
        "device_type": "led" if dt == 0 else str(dt),
    }
def pack_groups(group_ids: List[str]) -> bytes:
    """Build a GROUPS packet listing *group_ids*.

    Body layout: a count byte followed, per group, by a length byte and the
    UTF-8 id. Only the first 255 ids are written.

    Raises:
        ValueError: if an encoded id exceeds 250 bytes or the finished packet
            is too large.
    """
    chunks = [bytes((min(255, len(group_ids)),))]
    for group_id in group_ids[:255]:
        encoded = str(group_id).encode("utf-8")
        if len(encoded) > 250:
            raise ValueError("group id too long")
        chunks += (bytes((len(encoded),)), encoded)
    return _pack_header(MSG_GROUPS, b"".join(chunks))
def parse_groups(payload: bytes) -> Optional[List[str]]:
    """Parse a GROUPS packet (or a bare body) into a list of group ids.

    A payload of at least two bytes whose first byte is the wire magic is
    treated as a full packet and must carry the GROUPS type; anything else is
    treated as a bare body.

    Returns:
        The decoded ids (``[]`` for an empty body), or ``None`` if the packet
        has the wrong type, is truncated, or contains an id that is not valid
        UTF-8.
    """
    if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
        if payload[1] != MSG_GROUPS:
            return None
        body = payload[2:]
    else:
        body = payload
    if not body:
        return []

    # body is non-empty here, so the count byte is always present.
    count = body[0]
    off = 1
    out: List[str] = []
    for _ in range(count):
        if off >= len(body):
            return None
        end = off + 1 + body[off]
        if end > len(body):
            return None
        try:
            out.append(body[off + 1 : end].decode("utf-8"))
        except UnicodeDecodeError:
            # Malformed id: report "unparseable" like a truncated packet.
            return None
        off = end
    return out
def cmd_envelope_size(envelope: bytes) -> int:
    """Return the declared size of the envelope at the start of *envelope*.

    The size is ``HEADER_LEN`` plus the three length bytes at offsets 2, 3
    and 4. A buffer shorter than ``HEADER_LEN`` is reported at its own length.
    """
    from util.binary_envelope import HEADER_LEN

    if len(envelope) >= HEADER_LEN:
        return HEADER_LEN + envelope[2] + envelope[3] + envelope[4]
    return len(envelope)
def pack_cmd(envelope: bytes, *, save: bool = False) -> bytes:
    """Wrap a v2 binary envelope in a CMD packet.

    Bytes beyond the envelope's declared size are dropped. When *save* is
    true a single ``0x01`` byte is appended after the envelope.

    Raises:
        ValueError: if a non-empty envelope does not start with the v2
            version byte, or the packet would exceed the ESP-NOW limit.
    """
    if envelope and envelope[0] != BINARY_ENVELOPE_VERSION_2:
        raise ValueError("CMD envelope must be v2 binary")

    trimmed = envelope[: cmd_envelope_size(envelope)]
    body = trimmed + bytes([1]) if save else trimmed
    # Two bytes are reserved for the magic/type header.
    if len(body) > MAX_ESPNOW_PAYLOAD - 2:
        raise ValueError("CMD envelope too large for ESP-NOW")
    return _pack_header(MSG_CMD, body)
def pack_cmd_from_kwargs(*, save: bool = False, **kwargs: Any) -> bytes:
    """Build a v2 envelope from *kwargs* and wrap it in a CMD packet.

    *kwargs* are forwarded unchanged to ``pack_binary_envelope_v2``; *save*
    is forwarded to ``pack_cmd``.
    """
    envelope = pack_binary_envelope_v2(**kwargs)
    return pack_cmd(envelope, save=save)
def parse_cmd(payload: bytes) -> Tuple[Optional[bytes], bool]:
    """Return (v2 envelope bytes, save_flag) inside CMD packet.

    Yields ``(None, False)`` when *payload* is not a CMD packet, carries no
    envelope, or is shorter than the envelope's declared size. The save flag
    is true only when a ``0x01`` byte directly follows the envelope.
    """
    is_cmd = (
        len(payload) >= 2
        and payload[0] == WIRE_MAGIC
        and payload[1] == MSG_CMD
    )
    if not is_cmd:
        return None, False

    env = payload[2:]
    if not env:
        return None, False

    need = cmd_envelope_size(env)
    if len(env) < need:
        return None, False

    trailer = env[need:]
    save = bool(trailer) and trailer[0] == 1
    return bytes(env[:need]), save
def parse_cmd_as_v1_dict(payload: bytes) -> Optional[Dict[str, Any]]:
    """Decode a CMD packet into the dict produced by ``parse_binary_envelope_v2``.

    When the packet carries the save flag, ``"save": True`` is added to the
    dict. Returns ``None`` if the packet or its envelope cannot be parsed.
    """
    envelope, save = parse_cmd(payload)
    data = None if envelope is None else parse_binary_envelope_v2(envelope)
    if data is not None and save:
        data["save"] = True
    return data
def pack_group_cmd(group_id: str, envelope: bytes, *, save: bool = False) -> bytes:
    """Wrap a v2 binary envelope in a GROUP_CMD packet addressed to *group_id*.

    Body layout: group-id length byte, UTF-8 group id, the envelope trimmed
    to its declared size, then a ``0x01`` byte when *save* is true.

    Raises:
        ValueError: if a non-empty envelope does not start with the v2
            version byte, the encoded group id exceeds 250 bytes, or the
            finished packet is too large.
    """
    if envelope and envelope[0] != BINARY_ENVELOPE_VERSION_2:
        raise ValueError("GROUP_CMD envelope must be v2 binary")

    encoded_gid = str(group_id).encode("utf-8")
    if len(encoded_gid) > 250:
        raise ValueError("group id too long")

    trimmed = envelope[: cmd_envelope_size(envelope)]
    if save:
        trimmed = trimmed + bytes([1])
    prefix = bytes([len(encoded_gid)]) + encoded_gid
    return _pack_header(MSG_GROUP_CMD, prefix + trimmed)
def pack_group_cmd_from_kwargs(
    group_id: str, *, save: bool = False, **kwargs: Any
) -> bytes:
    """Build a v2 envelope from *kwargs* and wrap it in a GROUP_CMD packet.

    Mirrors ``pack_cmd_from_kwargs``: *save* is consumed here and forwarded
    to ``pack_group_cmd`` (default ``False``, matching the previous
    behaviour); the remaining *kwargs* go unchanged to
    ``pack_binary_envelope_v2``.
    """
    envelope = pack_binary_envelope_v2(**kwargs)
    return pack_group_cmd(group_id, envelope, save=save)
def parse_group_cmd(payload: bytes) -> Optional[Tuple[str, bytes]]:
    """Split a GROUP_CMD packet into ``(group_id, remaining_bytes)``.

    ``remaining_bytes`` is everything after the group id, i.e. the envelope
    plus any trailing save byte written by ``pack_group_cmd``; it is not
    trimmed or validated here.

    Returns:
        The tuple, or ``None`` if *payload* is not a GROUP_CMD packet, is
        truncated, or carries a group id that is not valid UTF-8.
    """
    if len(payload) < 2 or payload[0] != WIRE_MAGIC or payload[1] != MSG_GROUP_CMD:
        return None
    body = payload[2:]
    if not body:
        return None

    gid_end = 1 + body[0]
    if gid_end > len(body):
        return None
    try:
        gid = body[1:gid_end].decode("utf-8")
    except UnicodeDecodeError:
        # Malformed group id: report "unparseable" like a truncated packet.
        return None
    return gid, bytes(body[gid_end:])
def pack_ping_req(ping_id: int) -> bytes:
    """Build a PING_REQ packet carrying *ping_id* as a little-endian uint32.

    *ping_id* is reduced to its low 32 bits.
    """
    wrapped_id = int(ping_id) & 0xFFFFFFFF
    return _pack_header(MSG_PING_REQ, struct.pack("<I", wrapped_id))
def parse_ping_req(payload: bytes) -> Optional[int]:
    """Return ping_id from a PING_REQ packet or body.

    Returns ``None`` when a full packet has a different message type or
    fewer than four body bytes are available.
    """
    has_header = len(payload) >= 2 and payload[0] == WIRE_MAGIC
    if has_header and payload[1] != MSG_PING_REQ:
        return None

    body = payload[2:] if has_header else payload
    if len(body) < 4:
        return None
    (ping_id,) = struct.unpack_from("<I", body, 0)
    return int(ping_id)
def pack_ping_rsp(ping_id: int, name: str) -> bytes:
    """Build a PING_RSP packet.

    Body layout: ping id (uint32 little-endian, low 32 bits of *ping_id*),
    name length byte, UTF-8 name.

    Raises:
        ValueError: if the encoded name exceeds 250 bytes or the finished
            packet is too large.
    """
    encoded_name = name.encode("utf-8")
    if len(encoded_name) > 250:
        raise ValueError("name too long")
    # "<IB" packs the id and the length byte back to back with no padding.
    prefix = struct.pack("<IB", int(ping_id) & 0xFFFFFFFF, len(encoded_name))
    return _pack_header(MSG_PING_RSP, prefix + encoded_name)
def parse_ping_rsp(payload: bytes) -> Optional[Dict[str, Any]]:
    """Parse a PING_RSP packet (or a bare body).

    A payload of at least two bytes whose first byte is the wire magic is
    treated as a full packet and must carry the PING_RSP type; anything else
    is treated as a bare body.

    Returns:
        ``{"ping_id": int, "name": str}``, or ``None`` if the packet has the
        wrong type, is truncated, or carries a name that is not valid UTF-8.
    """
    if len(payload) >= 2 and payload[0] == WIRE_MAGIC:
        if payload[1] != MSG_PING_RSP:
            return None
        body = payload[2:]
    else:
        body = payload
    if len(body) < 5:
        return None

    ping_id = int(struct.unpack_from("<I", body, 0)[0])
    name_end = 5 + body[4]
    if len(body) < name_end:
        return None
    try:
        name = body[5:name_end].decode("utf-8")
    except UnicodeDecodeError:
        # Malformed name: report "unparseable" like a truncated packet.
        return None
    return {"ping_id": ping_id, "name": name}
def pack_bridge_channel(channel: int) -> bytes:
    """Build a BRIDGE_CH packet; *channel* is clamped to the range 1..11."""
    clamped = min(11, max(1, int(channel)))
    return _pack_header(MSG_BRIDGE_CH, bytes((clamped,)))
def parse_bridge_channel(payload: bytes) -> Optional[int]:
    """Return the channel byte of a BRIDGE_CH packet, or ``None`` if it is not one."""
    if (
        len(payload) >= 3
        and payload[0] == WIRE_MAGIC
        and payload[1] == MSG_BRIDGE_CH
    ):
        return int(payload[2])
    return None
def wire_msg_type(payload: bytes) -> Optional[int]:
    """Return the message type byte if *payload* starts with the wire magic, else ``None``."""
    if len(payload) < 2 or payload[0] != WIRE_MAGIC:
        return None
    return int(payload[1])
def pack_ws_downlink(
    espnow_packet: bytes,
    *,
    peer_mac: Optional[Any] = None,
    broadcast: bool = False,
) -> bytes:
    """Frame *espnow_packet* for the WebSocket downlink.

    Frame layout: flags byte, 6-byte peer MAC, ESP-NOW packet. With
    *broadcast* set, the broadcast flag and broadcast MAC are used and
    *peer_mac* is ignored.

    Raises:
        ValueError: for a unicast frame whose *peer_mac* cannot be normalised.
    """
    if broadcast:
        return bytes([WS_FLAG_BROADCAST]) + BROADCAST_MAC + espnow_packet

    peer = normalize_mac_bytes(peer_mac)
    if peer is None:
        raise ValueError("peer MAC required for unicast downlink")
    return bytes([0]) + peer + espnow_packet
def pack_ws_uplink(peer_mac: bytes, espnow_packet: bytes) -> bytes:
    """Frame *espnow_packet* for the WebSocket uplink.

    Frame layout: a zero flags byte, the 6-byte peer MAC, the ESP-NOW packet.

    Raises:
        ValueError: if *peer_mac* cannot be normalised to 6 bytes.
    """
    peer = normalize_mac_bytes(peer_mac)
    if peer is not None:
        return b"\x00" + peer + espnow_packet
    raise ValueError("invalid peer MAC")
def parse_ws_frame(frame: bytes) -> Tuple[bytes, bytes, bool]:
    """
    Returns (peer_mac_6bytes, espnow_packet, is_broadcast_dest).

    The frame is one flags byte, six MAC bytes, then the ESP-NOW packet. It
    counts as broadcast when the broadcast flag bit is set or the MAC equals
    the broadcast MAC.

    Raises:
        ValueError: if the frame is shorter than 8 bytes.
    """
    if len(frame) < 8:
        raise ValueError("WS frame too short")

    flags, peer, pkt = frame[0], frame[1:7], frame[7:]
    flagged = bool(flags & WS_FLAG_BROADCAST)
    return peer, pkt, flagged or peer == BROADCAST_MAC