Files
lighting-controller/src/ui_client.py
Pi User 0906cb22e6 Add .env file support for UI client configuration
- Use python-dotenv to load environment variables
- Add CONTROL_SERVER_URI environment variable for WebSocket connection
- Create .env.example with configuration examples
- Update Pipfile to include python-dotenv dependency
- Allows easy configuration for running UI on desktop pointing to Pi
2025-10-03 20:08:36 +13:00

607 lines
22 KiB
Python

#!/usr/bin/env python3
"""
UI Client for Lighting Controller
Handles the user interface and MIDI controller input.
Communicates with the control server via WebSocket.
"""
import asyncio
import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import mido
import logging
from async_tkinter_loop import async_handler, async_mainloop
import websockets
import websocket
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Configuration
CONFIG_FILE = "config.json"
CONTROL_SERVER_URI = os.getenv("CONTROL_SERVER_URI", "ws://localhost:8765")
# Dark theme colors
bg_color = "#2e2e2e"
fg_color = "white"
trough_color_red = "#4a0000"
trough_color_green = "#004a00"
trough_color_blue = "#00004a"
trough_color_brightness = "#4a4a4a"
trough_color_delay = "#4a4a4a"
active_bg_color = "#4a4a4a"
highlight_pattern_color = "#6a5acd"
active_palette_color_border = "#FFD700"
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class WebSocketClient:
"""WebSocket client for communicating with the control server."""
def __init__(self, uri):
self.uri = uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
async def connect(self):
"""Establish WebSocket connection to control server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to control server at {self.uri}...")
self.websocket = await websockets.connect(self.uri)
self.is_connected = True
logging.info("Connected to control server")
except Exception as e:
logging.error(f"Failed to connect to control server: {e}")
self.is_connected = False
self.websocket = None
async def send_message(self, message_type, data=None):
"""Send a message to the control server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to control server")
return
try:
message = {
"type": message_type,
"data": data or {}
}
await self.websocket.send(json.dumps(message))
logging.debug(f"Sent message: {message}")
except Exception as e:
logging.error(f"Failed to send message: {e}")
self.is_connected = False
async def close(self):
"""Close WebSocket connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from control server")
class MidiController:
"""Handles MIDI controller input and sends commands to control server."""
def __init__(self, websocket_client):
self.websocket_client = websocket_client
self.midi_port_index = 0
self.available_ports = []
self.midi_port = None
self.midi_task = None
# MIDI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.knob7 = 0
self.knob8 = 0
self.beat_sending_enabled = True
def get_midi_ports(self):
"""Get list of available MIDI input ports."""
try:
return mido.get_input_names()
except Exception as e:
logging.error(f"Error getting MIDI ports: {e}")
return []
def load_midi_preference(self):
"""Load saved MIDI device preference."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
return config.get('midi_device_index', 0)
except Exception as e:
logging.error(f"Error loading MIDI preference: {e}")
return 0
def save_midi_preference(self):
"""Save current MIDI device preference."""
try:
config = {
'midi_device_index': self.midi_port_index,
'midi_device_name': self.available_ports[self.midi_port_index] if self.available_ports else None
}
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
logging.error(f"Error saving MIDI preference: {e}")
async def initialize_midi(self):
"""Initialize MIDI port connection."""
self.available_ports = self.get_midi_ports()
self.midi_port_index = self.load_midi_preference()
if not self.available_ports:
logging.warning("No MIDI ports available")
return False
if not (0 <= self.midi_port_index < len(self.available_ports)):
self.midi_port_index = 0
try:
port_name = self.available_ports[self.midi_port_index]
self.midi_port = mido.open_input(port_name)
logging.info(f"Connected to MIDI port: {port_name}")
return True
except Exception as e:
logging.error(f"Failed to open MIDI port: {e}")
return False
async def start_midi_listener(self):
"""Start listening for MIDI messages."""
if not self.midi_port:
return
try:
while True:
msg = self.midi_port.receive(block=False)
if msg:
await self.handle_midi_message(msg)
await asyncio.sleep(0.001)
except asyncio.CancelledError:
logging.info("MIDI listener cancelled")
except Exception as e:
logging.error(f"MIDI listener error: {e}")
async def handle_midi_message(self, msg):
"""Handle incoming MIDI message and send to control server."""
if msg.type == 'note_on':
# Pattern selection (notes 36-51)
logging.info(f"MIDI Note {msg.note}: {msg.velocity}")
pattern_bindings = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate"
]
idx = msg.note - 36
if 0 <= idx < len(pattern_bindings):
self.current_pattern = pattern_bindings[idx]
await self.websocket_client.send_message("pattern_change", {
"pattern": self.current_pattern
})
logging.info(f"Pattern changed to: {self.current_pattern}")
elif msg.type == 'control_change':
# Handle control change messages
control = msg.control
value = msg.value
logging.info(f"MIDI CC {control}: {value}")
if control == 30: # Red
self.color_r = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 31: # Green
self.color_g = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 32: # Blue
self.color_b = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 33: # Brightness
self.brightness = round((value / 127) * 100)
await self.websocket_client.send_message("brightness_change", {
"brightness": self.brightness
})
elif control == 34: # n1
self.n1 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n1": self.n1
})
elif control == 35: # n2
self.n2 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n2": self.n2
})
elif control == 36: # n3
self.n3 = max(1, value)
await self.websocket_client.send_message("parameter_change", {
"n3": self.n3
})
elif control == 37: # Delay
self.delay = value * 4
await self.websocket_client.send_message("delay_change", {
"delay": self.delay
})
elif control == 27: # Beat sending toggle
self.beat_sending_enabled = (value == 127)
await self.websocket_client.send_message("beat_toggle", {
"enabled": self.beat_sending_enabled
})
def close(self):
"""Close MIDI connection."""
if self.midi_port:
self.midi_port.close()
self.midi_port = None
class UIClient:
"""Main UI client application."""
def __init__(self):
self.root = tk.Tk()
self.root.configure(bg=bg_color)
self.root.title("Lighting Controller - UI Client")
# WebSocket client
self.websocket_client = WebSocketClient(CONTROL_SERVER_URI)
# MIDI controller
self.midi_controller = MidiController(self.websocket_client)
# UI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.setup_ui()
self.setup_async_tasks()
def setup_ui(self):
"""Setup the user interface."""
# Configure ttk style
style = ttk.Style()
style.theme_use("alt")
style.configure(".", background=bg_color, foreground=fg_color, font=("Arial", 14))
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure("TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5])
# MIDI Controller Selection
midi_frame = ttk.LabelFrame(self.root, text="MIDI Controller")
midi_frame.pack(padx=16, pady=8, fill="x")
# MIDI port dropdown
self.midi_port_var = tk.StringVar()
midi_dropdown = ttk.Combobox(
midi_frame,
textvariable=self.midi_port_var,
values=[],
state="readonly",
font=("Arial", 12)
)
midi_dropdown.pack(padx=8, pady=4, fill="x")
midi_dropdown.bind("<<ComboboxSelected>>", self.on_midi_port_change)
# Refresh MIDI ports button
refresh_button = ttk.Button(
midi_frame,
text="Refresh MIDI Ports",
command=self.refresh_midi_ports
)
refresh_button.pack(padx=8, pady=4)
# MIDI connection status
self.midi_status_label = tk.Label(
midi_frame,
text="Status: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 10)
)
self.midi_status_label.pack(padx=8, pady=2)
# Controls overview
controls_frame = ttk.Frame(self.root)
controls_frame.pack(padx=16, pady=8, fill="both")
# Dials display
dials_frame = ttk.LabelFrame(controls_frame, text="Dials (CC30-37)")
dials_frame.pack(side="left", padx=12)
for c in range(2):
dials_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
dials_frame.grid_rowconfigure(rr, minsize=70)
self.dials_boxes = []
placeholders = {
(0, 0): "n3\n-", (0, 1): "Delay\n-",
(1, 0): "n1\n-", (1, 1): "n2\n-",
(2, 0): "B\n-", (2, 1): "Bright\n-",
(3, 0): "R\n-", (3, 1): "G\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
dials_frame,
text=placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.dials_boxes.append(lbl)
# Knobs display
knobs_frame = ttk.LabelFrame(controls_frame, text="Knobs (CC38-45)")
knobs_frame.pack(side="left", padx=12)
for c in range(2):
knobs_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
knobs_frame.grid_rowconfigure(rr, minsize=70)
self.knobs_boxes = []
knob_placeholders = {
(0, 0): "CC44\n-", (0, 1): "CC45\n-",
(1, 0): "Rad n1\n-", (1, 1): "Rad delay\n-",
(2, 0): "Alt n1\n-", (2, 1): "Alt n2\n-",
(3, 0): "Pulse n1\n-", (3, 1): "Pulse n2\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
knobs_frame,
text=knob_placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.knobs_boxes.append(lbl)
# Buttons display
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(side="left", padx=12)
buttons1_frame = ttk.LabelFrame(buttons_frame, text="Buttons (notes 36-51)")
buttons1_frame.pack(side="top", pady=8)
for c in range(4):
buttons1_frame.grid_columnconfigure(c, minsize=140)
for rr in range(1, 5):
buttons1_frame.grid_rowconfigure(rr, minsize=70)
self.button1_cells = []
for r in range(4):
for c in range(4):
lbl = tk.Label(
buttons1_frame,
text="",
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
self.button1_cells.append(lbl)
# Connection status
self.connection_status = tk.Label(
self.root,
text="Control Server: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 12)
)
self.connection_status.pack(pady=8)
# Schedule periodic UI updates
self.root.after(200, self.update_status_labels)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def setup_async_tasks(self):
"""Setup async tasks for WebSocket and MIDI."""
# Connect to control server
self.root.after(100, async_handler(self.websocket_client.connect))
# Initialize MIDI
self.root.after(200, async_handler(self.initialize_midi))
@async_handler
async def initialize_midi(self):
"""Initialize MIDI controller."""
success = await self.midi_controller.initialize_midi()
if success:
# Update UI
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
if self.midi_controller.available_ports:
self.midi_port_var.set(self.midi_controller.available_ports[self.midi_controller.midi_port_index])
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
break
break
self.midi_status_label.config(
text=f"Status: Connected to {self.midi_controller.available_ports[self.midi_controller.midi_port_index]}",
fg="green"
)
# Start MIDI listener
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def refresh_midi_ports(self):
"""Refresh MIDI ports list."""
old_ports = self.midi_controller.available_ports.copy()
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
if (self.midi_controller.available_ports and
self.midi_port_var.get() not in self.midi_controller.available_ports):
self.midi_port_var.set(self.midi_controller.available_ports[0])
self.midi_controller.midi_port_index = 0
self.midi_controller.save_midi_preference()
break
break
def on_midi_port_change(self, event):
"""Handle MIDI port selection change."""
selected_port = self.midi_port_var.get()
if selected_port in self.midi_controller.available_ports:
self.midi_controller.midi_port_index = self.midi_controller.available_ports.index(selected_port)
self.midi_controller.save_midi_preference()
# Restart MIDI connection
asyncio.create_task(self.restart_midi())
@async_handler
async def restart_midi(self):
"""Restart MIDI connection with new port."""
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
if self.midi_controller.midi_port:
self.midi_controller.midi_port.close()
success = await self.midi_controller.initialize_midi()
if success:
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def update_status_labels(self):
"""Update UI status labels."""
# Update connection status
if self.websocket_client.is_connected:
self.connection_status.config(text="Control Server: Connected", fg="green")
else:
self.connection_status.config(text="Control Server: Disconnected", fg="red")
# Update dial displays
dial_values = [
("n3", self.midi_controller.n3), ("Delay", self.midi_controller.delay),
("n1", self.midi_controller.n1), ("n2", self.midi_controller.n2),
("B", self.midi_controller.color_b), ("Brightness", self.midi_controller.brightness),
("R", self.midi_controller.color_r), ("G", self.midi_controller.color_g),
]
for idx, (label, value) in enumerate(dial_values):
if idx < len(self.dials_boxes):
self.dials_boxes[idx].config(text=f"{label}\n{value}")
# Update knobs
knob_values = [
("CC44", self.midi_controller.knob7), ("CC45", self.midi_controller.knob8),
("Rad n1", self.midi_controller.n1), ("Rad delay", self.midi_controller.delay),
("Alt n1", self.midi_controller.n1), ("Alt n2", self.midi_controller.n2),
("Pulse n1", self.midi_controller.n1), ("Pulse n2", self.midi_controller.n2),
]
for idx, (label, value) in enumerate(knob_values):
if idx < len(self.knobs_boxes):
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
# Update buttons
icon_for = {
"pulse": "💥", "flicker": "", "alternating": "↔️",
"n_chase": "🏃", "rainbow": "🌈", "radiate": "🌟",
"sequential_pulse": "🔄", "alternating_phase": "", "-": "",
}
bank1_patterns = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate",
"-", "-", "-", "-", "-", "-", "-", "-",
]
# Display names for UI (with line breaks for better display)
display_names = {
"pulse": "pulse",
"sequential_pulse": "sequential\npulse",
"alternating": "alternating",
"alternating_phase": "alternating\nphase",
"n_chase": "n chase",
"rainbow": "rainbow",
"flicker": "flicker",
"radiate": "radiate",
}
current_pattern = self.midi_controller.current_pattern
for idx, lbl in enumerate(self.button1_cells):
pattern_name = bank1_patterns[idx]
is_selected = (current_pattern == pattern_name and pattern_name != "-")
display_name = display_names.get(pattern_name, pattern_name)
icon = icon_for.get(pattern_name, "")
text = f"{icon} {display_name}" if pattern_name != "-" else ""
if is_selected:
lbl.config(text=text, bg=highlight_pattern_color)
else:
lbl.config(text=text, bg=bg_color)
# Reschedule
self.root.after(200, self.update_status_labels)
def on_closing(self):
"""Handle application closing."""
logging.info("Closing UI client...")
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
self.midi_controller.close()
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
def run(self):
"""Run the UI client."""
async_mainloop(self.root)
if __name__ == "__main__":
app = UIClient()
app.run()