UI: Color palette REST integration, MIDI 44–51 color slot selection, Color 1/2 previews with next indicator and click-to-select target; use REST for pattern changes and parameter updates (brightness, delay, n1–n3); send colors only on confirm; load palette on startup; fix NoneType await issue in async handlers

This commit is contained in:
2025-10-03 23:38:52 +13:00
parent 0906cb22e6
commit a654527dc3
4 changed files with 1925 additions and 83 deletions

View File

@@ -15,14 +15,118 @@ import logging
from async_tkinter_loop import async_handler, async_mainloop
import websockets
import websocket
from dotenv import load_dotenv
import atexit
import signal
import urllib.request
import urllib.error
# Load environment variables from .env file
load_dotenv()
# Single instance locker to prevent multiple UI processes
class SingleInstanceLocker:
def __init__(self, name: str):
self.name = name
self.lock_path = f"/tmp/{name}.lock"
self._fd = None
self.acquire()
def acquire(self):
try:
self._fd = os.open(self.lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
os.write(self._fd, str(os.getpid()).encode())
except FileExistsError:
# If lock exists but process is dead, remove it
try:
with open(self.lock_path, 'r') as f:
pid_str = f.read().strip()
if pid_str and pid_str.isdigit():
pid = int(pid_str)
try:
os.kill(pid, 0)
# Process exists, deny new instance
raise SystemExit("Another UI instance is already running.")
except OSError:
# Stale lock; remove
os.remove(self.lock_path)
return self.acquire()
else:
os.remove(self.lock_path)
return self.acquire()
except Exception:
raise SystemExit("Another UI instance may be running (lock busy).")
def release(self):
try:
if self._fd is not None:
os.close(self._fd)
self._fd = None
if os.path.exists(self.lock_path):
os.remove(self.lock_path)
except Exception:
pass
# Configuration
CONFIG_FILE = "config.json"
CONTROL_SERVER_URI = os.getenv("CONTROL_SERVER_URI", "ws://localhost:8765")
# Minimal .env loader (no external dependency)
def load_dotenv(filepath: str = ".env"):
try:
if not os.path.exists(filepath):
return
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' not in line:
continue
key, value = line.split('=', 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
# Do not overwrite if already set in environment
if key and key not in os.environ:
os.environ[key] = value
except Exception:
# Silently ignore .env parse errors to avoid breaking UI
pass
# Load environment variables from .env if present
load_dotenv()
# Control server URI can be overridden via environment
CONTROL_SERVER_URI = os.getenv("CONTROL_SERVER_URI", "ws://10.1.1.117:8765")
def _build_palette_api_base() -> str:
try:
# Expect ws://host:port or ws://host
uri = CONTROL_SERVER_URI
if uri.startswith("ws://"):
http = "http://" + uri[len("ws://"):]
elif uri.startswith("wss://"):
http = "https://" + uri[len("wss://"):]
else:
http = uri
# Append API path
if http.endswith('/'):
http = http[:-1]
return f"{http}/api/color-palette"
except Exception:
return "http://localhost:8765/api/color-palette"
PALETTE_API_BASE = _build_palette_api_base()
def _build_base_http() -> str:
try:
uri = CONTROL_SERVER_URI
if uri.startswith("ws://"):
http = "http://" + uri[len("ws://"):]
elif uri.startswith("wss://"):
http = "https://" + uri[len("wss://"):]
else:
http = uri
if http.endswith('/'):
http = http[:-1]
return http
except Exception:
return "http://localhost:8765"
HTTP_BASE = _build_base_http()
# Dark theme colors
bg_color = "#2e2e2e"
@@ -48,6 +152,7 @@ class WebSocketClient:
self.websocket = None
self.is_connected = False
self.reconnect_task = None
self._stop_reconnect = False
async def connect(self):
"""Establish WebSocket connection to control server."""
@@ -64,6 +169,22 @@ class WebSocketClient:
self.is_connected = False
self.websocket = None
async def auto_reconnect(self, base_interval: float = 2.0, max_interval: float = 10.0):
"""Background task to reconnect if the connection drops."""
backoff = base_interval
while not self._stop_reconnect:
if not self.is_connected:
await self.connect()
# Exponential backoff when not connected
if not self.is_connected:
await asyncio.sleep(backoff)
backoff = min(max_interval, backoff * 1.5)
else:
backoff = base_interval
else:
# Connected; check again later
await asyncio.sleep(base_interval)
async def send_message(self, message_type, data=None):
"""Send a message to the control server."""
if not self.is_connected or not self.websocket:
@@ -83,11 +204,15 @@ class WebSocketClient:
async def close(self):
"""Close WebSocket connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from control server")
self._stop_reconnect = True
if self.websocket:
try:
await self.websocket.close()
except Exception:
pass
self.is_connected = False
self.websocket = None
logging.info("Disconnected from control server")
class MidiController:
@@ -113,6 +238,13 @@ class MidiController:
self.knob7 = 0
self.knob8 = 0
self.beat_sending_enabled = True
# Color palette selection state (two selected indices, alternating target)
self.selected_indices = [0, 1]
self.next_selected_target = 0 # 0 selects color1 next, 1 selects color2 next
# Optional async callback set by UI to persist selected indices via REST
self.on_select_palette_indices = None
# Optional async callback to persist parameter changes via REST
self.on_parameters_change = None
def get_midi_ports(self):
"""Get list of available MIDI input ports."""
@@ -185,19 +317,42 @@ class MidiController:
async def handle_midi_message(self, msg):
"""Handle incoming MIDI message and send to control server."""
if msg.type == 'note_on':
# Pattern selection (notes 36-51)
# Pattern selection for specific MIDI notes
logging.info(f"MIDI Note {msg.note}: {msg.velocity}")
pattern_bindings = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate"
]
idx = msg.note - 36
if 0 <= idx < len(pattern_bindings):
self.current_pattern = pattern_bindings[idx]
await self.websocket_client.send_message("pattern_change", {
"pattern": self.current_pattern
})
logging.info(f"Pattern changed to: {self.current_pattern}")
note_to_pattern = {
36: "on",
37: "o",
38: "f",
39: "a",
40: "p",
41: "r",
42: "rd",
43: "sm",
}
pattern = note_to_pattern.get(msg.note)
if pattern:
self.current_pattern = pattern
# Send pattern change via REST per api.md
try:
data = json.dumps({"pattern": pattern}).encode('utf-8')
req = urllib.request.Request(f"{HTTP_BASE}/api/pattern", data=data, method='POST', headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=2.0) as resp:
_ = resp.read()
logging.info(f"Pattern changed to: {pattern}")
except Exception as e:
logging.error(f"Failed to POST pattern change: {e}")
return
# Color selection notes 44-51 map to color slots 0-7
if 44 <= msg.note <= 51:
slot_index = msg.note - 44
# Set the selected slot for the currently chosen target (0=Color1, 1=Color2)
self.selected_indices[self.next_selected_target] = slot_index
if callable(self.on_select_palette_indices):
try:
self.on_select_palette_indices(self.selected_indices)
except Exception as _e:
logging.debug(f"Failed to persist selected indices: {_e}")
logging.info(f"Set Color {self.next_selected_target+1} to slot {slot_index+1}")
elif msg.type == 'control_change':
# Handle control change messages
@@ -205,46 +360,26 @@ class MidiController:
value = msg.value
logging.info(f"MIDI CC {control}: {value}")
if control == 30: # Red
self.color_r = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 31: # Green
self.color_g = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 32: # Blue
self.color_b = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 33: # Brightness
if control == 33: # Brightness (0-100)
self.brightness = round((value / 127) * 100)
await self.websocket_client.send_message("brightness_change", {
"brightness": self.brightness
})
elif control == 34: # n1
if callable(self.on_parameters_change):
self.on_parameters_change({"brightness": self.brightness})
elif control == 34: # n1 (0-255)
self.n1 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n1": self.n1
})
elif control == 35: # n2
if callable(self.on_parameters_change):
self.on_parameters_change({"n1": self.n1})
elif control == 35: # n2 (0-255)
self.n2 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n2": self.n2
})
elif control == 36: # n3
self.n3 = max(1, value)
await self.websocket_client.send_message("parameter_change", {
"n3": self.n3
})
elif control == 37: # Delay
self.delay = value * 4
await self.websocket_client.send_message("delay_change", {
"delay": self.delay
})
if callable(self.on_parameters_change):
self.on_parameters_change({"n2": self.n2})
elif control == 36: # n3 (>=1)
self.n3 = max(1, int(value))
if callable(self.on_parameters_change):
self.on_parameters_change({"n3": self.n3})
elif control == 37: # Delay (ms)
self.delay = int(value) * 4
if callable(self.on_parameters_change):
self.on_parameters_change({"delay": self.delay})
elif control == 27: # Beat sending toggle
self.beat_sending_enabled = (value == 127)
await self.websocket_client.send_message("beat_toggle", {
@@ -262,16 +397,20 @@ class UIClient:
"""Main UI client application."""
def __init__(self):
# Ensure single instance via lock file
self._lock = SingleInstanceLocker('lighting_controller_ui')
self.root = tk.Tk()
self.root.configure(bg=bg_color)
self.root.title("Lighting Controller - UI Client")
# Restore last window geometry if available
self.load_window_geometry()
# WebSocket client
self.websocket_client = WebSocketClient(CONTROL_SERVER_URI)
# MIDI controller
self.midi_controller = MidiController(self.websocket_client)
# UI state
self.current_pattern = ""
self.delay = 100
@@ -282,10 +421,31 @@ class UIClient:
self.n1 = 10
self.n2 = 10
self.n3 = 1
# Cache for per-pattern windows
self.pattern_windows = {}
# Color slots (8) and selected slot via MIDI
self.color_slots = []
self.selected_color_slot = None
self._init_color_slots()
self.setup_ui()
self.setup_async_tasks()
# Hook MIDI controller selection persistence to REST method
try:
self.midi_controller.on_select_palette_indices = self.persist_selected_indices
self.midi_controller.on_parameters_change = self.persist_parameters
except Exception:
pass
# Graceful shutdown on signals
try:
signal.signal(signal.SIGTERM, lambda *_: self.on_closing())
signal.signal(signal.SIGINT, lambda *_: self.on_closing())
except Exception:
pass
atexit.register(self._cleanup_at_exit)
def setup_ui(self):
"""Setup the user interface."""
# Configure ttk style
@@ -295,9 +455,13 @@ class UIClient:
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure("TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5])
# MIDI Controller Selection
midi_frame = ttk.LabelFrame(self.root, text="MIDI Controller")
midi_frame.pack(padx=16, pady=8, fill="x")
# Top bar: MIDI Controller (left) + Selected Colors (right)
top_bar = ttk.Frame(self.root)
top_bar.pack(padx=16, pady=8, fill="x")
# MIDI Controller Selection (smaller, on the left)
midi_frame = ttk.LabelFrame(top_bar, text="MIDI Controller")
midi_frame.pack(side="left", padx=8)
# MIDI port dropdown
self.midi_port_var = tk.StringVar()
@@ -306,7 +470,7 @@ class UIClient:
textvariable=self.midi_port_var,
values=[],
state="readonly",
font=("Arial", 12)
font=("Arial", 11)
)
midi_dropdown.pack(padx=8, pady=4, fill="x")
midi_dropdown.bind("<<ComboboxSelected>>", self.on_midi_port_change)
@@ -329,6 +493,27 @@ class UIClient:
)
self.midi_status_label.pack(padx=8, pady=2)
# Selected color preview boxes (on the right)
previews_frame = ttk.LabelFrame(top_bar, text="Selected Colors")
previews_frame.pack(side="right", padx=8)
self.color1_preview = tk.Label(previews_frame, text="Color 1", width=14, height=2, bg="#000000", fg="#FFFFFF", font=("Arial", 12), borderwidth=2, relief="ridge")
self.color2_preview = tk.Label(previews_frame, text="Color 2", width=14, height=2, bg="#000000", fg="#FFFFFF", font=("Arial", 12), borderwidth=2, relief="ridge")
self.color1_preview.grid(row=0, column=0, padx=8, pady=8)
self.color2_preview.grid(row=1, column=0, padx=8, pady=8)
# Click to choose which target is set by MIDI
def _set_target_color1(_e=None):
try:
self.midi_controller.next_selected_target = 0
except Exception:
pass
def _set_target_color2(_e=None):
try:
self.midi_controller.next_selected_target = 1
except Exception:
pass
self.color1_preview.bind("<Button-1>", _set_target_color1)
self.color2_preview.bind("<Button-1>", _set_target_color2)
# Controls overview
controls_frame = ttk.Frame(self.root)
controls_frame.pack(padx=16, pady=8, fill="both")
@@ -422,6 +607,11 @@ class UIClient:
)
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
self.button1_cells.append(lbl)
# Bind clicks to open/focus child window and select pattern
for idx, lbl in enumerate(self.button1_cells):
lbl.bind("<Button-1>", lambda e, i=idx: self.on_pattern_button_click(i))
# (Previews moved to top bar)
# Connection status
self.connection_status = tk.Label(
@@ -437,10 +627,126 @@ class UIClient:
self.root.after(200, self.update_status_labels)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def open_pattern_window(self, pattern_name: str):
"""Create or focus a child window per pattern with sliders for parameters."""
if pattern_name in self.pattern_windows:
win = self.pattern_windows[pattern_name]
if win.winfo_exists():
try:
win.deiconify()
win.lift()
win.focus_force()
except Exception:
pass
return
win = tk.Toplevel(self.root)
win.title(f"Pattern: {pattern_name}")
win.configure(bg=bg_color)
# Make the child window larger for touch use
try:
win.geometry("900x520")
except Exception:
pass
self.pattern_windows[pattern_name] = win
def on_close():
try:
win.withdraw()
except Exception:
try:
win.destroy()
except Exception:
pass
win.protocol("WM_DELETE_WINDOW", on_close)
frm = ttk.Frame(win)
frm.pack(padx=20, pady=20, fill="both", expand=True)
# Helper to add a labeled slider
def add_slider(row: int, label_text: str, from_value, to_value, initial_value, on_change_cb):
lbl = tk.Label(frm, text=label_text, bg=bg_color, fg=fg_color, font=("Arial", 16))
lbl.grid(row=row, column=0, sticky="w", padx=12, pady=16)
var = tk.IntVar(value=int(initial_value))
s = tk.Scale(
frm,
from_=from_value,
to=to_value,
orient="horizontal",
showvalue=True,
resolution=1,
variable=var,
length=700,
sliderlength=32,
width=26,
bg=bg_color,
fg=fg_color,
highlightthickness=0,
troughcolor=trough_color_brightness,
command=lambda v: on_change_cb(int(float(v))),
)
# Enable click-to-jump behavior on the slider trough
def click_set_value(event, scale=s, vmin=from_value, vmax=to_value):
try:
# Compute fraction across the widget width
width = max(1, scale.winfo_width())
frac = min(1.0, max(0.0, event.x / width))
value = int(round(vmin + frac * (vmax - vmin)))
scale.set(value)
on_change_cb(int(value))
except Exception:
pass
s.bind("<Button-1>", click_set_value, add="+")
s.grid(row=row, column=1, sticky="ew", padx=12, pady=16)
frm.grid_columnconfigure(1, weight=1)
return s
# Sliders: n1, n2, n3, delay
add_slider(0, "n1", 0, 127, self.n1, self._on_change_n1)
add_slider(1, "n2", 0, 127, self.n2, self._on_change_n2)
add_slider(2, "n3", 1, 127, self.n3, self._on_change_n3)
add_slider(3, "delay (ms)", 0, 1000, self.delay, self._on_change_delay)
# Close button row
btn_row = 4
btns = ttk.Frame(win)
btns.pack(fill="x", padx=16, pady=10)
close_btn = tk.Button(btns, text="Close", command=on_close, font=("Arial", 14), padx=16, pady=8)
close_btn.pack(side="right")
try:
win.deiconify(); win.lift()
except Exception:
pass
@async_handler
async def _on_change_n1(self, value: int):
self.n1 = int(value)
self.persist_parameters({"n1": self.n1})
@async_handler
async def _on_change_n2(self, value: int):
self.n2 = int(value)
self.persist_parameters({"n2": self.n2})
@async_handler
async def _on_change_n3(self, value: int):
self.n3 = int(value) if value >= 1 else 1
self.persist_parameters({"n3": self.n3})
@async_handler
async def _on_change_delay(self, value: int):
self.delay = int(value)
self.persist_parameters({"delay": self.delay})
def setup_async_tasks(self):
"""Setup async tasks for WebSocket and MIDI."""
# Connect to control server
self.root.after(100, async_handler(self.websocket_client.connect))
# Start auto-reconnect background task
self.root.after(300, async_handler(self.start_ws_reconnect))
# Fetch color palette shortly after connect
self.root.after(600, async_handler(self.fetch_color_palette))
# Initialize MIDI
self.root.after(200, async_handler(self.initialize_midi))
@@ -474,6 +780,14 @@ class UIClient:
self.midi_controller.start_midi_listener()
)
@async_handler
async def start_ws_reconnect(self):
if not self.websocket_client.reconnect_task:
self.websocket_client._stop_reconnect = False
self.websocket_client.reconnect_task = asyncio.create_task(
self.websocket_client.auto_reconnect()
)
def refresh_midi_ports(self):
"""Refresh MIDI ports list."""
old_ports = self.midi_controller.available_ports.copy()
@@ -500,7 +814,7 @@ class UIClient:
self.midi_controller.midi_port_index = self.midi_controller.available_ports.index(selected_port)
self.midi_controller.save_midi_preference()
# Restart MIDI connection
asyncio.create_task(self.restart_midi())
self.restart_midi()
@async_handler
async def restart_midi(self):
@@ -548,58 +862,407 @@ class UIClient:
# Update buttons
icon_for = {
"pulse": "💥", "flicker": "", "alternating": "↔️",
"n_chase": "🏃", "rainbow": "🌈", "radiate": "🌟",
"sequential_pulse": "🔄", "alternating_phase": "", "-": "",
# long names
"on": "🟢", "off": "", "flicker": "",
"alternating": "↔️", "pulse": "💥", "rainbow": "🌈",
"radiate": "🌟", "segmented_movement": "🔀",
# short codes used by led-bar
"o": "", "f": "", "a": "↔️", "p": "💥",
"r": "🌈", "rd": "🌟", "sm": "🔀",
"-": "",
}
bank1_patterns = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate",
"-", "-", "-", "-", "-", "-", "-", "-",
]
bank1_patterns = self.get_bank1_patterns()
# Display names for UI (with line breaks for better display)
display_names = {
"pulse": "pulse",
"sequential_pulse": "sequential\npulse",
"alternating": "alternating",
"alternating_phase": "alternating\nphase",
"n_chase": "n chase",
"rainbow": "rainbow",
# long
"on": "on",
"off": "off",
"flicker": "flicker",
"alternating": "alternating",
"pulse": "pulse",
"rainbow": "rainbow",
"radiate": "radiate",
"segmented_movement": "segmented\nmovement",
# short
"o": "off",
"f": "flicker",
"a": "alternating",
"p": "pulse",
"r": "rainbow",
"rd": "radiate",
"sm": "segmented\nmovement",
}
current_pattern = self.midi_controller.current_pattern
# Normalize current pattern for highlight (map short codes to long names)
current_raw = self.midi_controller.current_pattern or self.current_pattern
short_to_long = {
"o": "off", "f": "flicker", "a": "alternating", "p": "pulse",
"r": "rainbow", "rd": "radiate", "sm": "segmented_movement",
}
current_pattern = short_to_long.get(current_raw, current_raw)
for idx, lbl in enumerate(self.button1_cells):
pattern_name = bank1_patterns[idx]
is_selected = (current_pattern == pattern_name and pattern_name != "-")
display_name = display_names.get(pattern_name, pattern_name)
icon = icon_for.get(pattern_name, "")
icon = icon_for.get(pattern_name, icon_for.get(current_raw, ""))
text = f"{icon} {display_name}" if pattern_name != "-" else ""
if is_selected:
lbl.config(text=text, bg=highlight_pattern_color)
else:
lbl.config(text=text, bg=bg_color)
# Render color cells in indices 8..15
if self.color_slots and len(self.button1_cells) >= 16:
for color_idx in range(8):
cell_index = 8 + color_idx
lbl = self.button1_cells[cell_index]
r, g, b = self.color_slots[color_idx]
hex_color = self._rgb_to_hex(r, g, b)
text_color = "#000000" if (r*0.299 + g*0.587 + b*0.114) > 186 else "#FFFFFF"
# Indicate if this slot is currently assigned to color1 or color2
assigned = "1" if (hasattr(self.midi_controller, 'selected_indices') and self.midi_controller.selected_indices and self.midi_controller.selected_indices[0] == color_idx) else ("2" if (hasattr(self.midi_controller, 'selected_indices') and len(self.midi_controller.selected_indices) > 1 and self.midi_controller.selected_indices[1] == color_idx) else "")
label_text = f"C{color_idx+1}{' ('+assigned+')' if assigned else ''}"
lbl.config(
text=label_text,
bg=hex_color,
fg=text_color,
)
# Update selected color preview boxes
try:
if hasattr(self, 'color1_preview') and hasattr(self, 'color2_preview'):
if hasattr(self.midi_controller, 'selected_indices') and self.color_slots:
idx1 = self.midi_controller.selected_indices[0] if len(self.midi_controller.selected_indices) > 0 else 0
idx2 = self.midi_controller.selected_indices[1] if len(self.midi_controller.selected_indices) > 1 else 1
r1, g1, b1 = self.color_slots[idx1]
r2, g2, b2 = self.color_slots[idx2]
# Update preview colors
self.color1_preview.configure(bg=self._rgb_to_hex(r1, g1, b1), fg=("#000000" if (r1*0.299+g1*0.587+b1*0.114) > 186 else "#FFFFFF"))
self.color2_preview.configure(bg=self._rgb_to_hex(r2, g2, b2), fg=("#000000" if (r2*0.299+g2*0.587+b2*0.114) > 186 else "#FFFFFF"))
# Indicate which color will be set next by MIDI (toggle target)
next_target = getattr(self.midi_controller, 'next_selected_target', 0)
if next_target == 0:
# Next sets Color 1
self.color1_preview.configure(text="Color 1 (next)", borderwidth=3, relief="solid")
self.color2_preview.configure(text="Color 2", borderwidth=2, relief="ridge")
else:
# Next sets Color 2
self.color1_preview.configure(text="Color 1", borderwidth=2, relief="ridge")
self.color2_preview.configure(text="Color 2 (next)", borderwidth=3, relief="solid")
except Exception:
pass
# Render color cells in indices 8..15
if self.color_slots and len(self.button1_cells) >= 16:
for color_idx in range(8):
cell_index = 8 + color_idx
lbl = self.button1_cells[cell_index]
r, g, b = self.color_slots[color_idx]
hex_color = self._rgb_to_hex(r, g, b)
text_color = "#000000" if (r*0.299 + g*0.587 + b*0.114) > 186 else "#FFFFFF"
is_selected_color = (self.selected_color_slot == color_idx)
lbl.config(
text=f"C{color_idx+1}",
bg=hex_color,
fg=text_color,
borderwidth=4 if is_selected_color else 2,
relief="solid" if is_selected_color else "ridge",
)
# Reschedule
self.root.after(200, self.update_status_labels)
@async_handler
async def fetch_color_palette(self):
"""Request color palette from server and hydrate UI state."""
try:
req = urllib.request.Request(PALETTE_API_BASE, method='GET')
with urllib.request.urlopen(req, timeout=2.0) as resp:
data = json.loads(resp.read().decode('utf-8'))
palette = data.get("palette")
selected_indices = data.get("selected_indices")
if isinstance(palette, list) and len(palette) == 8:
new_slots = []
for c in palette:
r = int(c.get("r", 0)); g = int(c.get("g", 0)); b = int(c.get("b", 0))
r = max(0, min(255, r)); g = max(0, min(255, g)); b = max(0, min(255, b))
new_slots.append((r, g, b))
self.color_slots = new_slots
if isinstance(selected_indices, list) and len(selected_indices) == 2:
try:
self.midi_controller.selected_indices = [int(selected_indices[0]), int(selected_indices[1])]
except Exception:
pass
except Exception as e:
logging.debug(f"Failed to fetch color palette (REST): {e}")
@async_handler
async def persist_palette(self):
"""POST current palette to server via REST."""
try:
payload = {
"palette": [{"r": c[0], "g": c[1], "b": c[2]} for c in self.color_slots]
}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(PALETTE_API_BASE, data=data, method='POST', headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=2.0) as resp:
_ = resp.read()
except Exception as e:
logging.debug(f"Failed to persist palette (REST): {e}")
@async_handler
async def persist_selected_indices(self, indices: list[int]):
"""POST selected indices via REST."""
try:
payload = {"selected_indices": [int(indices[0]), int(indices[1])]} if len(indices) == 2 else None
if not payload:
return
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(PALETTE_API_BASE, data=data, method='POST', headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=2.0) as resp:
_ = resp.read()
except Exception as e:
logging.debug(f"Failed to persist selected indices (REST): {e}")
@async_handler
async def persist_parameters(self, params: dict):
"""POST parameter changes via REST to /api/parameters."""
try:
data = json.dumps(params).encode('utf-8')
req = urllib.request.Request(f"{HTTP_BASE}/api/parameters", data=data, method='POST', headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=2.0) as resp:
_ = resp.read()
except Exception as e:
logging.debug(f"Failed to persist parameters (REST): {e}")
def on_closing(self):
"""Handle application closing."""
logging.info("Closing UI client...")
# Persist window geometry
self.save_window_geometry()
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
self.midi_controller.close()
try:
if self.websocket_client and self.websocket_client.reconnect_task:
self.websocket_client._stop_reconnect = True
self.websocket_client.reconnect_task.cancel()
except Exception:
pass
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
# Release lock
try:
self._lock.release()
except Exception:
pass
def run(self):
"""Run the UI client."""
async_mainloop(self.root)
def _cleanup_at_exit(self):
try:
self._lock.release()
except Exception:
pass
def get_bank1_patterns(self):
return [
"on", "off", "flicker", "alternating",
"pulse", "rainbow", "radiate", "segmented_movement",
"-", "-", "-", "-", "-", "-", "-", "-",
]
def on_pattern_button_click(self, index: int):
patterns = self.get_bank1_patterns()
if 0 <= index < len(patterns):
pattern = patterns[index]
if index < 8 and pattern != "-":
# Send selection and open the window
self.select_pattern(pattern)
self.open_pattern_window(pattern)
elif index >= 8:
# Open color editor window for the slot (no selection change)
slot_index = index - 8
self.open_color_window(slot_index)
@async_handler
async def select_pattern(self, pattern: str):
try:
self.current_pattern = pattern
# Use REST API per api.md
try:
data = json.dumps({"pattern": pattern}).encode('utf-8')
req = urllib.request.Request(f"{HTTP_BASE}/api/pattern", data=data, method='POST', headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req, timeout=2.0) as resp:
_ = resp.read()
except Exception as e:
logging.debug(f"Pattern REST update failed: {e}")
except Exception as e:
logging.debug(f"Failed to select pattern {pattern}: {e}")
def _init_color_slots(self):
if not self.color_slots:
self.color_slots = [
(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
(255, 0, 255), (0, 255, 255), (255, 255, 255), (128, 128, 128),
]
def _rgb_to_hex(self, r, g, b):
r = max(0, min(255, int(r)))
g = max(0, min(255, int(g)))
b = max(0, min(255, int(b)))
return f"#{r:02x}{g:02x}{b:02x}"
def open_color_window(self, slot_index: int):
key = f"color_slot_{slot_index}"
if key in self.pattern_windows:
win = self.pattern_windows[key]
if win.winfo_exists():
try:
win.deiconify(); win.lift(); win.focus_force()
except Exception:
pass
return
win = tk.Toplevel(self.root)
win.title(f"Color Slot {slot_index+1}")
win.configure(bg=bg_color)
try:
win.geometry("700x360")
except Exception:
pass
self.pattern_windows[key] = win
def on_close():
try:
win.withdraw()
except Exception:
try:
win.destroy()
except Exception:
pass
win.protocol("WM_DELETE_WINDOW", on_close)
frm = ttk.Frame(win)
frm.pack(padx=20, pady=20, fill="both", expand=True)
r, g, b = self.color_slots[slot_index]
def add_rgb_slider(row, label, initial, on_change_cb):
lbl = tk.Label(frm, text=label, bg=bg_color, fg=fg_color, font=("Arial", 16))
lbl.grid(row=row, column=0, sticky="w", padx=12, pady=16)
var = tk.IntVar(value=int(initial))
s = tk.Scale(frm, from_=0, to=255, orient="horizontal", showvalue=True, resolution=1,
variable=var, length=560, sliderlength=28, width=20, bg=bg_color, fg=fg_color,
highlightthickness=0, troughcolor=trough_color_brightness,
command=lambda v: on_change_cb(int(float(v))))
s.grid(row=row, column=1, sticky="ew", padx=12, pady=16)
frm.grid_columnconfigure(1, weight=1)
return s
def on_r(val):
self._update_color_slot(slot_index, r=val)
def on_g(val):
self._update_color_slot(slot_index, g=val)
def on_b(val):
self._update_color_slot(slot_index, b=val)
add_rgb_slider(0, "Red", r, on_r)
add_rgb_slider(1, "Green", g, on_g)
add_rgb_slider(2, "Blue", b, on_b)
# Preview acts as a confirm/select button for this color slot
def on_preview_click():
self.selected_color_slot = slot_index
rr, gg, bb = self.color_slots[slot_index]
# Persist selection via REST (first index is used for pattern color per api.md)
asyncio.create_task(self.persist_selected_indices([slot_index, 1]))
# For immediate LED update, optional: send parameters API if required. Keeping color_change for now is removed per REST-only guidance.
on_close()
preview = tk.Button(
frm,
text="Use This Color",
font=("Arial", 14),
bg=self._rgb_to_hex(r, g, b),
fg="#000000",
width=16,
height=2,
command=on_preview_click,
)
preview.grid(row=3, column=0, columnspan=2, pady=12)
# Close button row
btns = ttk.Frame(win)
btns.pack(fill="x", padx=16, pady=10)
close_btn = tk.Button(btns, text="Close", command=on_close, font=("Arial", 14), padx=16, pady=8)
close_btn.pack(side="right")
def refresh_preview():
rr, gg, bb = self.color_slots[slot_index]
preview.configure(bg=self._rgb_to_hex(rr, gg, bb),
fg="#000000" if (rr*0.299+gg*0.587+bb*0.114) > 186 else "#FFFFFF")
self.root.after(200, refresh_preview)
refresh_preview()
def _update_color_slot(self, slot_index, r=None, g=None, b=None):
cr, cg, cb = self.color_slots[slot_index]
if r is not None: cr = int(r)
if g is not None: cg = int(g)
if b is not None: cb = int(b)
self.color_slots[slot_index] = (cr, cg, cb)
# Update palette colors on backend via REST (no immediate output)
asyncio.create_task(self.persist_palette())
# Do not send color_change here; only send when user confirms via preview button
# ----------------------
# Window geometry persist
# ----------------------
def load_window_geometry(self):
"""Load last saved window geometry from config and apply it."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
cfg = json.load(f)
geom = cfg.get('window_geometry')
if isinstance(geom, dict):
x = geom.get('x')
y = geom.get('y')
w = geom.get('w')
h = geom.get('h')
if all(isinstance(v, int) for v in (x, y, w, h)) and w > 0 and h > 0:
self.root.geometry(f"{w}x{h}+{x}+{y}")
except Exception as e:
logging.debug(f"Failed to load window geometry: {e}")
def save_window_geometry(self):
"""Save current window geometry to config."""
try:
# Get current position and size
self.root.update_idletasks()
x = self.root.winfo_x()
y = self.root.winfo_y()
w = self.root.winfo_width()
h = self.root.winfo_height()
cfg = {}
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r') as f:
cfg = json.load(f) or {}
except Exception:
cfg = {}
cfg['window_geometry'] = {'x': int(x), 'y': int(y), 'w': int(w), 'h': int(h)}
with open(CONFIG_FILE, 'w') as f:
json.dump(cfg, f, indent=2)
except Exception as e:
logging.debug(f"Failed to save window geometry: {e}")
if __name__ == "__main__":
app = UIClient()