- Created Flask backend with REST API endpoints - Built HTML/CSS/JavaScript frontend - Replaced RGB sliders with color pickers for each palette color - Reorganized layout: color palette on left, patterns on right - Added persistence for color changes - Integrated WebSocket client for lighting controller communication - Added tab management, profile support, and pattern selection
1055 lines
38 KiB
Python
1055 lines
38 KiB
Python
"""
|
|
Textual-based UI for the lighting controller.
|
|
This replaces the Tkinter-based UI with a modern terminal/web interface.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import math
|
|
import time
|
|
from typing import Optional
|
|
from textual import on, App, work
|
|
from textual.app import ComposeResult
|
|
from textual.containers import Container, Horizontal, Vertical, Grid
|
|
from textual.widgets import (
|
|
Button,
|
|
Label,
|
|
Input,
|
|
Select,
|
|
Tabs,
|
|
Tab,
|
|
TabPane,
|
|
Slider,
|
|
Static,
|
|
Header,
|
|
Footer,
|
|
DataTable,
|
|
Log
|
|
)
|
|
from textual.screen import Screen, ModalScreen
|
|
from textual.binding import Binding
|
|
from textual.message import Message
|
|
from textual.worker import Worker, get_current_worker
|
|
from networking import WebSocketClient
|
|
import color_utils
|
|
from settings import Settings
|
|
|
|
|
|
def delay_to_slider(delay_ms, min_delay=10, max_delay=10000):
|
|
"""Convert delay in ms to slider position (0-1000) using logarithmic scale."""
|
|
if delay_ms <= min_delay:
|
|
return 0
|
|
if delay_ms >= max_delay:
|
|
return 1000
|
|
if min_delay == max_delay:
|
|
return 0
|
|
return 1000 * math.log(delay_ms / min_delay) / math.log(max_delay / min_delay)
|
|
|
|
|
|
def slider_to_delay(slider_value, min_delay=10, max_delay=10000):
|
|
"""Convert slider position (0-1000) to delay in ms using logarithmic scale."""
|
|
if slider_value <= 0:
|
|
return min_delay
|
|
if slider_value >= 1000:
|
|
return max_delay
|
|
if min_delay == max_delay:
|
|
return min_delay
|
|
return int(min_delay * ((max_delay / min_delay) ** (slider_value / 1000)))
|
|
|
|
|
|
class ColorSwatch(Static):
|
|
"""A clickable color swatch widget."""
|
|
|
|
def __init__(self, hex_color: str, index: int, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.hex_color = hex_color
|
|
self.index = index
|
|
self.border_title = f"Color {index + 1}"
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Static(f"Color {self.index + 1}", classes="color-label")
|
|
|
|
def on_mount(self) -> None:
|
|
r, g, b = color_utils.hex_to_rgb(self.hex_color)
|
|
# Use ANSI color codes for terminal, or CSS for web
|
|
self.styles.background = self.hex_color
|
|
text_color = color_utils.get_contrast_text_color(self.hex_color)
|
|
self.styles.color = text_color
|
|
|
|
def on_click(self) -> None:
|
|
self.post_message(ColorSelected(self.index))
|
|
|
|
|
|
class ColorSelected(Message):
|
|
"""Message sent when a color swatch is clicked."""
|
|
def __init__(self, index: int):
|
|
super().__init__()
|
|
self.index = index
|
|
|
|
|
|
class PatternButton(Button):
|
|
"""A pattern selection button."""
|
|
|
|
def __init__(self, pattern_name: str, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.pattern_name = pattern_name
|
|
self.label = pattern_name
|
|
|
|
|
|
class AddTabScreen(ModalScreen):
|
|
"""Modal screen for adding a new tab."""
|
|
|
|
def compose(self) -> ComposeResult:
|
|
with Container(classes="dialog"):
|
|
yield Label("Add New Tab", classes="dialog-title")
|
|
yield Label("Tab Name:", classes="label")
|
|
yield Input(placeholder="Enter tab name", id="tab-name-input")
|
|
yield Label("Device IDs (comma-separated):", classes="label")
|
|
yield Input(placeholder="1,2,3", id="ids-input", value="1")
|
|
with Horizontal(classes="button-row"):
|
|
yield Button("Add", id="add-button", variant="primary")
|
|
yield Button("Cancel", id="cancel-button", variant="default")
|
|
|
|
def on_mount(self) -> None:
|
|
self.query_one("#tab-name-input", Input).focus()
|
|
|
|
@on(Button.Pressed, "#add-button")
|
|
def on_add(self) -> None:
|
|
name_input = self.query_one("#tab-name-input", Input)
|
|
ids_input = self.query_one("#ids-input", Input)
|
|
tab_name = name_input.value.strip()
|
|
ids_str = ids_input.value.strip()
|
|
|
|
if not tab_name:
|
|
self.app.bell()
|
|
return
|
|
|
|
ids = [id.strip() for id in ids_str.split(",") if id.strip()]
|
|
if not ids:
|
|
ids = ["1"]
|
|
|
|
self.dismiss((tab_name, ids))
|
|
|
|
@on(Button.Pressed, "#cancel-button")
|
|
def on_cancel(self) -> None:
|
|
self.dismiss(None)
|
|
|
|
|
|
class EditTabScreen(ModalScreen):
|
|
"""Modal screen for editing a tab."""
|
|
|
|
def __init__(self, tab_name: str, current_ids: list[str], *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.tab_name = tab_name
|
|
self.current_ids = current_ids
|
|
|
|
def compose(self) -> ComposeResult:
|
|
with Container(classes="dialog"):
|
|
yield Label("Edit Tab", classes="dialog-title")
|
|
yield Label("Tab Name:", classes="label")
|
|
yield Input(placeholder="Enter tab name", id="tab-name-input", value=self.tab_name)
|
|
yield Label("Device IDs (comma-separated):", classes="label")
|
|
yield Input(placeholder="1,2,3", id="ids-input", value=", ".join(self.current_ids))
|
|
with Horizontal(classes="button-row"):
|
|
yield Button("Update", id="update-button", variant="primary")
|
|
yield Button("Cancel", id="cancel-button", variant="default")
|
|
|
|
def on_mount(self) -> None:
|
|
self.query_one("#tab-name-input", Input).focus()
|
|
|
|
@on(Button.Pressed, "#update-button")
|
|
def on_update(self) -> None:
|
|
name_input = self.query_one("#tab-name-input", Input)
|
|
ids_input = self.query_one("#ids-input", Input)
|
|
new_tab_name = name_input.value.strip()
|
|
ids_str = ids_input.value.strip()
|
|
|
|
if not new_tab_name:
|
|
self.app.bell()
|
|
return
|
|
|
|
ids = [id.strip() for id in ids_str.split(",") if id.strip()]
|
|
if not ids:
|
|
ids = ["1"]
|
|
|
|
self.dismiss((new_tab_name, ids))
|
|
|
|
@on(Button.Pressed, "#cancel-button")
|
|
def on_cancel(self) -> None:
|
|
self.dismiss(None)
|
|
|
|
|
|
class LightingControllerApp(App):
|
|
"""Main Textual application for the lighting controller."""
|
|
|
|
CSS = """
|
|
Screen {
|
|
background: $surface;
|
|
}
|
|
|
|
.main-container {
|
|
layout: horizontal;
|
|
height: 100%;
|
|
}
|
|
|
|
.left-panel {
|
|
width: 40%;
|
|
border-right: wide $primary;
|
|
padding: 1;
|
|
}
|
|
|
|
.right-panel {
|
|
width: 60%;
|
|
padding: 1;
|
|
}
|
|
|
|
.slider-container {
|
|
height: 20;
|
|
margin: 1;
|
|
}
|
|
|
|
.slider-label {
|
|
text-align: center;
|
|
margin-bottom: 1;
|
|
}
|
|
|
|
.n-params-grid {
|
|
grid-size: 2;
|
|
grid-gutter: 1;
|
|
margin: 1;
|
|
}
|
|
|
|
.n-param-container {
|
|
layout: horizontal;
|
|
align: center middle;
|
|
}
|
|
|
|
.n-param-label {
|
|
width: 8;
|
|
text-align: right;
|
|
margin-right: 1;
|
|
}
|
|
|
|
.n-param-input {
|
|
width: 6;
|
|
}
|
|
|
|
.pattern-buttons {
|
|
layout: vertical;
|
|
height: 1fr;
|
|
}
|
|
|
|
.pattern-button {
|
|
margin: 1;
|
|
width: 100%;
|
|
}
|
|
|
|
.color-palette {
|
|
layout: vertical;
|
|
height: 1fr;
|
|
}
|
|
|
|
.color-swatch {
|
|
height: 3;
|
|
margin: 1;
|
|
border: solid $primary;
|
|
}
|
|
|
|
.color-label {
|
|
text-align: center;
|
|
width: 100%;
|
|
}
|
|
|
|
.dialog {
|
|
width: 60;
|
|
height: auto;
|
|
border: solid $primary;
|
|
padding: 1;
|
|
background: $surface;
|
|
}
|
|
|
|
.dialog-title {
|
|
text-align: center;
|
|
text-style: bold;
|
|
margin-bottom: 1;
|
|
}
|
|
|
|
.label {
|
|
margin-top: 1;
|
|
}
|
|
|
|
.button-row {
|
|
align: center middle;
|
|
margin-top: 2;
|
|
}
|
|
|
|
.button-row Button {
|
|
margin: 1;
|
|
}
|
|
|
|
.ids-display {
|
|
layout: horizontal;
|
|
margin: 1;
|
|
}
|
|
|
|
.ids-label {
|
|
margin-right: 1;
|
|
}
|
|
"""
|
|
|
|
BINDINGS = [
|
|
Binding("q", "quit", "Quit"),
|
|
Binding("a", "add_tab", "Add Tab"),
|
|
Binding("e", "edit_tab", "Edit Tab"),
|
|
Binding("d", "delete_tab", "Delete Tab"),
|
|
Binding("p", "profiles", "Profiles"),
|
|
]
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.settings = Settings()
|
|
self.patterns = self.load_patterns()
|
|
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
|
|
self.tabs_data = {} # Store tab widget references
|
|
self.current_tab_name = None
|
|
|
|
# Debouncing variables
|
|
self.last_rgb_update_time = 0
|
|
self.rgb_update_interval_ms = 100
|
|
self.pending_rgb_update = None
|
|
|
|
self.last_brightness_update_time = 0
|
|
self.brightness_update_interval_ms = 100
|
|
self.pending_brightness_update = None
|
|
|
|
self.last_delay_update_time = 0
|
|
self.delay_update_interval_ms = 100
|
|
self.pending_delay_update = None
|
|
|
|
self.last_n_params_update_time = 0
|
|
self.n_params_update_interval_ms = 100
|
|
self.pending_n_params_update = None
|
|
|
|
def load_patterns(self):
|
|
"""Load patterns from settings.json file."""
|
|
try:
|
|
patterns = self.settings.get("patterns", {})
|
|
if not patterns:
|
|
patterns = {}
|
|
self.settings["patterns"] = patterns
|
|
self.settings.save()
|
|
return patterns
|
|
except Exception as e:
|
|
print(f"Error loading patterns: {e}")
|
|
return {}
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header(show_clock=True)
|
|
yield Tabs(id="main-tabs")
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
"""Called when the app is mounted."""
|
|
# Connect to WebSocket
|
|
self.connect_websocket()
|
|
# Create tabs if we have lights configured
|
|
if self.settings.get("lights"):
|
|
self.create_tabs()
|
|
|
|
@work(exclusive=False)
|
|
async def connect_websocket(self) -> None:
|
|
"""Connect to the WebSocket server."""
|
|
await self.websocket_client.connect()
|
|
|
|
def create_tabs(self) -> None:
|
|
"""Create tabs from settings."""
|
|
tabs_widget = self.query_one("#main-tabs", Tabs)
|
|
tabs_widget.clear()
|
|
|
|
tab_order = self.get_tab_order()
|
|
|
|
if not tab_order:
|
|
return
|
|
|
|
for tab_name in tab_order:
|
|
if tab_name in self.settings.get("lights", {}):
|
|
tab_pane = TabPane(tab_name, id=f"tab-{tab_name}")
|
|
tabs_widget.add_pane(tab_pane)
|
|
self.create_tab_content(tab_pane, tab_name)
|
|
|
|
# Select first tab if available
|
|
if tab_order:
|
|
tabs_widget.active = f"tab-{tab_order[0]}"
|
|
self.current_tab_name = tab_order[0]
|
|
|
|
def create_tab_content(self, tab_pane: TabPane, tab_name: str) -> None:
|
|
"""Create the content for a tab."""
|
|
light_data = self.settings["lights"][tab_name]
|
|
ids = light_data["names"]
|
|
initial_settings = light_data["settings"]
|
|
|
|
initial_pattern = initial_settings.get("pattern", "on")
|
|
initial_brightness = initial_settings.get("brightness", 127)
|
|
pattern_settings = self.get_pattern_settings(tab_name, initial_pattern)
|
|
|
|
initial_colors = pattern_settings["colors"]
|
|
initial_hex_color = initial_colors[0] if initial_colors else "#000000"
|
|
initial_delay = pattern_settings["delay"]
|
|
initial_n1 = pattern_settings["n1"]
|
|
initial_n2 = pattern_settings["n2"]
|
|
initial_n3 = pattern_settings["n3"]
|
|
initial_n4 = pattern_settings["n4"]
|
|
|
|
initial_r, initial_g, initial_b = color_utils.hex_to_rgb(initial_hex_color)
|
|
|
|
with tab_pane:
|
|
with Container(classes="main-container"):
|
|
# Left panel with sliders and n params
|
|
with Container(classes="left-panel"):
|
|
# IDs display
|
|
with Container(classes="ids-display"):
|
|
yield Label("IDs: ", classes="ids-label")
|
|
ids_text = ", ".join(str(id) for id in ids)
|
|
yield Label(ids_text)
|
|
|
|
# RGB Sliders
|
|
yield Label("Red", classes="slider-label")
|
|
red_slider = Slider(
|
|
min=0,
|
|
max=255,
|
|
value=initial_r,
|
|
step=1,
|
|
id=f"red-slider-{tab_name}"
|
|
)
|
|
yield red_slider
|
|
|
|
yield Label("Green", classes="slider-label")
|
|
green_slider = Slider(
|
|
min=0,
|
|
max=255,
|
|
value=initial_g,
|
|
step=1,
|
|
id=f"green-slider-{tab_name}"
|
|
)
|
|
yield green_slider
|
|
|
|
yield Label("Blue", classes="slider-label")
|
|
blue_slider = Slider(
|
|
min=0,
|
|
max=255,
|
|
value=initial_b,
|
|
step=1,
|
|
id=f"blue-slider-{tab_name}"
|
|
)
|
|
yield blue_slider
|
|
|
|
yield Label("Brightness", classes="slider-label")
|
|
brightness_slider = Slider(
|
|
min=0,
|
|
max=255,
|
|
value=initial_brightness,
|
|
step=1,
|
|
id=f"brightness-slider-{tab_name}"
|
|
)
|
|
yield brightness_slider
|
|
|
|
yield Label("Delay (ms)", classes="slider-label")
|
|
pattern_config = self.patterns.get(initial_pattern, {})
|
|
min_delay = pattern_config.get("min_delay", 10)
|
|
max_delay = pattern_config.get("max_delay", 10000)
|
|
initial_delay_slider_pos = delay_to_slider(initial_delay, min_delay, max_delay)
|
|
delay_slider = Slider(
|
|
min=0,
|
|
max=1000,
|
|
value=int(initial_delay_slider_pos),
|
|
step=1,
|
|
id=f"delay-slider-{tab_name}"
|
|
)
|
|
yield delay_slider
|
|
yield Label(f"{initial_delay} ms", id=f"delay-value-{tab_name}")
|
|
|
|
# N parameters
|
|
with Grid(classes="n-params-grid"):
|
|
for i in range(1, 5):
|
|
with Container(classes="n-param-container"):
|
|
yield Label(f"n{i}", classes="n-param-label")
|
|
n_value = pattern_settings[f"n{i}"]
|
|
n_input = Input(
|
|
value=str(n_value),
|
|
classes="n-param-input",
|
|
id=f"n{i}-input-{tab_name}"
|
|
)
|
|
yield n_input
|
|
|
|
# Right panel with patterns and color palette
|
|
with Container(classes="right-panel"):
|
|
# Patterns section
|
|
yield Label("Patterns:", classes="slider-label")
|
|
with Container(classes="pattern-buttons"):
|
|
for pattern_name in self.patterns.keys():
|
|
pattern_btn = PatternButton(
|
|
pattern_name,
|
|
pattern_name,
|
|
classes="pattern-button",
|
|
id=f"pattern-{pattern_name}-{tab_name}"
|
|
)
|
|
yield pattern_btn
|
|
|
|
# Color palette section
|
|
yield Label("Color Palette:", classes="slider-label")
|
|
with Container(classes="color-palette", id=f"palette-{tab_name}"):
|
|
for i, hex_color in enumerate(initial_colors):
|
|
swatch = ColorSwatch(
|
|
hex_color,
|
|
i,
|
|
classes="color-swatch",
|
|
id=f"swatch-{i}-{tab_name}"
|
|
)
|
|
yield swatch
|
|
|
|
# Store tab data
|
|
self.tabs_data[tab_name] = {
|
|
"colors_in_palette": initial_colors.copy(),
|
|
"selected_color_index": 0,
|
|
"pattern": initial_pattern,
|
|
}
|
|
|
|
def get_tab_order(self):
|
|
"""Get the tab order from current profile, or create default order."""
|
|
if "tab_order" not in self.settings:
|
|
if "lights" in self.settings:
|
|
self.settings["tab_order"] = list(self.settings["lights"].keys())
|
|
else:
|
|
self.settings["tab_order"] = []
|
|
return self.settings.get("tab_order", [])
|
|
|
|
def get_pattern_settings(self, tab_name, pattern_name):
|
|
"""Get pattern-specific settings."""
|
|
light_settings = self.settings["lights"][tab_name]["settings"]
|
|
if "patterns" not in light_settings:
|
|
light_settings["patterns"] = {}
|
|
if pattern_name not in light_settings["patterns"]:
|
|
light_settings["patterns"][pattern_name] = {}
|
|
|
|
pattern_settings = light_settings["patterns"][pattern_name]
|
|
return {
|
|
"colors": pattern_settings.get("colors", ["#000000"]),
|
|
"delay": pattern_settings.get("delay", 100),
|
|
"n1": pattern_settings.get("n1", 10),
|
|
"n2": pattern_settings.get("n2", 10),
|
|
"n3": pattern_settings.get("n3", 10),
|
|
"n4": pattern_settings.get("n4", 10),
|
|
}
|
|
|
|
def save_pattern_settings(self, tab_name, pattern_name, colors=None, delay=None, n_params=None):
|
|
"""Save pattern-specific settings."""
|
|
light_settings = self.settings["lights"][tab_name]["settings"]
|
|
if "patterns" not in light_settings:
|
|
light_settings["patterns"] = {}
|
|
if pattern_name not in light_settings["patterns"]:
|
|
light_settings["patterns"][pattern_name] = {}
|
|
|
|
pattern_settings = light_settings["patterns"][pattern_name]
|
|
if colors is not None:
|
|
pattern_settings["colors"] = colors
|
|
if delay is not None:
|
|
pattern_settings["delay"] = delay
|
|
if n_params is not None:
|
|
for i in range(1, 5):
|
|
if f"n{i}" in n_params:
|
|
pattern_settings[f"n{i}"] = n_params[f"n{i}"]
|
|
|
|
@on(Tabs.TabActivated)
|
|
def on_tab_change(self, event: Tabs.TabActivated) -> None:
|
|
"""Handle tab change."""
|
|
tab_id = event.tab.id
|
|
if tab_id and tab_id.startswith("tab-"):
|
|
self.current_tab_name = tab_id[4:] # Remove "tab-" prefix
|
|
|
|
@on(Slider.Changed)
|
|
def on_slider_changed(self, event: Slider.Changed) -> None:
|
|
"""Handle slider changes."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
slider_id = event.slider.id
|
|
if slider_id.startswith("red-slider-") or slider_id.startswith("green-slider-") or slider_id.startswith("blue-slider-"):
|
|
self.schedule_update_rgb()
|
|
elif slider_id.startswith("brightness-slider-"):
|
|
self.schedule_update_brightness()
|
|
elif slider_id.startswith("delay-slider-"):
|
|
self.schedule_update_delay()
|
|
|
|
@on(Input.Changed)
|
|
def on_input_changed(self, event: Input.Changed) -> None:
|
|
"""Handle input changes (n parameters)."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
input_id = event.input.id
|
|
if input_id and input_id.startswith("n") and input_id.endswith(f"-{self.current_tab_name}"):
|
|
self.schedule_update_n_params()
|
|
|
|
@on(Button.Pressed)
|
|
def on_pattern_button_pressed(self, event: Button.Pressed) -> None:
|
|
"""Handle pattern button press."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
button = event.button
|
|
if isinstance(button, PatternButton) and self.current_tab_name in button.id:
|
|
pattern_name = button.pattern_name
|
|
self.send_pattern(self.current_tab_name, pattern_name)
|
|
|
|
@on(ColorSelected)
|
|
def on_color_selected(self, event: ColorSelected) -> None:
|
|
"""Handle color swatch selection."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
tab_data = self.tabs_data.get(self.current_tab_name)
|
|
if tab_data and 0 <= event.index < len(tab_data["colors_in_palette"]):
|
|
tab_data["selected_color_index"] = event.index
|
|
# Update RGB sliders
|
|
hex_color = tab_data["colors_in_palette"][event.index]
|
|
r, g, b = color_utils.hex_to_rgb(hex_color)
|
|
try:
|
|
self.query_one(f"#red-slider-{self.current_tab_name}", Slider).value = r
|
|
self.query_one(f"#green-slider-{self.current_tab_name}", Slider).value = g
|
|
self.query_one(f"#blue-slider-{self.current_tab_name}", Slider).value = b
|
|
except Exception:
|
|
pass # Widgets might not exist yet
|
|
|
|
def schedule_update_rgb(self) -> None:
|
|
"""Schedule RGB update with debouncing."""
|
|
current_time = time.time() * 1000
|
|
if self.pending_rgb_update:
|
|
self.pending_rgb_update.cancel()
|
|
|
|
if current_time - self.last_rgb_update_time >= self.rgb_update_interval_ms:
|
|
self.update_rgb()
|
|
self.last_rgb_update_time = current_time
|
|
else:
|
|
delay = int(self.rgb_update_interval_ms - (current_time - self.last_rgb_update_time))
|
|
self.pending_rgb_update = self.set_timer(delay / 1000, self.update_rgb)
|
|
|
|
def schedule_update_brightness(self) -> None:
|
|
"""Schedule brightness update with debouncing."""
|
|
current_time = time.time() * 1000
|
|
if self.pending_brightness_update:
|
|
self.pending_brightness_update.cancel()
|
|
|
|
if current_time - self.last_brightness_update_time >= self.brightness_update_interval_ms:
|
|
self.update_brightness()
|
|
self.last_brightness_update_time = current_time
|
|
else:
|
|
delay = int(self.brightness_update_interval_ms - (current_time - self.last_brightness_update_time))
|
|
self.pending_brightness_update = self.set_timer(delay / 1000, self.update_brightness)
|
|
|
|
def schedule_update_delay(self) -> None:
|
|
"""Schedule delay update with debouncing."""
|
|
current_time = time.time() * 1000
|
|
if self.pending_delay_update:
|
|
self.pending_delay_update.cancel()
|
|
|
|
if current_time - self.last_delay_update_time >= self.delay_update_interval_ms:
|
|
self.update_delay()
|
|
self.last_delay_update_time = current_time
|
|
else:
|
|
delay = int(self.delay_update_interval_ms - (current_time - self.last_delay_update_time))
|
|
self.pending_delay_update = self.set_timer(delay / 1000, self.update_delay)
|
|
|
|
def schedule_update_n_params(self) -> None:
|
|
"""Schedule n params update with debouncing."""
|
|
current_time = time.time() * 1000
|
|
if self.pending_n_params_update:
|
|
self.pending_n_params_update.cancel()
|
|
|
|
if current_time - self.last_n_params_update_time >= self.n_params_update_interval_ms:
|
|
self.update_n_params()
|
|
self.last_n_params_update_time = current_time
|
|
else:
|
|
delay = int(self.n_params_update_interval_ms - (current_time - self.last_n_params_update_time))
|
|
self.pending_n_params_update = self.set_timer(delay / 1000, self.update_n_params)
|
|
|
|
@work(exclusive=False)
|
|
async def update_rgb(self) -> None:
|
|
"""Update RGB values."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
try:
|
|
red_slider = self.query_one(f"#red-slider-{self.current_tab_name}", Slider)
|
|
green_slider = self.query_one(f"#green-slider-{self.current_tab_name}", Slider)
|
|
blue_slider = self.query_one(f"#blue-slider-{self.current_tab_name}", Slider)
|
|
|
|
r = red_slider.value
|
|
g = green_slider.value
|
|
b = blue_slider.value
|
|
|
|
hex_color = f"#{r:02x}{g:02x}{b:02x}"
|
|
|
|
tab_data = self.tabs_data[self.current_tab_name]
|
|
selected_index = tab_data["selected_color_index"]
|
|
if 0 <= selected_index < len(tab_data["colors_in_palette"]):
|
|
tab_data["colors_in_palette"][selected_index] = hex_color
|
|
|
|
selected_server = self.current_tab_name
|
|
names = self.settings["lights"][selected_server]["names"]
|
|
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
|
|
|
|
colors_to_send = []
|
|
if current_pattern == "transition":
|
|
colors_to_send = tab_data["colors_in_palette"].copy()
|
|
elif current_pattern in ["on", "theater_chase", "flicker"]:
|
|
colors_to_send = [hex_color]
|
|
else:
|
|
colors_to_send = tab_data["colors_in_palette"].copy()
|
|
|
|
pattern_config = self.patterns.get(current_pattern, {})
|
|
min_delay = pattern_config.get("min_delay", 10)
|
|
max_delay = pattern_config.get("max_delay", 10000)
|
|
|
|
delay_slider = self.query_one(f"#delay-slider-{self.current_tab_name}", Slider)
|
|
delay = slider_to_delay(delay_slider.value, min_delay, max_delay)
|
|
|
|
payload = {
|
|
"save": True,
|
|
"names": names,
|
|
"settings": {
|
|
"colors": colors_to_send,
|
|
"brightness": self.query_one(f"#brightness-slider-{self.current_tab_name}", Slider).value,
|
|
"delay": delay,
|
|
"pattern": current_pattern,
|
|
"n1": int(self.query_one(f"#n1-input-{self.current_tab_name}", Input).value),
|
|
"n2": int(self.query_one(f"#n2-input-{self.current_tab_name}", Input).value),
|
|
"n3": int(self.query_one(f"#n3-input-{self.current_tab_name}", Input).value),
|
|
"n4": int(self.query_one(f"#n4-input-{self.current_tab_name}", Input).value),
|
|
},
|
|
}
|
|
|
|
# Save pattern-specific settings
|
|
n_params = {
|
|
f"n{i}": int(self.query_one(f"#n{i}-input-{self.current_tab_name}", Input).value)
|
|
for i in range(1, 5)
|
|
}
|
|
self.save_pattern_settings(
|
|
selected_server,
|
|
current_pattern,
|
|
colors=tab_data["colors_in_palette"].copy(),
|
|
delay=delay,
|
|
n_params=n_params
|
|
)
|
|
|
|
self.settings["lights"][selected_server]["settings"]["brightness"] = payload["settings"]["brightness"]
|
|
self.settings.save()
|
|
|
|
await self.websocket_client.send_data(payload)
|
|
except Exception as e:
|
|
self.log(f"Error updating RGB: {e}")
|
|
|
|
@work(exclusive=False)
|
|
async def update_brightness(self) -> None:
|
|
"""Update brightness."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
try:
|
|
brightness_slider = self.query_one(f"#brightness-slider-{self.current_tab_name}", Slider)
|
|
brightness = brightness_slider.value
|
|
|
|
selected_server = self.current_tab_name
|
|
names = self.settings["lights"][selected_server]["names"]
|
|
|
|
payload = {
|
|
"save": True,
|
|
"names": names,
|
|
"settings": {
|
|
"brightness": brightness,
|
|
},
|
|
}
|
|
|
|
self.settings["lights"][selected_server]["settings"]["brightness"] = brightness
|
|
self.settings.save()
|
|
|
|
await self.websocket_client.send_data(payload)
|
|
except Exception as e:
|
|
self.log(f"Error updating brightness: {e}")
|
|
|
|
@work(exclusive=False)
|
|
async def update_delay(self) -> None:
|
|
"""Update delay."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
try:
|
|
selected_server = self.current_tab_name
|
|
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
|
|
pattern_config = self.patterns.get(current_pattern, {})
|
|
min_delay = pattern_config.get("min_delay", 10)
|
|
max_delay = pattern_config.get("max_delay", 10000)
|
|
|
|
delay_slider = self.query_one(f"#delay-slider-{self.current_tab_name}", Slider)
|
|
slider_value = delay_slider.value
|
|
delay = slider_to_delay(slider_value, min_delay, max_delay)
|
|
|
|
# Update delay value label
|
|
delay_label = self.query_one(f"#delay-value-{self.current_tab_name}", Label)
|
|
delay_label.update(f"{delay} ms")
|
|
|
|
names = self.settings["lights"][selected_server]["names"]
|
|
payload = {
|
|
"save": True,
|
|
"names": names,
|
|
"settings": {
|
|
"delay": delay,
|
|
},
|
|
}
|
|
|
|
self.save_pattern_settings(selected_server, current_pattern, delay=delay)
|
|
self.settings.save()
|
|
|
|
await self.websocket_client.send_data(payload)
|
|
except Exception as e:
|
|
self.log(f"Error updating delay: {e}")
|
|
|
|
@work(exclusive=False)
|
|
async def update_n_params(self) -> None:
|
|
"""Update n parameters."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
try:
|
|
n_params = {}
|
|
for i in range(1, 5):
|
|
n_input = self.query_one(f"#n{i}-input-{self.current_tab_name}", Input)
|
|
n_params[f"n{i}"] = int(n_input.value)
|
|
|
|
selected_server = self.current_tab_name
|
|
names = self.settings["lights"][selected_server]["names"]
|
|
payload = {
|
|
"save": True,
|
|
"names": names,
|
|
"settings": n_params,
|
|
}
|
|
|
|
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
|
|
self.save_pattern_settings(selected_server, current_pattern, n_params=n_params)
|
|
self.settings.save()
|
|
|
|
await self.websocket_client.send_data(payload)
|
|
except Exception as e:
|
|
self.log(f"Error updating n params: {e}")
|
|
|
|
@work(exclusive=False)
|
|
async def send_pattern(self, tab_name: str, pattern_name: str) -> None:
|
|
"""Send pattern change to server."""
|
|
try:
|
|
names = self.settings["lights"][tab_name]["names"]
|
|
|
|
# Save current pattern's settings
|
|
old_pattern = self.settings["lights"][tab_name]["settings"].get("pattern", "on")
|
|
tab_data = self.tabs_data[tab_name]
|
|
|
|
# Get current delay
|
|
delay_slider = self.query_one(f"#delay-slider-{tab_name}", Slider)
|
|
pattern_config = self.patterns.get(old_pattern, {})
|
|
min_delay = pattern_config.get("min_delay", 10)
|
|
max_delay = pattern_config.get("max_delay", 10000)
|
|
current_delay = slider_to_delay(delay_slider.value, min_delay, max_delay)
|
|
|
|
# Get current n params
|
|
current_n_params = {
|
|
f"n{i}": int(self.query_one(f"#n{i}-input-{tab_name}", Input).value)
|
|
for i in range(1, 5)
|
|
}
|
|
|
|
self.save_pattern_settings(
|
|
tab_name,
|
|
old_pattern,
|
|
colors=tab_data["colors_in_palette"].copy(),
|
|
delay=current_delay,
|
|
n_params=current_n_params
|
|
)
|
|
|
|
# Load new pattern's settings
|
|
new_pattern_settings = self.get_pattern_settings(tab_name, pattern_name)
|
|
|
|
# Update UI with new pattern's settings
|
|
tab_data["colors_in_palette"] = new_pattern_settings["colors"].copy()
|
|
tab_data["pattern"] = pattern_name
|
|
|
|
# Update delay slider
|
|
pattern_config = self.patterns.get(pattern_name, {})
|
|
min_delay = pattern_config.get("min_delay", 10)
|
|
max_delay = pattern_config.get("max_delay", 10000)
|
|
delay_slider_pos = delay_to_slider(new_pattern_settings["delay"], min_delay, max_delay)
|
|
delay_slider.value = int(delay_slider_pos)
|
|
delay_label = self.query_one(f"#delay-value-{tab_name}", Label)
|
|
delay_label.update(f"{new_pattern_settings['delay']} ms")
|
|
|
|
# Update n params
|
|
for i in range(1, 5):
|
|
n_input = self.query_one(f"#n{i}-input-{tab_name}", Input)
|
|
n_input.value = str(new_pattern_settings[f"n{i}"])
|
|
|
|
# Update RGB sliders to first color
|
|
if tab_data["colors_in_palette"]:
|
|
hex_color = tab_data["colors_in_palette"][0]
|
|
r, g, b = color_utils.hex_to_rgb(hex_color)
|
|
self.query_one(f"#red-slider-{tab_name}", Slider).value = r
|
|
self.query_one(f"#green-slider-{tab_name}", Slider).value = g
|
|
self.query_one(f"#blue-slider-{tab_name}", Slider).value = b
|
|
tab_data["selected_color_index"] = 0
|
|
|
|
payload_settings = {
|
|
"pattern": pattern_name,
|
|
"brightness": self.settings["lights"][tab_name]["settings"].get("brightness", 127),
|
|
"delay": new_pattern_settings["delay"],
|
|
"colors": new_pattern_settings["colors"],
|
|
}
|
|
|
|
for i in range(1, 5):
|
|
payload_settings[f"n{i}"] = new_pattern_settings[f"n{i}"]
|
|
|
|
payload = {
|
|
"save": True,
|
|
"names": names,
|
|
"settings": payload_settings,
|
|
}
|
|
|
|
self.settings["lights"][tab_name]["settings"]["pattern"] = pattern_name
|
|
self.settings.save()
|
|
|
|
await self.websocket_client.send_data(payload)
|
|
except Exception as e:
|
|
self.log(f"Error sending pattern: {e}")
|
|
|
|
def action_add_tab(self) -> None:
|
|
"""Add a new tab."""
|
|
self.push_screen(AddTabScreen(), self.on_add_tab_result)
|
|
|
|
def on_add_tab_result(self, result) -> None:
|
|
"""Handle result from add tab dialog."""
|
|
if result is None:
|
|
return
|
|
|
|
tab_name, ids = result
|
|
|
|
if tab_name in self.settings.get("lights", {}):
|
|
self.bell()
|
|
return
|
|
|
|
# Create new tab entry
|
|
self.settings.setdefault("lights", {})[tab_name] = {
|
|
"names": ids,
|
|
"settings": {
|
|
"pattern": "on",
|
|
"brightness": 127,
|
|
"colors": ["#000000"],
|
|
"delay": 100,
|
|
"n1": 10,
|
|
"n2": 10,
|
|
"n3": 10,
|
|
"n4": 10,
|
|
"patterns": {}
|
|
}
|
|
}
|
|
|
|
if "tab_order" not in self.settings:
|
|
self.settings["tab_order"] = []
|
|
self.settings["tab_order"].append(tab_name)
|
|
self.save_profile()
|
|
|
|
# Recreate tabs
|
|
self.create_tabs()
|
|
|
|
def action_edit_tab(self) -> None:
|
|
"""Edit current tab."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
light_data = self.settings["lights"][self.current_tab_name]
|
|
current_ids = light_data["names"]
|
|
|
|
self.push_screen(
|
|
EditTabScreen(self.current_tab_name, current_ids),
|
|
self.on_edit_tab_result
|
|
)
|
|
|
|
def on_edit_tab_result(self, result) -> None:
|
|
"""Handle result from edit tab dialog."""
|
|
if result is None:
|
|
return
|
|
|
|
new_tab_name, ids = result
|
|
|
|
if new_tab_name != self.current_tab_name:
|
|
if new_tab_name in self.settings.get("lights", {}):
|
|
self.bell()
|
|
return
|
|
|
|
# Rename in settings
|
|
self.settings["lights"][new_tab_name] = self.settings["lights"][self.current_tab_name]
|
|
del self.settings["lights"][self.current_tab_name]
|
|
|
|
# Update tab order
|
|
if "tab_order" in self.settings and self.current_tab_name in self.settings["tab_order"]:
|
|
index = self.settings["tab_order"].index(self.current_tab_name)
|
|
self.settings["tab_order"][index] = new_tab_name
|
|
|
|
# Update IDs
|
|
self.settings["lights"][new_tab_name]["names"] = ids
|
|
self.save_profile()
|
|
|
|
# Recreate tabs
|
|
self.create_tabs()
|
|
|
|
def action_delete_tab(self) -> None:
|
|
"""Delete current tab."""
|
|
if not self.current_tab_name:
|
|
return
|
|
|
|
# Delete from settings
|
|
del self.settings["lights"][self.current_tab_name]
|
|
|
|
# Remove from tab order
|
|
if "tab_order" in self.settings and self.current_tab_name in self.settings["tab_order"]:
|
|
self.settings["tab_order"].remove(self.current_tab_name)
|
|
|
|
self.save_profile()
|
|
|
|
# Recreate tabs
|
|
self.create_tabs()
|
|
|
|
def action_profiles(self) -> None:
|
|
"""Open profiles menu."""
|
|
# TODO: Implement profiles menu
|
|
self.bell()
|
|
|
|
def save_profile(self) -> None:
|
|
"""Save current settings to the active profile."""
|
|
current_profile = self.settings.get("current_profile")
|
|
if not current_profile:
|
|
return
|
|
|
|
try:
|
|
profile_path = os.path.join("profiles", f"{current_profile}.json")
|
|
os.makedirs("profiles", exist_ok=True)
|
|
|
|
tab_order = self.get_tab_order()
|
|
self.settings["tab_order"] = tab_order
|
|
|
|
profile_data = dict(self.settings)
|
|
profile_data.pop("current_profile", None)
|
|
with open(profile_path, 'w') as file:
|
|
json.dump(profile_data, file, indent=4)
|
|
except Exception as e:
|
|
self.log(f"Error saving profile: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app = LightingControllerApp()
|
|
app.run()
|
|
|