patterns: alternating uses n1 (on) and n2 (off); ensure visible ON color; return delay; phase via self.step

test: WS client sends nested {name:{...}}; add iterations and repeat-delay; include n per message; use n1/n2 for alternating
This commit is contained in:
2025-09-16 21:22:47 +12:00
parent 93560a253e
commit d599af271b
6 changed files with 640 additions and 474 deletions

158
test/main.py Normal file
View File

@@ -0,0 +1,158 @@
import asyncio
import json
import argparse
import signal
try:
import websockets # type: ignore
except Exception as e:
print("Please install websockets: pip install websockets")
raise
WS_URI = "ws://192.168.4.1/ws"
# Default pattern suite aligned with current firmware patterns
PATTERN_SUITE = [
{"pattern": "flicker", "delay": 80, "iterations": 30, "repeat_delay": 80, "colors": ["#ffaa00"]},
{"pattern": "fill_range", "n1": 10, "n2": 20, "delay": 400, "iterations": 1, "repeat_delay": 500, "colors": ["#888888"]},
{"pattern": "n_chase", "n1": 5, "n2": 5, "delay": 250, "iterations": 40, "repeat_delay": 120, "colors": ["#00ff88"]},
{"pattern": "alternating", "n1": 6, "n2": 6, "delay": 300, "iterations": 20, "repeat_delay": 300, "colors": ["#ff8800"]},
{"pattern": "pulse", "delay": 200, "iterations": 6, "repeat_delay": 300, "colors": ["#ffffff"]},
]
def build_message(
pattern: str,
n: int | None = None,
delay: int | None = None,
colors: list[str] | None = None,
brightness: int | None = None,
num_leds: int | None = None,
n1: int | None = None,
n2: int | None = None,
name: str = "0",
pattern_step: int | None = None,
):
settings: dict[str, object] = {
"pattern": pattern,
}
if n is not None:
settings["n"] = n
if delay is not None:
settings["delay"] = delay
if colors is not None:
settings["colors"] = colors
if brightness is not None:
settings["brightness"] = brightness
if num_leds is not None:
settings["num_leds"] = num_leds
if n1 is not None:
settings["n1"] = n1
if n2 is not None:
settings["n2"] = n2
if pattern_step is not None:
settings["pattern_step"] = pattern_step
# ESP-NOW-style nested payload keyed by name (e.g., "0")
return {name: settings}
async def send_once(uri: str, payload: dict, hold_ms: int | None = None):
async with websockets.connect(uri) as ws:
await ws.send(json.dumps(payload))
if hold_ms and hold_ms > 0:
await asyncio.sleep(hold_ms / 1000)
async def run_suite(uri: str):
async with websockets.connect(uri) as ws:
for cfg in PATTERN_SUITE:
iterations = int(cfg.get("iterations", 10))
interval_ms = int(cfg.get("interval_ms", cfg.get("delay", 100) or 100))
repeat_ms = int(cfg.get("repeat_delay", interval_ms))
for i in range(iterations):
msg = build_message(
cfg.get("pattern", "off"),
i,
delay=cfg.get("delay"),
colors=cfg.get("colors"),
brightness=cfg.get("brightness", 127),
num_leds=cfg.get("num_leds"),
n1=cfg.get("n1"),
n2=cfg.get("n2"),
name=cfg.get("name", "0"),
pattern_step=cfg.get("pattern_step"),
)
print(msg)
await ws.send(json.dumps(msg))
await asyncio.sleep(repeat_ms / 1000)
def _parse_args():
p = argparse.ArgumentParser(description="WebSocket LED pattern tester")
p.add_argument("--uri", default=WS_URI, help="WebSocket URI, default ws://192.168.4.1/ws")
p.add_argument("--pattern", help="Single pattern to send (overrides suite)")
p.add_argument("--delay", type=int, help="Delay ms")
p.add_argument("--brightness", type=int, help="Brightness 0-255")
p.add_argument("--num-leds", type=int, help="Number of LEDs")
p.add_argument("--colors", nargs="*", help="Hex colors like #ff0000 #00ff00")
p.add_argument("--on-width", type=int)
p.add_argument("--off-width", type=int)
p.add_argument("--n1", type=int)
p.add_argument("--n2", type=int)
p.add_argument("--name", default="0", help="Target name key for nested payload (default: 0)")
p.add_argument("--iterations", type=int, help="How many cycles/messages to send")
p.add_argument("--interval", type=int, help="Interval between messages in ms (default: delay or 100)")
p.add_argument("--repeat-delay", dest="repeat_delay", type=int, help="Delay between repeats in ms (overrides --interval if set)")
p.add_argument("--hold", type=int, default=1500, help="Hold ms for single send")
return p.parse_args()
def _setup_sigint(loop: asyncio.AbstractEventLoop):
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, loop.stop)
except NotImplementedError:
pass
async def main_async():
args = _parse_args()
if args.pattern:
iterations = int(args.iterations or 1)
interval_ms = int(args.interval or (args.delay if args.delay is not None else 100))
repeat_ms = int(args.repeat_delay or interval_ms)
async with websockets.connect(args.uri) as ws:
for i in range(iterations):
msg = build_message(
pattern=args.pattern,
n=i,
delay=args.delay,
colors=args.colors,
brightness=args.brightness,
num_leds=args.num_leds,
n1=args.n1,
n2=args.n2,
name=args.name,
)
print(msg)
await ws.send(json.dumps(msg))
await asyncio.sleep(repeat_ms / 1000)
else:
await run_suite(args.uri)
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_setup_sigint(loop)
try:
loop.run_until_complete(main_async())
finally:
try:
loop.run_until_complete(asyncio.sleep(0))
except Exception:
pass
loop.close()
if __name__ == "__main__":
main()