#!/usr/bin/env python3 """ Control Server for Lighting Controller Handles lighting control logic and communicates with LED bars via SPI or WebSocket. Receives commands from UI client via WebSocket. """ import asyncio import json import logging import socket import threading import time import argparse import os from aiohttp import web from dotenv import load_dotenv from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS from color_utils import adjust_brightness from networking import SPIClient, WebSocketClient # Load environment variables from .env file load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Configuration CONTROL_SERVER_PORT = int(os.getenv("CONTROL_SERVER_PORT", "8765")) HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "8766")) SOUND_CONTROL_HOST = os.getenv("SOUND_CONTROL_HOST", "127.0.0.1") SOUND_CONTROL_PORT = int(os.getenv("SOUND_CONTROL_PORT", "65433")) CONFIG_FILE = "lighting_config.json" # Pattern name mapping for shorter JSON payloads # Frontend sends shortnames, backend can use either long or short names # These map to the shortnames defined in led-bar/src/patterns.py PATTERN_NAMES = { # Long names to short names (for backend use) "off": "o", "flicker": "f", "fill_range": "fr", "n_chase": "nc", "alternating": "a", "pulse": "p", "rainbow": "r", "specto": "s", "radiate": "rd", "segmented_movement": "sm", # New: alternate two palette colors by pulsing per beat (backend-only logical name) "alternating_pulse": "apu", # Short names pass through (for frontend use) "o": "o", "f": "f", "fr": "fr", "nc": "nc", "a": "a", "p": "p", "r": "r", "s": "s", "rd": "rd", "sm": "sm", # Backend-specific patterns "sequential_pulse": "sp", "alternating_phase": "ap", } class LEDController: """Handles communication with LED bars via SPI or WebSocket.""" def __init__(self, transport="spi", **kwargs): if transport == "spi": self.client = SPIClient( bus=kwargs.get('spi_bus', 0), device=kwargs.get('spi_device', 0), speed_hz=kwargs.get('spi_speed_hz', 1_000_000) ) elif transport == "websocket": self.client = WebSocketClient(uri=kwargs.get('uri', 'ws://192.168.4.1/ws')) else: raise ValueError(f"Invalid transport: {transport}. Must be 'spi' or 'websocket'") @property def is_connected(self) -> bool: return getattr(self.client, "is_connected", False) async def connect(self): await self.client.connect() async def send_data(self, data): await self.client.send_data(data) async def close(self): await self.client.close() class SoundController: """Handles communication with sound beat detector.""" def __init__(self, sound_host, sound_port): self.sound_host = sound_host self.sound_port = sound_port async def send_reset_tempo(self): """Send reset tempo command to sound controller.""" try: reader, writer = await asyncio.open_connection(self.sound_host, self.sound_port) cmd = "RESET_TEMPO\n".encode('utf-8') writer.write(cmd) await writer.drain() resp = await reader.read(100) logging.info(f"Sent RESET_TEMPO, response: {resp.decode().strip()}") writer.close() await writer.wait_closed() except Exception as e: logging.error(f"Failed to send RESET_TEMPO: {e}") class LightingController: """Main lighting control logic.""" def __init__(self, transport="spi", **transport_kwargs): self.led_controller = LEDController(transport=transport, **transport_kwargs) self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT) # Lighting state self.current_pattern = "" self.brightness = 100 self.color_r = 0 self.color_g = 255 self.color_b = 0 self.beat_index = 0 self.beat_sending_enabled = True # Per-pattern parameters (pattern_name -> {delay, n1, n2, n3, n4}) self.pattern_parameters = {} # Default parameters for new patterns self.default_params = { "delay": 100, "n1": 10, "n2": 10, "n3": 1, "n4": 1 } # Current active parameters (loaded from current pattern) self.delay = self.default_params["delay"] self.n1 = self.default_params["n1"] self.n2 = self.default_params["n2"] self.n3 = self.default_params["n3"] self.n4 = self.default_params["n4"] # Color palette (8 slots, 2 selected) self.color_palette = [ {"r": 255, "g": 0, "b": 0}, # Red {"r": 0, "g": 255, "b": 0}, # Green {"r": 0, "g": 0, "b": 255}, # Blue {"r": 255, "g": 255, "b": 0}, # Yellow {"r": 255, "g": 0, "b": 255}, # Magenta {"r": 0, "g": 255, "b": 255}, # Cyan {"r": 255, "g": 128, "b": 0}, # Orange {"r": 255, "g": 255, "b": 255}, # White ] self.selected_color_indices = [0, 1] # Default: Red and Green # Load config self._load_config() # Rate limiting self.last_param_update = 0.0 self.param_update_interval = 0.1 self.pending_param_update = False def _load_pattern_parameters(self, pattern_name): """Load parameters for a specific pattern.""" if pattern_name in self.pattern_parameters: params = self.pattern_parameters[pattern_name] self.delay = params.get("delay", self.default_params["delay"]) self.n1 = params.get("n1", self.default_params["n1"]) self.n2 = params.get("n2", self.default_params["n2"]) self.n3 = params.get("n3", self.default_params["n3"]) self.n4 = params.get("n4", self.default_params["n4"]) else: # Use defaults for new pattern self.delay = self.default_params["delay"] self.n1 = self.default_params["n1"] self.n2 = self.default_params["n2"] self.n3 = self.default_params["n3"] self.n4 = self.default_params["n4"] def _save_pattern_parameters(self, pattern_name): """Save current parameters for the active pattern.""" if pattern_name: self.pattern_parameters[pattern_name] = { "delay": self.delay, "n1": self.n1, "n2": self.n2, "n3": self.n3, "n4": self.n4 } self._save_config() def _current_color_rgb(self): """Get current RGB color tuple from selected palette color (index 0).""" # Use the first selected color from the palette if self.selected_color_indices and len(self.selected_color_indices) > 0: color_index = self.selected_color_indices[0] if 0 <= color_index < len(self.color_palette): color = self.color_palette[color_index] return (color['r'], color['g'], color['b']) # Fallback to legacy color sliders if palette not set r = max(0, min(255, int(self.color_r))) g = max(0, min(255, int(self.color_g))) b = max(0, min(255, int(self.color_b))) return (r, g, b) def _palette_color(self, selected_index_position: int): """Return RGB tuple for the selected palette color at given position (0 or 1).""" if not self.selected_color_indices or selected_index_position >= len(self.selected_color_indices): return self._current_color_rgb() color_index = self.selected_color_indices[selected_index_position] if 0 <= color_index < len(self.color_palette): color = self.color_palette[color_index] return (color['r'], color['g'], color['b']) return self._current_color_rgb() def _load_config(self): """Load configuration from file.""" try: if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: config = json.load(f) # Load color palette if "color_palette" in config: self.color_palette = config["color_palette"] # Load selected color indices if "selected_color_indices" in config: self.selected_color_indices = config["selected_color_indices"] # Load per-pattern parameters if "pattern_parameters" in config: self.pattern_parameters = config["pattern_parameters"] logging.info(f"Loaded config from {CONFIG_FILE}") except Exception as e: logging.error(f"Error loading config: {e}") def _save_config(self): """Save configuration to file.""" try: config = { "color_palette": self.color_palette, "selected_color_indices": self.selected_color_indices, "pattern_parameters": self.pattern_parameters, } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) logging.info(f"Saved config to {CONFIG_FILE}") except Exception as e: logging.error(f"Error saving config: {e}") def get_color_palette_config(self): """Get current color palette configuration.""" return { "palette": self.color_palette, "selected_indices": self.selected_color_indices } def set_color_palette(self, palette_data): """Set color palette configuration.""" if "palette" in palette_data: # Validate palette has 8 colors if len(palette_data["palette"]) == 8: self.color_palette = palette_data["palette"] else: logging.warning(f"Invalid palette size: {len(palette_data['palette'])}, expected 8") if "selected_indices" in palette_data: # Validate indices indices = palette_data["selected_indices"] if len(indices) == 2 and all(0 <= i < 8 for i in indices): self.selected_color_indices = indices else: logging.warning(f"Invalid selected indices: {indices}") self._save_config() logging.info(f"Color palette updated: selected indices {self.selected_color_indices}") async def _send_full_parameters(self): """Send all parameters to LED bars.""" full_payload = { "d": { "t": "u", # Message type: update "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "dl": self.delay, "cl": [self._current_color_rgb()], "br": self.brightness, "n1": self.n1, "n2": self.n2, "n3": self.n3, "n4": self.n4, "s": self.beat_index % 256, } } # Add empty entries for each bar for bar_name in LED_BAR_NAMES: full_payload[bar_name] = {} await self.led_controller.send_data(full_payload) async def _request_param_update(self): """Request parameter update with rate limiting.""" current_time = time.time() if current_time - self.last_param_update >= self.param_update_interval: self.last_param_update = current_time await self._send_full_parameters() else: self.pending_param_update = True async def _send_normal_pattern(self): """Send normal pattern to all bars.""" # Patterns that need parameters (both long and short names) patterns_needing_params = [ "alternating", "a", "flicker", "f", "n_chase", "nc", "rainbow", "r", "radiate", "rd", "segmented_movement", "sm" ] payload = { "d": { "t": "b", # Message type: beat "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "cl": [self._current_color_rgb()], # Always send color } } if self.current_pattern in patterns_needing_params: payload["d"].update({ "n1": self.n1, "n2": self.n2, "n3": self.n3, "dl": self.delay, "s": self.beat_index % 256, }) for bar_name in LED_BAR_NAMES: payload[bar_name] = {} await self.led_controller.send_data(payload) async def _handle_sequential_pulse(self): """Handle sequential pulse pattern.""" from bar_config import LEFT_BARS, RIGHT_BARS bar_index = self.beat_index % 4 payload = { "d": { "t": "b", "pt": "o", # off } } left_bar = LEFT_BARS[bar_index] right_bar = RIGHT_BARS[bar_index] payload[left_bar] = {"pt": "p"} # pulse payload[right_bar] = {"pt": "p"} # pulse await self.led_controller.send_data(payload) async def _handle_alternating_phase(self): """Handle alternating pattern with phase offset.""" # Determine two colors from selected palette (fallback to current color if not set) color_a = self._palette_color(0) color_b = self._palette_color(1) phase = self.beat_index % 2 # Set the default color based on phase so both bar groups swap each beat default_color = color_a if phase == 0 else color_b alt_color_for_swap = color_b if phase == 0 else color_a # Avoid pure white edge-case on some bars by slightly reducing to 254 if default_color == (255, 255, 255): default_color = (254, 254, 254) if alt_color_for_swap == (255, 255, 255): alt_color_for_swap = (254, 254, 254) payload = { "d": { "t": "b", "pt": "a", # alternating "n1": self.n1, "n2": self.n2, # Default color for non-swapped bars changes with phase "cl": [default_color], "s": phase, } } # Bars in this list will have inverted phase and explicit color override # Flip grouping so first four bars (100-103) use default color (color_a on even beats) swap_bars = ["104", "105", "106", "107"] # Only include overrides for swapped bars to minimize payload size for bar_name in swap_bars: inv_phase = (phase + 1) % 2 payload[bar_name] = {"s": inv_phase, "cl": [alt_color_for_swap]} await self.led_controller.send_data(payload) async def _handle_alternating_pulse(self): """Handle APU: color1 on odd beat for odd bars, color2 on even beat for even bars.""" phase = self.beat_index % 2 color1 = self._palette_color(0) color2 = self._palette_color(1) if color1 == (255, 255, 255): color1 = (254, 254, 254) if color2 == (255, 255, 255): color2 = (254, 254, 254) # Define bar groups by numeric parity even_bars = ["100", "102", "104", "106"] odd_bars = ["101", "103", "105", "107"] # Default: turn bars off this beat payload = { "d": { "t": "b", "pt": "o", # off by default # Provide pulse envelope params in defaults for bars we enable "n1": self.n1, "n2": self.n2, "dl": self.delay, } } # Activate the correct half with the correct color if phase == 0: # Even beat -> even bars use color2 active_bars = even_bars active_color = color2 else: # Odd beat -> odd bars use color1 active_bars = odd_bars active_color = color1 for bar_name in active_bars: payload[bar_name] = {"pt": "p", "cl": [active_color]} await self.led_controller.send_data(payload) async def handle_beat(self, bpm_value): """Handle beat from sound detector.""" if not self.beat_sending_enabled or not self.current_pattern: return self.beat_index = (self.beat_index + 1) % 1000000 # Check for pending parameter updates if self.pending_param_update: current_time = time.time() if current_time - self.last_param_update >= self.param_update_interval: self.last_param_update = current_time self.pending_param_update = False await self._send_full_parameters() # Handle pattern-specific beat logic if self.current_pattern == "sequential_pulse": await self._handle_sequential_pulse() elif self.current_pattern == "alternating_phase": await self._handle_alternating_phase() elif self.current_pattern == "alternating_pulse": await self._handle_alternating_pulse() elif self.current_pattern: await self._send_normal_pattern() async def handle_ui_command(self, message_type, data): """Handle command from UI client.""" if message_type == "pattern_change": # Save current pattern's parameters before switching if self.current_pattern: self._save_pattern_parameters(self.current_pattern) # Switch to new pattern and load its parameters self.current_pattern = data.get("pattern", "") self._load_pattern_parameters(self.current_pattern) await self._send_full_parameters() logging.info(f"Pattern changed to: {self.current_pattern}") elif message_type == "color_change": self.color_r = data.get("r", self.color_r) self.color_g = data.get("g", self.color_g) self.color_b = data.get("b", self.color_b) await self._request_param_update() elif message_type == "brightness_change": self.brightness = data.get("brightness", self.brightness) await self._request_param_update() elif message_type == "parameter_change": if "n1" in data: self.n1 = data["n1"] if "n2" in data: self.n2 = data["n2"] if "n3" in data: self.n3 = data["n3"] if "n4" in data: self.n4 = data["n4"] # Save parameters for current pattern if self.current_pattern: self._save_pattern_parameters(self.current_pattern) await self._request_param_update() elif message_type == "delay_change": self.delay = data.get("delay", self.delay) # Save parameters for current pattern if self.current_pattern: self._save_pattern_parameters(self.current_pattern) await self._request_param_update() elif message_type == "beat_toggle": self.beat_sending_enabled = data.get("enabled", True) logging.info(f"Beat sending {'enabled' if self.beat_sending_enabled else 'disabled'}") elif message_type == "reset_tempo": await self.sound_controller.send_reset_tempo() elif message_type == "get_color_palette": # Return color palette configuration return self.get_color_palette_config() elif message_type == "set_color_palette": # Set color palette configuration self.set_color_palette(data) return {"status": "ok", "palette": self.get_color_palette_config()} class ControlServer: """WebSocket server for UI client communication and TCP server for sound.""" def __init__(self, transport="spi", enable_heartbeat=False, **transport_kwargs): self.lighting_controller = LightingController(transport=transport, **transport_kwargs) self.clients = set() self.tcp_server = None self.http_app = None self.http_runner = None self.enable_heartbeat = enable_heartbeat async def handle_websocket(self, request): """Handle WebSocket connection for UI client.""" ws = web.WebSocketResponse() await ws.prepare(request) self.clients.add(ws) client_addr = request.remote logging.info(f"UI client connected: {client_addr}") try: async for msg in ws: if msg.type == web.WSMsgType.TEXT: try: data = json.loads(msg.data) message_type = data.get("type") message_data = data.get("data", {}) response = await self.lighting_controller.handle_ui_command(message_type, message_data) # Send response if command returned data if response is not None: await ws.send_json({ "type": f"{message_type}_response", "data": response }) except json.JSONDecodeError: logging.error(f"Invalid JSON from client {client_addr}: {msg.data}") except Exception as e: logging.error(f"Error handling message from client {client_addr}: {e}") elif msg.type == web.WSMsgType.ERROR: logging.error(f"WebSocket error from {client_addr}: {ws.exception()}") except Exception as e: logging.error(f"Error in WebSocket handler: {e}") finally: self.clients.discard(ws) logging.info(f"UI client disconnected: {client_addr}") return ws async def handle_tcp_client(self, reader, writer): """Handle TCP client (sound detector) connection.""" addr = writer.get_extra_info('peername') logging.info(f"Sound client connected: {addr}") try: while True: data = await reader.read(4096) if not data: logging.info(f"Sound client disconnected: {addr}") break message = data.decode().strip() if self.lighting_controller.beat_sending_enabled: try: bpm_value = float(message) await self.lighting_controller.handle_beat(bpm_value) except ValueError: logging.warning(f"Non-BPM message from {addr}: {message}") except Exception as e: logging.error(f"Error processing beat from {addr}: {e}") except asyncio.CancelledError: logging.info(f"Sound client handler cancelled: {addr}") except Exception as e: logging.error(f"Error handling sound client {addr}: {e}") finally: logging.info(f"Closing connection for sound client: {addr}") writer.close() await writer.wait_closed() async def start_tcp_server(self): """Start TCP server for sound detector.""" self.tcp_server = await asyncio.start_server( self.handle_tcp_client, "127.0.0.1", 65432 ) addrs = ', '.join(str(sock.getsockname()) for sock in self.tcp_server.sockets) logging.info(f"TCP server listening on {addrs}") # HTTP API Handlers # Color Palette API async def http_get_color_palette(self, request): """HTTP GET /api/color-palette""" palette_config = self.lighting_controller.get_color_palette_config() return web.json_response(palette_config) async def http_set_color_palette(self, request): """HTTP POST/PUT /api/color-palette""" try: data = await request.json() self.lighting_controller.set_color_palette(data) palette_config = self.lighting_controller.get_color_palette_config() return web.json_response({ "status": "ok", "palette": palette_config }) except json.JSONDecodeError: return web.json_response( {"status": "error", "message": "Invalid JSON"}, status=400 ) except Exception as e: return web.json_response( {"status": "error", "message": str(e)}, status=500 ) # Pattern API async def http_set_pattern(self, request): """HTTP POST /api/pattern""" try: data = await request.json() logging.info(f"API received pattern change: {data}") pattern = data.get("pattern") if not pattern: return web.json_response( {"status": "error", "message": "Pattern name required"}, status=400 ) # Save current pattern's parameters before switching if self.lighting_controller.current_pattern: self.lighting_controller._save_pattern_parameters(self.lighting_controller.current_pattern) # Switch to new pattern and load its parameters # Normalize shortnames for backend-only patterns if pattern == "ap": pattern = "alternating_pulse" self.lighting_controller.current_pattern = pattern self.lighting_controller._load_pattern_parameters(pattern) await self.lighting_controller._send_full_parameters() logging.info(f"Pattern changed to: {pattern} with params: delay={self.lighting_controller.delay}, n1={self.lighting_controller.n1}, n2={self.lighting_controller.n2}, n3={self.lighting_controller.n3}, n4={self.lighting_controller.n4}") return web.json_response({ "status": "ok", "pattern": self.lighting_controller.current_pattern, "parameters": { "delay": self.lighting_controller.delay, "n1": self.lighting_controller.n1, "n2": self.lighting_controller.n2, "n3": self.lighting_controller.n3, "n4": self.lighting_controller.n4 } }) except Exception as e: return web.json_response( {"status": "error", "message": str(e)}, status=500 ) async def http_get_pattern(self, request): """HTTP GET /api/pattern""" return web.json_response({ "pattern": self.lighting_controller.current_pattern }) # Parameters API async def http_set_parameters(self, request): """HTTP POST /api/parameters""" try: data = await request.json() logging.info(f"API received parameter update: {data}") # Update any provided parameters if "brightness" in data: self.lighting_controller.brightness = int(data["brightness"]) if "delay" in data: self.lighting_controller.delay = int(data["delay"]) if "n1" in data: self.lighting_controller.n1 = int(data["n1"]) if "n2" in data: self.lighting_controller.n2 = int(data["n2"]) if "n3" in data: self.lighting_controller.n3 = int(data["n3"]) if "n4" in data: self.lighting_controller.n4 = int(data["n4"]) logging.info(f"Updated parameters for pattern '{self.lighting_controller.current_pattern}': brightness={self.lighting_controller.brightness}, delay={self.lighting_controller.delay}, n1={self.lighting_controller.n1}, n2={self.lighting_controller.n2}, n3={self.lighting_controller.n3}, n4={self.lighting_controller.n4}") # Save parameters for current pattern if self.lighting_controller.current_pattern: self.lighting_controller._save_pattern_parameters(self.lighting_controller.current_pattern) # Send updated parameters to LED bars await self.lighting_controller._send_full_parameters() return web.json_response({ "status": "ok", "parameters": { "brightness": self.lighting_controller.brightness, "delay": self.lighting_controller.delay, "n1": self.lighting_controller.n1, "n2": self.lighting_controller.n2, "n3": self.lighting_controller.n3, "n4": self.lighting_controller.n4 } }) except Exception as e: return web.json_response( {"status": "error", "message": str(e)}, status=500 ) async def http_get_parameters(self, request): """HTTP GET /api/parameters""" return web.json_response({ "brightness": self.lighting_controller.brightness, "delay": self.lighting_controller.delay, "n1": self.lighting_controller.n1, "n2": self.lighting_controller.n2, "n3": self.lighting_controller.n3, "n4": self.lighting_controller.n4 }) # State API async def http_get_state(self, request): """HTTP GET /api/state - Get complete system state""" palette_config = self.lighting_controller.get_color_palette_config() return web.json_response({ "pattern": self.lighting_controller.current_pattern, "parameters": { "brightness": self.lighting_controller.brightness, "delay": self.lighting_controller.delay, "n1": self.lighting_controller.n1, "n2": self.lighting_controller.n2, "n3": self.lighting_controller.n3, "n4": self.lighting_controller.n4 }, "color_palette": palette_config, "beat_index": self.lighting_controller.beat_index }) # Tempo API async def http_reset_tempo(self, request): """HTTP POST /api/tempo/reset""" try: await self.lighting_controller.sound_controller.send_reset_tempo() return web.json_response({"status": "ok", "message": "Tempo reset sent"}) except Exception as e: return web.json_response( {"status": "error", "message": str(e)}, status=500 ) async def start_http_server(self): """Start combined HTTP and WebSocket server.""" self.http_app = web.Application() # WebSocket endpoint (legacy support) self.http_app.router.add_get('/ws', self.handle_websocket) # REST API endpoints # Color Palette self.http_app.router.add_get('/api/color-palette', self.http_get_color_palette) self.http_app.router.add_post('/api/color-palette', self.http_set_color_palette) self.http_app.router.add_put('/api/color-palette', self.http_set_color_palette) # Pattern self.http_app.router.add_get('/api/pattern', self.http_get_pattern) self.http_app.router.add_post('/api/pattern', self.http_set_pattern) # Parameters self.http_app.router.add_get('/api/parameters', self.http_get_parameters) self.http_app.router.add_post('/api/parameters', self.http_set_parameters) # State (complete system state) self.http_app.router.add_get('/api/state', self.http_get_state) # Tempo self.http_app.router.add_post('/api/tempo/reset', self.http_reset_tempo) self.http_runner = web.AppRunner(self.http_app) await self.http_runner.setup() host = os.getenv("CONTROL_SERVER_HOST", "0.0.0.0") # Start WebSocket server on CONTROL_SERVER_PORT ws_site = web.TCPSite(self.http_runner, host, CONTROL_SERVER_PORT) await ws_site.start() logging.info(f"WebSocket server listening on {host}:{CONTROL_SERVER_PORT}/ws") logging.info(f"HTTP API server listening on {host}:{CONTROL_SERVER_PORT}") # Also start on HTTP_API_PORT for backward compatibility if HTTP_API_PORT != CONTROL_SERVER_PORT: api_site = web.TCPSite(self.http_runner, host, HTTP_API_PORT) await api_site.start() logging.info(f"HTTP API also available on {host}:{HTTP_API_PORT}") async def run(self): """Run the control server.""" # Connect to LED server await self.lighting_controller.led_controller.connect() # Start servers (optionally include heartbeat) tcp_task = asyncio.create_task(self._tcp_server_task()) http_task = asyncio.create_task(self._http_server_task()) # Handles both WebSocket and HTTP tasks = [tcp_task, http_task] if self.enable_heartbeat: heartbeat_task = asyncio.create_task(self._heartbeat_loop()) tasks.append(heartbeat_task) await asyncio.gather(*tasks) async def _tcp_server_task(self): """Keep TCP server running.""" await self.start_tcp_server() # Keep the server running indefinitely while True: await asyncio.sleep(1) async def _http_server_task(self): """Keep HTTP and WebSocket server running.""" await self.start_http_server() # Keep the server running indefinitely while True: await asyncio.sleep(1) async def _heartbeat_loop(self): """Send periodic heartbeats to keep LED connection alive.""" try: while True: await asyncio.sleep(5) # Send heartbeat every 5 seconds if self.lighting_controller.led_controller.is_connected: # Send a simple heartbeat to keep connection alive heartbeat_data = { "d": { "t": "h", # heartbeat type } } await self.lighting_controller.led_controller.send_data(heartbeat_data) except asyncio.CancelledError: logging.info("Heartbeat loop cancelled") except Exception as e: logging.error(f"Heartbeat loop error: {e}") async def main(): """Main entry point.""" args = parse_arguments() transport_kwargs = {} if args.transport == "spi": transport_kwargs = { 'spi_bus': args.spi_bus, 'spi_device': args.spi_device, 'spi_speed_hz': args.spi_speed } elif args.transport == "websocket": transport_kwargs = { 'uri': args.uri } server = ControlServer( transport=args.transport, enable_heartbeat=args.enable_heartbeat, **transport_kwargs ) try: await server.run() except KeyboardInterrupt: logging.info("Server interrupted by user") except Exception as e: logging.error(f"Server error: {e}") finally: await server.lighting_controller.led_controller.close() def parse_arguments(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="Control Server for Lighting Controller") # Transport selection transport_group = parser.add_argument_group("Transport Options") default_transport = os.getenv("TRANSPORT", "spi") transport_group.add_argument( "--transport", choices=["spi", "websocket"], default=default_transport, help=f"Transport method for LED communication (default from .env or {default_transport})" ) # Control options control_group = parser.add_argument_group("Control Options") control_group.add_argument( "--enable-heartbeat", action="store_true", help="Enable heartbeat system (may cause pattern interruptions)" ) # SPI options spi_group = parser.add_argument_group("SPI Options") spi_group.add_argument( "--spi-bus", type=int, default=0, help="SPI bus number (default: 0)" ) spi_group.add_argument( "--spi-device", type=int, default=0, help="SPI device number (default: 0)" ) spi_group.add_argument( "--spi-speed", type=int, default=1_000_000, help="SPI speed in Hz (default: 1000000)" ) # WebSocket options ws_group = parser.add_argument_group("WebSocket Options") ws_group.add_argument( "--uri", type=str, default="ws://192.168.4.1/ws", help="WebSocket URI for LED communication (default: ws://192.168.4.1/ws)" ) return parser.parse_args() if __name__ == "__main__": asyncio.run(main()) #