Files
led-controller/src/util/binary_envelope.py
Jimmy 3bb75d49de feat(util): add binary envelope packing and message helpers
Includes tests for v1/v2 envelope round-trips.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:56:37 +12:00

508 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Compact binary controller → led-driver messages (ESP-NOW friendly).
Header (5 bytes), same for v1 (legacy) and v2 (native binary):
0: version — 1 = legacy (JSON text blobs); 2 = native binary blobs
1: brightness — 0-127 scales to device 0-255; 128-255 = leave unchanged
2: byte length of presets section (0-255)
3: byte length of select section
4: byte length of default section
v2 presets blob (no JSON):
u8 preset_count
each preset:
u8 name_len; name utf-8
u8 pattern_len; pattern utf-8 (``p``)
u8 color_count; color_count × (u8 r, u8 g, u8 b)
u16 delay_le (``d``)
u8 preset_brightness (``b``)
u8 auto (0/1) (``a``)
i16 n1..n6 little-endian (``n1`` to ``n6``)
v2 select blob:
u8 entry_count
each:
u8 device_len; device utf-8
u8 preset_name_len; preset name utf-8
u8 has_step (0/1); optional u16 step_le
v2 default blob:
u8 default_name_len; name utf-8
u8 target_count
each: u8 len; target name utf-8
Legacy v1: sections are UTF-8 JSON text (see ``parse_binary_envelope_v1``).
Keep ``5 + lp + ls + ld`` ≤ 245 for a single ESP-NOW frame body.
"""
from __future__ import annotations
import json
import struct
from typing import Any, Dict, List, Optional, Tuple
# Version byte of the legacy envelope whose sections are UTF-8 JSON text.
BINARY_ENVELOPE_VERSION_1 = 1
# Version byte of the envelope whose sections are native binary blobs.
BINARY_ENVELOPE_VERSION_2 = 2
# Size in bytes of the fixed header: version, brightness and the three
# section lengths (presets, select, default).
HEADER_LEN = 5
def brightness_wire_from_0_255(value: int) -> int:
    """Scale a device brightness (0-255) down to the wire range (0-127).

    ``value`` is coerced with ``int()`` and saturated to 0-255 first, so
    out-of-range inputs clamp instead of raising.
    """
    level = int(value)
    if level < 0:
        level = 0
    elif level > 255:
        level = 255
    # The +127 bias makes the floor division round to the nearest wire step.
    scaled, _remainder = divmod(level * 127 + 127, 255)
    return scaled
def brightness_0_255_from_wire(wire: int) -> int:
    """Expand a wire brightness (0-127) back to the device range (0-255).

    ``wire`` is coerced with ``int()`` and saturated to 0-127 before
    scaling; the result is floored and never exceeds 255.
    """
    level = int(wire)
    if level < 0:
        level = 0
    elif level > 127:
        level = 127
    expanded = level * 255 // 127
    return expanded if expanded < 255 else 255
def _clamp_i16(x: int) -> int:
    """Coerce ``x`` with ``int()`` and saturate it to the signed 16-bit range."""
    number = int(x)
    if number > 32767:
        return 32767
    if number < -32768:
        return -32768
    return number
def _colors_to_rgb_list(colors: Any) -> List[Tuple[int, int, int]]:
    """Normalise a mixed colour collection into ``(r, g, b)`` integer tuples.

    Accepted entries:
      * strings with at least six hex digits once surrounding whitespace and
        leading ``#`` characters are removed (only the first six are used);
      * lists/tuples with at least three items (the first three are
        converted with ``int()``).

    Every other entry is skipped. A falsy ``colors`` yields an empty list.
    Malformed hex digits raise ``ValueError`` from ``int()``.
    """
    triples: List[Tuple[int, int, int]] = []
    for entry in colors or ():
        if isinstance(entry, str):
            digits = entry.strip().lstrip("#")
            if len(digits) < 6:
                # Too short to hold RRGGBB; ignore it.
                continue
            red, green, blue = (int(digits[i : i + 2], 16) for i in (0, 2, 4))
            triples.append((red, green, blue))
        elif isinstance(entry, (list, tuple)) and len(entry) >= 3:
            triples.append((int(entry[0]), int(entry[1]), int(entry[2])))
    return triples
def _pack_preset_dict(name: str, preset: Dict[str, Any]) -> bytes:
    """Serialise one preset into its v2 wire record.

    Record layout: ``u8 name_len, name, u8 pattern_len, pattern,
    u8 color_count, color_count x (r, g, b), u16 delay, u8 brightness,
    u8 auto, 6 x i16 n1..n6`` (all multi-byte values little-endian).

    Each field is read from its short key first (``p``, ``c``, ``d``, ``b``,
    ``a``) and then from its long key (``pattern``, ``colors``, ``delay``,
    ``brightness``, ``auto``).

    Args:
        name: Preset name; its UTF-8 encoding must be at most 250 bytes.
        preset: Preset fields keyed by short or long names.

    Returns:
        The packed record bytes.

    Raises:
        ValueError: If the name or pattern exceeds 250 encoded bytes, or
            more than 255 colours are supplied.
    """

    def _first_present(short_key: str, long_key: str, fallback: Any) -> Any:
        # Use ``is not None`` rather than truthiness: 0 is a legitimate delay
        # or brightness and must not be replaced by the fallback.
        for key in (short_key, long_key):
            candidate = preset.get(key)
            if candidate is not None:
                return candidate
        return fallback

    name_bytes = name.encode("utf-8")
    if len(name_bytes) > 250:
        raise ValueError("preset name too long")

    # A falsy ``p`` defers to ``pattern``, which defaults to "off" when absent.
    pattern_bytes = str(
        preset.get("p") or preset.get("pattern", "off")
    ).encode("utf-8")
    if len(pattern_bytes) > 250:
        raise ValueError("pattern string too long")

    rgbs = _colors_to_rgb_list(preset.get("c") or preset.get("colors") or [])
    if len(rgbs) > 255:
        raise ValueError("too many colours")

    delay = max(0, min(65535, int(_first_present("d", "delay", 100))))
    brightness = max(0, min(255, int(_first_present("b", "brightness", 127))))
    auto = 1 if preset.get("a", preset.get("auto", True)) else 0
    params = [_clamp_i16(preset.get(key, 0)) for key in ("n1", "n2", "n3", "n4", "n5", "n6")]

    record = [
        bytes([len(name_bytes)]),
        name_bytes,
        bytes([len(pattern_bytes)]),
        pattern_bytes,
        bytes([len(rgbs)]),
    ]
    # Channels are masked to one byte each so out-of-range ints still pack.
    record.extend(bytes([r & 255, g & 255, b & 255]) for r, g, b in rgbs)
    record.append(struct.pack("<HBBhhhhhh", delay, brightness, auto, *params))
    return b"".join(record)
def _pack_presets_blob(presets: Dict[str, Any]) -> bytes:
    """Serialise a ``{name: preset_dict}`` mapping into the v2 presets blob.

    The blob is a ``u8`` preset count followed by one packed record per
    preset. Entries whose value is not a ``dict`` are skipped.

    Raises:
        ValueError: If more than 255 presets remain after filtering (the
            count must fit in one byte), or if packing a record fails.
    """
    records = [
        (key, value) for key, value in presets.items() if isinstance(value, dict)
    ]
    # Check explicitly so the caller gets a meaningful message instead of the
    # generic error raised by ``bytes([256])``.
    if len(records) > 255:
        raise ValueError("too many presets")
    blob = [bytes([len(records)])]
    blob.extend(_pack_preset_dict(str(key), value) for key, value in records)
    return b"".join(blob)
def _pack_select_blob(select: Dict[str, Any]) -> bytes:
    """Serialise a ``{device: selection}`` mapping into the v2 select blob.

    Each selection is either a preset name, or a list/tuple of
    ``[preset_name]`` or ``[preset_name, step]``. A ``None`` step is treated
    as absent.

    Entry layout: ``u8 device_len, device, u8 preset_len, preset,
    u8 has_step, [u16 step little-endian]``, preceded by a ``u8`` entry count.

    Raises:
        ValueError: If there are more than 255 entries, a device or preset
            name exceeds 250 encoded bytes, a list/tuple selection is empty,
            or the step is outside 0..65535.
    """
    if len(select) > 255:
        raise ValueError("too many select entries")
    blob = [bytes([len(select)])]
    for device, choice in select.items():
        device_bytes = str(device).encode("utf-8")
        if len(device_bytes) > 250:
            raise ValueError("device name too long")

        if isinstance(choice, (list, tuple)):
            if not choice:
                # An empty sequence carries no preset name; stringifying it
                # would put the literal "[]" on the wire.
                raise ValueError("select entry has no preset name")
            preset_bytes = str(choice[0]).encode("utf-8")
            step = choice[1] if len(choice) > 1 else None
        else:
            preset_bytes = str(choice).encode("utf-8")
            step = None
        if len(preset_bytes) > 250:
            raise ValueError("preset name too long")

        if step is None:
            tail = b"\x00"
        else:
            step_value = int(step)
            if not 0 <= step_value <= 65535:
                raise ValueError("step out of range")
            tail = b"\x01" + struct.pack("<H", step_value)

        blob.append(
            bytes([len(device_bytes)])
            + device_bytes
            + bytes([len(preset_bytes)])
            + preset_bytes
            + tail
        )
    return b"".join(blob)
def _pack_default_blob(default: str, targets: Optional[list]) -> bytes:
    """Serialise the default preset name and its target list (v2 default blob).

    Layout: ``u8 name_len, name, u8 target_count`` followed by
    ``u8 len, target`` for each target. A falsy ``targets`` is packed as an
    empty list.

    Raises:
        ValueError: If the default name or any target exceeds 250 encoded
            bytes, or there are more than 255 targets.
    """
    encoded_default = str(default).encode("utf-8")
    if len(encoded_default) > 250:
        raise ValueError("default name too long")

    target_names = [] if not targets else list(targets)
    if len(target_names) > 255:
        raise ValueError("too many targets")

    pieces = [
        bytes([len(encoded_default)]),
        encoded_default,
        bytes([len(target_names)]),
    ]
    for target in target_names:
        encoded_target = str(target).encode("utf-8")
        if len(encoded_target) > 250:
            raise ValueError("target name too long")
        pieces += [bytes([len(encoded_target)]), encoded_target]
    return b"".join(pieces)
def pack_binary_envelope_v2(
    *,
    presets: Optional[Dict[str, Any]] = None,
    select: Optional[Dict[str, Any]] = None,
    default: Optional[str] = None,
    default_targets: Optional[list] = None,
    brightness_0_255: Optional[int] = None,
) -> bytes:
    """Assemble a version-2 envelope whose sections are native binary blobs.

    Args:
        presets: Preset mapping; ``None`` or empty produces an empty section.
        select: Device selection mapping; ``None`` or empty produces an
            empty section.
        default: Default preset name; ``None`` omits the default section.
        default_targets: Targets packed alongside ``default``.
        brightness_0_255: Device brightness; ``None`` writes wire value 255.

    Returns:
        The 5-byte header followed by the presets, select and default
        sections, in that order.

    Raises:
        ValueError: If any section is longer than 255 bytes, or a section
            packer rejects its input.
    """
    sections = (
        _pack_presets_blob(presets) if presets else b"",
        _pack_select_blob(select) if select else b"",
        b"" if default is None else _pack_default_blob(default, default_targets),
    )
    # Each section length is stored in a single header byte.
    if any(len(section) > 255 for section in sections):
        raise ValueError("binary envelope section exceeds 255 bytes")

    if brightness_0_255 is None:
        br_wire = 255
    else:
        br_wire = brightness_wire_from_0_255(brightness_0_255)

    header = bytes([BINARY_ENVELOPE_VERSION_2, br_wire, *map(len, sections)])
    return header + b"".join(sections)
def pack_binary_envelope_v1(
    *,
    presets: Optional[Dict[str, Any]] = None,
    select: Optional[Dict[str, Any]] = None,
    default: Optional[str] = None,
    default_targets: Optional[list] = None,
    brightness_0_255: Optional[int] = None,
) -> bytes:
    """Assemble a legacy version-1 envelope whose sections are compact JSON.

    Prefer ``pack_binary_envelope_v2``. A section is empty only when its
    argument is ``None``; an empty mapping is still serialised as JSON.
    The default section is the object ``{"default": ..., "targets": [...]}``.

    Raises:
        ValueError: If any encoded section is longer than 255 bytes.
    """

    def _compact_json(obj: Any) -> bytes:
        # Separators without spaces keep the fragment as short as possible.
        return json.dumps(obj, separators=(",", ":")).encode("utf-8")

    presets_bytes = b"" if presets is None else _compact_json(presets)
    select_bytes = b"" if select is None else _compact_json(select)
    if default is None:
        default_bytes = b""
    else:
        default_bytes = _compact_json(
            {
                "default": default,
                "targets": list(default_targets) if default_targets else [],
            }
        )

    sections = (presets_bytes, select_bytes, default_bytes)
    if any(len(section) > 255 for section in sections):
        raise ValueError("binary envelope fragment exceeds 255 bytes")

    if brightness_0_255 is None:
        br_wire = 255
    else:
        br_wire = brightness_wire_from_0_255(brightness_0_255)

    header = bytes([BINARY_ENVELOPE_VERSION_1, br_wire, *map(len, sections)])
    return header + b"".join(sections)
def _decode_preset_record(
    buf: bytes, off: int
) -> Tuple[str, Dict[str, Any], int]:
    """Decode one v2 preset record starting at ``buf[off]``.

    Returns:
        ``(name, preset, next_offset)`` where ``preset`` uses the short keys
        ``p``, ``c`` (list of ``#rrggbb`` strings), ``d``, ``b``, ``a`` and
        ``n1``..``n6``, and ``next_offset`` points just past the record.

    Raises:
        ValueError: With message ``"truncated"`` if the record runs past the
            end of ``buf``.
        UnicodeDecodeError: If the name or pattern is not valid UTF-8.
    """
    end = len(buf)

    def _read_text(pos: int) -> Tuple[str, int]:
        # Length-prefixed UTF-8 string: u8 length, then that many bytes.
        if pos >= end:
            raise ValueError("truncated")
        stop = pos + 1 + buf[pos]
        if stop > end:
            raise ValueError("truncated")
        return buf[pos + 1 : stop].decode("utf-8"), stop

    name, cursor = _read_text(off)
    pattern, cursor = _read_text(cursor)

    if cursor >= end:
        raise ValueError("truncated")
    color_count = buf[cursor]
    cursor += 1
    colors_end = cursor + 3 * color_count
    if colors_end > end:
        raise ValueError("truncated")
    raw = buf[cursor:colors_end]
    colors: List[str] = [
        f"#{r:02x}{g:02x}{b:02x}"
        for r, g, b in zip(raw[0::3], raw[1::3], raw[2::3])
    ]
    cursor = colors_end

    # Fixed 16-byte tail: u16 delay, u8 brightness, u8 auto, six i16 params.
    if cursor + 16 > end:
        raise ValueError("truncated")
    delay, brightness, auto, p1, p2, p3, p4, p5, p6 = struct.unpack_from(
        "<HBBhhhhhh", buf, cursor
    )
    cursor += 16

    preset = {
        "p": pattern,
        "c": colors,
        "d": delay,
        "b": brightness,
        "a": bool(auto),
        "n1": p1,
        "n2": p2,
        "n3": p3,
        "n4": p4,
        "n5": p5,
        "n6": p6,
    }
    return name, preset, cursor
def _decode_presets_blob(chunk: bytes) -> Dict[str, Any]:
    """Decode a v2 presets blob into a ``{name: preset}`` mapping.

    An empty ``chunk`` yields an empty mapping. Otherwise the first byte is
    the record count and the records must consume the chunk exactly.

    Raises:
        ValueError: If a record is truncated or bytes are left over after
            the last record.
    """
    if not chunk:
        return {}
    decoded: Dict[str, Any] = {}
    cursor = 1  # Byte 0 holds the record count.
    remaining = chunk[0]
    while remaining > 0:
        name, preset, cursor = _decode_preset_record(chunk, cursor)
        decoded[name] = preset
        remaining -= 1
    if cursor != len(chunk):
        raise ValueError("presets blob length mismatch")
    return decoded
def _decode_select_blob(chunk: bytes) -> Dict[str, Any]:
    """Decode a v2 select blob into ``{device: [preset_name]}`` or
    ``{device: [preset_name, step]}`` entries.

    An empty ``chunk`` yields an empty mapping. Otherwise the first byte is
    the entry count and the entries must consume the chunk exactly.

    Raises:
        ValueError: ``"truncated"`` if an entry runs past the end of the
            chunk, or ``"select blob length mismatch"`` if bytes are left
            over.
        UnicodeDecodeError: If a device or preset name is not valid UTF-8.
    """
    if not chunk:
        return {}
    size = len(chunk)

    def _byte_at(pos: int) -> int:
        if pos >= size:
            raise ValueError("truncated")
        return chunk[pos]

    def _text_at(pos: int) -> Tuple[str, int]:
        # Length-prefixed UTF-8 string: u8 length, then that many bytes.
        start = pos + 1
        stop = start + _byte_at(pos)
        if stop > size:
            raise ValueError("truncated")
        return chunk[start:stop].decode("utf-8"), stop

    selections: Dict[str, Any] = {}
    cursor = 1  # Byte 0 holds the entry count.
    for _ in range(chunk[0]):
        device, cursor = _text_at(cursor)
        preset_name, cursor = _text_at(cursor)
        has_step = _byte_at(cursor)
        cursor += 1
        if not has_step:
            selections[device] = [preset_name]
            continue
        if cursor + 2 > size:
            raise ValueError("truncated")
        (step,) = struct.unpack_from("<H", chunk, cursor)
        cursor += 2
        selections[device] = [preset_name, step]
    if cursor != size:
        raise ValueError("select blob length mismatch")
    return selections
def _decode_default_blob(chunk: bytes) -> Tuple[Optional[str], list]:
    """Decode a v2 default blob into ``(default_name, targets)``.

    An empty ``chunk`` yields ``(None, [])``. Otherwise the blob holds a
    length-prefixed name, a ``u8`` target count and that many
    length-prefixed target names, and must be consumed exactly.

    Raises:
        ValueError: ``"truncated"`` if a field runs past the end of the
            chunk, or ``"default blob length mismatch"`` if bytes are left
            over.
        UnicodeDecodeError: If a name is not valid UTF-8.
    """
    if not chunk:
        return None, []
    size = len(chunk)

    def _text_at(pos: int) -> Tuple[str, int]:
        # Length-prefixed UTF-8 string: u8 length, then that many bytes.
        if pos >= size:
            raise ValueError("truncated")
        stop = pos + 1 + chunk[pos]
        if stop > size:
            raise ValueError("truncated")
        return chunk[pos + 1 : stop].decode("utf-8"), stop

    default_name, cursor = _text_at(0)
    if cursor >= size:
        raise ValueError("truncated")
    remaining = chunk[cursor]
    cursor += 1

    targets: List[str] = []
    while remaining:
        target, cursor = _text_at(cursor)
        targets.append(target)
        remaining -= 1
    if cursor != size:
        raise ValueError("default blob length mismatch")
    return default_name, targets
def parse_binary_envelope_v2(buf: bytes) -> Optional[Dict[str, Any]]:
    """Decode a version-2 (native binary) envelope into the v1 API dict shape.

    The result always contains ``"v": "1"``. ``"b"`` is present only when
    the wire brightness is below 128. ``"presets"``, ``"select"`` and
    ``"default"``/``"targets"`` are present only for non-empty sections.

    Returns:
        The decoded dict, or ``None`` if ``buf`` is not bytes-like, is
        shorter than the header, has a different version byte, has a total
        length that disagrees with the header, or a section fails to decode.
    """
    if not isinstance(buf, (bytes, bytearray)):
        return None
    if len(buf) < HEADER_LEN or buf[0] != BINARY_ENVELOPE_VERSION_2:
        return None

    wire_brightness, presets_len, select_len, default_len = buf[1:HEADER_LEN]
    if len(buf) != HEADER_LEN + presets_len + select_len + default_len:
        return None

    select_start = HEADER_LEN + presets_len
    default_start = select_start + select_len

    result: Dict[str, Any] = {"v": "1"}
    if wire_brightness < 128:
        result["b"] = brightness_0_255_from_wire(wire_brightness)
    try:
        if presets_len:
            result["presets"] = _decode_presets_blob(
                bytes(buf[HEADER_LEN:select_start])
            )
        if select_len:
            result["select"] = _decode_select_blob(
                bytes(buf[select_start:default_start])
            )
        if default_len:
            default_name, targets = _decode_default_blob(
                bytes(buf[default_start : default_start + default_len])
            )
            result["default"] = default_name
            result["targets"] = targets
    except (ValueError, UnicodeError):
        # Any malformed section invalidates the whole envelope.
        return None
    return result
def parse_binary_envelope_v1(buf: bytes) -> Optional[Dict[str, Any]]:
    """Decode a legacy version-1 envelope (UTF-8 JSON sections) into a dict.

    The result always contains ``"v": "1"``. ``"b"`` is present only when
    the wire brightness is below 128. Non-empty presets and select sections
    are stored under ``"presets"`` and ``"select"``. If the default section
    parses to a JSON object, its keys are merged into the result.

    Returns:
        The decoded dict, or ``None`` if ``buf`` is not bytes-like, is
        shorter than the header, has a different version byte, has a total
        length that disagrees with the header, or a section is not valid
        UTF-8 JSON.
    """
    if not isinstance(buf, (bytes, bytearray)):
        return None
    if len(buf) < HEADER_LEN or buf[0] != BINARY_ENVELOPE_VERSION_1:
        return None

    wire_brightness, presets_len, select_len, default_len = buf[1:HEADER_LEN]
    if len(buf) != HEADER_LEN + presets_len + select_len + default_len:
        return None

    select_start = HEADER_LEN + presets_len
    default_start = select_start + select_len

    result: Dict[str, Any] = {"v": "1"}
    if wire_brightness < 128:
        result["b"] = brightness_0_255_from_wire(wire_brightness)
    try:
        if presets_len:
            result["presets"] = json.loads(
                buf[HEADER_LEN:select_start].decode("utf-8")
            )
        if select_len:
            result["select"] = json.loads(
                buf[select_start:default_start].decode("utf-8")
            )
        if default_len:
            extra = json.loads(
                buf[default_start : default_start + default_len].decode("utf-8")
            )
            # Only a JSON object can contribute keys; other values are ignored.
            if isinstance(extra, dict):
                result.update(extra)
    except (ValueError, UnicodeError):
        return None
    return result
def parse_binary_envelope(buf: bytes) -> Optional[Dict[str, Any]]:
    """Decode an envelope of either version.

    The version-2 (native binary) parser is tried first; if it returns
    ``None``, the result of the version-1 (JSON fragments) parser is
    returned instead, which may itself be ``None``.
    """
    decoded = parse_binary_envelope_v2(buf)
    return parse_binary_envelope_v1(buf) if decoded is None else decoded