Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
58806ef654 |
105
settings.json
105
settings.json
@@ -1,9 +1,8 @@
|
|||||||
{
|
{
|
||||||
"lights": {
|
"lights": {
|
||||||
"sign": {
|
"1": {
|
||||||
"names": [
|
"names": [
|
||||||
"tt-sign",
|
"10"
|
||||||
"1"
|
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
"colors": [
|
"colors": [
|
||||||
@@ -12,13 +11,13 @@
|
|||||||
"#00ff00"
|
"#00ff00"
|
||||||
],
|
],
|
||||||
"brightness": 9,
|
"brightness": 9,
|
||||||
"pattern": "off",
|
"pattern": "on",
|
||||||
"delay": 50
|
"delay": 50
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"dj": {
|
"2": {
|
||||||
"names": [
|
"names": [
|
||||||
"dj"
|
"13"
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
"colors": [
|
"colors": [
|
||||||
@@ -32,12 +31,9 @@
|
|||||||
"delay": 520
|
"delay": 520
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"middle": {
|
"3": {
|
||||||
"names": [
|
"names": [
|
||||||
"middle1",
|
"11"
|
||||||
"middle2",
|
|
||||||
"middle3",
|
|
||||||
"middle4"
|
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
"colors": [
|
"colors": [
|
||||||
@@ -51,10 +47,11 @@
|
|||||||
"delay": 520
|
"delay": 520
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"sides": {
|
"all": {
|
||||||
"names": [
|
"names": [
|
||||||
"left",
|
"10",
|
||||||
"right"
|
"11",
|
||||||
|
"13"
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
"colors": [
|
"colors": [
|
||||||
@@ -67,86 +64,6 @@
|
|||||||
"pattern": "on",
|
"pattern": "on",
|
||||||
"delay": 520
|
"delay": 520
|
||||||
}
|
}
|
||||||
},
|
|
||||||
"outside": {
|
|
||||||
"names": [
|
|
||||||
"outside"
|
|
||||||
],
|
|
||||||
"settings": {
|
|
||||||
"colors": [
|
|
||||||
"#0000ff",
|
|
||||||
"#c30074",
|
|
||||||
"#00ff00",
|
|
||||||
"#000000"
|
|
||||||
],
|
|
||||||
"brightness": 6,
|
|
||||||
"pattern": "on",
|
|
||||||
"delay": 520
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"middle1": {
|
|
||||||
"names": [
|
|
||||||
"middle1"
|
|
||||||
],
|
|
||||||
"settings": {
|
|
||||||
"colors": [
|
|
||||||
"#0000ff",
|
|
||||||
"#c30074",
|
|
||||||
"#00ff00",
|
|
||||||
"#000000"
|
|
||||||
],
|
|
||||||
"brightness": 6,
|
|
||||||
"pattern": "flicker",
|
|
||||||
"delay": 520
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"middle2": {
|
|
||||||
"names": [
|
|
||||||
"middle2"
|
|
||||||
],
|
|
||||||
"settings": {
|
|
||||||
"colors": [
|
|
||||||
"#0000ff",
|
|
||||||
"#c30074",
|
|
||||||
"#00ff00",
|
|
||||||
"#000000"
|
|
||||||
],
|
|
||||||
"brightness": 6,
|
|
||||||
"pattern": "flicker",
|
|
||||||
"delay": 520
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"middle3": {
|
|
||||||
"names": [
|
|
||||||
"middle3"
|
|
||||||
],
|
|
||||||
"settings": {
|
|
||||||
"colors": [
|
|
||||||
"#0000ff",
|
|
||||||
"#c30074",
|
|
||||||
"#00ff00",
|
|
||||||
"#000000"
|
|
||||||
],
|
|
||||||
"brightness": 6,
|
|
||||||
"pattern": "flicker",
|
|
||||||
"delay": 520
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"middle4": {
|
|
||||||
"names": [
|
|
||||||
"middle4"
|
|
||||||
],
|
|
||||||
"settings": {
|
|
||||||
"colors": [
|
|
||||||
"#0000ff",
|
|
||||||
"#c30074",
|
|
||||||
"#00ff00",
|
|
||||||
"#000000"
|
|
||||||
],
|
|
||||||
"brightness": 6,
|
|
||||||
"pattern": "flicker",
|
|
||||||
"delay": 520
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"patterns": [
|
"patterns": [
|
||||||
|
@@ -1,38 +0,0 @@
|
|||||||
# LED Bar Configuration
|
|
||||||
# Modify these names as needed for your setup
|
|
||||||
|
|
||||||
# LED Bar Names/IDs - 4 left bars + 4 right bars
|
|
||||||
LED_BAR_NAMES = [
|
|
||||||
"100", # Left Bar 1
|
|
||||||
"101", # Left Bar 2
|
|
||||||
"102", # Left Bar 3
|
|
||||||
"103", # Left Bar 4
|
|
||||||
"104", # Right Bar 1
|
|
||||||
"105", # Right Bar 2
|
|
||||||
"106", # Right Bar 3
|
|
||||||
"107", # Right Bar 4
|
|
||||||
]
|
|
||||||
|
|
||||||
# Left and right bar groups for spatial control
|
|
||||||
LEFT_BARS = ["100", "101", "102", "103"]
|
|
||||||
RIGHT_BARS = ["104", "105", "106", "107"]
|
|
||||||
|
|
||||||
# Number of LED bars
|
|
||||||
NUM_BARS = len(LED_BAR_NAMES)
|
|
||||||
|
|
||||||
# Default settings for all bars
|
|
||||||
DEFAULT_BAR_SETTINGS = {
|
|
||||||
"pattern": "pulse",
|
|
||||||
"delay": 100,
|
|
||||||
"colors": [(0, 255, 0)], # Default green
|
|
||||||
"brightness": 10,
|
|
||||||
"num_leds": 200,
|
|
||||||
"n1": 10,
|
|
||||||
"n2": 10,
|
|
||||||
"n3": 1,
|
|
||||||
"n": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
# ESP-NOW broadcast settings
|
|
||||||
ESP_NOW_CHANNEL = 1
|
|
||||||
ESP_NOW_ENCRYPTION = False
|
|
917
src/main.py
917
src/main.py
@@ -5,9 +5,8 @@ import json
|
|||||||
from async_tkinter_loop import async_handler, async_mainloop
|
from async_tkinter_loop import async_handler, async_mainloop
|
||||||
from networking import WebSocketClient
|
from networking import WebSocketClient
|
||||||
import color_utils
|
import color_utils
|
||||||
|
from settings import Settings
|
||||||
import time
|
import time
|
||||||
from midi import MidiHandler # Import MidiHandler
|
|
||||||
|
|
||||||
# Dark theme colors (unchanged)
|
# Dark theme colors (unchanged)
|
||||||
bg_color = "#2e2e2e"
|
bg_color = "#2e2e2e"
|
||||||
@@ -25,24 +24,28 @@ active_palette_color_border = "#FFD700" # Gold color
|
|||||||
|
|
||||||
class App:
|
class App:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
|
self.settings = Settings()
|
||||||
self.root = tk.Tk()
|
self.root = tk.Tk()
|
||||||
# self.root.attributes("-fullscreen", True)
|
self.root.attributes("-fullscreen", True)
|
||||||
self.root.configure(bg=bg_color)
|
self.root.configure(bg=bg_color)
|
||||||
|
|
||||||
|
# Debouncing variables (remain the same)
|
||||||
|
self.last_rgb_update_time = 0
|
||||||
|
self.rgb_update_interval_ms = 100
|
||||||
|
self.pending_rgb_update_id = None
|
||||||
|
|
||||||
|
self.last_brightness_update_time = 0
|
||||||
|
self.brightness_update_interval_ms = 100
|
||||||
|
self.pending_brightness_update_id = None
|
||||||
|
|
||||||
|
self.last_delay_update_time = 0
|
||||||
|
self.delay_update_interval_ms = 100
|
||||||
|
self.pending_delay_update_id = None
|
||||||
|
|
||||||
# --- WebSocketClient ---
|
# --- WebSocketClient ---
|
||||||
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
|
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
|
||||||
self.root.after(100, async_handler(self.websocket_client.connect))
|
self.root.after(100, async_handler(self.websocket_client.connect))
|
||||||
|
|
||||||
# --- MIDI Handler ---
|
|
||||||
MIDI_PORT_INDEX = 1 # Adjust as needed
|
|
||||||
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
|
|
||||||
print(f"Initializing MIDI handler with port index {MIDI_PORT_INDEX}")
|
|
||||||
self.midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
|
|
||||||
print("MIDI handler initialized")
|
|
||||||
self.midi_task: asyncio.Task | None = None
|
|
||||||
# Start MIDI in background
|
|
||||||
self.root.after(0, async_handler(self.start_midi))
|
|
||||||
|
|
||||||
# Configure ttk style (unchanged)
|
# Configure ttk style (unchanged)
|
||||||
style = ttk.Style()
|
style = ttk.Style()
|
||||||
style.theme_use("alt")
|
style.theme_use("alt")
|
||||||
@@ -51,255 +54,691 @@ class App:
|
|||||||
style.configure(
|
style.configure(
|
||||||
"TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5]
|
"TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5]
|
||||||
)
|
)
|
||||||
|
style.map("TNotebook.Tab", background=[("selected", active_bg_color)], foreground=[("selected", fg_color)])
|
||||||
|
style.configure("TFrame", background=bg_color)
|
||||||
|
|
||||||
# (Status box removed per request)
|
# Create Notebook for tabs (unchanged)
|
||||||
|
self.notebook = ttk.Notebook(self.root)
|
||||||
|
self.notebook.pack(expand=1, fill="both")
|
||||||
|
|
||||||
# Controls overview (dials grid left, buttons grids right)
|
self.tabs = {}
|
||||||
controls_frame = ttk.Frame(self.root)
|
self.create_tabs()
|
||||||
controls_frame.pack(padx=16, pady=8, fill="both")
|
|
||||||
|
|
||||||
# Dials box: 4 rows by 2 columns (top-left origin):
|
self.notebook.bind("<<NotebookTabChanged>>", self.on_tab_change)
|
||||||
# Row0: n3 (left), Delay (right)
|
|
||||||
# Row1: n1 (left), n2 (right)
|
|
||||||
# Row2: B (left), Brightness (right)
|
|
||||||
# Row3: R (left), G (right)
|
|
||||||
dials_frame = ttk.LabelFrame(controls_frame, text="Dials (CC30-37)")
|
|
||||||
dials_frame.pack(side="left", padx=12)
|
|
||||||
for c in range(2):
|
|
||||||
dials_frame.grid_columnconfigure(c, minsize=140)
|
|
||||||
for rr in range(4):
|
|
||||||
dials_frame.grid_rowconfigure(rr, minsize=70)
|
|
||||||
|
|
||||||
self.dials_boxes: list[tk.Label] = []
|
# Add Reload Config Button (unchanged)
|
||||||
# Create with placeholders so they are visible before first update
|
reload_button = tk.Button(
|
||||||
placeholders = {
|
self.root,
|
||||||
(0, 0): "n3\n-",
|
text="Reload Config",
|
||||||
(0, 1): "Delay\n-",
|
command=self.reload_config,
|
||||||
(1, 0): "n1\n-",
|
bg=active_bg_color,
|
||||||
(1, 1): "n2\n-",
|
fg=fg_color,
|
||||||
(2, 0): "B\n-",
|
font=("Arial", 20),
|
||||||
(2, 1): "Bright\n-",
|
padx=20,
|
||||||
(3, 0): "R\n-",
|
pady=10,
|
||||||
(3, 1): "G\n-",
|
relief=tk.FLAT,
|
||||||
}
|
)
|
||||||
for r in range(4):
|
reload_button.pack(side=tk.BOTTOM, pady=20)
|
||||||
for c in range(2):
|
|
||||||
lbl = tk.Label(
|
|
||||||
dials_frame,
|
|
||||||
text=placeholders.get((r, c), "-"),
|
|
||||||
bg=bg_color,
|
|
||||||
fg=fg_color,
|
|
||||||
font=("Arial", 14),
|
|
||||||
padx=6,
|
|
||||||
pady=6,
|
|
||||||
borderwidth=2,
|
|
||||||
relief="ridge",
|
|
||||||
width=14,
|
|
||||||
height=4,
|
|
||||||
anchor="center",
|
|
||||||
justify="center",
|
|
||||||
)
|
|
||||||
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
|
|
||||||
self.dials_boxes.append(lbl)
|
|
||||||
|
|
||||||
# Additional knobs box: 4 rows by 2 columns (CC38-45)
|
|
||||||
# Row0: K1 (left), K2 (right)
|
|
||||||
# Row1: K3 (left), K4 (right)
|
|
||||||
# Row2: K5 (left), K6 (right)
|
|
||||||
# Row3: K7 (left), K8 (right)
|
|
||||||
knobs_frame = ttk.LabelFrame(controls_frame, text="Knobs (CC38-45)")
|
|
||||||
knobs_frame.pack(side="left", padx=12)
|
|
||||||
for c in range(2):
|
|
||||||
knobs_frame.grid_columnconfigure(c, minsize=140)
|
|
||||||
for rr in range(4):
|
|
||||||
knobs_frame.grid_rowconfigure(rr, minsize=70)
|
|
||||||
|
|
||||||
self.knobs_boxes: list[tk.Label] = []
|
|
||||||
# Create with placeholders so they are visible before first update
|
|
||||||
knob_placeholders = {
|
|
||||||
(0, 0): "CC44\n-",
|
|
||||||
(0, 1): "CC45\n-",
|
|
||||||
(1, 0): "Rad n1\n-",
|
|
||||||
(1, 1): "Rad delay\n-",
|
|
||||||
(2, 0): "Alt n1\n-",
|
|
||||||
(2, 1): "Alt n2\n-",
|
|
||||||
(3, 0): "Pulse n1\n-",
|
|
||||||
(3, 1): "Pulse n2\n-",
|
|
||||||
}
|
|
||||||
for r in range(4):
|
|
||||||
for c in range(2):
|
|
||||||
lbl = tk.Label(
|
|
||||||
knobs_frame,
|
|
||||||
text=knob_placeholders.get((r, c), "-"),
|
|
||||||
bg=bg_color,
|
|
||||||
fg=fg_color,
|
|
||||||
font=("Arial", 14),
|
|
||||||
padx=6,
|
|
||||||
pady=6,
|
|
||||||
borderwidth=2,
|
|
||||||
relief="ridge",
|
|
||||||
width=14,
|
|
||||||
height=4,
|
|
||||||
anchor="center",
|
|
||||||
justify="center",
|
|
||||||
)
|
|
||||||
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
|
|
||||||
self.knobs_boxes.append(lbl)
|
|
||||||
|
|
||||||
# Buttons bank (single)
|
|
||||||
buttons_frame = ttk.Frame(controls_frame)
|
|
||||||
buttons_frame.pack(side="left", padx=12)
|
|
||||||
|
|
||||||
buttons1_frame = ttk.LabelFrame(buttons_frame, text="Buttons (notes 36-51)")
|
|
||||||
buttons1_frame.pack(side="top", pady=8)
|
|
||||||
for c in range(4):
|
|
||||||
buttons1_frame.grid_columnconfigure(c, minsize=140)
|
|
||||||
for rr in range(1, 5):
|
|
||||||
buttons1_frame.grid_rowconfigure(rr, minsize=70)
|
|
||||||
self.button1_cells: list[tk.Label] = []
|
|
||||||
for r in range(4):
|
|
||||||
for c in range(4):
|
|
||||||
lbl = tk.Label(
|
|
||||||
buttons1_frame,
|
|
||||||
text="",
|
|
||||||
bg=bg_color,
|
|
||||||
fg=fg_color,
|
|
||||||
font=("Arial", 14),
|
|
||||||
padx=6,
|
|
||||||
pady=6,
|
|
||||||
borderwidth=2,
|
|
||||||
relief="ridge",
|
|
||||||
width=14,
|
|
||||||
height=4,
|
|
||||||
anchor="center",
|
|
||||||
justify="center",
|
|
||||||
)
|
|
||||||
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
|
|
||||||
self.button1_cells.append(lbl)
|
|
||||||
|
|
||||||
# (No second buttons bank)
|
|
||||||
|
|
||||||
# (No status labels to pack)
|
|
||||||
|
|
||||||
# schedule periodic UI updates
|
|
||||||
self.root.after(200, self.update_status_labels)
|
|
||||||
|
|
||||||
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
|
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
|
||||||
|
|
||||||
async_mainloop(self.root)
|
async_mainloop(self.root)
|
||||||
|
|
||||||
@async_handler
|
|
||||||
async def start_midi(self):
|
|
||||||
# Launch MidiHandler.run() as a background task
|
|
||||||
if self.midi_task is None or self.midi_task.done():
|
|
||||||
self.midi_task = asyncio.create_task(self.midi_handler.run())
|
|
||||||
|
|
||||||
def on_closing(self):
|
def on_closing(self):
|
||||||
print("Closing application...")
|
print("Closing application...")
|
||||||
if self.midi_task and not self.midi_task.done():
|
|
||||||
self.midi_task.cancel()
|
|
||||||
asyncio.create_task(self.websocket_client.close())
|
asyncio.create_task(self.websocket_client.close())
|
||||||
self.root.destroy()
|
self.root.destroy()
|
||||||
|
|
||||||
|
def create_tabs(self):
|
||||||
|
for tab_name in list(self.tabs.keys()):
|
||||||
|
self.notebook.forget(self.tabs[tab_name])
|
||||||
|
del self.tabs[tab_name]
|
||||||
|
|
||||||
|
for key, value in self.settings["lights"].items():
|
||||||
|
tab = ttk.Frame(self.notebook)
|
||||||
|
self.notebook.add(tab, text=key)
|
||||||
|
self.create_light_control_widgets(tab, key, value["names"], value["settings"])
|
||||||
|
self.tabs[key] = tab
|
||||||
|
|
||||||
|
def create_light_control_widgets(self, tab, tab_name, ids, initial_settings):
|
||||||
|
slider_length = 800
|
||||||
|
slider_width = 50
|
||||||
|
|
||||||
|
# Extract initial color, brightness, and delay
|
||||||
|
initial_colors = initial_settings.get("colors", ["#000000"])
|
||||||
|
initial_hex_color = initial_colors[0] if initial_colors else "#000000"
|
||||||
|
initial_brightness = initial_settings.get("brightness", 127)
|
||||||
|
initial_delay = initial_settings.get("delay", 0)
|
||||||
|
initial_pattern = initial_settings.get("pattern", "on")
|
||||||
|
|
||||||
|
initial_r, initial_g, initial_b = color_utils.hex_to_rgb(initial_hex_color)
|
||||||
|
|
||||||
|
# Main frame to hold everything within the tab
|
||||||
|
main_tab_frame = tk.Frame(tab, bg=bg_color)
|
||||||
|
main_tab_frame.pack(expand=True, fill="both", padx=10, pady=10)
|
||||||
|
|
||||||
|
# Left panel for sliders
|
||||||
|
slider_panel_frame = tk.Frame(main_tab_frame, bg=bg_color)
|
||||||
|
slider_panel_frame.pack(side=tk.LEFT, padx=10, pady=10)
|
||||||
|
|
||||||
|
# Common slider configuration
|
||||||
|
slider_config = {
|
||||||
|
"from_": 255,
|
||||||
|
"to": 0,
|
||||||
|
"orient": tk.VERTICAL,
|
||||||
|
"length": slider_length,
|
||||||
|
"width": slider_width,
|
||||||
|
"bg": bg_color,
|
||||||
|
"fg": fg_color,
|
||||||
|
"highlightbackground": bg_color,
|
||||||
|
"activebackground": active_bg_color,
|
||||||
|
"resolution": 1,
|
||||||
|
"sliderlength": 70,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Red Slider
|
||||||
|
red_slider = tk.Scale(slider_panel_frame, label="Red", troughcolor=trough_color_red, **slider_config)
|
||||||
|
red_slider.set(initial_r)
|
||||||
|
red_slider.pack(side=tk.LEFT, padx=10)
|
||||||
|
red_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
|
||||||
|
red_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
|
||||||
|
|
||||||
|
# Green Slider
|
||||||
|
green_slider = tk.Scale(slider_panel_frame, label="Green", troughcolor=trough_color_green, **slider_config)
|
||||||
|
green_slider.set(initial_g)
|
||||||
|
green_slider.pack(side=tk.LEFT, padx=10)
|
||||||
|
green_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
|
||||||
|
green_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
|
||||||
|
|
||||||
|
# Blue Slider
|
||||||
|
blue_slider = tk.Scale(slider_panel_frame, label="Blue", troughcolor=trough_color_blue, **slider_config)
|
||||||
|
blue_slider.set(initial_b)
|
||||||
|
blue_slider.pack(side=tk.LEFT, padx=10)
|
||||||
|
blue_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
|
||||||
|
blue_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
|
||||||
|
|
||||||
|
# Brightness Slider
|
||||||
|
brightness_slider = tk.Scale(
|
||||||
|
slider_panel_frame, label="Brightness", troughcolor=trough_color_brightness, **slider_config
|
||||||
|
)
|
||||||
|
brightness_slider.set(initial_brightness)
|
||||||
|
brightness_slider.pack(side=tk.LEFT, padx=10)
|
||||||
|
brightness_slider.bind("<B1-Motion>", lambda _: self.schedule_update_brightness(tab))
|
||||||
|
brightness_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_brightness(tab, force_send=True))
|
||||||
|
|
||||||
|
# Delay Slider
|
||||||
|
delay_slider_config = slider_config.copy()
|
||||||
|
delay_slider_config.update(
|
||||||
|
{
|
||||||
|
"from_": 1000,
|
||||||
|
"to": 0,
|
||||||
|
"resolution": 10,
|
||||||
|
"label": "Delay (ms)",
|
||||||
|
"troughcolor": trough_color_delay,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
delay_slider = tk.Scale(slider_panel_frame, **delay_slider_config)
|
||||||
|
delay_slider.set(initial_delay)
|
||||||
|
delay_slider.pack(side=tk.LEFT, padx=10)
|
||||||
|
delay_slider.bind("<B1-Motion>", lambda _: self.schedule_update_delay(tab))
|
||||||
|
delay_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_delay(tab, force_send=True))
|
||||||
|
|
||||||
|
# Store references to widgets for this tab
|
||||||
|
tab.widgets = {
|
||||||
|
"red_slider": red_slider,
|
||||||
|
"green_slider": green_slider,
|
||||||
|
"blue_slider": blue_slider,
|
||||||
|
"brightness_slider": brightness_slider,
|
||||||
|
"delay_slider": delay_slider,
|
||||||
|
"selected_color_index": 0, # Default to the first color
|
||||||
|
}
|
||||||
|
tab.colors_in_palette = initial_colors.copy() # Store the list of hex colors for this tab
|
||||||
|
tab.color_swatch_frames = [] # To hold references to the color swatches
|
||||||
|
|
||||||
|
# Right panel for IDs, Patterns, and NEW Color Palette
|
||||||
|
right_panel_frame = tk.Frame(main_tab_frame, bg=bg_color)
|
||||||
|
right_panel_frame.pack(side=tk.LEFT, padx=20, pady=10, anchor="n", expand=True, fill="both")
|
||||||
|
|
||||||
|
# IDs section - MODIFIED TO BE SIDE-BY-SIDE
|
||||||
|
ids_frame = tk.Frame(right_panel_frame, bg=bg_color)
|
||||||
|
ids_frame.pack(pady=10, fill=tk.X)
|
||||||
|
tk.Label(ids_frame, text="Associated Names:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(pady=10)
|
||||||
|
|
||||||
|
# New inner frame for the IDs to be displayed horizontally
|
||||||
|
ids_inner_frame = tk.Frame(ids_frame, bg=bg_color)
|
||||||
|
ids_inner_frame.pack(fill=tk.X, expand=True) # Pack this frame to fill available width
|
||||||
|
|
||||||
|
for light_id in ids:
|
||||||
|
tk.Label(ids_inner_frame, text=str(light_id), font=("Arial", 18), bg=bg_color, fg=fg_color).pack(
|
||||||
|
side=tk.LEFT, padx=5, pady=2
|
||||||
|
) # Pack labels horizontally
|
||||||
|
|
||||||
|
# --- New Frame to hold Patterns and Color Palette side-by-side ---
|
||||||
|
patterns_and_palette_frame = tk.Frame(right_panel_frame, bg=bg_color)
|
||||||
|
patterns_and_palette_frame.pack(pady=20, fill=tk.BOTH, expand=True)
|
||||||
|
|
||||||
|
# Patterns section
|
||||||
|
patterns_frame = tk.Frame(patterns_and_palette_frame, bg=bg_color, bd=2, relief=tk.GROOVE)
|
||||||
|
patterns_frame.pack(side=tk.LEFT, padx=10, pady=5, fill=tk.BOTH, expand=True) # Pack to the left
|
||||||
|
tk.Label(patterns_frame, text="Patterns:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(pady=10)
|
||||||
|
|
||||||
|
tab.pattern_buttons = {}
|
||||||
|
patterns = self.settings.get("patterns", [])
|
||||||
|
for pattern_name in patterns:
|
||||||
|
button = tk.Button(
|
||||||
|
patterns_frame,
|
||||||
|
text=pattern_name,
|
||||||
|
command=lambda p=pattern_name: self.send_pattern(tab_name, p),
|
||||||
|
bg=active_bg_color,
|
||||||
|
fg=fg_color,
|
||||||
|
font=("Arial", 18),
|
||||||
|
padx=15,
|
||||||
|
pady=5,
|
||||||
|
relief=tk.FLAT,
|
||||||
|
)
|
||||||
|
button.pack(pady=5, fill=tk.X)
|
||||||
|
tab.pattern_buttons[pattern_name] = button
|
||||||
|
|
||||||
|
self.highlight_pattern_button(tab, initial_pattern)
|
||||||
|
|
||||||
|
# --- Color Palette Editor Section ---
|
||||||
|
color_palette_editor_frame = tk.Frame(patterns_and_palette_frame, bg=bg_color, bd=2, relief=tk.GROOVE)
|
||||||
|
color_palette_editor_frame.pack(side=tk.LEFT, padx=10, pady=5, fill=tk.BOTH, expand=True) # Pack to the left
|
||||||
|
tab.color_palette_editor_frame = color_palette_editor_frame # Store reference for update_ui_for_pattern
|
||||||
|
|
||||||
|
tk.Label(color_palette_editor_frame, text="Color Palette:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(
|
||||||
|
pady=10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Frame to hold color swatches (will be dynamic)
|
||||||
|
tab.color_swatches_container = tk.Frame(color_palette_editor_frame, bg=bg_color)
|
||||||
|
tab.color_swatches_container.pack(pady=5, fill=tk.BOTH, expand=True)
|
||||||
|
|
||||||
|
# Buttons for Add/Remove Color
|
||||||
|
palette_buttons_frame = tk.Frame(color_palette_editor_frame, bg=bg_color)
|
||||||
|
palette_buttons_frame.pack(pady=10, fill=tk.X)
|
||||||
|
|
||||||
|
add_color_button = tk.Button(
|
||||||
|
palette_buttons_frame,
|
||||||
|
text="Add Color",
|
||||||
|
command=lambda t=tab: self.add_color_to_palette(t),
|
||||||
|
bg=active_bg_color,
|
||||||
|
fg=fg_color,
|
||||||
|
font=("Arial", 16),
|
||||||
|
padx=10,
|
||||||
|
pady=5,
|
||||||
|
relief=tk.FLAT,
|
||||||
|
)
|
||||||
|
add_color_button.pack(side=tk.LEFT, expand=True, padx=5)
|
||||||
|
|
||||||
|
remove_color_button = tk.Button(
|
||||||
|
palette_buttons_frame,
|
||||||
|
text="Remove Selected",
|
||||||
|
command=lambda t=tab: self.remove_selected_color_from_palette(t),
|
||||||
|
bg=active_bg_color,
|
||||||
|
fg=fg_color,
|
||||||
|
font=("Arial", 16),
|
||||||
|
padx=10,
|
||||||
|
pady=5,
|
||||||
|
relief=tk.FLAT,
|
||||||
|
)
|
||||||
|
remove_color_button.pack(side=tk.RIGHT, expand=True, padx=5)
|
||||||
|
|
||||||
|
# Initial population of the color palette
|
||||||
|
self.refresh_color_palette_display(tab)
|
||||||
|
|
||||||
|
# The initial call to update_ui_for_pattern now only sets slider values and highlights
|
||||||
|
self.update_ui_for_pattern(tab, initial_pattern)
|
||||||
|
|
||||||
|
def refresh_color_palette_display(self, tab):
|
||||||
|
"""Clears and repopulates the color swatches in the palette display."""
|
||||||
|
# Clear existing swatches
|
||||||
|
for frame in tab.color_swatch_frames:
|
||||||
|
frame.destroy()
|
||||||
|
tab.color_swatch_frames.clear()
|
||||||
|
|
||||||
|
for i, hex_color in enumerate(tab.colors_in_palette):
|
||||||
|
swatch_frame = tk.Frame(
|
||||||
|
tab.color_swatches_container, bg=hex_color, width=100, height=50, bd=2, relief=tk.SOLID
|
||||||
|
)
|
||||||
|
swatch_frame.pack(pady=3, padx=5, fill=tk.X)
|
||||||
|
# Bind click to select this color for editing
|
||||||
|
swatch_frame.bind("<Button-1>", lambda event, idx=i, t=tab: self.select_color_in_palette(t, idx))
|
||||||
|
|
||||||
|
# Add a label inside to make it clickable too
|
||||||
|
swatch_label = tk.Label(
|
||||||
|
swatch_frame,
|
||||||
|
text=f"Color {i+1}",
|
||||||
|
bg=hex_color,
|
||||||
|
fg=color_utils.get_contrast_text_color(hex_color),
|
||||||
|
font=("Arial", 14),
|
||||||
|
width=5,
|
||||||
|
height=3,
|
||||||
|
)
|
||||||
|
swatch_label.pack(expand=True, fill=tk.BOTH)
|
||||||
|
swatch_label.bind("<Button-1>", lambda event, idx=i, t=tab: self.select_color_in_palette(t, idx))
|
||||||
|
|
||||||
|
tab.color_swatch_frames.append(swatch_frame)
|
||||||
|
|
||||||
|
# Re-highlight the currently selected color
|
||||||
|
self._highlight_selected_color_swatch(tab)
|
||||||
|
|
||||||
|
def _highlight_selected_color_swatch(self, tab):
|
||||||
|
"""Applies/removes highlight border to the selected color swatch."""
|
||||||
|
current_index = tab.widgets["selected_color_index"]
|
||||||
|
for i, swatch_frame in enumerate(tab.color_swatch_frames):
|
||||||
|
if i == current_index:
|
||||||
|
swatch_frame.config(highlightbackground=active_palette_color_border, highlightthickness=3)
|
||||||
|
else:
|
||||||
|
swatch_frame.config(highlightbackground=swatch_frame.cget("bg"), highlightthickness=0) # Reset to no highlight
|
||||||
|
|
||||||
|
def select_color_in_palette(self, tab, index: int):
|
||||||
|
"""Selects a color in the palette, updates sliders, and highlights swatch.
|
||||||
|
This now also triggers an RGB update to the device."""
|
||||||
|
if not (0 <= index < len(tab.colors_in_palette)):
|
||||||
|
return
|
||||||
|
|
||||||
|
tab.widgets["selected_color_index"] = index
|
||||||
|
self._highlight_selected_color_swatch(tab)
|
||||||
|
|
||||||
|
# Update RGB sliders with the selected color
|
||||||
|
hex_color = tab.colors_in_palette[index]
|
||||||
|
r, g, b = color_utils.hex_to_rgb(hex_color)
|
||||||
|
tab.widgets["red_slider"].set(r)
|
||||||
|
tab.widgets["green_slider"].set(g)
|
||||||
|
tab.widgets["blue_slider"].set(b)
|
||||||
|
|
||||||
|
print(f"Selected color index {index}: {hex_color}")
|
||||||
|
|
||||||
|
# Immediately send the update, as changing the selected color implies
|
||||||
|
# a desire to change the light's current color, regardless of pattern.
|
||||||
|
# This will also save the settings.
|
||||||
|
self.schedule_update_rgb(tab, force_send=True)
|
||||||
|
|
||||||
|
def add_color_to_palette(self, tab):
|
||||||
|
"""Adds a new black color to the palette and selects it, with a limit of 10 colors."""
|
||||||
|
MAX_COLORS = 8 # Define the maximum number of colors allowed
|
||||||
|
|
||||||
|
if len(tab.colors_in_palette) >= MAX_COLORS:
|
||||||
|
messagebox.showwarning("Color Limit Reached", f"You can add a maximum of {MAX_COLORS} colors to the palette.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Simplified: just add black. If unique colors were required globally,
|
||||||
|
# more complex logic would be needed here.
|
||||||
|
tab.colors_in_palette.append("#000000") # Add black as default
|
||||||
|
self.refresh_color_palette_display(tab)
|
||||||
|
# Select the newly added color
|
||||||
|
self.select_color_in_palette(tab, len(tab.colors_in_palette) - 1)
|
||||||
|
self.save_current_tab_settings() # Save changes to settings.json
|
||||||
|
|
||||||
|
def remove_selected_color_from_palette(self, tab):
|
||||||
|
"""Removes the currently selected color from the palette."""
|
||||||
|
current_index = tab.widgets["selected_color_index"]
|
||||||
|
if len(tab.colors_in_palette) <= 1:
|
||||||
|
messagebox.showwarning("Cannot Remove", "There must be at least one color in the palette.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if messagebox.askyesno("Confirm Delete", f"Are you sure you want to remove Color {current_index + 1}?"):
|
||||||
|
del tab.colors_in_palette[current_index]
|
||||||
|
# Adjust selected index if the removed color was the last one
|
||||||
|
if current_index >= len(tab.colors_in_palette):
|
||||||
|
tab.widgets["selected_color_index"] = len(tab.colors_in_palette) - 1
|
||||||
|
if tab.widgets["selected_color_index"] < 0: # Should not happen with 1-color check
|
||||||
|
tab.widgets["selected_color_index"] = 0
|
||||||
|
|
||||||
|
self.refresh_color_palette_display(tab)
|
||||||
|
# Update sliders with the new selected color (if any)
|
||||||
|
if tab.colors_in_palette:
|
||||||
|
self.select_color_in_palette(tab, tab.widgets["selected_color_index"])
|
||||||
|
else: # If palette became empty (shouldn't happen with 1-color check)
|
||||||
|
tab.widgets["red_slider"].set(0)
|
||||||
|
tab.widgets["green_slider"].set(0)
|
||||||
|
tab.widgets["blue_slider"].set(0)
|
||||||
|
|
||||||
|
self.save_current_tab_settings() # Save changes to settings.json
|
||||||
|
|
||||||
|
def update_ui_for_pattern(self, tab, current_pattern: str):
|
||||||
|
"""
|
||||||
|
Manages the state of the UI elements based on the selected pattern.
|
||||||
|
The Color Palette Editor is always visible. RGB sliders update
|
||||||
|
based on the currently selected color in the palette, or the first
|
||||||
|
color if the palette is empty or not in transition mode and a new tab/pattern is selected.
|
||||||
|
"""
|
||||||
|
# The color_palette_editor_frame is always packed, so no visibility control needed here.
|
||||||
|
|
||||||
|
# When the pattern changes, we need to ensure the RGB sliders reflect
|
||||||
|
# the appropriate color based on the context.
|
||||||
|
|
||||||
|
if tab.colors_in_palette:
|
||||||
|
# If in 'transition' mode, set sliders to the currently selected color in the palette.
|
||||||
|
if current_pattern == "transition":
|
||||||
|
self.select_color_in_palette(tab, tab.widgets["selected_color_index"])
|
||||||
|
else:
|
||||||
|
# If not in 'transition' mode, but a color is selected, update sliders to that.
|
||||||
|
# Or, if this is a fresh load/tab change, ensure it's the first color.
|
||||||
|
# This ensures the sliders consistently show the color that will be sent
|
||||||
|
# for 'on'/'blink' based on the palette's first entry.
|
||||||
|
hex_color = tab.colors_in_palette[tab.widgets["selected_color_index"]]
|
||||||
|
r, g, b = color_utils.hex_to_rgb(hex_color)
|
||||||
|
tab.widgets["red_slider"].set(r)
|
||||||
|
tab.widgets["green_slider"].set(g)
|
||||||
|
tab.widgets["blue_slider"].set(b)
|
||||||
|
self._highlight_selected_color_swatch(tab) # Re-highlight even if index didn't change
|
||||||
|
else:
|
||||||
|
# Handle empty palette scenario (shouldn't happen with default ["#000000"])
|
||||||
|
tab.widgets["red_slider"].set(0)
|
||||||
|
tab.widgets["green_slider"].set(0)
|
||||||
|
tab.widgets["blue_slider"].set(0)
|
||||||
|
tab.widgets["selected_color_index"] = 0 # Ensure index is valid
|
||||||
|
self._highlight_selected_color_swatch(tab)
|
||||||
|
|
||||||
|
# Brightness and Delay sliders are always visible.
|
||||||
|
|
||||||
|
def highlight_pattern_button(self, tab_widget, active_pattern_name):
|
||||||
|
if hasattr(tab_widget, "pattern_buttons"):
|
||||||
|
for pattern_name, button in tab_widget.pattern_buttons.items():
|
||||||
|
if pattern_name == active_pattern_name:
|
||||||
|
button.config(bg=highlight_pattern_color)
|
||||||
|
else:
|
||||||
|
button.config(bg=active_bg_color)
|
||||||
|
|
||||||
|
def on_tab_change(self, event):
|
||||||
|
selected_tab_name = self.notebook.tab(self.notebook.select(), "text")
|
||||||
|
current_tab_widget = self.notebook.nametowidget(self.notebook.select())
|
||||||
|
|
||||||
|
initial_settings = self.settings["lights"][selected_tab_name]["settings"]
|
||||||
|
|
||||||
|
# Ensure current_tab_widget has the necessary attributes
|
||||||
|
if not hasattr(current_tab_widget, "colors_in_palette"):
|
||||||
|
# This tab might not have been fully initialized yet, or recreated
|
||||||
|
# In a full reload, create_tabs ensures it is.
|
||||||
|
return
|
||||||
|
|
||||||
|
# Update the local colors_in_palette list for the tab
|
||||||
|
current_tab_widget.colors_in_palette = initial_settings.get("colors", ["#000000"]).copy()
|
||||||
|
current_tab_widget.widgets["selected_color_index"] = 0 # Default to first color
|
||||||
|
|
||||||
|
# Refresh the color palette display and select the first color
|
||||||
|
self.refresh_color_palette_display(current_tab_widget)
|
||||||
|
if current_tab_widget.colors_in_palette:
|
||||||
|
self.select_color_in_palette(current_tab_widget, 0)
|
||||||
|
else: # If palette became empty (shouldn't happen with default ["#000000"])
|
||||||
|
current_tab_widget.widgets["red_slider"].set(0)
|
||||||
|
current_tab_widget.widgets["green_slider"].set(0)
|
||||||
|
current_tab_widget.widgets["blue_slider"].set(0)
|
||||||
|
|
||||||
|
# Update brightness and delay sliders
|
||||||
|
current_tab_widget.widgets["brightness_slider"].set(initial_settings.get("brightness", 127))
|
||||||
|
current_tab_widget.widgets["delay_slider"].set(initial_settings.get("delay", 0))
|
||||||
|
|
||||||
|
# Highlight the active pattern button
|
||||||
|
initial_pattern = initial_settings.get("pattern", "on")
|
||||||
|
self.highlight_pattern_button(current_tab_widget, initial_pattern)
|
||||||
|
|
||||||
|
# Update UI visibility based on the current pattern
|
||||||
|
self.update_ui_for_pattern(current_tab_widget, initial_pattern)
|
||||||
|
|
||||||
|
def reload_config(self):
|
||||||
|
print("Reloading configuration...")
|
||||||
|
self.settings = Settings()
|
||||||
|
self.create_tabs()
|
||||||
|
# After recreating, ensure the currently selected tab's sliders are updated
|
||||||
|
# Trigger on_tab_change manually for the currently selected tab
|
||||||
|
self.on_tab_change(None)
|
||||||
|
|
||||||
|
# --- Debouncing functions (no change to core logic, just how they call update_rgb) ---
|
||||||
|
def schedule_update_rgb(self, tab, force_send=False):
|
||||||
|
current_time = time.time() * 1000
|
||||||
|
if force_send:
|
||||||
|
if self.pending_rgb_update_id:
|
||||||
|
self.root.after_cancel(self.pending_rgb_update_id)
|
||||||
|
self.pending_rgb_update_id = None
|
||||||
|
self.update_rgb(tab)
|
||||||
|
self.last_rgb_update_time = current_time
|
||||||
|
elif current_time - self.last_rgb_update_time >= self.rgb_update_interval_ms:
|
||||||
|
if self.pending_rgb_update_id:
|
||||||
|
self.root.after_cancel(self.pending_rgb_update_id)
|
||||||
|
self.pending_rgb_update_id = None
|
||||||
|
self.update_rgb(tab)
|
||||||
|
self.last_rgb_update_time = current_time
|
||||||
|
else:
|
||||||
|
if self.pending_rgb_update_id:
|
||||||
|
self.root.after_cancel(self.pending_rgb_update_id)
|
||||||
|
time_to_wait = int(self.rgb_update_interval_ms - (current_time - self.last_rgb_update_time))
|
||||||
|
self.pending_rgb_update_id = self.root.after(time_to_wait, lambda: self.update_rgb(tab))
|
||||||
|
|
||||||
|
def schedule_update_brightness(self, tab, force_send=False):
|
||||||
|
current_time = time.time() * 1000
|
||||||
|
if force_send:
|
||||||
|
if self.pending_brightness_update_id:
|
||||||
|
self.root.after_cancel(self.pending_brightness_update_id)
|
||||||
|
self.pending_brightness_update_id = None
|
||||||
|
self.update_brightness(tab)
|
||||||
|
self.last_brightness_update_time = current_time
|
||||||
|
elif current_time - self.last_brightness_update_time >= self.brightness_update_interval_ms:
|
||||||
|
if self.pending_brightness_update_id:
|
||||||
|
self.root.after_cancel(self.pending_brightness_update_id)
|
||||||
|
self.pending_brightness_update_id = None
|
||||||
|
self.update_brightness(tab)
|
||||||
|
self.last_brightness_update_time = current_time
|
||||||
|
else:
|
||||||
|
if self.pending_brightness_update_id:
|
||||||
|
self.root.after_cancel(self.pending_brightness_update_id)
|
||||||
|
time_to_wait = int(self.brightness_update_interval_ms - (current_time - self.last_brightness_update_time))
|
||||||
|
self.pending_brightness_update_id = self.root.after(time_to_wait, lambda: self.update_brightness(tab))
|
||||||
|
|
||||||
|
def schedule_update_delay(self, tab, force_send=False):
|
||||||
|
current_time = time.time() * 1000
|
||||||
|
if force_send:
|
||||||
|
if self.pending_delay_update_id:
|
||||||
|
self.root.after_cancel(self.pending_delay_update_id)
|
||||||
|
self.pending_delay_update_id = None
|
||||||
|
self.update_delay(tab)
|
||||||
|
self.last_delay_update_time = current_time
|
||||||
|
elif current_time - self.last_delay_update_time >= self.delay_update_interval_ms:
|
||||||
|
if self.pending_delay_update_id:
|
||||||
|
self.root.after_cancel(self.pending_delay_update_id)
|
||||||
|
self.pending_delay_update_id = None
|
||||||
|
self.update_delay(tab)
|
||||||
|
self.last_delay_update_time = current_time
|
||||||
|
else:
|
||||||
|
if self.pending_delay_update_id:
|
||||||
|
self.root.after_cancel(self.pending_delay_update_id)
|
||||||
|
time_to_wait = int(self.delay_update_interval_ms - (current_time - self.last_delay_update_time))
|
||||||
|
self.pending_delay_update_id = self.root.after(time_to_wait, lambda: self.update_delay(tab))
|
||||||
|
|
||||||
# --- Asynchronous Update Functions ---
|
# --- Asynchronous Update Functions ---
|
||||||
@async_handler
|
@async_handler
|
||||||
async def update_rgb(self, tab):
|
async def update_rgb(self, tab):
|
||||||
await asyncio.sleep(0) # Yield control
|
"""Update the currently selected color in the palette and send to the server."""
|
||||||
|
try:
|
||||||
|
red_slider = tab.widgets["red_slider"]
|
||||||
|
green_slider = tab.widgets["green_slider"]
|
||||||
|
blue_slider = tab.widgets["blue_slider"]
|
||||||
|
|
||||||
def update_status_labels(self):
|
r = red_slider.get()
|
||||||
# Pull values from midi_handler
|
g = green_slider.get()
|
||||||
delay = self.midi_handler.delay
|
b = blue_slider.get()
|
||||||
brightness = self.midi_handler.brightness
|
|
||||||
r = getattr(self.midi_handler, 'color_r', 0)
|
|
||||||
g = getattr(self.midi_handler, 'color_g', 0)
|
|
||||||
b = getattr(self.midi_handler, 'color_b', 0)
|
|
||||||
# Single bank values
|
|
||||||
brightness = getattr(self.midi_handler, 'brightness', '-')
|
|
||||||
r = getattr(self.midi_handler, 'color_r', 0)
|
|
||||||
g = getattr(self.midi_handler, 'color_g', 0)
|
|
||||||
b = getattr(self.midi_handler, 'color_b', 0)
|
|
||||||
pattern = getattr(self.midi_handler, 'current_pattern', '') or '-'
|
|
||||||
n1 = getattr(self.midi_handler, 'n1', '-')
|
|
||||||
n2 = getattr(self.midi_handler, 'n2', '-')
|
|
||||||
n3 = getattr(self.midi_handler, 'n3', '-')
|
|
||||||
|
|
||||||
# Update dials 2x4 grid (left→right, top→bottom):
|
hex_color = f"#{r:02x}{g:02x}{b:02x}"
|
||||||
# Row0: n3, Delay
|
print(f"Updating selected color to: {hex_color}")
|
||||||
# Row1: n1, n2
|
|
||||||
# Row2: B, Brightness
|
|
||||||
# Row3: R, G
|
|
||||||
dial_values = [
|
|
||||||
("n3", n3), ("Delay", getattr(self.midi_handler, 'delay', '-')),
|
|
||||||
("n1", n1), ("n2", n2),
|
|
||||||
("B", b), ("Brightness", brightness),
|
|
||||||
("R", r), ("G", g),
|
|
||||||
]
|
|
||||||
# Update dial displays
|
|
||||||
for idx, (label, value) in enumerate(dial_values):
|
|
||||||
if idx < len(self.dials_boxes):
|
|
||||||
self.dials_boxes[idx].config(text=f"{label}\n{value}")
|
|
||||||
|
|
||||||
# Update additional knobs (CC38-45)
|
selected_color_index = tab.widgets["selected_color_index"]
|
||||||
knob_values = [
|
if 0 <= selected_color_index < len(tab.colors_in_palette):
|
||||||
("CC44", getattr(self.midi_handler, 'knob7', '-')), ("CC45", getattr(self.midi_handler, 'knob8', '-')),
|
tab.colors_in_palette[selected_color_index] = hex_color
|
||||||
("Rad n1", getattr(self.midi_handler, 'n1', '-')), ("Rad delay", getattr(self.midi_handler, 'delay', '-')),
|
self.refresh_color_palette_display(tab) # Update swatch immediately
|
||||||
("Alt n1", getattr(self.midi_handler, 'n1', '-')), ("Alt n2", getattr(self.midi_handler, 'n2', '-')),
|
|
||||||
("Pulse n1", getattr(self.midi_handler, 'n1', '-')), ("Pulse n2", getattr(self.midi_handler, 'n2', '-')),
|
|
||||||
]
|
|
||||||
for idx, (label, value) in enumerate(knob_values):
|
|
||||||
if idx < len(self.knobs_boxes):
|
|
||||||
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
|
|
||||||
|
|
||||||
# Update buttons bank mappings and selection (single bank)
|
selected_server = self.notebook.tab(self.notebook.select(), "text")
|
||||||
# Pattern icons for nicer appearance
|
names = self.settings["lights"][selected_server]["names"]
|
||||||
icon_for = {
|
|
||||||
"pulse": "💥",
|
# Determine which colors to send based on the current pattern.
|
||||||
"flicker": "✨",
|
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
|
||||||
"alternating": "↔️",
|
colors_to_send = []
|
||||||
"n chase": "🏃",
|
|
||||||
"rainbow": "🌈",
|
if current_pattern == "transition":
|
||||||
"radiate": "🌟",
|
colors_to_send = tab.colors_in_palette.copy()
|
||||||
"sequential\npulse": "🔄",
|
elif current_pattern in ["on", "blink", "theater_chase", "flicker"]: # Add other patterns that use a single color
|
||||||
"alternating\nphase": "⚡",
|
if tab.colors_in_palette:
|
||||||
"-": "",
|
# For non-transition patterns, the device typically uses only the first color.
|
||||||
}
|
# However, if a user picks a color from the palette, we want THAT color to be the one
|
||||||
bank1_patterns = [
|
# sent and active. So, the selected color from the palette *becomes* the first color
|
||||||
# Pulse patterns (row 1)
|
# in the list we send to the device for these modes.
|
||||||
"pulse", "sequential\npulse",
|
# This ensures the light matches the selected palette color.
|
||||||
# Alternating patterns (row 2)
|
colors_to_send = [hex_color] # Send the color currently set by the sliders
|
||||||
"alternating", "alternating\nphase",
|
else:
|
||||||
# Chase/movement patterns (row 3)
|
colors_to_send = ["#000000"] # Default if palette is empty
|
||||||
"n chase", "rainbow",
|
else: # For other patterns like "off", "rainbow" where colors might not be primary
|
||||||
# Effect patterns (row 4)
|
# We still want to send the *current* palette state for saving,
|
||||||
"flicker", "radiate",
|
# but the device firmware might ignore it for these patterns.
|
||||||
"-", "-", "-", "-",
|
colors_to_send = tab.colors_in_palette.copy()
|
||||||
"-", "-", "-", "-",
|
|
||||||
]
|
payload = {
|
||||||
|
"save": True, # Always save this change to config
|
||||||
# Map MIDI handler pattern names to GUI display names
|
"names": names,
|
||||||
pattern_name_mapping = {
|
"settings": {
|
||||||
"sequential_pulse": "sequential\npulse",
|
"colors": colors_to_send, # This now dynamically changes based on pattern
|
||||||
"alternating_phase": "alternating\nphase",
|
"brightness": tab.widgets["brightness_slider"].get(),
|
||||||
"n_chase": "n chase",
|
"delay": tab.widgets["delay_slider"].get(),
|
||||||
}
|
"pattern": current_pattern, # Always send the current pattern
|
||||||
|
},
|
||||||
# Get the display name for the current pattern
|
}
|
||||||
display_pattern = pattern_name_mapping.get(pattern, pattern)
|
|
||||||
|
# Update the settings object with the new color list (and potentially other synced values)
|
||||||
# notes numbers per cell (bottom-left origin)
|
self.settings["lights"][selected_server]["settings"]["colors"] = tab.colors_in_palette.copy()
|
||||||
for idx, lbl in enumerate(self.button1_cells):
|
self.settings["lights"][selected_server]["settings"]["brightness"] = tab.widgets["brightness_slider"].get()
|
||||||
name = bank1_patterns[idx]
|
self.settings["lights"][selected_server]["settings"]["delay"] = tab.widgets["delay_slider"].get()
|
||||||
sel = (display_pattern == name and name != "-")
|
self.settings.save()
|
||||||
icon = icon_for.get(name, "")
|
|
||||||
text = f"{icon} {name}" if name != "-" else ""
|
await self.websocket_client.send_data(payload)
|
||||||
if sel:
|
print(f"Sent RGB payload: {payload}")
|
||||||
lbl.config(text=text, bg=highlight_pattern_color)
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error updating RGB: {e}")
|
||||||
|
|
||||||
|
@async_handler
|
||||||
|
async def update_brightness(self, tab):
|
||||||
|
try:
|
||||||
|
brightness_slider = tab.widgets["brightness_slider"]
|
||||||
|
brightness = brightness_slider.get()
|
||||||
|
print(f"Brightness: {brightness}")
|
||||||
|
|
||||||
|
selected_server = self.notebook.tab(self.notebook.select(), "text")
|
||||||
|
names = self.settings["lights"][selected_server]["names"]
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"save": True,
|
||||||
|
"names": names,
|
||||||
|
"settings": {
|
||||||
|
"brightness": brightness,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
# Update the settings object with the new brightness
|
||||||
|
self.settings["lights"][selected_server]["settings"]["brightness"] = brightness
|
||||||
|
self.settings.save()
|
||||||
|
await self.websocket_client.send_data(payload)
|
||||||
|
print(f"Sent brightness payload: {payload}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error updating brightness: {e}")
|
||||||
|
|
||||||
|
@async_handler
|
||||||
|
async def update_delay(self, tab):
|
||||||
|
try:
|
||||||
|
delay_slider = tab.widgets["delay_slider"]
|
||||||
|
delay = delay_slider.get()
|
||||||
|
print(f"Delay: {delay}")
|
||||||
|
|
||||||
|
selected_server = self.notebook.tab(self.notebook.select(), "text")
|
||||||
|
names = self.settings["lights"][selected_server]["names"]
|
||||||
|
payload = {
|
||||||
|
"save": True,
|
||||||
|
"names": names,
|
||||||
|
"settings": {
|
||||||
|
"delay": delay,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
# Update the settings object with the new delay
|
||||||
|
self.settings["lights"][selected_server]["settings"]["delay"] = delay
|
||||||
|
self.settings.save()
|
||||||
|
await self.websocket_client.send_data(payload)
|
||||||
|
print(f"Sent delay payload: {payload}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error updating delay: {e}")
|
||||||
|
|
||||||
|
@async_handler
|
||||||
|
async def send_pattern(self, tab_name: str, pattern_name: str):
|
||||||
|
try:
|
||||||
|
names = self.settings["lights"][tab_name]["names"]
|
||||||
|
# Get the actual tab widget to access its `colors_in_palette` and other attributes
|
||||||
|
current_tab_widget = None
|
||||||
|
for key, tab_widget in self.tabs.items():
|
||||||
|
if key == tab_name:
|
||||||
|
current_tab_widget = tab_widget
|
||||||
|
break
|
||||||
|
|
||||||
|
if not current_tab_widget:
|
||||||
|
print(f"Error: Could not find tab widget for {tab_name}")
|
||||||
|
return
|
||||||
|
|
||||||
|
current_settings_for_tab = self.settings["lights"][tab_name]["settings"]
|
||||||
|
|
||||||
|
payload_settings = {
|
||||||
|
"pattern": pattern_name,
|
||||||
|
"brightness": current_settings_for_tab.get("brightness", 127),
|
||||||
|
"delay": current_settings_for_tab.get("delay", 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Determine colors to send based on the *newly selected* pattern
|
||||||
|
if pattern_name == "transition":
|
||||||
|
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
|
||||||
|
elif pattern_name in ["on", "blink"]: # Add other patterns that use a single color here
|
||||||
|
# When switching TO 'on' or 'blink', ensure the color sent is the one
|
||||||
|
# currently displayed on the sliders (which reflects the selected palette color).
|
||||||
|
r = current_tab_widget.widgets["red_slider"].get()
|
||||||
|
g = current_tab_widget.widgets["green_slider"].get()
|
||||||
|
b = current_tab_widget.widgets["blue_slider"].get()
|
||||||
|
hex_color_from_sliders = f"#{r:02x}{g:02x}{b:02x}"
|
||||||
|
payload_settings["colors"] = [hex_color_from_sliders]
|
||||||
else:
|
else:
|
||||||
lbl.config(text=text, bg=bg_color)
|
# For other patterns, send the full palette, device might ignore or use default
|
||||||
# (no second bank to update)
|
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
|
||||||
|
|
||||||
# reschedule
|
payload = {
|
||||||
self.root.after(200, self.update_status_labels)
|
"save": True,
|
||||||
|
"names": names,
|
||||||
|
"settings": payload_settings,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Update the settings object with the new pattern and current colors/brightness/delay
|
||||||
|
self.settings["lights"][tab_name]["settings"]["pattern"] = pattern_name
|
||||||
|
# Always save the full current palette state in settings.
|
||||||
|
self.settings["lights"][tab_name]["settings"]["colors"] = current_tab_widget.colors_in_palette.copy()
|
||||||
|
self.settings.save()
|
||||||
|
|
||||||
|
self.highlight_pattern_button(current_tab_widget, pattern_name)
|
||||||
|
self.update_ui_for_pattern(current_tab_widget, pattern_name) # Update UI based on new pattern
|
||||||
|
|
||||||
|
await self.websocket_client.send_data(payload)
|
||||||
|
print(f"Sent pattern payload: {payload}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error sending pattern: {e}")
|
||||||
|
|
||||||
|
def save_current_tab_settings(self):
|
||||||
|
"""Saves the current state of the active tab's settings (colors, brightness, delay, pattern) to config."""
|
||||||
|
selected_server = self.notebook.tab(self.notebook.select(), "text")
|
||||||
|
current_tab_widget = self.notebook.nametowidget(self.notebook.select())
|
||||||
|
|
||||||
|
if not hasattr(current_tab_widget, "colors_in_palette"):
|
||||||
|
return # Tab not fully initialized yet
|
||||||
|
|
||||||
|
# Update settings for the current tab in the self.settings object
|
||||||
|
self.settings["lights"][selected_server]["settings"]["colors"] = current_tab_widget.colors_in_palette
|
||||||
|
self.settings["lights"][selected_server]["settings"]["brightness"] = current_tab_widget.widgets["brightness_slider"].get()
|
||||||
|
self.settings["lights"][selected_server]["settings"]["delay"] = current_tab_widget.widgets["delay_slider"].get()
|
||||||
|
# The pattern is updated in send_pattern already, but ensure consistency
|
||||||
|
# For simplicity, we assume send_pattern is the primary way to change pattern.
|
||||||
|
|
||||||
|
self.settings.save()
|
||||||
|
print(f"Saved settings for {selected_server}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
639
src/midi.py
639
src/midi.py
@@ -1,570 +1,105 @@
|
|||||||
import mido
|
import mido
|
||||||
import asyncio
|
import asyncio
|
||||||
import networking
|
import time
|
||||||
import socket
|
import networking # <--- This will now correctly import your module
|
||||||
import json
|
|
||||||
import logging # Added logging import
|
|
||||||
import time # Added for initial state read
|
|
||||||
import tkinter as tk
|
|
||||||
from tkinter import ttk, messagebox # Import messagebox for confirmations
|
|
||||||
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
|
|
||||||
|
|
||||||
# Pattern name mapping for shorter JSON payloads
|
|
||||||
PATTERN_NAMES = {
|
|
||||||
"flicker": "f",
|
|
||||||
"fill_range": "fr",
|
|
||||||
"n_chase": "nc",
|
|
||||||
"alternating": "a",
|
|
||||||
"pulse": "p",
|
|
||||||
"rainbow": "r",
|
|
||||||
"specto": "s",
|
|
||||||
"radiate": "rd",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# Configure logging
|
async def midi_to_websocket_listener(midi_port_index: int, websocket_uri: str):
|
||||||
DEBUG_MODE = True # Set to False for INFO level logging
|
"""
|
||||||
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
Listens to a specific MIDI port and sends data to a WebSocket server
|
||||||
|
when Note 32 (and 33) is pressed.
|
||||||
|
"""
|
||||||
|
delay = 100 # Default delay value
|
||||||
|
|
||||||
# TCP Server Configuration
|
# 1. Get MIDI port name
|
||||||
TCP_HOST = "127.0.0.1"
|
|
||||||
TCP_PORT = 65432
|
|
||||||
|
|
||||||
# Sound Control Server Configuration (for sending reset)
|
|
||||||
SOUND_CONTROL_HOST = "127.0.0.1"
|
|
||||||
SOUND_CONTROL_PORT = 65433
|
|
||||||
|
|
||||||
class MidiHandler:
|
|
||||||
def __init__(self, midi_port_index: int, websocket_uri: str):
|
|
||||||
self.midi_port_index = midi_port_index
|
|
||||||
self.websocket_uri = websocket_uri
|
|
||||||
self.ws_client = networking.WebSocketClient(websocket_uri)
|
|
||||||
self.delay = 100 # Default delay value, controlled by MIDI controller
|
|
||||||
self.brightness = 100 # Default brightness value, controlled by MIDI controller
|
|
||||||
self.tcp_host = TCP_HOST
|
|
||||||
self.tcp_port = TCP_PORT
|
|
||||||
self.beat_sending_enabled = True # New: Local flag for beat sending
|
|
||||||
self.sound_control_host = SOUND_CONTROL_HOST
|
|
||||||
self.sound_control_port = SOUND_CONTROL_PORT
|
|
||||||
# RGB controlled by CC 30/31/32 (default green)
|
|
||||||
self.color_r = 0
|
|
||||||
self.color_g = 255
|
|
||||||
self.color_b = 0
|
|
||||||
# Generic parameters controlled via CC
|
|
||||||
# Raw CC-driven parameters (0-127)
|
|
||||||
self.n1 = 10
|
|
||||||
self.n2 = 10
|
|
||||||
self.n3 = 1
|
|
||||||
# Additional knobs (CC38-45)
|
|
||||||
self.knob1 = 0
|
|
||||||
self.knob2 = 0
|
|
||||||
self.knob3 = 0
|
|
||||||
self.knob4 = 0
|
|
||||||
self.knob5 = 0
|
|
||||||
self.knob6 = 0
|
|
||||||
self.knob7 = 0
|
|
||||||
self.knob8 = 0
|
|
||||||
# Current state for GUI display
|
|
||||||
self.current_bpm: float | None = None
|
|
||||||
self.current_pattern: str = ""
|
|
||||||
self.beat_index: int = 0
|
|
||||||
|
|
||||||
# Rate limiting for parameter updates
|
|
||||||
self.last_param_update: float = 0.0
|
|
||||||
self.param_update_interval: float = 0.1 # 100ms minimum between updates
|
|
||||||
self.pending_param_update: bool = False
|
|
||||||
|
|
||||||
# Sequential pulse pattern state
|
|
||||||
self.sequential_pulse_enabled: bool = False
|
|
||||||
self.sequential_pulse_step: int = 0
|
|
||||||
|
|
||||||
def _current_color_rgb(self) -> tuple:
|
|
||||||
r = max(0, min(255, int(self.color_r)))
|
|
||||||
g = max(0, min(255, int(self.color_g)))
|
|
||||||
b = max(0, min(255, int(self.color_b)))
|
|
||||||
return (r, g, b)
|
|
||||||
|
|
||||||
async def _handle_sequential_pulse(self):
|
|
||||||
"""Handle sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored"""
|
|
||||||
from bar_config import LEFT_BARS, RIGHT_BARS
|
|
||||||
|
|
||||||
# Calculate which bar should pulse based on beat (1 beat per bar)
|
|
||||||
bar_index = self.beat_index % 4 # 0-3, cycles every 4 beats
|
|
||||||
|
|
||||||
# Create minimal payload - defaults to off
|
|
||||||
payload = {
|
|
||||||
"d": { # Defaults - off for all bars
|
|
||||||
"t": "b", # Message type: beat
|
|
||||||
"pt": "o", # off
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Set specific bars to pulse
|
|
||||||
left_bar = LEFT_BARS[bar_index]
|
|
||||||
right_bar = RIGHT_BARS[bar_index]
|
|
||||||
|
|
||||||
payload[left_bar] = {"pt": "p"} # pulse
|
|
||||||
payload[right_bar] = {"pt": "p"} # pulse
|
|
||||||
|
|
||||||
# logging.debug(f"[Sequential Pulse] Beat {self.beat_index}, pulsing bars {left_bar} and {right_bar}")
|
|
||||||
await self.ws_client.send_data(payload)
|
|
||||||
|
|
||||||
async def _handle_alternating_phase(self):
|
|
||||||
"""Handle alternating pattern with phase offset: every second bar uses different step"""
|
|
||||||
from bar_config import LED_BAR_NAMES
|
|
||||||
|
|
||||||
# Create minimal payload - same n1/n2 for all bars
|
|
||||||
payload = {
|
|
||||||
"d": { # Defaults - pattern and n1/n2
|
|
||||||
"t": "b", # Message type: beat
|
|
||||||
"pt": "a", # alternating
|
|
||||||
"n1": self.n1,
|
|
||||||
"n2": self.n2,
|
|
||||||
"s": self.beat_index % 2, # Default step for in-phase bars
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Set step offset for every second bar (bars 101, 103, 105, 107)
|
|
||||||
swap_bars = ["101", "103", "105", "107"]
|
|
||||||
for bar_name in LED_BAR_NAMES:
|
|
||||||
if bar_name in swap_bars:
|
|
||||||
# Send step offset for out-of-phase bars
|
|
||||||
payload[bar_name] = {"s": (self.beat_index + 1) % 2}
|
|
||||||
else:
|
|
||||||
# In-phase bars use defaults (no override needed)
|
|
||||||
payload[bar_name] = {}
|
|
||||||
|
|
||||||
# logging.debug(f"[Alternating Phase] Beat {self.beat_index}, step offset for bars {swap_bars}")
|
|
||||||
await self.ws_client.send_data(payload)
|
|
||||||
|
|
||||||
async def _send_full_parameters(self):
|
|
||||||
"""Send all parameters to bars - may require multiple packets due to size limit"""
|
|
||||||
from bar_config import LED_BAR_NAMES
|
|
||||||
|
|
||||||
# Calculate packet size for full parameters
|
|
||||||
full_payload = {
|
|
||||||
"d": {
|
|
||||||
"t": "u", # Message type: update
|
|
||||||
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
|
|
||||||
"dl": self.delay,
|
|
||||||
"cl": [self._current_color_rgb()],
|
|
||||||
"br": self.brightness,
|
|
||||||
"n1": self.n1,
|
|
||||||
"n2": self.n2,
|
|
||||||
"n3": self.n3,
|
|
||||||
"s": self.beat_index % 256, # Use full range for rainbow patterns
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Estimate size: ~200 bytes for defaults + 8 bars * 2 bytes = ~216 bytes
|
|
||||||
# This should fit in one packet, but let's be safe
|
|
||||||
payload_size = len(str(full_payload))
|
|
||||||
|
|
||||||
if payload_size > 200: # Split into 2 packets if too large
|
|
||||||
# Packet 1: Pattern and timing parameters
|
|
||||||
payload1 = {
|
|
||||||
"d": {
|
|
||||||
"t": "u", # Message type: update
|
|
||||||
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
|
|
||||||
"dl": self.delay,
|
|
||||||
"br": self.brightness,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for bar_name in LED_BAR_NAMES:
|
|
||||||
payload1[bar_name] = {}
|
|
||||||
|
|
||||||
# Packet 2: Color and pattern parameters
|
|
||||||
payload2 = {
|
|
||||||
"d": {
|
|
||||||
"t": "u", # Message type: update
|
|
||||||
"cl": [self._current_color_rgb()],
|
|
||||||
"n1": self.n1,
|
|
||||||
"n2": self.n2,
|
|
||||||
"n3": self.n3,
|
|
||||||
"s": self.beat_index % 2, # Keep step small (0 or 1) for alternating patterns
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for bar_name in LED_BAR_NAMES:
|
|
||||||
payload2[bar_name] = {}
|
|
||||||
|
|
||||||
# logging.debug(f"[Full Params] Sending in 2 packets due to size ({payload_size} bytes)")
|
|
||||||
await self.ws_client.send_data(payload1)
|
|
||||||
await asyncio.sleep(0.01) # Small delay between packets
|
|
||||||
await self.ws_client.send_data(payload2)
|
|
||||||
else:
|
|
||||||
# Single packet
|
|
||||||
for bar_name in LED_BAR_NAMES:
|
|
||||||
full_payload[bar_name] = {}
|
|
||||||
|
|
||||||
# logging.debug(f"[Full Params] Sending single packet ({payload_size} bytes)")
|
|
||||||
await self.ws_client.send_data(full_payload)
|
|
||||||
|
|
||||||
async def _request_param_update(self):
|
|
||||||
"""Request a parameter update with rate limiting"""
|
|
||||||
import time
|
|
||||||
current_time = time.time()
|
|
||||||
|
|
||||||
if current_time - self.last_param_update >= self.param_update_interval:
|
|
||||||
# Can send immediately
|
|
||||||
self.last_param_update = current_time
|
|
||||||
await self._send_full_parameters()
|
|
||||||
# logging.debug("[Rate Limit] Parameter update sent immediately")
|
|
||||||
else:
|
|
||||||
# Rate limited - mark as pending
|
|
||||||
self.pending_param_update = True
|
|
||||||
# logging.debug("[Rate Limit] Parameter update queued (rate limited)")
|
|
||||||
|
|
||||||
async def _send_normal_pattern(self):
|
|
||||||
"""Send normal pattern to all bars - include required parameters"""
|
|
||||||
# Patterns that need specific parameters
|
|
||||||
patterns_needing_params = ["alternating", "flicker", "n_chase", "rainbow", "radiate"]
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
"d": { # Defaults
|
|
||||||
"t": "b", # Message type: beat
|
|
||||||
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add required parameters for patterns that need them
|
|
||||||
if self.current_pattern in patterns_needing_params:
|
|
||||||
payload["d"].update({
|
|
||||||
"n1": self.n1,
|
|
||||||
"n2": self.n2,
|
|
||||||
"n3": self.n3,
|
|
||||||
"dl": self.delay,
|
|
||||||
"s": self.beat_index % 256, # Use full range for rainbow patterns
|
|
||||||
})
|
|
||||||
|
|
||||||
# Add empty entries for each bar (they'll use defaults)
|
|
||||||
for bar_name in LED_BAR_NAMES:
|
|
||||||
payload[bar_name] = {}
|
|
||||||
|
|
||||||
# logging.debug(f"[Beat] Triggering '{self.current_pattern}' for {len(LED_BAR_NAMES)} bars")
|
|
||||||
await self.ws_client.send_data(payload)
|
|
||||||
|
|
||||||
async def _send_reset_to_sound(self):
|
|
||||||
try:
|
|
||||||
reader, writer = await asyncio.open_connection(self.sound_control_host, self.sound_control_port)
|
|
||||||
cmd = "RESET_TEMPO\n".encode('utf-8')
|
|
||||||
writer.write(cmd)
|
|
||||||
await writer.drain()
|
|
||||||
resp = await reader.read(100)
|
|
||||||
logging.info(f"[MidiHandler - Control] Sent RESET_TEMPO, response: {resp.decode().strip()}")
|
|
||||||
writer.close()
|
|
||||||
await writer.wait_closed()
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"[MidiHandler - Control] Failed to send RESET_TEMPO: {e}")
|
|
||||||
|
|
||||||
async def _handle_tcp_client(self, reader, writer):
|
|
||||||
addr = writer.get_extra_info('peername')
|
|
||||||
logging.info(f"[MidiHandler - TCP Server] Connected by {addr}") # Changed to info
|
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
data = await reader.read(4096) # Read up to 4KB of data
|
|
||||||
if not data:
|
|
||||||
logging.info(f"[MidiHandler - TCP Server] Client {addr} disconnected.") # Changed to info
|
|
||||||
break
|
|
||||||
|
|
||||||
message = data.decode().strip()
|
|
||||||
# logging.debug(f"[MidiHandler - TCP Server] Received from {addr}: {message}") # Changed to debug
|
|
||||||
|
|
||||||
if self.beat_sending_enabled:
|
|
||||||
try:
|
|
||||||
# Attempt to parse as float (BPM) from sound.py
|
|
||||||
bpm_value = float(message)
|
|
||||||
self.current_bpm = bpm_value
|
|
||||||
# On each beat, trigger currently selected pattern(s)
|
|
||||||
if not self.current_pattern:
|
|
||||||
pass # No pattern selected yet; ignoring beat
|
|
||||||
else:
|
|
||||||
self.beat_index = (self.beat_index + 1) % 1000000
|
|
||||||
|
|
||||||
# Send periodic parameter updates every 8 beats
|
|
||||||
if self.beat_index % 8 == 0:
|
|
||||||
await self._send_full_parameters()
|
|
||||||
|
|
||||||
# Check for pending parameter updates (rate limited)
|
|
||||||
if self.pending_param_update:
|
|
||||||
import time
|
|
||||||
current_time = time.time()
|
|
||||||
if current_time - self.last_param_update >= self.param_update_interval:
|
|
||||||
self.last_param_update = current_time
|
|
||||||
self.pending_param_update = False
|
|
||||||
await self._send_full_parameters()
|
|
||||||
# logging.debug("[Rate Limit] Pending parameter update sent")
|
|
||||||
|
|
||||||
if self.current_pattern == "sequential_pulse":
|
|
||||||
# Sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored
|
|
||||||
await self._handle_sequential_pulse()
|
|
||||||
elif self.current_pattern == "alternating_phase":
|
|
||||||
# Alternating pattern with phase offset: every second bar is out of phase
|
|
||||||
await self._handle_alternating_phase()
|
|
||||||
elif self.current_pattern:
|
|
||||||
# Normal pattern mode - run on all bars
|
|
||||||
await self._send_normal_pattern()
|
|
||||||
except ValueError:
|
|
||||||
logging.warning(f"[MidiHandler - TCP Server] Received non-BPM message from {addr}, not forwarding: {message}") # Changed to warning
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"[MidiHandler - TCP Server] Error processing received message from {addr}: {e}") # Changed to error
|
|
||||||
else:
|
|
||||||
pass # Beat sending disabled
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logging.info(f"[MidiHandler - TCP Server] Client handler for {addr} cancelled.") # Changed to info
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"[MidiHandler - TCP Server] Error handling client {addr}: {e}") # Changed to error
|
|
||||||
finally:
|
|
||||||
logging.info(f"[MidiHandler - TCP Server] Closing connection for {addr}") # Changed to info
|
|
||||||
writer.close()
|
|
||||||
await writer.wait_closed()
|
|
||||||
|
|
||||||
async def _midi_tcp_server(self):
|
|
||||||
server = await asyncio.start_server(
|
|
||||||
lambda r, w: self._handle_tcp_client(r, w), self.tcp_host, self.tcp_port)
|
|
||||||
|
|
||||||
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
|
|
||||||
logging.info(f"[MidiHandler - TCP Server] Serving on {addrs}") # Changed to info
|
|
||||||
|
|
||||||
async with server:
|
|
||||||
await server.serve_forever()
|
|
||||||
|
|
||||||
async def _read_initial_cc_state(self, port, timeout_s: float = 0.5):
|
|
||||||
"""Read initial CC values from the MIDI device for a short period to populate state."""
|
|
||||||
start = time.time()
|
|
||||||
while time.time() - start < timeout_s:
|
|
||||||
msg = port.receive(block=False)
|
|
||||||
if msg and msg.type == 'control_change':
|
|
||||||
if msg.control == 36:
|
|
||||||
self.n3 = max(1, msg.value)
|
|
||||||
logging.info(f"[Init] n3 set to {self.n3} from CC36")
|
|
||||||
elif msg.control == 37:
|
|
||||||
self.delay = msg.value * 4
|
|
||||||
logging.info(f"[Init] Delay set to {self.delay} ms from CC37")
|
|
||||||
elif msg.control == 39:
|
|
||||||
self.delay = msg.value * 4
|
|
||||||
logging.info(f"[Init] Delay set to {self.delay} ms from CC39")
|
|
||||||
elif msg.control == 33:
|
|
||||||
self.brightness = round((msg.value / 127) * 100)
|
|
||||||
logging.info(f"[Init] Brightness set to {self.brightness} from CC33")
|
|
||||||
elif msg.control == 30:
|
|
||||||
self.color_r = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"[Init] Red set to {self.color_r} from CC30")
|
|
||||||
elif msg.control == 31:
|
|
||||||
self.color_g = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"[Init] Green set to {self.color_g} from CC31")
|
|
||||||
elif msg.control == 32:
|
|
||||||
self.color_b = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"[Init] Blue set to {self.color_b} from CC32")
|
|
||||||
elif msg.control == 34:
|
|
||||||
self.n1 = int(msg.value)
|
|
||||||
logging.info(f"[Init] n1 set to {self.n1} from CC34")
|
|
||||||
elif msg.control == 35:
|
|
||||||
self.n2 = int(msg.value)
|
|
||||||
logging.info(f"[Init] n2 set to {self.n2} from CC35")
|
|
||||||
elif msg.control == 27:
|
|
||||||
self.beat_sending_enabled = (msg.value == 127)
|
|
||||||
logging.info(f"[Init] Beat sending {'ENABLED' if self.beat_sending_enabled else 'DISABLED'} from CC27")
|
|
||||||
await asyncio.sleep(0.001)
|
|
||||||
|
|
||||||
async def _midi_listener(self):
|
|
||||||
logging.info("Midi function") # Changed to info
|
|
||||||
"""
|
|
||||||
Listens to a specific MIDI port and sends data to a WebSocket server
|
|
||||||
when Note 32 (and 33) is pressed.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# 1. Get MIDI port name
|
|
||||||
port_names = mido.get_input_names()
|
|
||||||
if not port_names:
|
|
||||||
logging.warning("No MIDI input ports found. Please connect your device.") # Changed to warning
|
|
||||||
return
|
|
||||||
if not (0 <= self.midi_port_index < len(port_names)):
|
|
||||||
logging.error(f"Error: MIDI port index {self.midi_port_index} out of range. Available ports: {port_names}") # Changed to error
|
|
||||||
logging.info("Available ports:") # Changed to info
|
|
||||||
for i, name in enumerate(port_names):
|
|
||||||
logging.info(f" {i}: {name}") # Changed to info
|
|
||||||
return
|
|
||||||
|
|
||||||
midi_port_name = port_names[self.midi_port_index]
|
|
||||||
logging.info(f"Selected MIDI input port: {midi_port_name}") # Changed to info
|
|
||||||
|
|
||||||
try:
|
|
||||||
with mido.open_input(midi_port_name) as port:
|
|
||||||
logging.info(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.") # Changed to info
|
|
||||||
# Read initial controller state briefly
|
|
||||||
await self._read_initial_cc_state(port)
|
|
||||||
while True:
|
|
||||||
msg = port.receive(block=False) # Non-blocking read
|
|
||||||
if msg:
|
|
||||||
# logging.debug(msg) # Changed to debug
|
|
||||||
match msg.type:
|
|
||||||
case 'note_on':
|
|
||||||
# logging.debug(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}") # Changed to debug
|
|
||||||
# Bank1 patterns starting at MIDI note 36
|
|
||||||
pattern_bindings: list[str] = [
|
|
||||||
# Pulse patterns (row 1)
|
|
||||||
"pulse",
|
|
||||||
"sequential_pulse",
|
|
||||||
# Alternating patterns (row 2)
|
|
||||||
"alternating",
|
|
||||||
"alternating_phase",
|
|
||||||
# Chase/movement patterns (row 3)
|
|
||||||
"n_chase",
|
|
||||||
"rainbow",
|
|
||||||
# Effect patterns (row 4)
|
|
||||||
"flicker",
|
|
||||||
"radiate",
|
|
||||||
]
|
|
||||||
idx = msg.note - 36
|
|
||||||
if 0 <= idx < len(pattern_bindings):
|
|
||||||
pattern_name = pattern_bindings[idx]
|
|
||||||
self.current_pattern = pattern_name
|
|
||||||
logging.info(f"[Select] Pattern selected via note {msg.note}: {self.current_pattern} (n1={self.n1}, n2={self.n2})")
|
|
||||||
|
|
||||||
# Send full parameters when pattern changes
|
|
||||||
await self._send_full_parameters()
|
|
||||||
else:
|
|
||||||
pass # Note not bound to patterns
|
|
||||||
|
|
||||||
case 'control_change':
|
|
||||||
match msg.control:
|
|
||||||
case 36:
|
|
||||||
self.n3 = max(1, msg.value) # Update n3 step rate
|
|
||||||
logging.info(f"n3 set to {self.n3} by MIDI controller (CC36)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 37:
|
|
||||||
self.delay = msg.value * 4 # Update instance delay
|
|
||||||
logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC37)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 38:
|
|
||||||
self.n1 = msg.value # pulse n1 for pulse patterns
|
|
||||||
logging.info(f"Pulse n1 set to {self.n1} by MIDI controller (CC38)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 39:
|
|
||||||
self.n2 = msg.value # pulse n2 for pulse patterns
|
|
||||||
logging.info(f"Pulse n2 set to {self.n2} by MIDI controller (CC39)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 40:
|
|
||||||
self.n1 = msg.value # n1 for alternating patterns
|
|
||||||
logging.info(f"Alternating n1 set to {self.n1} by MIDI controller (CC40)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 41:
|
|
||||||
self.n2 = msg.value # n2 for alternating patterns
|
|
||||||
logging.info(f"Alternating n2 set to {self.n2} by MIDI controller (CC41)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 42:
|
|
||||||
self.n1 = msg.value # radiate n1 for radiate patterns
|
|
||||||
logging.info(f"Radiate n1 set to {self.n1} by MIDI controller (CC42)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 43:
|
|
||||||
self.delay = msg.value * 4 # delay for radiate patterns
|
|
||||||
logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC43)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 44:
|
|
||||||
self.knob7 = msg.value
|
|
||||||
logging.info(f"Knob7 set to {self.knob7} by MIDI controller (CC44)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 45:
|
|
||||||
self.knob8 = msg.value
|
|
||||||
logging.info(f"Knob8 set to {self.knob8} by MIDI controller (CC45)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 27:
|
|
||||||
if msg.value == 127:
|
|
||||||
self.beat_sending_enabled = True
|
|
||||||
logging.info("[MidiHandler - Listener] Beat sending ENABLED by MIDI control.") # Changed to info
|
|
||||||
elif msg.value == 0:
|
|
||||||
self.beat_sending_enabled = False
|
|
||||||
logging.info("[MidiHandler - Listener] Beat sending DISABLED by MIDI control.") # Changed to info
|
|
||||||
case 29:
|
|
||||||
if msg.value == 127:
|
|
||||||
logging.info("[MidiHandler - Listener] RESET_TEMPO requested by control 29.")
|
|
||||||
await self._send_reset_to_sound()
|
|
||||||
case 33:
|
|
||||||
# Map 0-127 to 0-100 brightness scale
|
|
||||||
self.brightness = round((msg.value / 127) * 100)
|
|
||||||
logging.info(f"Brightness set to {self.brightness} by MIDI controller (CC33)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 30:
|
|
||||||
# Red 0-127 -> 0-255
|
|
||||||
self.color_r = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"Red set to {self.color_r}")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 31:
|
|
||||||
# Green 0-127 -> 0-255
|
|
||||||
self.color_g = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"Green set to {self.color_g}")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 32:
|
|
||||||
# Blue 0-127 -> 0-255
|
|
||||||
self.color_b = round((msg.value / 127) * 255)
|
|
||||||
logging.info(f"Blue set to {self.color_b}")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 34:
|
|
||||||
self.n1 = int(msg.value)
|
|
||||||
logging.info(f"n1 set to {self.n1} by MIDI controller (CC34)")
|
|
||||||
await self._request_param_update()
|
|
||||||
case 35:
|
|
||||||
self.n2 = int(msg.value)
|
|
||||||
logging.info(f"n2 set to {self.n2} by MIDI controller (CC35)")
|
|
||||||
await self._request_param_update()
|
|
||||||
|
|
||||||
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
|
|
||||||
|
|
||||||
except mido.PortsError as e:
|
|
||||||
logging.error(f"Error opening MIDI port '{midi_port_name}': {e}") # Changed to error
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logging.info(f"MIDI listener cancelled.") # Changed to info
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"An unexpected error occurred in MIDI listener: {e}") # Changed to error
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
try:
|
|
||||||
await self.ws_client.connect()
|
|
||||||
logging.info(f"[MidiHandler] WebSocket client connected to {self.ws_client.uri}") # Changed to info
|
|
||||||
|
|
||||||
# List available MIDI ports for debugging
|
|
||||||
print(f"Available MIDI input ports: {mido.get_input_names()}")
|
|
||||||
print(f"Trying to open MIDI port index {self.midi_port_index}")
|
|
||||||
|
|
||||||
await asyncio.gather(
|
|
||||||
self._midi_listener(),
|
|
||||||
self._midi_tcp_server()
|
|
||||||
)
|
|
||||||
except mido.PortsError as e:
|
|
||||||
logging.error(f"[MidiHandler] Error opening MIDI port: {e}") # Changed to error
|
|
||||||
print(f"MIDI Port Error: {e}")
|
|
||||||
print(f"Available MIDI ports: {mido.get_input_names()}")
|
|
||||||
print("Please check your MIDI device connection and port index")
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logging.info("[MidiHandler] Tasks cancelled due to program shutdown.") # Changed to info
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
logging.info("\n[MidiHandler] Program interrupted by user.") # Changed to info
|
|
||||||
finally:
|
|
||||||
logging.info("[MidiHandler] Main program finished. Closing WebSocket client...") # Changed to info
|
|
||||||
await self.ws_client.close()
|
|
||||||
logging.info("[MidiHandler] WebSocket client closed.") # Changed to info
|
|
||||||
|
|
||||||
def print_midi_ports():
|
|
||||||
logging.info("\n--- Available MIDI Input Ports ---") # Changed to info
|
|
||||||
port_names = mido.get_input_names()
|
port_names = mido.get_input_names()
|
||||||
if not port_names:
|
if not port_names:
|
||||||
logging.warning("No MIDI input ports found.") # Changed to warning
|
print("No MIDI input ports found. Please connect your device.")
|
||||||
else:
|
return
|
||||||
|
if not (0 <= midi_port_index < len(port_names)):
|
||||||
|
print(f"Error: MIDI port index {midi_port_index} out of range. Available ports: {port_names}")
|
||||||
|
print("Available ports:")
|
||||||
for i, name in enumerate(port_names):
|
for i, name in enumerate(port_names):
|
||||||
logging.info(f" {i}: {name}") # Changed to info
|
print(f" {i}: {name}")
|
||||||
logging.info("----------------------------------") # Changed to info
|
return
|
||||||
|
|
||||||
|
midi_port_name = port_names[midi_port_index]
|
||||||
|
print(f"Selected MIDI input port: {midi_port_name}")
|
||||||
|
|
||||||
|
# 2. Initialize WebSocket client (using your actual networking.py)
|
||||||
|
ws_client = networking.WebSocketClient(websocket_uri)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 3. Connect WebSocket
|
||||||
|
await ws_client.connect()
|
||||||
|
print(f"WebSocket client connected to {ws_client.uri}")
|
||||||
|
|
||||||
|
# 4. Open MIDI port and start listening loop
|
||||||
|
with mido.open_input(midi_port_name) as port:
|
||||||
|
print(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.")
|
||||||
|
while True:
|
||||||
|
msg = port.receive(block=False) # Non-blocking read
|
||||||
|
if msg:
|
||||||
|
match msg.type:
|
||||||
|
case 'note_on':
|
||||||
|
print(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}")
|
||||||
|
match msg.note:
|
||||||
|
case 32:
|
||||||
|
await ws_client.send_data({
|
||||||
|
"names": ["1"],
|
||||||
|
"settings": {
|
||||||
|
"pattern": "pulse",
|
||||||
|
"delay": delay,
|
||||||
|
"colors": ["#00ff00"],
|
||||||
|
"brightness": 100,
|
||||||
|
"num_leds": 200,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
case 33:
|
||||||
|
await ws_client.send_data({
|
||||||
|
"names": ["2"],
|
||||||
|
"settings": {
|
||||||
|
"pattern": "chase",
|
||||||
|
"speed": 10,
|
||||||
|
"color": "#00FFFF",
|
||||||
|
}
|
||||||
|
})
|
||||||
|
case 'control_change':
|
||||||
|
match msg.control:
|
||||||
|
case 36:
|
||||||
|
|
||||||
|
delay = msg.value * 4
|
||||||
|
print(f"Delay set to {delay} ms")
|
||||||
|
|
||||||
|
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
|
||||||
|
|
||||||
|
except mido.PortsError as e:
|
||||||
|
print(f"Error opening MIDI port '{midi_port_name}': {e}")
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
print(f"MIDI listener cancelled.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An unexpected error occurred: {e}")
|
||||||
|
finally:
|
||||||
|
# 5. Disconnect WebSocket and clean up
|
||||||
|
# This assumes your WebSocketClient has a ._connected attribute or similar way to check state.
|
||||||
|
# If your client's disconnect method is safe to call even if not connected, you can simplify.
|
||||||
|
await ws_client.close()
|
||||||
|
print("MIDI listener stopped and cleaned up.")
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
print_midi_ports()
|
|
||||||
# --- Configuration ---
|
# --- Configuration ---
|
||||||
MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device
|
MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device
|
||||||
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
|
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
|
||||||
# --- End Configuration ---
|
# --- End Configuration ---
|
||||||
|
|
||||||
midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
|
try:
|
||||||
await midi_handler.run()
|
await midi_to_websocket_listener(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nProgram interrupted by user.")
|
||||||
|
finally:
|
||||||
|
print("Main program finished.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
27
src/settings.py
Normal file
27
src/settings.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
class Settings(dict):
|
||||||
|
SETTINGS_FILE = "settings.json"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.load() # Load settings from file during initialization
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
try:
|
||||||
|
j = json.dumps(self, indent=4)
|
||||||
|
with open(self.SETTINGS_FILE, 'w') as file:
|
||||||
|
file.write(j)
|
||||||
|
print("Settings saved successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error saving settings: {e}")
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
try:
|
||||||
|
with open(self.SETTINGS_FILE, 'r') as file:
|
||||||
|
loaded_settings = json.load(file)
|
||||||
|
self.update(loaded_settings)
|
||||||
|
print("Settings loaded successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error loading settings {e}")
|
||||||
|
self.save()
|
278
src/sound.py
278
src/sound.py
@@ -4,207 +4,121 @@ import pyaudio
|
|||||||
import aubio
|
import aubio
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
import websocket # pip install websocket-client
|
||||||
import json
|
import json
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
import logging # Added logging import
|
|
||||||
import asyncio # Re-added asyncio import
|
|
||||||
import threading # Added threading for control server
|
|
||||||
|
|
||||||
# Configure logging
|
seconds = 10 # how long this script should run (if not using infinite loop)
|
||||||
DEBUG_MODE = True # Set to False for INFO level logging
|
|
||||||
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
||||||
|
|
||||||
# TCP Server Configuration (assuming midi.py runs this)
|
bufferSize = 512
|
||||||
MIDI_TCP_HOST = "127.0.0.1"
|
windowSizeMultiple = 2 # or 4 for higher accuracy, but more computational cost
|
||||||
MIDI_TCP_PORT = 65432
|
|
||||||
|
|
||||||
# Sound Control Server Configuration (for midi.py to control sound.py)
|
audioInputDeviceIndex = 7 # use 'arecord -l' to check available audio devices
|
||||||
SOUND_CONTROL_HOST = "127.0.0.1"
|
audioInputChannels = 1
|
||||||
SOUND_CONTROL_PORT = 65433
|
|
||||||
|
|
||||||
class SoundBeatDetector:
|
pa = pyaudio.PyAudio()
|
||||||
def __init__(self, tcp_host: str, tcp_port: int):
|
|
||||||
self.tcp_host = tcp_host
|
|
||||||
self.tcp_port = tcp_port
|
|
||||||
self.tcp_socket = None
|
|
||||||
self.connected_to_midi = False
|
|
||||||
self.reconnect_delay = 1 # seconds
|
|
||||||
# Note: beat_sending_enabled is not used in this simplified flow
|
|
||||||
|
|
||||||
self.bufferSize = 512
|
print("Available audio input devices:")
|
||||||
self.windowSizeMultiple = 2
|
info = pa.get_host_api_info_by_index(0)
|
||||||
self.audioInputDeviceIndex = 7
|
num_devices = info.get('deviceCount')
|
||||||
self.audioInputChannels = 1
|
found_device = False
|
||||||
|
for i in range(0, num_devices):
|
||||||
|
device_info = pa.get_device_info_by_host_api_device_index(0, i)
|
||||||
|
if (device_info.get('maxInputChannels')) > 0:
|
||||||
|
print(f" Input Device id {i} - {device_info.get('name')}")
|
||||||
|
if i == audioInputDeviceIndex:
|
||||||
|
found_device = True
|
||||||
|
|
||||||
self.pa = pyaudio.PyAudio()
|
if not found_device:
|
||||||
|
print(f"Warning: Audio input device index {audioInputDeviceIndex} not found or has no input channels.")
|
||||||
|
# Consider exiting or picking a default if necessary
|
||||||
|
|
||||||
logging.info("Available audio input devices:")
|
try:
|
||||||
info = self.pa.get_host_api_info_by_index(0)
|
audioInputDevice = pa.get_device_info_by_index(audioInputDeviceIndex)
|
||||||
num_devices = info.get('deviceCount')
|
audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
|
||||||
found_device = False
|
except Exception as e:
|
||||||
for i in range(0, num_devices):
|
print(f"Error getting audio device info for index {audioInputDeviceIndex}: {e}")
|
||||||
device_info = self.pa.get_device_info_by_host_api_device_index(0, i)
|
pa.terminate()
|
||||||
if (device_info.get('maxInputChannels')) > 0:
|
exit()
|
||||||
logging.info(f" Input Device id {i} - {device_info.get('name')}")
|
|
||||||
if i == self.audioInputDeviceIndex:
|
|
||||||
found_device = True
|
|
||||||
|
|
||||||
if not found_device:
|
# create the aubio tempo detection:
|
||||||
logging.warning(f"Audio input device index {self.audioInputDeviceIndex} not found or has no input channels.")
|
hopSize = bufferSize
|
||||||
|
winSize = hopSize * windowSizeMultiple
|
||||||
|
tempoDetection = aubio.tempo(method='default', buf_size=winSize, hop_size=hopSize, samplerate=audioInputSampleRate)
|
||||||
|
|
||||||
try:
|
# --- WebSocket Setup ---
|
||||||
audioInputDevice = self.pa.get_device_info_by_index(self.audioInputDeviceIndex)
|
websocket_url = "ws://192.168.4.1:80/ws"
|
||||||
self.audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
|
ws = None
|
||||||
except Exception as e:
|
try:
|
||||||
logging.error(f"Error getting audio device info for index {self.audioInputDeviceIndex}: {e}")
|
ws = websocket.create_connection(websocket_url)
|
||||||
self.pa.terminate()
|
print(f"Successfully connected to WebSocket at {websocket_url}")
|
||||||
exit()
|
except Exception as e:
|
||||||
|
print(f"Failed to connect to WebSocket: {e}. Data will not be sent over WebSocket.")
|
||||||
|
# --- End WebSocket Setup ---
|
||||||
|
|
||||||
self.hopSize = self.bufferSize
|
# this function gets called by the input stream, as soon as enough samples are collected from the audio input:
|
||||||
self.winSize = self.hopSize * self.windowSizeMultiple
|
def readAudioFrames(in_data, frame_count, time_info, status):
|
||||||
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
|
global ws # Allow modification of the global ws variable
|
||||||
|
|
||||||
self.inputStream = None
|
signal = np.frombuffer(in_data, dtype=np.float32)
|
||||||
self._control_thread = None
|
|
||||||
|
|
||||||
self._connect_to_midi_server()
|
beat = tempoDetection(signal)
|
||||||
self._start_control_server() # Start control server in background
|
if beat:
|
||||||
|
bpm = tempoDetection.get_bpm()
|
||||||
|
print(f"beat! (running with {bpm:.2f} bpm)") # Use f-string for cleaner formatting, removed extra bells
|
||||||
|
data_to_send = {
|
||||||
|
"names": ["1"],
|
||||||
|
"settings": {
|
||||||
|
"pattern": "pulse",
|
||||||
|
"delay": 10,
|
||||||
|
"colors": ["#00ff00"],
|
||||||
|
"brightness": 10,
|
||||||
|
"num_leds": 200,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
def reset_tempo_detection(self):
|
if ws: # Only send if the websocket connection is established
|
||||||
"""Re-initializes the aubio tempo detection object."""
|
|
||||||
logging.info("[SoundBeatDetector] Resetting tempo detection.")
|
|
||||||
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
|
|
||||||
|
|
||||||
def _control_server_loop(self):
|
|
||||||
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
try:
|
|
||||||
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
srv.bind((SOUND_CONTROL_HOST, SOUND_CONTROL_PORT))
|
|
||||||
srv.listen(5)
|
|
||||||
logging.info(f"[SoundBeatDetector - Control] Listening on {SOUND_CONTROL_HOST}:{SOUND_CONTROL_PORT}")
|
|
||||||
while True:
|
|
||||||
conn, addr = srv.accept()
|
|
||||||
with conn:
|
|
||||||
logging.info(f"[SoundBeatDetector - Control] Connection from {addr}")
|
|
||||||
try:
|
|
||||||
data = conn.recv(1024)
|
|
||||||
if not data:
|
|
||||||
continue
|
|
||||||
command = data.decode().strip()
|
|
||||||
logging.debug(f"[SoundBeatDetector - Control] Received command: {command}")
|
|
||||||
if command == "RESET_TEMPO":
|
|
||||||
self.reset_tempo_detection()
|
|
||||||
response = "OK: Tempo reset\n"
|
|
||||||
else:
|
|
||||||
response = "ERROR: Unknown command\n"
|
|
||||||
conn.sendall(response.encode('utf-8'))
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"[SoundBeatDetector - Control] Error handling control message: {e}")
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"[SoundBeatDetector - Control] Server error: {e}")
|
|
||||||
finally:
|
|
||||||
try:
|
try:
|
||||||
srv.close()
|
ws.send(json.dumps(data_to_send))
|
||||||
except Exception:
|
# print("Sent data over WebSocket") # Optional: for debugging
|
||||||
pass
|
except websocket.WebSocketConnectionClosedException:
|
||||||
|
print("WebSocket connection closed, attempting to reconnect...")
|
||||||
|
ws = None # Mark as closed, connection will need to be re-established if desired
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error sending over WebSocket: {e}")
|
||||||
|
|
||||||
def _start_control_server(self):
|
return (in_data, pyaudio.paContinue)
|
||||||
if self._control_thread and self._control_thread.is_alive():
|
|
||||||
return
|
|
||||||
self._control_thread = threading.Thread(target=self._control_server_loop, daemon=True)
|
|
||||||
self._control_thread.start()
|
|
||||||
|
|
||||||
def _connect_to_midi_server(self):
|
|
||||||
if self.tcp_socket:
|
|
||||||
self.tcp_socket.close()
|
|
||||||
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
self.tcp_socket.settimeout(self.reconnect_delay)
|
|
||||||
try:
|
|
||||||
logging.info(f"[SoundBeatDetector] Attempting to connect to MIDI TCP server at {self.tcp_host}:{self.tcp_port}...")
|
|
||||||
self.tcp_socket.connect((self.tcp_host, self.tcp_port))
|
|
||||||
self.tcp_socket.setblocking(0)
|
|
||||||
self.connected_to_midi = True
|
|
||||||
logging.info(f"[SoundBeatDetector] Successfully connected to MIDI TCP server.")
|
|
||||||
except (socket.error, socket.timeout) as e:
|
|
||||||
logging.error(f"[SoundBeatDetector] Failed to connect to MIDI TCP server: {e}")
|
|
||||||
self.connected_to_midi = False
|
|
||||||
|
|
||||||
# Removed _handle_control_client and _control_server (replaced by simple threaded server)
|
# create and start the input stream
|
||||||
|
try:
|
||||||
|
inputStream = pa.open(format=pyaudio.paFloat32,
|
||||||
|
input=True,
|
||||||
|
channels=audioInputChannels,
|
||||||
|
input_device_index=audioInputDeviceIndex,
|
||||||
|
frames_per_buffer=bufferSize,
|
||||||
|
rate=audioInputSampleRate,
|
||||||
|
stream_callback=readAudioFrames)
|
||||||
|
|
||||||
def readAudioFrames(self, in_data, frame_count, time_info, status):
|
inputStream.start_stream()
|
||||||
signal = np.frombuffer(in_data, dtype=np.float32)
|
print("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")
|
||||||
|
|
||||||
beat = self.tempoDetection(signal)
|
# Loop to keep the script running, allowing graceful shutdown
|
||||||
if beat:
|
while inputStream.is_active():
|
||||||
bpm = self.tempoDetection.get_bpm()
|
sleep(0.1) # Small delay to prevent busy-waiting
|
||||||
logging.debug(f"beat! (running with {bpm:.2f} bpm)") # Changed to debug
|
|
||||||
bpm_message = str(bpm)
|
|
||||||
|
|
||||||
if self.connected_to_midi and self.tcp_socket:
|
except KeyboardInterrupt:
|
||||||
try:
|
print("\nKeyboardInterrupt: Stopping script gracefully.")
|
||||||
message_bytes = (bpm_message + "\n").encode('utf-8')
|
except Exception as e:
|
||||||
self.tcp_socket.sendall(message_bytes)
|
print(f"An error occurred with the audio stream: {e}")
|
||||||
logging.debug(f"[SoundBeatDetector] Sent BPM to MIDI TCP server: {bpm_message}") # Changed to debug
|
finally:
|
||||||
except socket.error as e:
|
# Ensure streams and resources are closed
|
||||||
logging.error(f"[SoundBeatDetector] Error sending BPM to MIDI TCP server: {e}. Attempting to reconnect...")
|
if 'inputStream' in locals() and inputStream.is_active():
|
||||||
self.connected_to_midi = False
|
inputStream.stop_stream()
|
||||||
self._connect_to_midi_server()
|
if 'inputStream' in locals() and not inputStream.is_stopped():
|
||||||
elif not self.connected_to_midi:
|
inputStream.close()
|
||||||
logging.warning("[SoundBeatDetector] Not connected to MIDI TCP server, attempting to reconnect...") # Changed to warning
|
pa.terminate()
|
||||||
self._connect_to_midi_server()
|
if ws:
|
||||||
else:
|
print("Closing WebSocket connection.")
|
||||||
logging.warning("[SoundBeatDetector] TCP socket not initialized, cannot send BPM.") # Changed to warning
|
ws.close()
|
||||||
|
|
||||||
return (in_data, pyaudio.paContinue)
|
print("Script finished.")
|
||||||
|
|
||||||
def start_stream(self):
|
|
||||||
try:
|
|
||||||
self.inputStream = self.pa.open(format=pyaudio.paFloat32,
|
|
||||||
input=True,
|
|
||||||
channels=self.audioInputChannels,
|
|
||||||
input_device_index=self.audioInputDeviceIndex,
|
|
||||||
frames_per_buffer=self.bufferSize,
|
|
||||||
rate=self.audioInputSampleRate,
|
|
||||||
stream_callback=self.readAudioFrames)
|
|
||||||
|
|
||||||
self.inputStream.start_stream()
|
|
||||||
logging.info("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")
|
|
||||||
|
|
||||||
while self.inputStream.is_active():
|
|
||||||
sleep(0.1)
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
logging.info("\nKeyboardInterrupt: Stopping script gracefully.")
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"An error occurred with the audio stream: {e}")
|
|
||||||
finally:
|
|
||||||
self.stop_stream()
|
|
||||||
|
|
||||||
def stop_stream(self):
|
|
||||||
if self.inputStream and self.inputStream.is_active():
|
|
||||||
self.inputStream.stop_stream()
|
|
||||||
if self.inputStream and not self.inputStream.is_stopped():
|
|
||||||
self.inputStream.close()
|
|
||||||
self.pa.terminate()
|
|
||||||
if self.tcp_socket and self.connected_to_midi:
|
|
||||||
logging.info("[SoundBeatDetector] Closing TCP socket.")
|
|
||||||
self.tcp_socket.close()
|
|
||||||
self.connected_to_midi = False
|
|
||||||
logging.info("SoundBeatDetector stopped.")
|
|
||||||
|
|
||||||
# Removed async def run(self)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# TCP Server Configuration (should match midi.py)
|
|
||||||
MIDI_TCP_HOST = "127.0.0.1"
|
|
||||||
MIDI_TCP_PORT = 65432
|
|
||||||
|
|
||||||
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT)
|
|
||||||
logging.info("Starting SoundBeatDetector...")
|
|
||||||
try:
|
|
||||||
sound_detector.start_stream()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
logging.info("\nProgram interrupted by user.")
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(f"An error occurred during main execution: {e}")
|
|
Reference in New Issue
Block a user