Add complete REST API for lighting control

- Migrated from websockets to aiohttp for unified HTTP/WebSocket server
- Added REST endpoints: /api/pattern, /api/parameters, /api/state, /api/tempo/reset
- Implemented color palette API with 8-slot system and selected colors
- First selected color (index 0) is used as primary RGB for patterns
- All operations now available via simple HTTP requests (no WebSocket needed)
- Added comprehensive documentation: FRONTEND_API.md, COLOR_PALETTE_API.md
- Added test scripts: test_rest_api.sh, test_color_patterns.py
- Updated test/test_control_server.py for new /ws WebSocket path
- Configuration persistence via lighting_config.json
- Pattern parameters (n1-n4, brightness, delay) controllable via API
- WebSocket still available at /ws for legacy support
This commit is contained in:
Pi User
2025-10-03 23:38:54 +13:00
parent aa9f892454
commit 6f9133b43e
19 changed files with 3512 additions and 44 deletions

View File

@@ -6,7 +6,6 @@ Receives commands from UI client via WebSocket.
"""
import asyncio
import websockets
import json
import logging
import socket
@@ -14,6 +13,7 @@ import threading
import time
import argparse
import os
from aiohttp import web
from dotenv import load_dotenv
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
from color_utils import adjust_brightness
@@ -27,11 +27,17 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
# Configuration
CONTROL_SERVER_PORT = int(os.getenv("CONTROL_SERVER_PORT", "8765"))
HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "8766"))
SOUND_CONTROL_HOST = os.getenv("SOUND_CONTROL_HOST", "127.0.0.1")
SOUND_CONTROL_PORT = int(os.getenv("SOUND_CONTROL_PORT", "65433"))
CONFIG_FILE = "lighting_config.json"
# Pattern name mapping for shorter JSON payloads
# Frontend sends shortnames, backend can use either long or short names
# These map to the shortnames defined in led-bar/src/patterns.py
PATTERN_NAMES = {
# Long names to short names (for backend use)
"off": "o",
"flicker": "f",
"fill_range": "fr",
"n_chase": "nc",
@@ -40,9 +46,21 @@ PATTERN_NAMES = {
"rainbow": "r",
"specto": "s",
"radiate": "rd",
"segmented_movement": "sm",
# Short names pass through (for frontend use)
"o": "o",
"f": "f",
"fr": "fr",
"nc": "nc",
"a": "a",
"p": "p",
"r": "r",
"s": "s",
"rd": "rd",
"sm": "sm",
# Backend-specific patterns
"sequential_pulse": "sp",
"alternating_phase": "ap",
"segmented_movement": "sm",
}
@@ -118,17 +136,100 @@ class LightingController:
self.beat_index = 0
self.beat_sending_enabled = True
# Color palette (8 slots, 2 selected)
self.color_palette = [
{"r": 255, "g": 0, "b": 0}, # Red
{"r": 0, "g": 255, "b": 0}, # Green
{"r": 0, "g": 0, "b": 255}, # Blue
{"r": 255, "g": 255, "b": 0}, # Yellow
{"r": 255, "g": 0, "b": 255}, # Magenta
{"r": 0, "g": 255, "b": 255}, # Cyan
{"r": 255, "g": 128, "b": 0}, # Orange
{"r": 255, "g": 255, "b": 255}, # White
]
self.selected_color_indices = [0, 1] # Default: Red and Green
# Load config
self._load_config()
# Rate limiting
self.last_param_update = 0.0
self.param_update_interval = 0.1
self.pending_param_update = False
def _current_color_rgb(self):
"""Get current RGB color tuple."""
"""Get current RGB color tuple from selected palette color (index 0)."""
# Use the first selected color from the palette
if self.selected_color_indices and len(self.selected_color_indices) > 0:
color_index = self.selected_color_indices[0]
if 0 <= color_index < len(self.color_palette):
color = self.color_palette[color_index]
return (color['r'], color['g'], color['b'])
# Fallback to legacy color sliders if palette not set
r = max(0, min(255, int(self.color_r)))
g = max(0, min(255, int(self.color_g)))
b = max(0, min(255, int(self.color_b)))
return (r, g, b)
def _load_config(self):
"""Load configuration from file."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
# Load color palette
if "color_palette" in config:
self.color_palette = config["color_palette"]
# Load selected color indices
if "selected_color_indices" in config:
self.selected_color_indices = config["selected_color_indices"]
logging.info(f"Loaded config from {CONFIG_FILE}")
except Exception as e:
logging.error(f"Error loading config: {e}")
def _save_config(self):
"""Save configuration to file."""
try:
config = {
"color_palette": self.color_palette,
"selected_color_indices": self.selected_color_indices,
}
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
logging.info(f"Saved config to {CONFIG_FILE}")
except Exception as e:
logging.error(f"Error saving config: {e}")
def get_color_palette_config(self):
"""Get current color palette configuration."""
return {
"palette": self.color_palette,
"selected_indices": self.selected_color_indices
}
def set_color_palette(self, palette_data):
"""Set color palette configuration."""
if "palette" in palette_data:
# Validate palette has 8 colors
if len(palette_data["palette"]) == 8:
self.color_palette = palette_data["palette"]
else:
logging.warning(f"Invalid palette size: {len(palette_data['palette'])}, expected 8")
if "selected_indices" in palette_data:
# Validate indices
indices = palette_data["selected_indices"]
if len(indices) == 2 and all(0 <= i < 8 for i in indices):
self.selected_color_indices = indices
else:
logging.warning(f"Invalid selected indices: {indices}")
self._save_config()
logging.info(f"Color palette updated: selected indices {self.selected_color_indices}")
async def _send_full_parameters(self):
"""Send all parameters to LED bars."""
@@ -237,10 +338,6 @@ class LightingController:
self.beat_index = (self.beat_index + 1) % 1000000
# Send periodic parameter updates every 8 beats
if self.beat_index % 8 == 0:
await self._send_full_parameters()
# Check for pending parameter updates
if self.pending_param_update:
current_time = time.time()
@@ -295,6 +392,15 @@ class LightingController:
elif message_type == "reset_tempo":
await self.sound_controller.send_reset_tempo()
elif message_type == "get_color_palette":
# Return color palette configuration
return self.get_color_palette_config()
elif message_type == "set_color_palette":
# Set color palette configuration
self.set_color_palette(data)
return {"status": "ok", "palette": self.get_color_palette_config()}
class ControlServer:
@@ -304,34 +410,51 @@ class ControlServer:
self.lighting_controller = LightingController(transport=transport, **transport_kwargs)
self.clients = set()
self.tcp_server = None
self.http_app = None
self.http_runner = None
self.enable_heartbeat = enable_heartbeat
async def handle_ui_client(self, websocket):
"""Handle UI client WebSocket connection."""
self.clients.add(websocket)
client_addr = websocket.remote_address
async def handle_websocket(self, request):
"""Handle WebSocket connection for UI client."""
ws = web.WebSocketResponse()
await ws.prepare(request)
self.clients.add(ws)
client_addr = request.remote
logging.info(f"UI client connected: {client_addr}")
try:
async for message in websocket:
try:
data = json.loads(message)
message_type = data.get("type")
message_data = data.get("data", {})
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
try:
data = json.loads(msg.data)
message_type = data.get("type")
message_data = data.get("data", {})
response = await self.lighting_controller.handle_ui_command(message_type, message_data)
# Send response if command returned data
if response is not None:
await ws.send_json({
"type": f"{message_type}_response",
"data": response
})
except json.JSONDecodeError:
logging.error(f"Invalid JSON from client {client_addr}: {msg.data}")
except Exception as e:
logging.error(f"Error handling message from client {client_addr}: {e}")
elif msg.type == web.WSMsgType.ERROR:
logging.error(f"WebSocket error from {client_addr}: {ws.exception()}")
await self.lighting_controller.handle_ui_command(message_type, message_data)
except json.JSONDecodeError:
logging.error(f"Invalid JSON from client {client_addr}: {message}")
except Exception as e:
logging.error(f"Error handling message from client {client_addr}: {e}")
except websockets.exceptions.ConnectionClosed:
logging.info(f"UI client disconnected: {client_addr}")
except Exception as e:
logging.error(f"Error in UI client handler: {e}")
logging.error(f"Error in WebSocket handler: {e}")
finally:
self.clients.discard(websocket)
self.clients.discard(ws)
logging.info(f"UI client disconnected: {client_addr}")
return ws
async def handle_tcp_client(self, reader, writer):
"""Handle TCP client (sound detector) connection."""
@@ -374,13 +497,191 @@ class ControlServer:
addrs = ', '.join(str(sock.getsockname()) for sock in self.tcp_server.sockets)
logging.info(f"TCP server listening on {addrs}")
async def start_websocket_server(self):
"""Start WebSocket server for UI clients."""
# HTTP API Handlers
# Color Palette API
async def http_get_color_palette(self, request):
"""HTTP GET /api/color-palette"""
palette_config = self.lighting_controller.get_color_palette_config()
return web.json_response(palette_config)
async def http_set_color_palette(self, request):
"""HTTP POST/PUT /api/color-palette"""
try:
data = await request.json()
self.lighting_controller.set_color_palette(data)
palette_config = self.lighting_controller.get_color_palette_config()
return web.json_response({
"status": "ok",
"palette": palette_config
})
except json.JSONDecodeError:
return web.json_response(
{"status": "error", "message": "Invalid JSON"},
status=400
)
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500
)
# Pattern API
async def http_set_pattern(self, request):
"""HTTP POST /api/pattern"""
try:
data = await request.json()
pattern = data.get("pattern")
if not pattern:
return web.json_response(
{"status": "error", "message": "Pattern name required"},
status=400
)
self.lighting_controller.current_pattern = pattern
await self.lighting_controller._send_full_parameters()
logging.info(f"Pattern changed to: {pattern}")
return web.json_response({
"status": "ok",
"pattern": self.lighting_controller.current_pattern
})
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500
)
async def http_get_pattern(self, request):
"""HTTP GET /api/pattern"""
return web.json_response({
"pattern": self.lighting_controller.current_pattern
})
# Parameters API
async def http_set_parameters(self, request):
"""HTTP POST /api/parameters"""
try:
data = await request.json()
# Update any provided parameters
if "brightness" in data:
self.lighting_controller.brightness = int(data["brightness"])
if "delay" in data:
self.lighting_controller.delay = int(data["delay"])
if "n1" in data:
self.lighting_controller.n1 = int(data["n1"])
if "n2" in data:
self.lighting_controller.n2 = int(data["n2"])
if "n3" in data:
self.lighting_controller.n3 = int(data["n3"])
if "n4" in data:
self.lighting_controller.n4 = int(data["n4"])
# Send updated parameters to LED bars
await self.lighting_controller._send_full_parameters()
return web.json_response({
"status": "ok",
"parameters": {
"brightness": self.lighting_controller.brightness,
"delay": self.lighting_controller.delay,
"n1": self.lighting_controller.n1,
"n2": self.lighting_controller.n2,
"n3": self.lighting_controller.n3,
"n4": self.lighting_controller.n4
}
})
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500
)
async def http_get_parameters(self, request):
"""HTTP GET /api/parameters"""
return web.json_response({
"brightness": self.lighting_controller.brightness,
"delay": self.lighting_controller.delay,
"n1": self.lighting_controller.n1,
"n2": self.lighting_controller.n2,
"n3": self.lighting_controller.n3,
"n4": self.lighting_controller.n4
})
# State API
async def http_get_state(self, request):
"""HTTP GET /api/state - Get complete system state"""
palette_config = self.lighting_controller.get_color_palette_config()
return web.json_response({
"pattern": self.lighting_controller.current_pattern,
"parameters": {
"brightness": self.lighting_controller.brightness,
"delay": self.lighting_controller.delay,
"n1": self.lighting_controller.n1,
"n2": self.lighting_controller.n2,
"n3": self.lighting_controller.n3,
"n4": self.lighting_controller.n4
},
"color_palette": palette_config,
"beat_index": self.lighting_controller.beat_index
})
# Tempo API
async def http_reset_tempo(self, request):
"""HTTP POST /api/tempo/reset"""
try:
await self.lighting_controller.sound_controller.send_reset_tempo()
return web.json_response({"status": "ok", "message": "Tempo reset sent"})
except Exception as e:
return web.json_response(
{"status": "error", "message": str(e)},
status=500
)
async def start_http_server(self):
"""Start combined HTTP and WebSocket server."""
self.http_app = web.Application()
# WebSocket endpoint (legacy support)
self.http_app.router.add_get('/ws', self.handle_websocket)
# REST API endpoints
# Color Palette
self.http_app.router.add_get('/api/color-palette', self.http_get_color_palette)
self.http_app.router.add_post('/api/color-palette', self.http_set_color_palette)
self.http_app.router.add_put('/api/color-palette', self.http_set_color_palette)
# Pattern
self.http_app.router.add_get('/api/pattern', self.http_get_pattern)
self.http_app.router.add_post('/api/pattern', self.http_set_pattern)
# Parameters
self.http_app.router.add_get('/api/parameters', self.http_get_parameters)
self.http_app.router.add_post('/api/parameters', self.http_set_parameters)
# State (complete system state)
self.http_app.router.add_get('/api/state', self.http_get_state)
# Tempo
self.http_app.router.add_post('/api/tempo/reset', self.http_reset_tempo)
self.http_runner = web.AppRunner(self.http_app)
await self.http_runner.setup()
host = os.getenv("CONTROL_SERVER_HOST", "0.0.0.0")
server = await websockets.serve(
self.handle_ui_client, host, CONTROL_SERVER_PORT
)
logging.info(f"WebSocket server listening on {host}:{CONTROL_SERVER_PORT}")
# Start WebSocket server on CONTROL_SERVER_PORT
ws_site = web.TCPSite(self.http_runner, host, CONTROL_SERVER_PORT)
await ws_site.start()
logging.info(f"WebSocket server listening on {host}:{CONTROL_SERVER_PORT}/ws")
logging.info(f"HTTP API server listening on {host}:{CONTROL_SERVER_PORT}")
# Also start on HTTP_API_PORT for backward compatibility
if HTTP_API_PORT != CONTROL_SERVER_PORT:
api_site = web.TCPSite(self.http_runner, host, HTTP_API_PORT)
await api_site.start()
logging.info(f"HTTP API also available on {host}:{HTTP_API_PORT}")
async def run(self):
"""Run the control server."""
@@ -388,23 +689,16 @@ class ControlServer:
await self.lighting_controller.led_controller.connect()
# Start servers (optionally include heartbeat)
websocket_task = asyncio.create_task(self._websocket_server_task())
tcp_task = asyncio.create_task(self._tcp_server_task())
http_task = asyncio.create_task(self._http_server_task()) # Handles both WebSocket and HTTP
tasks = [websocket_task, tcp_task]
tasks = [tcp_task, http_task]
if self.enable_heartbeat:
heartbeat_task = asyncio.create_task(self._heartbeat_loop())
tasks.append(heartbeat_task)
await asyncio.gather(*tasks)
async def _websocket_server_task(self):
"""Keep WebSocket server running."""
await self.start_websocket_server()
# Keep the server running indefinitely
while True:
await asyncio.sleep(1)
async def _tcp_server_task(self):
"""Keep TCP server running."""
await self.start_tcp_server()
@@ -412,6 +706,13 @@ class ControlServer:
while True:
await asyncio.sleep(1)
async def _http_server_task(self):
"""Keep HTTP and WebSocket server running."""
await self.start_http_server()
# Keep the server running indefinitely
while True:
await asyncio.sleep(1)
async def _heartbeat_loop(self):
"""Send periodic heartbeats to keep LED connection alive."""
try: