159 lines
5.7 KiB
Python
159 lines
5.7 KiB
Python
import asyncio
|
|
import json
|
|
import argparse
|
|
import signal
|
|
|
|
try:
|
|
import websockets # type: ignore
|
|
except Exception as e:
|
|
print("Please install websockets: pip install websockets")
|
|
raise
|
|
|
|
WS_URI = "ws://192.168.4.1/ws"
|
|
|
|
# Default pattern suite aligned with current firmware patterns
|
|
PATTERN_SUITE = [
|
|
{"pattern": "flicker", "delay": 80, "iterations": 30, "repeat_delay": 80, "colors": ["#ffaa00"]},
|
|
{"pattern": "fill_range", "n1": 10, "n2": 20, "delay": 400, "iterations": 1, "repeat_delay": 500, "colors": ["#888888"]},
|
|
{"pattern": "n_chase", "n1": 5, "n2": 5, "delay": 250, "iterations": 40, "repeat_delay": 120, "colors": ["#00ff88"]},
|
|
{"pattern": "alternating", "n1": 6, "n2": 6, "delay": 300, "iterations": 20, "repeat_delay": 300, "colors": ["#ff8800"]},
|
|
{"pattern": "pulse", "delay": 200, "iterations": 6, "repeat_delay": 300, "colors": ["#ffffff"]},
|
|
]
|
|
|
|
|
|
def build_message(
|
|
pattern: str,
|
|
n: int | None = None,
|
|
delay: int | None = None,
|
|
colors: list[str] | None = None,
|
|
brightness: int | None = None,
|
|
num_leds: int | None = None,
|
|
n1: int | None = None,
|
|
n2: int | None = None,
|
|
name: str = "0",
|
|
pattern_step: int | None = None,
|
|
):
|
|
settings: dict[str, object] = {
|
|
"pattern": pattern,
|
|
}
|
|
if n is not None:
|
|
settings["n"] = n
|
|
if delay is not None:
|
|
settings["delay"] = delay
|
|
if colors is not None:
|
|
settings["colors"] = colors
|
|
if brightness is not None:
|
|
settings["brightness"] = brightness
|
|
if num_leds is not None:
|
|
settings["num_leds"] = num_leds
|
|
if n1 is not None:
|
|
settings["n1"] = n1
|
|
if n2 is not None:
|
|
settings["n2"] = n2
|
|
if pattern_step is not None:
|
|
settings["pattern_step"] = pattern_step
|
|
# ESP-NOW-style nested payload keyed by name (e.g., "0")
|
|
return {name: settings}
|
|
|
|
|
|
async def send_once(uri: str, payload: dict, hold_ms: int | None = None):
|
|
async with websockets.connect(uri) as ws:
|
|
await ws.send(json.dumps(payload))
|
|
if hold_ms and hold_ms > 0:
|
|
await asyncio.sleep(hold_ms / 1000)
|
|
|
|
|
|
async def run_suite(uri: str):
|
|
async with websockets.connect(uri) as ws:
|
|
for cfg in PATTERN_SUITE:
|
|
iterations = int(cfg.get("iterations", 10))
|
|
interval_ms = int(cfg.get("interval_ms", cfg.get("delay", 100) or 100))
|
|
repeat_ms = int(cfg.get("repeat_delay", interval_ms))
|
|
for i in range(iterations):
|
|
msg = build_message(
|
|
cfg.get("pattern", "off"),
|
|
i,
|
|
delay=cfg.get("delay"),
|
|
colors=cfg.get("colors"),
|
|
brightness=cfg.get("brightness", 127),
|
|
num_leds=cfg.get("num_leds"),
|
|
n1=cfg.get("n1"),
|
|
n2=cfg.get("n2"),
|
|
name=cfg.get("name", "0"),
|
|
pattern_step=cfg.get("pattern_step"),
|
|
)
|
|
print(msg)
|
|
await ws.send(json.dumps(msg))
|
|
await asyncio.sleep(repeat_ms / 1000)
|
|
|
|
|
|
def _parse_args():
|
|
p = argparse.ArgumentParser(description="WebSocket LED pattern tester")
|
|
p.add_argument("--uri", default=WS_URI, help="WebSocket URI, default ws://192.168.4.1/ws")
|
|
p.add_argument("--pattern", help="Single pattern to send (overrides suite)")
|
|
p.add_argument("--delay", type=int, help="Delay ms")
|
|
p.add_argument("--brightness", type=int, help="Brightness 0-255")
|
|
p.add_argument("--num-leds", type=int, help="Number of LEDs")
|
|
p.add_argument("--colors", nargs="*", help="Hex colors like #ff0000 #00ff00")
|
|
p.add_argument("--on-width", type=int)
|
|
p.add_argument("--off-width", type=int)
|
|
p.add_argument("--n1", type=int)
|
|
p.add_argument("--n2", type=int)
|
|
p.add_argument("--name", default="0", help="Target name key for nested payload (default: 0)")
|
|
p.add_argument("--iterations", type=int, help="How many cycles/messages to send")
|
|
p.add_argument("--interval", type=int, help="Interval between messages in ms (default: delay or 100)")
|
|
p.add_argument("--repeat-delay", dest="repeat_delay", type=int, help="Delay between repeats in ms (overrides --interval if set)")
|
|
p.add_argument("--hold", type=int, default=1500, help="Hold ms for single send")
|
|
return p.parse_args()
|
|
|
|
def _setup_sigint(loop: asyncio.AbstractEventLoop):
|
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
try:
|
|
loop.add_signal_handler(sig, loop.stop)
|
|
except NotImplementedError:
|
|
pass
|
|
|
|
|
|
async def main_async():
|
|
args = _parse_args()
|
|
if args.pattern:
|
|
iterations = int(args.iterations or 1)
|
|
interval_ms = int(args.interval or (args.delay if args.delay is not None else 100))
|
|
repeat_ms = int(args.repeat_delay or interval_ms)
|
|
async with websockets.connect(args.uri) as ws:
|
|
for i in range(iterations):
|
|
msg = build_message(
|
|
pattern=args.pattern,
|
|
n=i,
|
|
delay=args.delay,
|
|
colors=args.colors,
|
|
brightness=args.brightness,
|
|
num_leds=args.num_leds,
|
|
n1=args.n1,
|
|
n2=args.n2,
|
|
name=args.name,
|
|
)
|
|
print(msg)
|
|
await ws.send(json.dumps(msg))
|
|
await asyncio.sleep(repeat_ms / 1000)
|
|
else:
|
|
await run_suite(args.uri)
|
|
|
|
|
|
def main():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
_setup_sigint(loop)
|
|
try:
|
|
loop.run_until_complete(main_async())
|
|
finally:
|
|
try:
|
|
loop.run_until_complete(asyncio.sleep(0))
|
|
except Exception:
|
|
pass
|
|
loop.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|