diff --git a/src/util/binary_envelope.py b/src/util/binary_envelope.py new file mode 100644 index 0000000..9e2cc7d --- /dev/null +++ b/src/util/binary_envelope.py @@ -0,0 +1,508 @@ +""" +Compact binary controller → led-driver messages (ESP-NOW friendly). + +Header (5 bytes), same for v1 (legacy) and v2 (native binary): + + 0: version — 1 = legacy (JSON text blobs); 2 = native binary blobs + 1: brightness — 0–127 scales to device 0–255; 128–255 = leave unchanged + 2: byte length of presets section (0–255) + 3: byte length of select section + 4: byte length of default section + +v2 presets blob (no JSON): + u8 preset_count + each preset: + u8 name_len; name utf-8 + u8 pattern_len; pattern utf-8 (``p``) + u8 color_count; color_count × (u8 r, u8 g, u8 b) + u16 delay_le (``d``) + u8 preset_brightness (``b``) + u8 auto (0/1) (``a``) + i16 n1..n6 little-endian (``n1``–``n6``) + +v2 select blob: + u8 entry_count + each: + u8 device_len; device utf-8 + u8 preset_name_len; preset name utf-8 + u8 has_step (0/1); optional u16 step_le + +v2 default blob: + u8 default_name_len; name utf-8 + u8 target_count + each: u8 len; target name utf-8 + +Legacy v1: sections are UTF-8 JSON text (see ``parse_binary_envelope_v1``). + +Keep ``5 + lp + ls + ld`` ≤ 245 for a single ESP-NOW frame body. +""" + +from __future__ import annotations + +import json +import struct +from typing import Any, Dict, List, Optional, Tuple + +BINARY_ENVELOPE_VERSION_1 = 1 +BINARY_ENVELOPE_VERSION_2 = 2 +HEADER_LEN = 5 + + +def brightness_wire_from_0_255(value: int) -> int: + """Map device brightness 0–255 to wire 0–127.""" + v = max(0, min(255, int(value))) + return (v * 127 + 127) // 255 + + +def brightness_0_255_from_wire(wire: int) -> int: + """Map wire 0–127 to device brightness 0–255.""" + w = max(0, min(127, int(wire))) + return min(255, (w * 255) // 127) + + +def _clamp_i16(x: int) -> int: + x = int(x) + return max(-32768, min(32767, x)) + + +def _colors_to_rgb_list(colors: Any) -> List[Tuple[int, int, int]]: + out: List[Tuple[int, int, int]] = [] + if not colors: + return out + for c in colors: + if isinstance(c, str): + h = c.strip().lstrip("#") + if len(h) >= 6: + out.append( + (int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)) + ) + elif isinstance(c, (list, tuple)) and len(c) >= 3: + out.append((int(c[0]), int(c[1]), int(c[2]))) + return out + + +def _pack_preset_dict(name: str, preset: Dict[str, Any]) -> bytes: + pname = name.encode("utf-8") + if len(pname) > 250: + raise ValueError("preset name too long") + pattern = str(preset.get("p") or preset.get("pattern", "off")).encode("utf-8") + if len(pattern) > 250: + raise ValueError("pattern string too long") + rgbs = _colors_to_rgb_list(preset.get("c") or preset.get("colors") or []) + if len(rgbs) > 255: + raise ValueError("too many colours") + delay = max(0, min(65535, int(preset.get("d") or preset.get("delay", 100)))) + br = max(0, min(255, int(preset.get("b") or preset.get("brightness", 127)))) + auto = 1 if preset.get("a", preset.get("auto", True)) else 0 + parts = [ + bytes([len(pname)]), + pname, + bytes([len(pattern)]), + pattern, + bytes([len(rgbs)]), + ] + for r, g, b in rgbs: + parts.append(bytes([r & 255, g & 255, b & 255])) + n1 = _clamp_i16(preset.get("n1", 0)) + n2 = _clamp_i16(preset.get("n2", 0)) + n3 = _clamp_i16(preset.get("n3", 0)) + n4 = _clamp_i16(preset.get("n4", 0)) + n5 = _clamp_i16(preset.get("n5", 0)) + n6 = _clamp_i16(preset.get("n6", 0)) + parts.append( + struct.pack( + " bytes: + items = [(k, v) for k, v in presets.items() if isinstance(v, dict)] + out = [bytes([len(items)])] + for name, pdata in items: + out.append(_pack_preset_dict(str(name), pdata)) + return b"".join(out) + + +def _pack_select_blob(select: Dict[str, Any]) -> bytes: + out = [bytes([len(select)])] + for device, sel in select.items(): + dev_b = str(device).encode("utf-8") + if len(dev_b) > 250: + raise ValueError("device name too long") + if isinstance(sel, (list, tuple)) and sel: + pn = str(sel[0]).encode("utf-8") + step = sel[1] if len(sel) > 1 else None + else: + pn = str(sel).encode("utf-8") + step = None + if len(pn) > 250: + raise ValueError("preset name too long") + if step is None: + out.append( + bytes([len(dev_b)]) + + dev_b + + bytes([len(pn)]) + + pn + + bytes([0]) + ) + else: + s = int(step) + if s < 0 or s > 65535: + raise ValueError("step out of range") + out.append( + bytes([len(dev_b)]) + + dev_b + + bytes([len(pn)]) + + pn + + bytes([1]) + + struct.pack(" bytes: + name_b = str(default).encode("utf-8") + if len(name_b) > 250: + raise ValueError("default name too long") + tlist = list(targets) if targets else [] + if len(tlist) > 255: + raise ValueError("too many targets") + out = [bytes([len(name_b)]), name_b, bytes([len(tlist)])] + for t in tlist: + tb = str(t).encode("utf-8") + if len(tb) > 250: + raise ValueError("target name too long") + out.append(bytes([len(tb)])) + out.append(tb) + return b"".join(out) + + +def pack_binary_envelope_v2( + *, + presets: Optional[Dict[str, Any]] = None, + select: Optional[Dict[str, Any]] = None, + default: Optional[str] = None, + default_targets: Optional[list] = None, + brightness_0_255: Optional[int] = None, +) -> bytes: + """Build a v2 envelope (native binary sections, no JSON).""" + presets_bytes = ( + _pack_presets_blob(presets) if presets is not None and presets else b"" + ) + select_bytes = ( + _pack_select_blob(select) if select is not None and select else b"" + ) + default_bytes = ( + _pack_default_blob(default, default_targets) + if default is not None + else b"" + ) + + lp = len(presets_bytes) + ls = len(select_bytes) + ld = len(default_bytes) + if lp > 255 or ls > 255 or ld > 255: + raise ValueError("binary envelope section exceeds 255 bytes") + + br_wire = ( + 255 + if brightness_0_255 is None + else brightness_wire_from_0_255(brightness_0_255) + ) + header = bytes([BINARY_ENVELOPE_VERSION_2, br_wire, lp, ls, ld]) + return header + presets_bytes + select_bytes + default_bytes + + +def pack_binary_envelope_v1( + *, + presets: Optional[Dict[str, Any]] = None, + select: Optional[Dict[str, Any]] = None, + default: Optional[str] = None, + default_targets: Optional[list] = None, + brightness_0_255: Optional[int] = None, +) -> bytes: + """Legacy: JSON UTF-8 fragments (version byte 1). Prefer ``pack_binary_envelope_v2``.""" + if presets is None: + presets_bytes = b"" + else: + presets_bytes = json.dumps(presets, separators=(",", ":")).encode("utf-8") + + if select is None: + select_bytes = b"" + else: + select_bytes = json.dumps(select, separators=(",", ":")).encode("utf-8") + + default_obj: Optional[Dict[str, Any]] = None + if default is not None: + default_obj = { + "default": default, + "targets": list(default_targets) if default_targets else [], + } + default_bytes = ( + json.dumps(default_obj, separators=(",", ":")).encode("utf-8") + if default_obj is not None + else b"" + ) + + lp = len(presets_bytes) + ls = len(select_bytes) + ld = len(default_bytes) + if lp > 255 or ls > 255 or ld > 255: + raise ValueError("binary envelope fragment exceeds 255 bytes") + + br_wire = ( + 255 + if brightness_0_255 is None + else brightness_wire_from_0_255(brightness_0_255) + ) + header = bytes([BINARY_ENVELOPE_VERSION_1, br_wire, lp, ls, ld]) + return header + presets_bytes + select_bytes + default_bytes + + +def _decode_preset_record( + buf: bytes, off: int +) -> Tuple[str, Dict[str, Any], int]: + if off + 1 > len(buf): + raise ValueError("truncated") + nl = buf[off] + off += 1 + if off + nl > len(buf): + raise ValueError("truncated") + name = buf[off : off + nl].decode("utf-8") + off += nl + if off + 1 > len(buf): + raise ValueError("truncated") + pl = buf[off] + off += 1 + if off + pl > len(buf): + raise ValueError("truncated") + pattern = buf[off : off + pl].decode("utf-8") + off += pl + if off + 1 > len(buf): + raise ValueError("truncated") + nc = buf[off] + off += 1 + if off + nc * 3 > len(buf): + raise ValueError("truncated") + colors: List[str] = [] + for _ in range(nc): + r, g, b = buf[off], buf[off + 1], buf[off + 2] + off += 3 + colors.append(f"#{r:02x}{g:02x}{b:02x}") + if off + 16 > len(buf): + raise ValueError("truncated") + delay, br, auto, n1, n2, n3, n4, n5, n6 = struct.unpack_from( + " Dict[str, Any]: + if not chunk: + return {} + off = 0 + if off + 1 > len(chunk): + raise ValueError("truncated") + count = chunk[off] + off += 1 + out: Dict[str, Any] = {} + for _ in range(count): + name, preset, off = _decode_preset_record(chunk, off) + out[name] = preset + if off != len(chunk): + raise ValueError("presets blob length mismatch") + return out + + +def _decode_select_blob(chunk: bytes) -> Dict[str, Any]: + if not chunk: + return {} + off = 0 + if off + 1 > len(chunk): + raise ValueError("truncated") + count = chunk[off] + off += 1 + out: Dict[str, Any] = {} + for _ in range(count): + if off + 1 > len(chunk): + raise ValueError("truncated") + dl = chunk[off] + off += 1 + if off + dl > len(chunk): + raise ValueError("truncated") + device = chunk[off : off + dl].decode("utf-8") + off += dl + if off + 1 > len(chunk): + raise ValueError("truncated") + pl = chunk[off] + off += 1 + if off + pl > len(chunk): + raise ValueError("truncated") + pname = chunk[off : off + pl].decode("utf-8") + off += pl + if off + 1 > len(chunk): + raise ValueError("truncated") + has_step = chunk[off] + off += 1 + if has_step: + if off + 2 > len(chunk): + raise ValueError("truncated") + step = struct.unpack_from(" Tuple[Optional[str], list]: + if not chunk: + return None, [] + off = 0 + if off + 1 > len(chunk): + raise ValueError("truncated") + nl = chunk[off] + off += 1 + if off + nl > len(chunk): + raise ValueError("truncated") + default_name = chunk[off : off + nl].decode("utf-8") if nl else "" + off += nl + if off + 1 > len(chunk): + raise ValueError("truncated") + nt = chunk[off] + off += 1 + targets: List[str] = [] + for _ in range(nt): + if off + 1 > len(chunk): + raise ValueError("truncated") + tl = chunk[off] + off += 1 + if off + tl > len(chunk): + raise ValueError("truncated") + targets.append(chunk[off : off + tl].decode("utf-8")) + off += tl + if off != len(chunk): + raise ValueError("default blob length mismatch") + return default_name, targets + + +def parse_binary_envelope_v2(buf: bytes) -> Optional[Dict[str, Any]]: + """Decode native-binary v2 envelope into the v1 API dict shape.""" + if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN: + return None + if buf[0] != BINARY_ENVELOPE_VERSION_2: + return None + lp, ls, ld = buf[2], buf[3], buf[4] + need = HEADER_LEN + lp + ls + ld + if len(buf) != need: + return None + + off = HEADER_LEN + presets_chunk = buf[off : off + lp] + off += lp + select_chunk = buf[off : off + ls] + off += ls + default_chunk = buf[off : off + ld] + + data: Dict[str, Any] = {"v": "1"} + br = buf[1] + if br < 128: + data["b"] = brightness_0_255_from_wire(br) + + try: + if lp: + data["presets"] = _decode_presets_blob(bytes(presets_chunk)) + if ls: + data["select"] = _decode_select_blob(bytes(select_chunk)) + if ld: + dname, targets = _decode_default_blob(bytes(default_chunk)) + data["default"] = dname + data["targets"] = targets + except (ValueError, UnicodeError): + return None + + return data + + +def parse_binary_envelope_v1(buf: bytes) -> Optional[Dict[str, Any]]: + """ + Decode legacy v1 bytes (JSON text blobs) into a v1 API dict. + Returns None if ``buf`` is not a valid v1 envelope. + """ + if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN: + return None + if buf[0] != BINARY_ENVELOPE_VERSION_1: + return None + lp, ls, ld = buf[2], buf[3], buf[4] + need = HEADER_LEN + lp + ls + ld + if len(buf) != need: + return None + + off = HEADER_LEN + presets_chunk = buf[off : off + lp] + off += lp + select_chunk = buf[off : off + ls] + off += ls + default_chunk = buf[off : off + ld] + + data: Dict[str, Any] = {"v": "1"} + + br = buf[1] + if br < 128: + data["b"] = brightness_0_255_from_wire(br) + + if lp: + try: + data["presets"] = json.loads(presets_chunk.decode("utf-8")) + except (ValueError, UnicodeError): + return None + if ls: + try: + data["select"] = json.loads(select_chunk.decode("utf-8")) + except (ValueError, UnicodeError): + return None + if ld: + try: + extra = json.loads(default_chunk.decode("utf-8")) + except (ValueError, UnicodeError): + return None + if isinstance(extra, dict): + for k, v in extra.items(): + data[k] = v + + return data + + +def parse_binary_envelope(buf: bytes) -> Optional[Dict[str, Any]]: + """Try v2 (native binary), then v1 (JSON fragments).""" + d = parse_binary_envelope_v2(buf) + if d is not None: + return d + return parse_binary_envelope_v1(buf) \ No newline at end of file diff --git a/src/util/message.py b/src/util/message.py new file mode 100644 index 0000000..7df8c38 --- /dev/null +++ b/src/util/message.py @@ -0,0 +1,25 @@ +"""JSON wire representation for controller messages (binary packing can replace later).""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Union + + +class Message: + """Round-trip API dicts as compact UTF-8 JSON.""" + + def encode(self, data: Dict[str, Any]) -> bytes: + """Encode a JSON-serialisable mapping (typically a v1 API dict) to bytes.""" + return json.dumps(data, separators=(",", ":")).encode("utf-8") + + def decode(self, payload: Union[str, bytes, bytearray]) -> Dict[str, Any]: + """Decode UTF-8 JSON bytes or string into a dict.""" + if isinstance(payload, (bytes, bytearray)): + text = payload.decode("utf-8") + else: + text = payload + obj = json.loads(text) + if not isinstance(obj, dict): + raise TypeError("JSON root must be an object") + return obj diff --git a/tests/test_binary_envelope.py b/tests/test_binary_envelope.py new file mode 100644 index 0000000..aeaf7d3 --- /dev/null +++ b/tests/test_binary_envelope.py @@ -0,0 +1,93 @@ +"""Tests for compact binary controller envelopes (host util).""" + +import sys +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(PROJECT_ROOT / "src")) + +from util.binary_envelope import ( # noqa: E402 + BINARY_ENVELOPE_VERSION_2, + brightness_wire_from_0_255, + brightness_0_255_from_wire, + pack_binary_envelope_v2, + parse_binary_envelope, + parse_binary_envelope_v2, + parse_binary_envelope_v1, +) + + +def test_brightness_round_trip_extremes(): + assert brightness_0_255_from_wire(brightness_wire_from_0_255(0)) == 0 + assert brightness_0_255_from_wire(brightness_wire_from_0_255(255)) == 255 + + +def test_pack_parse_v2_brightness_only(): + raw = pack_binary_envelope_v2(brightness_0_255=128) + assert raw[0] == BINARY_ENVELOPE_VERSION_2 + data = parse_binary_envelope_v2(raw) + assert data == {"v": "1", "b": 128} + + +def test_pack_parse_v2_full(): + raw = pack_binary_envelope_v2( + presets={ + "a": { + "p": "on", + "c": ["#ffffff"], + "d": 10, + "b": 255, + "a": True, + "n1": 1, + "n2": -2, + "n3": 3, + "n4": 4, + "n5": 5, + "n6": 6, + } + }, + select={"dev": ["a"]}, + default="a", + default_targets=["dev"], + brightness_0_255=64, + ) + assert len(raw) <= 250 + data = parse_binary_envelope_v2(raw) + assert data["v"] == "1" + assert data["b"] == 64 + assert data["presets"]["a"]["p"] == "on" + assert data["presets"]["a"]["n2"] == -2 + assert data["select"]["dev"] == ["a"] + assert data["default"] == "a" + assert data["targets"] == ["dev"] + + merged = parse_binary_envelope(raw) + assert merged == data + + +def test_v2_wire_not_utf8_json(): + raw = pack_binary_envelope_v2( + presets={"x": {"p": "blink", "c": ["#112233"]}}, + brightness_0_255=None, + ) + assert raw[0] == BINARY_ENVELOPE_VERSION_2 + assert parse_binary_envelope_v1(raw) is None + + +def test_dont_change_brightness_v2(): + raw = pack_binary_envelope_v2(brightness_0_255=None) + data = parse_binary_envelope_v2(raw) + assert "b" not in data + + +def test_json_wire_not_v2(): + assert parse_binary_envelope_v2(b'{"v":"1"}') is None + + +def test_legacy_v1_parse_via_dispatcher(): + import json + + inner = json.dumps({"x": {"p": "on"}}, separators=(",", ":")).encode() + raw = bytes([1, 255, len(inner), 0, 0]) + inner + d = parse_binary_envelope(raw) + assert d["presets"]["x"]["p"] == "on"