Separate UI and control logic with WebSocket communication

- Create UI client (src/ui_client.py) with MIDI controller integration
- Create control server (src/control_server.py) with lighting logic
- Implement WebSocket protocol between UI and control server
- Add startup script (start_lighting_controller.py) for all components
- Update Pipfile with new scripts for separated architecture
- Add comprehensive documentation (README_SEPARATED.md)
- Fix LED connection stability with heartbeat mechanism
- Fix UI knob display and button highlighting
- Maintain backward compatibility with existing MIDI mappings
This commit is contained in:
2025-09-28 12:36:25 +13:00
parent ed5bbb8c18
commit 937fb1f2f9
5 changed files with 1462 additions and 2 deletions

View File

@@ -19,5 +19,8 @@ websocket-client = "*"
python_version = "3.12"
[scripts]
main = "python main.py"
dev = 'watchfiles "python src/main.py" src'
ui = "python src/ui_client.py"
control = "python src/control_server.py"
sound = "python src/sound.py"
dev-ui = 'watchfiles "python src/ui_client.py" src'
dev-control = 'watchfiles "python src/control_server.py" src'

296
README_SEPARATED.md Normal file
View File

@@ -0,0 +1,296 @@
# Lighting Controller - Separated Architecture
This version of the lighting controller separates the UI and control logic, communicating via WebSocket. The MIDI controller is now integrated with the UI client.
## Architecture Overview
```
┌─────────────────┐ WebSocket ┌─────────────────┐ WebSocket ┌─────────────────┐
│ UI Client │◄─────────────────►│ Control Server │◄─────────────────►│ LED Server │
│ │ │ │ │ │
│ - MIDI Input │ │ - Lighting Logic│ │ - LED Bars │
│ - User Interface│ │ - Pattern Logic │ │ - ESP-NOW │
│ - Status Display│ │ - Beat Handling │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
│ │ TCP
│ ▼
│ ┌─────────────────┐
│ │ Sound Detector │
│ │ │
│ │ - Audio Input │
│ │ - Beat Detection│
│ │ - BPM Analysis │
│ └─────────────────┘
│ MIDI
┌─────────────────┐
│ MIDI Controller │
│ │
│ - Knobs/Dials │
│ - Buttons │
│ - Pattern Select│
└─────────────────┘
```
## Components
### 1. UI Client (`src/ui_client.py`)
- **Purpose**: User interface and MIDI controller integration
- **Features**:
- MIDI controller input handling
- Real-time status display
- Pattern selection visualization
- Connection status monitoring
- **Communication**: WebSocket client to control server
### 2. Control Server (`src/control_server.py`)
- **Purpose**: Core lighting control logic
- **Features**:
- Pattern execution
- Beat synchronization
- Parameter management
- LED bar communication
- **Communication**:
- WebSocket server for UI clients
- TCP server for sound detector
- WebSocket client to LED server
### 3. Sound Detector (`src/sound.py`)
- **Purpose**: Audio beat detection and BPM analysis
- **Features**:
- Real-time audio processing
- Beat detection
- BPM calculation
- Tempo reset functionality
- **Communication**: TCP client to control server
## WebSocket Protocol
### UI Client → Control Server Messages
```json
{
"type": "pattern_change",
"data": {
"pattern": "pulse"
}
}
```
```json
{
"type": "color_change",
"data": {
"r": 255,
"g": 0,
"b": 0
}
}
```
```json
{
"type": "brightness_change",
"data": {
"brightness": 80
}
}
```
```json
{
"type": "parameter_change",
"data": {
"n1": 15,
"n2": 20
}
}
```
```json
{
"type": "delay_change",
"data": {
"delay": 150
}
}
```
```json
{
"type": "beat_toggle",
"data": {
"enabled": true
}
}
```
```json
{
"type": "reset_tempo",
"data": {}
}
```
## Running the System
### Option 1: Use the startup script (Recommended)
```bash
python start_lighting_controller.py
```
### Option 2: Start components individually
1. **Start Control Server**:
```bash
pipenv run control
# or
python src/control_server.py
```
2. **Start Sound Detector** (in another terminal):
```bash
pipenv run sound
# or
python src/sound.py
```
3. **Start UI Client** (in another terminal):
```bash
pipenv run ui
# or
python src/ui_client.py
```
### Option 3: Development mode with auto-reload
```bash
# Terminal 1 - Control Server
pipenv run dev-control
# Terminal 2 - Sound Detector
pipenv run sound
# Terminal 3 - UI Client
pipenv run dev-ui
```
## Configuration
### MIDI Controller
- MIDI device preferences are saved in `config.json`
- The UI client automatically detects and connects to MIDI devices
- Use the dropdown to select different MIDI ports
### Network Settings
- **Control Server**: `localhost:8765` (WebSocket)
- **Sound Detector**: `127.0.0.1:65432` (TCP)
- **LED Server**: `192.168.4.1:80/ws` (WebSocket)
### Audio Settings
- Audio input device index: 7 (modify in `src/sound.py`)
- Buffer size: 512 samples
- Sample rate: Auto-detected from device
## MIDI Controller Mapping
### Buttons (Notes 36-51)
- **Row 1**: Pulse, Sequential Pulse
- **Row 2**: Alternating, Alternating Phase
- **Row 3**: N Chase, Rainbow
- **Row 4**: Flicker, Radiate
### Dials (CC30-37)
- **CC30**: Red (0-255)
- **CC31**: Green (0-255)
- **CC32**: Blue (0-255)
- **CC33**: Brightness (0-100)
- **CC34**: N1 parameter
- **CC35**: N2 parameter
- **CC36**: N3 parameter
- **CC37**: Delay (0-508ms)
### Additional Knobs (CC38-45)
- **CC38**: Pulse N1
- **CC39**: Pulse N2
- **CC40**: Alternating N1
- **CC41**: Alternating N2
- **CC42**: Radiate N1
- **CC43**: Radiate Delay
- **CC44**: Knob 7
- **CC45**: Knob 8
### Control Buttons
- **CC27**: Beat sending toggle (127=on, 0=off)
- **CC29**: Reset tempo detection
## Troubleshooting
### Connection Issues
1. **UI Client can't connect to Control Server**:
- Ensure control server is running first
- Check firewall settings
- Verify port 8765 is available
2. **Control Server can't connect to LED Server**:
- Check LED server IP address (192.168.4.1)
- Verify LED server is running
- Check network connectivity
3. **Sound Detector can't connect to Control Server**:
- Ensure control server is running
- Check TCP port 65432 is available
### MIDI Issues
1. **No MIDI devices detected**:
- Check MIDI controller connection
- Install MIDI drivers if needed
- Use "Refresh MIDI Ports" button
2. **MIDI input not working**:
- Verify correct MIDI port is selected
- Check MIDI controller is sending data
- Look for error messages in console
### Performance Issues
1. **High CPU usage**:
- Reduce audio buffer size in sound.py
- Increase parameter update interval
- Check for network latency
2. **Audio dropouts**:
- Increase audio buffer size
- Check audio device settings
- Reduce system load
## Development
### Adding New Patterns
1. Add pattern name to `PATTERN_NAMES` in `control_server.py`
2. Implement pattern logic in `LightingController` class
3. Add pattern to MIDI button mapping in `ui_client.py`
### Adding New MIDI Controls
1. Add control change handler in `MidiController.handle_midi_message()`
2. Add corresponding WebSocket message type
3. Implement handler in `LightingController.handle_ui_command()`
### Modifying UI
- Edit `src/ui_client.py` for UI changes
- Use `pipenv run dev-ui` for auto-reload during development
- UI uses tkinter with dark theme
## Migration from Monolithic Version
The separated architecture maintains compatibility with:
- Existing MIDI controller mappings
- LED bar communication protocol
- Sound detection functionality
- Configuration files
Key differences:
- MIDI controller is now part of UI client
- Control logic is isolated in control server
- Communication via WebSocket instead of direct function calls
- Better separation of concerns and modularity

443
src/control_server.py Normal file
View File

@@ -0,0 +1,443 @@
#!/usr/bin/env python3
"""
Control Server for Lighting Controller
Handles lighting control logic and communicates with LED bars via WebSocket.
Receives commands from UI client via WebSocket.
"""
import asyncio
import websockets
import json
import logging
import socket
import threading
import time
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
from color_utils import adjust_brightness
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration
LED_SERVER_URI = "ws://192.168.4.1:80/ws"
CONTROL_SERVER_PORT = 8765
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
# Pattern name mapping for shorter JSON payloads
PATTERN_NAMES = {
"flicker": "f",
"fill_range": "fr",
"n_chase": "nc",
"alternating": "a",
"pulse": "p",
"rainbow": "r",
"specto": "s",
"radiate": "rd",
"sequential_pulse": "sp",
"alternating_phase": "ap",
}
class LEDController:
"""Handles communication with LED bars via WebSocket."""
def __init__(self, led_server_uri):
self.led_server_uri = led_server_uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
async def connect(self):
"""Connect to LED server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to LED server at {self.led_server_uri}...")
self.websocket = await websockets.connect(self.led_server_uri)
self.is_connected = True
logging.info("Connected to LED server")
except Exception as e:
logging.error(f"Failed to connect to LED server: {e}")
self.is_connected = False
self.websocket = None
async def send_data(self, data):
"""Send data to LED server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to LED server. Attempting to reconnect...")
await self.connect()
if not self.is_connected:
logging.error("Failed to reconnect to LED server. Cannot send data.")
return
try:
await self.websocket.send(json.dumps(data))
logging.debug(f"Sent to LED server: {data}")
except Exception as e:
logging.error(f"Failed to send data to LED server: {e}")
self.is_connected = False
self.websocket = None
# Attempt to reconnect
await self.connect()
async def close(self):
"""Close LED server connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from LED server")
class SoundController:
"""Handles communication with sound beat detector."""
def __init__(self, sound_host, sound_port):
self.sound_host = sound_host
self.sound_port = sound_port
async def send_reset_tempo(self):
"""Send reset tempo command to sound controller."""
try:
reader, writer = await asyncio.open_connection(self.sound_host, self.sound_port)
cmd = "RESET_TEMPO\n".encode('utf-8')
writer.write(cmd)
await writer.drain()
resp = await reader.read(100)
logging.info(f"Sent RESET_TEMPO, response: {resp.decode().strip()}")
writer.close()
await writer.wait_closed()
except Exception as e:
logging.error(f"Failed to send RESET_TEMPO: {e}")
class LightingController:
"""Main lighting control logic."""
def __init__(self):
self.led_controller = LEDController(LED_SERVER_URI)
self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT)
# Lighting state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.beat_index = 0
self.beat_sending_enabled = True
# Rate limiting
self.last_param_update = 0.0
self.param_update_interval = 0.1
self.pending_param_update = False
def _current_color_rgb(self):
"""Get current RGB color tuple."""
r = max(0, min(255, int(self.color_r)))
g = max(0, min(255, int(self.color_g)))
b = max(0, min(255, int(self.color_b)))
return (r, g, b)
async def _send_full_parameters(self):
"""Send all parameters to LED bars."""
full_payload = {
"d": {
"t": "u", # Message type: update
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
"dl": self.delay,
"cl": [self._current_color_rgb()],
"br": self.brightness,
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"s": self.beat_index % 256,
}
}
# Add empty entries for each bar
for bar_name in LED_BAR_NAMES:
full_payload[bar_name] = {}
await self.led_controller.send_data(full_payload)
async def _request_param_update(self):
"""Request parameter update with rate limiting."""
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
self.last_param_update = current_time
await self._send_full_parameters()
else:
self.pending_param_update = True
async def _send_normal_pattern(self):
"""Send normal pattern to all bars."""
patterns_needing_params = ["alternating", "flicker", "n_chase", "rainbow", "radiate"]
payload = {
"d": {
"t": "b", # Message type: beat
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
}
}
if self.current_pattern in patterns_needing_params:
payload["d"].update({
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"dl": self.delay,
"s": self.beat_index % 256,
})
for bar_name in LED_BAR_NAMES:
payload[bar_name] = {}
await self.led_controller.send_data(payload)
async def _handle_sequential_pulse(self):
"""Handle sequential pulse pattern."""
from bar_config import LEFT_BARS, RIGHT_BARS
bar_index = self.beat_index % 4
payload = {
"d": {
"t": "b",
"pt": "o", # off
}
}
left_bar = LEFT_BARS[bar_index]
right_bar = RIGHT_BARS[bar_index]
payload[left_bar] = {"pt": "p"} # pulse
payload[right_bar] = {"pt": "p"} # pulse
await self.led_controller.send_data(payload)
async def _handle_alternating_phase(self):
"""Handle alternating pattern with phase offset."""
payload = {
"d": {
"t": "b",
"pt": "a", # alternating
"n1": self.n1,
"n2": self.n2,
"s": self.beat_index % 2,
}
}
swap_bars = ["101", "103", "105", "107"]
for bar_name in LED_BAR_NAMES:
if bar_name in swap_bars:
payload[bar_name] = {"s": (self.beat_index + 1) % 2}
else:
payload[bar_name] = {}
await self.led_controller.send_data(payload)
async def handle_beat(self, bpm_value):
"""Handle beat from sound detector."""
if not self.beat_sending_enabled or not self.current_pattern:
return
self.beat_index = (self.beat_index + 1) % 1000000
# Send periodic parameter updates every 8 beats
if self.beat_index % 8 == 0:
await self._send_full_parameters()
# Check for pending parameter updates
if self.pending_param_update:
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
self.last_param_update = current_time
self.pending_param_update = False
await self._send_full_parameters()
# Handle pattern-specific beat logic
if self.current_pattern == "sequential_pulse":
await self._handle_sequential_pulse()
elif self.current_pattern == "alternating_phase":
await self._handle_alternating_phase()
elif self.current_pattern:
await self._send_normal_pattern()
async def handle_ui_command(self, message_type, data):
"""Handle command from UI client."""
if message_type == "pattern_change":
self.current_pattern = data.get("pattern", "")
await self._send_full_parameters()
logging.info(f"Pattern changed to: {self.current_pattern}")
elif message_type == "color_change":
self.color_r = data.get("r", self.color_r)
self.color_g = data.get("g", self.color_g)
self.color_b = data.get("b", self.color_b)
await self._request_param_update()
elif message_type == "brightness_change":
self.brightness = data.get("brightness", self.brightness)
await self._request_param_update()
elif message_type == "parameter_change":
if "n1" in data:
self.n1 = data["n1"]
if "n2" in data:
self.n2 = data["n2"]
if "n3" in data:
self.n3 = data["n3"]
await self._request_param_update()
elif message_type == "delay_change":
self.delay = data.get("delay", self.delay)
await self._request_param_update()
elif message_type == "beat_toggle":
self.beat_sending_enabled = data.get("enabled", True)
logging.info(f"Beat sending {'enabled' if self.beat_sending_enabled else 'disabled'}")
elif message_type == "reset_tempo":
await self.sound_controller.send_reset_tempo()
class ControlServer:
"""WebSocket server for UI client communication and TCP server for sound."""
def __init__(self):
self.lighting_controller = LightingController()
self.clients = set()
self.tcp_server = None
async def handle_ui_client(self, websocket):
"""Handle UI client WebSocket connection."""
self.clients.add(websocket)
client_addr = websocket.remote_address
logging.info(f"UI client connected: {client_addr}")
try:
async for message in websocket:
try:
data = json.loads(message)
message_type = data.get("type")
message_data = data.get("data", {})
await self.lighting_controller.handle_ui_command(message_type, message_data)
except json.JSONDecodeError:
logging.error(f"Invalid JSON from client {client_addr}: {message}")
except Exception as e:
logging.error(f"Error handling message from client {client_addr}: {e}")
except websockets.exceptions.ConnectionClosed:
logging.info(f"UI client disconnected: {client_addr}")
except Exception as e:
logging.error(f"Error in UI client handler: {e}")
finally:
self.clients.discard(websocket)
async def handle_tcp_client(self, reader, writer):
"""Handle TCP client (sound detector) connection."""
addr = writer.get_extra_info('peername')
logging.info(f"Sound client connected: {addr}")
try:
while True:
data = await reader.read(4096)
if not data:
logging.info(f"Sound client disconnected: {addr}")
break
message = data.decode().strip()
if self.lighting_controller.beat_sending_enabled:
try:
bpm_value = float(message)
await self.lighting_controller.handle_beat(bpm_value)
except ValueError:
logging.warning(f"Non-BPM message from {addr}: {message}")
except Exception as e:
logging.error(f"Error processing beat from {addr}: {e}")
except asyncio.CancelledError:
logging.info(f"Sound client handler cancelled: {addr}")
except Exception as e:
logging.error(f"Error handling sound client {addr}: {e}")
finally:
logging.info(f"Closing connection for sound client: {addr}")
writer.close()
await writer.wait_closed()
async def start_tcp_server(self):
"""Start TCP server for sound detector."""
self.tcp_server = await asyncio.start_server(
self.handle_tcp_client, "127.0.0.1", 65432
)
addrs = ', '.join(str(sock.getsockname()) for sock in self.tcp_server.sockets)
logging.info(f"TCP server listening on {addrs}")
async def start_websocket_server(self):
"""Start WebSocket server for UI clients."""
server = await websockets.serve(
self.handle_ui_client, "localhost", CONTROL_SERVER_PORT
)
logging.info(f"WebSocket server listening on localhost:{CONTROL_SERVER_PORT}")
async def run(self):
"""Run the control server."""
# Connect to LED server
await self.lighting_controller.led_controller.connect()
# Start servers and heartbeat task
await asyncio.gather(
self.start_websocket_server(),
self.start_tcp_server(),
self._heartbeat_loop()
)
async def _heartbeat_loop(self):
"""Send periodic heartbeats to keep LED connection alive."""
try:
while True:
await asyncio.sleep(5) # Send heartbeat every 5 seconds
if self.lighting_controller.led_controller.is_connected:
# Send a simple heartbeat to keep connection alive
heartbeat_data = {
"d": {
"t": "h", # heartbeat type
}
}
await self.lighting_controller.led_controller.send_data(heartbeat_data)
except asyncio.CancelledError:
logging.info("Heartbeat loop cancelled")
except Exception as e:
logging.error(f"Heartbeat loop error: {e}")
async def main():
"""Main entry point."""
server = ControlServer()
try:
await server.run()
except KeyboardInterrupt:
logging.info("Server interrupted by user")
except Exception as e:
logging.error(f"Server error: {e}")
finally:
await server.lighting_controller.led_controller.close()
if __name__ == "__main__":
asyncio.run(main())

602
src/ui_client.py Normal file
View File

@@ -0,0 +1,602 @@
#!/usr/bin/env python3
"""
UI Client for Lighting Controller
Handles the user interface and MIDI controller input.
Communicates with the control server via WebSocket.
"""
import asyncio
import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import mido
import logging
from async_tkinter_loop import async_handler, async_mainloop
import websockets
import websocket
# Configuration
CONFIG_FILE = "config.json"
CONTROL_SERVER_URI = "ws://localhost:8765"
# Dark theme colors
bg_color = "#2e2e2e"
fg_color = "white"
trough_color_red = "#4a0000"
trough_color_green = "#004a00"
trough_color_blue = "#00004a"
trough_color_brightness = "#4a4a4a"
trough_color_delay = "#4a4a4a"
active_bg_color = "#4a4a4a"
highlight_pattern_color = "#6a5acd"
active_palette_color_border = "#FFD700"
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class WebSocketClient:
"""WebSocket client for communicating with the control server."""
def __init__(self, uri):
self.uri = uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
async def connect(self):
"""Establish WebSocket connection to control server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to control server at {self.uri}...")
self.websocket = await websockets.connect(self.uri)
self.is_connected = True
logging.info("Connected to control server")
except Exception as e:
logging.error(f"Failed to connect to control server: {e}")
self.is_connected = False
self.websocket = None
async def send_message(self, message_type, data=None):
"""Send a message to the control server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to control server")
return
try:
message = {
"type": message_type,
"data": data or {}
}
await self.websocket.send(json.dumps(message))
logging.debug(f"Sent message: {message}")
except Exception as e:
logging.error(f"Failed to send message: {e}")
self.is_connected = False
async def close(self):
"""Close WebSocket connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from control server")
class MidiController:
"""Handles MIDI controller input and sends commands to control server."""
def __init__(self, websocket_client):
self.websocket_client = websocket_client
self.midi_port_index = 0
self.available_ports = []
self.midi_port = None
self.midi_task = None
# MIDI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.knob7 = 0
self.knob8 = 0
self.beat_sending_enabled = True
def get_midi_ports(self):
"""Get list of available MIDI input ports."""
try:
return mido.get_input_names()
except Exception as e:
logging.error(f"Error getting MIDI ports: {e}")
return []
def load_midi_preference(self):
"""Load saved MIDI device preference."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
return config.get('midi_device_index', 0)
except Exception as e:
logging.error(f"Error loading MIDI preference: {e}")
return 0
def save_midi_preference(self):
"""Save current MIDI device preference."""
try:
config = {
'midi_device_index': self.midi_port_index,
'midi_device_name': self.available_ports[self.midi_port_index] if self.available_ports else None
}
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
logging.error(f"Error saving MIDI preference: {e}")
async def initialize_midi(self):
"""Initialize MIDI port connection."""
self.available_ports = self.get_midi_ports()
self.midi_port_index = self.load_midi_preference()
if not self.available_ports:
logging.warning("No MIDI ports available")
return False
if not (0 <= self.midi_port_index < len(self.available_ports)):
self.midi_port_index = 0
try:
port_name = self.available_ports[self.midi_port_index]
self.midi_port = mido.open_input(port_name)
logging.info(f"Connected to MIDI port: {port_name}")
return True
except Exception as e:
logging.error(f"Failed to open MIDI port: {e}")
return False
async def start_midi_listener(self):
"""Start listening for MIDI messages."""
if not self.midi_port:
return
try:
while True:
msg = self.midi_port.receive(block=False)
if msg:
await self.handle_midi_message(msg)
await asyncio.sleep(0.001)
except asyncio.CancelledError:
logging.info("MIDI listener cancelled")
except Exception as e:
logging.error(f"MIDI listener error: {e}")
async def handle_midi_message(self, msg):
"""Handle incoming MIDI message and send to control server."""
if msg.type == 'note_on':
# Pattern selection (notes 36-51)
logging.info(f"MIDI Note {msg.note}: {msg.velocity}")
pattern_bindings = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate"
]
idx = msg.note - 36
if 0 <= idx < len(pattern_bindings):
self.current_pattern = pattern_bindings[idx]
await self.websocket_client.send_message("pattern_change", {
"pattern": self.current_pattern
})
logging.info(f"Pattern changed to: {self.current_pattern}")
elif msg.type == 'control_change':
# Handle control change messages
control = msg.control
value = msg.value
logging.info(f"MIDI CC {control}: {value}")
if control == 30: # Red
self.color_r = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 31: # Green
self.color_g = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 32: # Blue
self.color_b = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 33: # Brightness
self.brightness = round((value / 127) * 100)
await self.websocket_client.send_message("brightness_change", {
"brightness": self.brightness
})
elif control == 34: # n1
self.n1 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n1": self.n1
})
elif control == 35: # n2
self.n2 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n2": self.n2
})
elif control == 36: # n3
self.n3 = max(1, value)
await self.websocket_client.send_message("parameter_change", {
"n3": self.n3
})
elif control == 37: # Delay
self.delay = value * 4
await self.websocket_client.send_message("delay_change", {
"delay": self.delay
})
elif control == 27: # Beat sending toggle
self.beat_sending_enabled = (value == 127)
await self.websocket_client.send_message("beat_toggle", {
"enabled": self.beat_sending_enabled
})
def close(self):
"""Close MIDI connection."""
if self.midi_port:
self.midi_port.close()
self.midi_port = None
class UIClient:
"""Main UI client application."""
def __init__(self):
self.root = tk.Tk()
self.root.configure(bg=bg_color)
self.root.title("Lighting Controller - UI Client")
# WebSocket client
self.websocket_client = WebSocketClient(CONTROL_SERVER_URI)
# MIDI controller
self.midi_controller = MidiController(self.websocket_client)
# UI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.setup_ui()
self.setup_async_tasks()
def setup_ui(self):
"""Setup the user interface."""
# Configure ttk style
style = ttk.Style()
style.theme_use("alt")
style.configure(".", background=bg_color, foreground=fg_color, font=("Arial", 14))
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure("TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5])
# MIDI Controller Selection
midi_frame = ttk.LabelFrame(self.root, text="MIDI Controller")
midi_frame.pack(padx=16, pady=8, fill="x")
# MIDI port dropdown
self.midi_port_var = tk.StringVar()
midi_dropdown = ttk.Combobox(
midi_frame,
textvariable=self.midi_port_var,
values=[],
state="readonly",
font=("Arial", 12)
)
midi_dropdown.pack(padx=8, pady=4, fill="x")
midi_dropdown.bind("<<ComboboxSelected>>", self.on_midi_port_change)
# Refresh MIDI ports button
refresh_button = ttk.Button(
midi_frame,
text="Refresh MIDI Ports",
command=self.refresh_midi_ports
)
refresh_button.pack(padx=8, pady=4)
# MIDI connection status
self.midi_status_label = tk.Label(
midi_frame,
text="Status: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 10)
)
self.midi_status_label.pack(padx=8, pady=2)
# Controls overview
controls_frame = ttk.Frame(self.root)
controls_frame.pack(padx=16, pady=8, fill="both")
# Dials display
dials_frame = ttk.LabelFrame(controls_frame, text="Dials (CC30-37)")
dials_frame.pack(side="left", padx=12)
for c in range(2):
dials_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
dials_frame.grid_rowconfigure(rr, minsize=70)
self.dials_boxes = []
placeholders = {
(0, 0): "n3\n-", (0, 1): "Delay\n-",
(1, 0): "n1\n-", (1, 1): "n2\n-",
(2, 0): "B\n-", (2, 1): "Bright\n-",
(3, 0): "R\n-", (3, 1): "G\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
dials_frame,
text=placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.dials_boxes.append(lbl)
# Knobs display
knobs_frame = ttk.LabelFrame(controls_frame, text="Knobs (CC38-45)")
knobs_frame.pack(side="left", padx=12)
for c in range(2):
knobs_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
knobs_frame.grid_rowconfigure(rr, minsize=70)
self.knobs_boxes = []
knob_placeholders = {
(0, 0): "CC44\n-", (0, 1): "CC45\n-",
(1, 0): "Rad n1\n-", (1, 1): "Rad delay\n-",
(2, 0): "Alt n1\n-", (2, 1): "Alt n2\n-",
(3, 0): "Pulse n1\n-", (3, 1): "Pulse n2\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
knobs_frame,
text=knob_placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.knobs_boxes.append(lbl)
# Buttons display
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(side="left", padx=12)
buttons1_frame = ttk.LabelFrame(buttons_frame, text="Buttons (notes 36-51)")
buttons1_frame.pack(side="top", pady=8)
for c in range(4):
buttons1_frame.grid_columnconfigure(c, minsize=140)
for rr in range(1, 5):
buttons1_frame.grid_rowconfigure(rr, minsize=70)
self.button1_cells = []
for r in range(4):
for c in range(4):
lbl = tk.Label(
buttons1_frame,
text="",
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
self.button1_cells.append(lbl)
# Connection status
self.connection_status = tk.Label(
self.root,
text="Control Server: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 12)
)
self.connection_status.pack(pady=8)
# Schedule periodic UI updates
self.root.after(200, self.update_status_labels)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def setup_async_tasks(self):
"""Setup async tasks for WebSocket and MIDI."""
# Connect to control server
self.root.after(100, async_handler(self.websocket_client.connect))
# Initialize MIDI
self.root.after(200, async_handler(self.initialize_midi))
@async_handler
async def initialize_midi(self):
"""Initialize MIDI controller."""
success = await self.midi_controller.initialize_midi()
if success:
# Update UI
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
if self.midi_controller.available_ports:
self.midi_port_var.set(self.midi_controller.available_ports[self.midi_controller.midi_port_index])
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
break
break
self.midi_status_label.config(
text=f"Status: Connected to {self.midi_controller.available_ports[self.midi_controller.midi_port_index]}",
fg="green"
)
# Start MIDI listener
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def refresh_midi_ports(self):
"""Refresh MIDI ports list."""
old_ports = self.midi_controller.available_ports.copy()
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
if (self.midi_controller.available_ports and
self.midi_port_var.get() not in self.midi_controller.available_ports):
self.midi_port_var.set(self.midi_controller.available_ports[0])
self.midi_controller.midi_port_index = 0
self.midi_controller.save_midi_preference()
break
break
def on_midi_port_change(self, event):
"""Handle MIDI port selection change."""
selected_port = self.midi_port_var.get()
if selected_port in self.midi_controller.available_ports:
self.midi_controller.midi_port_index = self.midi_controller.available_ports.index(selected_port)
self.midi_controller.save_midi_preference()
# Restart MIDI connection
asyncio.create_task(self.restart_midi())
@async_handler
async def restart_midi(self):
"""Restart MIDI connection with new port."""
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
if self.midi_controller.midi_port:
self.midi_controller.midi_port.close()
success = await self.midi_controller.initialize_midi()
if success:
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def update_status_labels(self):
"""Update UI status labels."""
# Update connection status
if self.websocket_client.is_connected:
self.connection_status.config(text="Control Server: Connected", fg="green")
else:
self.connection_status.config(text="Control Server: Disconnected", fg="red")
# Update dial displays
dial_values = [
("n3", self.midi_controller.n3), ("Delay", self.midi_controller.delay),
("n1", self.midi_controller.n1), ("n2", self.midi_controller.n2),
("B", self.midi_controller.color_b), ("Brightness", self.midi_controller.brightness),
("R", self.midi_controller.color_r), ("G", self.midi_controller.color_g),
]
for idx, (label, value) in enumerate(dial_values):
if idx < len(self.dials_boxes):
self.dials_boxes[idx].config(text=f"{label}\n{value}")
# Update knobs
knob_values = [
("CC44", self.midi_controller.knob7), ("CC45", self.midi_controller.knob8),
("Rad n1", self.midi_controller.n1), ("Rad delay", self.midi_controller.delay),
("Alt n1", self.midi_controller.n1), ("Alt n2", self.midi_controller.n2),
("Pulse n1", self.midi_controller.n1), ("Pulse n2", self.midi_controller.n2),
]
for idx, (label, value) in enumerate(knob_values):
if idx < len(self.knobs_boxes):
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
# Update buttons
icon_for = {
"pulse": "💥", "flicker": "", "alternating": "↔️",
"n_chase": "🏃", "rainbow": "🌈", "radiate": "🌟",
"sequential_pulse": "🔄", "alternating_phase": "", "-": "",
}
bank1_patterns = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate",
"-", "-", "-", "-", "-", "-", "-", "-",
]
# Display names for UI (with line breaks for better display)
display_names = {
"pulse": "pulse",
"sequential_pulse": "sequential\npulse",
"alternating": "alternating",
"alternating_phase": "alternating\nphase",
"n_chase": "n chase",
"rainbow": "rainbow",
"flicker": "flicker",
"radiate": "radiate",
}
current_pattern = self.midi_controller.current_pattern
for idx, lbl in enumerate(self.button1_cells):
pattern_name = bank1_patterns[idx]
is_selected = (current_pattern == pattern_name and pattern_name != "-")
display_name = display_names.get(pattern_name, pattern_name)
icon = icon_for.get(pattern_name, "")
text = f"{icon} {display_name}" if pattern_name != "-" else ""
if is_selected:
lbl.config(text=text, bg=highlight_pattern_color)
else:
lbl.config(text=text, bg=bg_color)
# Reschedule
self.root.after(200, self.update_status_labels)
def on_closing(self):
"""Handle application closing."""
logging.info("Closing UI client...")
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
self.midi_controller.close()
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
def run(self):
"""Run the UI client."""
async_mainloop(self.root)
if __name__ == "__main__":
app = UIClient()
app.run()

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
Startup script for the separated lighting controller architecture.
Starts the control server, sound detector, and UI client.
"""
import subprocess
import sys
import time
import signal
import os
from pathlib import Path
def start_process(command, name, cwd=None):
"""Start a subprocess and return the process object."""
print(f"Starting {name}...")
try:
process = subprocess.Popen(
command,
shell=True,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if os.name != 'nt' else None
)
print(f"{name} started with PID {process.pid}")
return process
except Exception as e:
print(f"Failed to start {name}: {e}")
return None
def main():
"""Main startup function."""
print("Starting Lighting Controller (Separated Architecture)")
print("=" * 50)
# Get the project directory
project_dir = Path(__file__).parent
processes = []
try:
# Start control server
control_process = start_process(
"python src/control_server.py",
"Control Server",
cwd=project_dir
)
if control_process:
processes.append(("Control Server", control_process))
# Wait a moment for the control server to start
time.sleep(2)
# Start sound detector
sound_process = start_process(
"python src/sound.py",
"Sound Detector",
cwd=project_dir
)
if sound_process:
processes.append(("Sound Detector", sound_process))
# Wait a moment for the sound detector to start
time.sleep(1)
# Start UI client
ui_process = start_process(
"python src/ui_client.py",
"UI Client",
cwd=project_dir
)
if ui_process:
processes.append(("UI Client", ui_process))
print("\nAll components started successfully!")
print("Press Ctrl+C to stop all components...")
# Wait for processes
try:
while True:
time.sleep(1)
# Check if any process has died
for name, process in processes:
if process.poll() is not None:
print(f"Warning: {name} has stopped unexpectedly")
except KeyboardInterrupt:
print("\nShutting down all components...")
except Exception as e:
print(f"Error during startup: {e}")
finally:
# Clean up all processes
for name, process in processes:
if process and process.poll() is None:
print(f"Stopping {name}...")
try:
if os.name != 'nt':
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
else:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
print(f"Force killing {name}...")
if os.name != 'nt':
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
else:
process.kill()
except Exception as e:
print(f"Error stopping {name}: {e}")
print("All components stopped.")
if __name__ == "__main__":
main()