feat(driver): add HTTP routes, startup split, and binary envelope support
Wire controller messages through new modules (background tasks, runtime state, startup) and add binary envelope handling. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
209
src/binary_envelope.py
Normal file
209
src/binary_envelope.py
Normal file
@@ -0,0 +1,209 @@
|
||||
"""Decode compact binary controller envelopes — v2 native binary, v1 legacy JSON blobs."""
|
||||
|
||||
import json
|
||||
import struct
|
||||
|
||||
BINARY_ENVELOPE_VERSION_1 = 1
|
||||
BINARY_ENVELOPE_VERSION_2 = 2
|
||||
HEADER_LEN = 5
|
||||
|
||||
|
||||
def _brightness_0_255_from_wire(wire):
|
||||
w = max(0, min(127, int(wire)))
|
||||
return min(255, (w * 255) // 127)
|
||||
|
||||
|
||||
def _decode_preset_record(buf, off):
|
||||
nl = buf[off]
|
||||
off += 1
|
||||
name = buf[off : off + nl].decode("utf-8")
|
||||
off += nl
|
||||
pl = buf[off]
|
||||
off += 1
|
||||
pattern = buf[off : off + pl].decode("utf-8")
|
||||
off += pl
|
||||
nc = buf[off]
|
||||
off += 1
|
||||
colors = []
|
||||
for _ in range(nc):
|
||||
r, g, b = buf[off], buf[off + 1], buf[off + 2]
|
||||
off += 3
|
||||
colors.append("#%02x%02x%02x" % (r, g, b))
|
||||
if off + 16 > len(buf):
|
||||
raise ValueError("truncated")
|
||||
delay, br, auto, n1, n2, n3, n4, n5, n6 = struct.unpack_from(
|
||||
"<HBBhhhhhh", buf, off
|
||||
)
|
||||
off += 16
|
||||
preset = {
|
||||
"p": pattern,
|
||||
"c": colors,
|
||||
"d": delay,
|
||||
"b": br,
|
||||
"a": bool(auto),
|
||||
"n1": n1,
|
||||
"n2": n2,
|
||||
"n3": n3,
|
||||
"n4": n4,
|
||||
"n5": n5,
|
||||
"n6": n6,
|
||||
}
|
||||
return name, preset, off
|
||||
|
||||
|
||||
def _decode_presets_blob(chunk):
|
||||
if not chunk:
|
||||
return {}
|
||||
off = 0
|
||||
count = chunk[off]
|
||||
off += 1
|
||||
out = {}
|
||||
for _ in range(count):
|
||||
name, preset, off = _decode_preset_record(chunk, off)
|
||||
out[name] = preset
|
||||
if off != len(chunk):
|
||||
raise ValueError("presets blob mismatch")
|
||||
return out
|
||||
|
||||
|
||||
def _decode_select_blob(chunk):
|
||||
if not chunk:
|
||||
return {}
|
||||
off = 0
|
||||
count = chunk[off]
|
||||
off += 1
|
||||
out = {}
|
||||
for _ in range(count):
|
||||
dl = chunk[off]
|
||||
off += 1
|
||||
device = chunk[off : off + dl].decode("utf-8")
|
||||
off += dl
|
||||
pl = chunk[off]
|
||||
off += 1
|
||||
pname = chunk[off : off + pl].decode("utf-8")
|
||||
off += pl
|
||||
has_step = chunk[off]
|
||||
off += 1
|
||||
if has_step:
|
||||
step = struct.unpack_from("<H", chunk, off)[0]
|
||||
off += 2
|
||||
out[device] = [pname, step]
|
||||
else:
|
||||
out[device] = [pname]
|
||||
if off != len(chunk):
|
||||
raise ValueError("select blob mismatch")
|
||||
return out
|
||||
|
||||
|
||||
def _decode_default_blob(chunk):
|
||||
if not chunk:
|
||||
return "", []
|
||||
off = 0
|
||||
nl = chunk[off]
|
||||
off += 1
|
||||
default_name = chunk[off : off + nl].decode("utf-8") if nl else ""
|
||||
off += nl
|
||||
nt = chunk[off]
|
||||
off += 1
|
||||
targets = []
|
||||
for _ in range(nt):
|
||||
tl = chunk[off]
|
||||
off += 1
|
||||
targets.append(chunk[off : off + tl].decode("utf-8"))
|
||||
off += tl
|
||||
if off != len(chunk):
|
||||
raise ValueError("default blob mismatch")
|
||||
return default_name, targets
|
||||
|
||||
|
||||
def parse_binary_envelope_v2(buf):
|
||||
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
|
||||
return None
|
||||
if buf[0] != BINARY_ENVELOPE_VERSION_2:
|
||||
return None
|
||||
lp = buf[2]
|
||||
ls = buf[3]
|
||||
ld = buf[4]
|
||||
need = HEADER_LEN + lp + ls + ld
|
||||
if len(buf) != need:
|
||||
return None
|
||||
|
||||
off = HEADER_LEN
|
||||
presets_chunk = buf[off : off + lp]
|
||||
off += lp
|
||||
select_chunk = buf[off : off + ls]
|
||||
off += ls
|
||||
default_chunk = buf[off : off + ld]
|
||||
|
||||
data = {"v": "1"}
|
||||
br = buf[1]
|
||||
if br < 128:
|
||||
data["b"] = _brightness_0_255_from_wire(br)
|
||||
|
||||
try:
|
||||
if lp:
|
||||
data["presets"] = _decode_presets_blob(presets_chunk)
|
||||
if ls:
|
||||
data["select"] = _decode_select_blob(select_chunk)
|
||||
if ld:
|
||||
dname, targets = _decode_default_blob(default_chunk)
|
||||
data["default"] = dname
|
||||
data["targets"] = targets
|
||||
except (ValueError, UnicodeError, TypeError, struct.error):
|
||||
return None
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def parse_binary_envelope_v1(buf):
|
||||
if not isinstance(buf, (bytes, bytearray)) or len(buf) < HEADER_LEN:
|
||||
return None
|
||||
if buf[0] != BINARY_ENVELOPE_VERSION_1:
|
||||
return None
|
||||
lp = buf[2]
|
||||
ls = buf[3]
|
||||
ld = buf[4]
|
||||
need = HEADER_LEN + lp + ls + ld
|
||||
if len(buf) != need:
|
||||
return None
|
||||
|
||||
off = HEADER_LEN
|
||||
presets_chunk = buf[off : off + lp]
|
||||
off += lp
|
||||
select_chunk = buf[off : off + ls]
|
||||
off += ls
|
||||
default_chunk = buf[off : off + ld]
|
||||
|
||||
data = {"v": "1"}
|
||||
|
||||
br = buf[1]
|
||||
if br < 128:
|
||||
data["b"] = _brightness_0_255_from_wire(br)
|
||||
|
||||
if lp:
|
||||
try:
|
||||
data["presets"] = json.loads(presets_chunk.decode("utf-8"))
|
||||
except (ValueError, UnicodeError):
|
||||
return None
|
||||
if ls:
|
||||
try:
|
||||
data["select"] = json.loads(select_chunk.decode("utf-8"))
|
||||
except (ValueError, UnicodeError):
|
||||
return None
|
||||
if ld:
|
||||
try:
|
||||
extra = json.loads(default_chunk.decode("utf-8"))
|
||||
except (ValueError, UnicodeError):
|
||||
return None
|
||||
if isinstance(extra, dict):
|
||||
for k, v in extra.items():
|
||||
data[k] = v
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def parse_binary_envelope(buf):
|
||||
d = parse_binary_envelope_v2(buf)
|
||||
if d is not None:
|
||||
return d
|
||||
return parse_binary_envelope_v1(buf)
|
||||
Reference in New Issue
Block a user