- Add n4 parameter to control server, LED bar receiver, and test script - Create segmented_movement pattern with alternating forward/backward movement - Pattern supports n1 (segment length), n2 (spacing), n3 (forward speed), n4 (backward speed) - Fix test script to send all messages instead of just the first one - Add segmented_movement to patterns_needing_params for proper parameter transmission - Pattern intelligently handles all cases: alternating, forward-only, backward-only, or static - Implements repeating segments with configurable spacing across LED strip
109 lines
4.0 KiB
Python
109 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for control_server.py UI WebSocket API.
|
|
|
|
Starts a client to localhost:8765 and sends a small sequence of UI commands:
|
|
- pattern_change
|
|
- color_change
|
|
- brightness_change
|
|
- parameter_change (n1/n2/n3/n4)
|
|
|
|
Usage examples:
|
|
python test/test_control_server.py --pattern on --r 255 --g 0 --b 0 --brightness 150 --n1 5 --n2 5 --n3 1 --n4 2
|
|
python test/test_control_server.py --pattern rainbow
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import websockets
|
|
import re
|
|
|
|
|
|
def build_messages(args):
|
|
msgs = []
|
|
# Prioritize delay_change so a single send applies delay when both are provided
|
|
if args.delay is not None:
|
|
msgs.append({"type": "delay_change", "data": {"delay": args.delay}})
|
|
if args.pattern is not None:
|
|
msgs.append({"type": "pattern_change", "data": {"pattern": args.pattern}})
|
|
|
|
# Optional colors flag: parse first color and map to r,g,b if explicit r/g/b not given
|
|
if args.colors and (args.r is None and args.g is None and args.b is None):
|
|
hex_re = re.compile(r"^#?[0-9a-fA-F]{6}$")
|
|
first = args.colors.split(',')[0].strip()
|
|
if hex_re.match(first):
|
|
v = first[1:] if first.startswith('#') else first
|
|
args.r = int(v[0:2], 16)
|
|
args.g = int(v[2:4], 16)
|
|
args.b = int(v[4:6], 16)
|
|
|
|
if args.r is not None or args.g is not None or args.b is not None:
|
|
payload = {}
|
|
if args.r is not None:
|
|
payload["r"] = args.r
|
|
if args.g is not None:
|
|
payload["g"] = args.g
|
|
if args.b is not None:
|
|
payload["b"] = args.b
|
|
msgs.append({"type": "color_change", "data": payload})
|
|
|
|
if args.brightness is not None:
|
|
msgs.append({"type": "brightness_change", "data": {"brightness": args.brightness}})
|
|
|
|
if any(v is not None for v in (args.n1, args.n2, args.n3, args.n4)):
|
|
payload = {}
|
|
if args.n1 is not None:
|
|
payload["n1"] = args.n1
|
|
if args.n2 is not None:
|
|
payload["n2"] = args.n2
|
|
if args.n3 is not None:
|
|
payload["n3"] = args.n3
|
|
if args.n4 is not None:
|
|
payload["n4"] = args.n4
|
|
msgs.append({"type": "parameter_change", "data": payload})
|
|
|
|
return msgs
|
|
|
|
|
|
async def run_test(uri: str, messages: list[dict], sleep_s: float):
|
|
async with websockets.connect(uri) as ws:
|
|
# Send all messages with a delay between them
|
|
for m in messages:
|
|
await ws.send(json.dumps(m))
|
|
if len(messages) > 1:
|
|
await asyncio.sleep(sleep_s)
|
|
|
|
|
|
def parse_args():
|
|
p = argparse.ArgumentParser(description="Send UI commands to control_server WebSocket")
|
|
p.add_argument("--uri", default="ws://localhost:8765", help="WebSocket URI (default ws://localhost:8765)")
|
|
p.add_argument("--pattern", help="Pattern name for pattern_change")
|
|
p.add_argument("--r", type=int, help="Red 0-255 for color_change")
|
|
p.add_argument("--g", type=int, help="Green 0-255 for color_change")
|
|
p.add_argument("--b", type=int, help="Blue 0-255 for color_change")
|
|
p.add_argument("--brightness", type=int, help="Brightness value for brightness_change")
|
|
p.add_argument("--delay", type=int, help="Pattern delay (ms) via delay_change")
|
|
p.add_argument("--n1", type=int, help="n1 for parameter_change")
|
|
p.add_argument("--n2", type=int, help="n2 for parameter_change")
|
|
p.add_argument("--n3", type=int, help="n3 for parameter_change")
|
|
p.add_argument("--n4", type=int, help="n4 for parameter_change")
|
|
p.add_argument("--sleep", type=float, default=0.2, help="Seconds to wait between messages (default 0.2)")
|
|
p.add_argument("--colors", help="Comma-separated hex colors (uses first as r,g,b)")
|
|
return p.parse_args()
|
|
|
|
|
|
def main():
|
|
args = parse_args()
|
|
messages = build_messages(args)
|
|
if not messages:
|
|
# Default minimal test: just pattern_change to 'on'
|
|
messages = [{"type": "pattern_change", "data": {"pattern": "on"}}]
|
|
asyncio.run(run_test(args.uri, messages, args.sleep))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|