feat(espnow): Pi bridge client, binary wire, and espnow-sender firmware
Replace serial/Wi-Fi driver transport paths with WebSocket bridge client, binary espnow_wire delivery, device announce registry, and restructured espnow-sender (AP + broadcast passthrough). Includes docs and tests. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
110
tests/test_espnow_wire.py
Normal file
110
tests/test_espnow_wire.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Tests for ESP-NOW binary wire format."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(PROJECT_ROOT / "src"))
|
||||
|
||||
from util.binary_envelope import pack_binary_envelope_v2 # noqa: E402
|
||||
from util.espnow_wire import ( # noqa: E402
|
||||
BROADCAST_MAC,
|
||||
MAX_ESPNOW_PAYLOAD,
|
||||
MSG_ANNOUNCE,
|
||||
MSG_CMD,
|
||||
MSG_GROUPS,
|
||||
WIRE_MAGIC,
|
||||
pack_announce,
|
||||
pack_bridge_channel,
|
||||
pack_cmd,
|
||||
pack_cmd_from_kwargs,
|
||||
pack_group_cmd_from_kwargs,
|
||||
pack_groups,
|
||||
pack_ws_downlink,
|
||||
pack_ws_uplink,
|
||||
parse_announce,
|
||||
parse_cmd_as_v1_dict,
|
||||
parse_group_cmd,
|
||||
parse_groups,
|
||||
parse_ws_frame,
|
||||
wire_msg_type,
|
||||
)
|
||||
|
||||
|
||||
def test_announce_round_trip():
|
||||
raw = pack_announce(
|
||||
name="led-abc123",
|
||||
num_leds=119,
|
||||
color_order="grb",
|
||||
startup_mode="last",
|
||||
brightness=70,
|
||||
)
|
||||
assert len(raw) <= MAX_ESPNOW_PAYLOAD
|
||||
assert raw[0] == WIRE_MAGIC
|
||||
assert raw[1] == MSG_ANNOUNCE
|
||||
d = parse_announce(raw)
|
||||
assert d["name"] == "led-abc123"
|
||||
assert d["num_leds"] == 119
|
||||
assert d["color_order"] == "grb"
|
||||
assert d["startup_mode"] == "last"
|
||||
assert d["brightness"] == 70
|
||||
|
||||
|
||||
def test_groups_round_trip():
|
||||
raw = pack_groups(["5", "18", "test"])
|
||||
assert wire_msg_type(raw) == MSG_GROUPS
|
||||
assert parse_groups(raw) == ["5", "18", "test"]
|
||||
|
||||
|
||||
def test_cmd_envelope_round_trip():
|
||||
env = pack_binary_envelope_v2(brightness_0_255=128)
|
||||
raw = pack_cmd(env, save=True)
|
||||
assert wire_msg_type(raw) == MSG_CMD
|
||||
assert len(raw) <= MAX_ESPNOW_PAYLOAD
|
||||
d = parse_cmd_as_v1_dict(raw)
|
||||
assert d == {"v": "1", "b": 128, "save": True}
|
||||
|
||||
|
||||
def test_cmd_from_kwargs():
|
||||
raw = pack_cmd_from_kwargs(
|
||||
select={"dev1": ["on"]},
|
||||
brightness_0_255=64,
|
||||
)
|
||||
d = parse_cmd_as_v1_dict(raw)
|
||||
assert d["select"]["dev1"] == ["on"]
|
||||
assert d["b"] == 64
|
||||
|
||||
|
||||
def test_group_cmd_round_trip():
|
||||
raw = pack_group_cmd_from_kwargs("18", brightness_0_255=32)
|
||||
gid, env = parse_group_cmd(raw)
|
||||
assert gid == "18"
|
||||
d = parse_cmd_as_v1_dict(bytes([WIRE_MAGIC, MSG_CMD]) + env)
|
||||
assert d["b"] == 32
|
||||
|
||||
|
||||
def test_ws_frame_round_trip():
|
||||
pkt = pack_announce(name="led-x", num_leds=10)
|
||||
peer = bytes.fromhex("e8f60a16dad0")
|
||||
up = pack_ws_uplink(peer, pkt)
|
||||
p2, pkt2, bcast = parse_ws_frame(up)
|
||||
assert p2 == peer
|
||||
assert pkt2 == pkt
|
||||
assert not bcast
|
||||
|
||||
down = pack_ws_downlink(pkt, peer_mac=peer)
|
||||
p3, pkt3, bcast3 = parse_ws_frame(down)
|
||||
assert p3 == peer
|
||||
assert pkt3 == pkt
|
||||
assert not bcast3
|
||||
|
||||
bdown = pack_ws_downlink(pkt, broadcast=True)
|
||||
_, pkt4, bcast4 = parse_ws_frame(bdown)
|
||||
assert pkt4 == pkt
|
||||
assert bcast4
|
||||
|
||||
|
||||
def test_bridge_channel():
|
||||
raw = pack_bridge_channel(6)
|
||||
assert len(raw) == 3
|
||||
assert raw[1] == 0x10
|
||||
Reference in New Issue
Block a user