# Includes tests for v1/v2 envelope round-trips.
# Co-authored-by: Cursor <cursoragent@cursor.com>
# 26 lines, 881 B, Python
"""JSON wire representation for controller messages (binary packing can replace later)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, Dict, Union
|
|
|
|
|
|
class Message:
    """Round-trip API dicts as compact UTF-8 JSON.

    Both directions require the JSON root to be an object (a Python
    ``dict``), so every payload produced by ``encode`` is accepted by
    ``decode``.
    """

    def encode(self, data: Dict[str, Any]) -> bytes:
        """Encode a JSON-serialisable dict (typically a v1 API dict) to bytes.

        Args:
            data: The dict to serialise.

        Returns:
            Compact JSON (no whitespace after ``,`` or ``:``) encoded as
            UTF-8.

        Raises:
            TypeError: If ``data`` is not a dict, or if it contains a value
                that ``json.dumps`` cannot serialise.
        """
        # Mirror the root check in decode() so a non-object payload fails
        # here, at the producer, instead of at whoever later decodes it.
        if not isinstance(data, dict):
            raise TypeError("JSON root must be an object")
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def decode(self, payload: Union[str, bytes, bytearray]) -> Dict[str, Any]:
        """Decode UTF-8 JSON bytes or a JSON string into a dict.

        Args:
            payload: JSON text, either already a ``str`` or as UTF-8 encoded
                ``bytes``/``bytearray``.

        Returns:
            The parsed JSON object.

        Raises:
            TypeError: If the JSON root is not an object.
            ValueError: If the bytes are not valid UTF-8
                (``UnicodeDecodeError``) or the text is not valid JSON
                (``json.JSONDecodeError``).
        """
        if isinstance(payload, (bytes, bytearray)):
            text = payload.decode("utf-8")
        else:
            text = payload
        obj = json.loads(text)
        # Valid JSON may have a list, string, number, etc. at the root; only
        # an object maps onto the dict this API promises.
        if not isinstance(obj, dict):
            raise TypeError("JSON root must be an object")
        return obj
|