#!/usr/bin/env python3 """ Networking SPI test: builds a legacy led-bar payload and sends it via src/networking.py SPI client. Usage examples: python test/test_networking.py --type b --pattern on --colors ff0000,00ff00,0000ff python test/test_networking.py --type u --brightness 128 --delay 50 python test/test_networking.py --data '{"d":{"t":"b","pt":"off"}}' """ import argparse import asyncio import json import os import re import sys # Import SPI networking client SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(SCRIPT_DIR) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from src.networking import WebSocketClient # SPI client with same API HEX6_RE = re.compile(r"^[0-9a-fA-F]{6}$") def parse_hex6_to_rgb(value: str): v = value.strip() if v.startswith("0x") or v.startswith("0X"): v = v[2:] if v.startswith("#"): v = v[1:] if not HEX6_RE.match(v): raise ValueError(f"Invalid hex color: {value}") return [int(v[0:2], 16), int(v[2:4], 16), int(v[4:6], 16)] def build_payload(args: argparse.Namespace) -> dict: if args.data: return json.loads(args.data) if args.file: with open(args.file, "r", encoding="utf-8") as f: return json.load(f) d = {"t": args.type} if args.pattern: d["pt"] = args.pattern if args.brightness is not None: d["br"] = int(args.brightness) if args.delay is not None: d["dl"] = int(args.delay) if args.n1 is not None: d["n1"] = int(args.n1) if args.n2 is not None: d["n2"] = int(args.n2) if args.n3 is not None: d["n3"] = int(args.n3) if args.step is not None: d["s"] = int(args.step) if args.colors: items = [c.strip() for c in args.colors.split(',') if c.strip()] d["cl"] = [parse_hex6_to_rgb(c) for c in items] payload = {"d": d} if args.name: # For convenience, mirror defaults as per-device override payload[args.name] = {k: v for k, v in d.items() if k != "t"} return payload def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Send SPI networking test payload (legacy led-bar format)") src = p.add_mutually_exclusive_group() src.add_argument("--data", help="Raw JSON payload to send") src.add_argument("--file", help="Path to JSON file to send") p.add_argument("--type", choices=["b", "u"], default="b", help="Message type (beat/update)") p.add_argument("--pattern", help="Pattern name (pt)") p.add_argument("--brightness", type=int, help="Brightness (br)") p.add_argument("--delay", type=int, help="Delay (dl)") p.add_argument("--n1", type=int, help="n1") p.add_argument("--n2", type=int, help="n2") p.add_argument("--n3", type=int, help="n3") p.add_argument("--step", type=int, help="step (s)") p.add_argument("--colors", help="Comma-separated hex colors for cl (e.g. ff0000,00ff00,0000ff)") p.add_argument("--name", help="Per-device override key (device name)") # SPI config overrides p.add_argument("--bus", type=int, default=0, help="SPI bus (default 0)") p.add_argument("--device", type=int, default=0, help="SPI device/CE (default 0)") p.add_argument("--speed", type=int, default=1_000_000, help="SPI speed Hz (default 1MHz)") return p.parse_args() async def main_async() -> int: args = parse_args() payload = build_payload(args) client = WebSocketClient(uri=None, bus=args.bus, device=args.device, speed_hz=args.speed) await client.connect() await client.send_data(payload) await client.close() return 0 def main() -> int: return asyncio.run(main_async()) if __name__ == "__main__": raise SystemExit(main())