49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Tests for Pi↔bridge USB serial framing."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(ROOT / "src"))
|
|
|
|
from util.bridge_serial_frame import feed_serial_buffer, pack_serial_frame # noqa: E402
|
|
|
|
|
|
def test_pack_and_parse_single_frame():
    """A frame packed from one payload and fed whole yields that payload.

    Also checks that the receive buffer is empty once the frame has been
    returned.
    """
    body = b'{"v":"1","select":["1"]}'
    rx = bytearray()
    frames = feed_serial_buffer(rx, pack_serial_frame(body))
    assert frames == [body]
    assert len(rx) == 0
|
|
|
|
|
|
def test_rejects_oversized_length_and_caps_buffer():
    """A bad length header produces no frames and the buffer shrinks.

    The buffer starts as 2 header bytes plus 100 filler bytes (102 total);
    after feeding no new data it must hold fewer than 100 bytes.
    """
    # Header bytes 0x31 0x23: length 12579 — invalid on bridge.
    rx = bytearray(b"\x31\x23" + b"x" * 100)
    frames = feed_serial_buffer(rx, b"")
    assert frames == []
    assert len(rx) < 100  # resync shifted past bad header
|
|
|
|
|
|
def test_serial_frame_carries_ws_uplink():
    """A WS uplink packet survives being wrapped in a serial frame.

    Packs a peer/packet pair with ``pack_ws_uplink``, wraps the result with
    ``pack_serial_frame``, and checks that ``feed_serial_buffer`` returns
    exactly one payload which ``parse_ws_frame`` decodes back to the same
    peer and packet.
    """
    # Function-scope import (kept from the original) so that only this test
    # requires util.espnow_wire to be importable.
    from util.espnow_wire import pack_ws_uplink, parse_ws_frame

    peer = bytes.fromhex("e8f60a16ea10")
    pkt = b'{"v":"1","name":"test"}'
    inner = pack_ws_uplink(peer, pkt)
    framed = pack_serial_frame(inner)
    out = feed_serial_buffer(bytearray(), framed)
    assert len(out) == 1
    # The third element returned by parse_ws_frame is not checked here.
    p2, pkt2, _ = parse_ws_frame(out[0])
    assert p2 == peer
    assert pkt2 == pkt


def test_partial_feeds_reassemble_frame():
    """A frame delivered in three chunks is emitted only on the last chunk.

    Previously these assertions lived at the end of
    ``test_serial_frame_carries_ws_uplink``; they are an independent
    scenario, so they get their own test and their own failure report.
    """
    payload = b"\x4c\x03abc"
    frame = pack_serial_frame(payload)
    # One buffer is shared across all three calls: the test expects the
    # first two chunks to yield nothing and the third to complete the frame.
    buf = bytearray()
    assert feed_serial_buffer(buf, frame[:1]) == []
    assert feed_serial_buffer(buf, frame[1:3]) == []
    assert feed_serial_buffer(buf, frame[3:]) == [payload]
|