"""Short v1 field names for ESP-NOW JSON (≤250 B). Long names still accepted on receive.""" from __future__ import annotations from typing import Any, Dict, List, Optional, Union # Envelope: devices map ENV_DEVICES = "dv" # Device body K_PRESETS = "p" K_SELECT = "s" K_GROUPS = "g" K_SET_GROUPS = "sg" K_SAVE = "sv" K_DEFAULT = "df" K_DEVICE_CONFIG = "dc" K_CLEAR_PRESETS = "cp" K_MANIFEST = "mf" _BODY_LONG_TO_SHORT = { "presets": K_PRESETS, "select": K_SELECT, "groups": K_GROUPS, "set_groups": K_SET_GROUPS, "save": K_SAVE, "default": K_DEFAULT, "device_config": K_DEVICE_CONFIG, "clear_presets": K_CLEAR_PRESETS, "manifest": K_MANIFEST, } _BODY_SHORT_TO_LONG = {v: k for k, v in _BODY_LONG_TO_SHORT.items()} def wire_select_list(preset_id: Union[str, int], step: Optional[Union[int, str]] = None) -> List[Any]: """Preset id (+ optional step) for ``select`` on unicast/broadcast to one driver.""" out: List[Any] = [str(preset_id)] if step is not None: out.append(step) return out def normalize_select_for_wire(select: Any) -> Any: """Long or legacy shapes → wire list ``[preset_id, step?]``.""" if isinstance(select, list): return select if isinstance(select, str) and select.strip(): return [select.strip()] if not isinstance(select, dict): return select if "preset" in select: out: List[Any] = [str(select["preset"])] if "step" in select: out.append(select["step"]) return out # Legacy {device_name: [preset, step?]} — unicast only; keep dict for expand on driver if len(select) == 1: val = next(iter(select.values())) if isinstance(val, list) and val: return list(val) return select def compact_body(body: Dict[str, Any]) -> Dict[str, Any]: """Long-key device body → short keys for the wire.""" out: Dict[str, Any] = {} for long_key, short_key in _BODY_LONG_TO_SHORT.items(): if long_key in body: val = body[long_key] if long_key == "select": val = normalize_select_for_wire(val) out[short_key] = val for short_key in _BODY_SHORT_TO_LONG: if short_key in body and short_key not in out: val = body[short_key] if short_key == K_SELECT: val = normalize_select_for_wire(val) out[short_key] = val if "b" in body: out["b"] = body["b"] return out def expand_body(body: Dict[str, Any]) -> Dict[str, Any]: """Short or long device body → long keys for driver logic.""" out: Dict[str, Any] = dict(body) for short_key, long_key in _BODY_SHORT_TO_LONG.items(): if short_key in body and long_key not in out: out[long_key] = body[short_key] if short_key in out: del out[short_key] return out def compact_envelope(envelope: Dict[str, Any]) -> Dict[str, Any]: if envelope.get("v") != "1": return envelope devices = envelope.get("devices") if devices is None: devices = envelope.get(ENV_DEVICES) if not isinstance(devices, dict): return envelope compact_devices = {mac: compact_body(body) for mac, body in devices.items() if isinstance(body, dict)} return {"v": "1", ENV_DEVICES: compact_devices} def expand_envelope(envelope: Dict[str, Any]) -> Dict[str, Any]: if envelope.get("v") != "1": return envelope devices = envelope.get("devices") if devices is None: devices = envelope.get(ENV_DEVICES) if not isinstance(devices, dict): return envelope expanded = {mac: expand_body(body) for mac, body in devices.items() if isinstance(body, dict)} return {"v": "1", "devices": expanded} def wire_json_size(obj: Dict[str, Any]) -> int: import json return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))