From 93476655fcd0a04bf4e88d12875797014f7c4775 Mon Sep 17 00:00:00 2001 From: pi Date: Sun, 5 Apr 2026 16:41:23 +1200 Subject: [PATCH] test: add tcp mock server with bind conflict hints Made-with: Cursor --- led-driver | 2 +- tests/tcp_test_server.py | 213 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 tests/tcp_test_server.py diff --git a/led-driver b/led-driver index c42dff8..dc19877 160000 --- a/led-driver +++ b/led-driver @@ -1 +1 @@ -Subproject commit c42dff8975bc4c16d0a07ef5116dd21e4826c00f +Subproject commit dc19877132e563bafd77e2b5383cf96937e73c61 diff --git a/tests/tcp_test_server.py b/tests/tcp_test_server.py new file mode 100644 index 0000000..a38828c --- /dev/null +++ b/tests/tcp_test_server.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +Simple TCP test server for led-controller. + +Listens on the same TCP port used by led-driver WiFi transport and +every 5 seconds sends a newline-delimited JSON message with v="1". +""" + +import asyncio +import json +import os +import sys +from typing import Dict, Set + + +CLIENTS: Set[asyncio.StreamWriter] = set() +# Map each client writer to the device_name it reported. +CLIENT_DEVICE: Dict[asyncio.StreamWriter, str] = {} + + +async def _send_off_to_all(): + """Best-effort send an 'off' message to all connected devices.""" + if not CLIENTS: + return + print("[TCP TEST] Sending 'off' to all clients before shutdown") + dead = [] + for w in CLIENTS: + device_name = CLIENT_DEVICE.get(w) + if not device_name: + continue + payload = { + "v": "1", + "select": {device_name: ["off"]}, + } + line = json.dumps(payload) + "\n" + data = line.encode("utf-8") + try: + w.write(data) + await w.drain() + except Exception as e: + peer = w.get_extra_info("peername") + print(f"[TCP TEST] Error sending 'off' to {peer}: {e}") + dead.append(w) + for w in dead: + CLIENTS.discard(w) + CLIENT_DEVICE.pop(w, None) + + +async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + peer = writer.get_extra_info("peername") + print(f"[TCP TEST] Client connected: {peer}") + CLIENTS.add(writer) + buf = b"" + try: + # Wait for client to send its device_name JSON, then send presets once. + sent_presets = False + while True: + data = await reader.read(100) + if not data: + break + buf += data + print(f"[TCP TEST] From client {peer}: {data!r}") + + # Handle newline-delimited JSON from client. + while b"\n" in buf: + line, buf = buf.split(b"\n", 1) + line = line.strip() + if not line: + continue + try: + msg = json.loads(line.decode("utf-8")) + except Exception: + continue + + if isinstance(msg, dict) and "device_name" in msg: + device_name = str(msg.get("device_name") or "") + CLIENT_DEVICE[writer] = device_name + print(f"[TCP TEST] Registered device_name {device_name!r} for {peer}") + + if not sent_presets and device_name: + hello_payload = { + "v": "1", + "presets": { + "solid_red": { + "p": "on", + "c": ["#ff0000"], + "d": 100, + }, + "solid_blue": { + "p": "on", + "c": ["#0000ff"], + "d": 100, + }, + }, + "select": { + device_name: ["solid_red"], + }, + "b": 32, + } + try: + writer.write((json.dumps(hello_payload) + "\n").encode("utf-8")) + await writer.drain() + sent_presets = True + print( + f"[TCP TEST] Sent initial presets/select for device " + f"{device_name!r} to {peer}" + ) + except Exception as e: + print(f"[TCP TEST] Failed to send initial presets/select to {peer}: {e}") + except Exception as e: + print(f"[TCP TEST] Client error: {peer} {e}") + finally: + print(f"[TCP TEST] Client disconnected: {peer}") + CLIENTS.discard(writer) + CLIENT_DEVICE.pop(writer, None) + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + +async def broadcaster(port: int): + """Broadcast preset selection / brightness changes every 5 seconds.""" + counter = 0 + while True: + await asyncio.sleep(5) + counter += 1 + + # Toggle between two presets and brightness levels. + if CLIENTS: + print(f"[TCP TEST] Broadcasting to {len(CLIENTS)} client(s)") + + dead = [] + for w in CLIENTS: + device_name = CLIENT_DEVICE.get(w) + if not device_name: + continue + + if counter % 2 == 0: + preset_name = "solid_red" + payload = { + "v": "1", + "select": {device_name: [preset_name]}, + } + else: + preset_name = "solid_blue" + payload = { + "v": "1", + "select": {device_name: [preset_name]}, + } + + line = json.dumps(payload) + "\n" + data = line.encode("utf-8") + + try: + w.write(data) + await w.drain() + peer = w.get_extra_info("peername") + print( + f"[TCP TEST] Sent preset {preset_name!r} to device {device_name!r} " + f"for client {peer}" + ) + except Exception as e: + peer = w.get_extra_info("peername") + print(f"[TCP TEST] Error writing to {peer}: {e}") + dead.append(w) + + for w in dead: + CLIENTS.discard(w) + CLIENT_DEVICE.pop(w, None) + + +async def main(): + port = int(os.environ.get("PORT", os.environ.get("TCP_PORT", "8765"))) + host = "0.0.0.0" + print(f"[TCP TEST] Starting TCP test server on {host}:{port}") + + try: + server = await asyncio.start_server(handle_client, host=host, port=port) + except OSError as e: + if e.errno == 98: # EADDRINUSE + print( + f"[TCP TEST] Port {port} is already in use.\n" + f" If led-controller.service is enabled, it binds this port for ESP TCP " + f"transport after boot. Stop it for a standalone mock:\n" + f" sudo systemctl stop led-controller\n" + f" Or keep the main app and use another port for this mock:\n" + f" TCP_PORT=8766 pipenv run tcp-test\n" + f" (point test clients at that port). See also: sudo ss -tlnp | grep {port}", + file=sys.stderr, + ) + raise + async with server: + broadcaster_task = asyncio.create_task(broadcaster(port)) + try: + await server.serve_forever() + finally: + # On shutdown, try to turn all connected devices off. + await _send_off_to_all() + broadcaster_task.cancel() + with contextlib.suppress(Exception): + await broadcaster_task + + +if __name__ == "__main__": + import contextlib + + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n[TCP TEST] Shutting down.") +