feat(util): add binary envelope packing and message helpers
Includes tests for v1/v2 envelope round-trips. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
508
src/util/binary_envelope.py
Normal file
508
src/util/binary_envelope.py
Normal file
@@ -0,0 +1,508 @@
|
|||||||
|
"""
|
||||||
|
Compact binary controller → led-driver messages (ESP-NOW friendly).
|
||||||
|
|
||||||
|
Header (5 bytes), same for v1 (legacy) and v2 (native binary):
|
||||||
|
|
||||||
|
0: version — 1 = legacy (JSON text blobs); 2 = native binary blobs
|
||||||
|
1: brightness — 0–127 scales to device 0–255; 128–255 = leave unchanged
|
||||||
|
2: byte length of presets section (0–255)
|
||||||
|
3: byte length of select section
|
||||||
|
4: byte length of default section
|
||||||
|
|
||||||
|
v2 presets blob (no JSON):
|
||||||
|
u8 preset_count
|
||||||
|
each preset:
|
||||||
|
u8 name_len; name utf-8
|
||||||
|
u8 pattern_len; pattern utf-8 (``p``)
|
||||||
|
u8 color_count; color_count × (u8 r, u8 g, u8 b)
|
||||||
|
u16 delay_le (``d``)
|
||||||
|
u8 preset_brightness (``b``)
|
||||||
|
u8 auto (0/1) (``a``)
|
||||||
|
i16 n1..n6 little-endian (``n1``–``n6``)
|
||||||
|
|
||||||
|
v2 select blob:
|
||||||
|
u8 entry_count
|
||||||
|
each:
|
||||||
|
u8 device_len; device utf-8
|
||||||
|
u8 preset_name_len; preset name utf-8
|
||||||
|
u8 has_step (0/1); optional u16 step_le
|
||||||
|
|
||||||
|
v2 default blob:
|
||||||
|
u8 default_name_len; name utf-8
|
||||||
|
u8 target_count
|
||||||
|
each: u8 len; target name utf-8
|
||||||
|
|
||||||
|
Legacy v1: sections are UTF-8 JSON text (see ``parse_binary_envelope_v1``).
|
||||||
|
|
||||||
|
Keep ``5 + lp + ls + ld`` ≤ 245 for a single ESP-NOW frame body.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import struct
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
BINARY_ENVELOPE_VERSION_1 = 1
|
||||||
|
BINARY_ENVELOPE_VERSION_2 = 2
|
||||||
|
HEADER_LEN = 5
|
||||||
|
|
||||||
|
|
||||||
|
def brightness_wire_from_0_255(value: int) -> int:
|
||||||
|
"""Map device brightness 0–255 to wire 0–127."""
|
||||||
|
v = max(0, min(255, int(value)))
|
||||||
|
return (v * 127 + 127) // 255
|
||||||
|
|
||||||
|
|
||||||
|
def brightness_0_255_from_wire(wire: int) -> int:
|
||||||
|
"""Map wire 0–127 to device brightness 0–255."""
|
||||||
|
w = max(0, min(127, int(wire)))
|
||||||
|
return min(255, (w * 255) // 127)
|
||||||
|
|
||||||
|
|
||||||
|
def _clamp_i16(x: int) -> int:
|
||||||
|
x = int(x)
|
||||||
|
return max(-32768, min(32767, x))
|
||||||
|
|
||||||
|
|
||||||
|
def _colors_to_rgb_list(colors: Any) -> List[Tuple[int, int, int]]:
|
||||||
|
out: List[Tuple[int, int, int]] = []
|
||||||
|
if not colors:
|
||||||
|
return out
|
||||||
|
for c in colors:
|
||||||
|
if isinstance(c, str):
|
||||||
|
h = c.strip().lstrip("#")
|
||||||
|
if len(h) >= 6:
|
||||||
|
out.append(
|
||||||
|
(int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16))
|
||||||
|
)
|
||||||
|
elif isinstance(c, (list, tuple)) and len(c) >= 3:
|
||||||
|
out.append((int(c[0]), int(c[1]), int(c[2])))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_preset_dict(name: str, preset: Dict[str, Any]) -> bytes:
|
||||||
|
pname = name.encode("utf-8")
|
||||||
|
if len(pname) > 250:
|
||||||
|
raise ValueError("preset name too long")
|
||||||
|
pattern = str(preset.get("p") or preset.get("pattern", "off")).encode("utf-8")
|
||||||
|
if len(pattern) > 250:
|
||||||
|
raise ValueError("pattern string too long")
|
||||||
|
rgbs = _colors_to_rgb_list(preset.get("c") or preset.get("colors") or [])
|
||||||
|
if len(rgbs) > 255:
|
||||||
|
raise ValueError("too many colours")
|
||||||
|
delay = max(0, min(65535, int(preset.get("d") or preset.get("delay", 100))))
|
||||||
|
br = max(0, min(255, int(preset.get("b") or preset.get("brightness", 127))))
|
||||||
|
auto = 1 if preset.get("a", preset.get("auto", True)) else 0
|
||||||
|
parts = [
|
||||||
|
bytes([len(pname)]),
|
||||||
|
pname,
|
||||||
|
bytes([len(pattern)]),
|
||||||
|
pattern,
|
||||||
|
bytes([len(rgbs)]),
|
||||||
|
]
|
||||||
|
for r, g, b in rgbs:
|
||||||
|
parts.append(bytes([r & 255, g & 255, b & 255]))
|
||||||
|
n1 = _clamp_i16(preset.get("n1", 0))
|
||||||
|
n2 = _clamp_i16(preset.get("n2", 0))
|
||||||
|
n3 = _clamp_i16(preset.get("n3", 0))
|
||||||
|
n4 = _clamp_i16(preset.get("n4", 0))
|
||||||
|
n5 = _clamp_i16(preset.get("n5", 0))
|
||||||
|
n6 = _clamp_i16(preset.get("n6", 0))
|
||||||
|
parts.append(
|
||||||
|
struct.pack(
|
||||||
|
"<HBBhhhhhh",
|
||||||
|
delay,
|
||||||
|
br,
|
||||||
|
auto,
|
||||||
|
n1,
|
||||||
|
n2,
|
||||||
|
n3,
|
||||||
|
n4,
|
||||||
|
n5,
|
||||||
|
n6,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return b"".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_presets_blob(presets: Dict[str, Any]) -> bytes:
|
||||||
|
items = [(k, v) for k, v in presets.items() if isinstance(v, dict)]
|
||||||
|
out = [bytes([len(items)])]
|
||||||
|
for name, pdata in items:
|
||||||
|
out.append(_pack_preset_dict(str(name), pdata))
|
||||||
|
return b"".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_select_blob(select: Dict[str, Any]) -> bytes:
|
||||||
|
out = [bytes([len(select)])]
|
||||||
|
for device, sel in select.items():
|
||||||
|
dev_b = str(device).encode("utf-8")
|
||||||
|
if len(dev_b) > 250:
|
||||||
|
raise ValueError("device name too long")
|
||||||
|
if isinstance(sel, (list, tuple)) and sel:
|
||||||
|
pn = str(sel[0]).encode("utf-8")
|
||||||
|
step = sel[1] if len(sel) > 1 else None
|
||||||
|
else:
|
||||||
|
pn = str(sel).encode("utf-8")
|
||||||
|
step = None
|
||||||
|
if len(pn) > 250:
|
||||||
|
raise ValueError("preset name too long")
|
||||||
|
if step is None:
|
||||||
|
out.append(
|
||||||
|
bytes([len(dev_b)])
|
||||||
|
+ dev_b
|
||||||
|
+ bytes([len(pn)])
|
||||||
|
+ pn
|
||||||
|
+ bytes([0])
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
s = int(step)
|
||||||
|
if s < 0 or s > 65535:
|
||||||
|
raise ValueError("step out of range")
|
||||||
|
out.append(
|
||||||
|
bytes([len(dev_b)])
|
||||||
|
+ dev_b
|
||||||
|
+ bytes([len(pn)])
|
||||||
|
+ pn
|
||||||
|
+ bytes([1])
|
||||||
|
+ struct.pack("<H", s)
|
||||||
|
)
|
||||||
|
return b"".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_default_blob(default: str, targets: Optional[list]) -> bytes:
|
||||||
|
name_b = str(default).encode("utf-8")
|
||||||
|
if len(name_b) > 250:
|
||||||
|
raise ValueError("default name too long")
|
||||||
|
tlist = list(targets) if targets else []
|
||||||
|
if len(tlist) > 255:
|
||||||
|
raise ValueError("too many targets")
|
||||||
|
out = [bytes([len(name_b)]), name_b, bytes([len(tlist)])]
|
||||||
|
for t in tlist:
|
||||||
|
tb = str(t).encode("utf-8")
|
||||||
|
if len(tb) > 250:
|
||||||
|
raise ValueError("target name too long")
|
||||||
|
out.append(bytes([len(tb)]))
|
||||||
|
out.append(tb)
|
||||||
|
return b"".join(out)
|
||||||
|
|
||||||
|
|
||||||
|
def pack_binary_envelope_v2(
|
||||||
|
*,
|
||||||
|
presets: Optional[Dict[str, Any]] = None,
|
||||||
|
select: Optional[Dict[str, Any]] = None,
|
||||||
|
default: Optional[str] = None,
|
||||||
|
default_targets: Optional[list] = None,
|
||||||
|
brightness_0_255: Optional[int] = None,
|
||||||
|
) -> bytes:
|
||||||
|
"""Build a v2 envelope (native binary sections, no JSON)."""
|
||||||
|
presets_bytes = (
|
||||||
|
_pack_presets_blob(presets) if presets is not None and presets else b""
|
||||||
|
)
|
||||||
|
select_bytes = (
|
||||||
|
_pack_select_blob(select) if select is not None and select else b""
|
||||||
|
)
|
||||||
|
default_bytes = (
|
||||||
|
_pack_default_blob(default, default_targets)
|
||||||
|
if default is not None
|
||||||
|
else b""
|
||||||
|
)
|
||||||
|
|
||||||
|
lp = len(presets_bytes)
|
||||||
|
ls = len(select_bytes)
|
||||||
|
ld = len(default_bytes)
|
||||||
|
if lp > 255 or ls > 255 or ld > 255:
|
||||||
|
raise ValueError("binary envelope section exceeds 255 bytes")
|
||||||
|
|
||||||
|
br_wire = (
|
||||||
|
255
|
||||||
|
if brightness_0_255 is None
|
||||||
|
else brightness_wire_from_0_255(brightness_0_255)
|
||||||
|
)
|
||||||
|
header = bytes([BINARY_ENVELOPE_VERSION_2, br_wire, lp, ls, ld])
|
||||||
|
return header + presets_bytes + select_bytes + default_bytes
|
||||||
|
|
||||||
|
|
||||||
|
def pack_binary_envelope_v1(
|
||||||
|
*,
|
||||||
|
presets: Optional[Dict[str, Any]] = None,
|
||||||
|
select: Optional[Dict[str, Any]] = None,
|
||||||
|
default: Optional[str] = None,
|
||||||
|
default_targets: Optional[list] = None,
|
||||||
|
brightness_0_255: Optional[int] = None,
|
||||||
|
) -> bytes:
|
||||||
|
"""Legacy: JSON UTF-8 fragments (version byte 1). Prefer ``pack_binary_envelope_v2``."""
|
||||||
|
if presets is None:
|
||||||
|
presets_bytes = b""
|
||||||
|
else:
|
||||||
|
presets_bytes = json.dumps(presets, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
|
if select is None:
|
||||||
|
select_bytes = b""
|
||||||
|
else:
|
||||||
|
select_bytes = json.dumps(select, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
|
default_obj: Optional[Dict[str, Any]] = None
|
||||||
|
if default is not None:
|
||||||
|
default_obj = {
|
||||||
|
"default": default,
|
||||||
|
"targets": list(default_targets) if default_targets else [],
|
||||||
|
}
|
||||||
|
default_bytes = (
|
||||||
|
json.dumps(default_obj, separators=(",", ":")).encode("utf-8")
|
||||||
|
if default_obj is not None
|
||||||
|
else b""
|
||||||
|
)
|
||||||
|
|
||||||
|
lp = len(presets_bytes)
|
||||||
|
ls = len(select_bytes)
|
||||||
|
ld = len(default_bytes)
|
||||||
|
if lp > 255 or ls > 255 or ld > 255:
|
||||||
|
raise ValueError("binary envelope fragment exceeds 255 bytes")
|
||||||
|
|
||||||
|
br_wire = (
|
||||||
|
255
|
||||||
|
if brightness_0_255 is None
|
||||||
|
else brightness_wire_from_0_255(brightness_0_255)
|
||||||
|
)
|
||||||
|
header = bytes([BINARY_ENVELOPE_VERSION_1, br_wire, lp, ls, ld])
|
||||||
|
return header + presets_bytes + select_bytes + default_bytes
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_preset_record(
|
||||||
|
buf: bytes, off: int
|
||||||
|
) -> Tuple[str, Dict[str, Any], int]:
|
||||||
|
if off + 1 > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
nl = buf[off]
|
||||||
|
off += 1
|
||||||
|
if off + nl > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
name = buf[off : off + nl].decode("utf-8")
|
||||||
|
off += nl
|
||||||
|
if off + 1 > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
pl = buf[off]
|
||||||
|
off += 1
|
||||||
|
if off + pl > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
pattern = buf[off : off + pl].decode("utf-8")
|
||||||
|
off += pl
|
||||||
|
if off + 1 > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
nc = buf[off]
|
||||||
|
off += 1
|
||||||
|
if off + nc * 3 > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
colors: List[str] = []
|
||||||
|
for _ in range(nc):
|
||||||
|
r, g, b = buf[off], buf[off + 1], buf[off + 2]
|
||||||
|
off += 3
|
||||||
|
colors.append(f"#{r:02x}{g:02x}{b:02x}")
|
||||||
|
if off + 16 > len(buf):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
delay, br, auto, n1, n2, n3, n4, n5, n6 = struct.unpack_from(
|
||||||
|
"<HBBhhhhhh", buf, off
|
||||||
|
)
|
||||||
|
off += 16
|
||||||
|
preset = {
|
||||||
|
"p": pattern,
|
||||||
|
"c": colors,
|
||||||
|
"d": delay,
|
||||||
|
"b": br,
|
||||||
|
"a": bool(auto),
|
||||||
|
"n1": n1,
|
||||||
|
"n2": n2,
|
||||||
|
"n3": n3,
|
||||||
|
"n4": n4,
|
||||||
|
"n5": n5,
|
||||||
|
"n6": n6,
|
||||||
|
}
|
||||||
|
return name, preset, off
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_presets_blob(chunk: bytes) -> Dict[str, Any]:
|
||||||
|
if not chunk:
|
||||||
|
return {}
|
||||||
|
off = 0
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
count = chunk[off]
|
||||||
|
off += 1
|
||||||
|
out: Dict[str, Any] = {}
|
||||||
|
for _ in range(count):
|
||||||
|
name, preset, off = _decode_preset_record(chunk, off)
|
||||||
|
out[name] = preset
|
||||||
|
if off != len(chunk):
|
||||||
|
raise ValueError("presets blob length mismatch")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_select_blob(chunk: bytes) -> Dict[str, Any]:
|
||||||
|
if not chunk:
|
||||||
|
return {}
|
||||||
|
off = 0
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
count = chunk[off]
|
||||||
|
off += 1
|
||||||
|
out: Dict[str, Any] = {}
|
||||||
|
for _ in range(count):
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
dl = chunk[off]
|
||||||
|
off += 1
|
||||||
|
if off + dl > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
device = chunk[off : off + dl].decode("utf-8")
|
||||||
|
off += dl
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
pl = chunk[off]
|
||||||
|
off += 1
|
||||||
|
if off + pl > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
pname = chunk[off : off + pl].decode("utf-8")
|
||||||
|
off += pl
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
has_step = chunk[off]
|
||||||
|
off += 1
|
||||||
|
if has_step:
|
||||||
|
if off + 2 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
step = struct.unpack_from("<H", chunk, off)[0]
|
||||||
|
off += 2
|
||||||
|
out[device] = [pname, step]
|
||||||
|
else:
|
||||||
|
out[device] = [pname]
|
||||||
|
if off != len(chunk):
|
||||||
|
raise ValueError("select blob length mismatch")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_default_blob(chunk: bytes) -> Tuple[Optional[str], list]:
|
||||||
|
if not chunk:
|
||||||
|
return None, []
|
||||||
|
off = 0
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
nl = chunk[off]
|
||||||
|
off += 1
|
||||||
|
if off + nl > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
default_name = chunk[off : off + nl].decode("utf-8") if nl else ""
|
||||||
|
off += nl
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
nt = chunk[off]
|
||||||
|
off += 1
|
||||||
|
targets: List[str] = []
|
||||||
|
for _ in range(nt):
|
||||||
|
if off + 1 > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
tl = chunk[off]
|
||||||
|
off += 1
|
||||||
|
if off + tl > len(chunk):
|
||||||
|
raise ValueError("truncated")
|
||||||
|
targets.append(chunk[off : off + tl].decode("utf-8"))
|
||||||
|
off += tl
|
||||||
|
if off != len(chunk):
|
||||||
|
raise ValueError("default blob length mismatch")
|
||||||
|
return default_name, targets
|
||||||
|
|
||||||
|
|
||||||
|
def parse_binary_envelope_v2(buf: bytes) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Decode native-binary v2 envelope into the v1 API dict shape."""
|
||||||
|
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
|
||||||
|
return None
|
||||||
|
if buf[0] != BINARY_ENVELOPE_VERSION_2:
|
||||||
|
return None
|
||||||
|
lp, ls, ld = buf[2], buf[3], buf[4]
|
||||||
|
need = HEADER_LEN + lp + ls + ld
|
||||||
|
if len(buf) != need:
|
||||||
|
return None
|
||||||
|
|
||||||
|
off = HEADER_LEN
|
||||||
|
presets_chunk = buf[off : off + lp]
|
||||||
|
off += lp
|
||||||
|
select_chunk = buf[off : off + ls]
|
||||||
|
off += ls
|
||||||
|
default_chunk = buf[off : off + ld]
|
||||||
|
|
||||||
|
data: Dict[str, Any] = {"v": "1"}
|
||||||
|
br = buf[1]
|
||||||
|
if br < 128:
|
||||||
|
data["b"] = brightness_0_255_from_wire(br)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if lp:
|
||||||
|
data["presets"] = _decode_presets_blob(bytes(presets_chunk))
|
||||||
|
if ls:
|
||||||
|
data["select"] = _decode_select_blob(bytes(select_chunk))
|
||||||
|
if ld:
|
||||||
|
dname, targets = _decode_default_blob(bytes(default_chunk))
|
||||||
|
data["default"] = dname
|
||||||
|
data["targets"] = targets
|
||||||
|
except (ValueError, UnicodeError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def parse_binary_envelope_v1(buf: bytes) -> Optional[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Decode legacy v1 bytes (JSON text blobs) into a v1 API dict.
|
||||||
|
Returns None if ``buf`` is not a valid v1 envelope.
|
||||||
|
"""
|
||||||
|
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
|
||||||
|
return None
|
||||||
|
if buf[0] != BINARY_ENVELOPE_VERSION_1:
|
||||||
|
return None
|
||||||
|
lp, ls, ld = buf[2], buf[3], buf[4]
|
||||||
|
need = HEADER_LEN + lp + ls + ld
|
||||||
|
if len(buf) != need:
|
||||||
|
return None
|
||||||
|
|
||||||
|
off = HEADER_LEN
|
||||||
|
presets_chunk = buf[off : off + lp]
|
||||||
|
off += lp
|
||||||
|
select_chunk = buf[off : off + ls]
|
||||||
|
off += ls
|
||||||
|
default_chunk = buf[off : off + ld]
|
||||||
|
|
||||||
|
data: Dict[str, Any] = {"v": "1"}
|
||||||
|
|
||||||
|
br = buf[1]
|
||||||
|
if br < 128:
|
||||||
|
data["b"] = brightness_0_255_from_wire(br)
|
||||||
|
|
||||||
|
if lp:
|
||||||
|
try:
|
||||||
|
data["presets"] = json.loads(presets_chunk.decode("utf-8"))
|
||||||
|
except (ValueError, UnicodeError):
|
||||||
|
return None
|
||||||
|
if ls:
|
||||||
|
try:
|
||||||
|
data["select"] = json.loads(select_chunk.decode("utf-8"))
|
||||||
|
except (ValueError, UnicodeError):
|
||||||
|
return None
|
||||||
|
if ld:
|
||||||
|
try:
|
||||||
|
extra = json.loads(default_chunk.decode("utf-8"))
|
||||||
|
except (ValueError, UnicodeError):
|
||||||
|
return None
|
||||||
|
if isinstance(extra, dict):
|
||||||
|
for k, v in extra.items():
|
||||||
|
data[k] = v
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def parse_binary_envelope(buf: bytes) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Try v2 (native binary), then v1 (JSON fragments)."""
|
||||||
|
d = parse_binary_envelope_v2(buf)
|
||||||
|
if d is not None:
|
||||||
|
return d
|
||||||
|
return parse_binary_envelope_v1(buf)
|
||||||
25
src/util/message.py
Normal file
25
src/util/message.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
"""JSON wire representation for controller messages (binary packing can replace later)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import Any, Dict, Union
|
||||||
|
|
||||||
|
|
||||||
|
class Message:
|
||||||
|
"""Round-trip API dicts as compact UTF-8 JSON."""
|
||||||
|
|
||||||
|
def encode(self, data: Dict[str, Any]) -> bytes:
|
||||||
|
"""Encode a JSON-serialisable mapping (typically a v1 API dict) to bytes."""
|
||||||
|
return json.dumps(data, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
|
def decode(self, payload: Union[str, bytes, bytearray]) -> Dict[str, Any]:
|
||||||
|
"""Decode UTF-8 JSON bytes or string into a dict."""
|
||||||
|
if isinstance(payload, (bytes, bytearray)):
|
||||||
|
text = payload.decode("utf-8")
|
||||||
|
else:
|
||||||
|
text = payload
|
||||||
|
obj = json.loads(text)
|
||||||
|
if not isinstance(obj, dict):
|
||||||
|
raise TypeError("JSON root must be an object")
|
||||||
|
return obj
|
||||||
93
tests/test_binary_envelope.py
Normal file
93
tests/test_binary_envelope.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
"""Tests for compact binary controller envelopes (host util)."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
sys.path.insert(0, str(PROJECT_ROOT / "src"))
|
||||||
|
|
||||||
|
from util.binary_envelope import ( # noqa: E402
|
||||||
|
BINARY_ENVELOPE_VERSION_2,
|
||||||
|
brightness_wire_from_0_255,
|
||||||
|
brightness_0_255_from_wire,
|
||||||
|
pack_binary_envelope_v2,
|
||||||
|
parse_binary_envelope,
|
||||||
|
parse_binary_envelope_v2,
|
||||||
|
parse_binary_envelope_v1,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_brightness_round_trip_extremes():
|
||||||
|
assert brightness_0_255_from_wire(brightness_wire_from_0_255(0)) == 0
|
||||||
|
assert brightness_0_255_from_wire(brightness_wire_from_0_255(255)) == 255
|
||||||
|
|
||||||
|
|
||||||
|
def test_pack_parse_v2_brightness_only():
|
||||||
|
raw = pack_binary_envelope_v2(brightness_0_255=128)
|
||||||
|
assert raw[0] == BINARY_ENVELOPE_VERSION_2
|
||||||
|
data = parse_binary_envelope_v2(raw)
|
||||||
|
assert data == {"v": "1", "b": 128}
|
||||||
|
|
||||||
|
|
||||||
|
def test_pack_parse_v2_full():
|
||||||
|
raw = pack_binary_envelope_v2(
|
||||||
|
presets={
|
||||||
|
"a": {
|
||||||
|
"p": "on",
|
||||||
|
"c": ["#ffffff"],
|
||||||
|
"d": 10,
|
||||||
|
"b": 255,
|
||||||
|
"a": True,
|
||||||
|
"n1": 1,
|
||||||
|
"n2": -2,
|
||||||
|
"n3": 3,
|
||||||
|
"n4": 4,
|
||||||
|
"n5": 5,
|
||||||
|
"n6": 6,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
select={"dev": ["a"]},
|
||||||
|
default="a",
|
||||||
|
default_targets=["dev"],
|
||||||
|
brightness_0_255=64,
|
||||||
|
)
|
||||||
|
assert len(raw) <= 250
|
||||||
|
data = parse_binary_envelope_v2(raw)
|
||||||
|
assert data["v"] == "1"
|
||||||
|
assert data["b"] == 64
|
||||||
|
assert data["presets"]["a"]["p"] == "on"
|
||||||
|
assert data["presets"]["a"]["n2"] == -2
|
||||||
|
assert data["select"]["dev"] == ["a"]
|
||||||
|
assert data["default"] == "a"
|
||||||
|
assert data["targets"] == ["dev"]
|
||||||
|
|
||||||
|
merged = parse_binary_envelope(raw)
|
||||||
|
assert merged == data
|
||||||
|
|
||||||
|
|
||||||
|
def test_v2_wire_not_utf8_json():
|
||||||
|
raw = pack_binary_envelope_v2(
|
||||||
|
presets={"x": {"p": "blink", "c": ["#112233"]}},
|
||||||
|
brightness_0_255=None,
|
||||||
|
)
|
||||||
|
assert raw[0] == BINARY_ENVELOPE_VERSION_2
|
||||||
|
assert parse_binary_envelope_v1(raw) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_dont_change_brightness_v2():
|
||||||
|
raw = pack_binary_envelope_v2(brightness_0_255=None)
|
||||||
|
data = parse_binary_envelope_v2(raw)
|
||||||
|
assert "b" not in data
|
||||||
|
|
||||||
|
|
||||||
|
def test_json_wire_not_v2():
|
||||||
|
assert parse_binary_envelope_v2(b'{"v":"1"}') is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_legacy_v1_parse_via_dispatcher():
|
||||||
|
import json
|
||||||
|
|
||||||
|
inner = json.dumps({"x": {"p": "on"}}, separators=(",", ":")).encode()
|
||||||
|
raw = bytes([1, 255, len(inner), 0, 0]) + inner
|
||||||
|
d = parse_binary_envelope(raw)
|
||||||
|
assert d["presets"]["x"]["p"] == "on"
|
||||||
Reference in New Issue
Block a user