pipenv: add send-net; networking: use SPI; add networking test
This commit is contained in:
1
Pipfile
1
Pipfile
@@ -33,3 +33,4 @@ build-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py
|
||||
flash-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p $ESPPORT -b ${ESPSPEED:-460800} flash'"
|
||||
watch-esp32 = "watchfiles 'bash -c \"source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p ${ESPPORT:-/dev/ttyACM0} -b ${ESPSPEED:-460800} flash monitor\"' esp32/main"
|
||||
send-json = "python test/send_json.py"
|
||||
send-net = "python test/test_networking.py"
|
||||
|
@@ -1,53 +1,79 @@
|
||||
import asyncio
|
||||
import websockets
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
try:
|
||||
import spidev
|
||||
except Exception as e:
|
||||
spidev = None
|
||||
|
||||
|
||||
class WebSocketClient:
|
||||
def __init__(self, uri):
|
||||
self.uri = uri
|
||||
self.websocket = None
|
||||
def __init__(self, uri=None, *, bus=None, device=None, speed_hz=None):
|
||||
# SPI configuration (defaults can be overridden by args or env)
|
||||
self.bus = 0 if bus is None else int(bus)
|
||||
self.device = 0 if device is None else int(device)
|
||||
self.speed_hz = (
|
||||
int(os.getenv("SPI_SPEED_HZ", "1000000")) if speed_hz is None else int(speed_hz)
|
||||
)
|
||||
|
||||
self.spi = None
|
||||
self.is_connected = False
|
||||
|
||||
async def connect(self):
|
||||
"""Establishes the WebSocket connection."""
|
||||
if self.is_connected and self.websocket:
|
||||
print("Already connected.")
|
||||
"""Initializes the SPI connection."""
|
||||
if self.is_connected and self.spi:
|
||||
return
|
||||
|
||||
if spidev is None:
|
||||
print("spidev not available; cannot open SPI")
|
||||
self.is_connected = False
|
||||
self.spi = None
|
||||
return
|
||||
|
||||
try:
|
||||
print(f"Connecting to {self.uri}...")
|
||||
self.websocket = await websockets.connect(self.uri)
|
||||
spi = spidev.SpiDev()
|
||||
spi.open(self.bus, self.device)
|
||||
spi.max_speed_hz = self.speed_hz
|
||||
spi.mode = 0
|
||||
spi.bits_per_word = 8
|
||||
self.spi = spi
|
||||
self.is_connected = True
|
||||
print("WebSocket connected.")
|
||||
except (ConnectionError, websockets.exceptions.ConnectionClosedOK) as e:
|
||||
print(f"Error connecting: {e}")
|
||||
print(f"SPI connected: bus={self.bus} device={self.device} speed={self.speed_hz}Hz mode=0")
|
||||
except Exception as e:
|
||||
print(f"Error opening SPI: {e}")
|
||||
self.is_connected = False
|
||||
self.websocket = None
|
||||
self.spi = None
|
||||
|
||||
async def send_data(self, data):
|
||||
print(data)
|
||||
"""Sends data over the open WebSocket connection."""
|
||||
if not self.is_connected or not self.websocket:
|
||||
print("WebSocket not connected. Attempting to reconnect...")
|
||||
"""Sends a JSON object over SPI as UTF-8 bytes."""
|
||||
if not self.is_connected or not self.spi:
|
||||
await self.connect()
|
||||
if not self.is_connected:
|
||||
print("Failed to reconnect. Cannot send data.")
|
||||
print("SPI not connected; cannot send")
|
||||
return
|
||||
|
||||
try:
|
||||
await self.websocket.send(json.dumps(data))
|
||||
print(f"Sent: {data}")
|
||||
except (ConnectionError, websockets.exceptions.ConnectionClosed) as e:
|
||||
print(f"Error sending data: {e}")
|
||||
json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
|
||||
payload = list(json_str.encode("utf-8"))
|
||||
if not payload:
|
||||
return
|
||||
# Keep payload comfortably below ESP-NOW max; trim if necessary
|
||||
if len(payload) > 240:
|
||||
payload = payload[:240]
|
||||
self.spi.xfer2(payload)
|
||||
except Exception as e:
|
||||
print(f"SPI send failed: {e}")
|
||||
# Attempt simple reopen on next call
|
||||
self.is_connected = False
|
||||
self.websocket = None # Reset connection on error
|
||||
await self.connect() # Attempt to reconnect
|
||||
self.spi = None
|
||||
|
||||
async def close(self):
|
||||
"""Closes the WebSocket connection."""
|
||||
if self.websocket and self.is_connected:
|
||||
await self.websocket.close()
|
||||
self.is_connected = False
|
||||
self.websocket = None
|
||||
print("WebSocket closed.")
|
||||
"""Closes the SPI connection."""
|
||||
try:
|
||||
if self.spi:
|
||||
self.spi.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.is_connected = False
|
||||
self.spi = None
|
||||
|
118
test/test_networking.py
Normal file
118
test/test_networking.py
Normal file
@@ -0,0 +1,118 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Networking SPI test: builds a legacy led-bar payload and sends it via src/networking.py SPI client.
|
||||
|
||||
Usage examples:
|
||||
python test/test_networking.py --type b --pattern on --colors ff0000,00ff00,0000ff
|
||||
python test/test_networking.py --type u --brightness 128 --delay 50
|
||||
python test/test_networking.py --data '{"d":{"t":"b","pt":"off"}}'
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
# Import SPI networking client
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
|
||||
if ROOT_DIR not in sys.path:
|
||||
sys.path.insert(0, ROOT_DIR)
|
||||
from src.networking import WebSocketClient # SPI client with same API
|
||||
|
||||
|
||||
HEX6_RE = re.compile(r"^[0-9a-fA-F]{6}$")
|
||||
|
||||
|
||||
def parse_hex6_to_rgb(value: str):
|
||||
v = value.strip()
|
||||
if v.startswith("0x") or v.startswith("0X"):
|
||||
v = v[2:]
|
||||
if v.startswith("#"):
|
||||
v = v[1:]
|
||||
if not HEX6_RE.match(v):
|
||||
raise ValueError(f"Invalid hex color: {value}")
|
||||
return [int(v[0:2], 16), int(v[2:4], 16), int(v[4:6], 16)]
|
||||
|
||||
|
||||
def build_payload(args: argparse.Namespace) -> dict:
|
||||
if args.data:
|
||||
return json.loads(args.data)
|
||||
if args.file:
|
||||
with open(args.file, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
d = {"t": args.type}
|
||||
if args.pattern:
|
||||
d["pt"] = args.pattern
|
||||
if args.brightness is not None:
|
||||
d["br"] = int(args.brightness)
|
||||
if args.delay is not None:
|
||||
d["dl"] = int(args.delay)
|
||||
if args.n1 is not None:
|
||||
d["n1"] = int(args.n1)
|
||||
if args.n2 is not None:
|
||||
d["n2"] = int(args.n2)
|
||||
if args.n3 is not None:
|
||||
d["n3"] = int(args.n3)
|
||||
if args.step is not None:
|
||||
d["s"] = int(args.step)
|
||||
|
||||
if args.colors:
|
||||
items = [c.strip() for c in args.colors.split(',') if c.strip()]
|
||||
d["cl"] = [parse_hex6_to_rgb(c) for c in items]
|
||||
|
||||
payload = {"d": d}
|
||||
|
||||
if args.name:
|
||||
# For convenience, mirror defaults as per-device override
|
||||
payload[args.name] = {k: v for k, v in d.items() if k != "t"}
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description="Send SPI networking test payload (legacy led-bar format)")
|
||||
src = p.add_mutually_exclusive_group()
|
||||
src.add_argument("--data", help="Raw JSON payload to send")
|
||||
src.add_argument("--file", help="Path to JSON file to send")
|
||||
|
||||
p.add_argument("--type", choices=["b", "u"], default="b", help="Message type (beat/update)")
|
||||
p.add_argument("--pattern", help="Pattern name (pt)")
|
||||
p.add_argument("--brightness", type=int, help="Brightness (br)")
|
||||
p.add_argument("--delay", type=int, help="Delay (dl)")
|
||||
p.add_argument("--n1", type=int, help="n1")
|
||||
p.add_argument("--n2", type=int, help="n2")
|
||||
p.add_argument("--n3", type=int, help="n3")
|
||||
p.add_argument("--step", type=int, help="step (s)")
|
||||
p.add_argument("--colors", help="Comma-separated hex colors for cl (e.g. ff0000,00ff00,0000ff)")
|
||||
p.add_argument("--name", help="Per-device override key (device name)")
|
||||
|
||||
# SPI config overrides
|
||||
p.add_argument("--bus", type=int, default=0, help="SPI bus (default 0)")
|
||||
p.add_argument("--device", type=int, default=0, help="SPI device/CE (default 0)")
|
||||
p.add_argument("--speed", type=int, default=1_000_000, help="SPI speed Hz (default 1MHz)")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
async def main_async() -> int:
|
||||
args = parse_args()
|
||||
payload = build_payload(args)
|
||||
|
||||
client = WebSocketClient(uri=None, bus=args.bus, device=args.device, speed_hz=args.speed)
|
||||
await client.connect()
|
||||
await client.send_data(payload)
|
||||
await client.close()
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
return asyncio.run(main_async())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
|
Reference in New Issue
Block a user