Files
led-controller/tests/tcp_test_server.py
2026-04-05 16:41:23 +12:00

214 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
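Simple TCP test server for led-controller.

Listens on the same TCP port used by the led-driver WiFi transport (default
8765, overridable via the PORT or TCP_PORT environment variable). Once a
client reports its device_name, the server sends it an initial presets/select
message; after that, every 5 seconds it sends each registered client a
newline-delimited JSON "select" message with v="1".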
"""
import asyncio
import contextlib
import errno
import json
import os
import sys
from typing import Dict, Set
# Writers of every currently connected client, whether or not the client has
# reported a device_name yet.
CLIENTS: Set[asyncio.StreamWriter] = set()
# Map each client writer to the device_name it reported.
CLIENT_DEVICE: Dict[asyncio.StreamWriter, str] = {}
async def _send_off_to_all():
    """Best-effort send an 'off' message to all connected devices.

    Clients that have not reported a device_name are skipped. A client whose
    write fails is logged and removed from CLIENTS / CLIENT_DEVICE; the
    failure is not propagated to the caller.
    """
    if not CLIENTS:
        return
    print("[TCP TEST] Sending 'off' to all clients before shutdown")
    dead = []
    # Iterate over a snapshot: drain() yields to the event loop, and a
    # handle_client() task may add to or discard from CLIENTS in the
    # meantime, which would otherwise raise
    # "RuntimeError: Set changed size during iteration".
    for w in list(CLIENTS):
        device_name = CLIENT_DEVICE.get(w)
        if not device_name:
            continue
        payload = {
            "v": "1",
            "select": {device_name: ["off"]},
        }
        data = (json.dumps(payload) + "\n").encode("utf-8")
        try:
            w.write(data)
            await w.drain()
        except Exception as e:
            peer = w.get_extra_info("peername")
            print(f"[TCP TEST] Error sending 'off' to {peer}: {e}")
            dead.append(w)
    for w in dead:
        CLIENTS.discard(w)
        CLIENT_DEVICE.pop(w, None)
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one TCP client for the lifetime of its connection.

    The writer is added to CLIENTS as soon as the client connects. Incoming
    bytes are split into newline-delimited JSON messages; a JSON object that
    carries a "device_name" key records that name in CLIENT_DEVICE. The
    initial presets/select message is sent on registration with a non-empty
    name, and is not repeated once a send has succeeded. Lines that cannot be
    decoded or parsed are ignored. On exit the writer is unregistered and
    closed.
    """
    peer = writer.get_extra_info("peername")
    print(f"[TCP TEST] Client connected: {peer}")
    CLIENTS.add(writer)
    pending = b""
    presets_delivered = False
    try:
        while True:
            chunk = await reader.read(100)
            if not chunk:
                # Empty read means EOF.
                break
            pending += chunk
            print(f"[TCP TEST] From client {peer}: {chunk!r}")
            # Consume every complete line; a trailing partial line stays
            # buffered until its newline arrives.
            while b"\n" in pending:
                raw, _, pending = pending.partition(b"\n")
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    msg = json.loads(raw.decode("utf-8"))
                except Exception:
                    continue
                if not isinstance(msg, dict) or "device_name" not in msg:
                    continue

                reported = msg.get("device_name")
                device_name = str(reported) if reported else ""
                CLIENT_DEVICE[writer] = device_name
                print(f"[TCP TEST] Registered device_name {device_name!r} for {peer}")
                if presets_delivered or not device_name:
                    continue

                presets = {
                    name: {"p": "on", "c": [colour], "d": 100}
                    for name, colour in (
                        ("solid_red", "#ff0000"),
                        ("solid_blue", "#0000ff"),
                    )
                }
                hello_payload = {
                    "v": "1",
                    "presets": presets,
                    "select": {device_name: ["solid_red"]},
                    "b": 32,
                }
                encoded = (json.dumps(hello_payload) + "\n").encode("utf-8")
                try:
                    writer.write(encoded)
                    await writer.drain()
                    presets_delivered = True
                    print(
                        f"[TCP TEST] Sent initial presets/select for device "
                        f"{device_name!r} to {peer}"
                    )
                except Exception as e:
                    print(f"[TCP TEST] Failed to send initial presets/select to {peer}: {e}")
    except Exception as e:
        print(f"[TCP TEST] Client error: {peer} {e}")
    finally:
        print(f"[TCP TEST] Client disconnected: {peer}")
        CLIENTS.discard(writer)
        CLIENT_DEVICE.pop(writer, None)
        # Closing is best-effort: the connection may already be gone.
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            pass
async def broadcaster(port: int):
    """Periodically push an alternating preset selection to every device.

    Every 5 seconds each client that has registered a device_name receives a
    newline-delimited JSON "select" message: "solid_red" on even ticks and
    "solid_blue" on odd ticks. Clients whose write fails are logged and
    removed from CLIENTS / CLIENT_DEVICE. Runs until the task is cancelled.

    Args:
        port: Unused; retained so existing callers keep working.
    """
    counter = 0
    while True:
        await asyncio.sleep(5)
        counter += 1
        if not CLIENTS:
            continue
        print(f"[TCP TEST] Broadcasting to {len(CLIENTS)} client(s)")
        # Every client gets the same preset on a given tick.
        preset_name = "solid_red" if counter % 2 == 0 else "solid_blue"
        dead = []
        # Iterate over a snapshot: drain() yields to the event loop, and a
        # handle_client() task may add to or discard from CLIENTS in the
        # meantime, which would otherwise raise
        # "RuntimeError: Set changed size during iteration" and end this task.
        for w in list(CLIENTS):
            device_name = CLIENT_DEVICE.get(w)
            if not device_name:
                # Not registered yet (or already unregistered): nothing to address.
                continue
            payload = {
                "v": "1",
                "select": {device_name: [preset_name]},
            }
            data = (json.dumps(payload) + "\n").encode("utf-8")
            peer = w.get_extra_info("peername")
            try:
                w.write(data)
                await w.drain()
            except Exception as e:
                print(f"[TCP TEST] Error writing to {peer}: {e}")
                dead.append(w)
            else:
                print(
                    f"[TCP TEST] Sent preset {preset_name!r} to device {device_name!r} "
                    f"for client {peer}"
                )
        for w in dead:
            CLIENTS.discard(w)
            CLIENT_DEVICE.pop(w, None)
async def main():
    """Run the TCP test server until it is cancelled.

    The listening port is taken from the PORT environment variable, then
    TCP_PORT, then defaults to 8765; the server binds to all interfaces.
    While serving, a background broadcaster() task runs alongside the
    listener. On shutdown the broadcaster is stopped first and then every
    registered device is sent an 'off' selection.

    Raises:
        OSError: If the listening socket cannot be created. When the cause is
            an address-in-use error, a hint is printed to stderr first.
        ValueError: If the configured port is not an integer.
    """
    port = int(os.environ.get("PORT", os.environ.get("TCP_PORT", "8765")))
    host = "0.0.0.0"
    print(f"[TCP TEST] Starting TCP test server on {host}:{port}")
    try:
        server = await asyncio.start_server(handle_client, host=host, port=port)
    except OSError as e:
        # Use the symbolic constant: the numeric value differs per platform.
        if e.errno == errno.EADDRINUSE:
            print(
                f"[TCP TEST] Port {port} is already in use.\n"
                f" If led-controller.service is enabled, it binds this port for ESP TCP "
                f"transport after boot. Stop it for a standalone mock:\n"
                f" sudo systemctl stop led-controller\n"
                f" Or keep the main app and use another port for this mock:\n"
                f" TCP_PORT=8766 pipenv run tcp-test\n"
                f" (point test clients at that port). See also: sudo ss -tlnp | grep {port}",
                file=sys.stderr,
            )
        raise
    async with server:
        broadcaster_task = asyncio.create_task(broadcaster(port))
        try:
            await server.serve_forever()
        finally:
            # Stop the broadcaster before sending 'off' so it cannot
            # re-select a preset after the devices were turned off.
            broadcaster_task.cancel()
            # CancelledError derives from BaseException (Python 3.8+), so it
            # must be listed explicitly; Exception covers a broadcaster that
            # had already failed on its own.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await broadcaster_task
            # On shutdown, try to turn all connected devices off.
            await _send_off_to_all()
if __name__ == "__main__":
    # Script entry point. Module imports live at the top of the file so that
    # main() also works when this module is imported rather than executed.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Treat Ctrl-C as a normal shutdown request rather than a crash.
        print("\n[TCP TEST] Shutting down.")