2 Commits

5 changed files with 519 additions and 824 deletions

View File

@@ -7,12 +7,16 @@ name = "pypi"
websockets = "*"
watchfiles = "*"
async-tkinter-loop = "*"
mido = "*"
python-rtmidi = "*"
pyaudio = "*"
aubio = "*"
websocket-client = "*"
[dev-packages]
[requires]
python_version = "3"
python_version = "3.12"
[scripts]
main = "python main.py"

View File

@@ -7,91 +7,13 @@
],
"settings": {
"colors": [
"#968a00"
"#0000ff",
"#c30074",
"#00ff00"
],
"brightness": 21,
"brightness": 9,
"pattern": "off",
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"patterns": {
"pulse": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"n_chase": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"on": {
"colors": [
"#968a00"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"rainbow": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"off": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"blink": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
}
}
"delay": 50
}
},
"dj": {
@@ -101,104 +23,13 @@
"settings": {
"colors": [
"#0000ff",
"#ff0000"
"#c30074",
"#00ff00",
"#000000"
],
"brightness": 73,
"pattern": "transition",
"delay": 10000,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"patterns": {
"rainbow": {
"colors": [
"#00006a"
],
"delay": 17,
"n1": 1,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"on": {
"colors": [
"#ffff00",
"#0000ff"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"blink": {
"colors": [
"#0000d0"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"pulse": {
"delay": 1002,
"colors": [
"#006600",
"#0000ff"
],
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"transition": {
"colors": [
"#0000ff",
"#ff0000"
],
"delay": 10000,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10
},
"n_chase": {
"n1": 11,
"n2": 11,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"delay": 99,
"colors": [
"#0000ff"
]
},
"off": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10
}
}
"brightness": 6,
"pattern": "flicker",
"delay": 520
}
},
"middle": {
@@ -217,19 +48,7 @@
],
"brightness": 6,
"pattern": "flicker",
"delay": 520,
"patterns": {
"flicker": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10
}
}
"delay": 520
}
},
"sides": {
@@ -246,19 +65,7 @@
],
"brightness": 6,
"pattern": "on",
"delay": 520,
"patterns": {
"on": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10
}
}
"delay": 520
}
},
"outside": {
@@ -273,36 +80,8 @@
"#000000"
],
"brightness": 6,
"pattern": "transition",
"delay": 520,
"n1": -17,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"patterns": {
"on": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10
},
"transition": {
"colors": [
"#000000"
],
"delay": 99,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10
}
}
"pattern": "on",
"delay": 520
}
},
"middle1": {
@@ -318,16 +97,7 @@
],
"brightness": 6,
"pattern": "flicker",
"delay": 520,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"patterns": {
"flicker": {}
}
"delay": 520
}
},
"middle2": {
@@ -343,10 +113,7 @@
],
"brightness": 6,
"pattern": "flicker",
"delay": 520,
"patterns": {
"flicker": {}
}
"delay": 520
}
},
"middle3": {
@@ -362,10 +129,7 @@
],
"brightness": 6,
"pattern": "flicker",
"delay": 520,
"patterns": {
"flicker": {}
}
"delay": 520
}
},
"middle4": {
@@ -381,19 +145,18 @@
],
"brightness": 6,
"pattern": "flicker",
"delay": 520,
"patterns": {
"flicker": {}
}
"delay": 520
}
}
},
"patterns": [
"on",
"off",
"rainbow",
"transition",
"n_chase",
"blink",
"rainbow_cycle",
"color_transition",
"theater_chase",
"flicker",
"pulse"
]
}

View File

@@ -7,7 +7,6 @@ from networking import WebSocketClient
import color_utils
from settings import Settings
import time
import math
# Dark theme colors (unchanged)
bg_color = "#2e2e2e"
@@ -23,26 +22,6 @@ highlight_pattern_color = "#6a5acd"
active_palette_color_border = "#FFD700" # Gold color
def delay_to_slider(delay_ms):
"""Convert delay in ms (10-10000) to slider position (0-1000) using logarithmic scale."""
if delay_ms <= 10:
return 0
if delay_ms >= 10000:
return 1000
# Logarithmic conversion: delay = 10 * (10000/10) ^ (position/1000)
# Solving for position: position = 1000 * log(delay/10) / log(1000)
return 1000 * math.log(delay_ms / 10) / math.log(1000)
def slider_to_delay(slider_value):
"""Convert slider position (0-1000) to delay in ms (10-10000) using logarithmic scale."""
if slider_value <= 0:
return 10
if slider_value >= 1000:
return 10000
# Logarithmic conversion: delay = 10 * 1000 ^ (position/1000)
return int(10 * (1000 ** (slider_value / 1000)))
class App:
def __init__(self) -> None:
self.settings = Settings()
@@ -63,10 +42,6 @@ class App:
self.delay_update_interval_ms = 100
self.pending_delay_update_id = None
self.last_n_params_update_time = 0
self.n_params_update_interval_ms = 100
self.pending_n_params_update_id = None
# --- WebSocketClient ---
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
self.root.after(100, async_handler(self.websocket_client.connect))
@@ -91,45 +66,23 @@ class App:
self.notebook.bind("<<NotebookTabChanged>>", self.on_tab_change)
# Add Reload Config Button (unchanged)
reload_button = tk.Button(
self.root,
text="Reload Config",
command=self.reload_config,
bg=active_bg_color,
fg=fg_color,
font=("Arial", 20),
padx=20,
pady=10,
relief=tk.FLAT,
)
reload_button.pack(side=tk.BOTTOM, pady=20)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
async_mainloop(self.root)
def get_pattern_settings(self, tab_name, pattern_name):
"""Get pattern-specific settings (colors, delay, n params). Returns defaults if not found."""
light_settings = self.settings["lights"][tab_name]["settings"]
if "patterns" not in light_settings:
light_settings["patterns"] = {}
if pattern_name not in light_settings["patterns"]:
light_settings["patterns"][pattern_name] = {}
pattern_settings = light_settings["patterns"][pattern_name]
return {
"colors": pattern_settings.get("colors", ["#000000"]),
"delay": pattern_settings.get("delay", 100),
"n1": pattern_settings.get("n1", 10),
"n2": pattern_settings.get("n2", 10),
"n3": pattern_settings.get("n3", 10),
"n4": pattern_settings.get("n4", 10),
}
def save_pattern_settings(self, tab_name, pattern_name, colors=None, delay=None, n_params=None):
"""Save pattern-specific settings."""
light_settings = self.settings["lights"][tab_name]["settings"]
if "patterns" not in light_settings:
light_settings["patterns"] = {}
if pattern_name not in light_settings["patterns"]:
light_settings["patterns"][pattern_name] = {}
pattern_settings = light_settings["patterns"][pattern_name]
if colors is not None:
pattern_settings["colors"] = colors
if delay is not None:
pattern_settings["delay"] = delay
if n_params is not None:
for i in range(1, 5):
if f"n{i}" in n_params:
pattern_settings[f"n{i}"] = n_params[f"n{i}"]
def on_closing(self):
print("Closing application...")
@@ -148,22 +101,15 @@ class App:
self.tabs[key] = tab
def create_light_control_widgets(self, tab, tab_name, ids, initial_settings):
slider_length = 600
slider_length = 800
slider_width = 50
# Get initial pattern and load pattern-specific settings
initial_pattern = initial_settings.get("pattern", "on")
initial_brightness = initial_settings.get("brightness", 127) # Global brightness
pattern_settings = self.get_pattern_settings(tab_name, initial_pattern)
# Extract initial color, delay, and n params from pattern-specific settings
initial_colors = pattern_settings["colors"]
# Extract initial color, brightness, and delay
initial_colors = initial_settings.get("colors", ["#000000"])
initial_hex_color = initial_colors[0] if initial_colors else "#000000"
initial_delay = pattern_settings["delay"]
initial_n1 = pattern_settings["n1"]
initial_n2 = pattern_settings["n2"]
initial_n3 = pattern_settings["n3"]
initial_n4 = pattern_settings["n4"]
initial_brightness = initial_settings.get("brightness", 127)
initial_delay = initial_settings.get("delay", 0)
initial_pattern = initial_settings.get("pattern", "on")
initial_r, initial_g, initial_b = color_utils.hex_to_rgb(initial_hex_color)
@@ -171,13 +117,9 @@ class App:
main_tab_frame = tk.Frame(tab, bg=bg_color)
main_tab_frame.pack(expand=True, fill="both", padx=10, pady=10)
# Left panel container for sliders and n inputs
left_panel_container = tk.Frame(main_tab_frame, bg=bg_color)
left_panel_container.pack(side=tk.LEFT, padx=10, pady=10)
# Slider panel
slider_panel_frame = tk.Frame(left_panel_container, bg=bg_color)
slider_panel_frame.pack(side=tk.TOP, padx=10, pady=10)
# Left panel for sliders
slider_panel_frame = tk.Frame(main_tab_frame, bg=bg_color)
slider_panel_frame.pack(side=tk.LEFT, padx=10, pady=10)
# Common slider configuration
slider_config = {
@@ -224,142 +166,22 @@ class App:
brightness_slider.bind("<B1-Motion>", lambda _: self.schedule_update_brightness(tab))
brightness_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_brightness(tab, force_send=True))
# Delay Slider with logarithmic scale (10ms to 10000ms)
# Delay Slider
delay_slider_config = slider_config.copy()
delay_slider_config.update(
{
"from_": 1000, # Slider range 0-1000 (will be converted to 10-10000ms logarithmically)
"from_": 1000,
"to": 0,
"resolution": 1,
"resolution": 10,
"label": "Delay (ms)",
"showvalue": False, # Hide default value, we'll show logarithmic value in custom label
"troughcolor": trough_color_delay,
}
)
# Create a frame to contain the slider and its value label (to make them appear as one unit)
delay_container = tk.Frame(slider_panel_frame, bg=bg_color)
delay_container.pack(side=tk.LEFT, padx=10)
delay_slider = tk.Scale(delay_container, **delay_slider_config)
# Convert initial delay to slider position using logarithmic scale
initial_slider_pos = delay_to_slider(initial_delay)
delay_slider.set(initial_slider_pos)
# Create a custom label to show the actual delay value, positioned like the default Scale value
delay_value_label = tk.Label(delay_container, text=f"{initial_delay}", font=("Arial", 12), bg=bg_color, fg=fg_color, width=5, anchor="e")
delay_value_label.pack(side=tk.LEFT, padx=(0, 5))
# Function to update the side label when slider moves
def update_delay_value_label(event=None):
slider_value = delay_slider.get()
actual_delay = slider_to_delay(slider_value)
delay_value_label.config(text=f"{actual_delay}")
delay_slider.pack(side=tk.LEFT)
delay_slider.bind("<B1-Motion>", lambda e: (update_delay_value_label(), self.schedule_update_delay(tab)))
delay_slider.bind("<ButtonRelease-1>", lambda e: (update_delay_value_label(), self.schedule_update_delay(tab, force_send=True)))
# N inputs panel below sliders (moved down to make sliders longer)
n_inputs_frame = tk.Frame(left_panel_container, bg=bg_color)
n_inputs_frame.pack(side=tk.TOP, padx=10, pady=10)
n_inputs_inner_frame = tk.Frame(n_inputs_frame, bg=bg_color)
n_inputs_inner_frame.pack()
# Common spinbox config for n inputs - larger for touch screen
n_spinbox_config = {
"from_": 0,
"to": 255,
"increment": 1,
"width": 12,
"bg": bg_color,
"fg": fg_color,
"font": ("Arial", 24),
"buttonbackground": active_bg_color,
}
# Create n1-n4 inputs in a grid with arrows on both sides
n_inputs = {}
for i in range(1, 5):
n_frame = tk.Frame(n_inputs_inner_frame, bg=bg_color)
n_frame.grid(row=(i-1)//2, column=(i-1)%2, padx=10, pady=10)
tk.Label(n_frame, text=f"n{i}", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(pady=(0, 5))
# Create a frame for the input with arrows on both sides
input_container = tk.Frame(n_frame, bg=bg_color)
input_container.pack(pady=5)
# Use StringVar for the value
n_var = tk.StringVar(value=str(initial_settings.get(f"n{i}", 10)))
# Left arrow button (decrease)
def decrease_value(var=n_var):
try:
current_str = var.get()
if not current_str:
current = 0
else:
current = int(current_str)
new_value = current - 1
var.set(str(new_value))
self.schedule_update_n_params(tab, force_send=True)
except (ValueError, TypeError):
# If value is invalid, set to -1
var.set("-1")
self.schedule_update_n_params(tab, force_send=True)
left_arrow = tk.Button(
input_container,
text="",
font=("Arial", 32, "bold"),
bg=active_bg_color,
fg=fg_color,
relief=tk.FLAT,
command=decrease_value,
width=3,
height=1,
)
left_arrow.pack(side=tk.LEFT, padx=2)
# Entry in the middle
n_entry = tk.Entry(
input_container,
textvariable=n_var,
font=("Arial", 24),
bg=bg_color,
fg=fg_color,
width=8,
justify=tk.CENTER,
relief=tk.SUNKEN,
bd=2,
)
n_entry.pack(side=tk.LEFT, padx=2, ipady=8)
n_entry.bind("<KeyRelease>", lambda event: self.schedule_update_n_params(tab))
n_entry.bind("<FocusOut>", lambda event: self.schedule_update_n_params(tab, force_send=True))
# Right arrow button (increase)
def increase_value(var=n_var):
current = int(var.get())
new_value = current + 1
var.set(str(new_value))
self.schedule_update_n_params(tab, force_send=True)
right_arrow = tk.Button(
input_container,
text="+",
font=("Arial", 32, "bold"),
bg=active_bg_color,
fg=fg_color,
relief=tk.FLAT,
command=increase_value,
width=3,
height=1,
)
right_arrow.pack(side=tk.LEFT, padx=2)
n_inputs[f"n{i}"] = n_entry
n_inputs[f"n{i}_var"] = n_var # Store the variable for later updates
delay_slider = tk.Scale(slider_panel_frame, **delay_slider_config)
delay_slider.set(initial_delay)
delay_slider.pack(side=tk.LEFT, padx=10)
delay_slider.bind("<B1-Motion>", lambda _: self.schedule_update_delay(tab))
delay_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_delay(tab, force_send=True))
# Store references to widgets for this tab
tab.widgets = {
@@ -368,15 +190,6 @@ class App:
"blue_slider": blue_slider,
"brightness_slider": brightness_slider,
"delay_slider": delay_slider,
"delay_value_label": delay_value_label, # Store the delay value label for updates
"n1": n_inputs["n1"],
"n1_var": n_inputs["n1_var"],
"n2": n_inputs["n2"],
"n2_var": n_inputs["n2_var"],
"n3": n_inputs["n3"],
"n3_var": n_inputs["n3_var"],
"n4": n_inputs["n4"],
"n4_var": n_inputs["n4_var"],
"selected_color_index": 0, # Default to the first color
}
tab.colors_in_palette = initial_colors.copy() # Store the list of hex colors for this tab
@@ -604,7 +417,7 @@ class App:
# If not in 'transition' mode, but a color is selected, update sliders to that.
# Or, if this is a fresh load/tab change, ensure it's the first color.
# This ensures the sliders consistently show the color that will be sent
# for 'on' based on the palette's first entry.
# for 'on'/'blink' based on the palette's first entry.
hex_color = tab.colors_in_palette[tab.widgets["selected_color_index"]]
r, g, b = color_utils.hex_to_rgb(hex_color)
tab.widgets["red_slider"].set(r)
@@ -641,12 +454,8 @@ class App:
# In a full reload, create_tabs ensures it is.
return
# Get current pattern and load pattern-specific settings
current_pattern = initial_settings.get("pattern", "on")
pattern_settings = self.get_pattern_settings(selected_tab_name, current_pattern)
# Update the local colors_in_palette list for the tab
current_tab_widget.colors_in_palette = pattern_settings["colors"].copy()
current_tab_widget.colors_in_palette = initial_settings.get("colors", ["#000000"]).copy()
current_tab_widget.widgets["selected_color_index"] = 0 # Default to first color
# Refresh the color palette display and select the first color
@@ -658,19 +467,9 @@ class App:
current_tab_widget.widgets["green_slider"].set(0)
current_tab_widget.widgets["blue_slider"].set(0)
# Update brightness (global) and delay (pattern-specific) sliders
# Update brightness and delay sliders
current_tab_widget.widgets["brightness_slider"].set(initial_settings.get("brightness", 127))
# Convert delay to slider position using logarithmic scale
initial_delay = pattern_settings["delay"]
initial_delay_slider_pos = delay_to_slider(initial_delay)
current_tab_widget.widgets["delay_slider"].set(initial_delay_slider_pos)
# Update the delay value label to show the actual delay value
if "delay_value_label" in current_tab_widget.widgets:
current_tab_widget.widgets["delay_value_label"].config(text=f"{initial_delay}")
# Update n parameter inputs (pattern-specific)
for i in range(1, 5):
current_tab_widget.widgets[f"n{i}_var"].set(str(pattern_settings[f"n{i}"]))
current_tab_widget.widgets["delay_slider"].set(initial_settings.get("delay", 0))
# Highlight the active pattern button
initial_pattern = initial_settings.get("pattern", "on")
@@ -694,16 +493,13 @@ class App:
if self.pending_rgb_update_id:
self.root.after_cancel(self.pending_rgb_update_id)
self.pending_rgb_update_id = None
# Use root.after() to ensure the async event loop is running
# A delay of 0 will execute on the next event loop iteration
self.root.after(0, lambda: self.update_rgb(tab))
self.update_rgb(tab)
self.last_rgb_update_time = current_time
elif current_time - self.last_rgb_update_time >= self.rgb_update_interval_ms:
if self.pending_rgb_update_id:
self.root.after_cancel(self.pending_rgb_update_id)
self.pending_rgb_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_rgb(tab))
self.update_rgb(tab)
self.last_rgb_update_time = current_time
else:
if self.pending_rgb_update_id:
@@ -717,15 +513,13 @@ class App:
if self.pending_brightness_update_id:
self.root.after_cancel(self.pending_brightness_update_id)
self.pending_brightness_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_brightness(tab))
self.update_brightness(tab)
self.last_brightness_update_time = current_time
elif current_time - self.last_brightness_update_time >= self.brightness_update_interval_ms:
if self.pending_brightness_update_id:
self.root.after_cancel(self.pending_brightness_update_id)
self.pending_brightness_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_brightness(tab))
self.update_brightness(tab)
self.last_brightness_update_time = current_time
else:
if self.pending_brightness_update_id:
@@ -739,15 +533,13 @@ class App:
if self.pending_delay_update_id:
self.root.after_cancel(self.pending_delay_update_id)
self.pending_delay_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_delay(tab))
self.update_delay(tab)
self.last_delay_update_time = current_time
elif current_time - self.last_delay_update_time >= self.delay_update_interval_ms:
if self.pending_delay_update_id:
self.root.after_cancel(self.pending_delay_update_id)
self.pending_delay_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_delay(tab))
self.update_delay(tab)
self.last_delay_update_time = current_time
else:
if self.pending_delay_update_id:
@@ -755,28 +547,6 @@ class App:
time_to_wait = int(self.delay_update_interval_ms - (current_time - self.last_delay_update_time))
self.pending_delay_update_id = self.root.after(time_to_wait, lambda: self.update_delay(tab))
def schedule_update_n_params(self, tab, force_send=False):
current_time = time.time() * 1000
if force_send:
if self.pending_n_params_update_id:
self.root.after_cancel(self.pending_n_params_update_id)
self.pending_n_params_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_n_params(tab))
self.last_n_params_update_time = current_time
elif current_time - self.last_n_params_update_time >= self.n_params_update_interval_ms:
if self.pending_n_params_update_id:
self.root.after_cancel(self.pending_n_params_update_id)
self.pending_n_params_update_id = None
# Use root.after() to ensure the async event loop is running
self.root.after(0, lambda: self.update_n_params(tab))
self.last_n_params_update_time = current_time
else:
if self.pending_n_params_update_id:
self.root.after_cancel(self.pending_n_params_update_id)
time_to_wait = int(self.n_params_update_interval_ms - (current_time - self.last_n_params_update_time))
self.pending_n_params_update_id = self.root.after(time_to_wait, lambda: self.update_n_params(tab))
# --- Asynchronous Update Functions ---
@async_handler
async def update_rgb(self, tab):
@@ -807,7 +577,7 @@ class App:
if current_pattern == "transition":
colors_to_send = tab.colors_in_palette.copy()
elif current_pattern in ["on", "theater_chase", "flicker"]: # Add other patterns that use a single color
elif current_pattern in ["on", "blink", "theater_chase", "flicker"]: # Add other patterns that use a single color
if tab.colors_in_palette:
# For non-transition patterns, the device typically uses only the first color.
# However, if a user picks a color from the palette, we want THAT color to be the one
@@ -828,29 +598,15 @@ class App:
"settings": {
"colors": colors_to_send, # This now dynamically changes based on pattern
"brightness": tab.widgets["brightness_slider"].get(),
"delay": slider_to_delay(tab.widgets["delay_slider"].get()), # Convert from logarithmic slider
"delay": tab.widgets["delay_slider"].get(),
"pattern": current_pattern, # Always send the current pattern
"n1": int(tab.widgets["n1_var"].get()),
"n2": int(tab.widgets["n2_var"].get()),
"n3": int(tab.widgets["n3_var"].get()),
"n4": int(tab.widgets["n4_var"].get()),
},
}
# Update the settings object - save pattern-specific settings
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
slider_value = tab.widgets["delay_slider"].get()
delay = slider_to_delay(slider_value)
n_params = {f"n{i}": int(tab.widgets[f"n{i}_var"].get()) for i in range(1, 5)}
# Save pattern-specific settings
self.save_pattern_settings(selected_server, current_pattern,
colors=tab.colors_in_palette.copy(),
delay=delay,
n_params=n_params)
# Brightness is global
# Update the settings object with the new color list (and potentially other synced values)
self.settings["lights"][selected_server]["settings"]["colors"] = tab.colors_in_palette.copy()
self.settings["lights"][selected_server]["settings"]["brightness"] = tab.widgets["brightness_slider"].get()
self.settings["lights"][selected_server]["settings"]["delay"] = tab.widgets["delay_slider"].get()
self.settings.save()
await self.websocket_client.send_data(payload)
@@ -888,9 +644,8 @@ class App:
async def update_delay(self, tab):
try:
delay_slider = tab.widgets["delay_slider"]
slider_value = delay_slider.get()
delay = slider_to_delay(slider_value) # Convert from logarithmic slider to actual delay
print(f"Delay: {delay}ms (slider: {slider_value})")
delay = delay_slider.get()
print(f"Delay: {delay}")
selected_server = self.notebook.tab(self.notebook.select(), "text")
names = self.settings["lights"][selected_server]["names"]
@@ -901,41 +656,14 @@ class App:
"delay": delay,
},
}
# Update pattern-specific delay setting
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
self.save_pattern_settings(selected_server, current_pattern, delay=delay)
# Update the settings object with the new delay
self.settings["lights"][selected_server]["settings"]["delay"] = delay
self.settings.save()
await self.websocket_client.send_data(payload)
print(f"Sent delay payload: {payload}")
except Exception as e:
print(f"Error updating delay: {e}")
@async_handler
async def update_n_params(self, tab):
try:
n_params = {}
for i in range(1, 5):
n_var = tab.widgets[f"n{i}_var"]
n_params[f"n{i}"] = int(n_var.get())
print(f"N Parameters: {n_params}")
selected_server = self.notebook.tab(self.notebook.select(), "text")
names = self.settings["lights"][selected_server]["names"]
payload = {
"save": True,
"names": names,
"settings": n_params,
}
# Update pattern-specific n params
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
self.save_pattern_settings(selected_server, current_pattern, n_params=n_params)
self.settings.save()
await self.websocket_client.send_data(payload)
print(f"Sent n params payload: {payload}")
except Exception as e:
print(f"Error updating n params: {e}")
@async_handler
async def send_pattern(self, tab_name: str, pattern_name: str):
try:
@@ -952,53 +680,27 @@ class App:
return
current_settings_for_tab = self.settings["lights"][tab_name]["settings"]
# Save current pattern's settings before switching
old_pattern = current_settings_for_tab.get("pattern", "on")
current_delay = slider_to_delay(current_tab_widget.widgets["delay_slider"].get())
current_n_params = {f"n{i}": int(current_tab_widget.widgets[f"n{i}_var"].get()) for i in range(1, 5)}
self.save_pattern_settings(tab_name, old_pattern,
colors=current_tab_widget.colors_in_palette.copy(),
delay=current_delay,
n_params=current_n_params)
# Load the new pattern's settings
new_pattern_settings = self.get_pattern_settings(tab_name, pattern_name)
# Update UI with new pattern's settings
current_tab_widget.colors_in_palette = new_pattern_settings["colors"].copy()
self.refresh_color_palette_display(current_tab_widget)
# Update delay slider
delay_slider_pos = delay_to_slider(new_pattern_settings["delay"])
current_tab_widget.widgets["delay_slider"].set(delay_slider_pos)
if "delay_value_label" in current_tab_widget.widgets:
current_tab_widget.widgets["delay_value_label"].config(text=f"{new_pattern_settings['delay']}")
# Update n params
for i in range(1, 5):
current_tab_widget.widgets[f"n{i}_var"].set(str(new_pattern_settings[f"n{i}"]))
# Update RGB sliders to first color
if current_tab_widget.colors_in_palette:
hex_color = current_tab_widget.colors_in_palette[0]
r, g, b = color_utils.hex_to_rgb(hex_color)
current_tab_widget.widgets["red_slider"].set(r)
current_tab_widget.widgets["green_slider"].set(g)
current_tab_widget.widgets["blue_slider"].set(b)
current_tab_widget.widgets["selected_color_index"] = 0
self._highlight_selected_color_swatch(current_tab_widget)
payload_settings = {
"pattern": pattern_name,
"brightness": current_settings_for_tab.get("brightness", 127), # Global brightness
"delay": new_pattern_settings["delay"],
"colors": new_pattern_settings["colors"],
"brightness": current_settings_for_tab.get("brightness", 127),
"delay": current_settings_for_tab.get("delay", 0),
}
# Include n parameters
for i in range(1, 5):
payload_settings[f"n{i}"] = new_pattern_settings[f"n{i}"]
# Determine colors to send based on the *newly selected* pattern
if pattern_name == "transition":
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
elif pattern_name in ["on", "blink"]: # Add other patterns that use a single color here
# When switching TO 'on' or 'blink', ensure the color sent is the one
# currently displayed on the sliders (which reflects the selected palette color).
r = current_tab_widget.widgets["red_slider"].get()
g = current_tab_widget.widgets["green_slider"].get()
b = current_tab_widget.widgets["blue_slider"].get()
hex_color_from_sliders = f"#{r:02x}{g:02x}{b:02x}"
payload_settings["colors"] = [hex_color_from_sliders]
else:
# For other patterns, send the full palette, device might ignore or use default
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
payload = {
"save": True,
@@ -1006,8 +708,10 @@ class App:
"settings": payload_settings,
}
# Update the current pattern
# Update the settings object with the new pattern and current colors/brightness/delay
self.settings["lights"][tab_name]["settings"]["pattern"] = pattern_name
# Always save the full current palette state in settings.
self.settings["lights"][tab_name]["settings"]["colors"] = current_tab_widget.colors_in_palette.copy()
self.settings.save()
self.highlight_pattern_button(current_tab_widget, pattern_name)
@@ -1026,20 +730,10 @@ class App:
if not hasattr(current_tab_widget, "colors_in_palette"):
return # Tab not fully initialized yet
# Update settings for the current tab - save pattern-specific settings
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
slider_value = current_tab_widget.widgets["delay_slider"].get()
delay = slider_to_delay(slider_value)
n_params = {f"n{i}": int(current_tab_widget.widgets[f"n{i}_var"].get()) for i in range(1, 5)}
# Save pattern-specific settings
self.save_pattern_settings(selected_server, current_pattern,
colors=current_tab_widget.colors_in_palette,
delay=delay,
n_params=n_params)
# Brightness is global
# Update settings for the current tab in the self.settings object
self.settings["lights"][selected_server]["settings"]["colors"] = current_tab_widget.colors_in_palette
self.settings["lights"][selected_server]["settings"]["brightness"] = current_tab_widget.widgets["brightness_slider"].get()
self.settings["lights"][selected_server]["settings"]["delay"] = current_tab_widget.widgets["delay_slider"].get()
# The pattern is updated in send_pattern already, but ensure consistency
# For simplicity, we assume send_pattern is the primary way to change pattern.

View File

@@ -1,105 +1,253 @@
import mido
import asyncio
import time
import networking # <--- This will now correctly import your module
import networking
import socket
import json
import logging # Added logging import
# Configure logging
DEBUG_MODE = True # Set to False for INFO level logging
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
async def midi_to_websocket_listener(midi_port_index: int, websocket_uri: str):
"""
Listens to a specific MIDI port and sends data to a WebSocket server
when Note 32 (and 33) is pressed.
"""
delay = 100 # Default delay value
# TCP Server Configuration
TCP_HOST = "127.0.0.1"
TCP_PORT = 65432
# 1. Get MIDI port name
# Sound Control Server Configuration (for sending reset)
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
class MidiHandler:
def __init__(self, midi_port_index: int, websocket_uri: str):
self.midi_port_index = midi_port_index
self.websocket_uri = websocket_uri
self.ws_client = networking.WebSocketClient(websocket_uri)
self.delay = 100 # Default delay value, controlled by MIDI controller
self.brightness = 100 # Default brightness value, controlled by MIDI controller
self.tcp_host = TCP_HOST
self.tcp_port = TCP_PORT
self.beat_sending_enabled = True # New: Local flag for beat sending
self.sound_control_host = SOUND_CONTROL_HOST
self.sound_control_port = SOUND_CONTROL_PORT
async def _send_reset_to_sound(self):
try:
reader, writer = await asyncio.open_connection(self.sound_control_host, self.sound_control_port)
cmd = "RESET_TEMPO\n".encode('utf-8')
writer.write(cmd)
await writer.drain()
resp = await reader.read(100)
logging.info(f"[MidiHandler - Control] Sent RESET_TEMPO, response: {resp.decode().strip()}")
writer.close()
await writer.wait_closed()
except Exception as e:
logging.error(f"[MidiHandler - Control] Failed to send RESET_TEMPO: {e}")
async def _handle_tcp_client(self, reader, writer):
addr = writer.get_extra_info('peername')
logging.info(f"[MidiHandler - TCP Server] Connected by {addr}") # Changed to info
try:
while True:
data = await reader.read(4096) # Read up to 4KB of data
if not data:
logging.info(f"[MidiHandler - TCP Server] Client {addr} disconnected.") # Changed to info
break
message = data.decode().strip()
logging.debug(f"[MidiHandler - TCP Server] Received from {addr}: {message}") # Changed to debug
if self.beat_sending_enabled:
try:
# Attempt to parse as float (BPM) from sound.py
bpm_value = float(message)
# Construct JSON message using the current MIDI-controlled delay and brightness
json_message = {
"names": ["0"],
"settings": {
"pattern": "pulse",
"delay": self.delay, # Use MIDI-controlled delay
"colors": ["#00ff00"],
"brightness": self.brightness,
"num_leds": 200,
},
}
logging.debug(f"[MidiHandler - TCP Server] Forwarding BPM-derived JSON message to WebSocket with delay {self.delay}, brightness {self.brightness}: {json_message}") # Changed to debug
await self.ws_client.send_data(json_message)
except ValueError:
logging.warning(f"[MidiHandler - TCP Server] Received non-BPM message from {addr}, not forwarding: {message}") # Changed to warning
except Exception as e:
logging.error(f"[MidiHandler - TCP Server] Error processing received message from {addr}: {e}") # Changed to error
else:
logging.debug(f"[MidiHandler - TCP Server] Beat received from {addr} but sending to WebSocket is disabled: {message}") # Changed to debug
except asyncio.CancelledError:
logging.info(f"[MidiHandler - TCP Server] Client handler for {addr} cancelled.") # Changed to info
except Exception as e:
logging.error(f"[MidiHandler - TCP Server] Error handling client {addr}: {e}") # Changed to error
finally:
logging.info(f"[MidiHandler - TCP Server] Closing connection for {addr}") # Changed to info
writer.close()
await writer.wait_closed()
async def _midi_tcp_server(self):
server = await asyncio.start_server(
lambda r, w: self._handle_tcp_client(r, w), self.tcp_host, self.tcp_port)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
logging.info(f"[MidiHandler - TCP Server] Serving on {addrs}") # Changed to info
async with server:
await server.serve_forever()
async def _midi_listener(self):
logging.info("Midi function") # Changed to info
"""
Listens to a specific MIDI port and sends data to a WebSocket server
when Note 32 (and 33) is pressed.
"""
# 1. Get MIDI port name
port_names = mido.get_input_names()
if not port_names:
logging.warning("No MIDI input ports found. Please connect your device.") # Changed to warning
return
if not (0 <= self.midi_port_index < len(port_names)):
logging.error(f"Error: MIDI port index {self.midi_port_index} out of range. Available ports: {port_names}") # Changed to error
logging.info("Available ports:") # Changed to info
for i, name in enumerate(port_names):
logging.info(f" {i}: {name}") # Changed to info
return
midi_port_name = port_names[self.midi_port_index]
logging.info(f"Selected MIDI input port: {midi_port_name}") # Changed to info
try:
with mido.open_input(midi_port_name) as port:
logging.info(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.") # Changed to info
while True:
msg = port.receive(block=False) # Non-blocking read
if msg:
logging.debug(msg) # Changed to debug
match msg.type:
case 'note_on':
logging.debug(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}") # Changed to debug
pattern_name = "Unknown"
match msg.note:
case 48: # Original Note 48 for 'pulse'
pattern_name = "pulse"
await self.ws_client.send_data({
"names": ["0"],
"settings": {
"pattern": pattern_name,
"delay": self.delay, # Use MIDI-controlled delay
"colors": ["#00ff00"],
"brightness": self.brightness,
"num_leds": 120,
}
})
case 49: # Original Note 49 for 'theater_chase'
pattern_name = "theater_chase"
await self.ws_client.send_data({
"names": ["0"],
"settings": {
"pattern": pattern_name,
"delay": self.delay, # Use MIDI-controlled delay
"colors": ["#00ff00"],
"brightness": self.brightness,
"num_leds": 120,
"on_width": 10,
"off_width": 10,
"n1": 0,
"n2": 100
}
})
case 50: # Original Note 50 for 'alternating'
pattern_name = "alternating"
logging.debug("Triggering Alternating Pattern") # Changed to debug
await self.ws_client.send_data({
"names": ["0"],
"settings": {
"pattern": pattern_name,
"delay": self.delay, # Use MIDI-controlled delay
"colors": ["#00ff00", "#0000ff"],
"brightness": self.brightness,
"num_leds": 120,
"n1": 10,
"n2": 10
}
})
case 'control_change':
match msg.control:
case 36:
self.delay = msg.value * 4 # Update instance delay
logging.info(f"Delay set to {self.delay} ms by MIDI controller") # Changed to info
case 27:
if msg.value == 127:
self.beat_sending_enabled = True
logging.info("[MidiHandler - Listener] Beat sending ENABLED by MIDI control.") # Changed to info
elif msg.value == 0:
self.beat_sending_enabled = False
logging.info("[MidiHandler - Listener] Beat sending DISABLED by MIDI control.") # Changed to info
case 29:
if msg.value == 127:
logging.info("[MidiHandler - Listener] RESET_TEMPO requested by control 29.")
await self._send_reset_to_sound()
case 37:
# Map 0-127 to 0-100 brightness scale
self.brightness = round((msg.value / 127) * 100)
logging.info(f"Brightness set to {self.brightness} by MIDI controller")
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
except mido.PortsError as e:
logging.error(f"Error opening MIDI port '{midi_port_name}': {e}") # Changed to error
except asyncio.CancelledError:
logging.info(f"MIDI listener cancelled.") # Changed to info
except Exception as e:
logging.error(f"An unexpected error occurred in MIDI listener: {e}") # Changed to error
async def run(self):
try:
await self.ws_client.connect()
logging.info(f"[MidiHandler] WebSocket client connected to {self.ws_client.uri}") # Changed to info
await asyncio.gather(
self._midi_listener(),
self._midi_tcp_server()
)
except mido.PortsError as e:
logging.error(f"[MidiHandler] Error opening MIDI port: {e}") # Changed to error
except asyncio.CancelledError:
logging.info("[MidiHandler] Tasks cancelled due to program shutdown.") # Changed to info
except KeyboardInterrupt:
logging.info("\n[MidiHandler] Program interrupted by user.") # Changed to info
finally:
logging.info("[MidiHandler] Main program finished. Closing WebSocket client...") # Changed to info
await self.ws_client.close()
logging.info("[MidiHandler] WebSocket client closed.") # Changed to info
def print_midi_ports():
logging.info("\n--- Available MIDI Input Ports ---") # Changed to info
port_names = mido.get_input_names()
if not port_names:
print("No MIDI input ports found. Please connect your device.")
return
if not (0 <= midi_port_index < len(port_names)):
print(f"Error: MIDI port index {midi_port_index} out of range. Available ports: {port_names}")
print("Available ports:")
logging.warning("No MIDI input ports found.") # Changed to warning
else:
for i, name in enumerate(port_names):
print(f" {i}: {name}")
return
midi_port_name = port_names[midi_port_index]
print(f"Selected MIDI input port: {midi_port_name}")
# 2. Initialize WebSocket client (using your actual networking.py)
ws_client = networking.WebSocketClient(websocket_uri)
try:
# 3. Connect WebSocket
await ws_client.connect()
print(f"WebSocket client connected to {ws_client.uri}")
# 4. Open MIDI port and start listening loop
with mido.open_input(midi_port_name) as port:
print(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.")
while True:
msg = port.receive(block=False) # Non-blocking read
if msg:
match msg.type:
case 'note_on':
print(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}")
match msg.note:
case 32:
await ws_client.send_data({
"names": ["1"],
"settings": {
"pattern": "pulse",
"delay": delay,
"colors": ["#00ff00"],
"brightness": 100,
"num_leds": 200,
}
})
case 33:
await ws_client.send_data({
"names": ["2"],
"settings": {
"pattern": "chase",
"speed": 10,
"color": "#00FFFF",
}
})
case 'control_change':
match msg.control:
case 36:
delay = msg.value * 4
print(f"Delay set to {delay} ms")
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
except mido.PortsError as e:
print(f"Error opening MIDI port '{midi_port_name}': {e}")
except asyncio.CancelledError:
print(f"MIDI listener cancelled.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
# 5. Disconnect WebSocket and clean up
# This assumes your WebSocketClient has a ._connected attribute or similar way to check state.
# If your client's disconnect method is safe to call even if not connected, you can simplify.
await ws_client.close()
print("MIDI listener stopped and cleaned up.")
logging.info(f" {i}: {name}") # Changed to info
logging.info("----------------------------------") # Changed to info
async def main():
print_midi_ports()
# --- Configuration ---
MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
# --- End Configuration ---
try:
await midi_to_websocket_listener(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
except KeyboardInterrupt:
print("\nProgram interrupted by user.")
finally:
print("Main program finished.")
midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
await midi_handler.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -4,121 +4,207 @@ import pyaudio
import aubio
import numpy as np
from time import sleep
import websocket # pip install websocket-client
import json
import socket
import time
import logging # Added logging import
import asyncio # Re-added asyncio import
import threading # Added threading for control server
seconds = 10 # how long this script should run (if not using infinite loop)
# Configure logging
DEBUG_MODE = True # Set to False for INFO level logging
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
bufferSize = 512
windowSizeMultiple = 2 # or 4 for higher accuracy, but more computational cost
# TCP Server Configuration (assuming midi.py runs this)
MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432
audioInputDeviceIndex = 7 # use 'arecord -l' to check available audio devices
audioInputChannels = 1
# Sound Control Server Configuration (for midi.py to control sound.py)
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
pa = pyaudio.PyAudio()
class SoundBeatDetector:
def __init__(self, tcp_host: str, tcp_port: int):
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.tcp_socket = None
self.connected_to_midi = False
self.reconnect_delay = 1 # seconds
# Note: beat_sending_enabled is not used in this simplified flow
print("Available audio input devices:")
info = pa.get_host_api_info_by_index(0)
num_devices = info.get('deviceCount')
found_device = False
for i in range(0, num_devices):
device_info = pa.get_device_info_by_host_api_device_index(0, i)
if (device_info.get('maxInputChannels')) > 0:
print(f" Input Device id {i} - {device_info.get('name')}")
if i == audioInputDeviceIndex:
found_device = True
self.bufferSize = 512
self.windowSizeMultiple = 2
self.audioInputDeviceIndex = 7
self.audioInputChannels = 1
if not found_device:
print(f"Warning: Audio input device index {audioInputDeviceIndex} not found or has no input channels.")
# Consider exiting or picking a default if necessary
self.pa = pyaudio.PyAudio()
try:
audioInputDevice = pa.get_device_info_by_index(audioInputDeviceIndex)
audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
except Exception as e:
print(f"Error getting audio device info for index {audioInputDeviceIndex}: {e}")
pa.terminate()
exit()
logging.info("Available audio input devices:")
info = self.pa.get_host_api_info_by_index(0)
num_devices = info.get('deviceCount')
found_device = False
for i in range(0, num_devices):
device_info = self.pa.get_device_info_by_host_api_device_index(0, i)
if (device_info.get('maxInputChannels')) > 0:
logging.info(f" Input Device id {i} - {device_info.get('name')}")
if i == self.audioInputDeviceIndex:
found_device = True
# create the aubio tempo detection:
hopSize = bufferSize
winSize = hopSize * windowSizeMultiple
tempoDetection = aubio.tempo(method='default', buf_size=winSize, hop_size=hopSize, samplerate=audioInputSampleRate)
if not found_device:
logging.warning(f"Audio input device index {self.audioInputDeviceIndex} not found or has no input channels.")
# --- WebSocket Setup ---
websocket_url = "ws://192.168.4.1:80/ws"
ws = None
try:
ws = websocket.create_connection(websocket_url)
print(f"Successfully connected to WebSocket at {websocket_url}")
except Exception as e:
print(f"Failed to connect to WebSocket: {e}. Data will not be sent over WebSocket.")
# --- End WebSocket Setup ---
try:
audioInputDevice = self.pa.get_device_info_by_index(self.audioInputDeviceIndex)
self.audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
except Exception as e:
logging.error(f"Error getting audio device info for index {self.audioInputDeviceIndex}: {e}")
self.pa.terminate()
exit()
# this function gets called by the input stream, as soon as enough samples are collected from the audio input:
def readAudioFrames(in_data, frame_count, time_info, status):
global ws # Allow modification of the global ws variable
self.hopSize = self.bufferSize
self.winSize = self.hopSize * self.windowSizeMultiple
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
signal = np.frombuffer(in_data, dtype=np.float32)
self.inputStream = None
self._control_thread = None
beat = tempoDetection(signal)
if beat:
bpm = tempoDetection.get_bpm()
print(f"beat! (running with {bpm:.2f} bpm)") # Use f-string for cleaner formatting, removed extra bells
data_to_send = {
"names": ["1"],
"settings": {
"pattern": "pulse",
"delay": 10,
"colors": ["#00ff00"],
"brightness": 10,
"num_leds": 200,
},
}
self._connect_to_midi_server()
self._start_control_server() # Start control server in background
if ws: # Only send if the websocket connection is established
def reset_tempo_detection(self):
"""Re-initializes the aubio tempo detection object."""
logging.info("[SoundBeatDetector] Resetting tempo detection.")
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
def _control_server_loop(self):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((SOUND_CONTROL_HOST, SOUND_CONTROL_PORT))
srv.listen(5)
logging.info(f"[SoundBeatDetector - Control] Listening on {SOUND_CONTROL_HOST}:{SOUND_CONTROL_PORT}")
while True:
conn, addr = srv.accept()
with conn:
logging.info(f"[SoundBeatDetector - Control] Connection from {addr}")
try:
data = conn.recv(1024)
if not data:
continue
command = data.decode().strip()
logging.debug(f"[SoundBeatDetector - Control] Received command: {command}")
if command == "RESET_TEMPO":
self.reset_tempo_detection()
response = "OK: Tempo reset\n"
else:
response = "ERROR: Unknown command\n"
conn.sendall(response.encode('utf-8'))
except Exception as e:
logging.error(f"[SoundBeatDetector - Control] Error handling control message: {e}")
except Exception as e:
logging.error(f"[SoundBeatDetector - Control] Server error: {e}")
finally:
try:
ws.send(json.dumps(data_to_send))
# print("Sent data over WebSocket") # Optional: for debugging
except websocket.WebSocketConnectionClosedException:
print("WebSocket connection closed, attempting to reconnect...")
ws = None # Mark as closed, connection will need to be re-established if desired
except Exception as e:
print(f"Error sending over WebSocket: {e}")
srv.close()
except Exception:
pass
return (in_data, pyaudio.paContinue)
def _start_control_server(self):
if self._control_thread and self._control_thread.is_alive():
return
self._control_thread = threading.Thread(target=self._control_server_loop, daemon=True)
self._control_thread.start()
def _connect_to_midi_server(self):
if self.tcp_socket:
self.tcp_socket.close()
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_socket.settimeout(self.reconnect_delay)
try:
logging.info(f"[SoundBeatDetector] Attempting to connect to MIDI TCP server at {self.tcp_host}:{self.tcp_port}...")
self.tcp_socket.connect((self.tcp_host, self.tcp_port))
self.tcp_socket.setblocking(0)
self.connected_to_midi = True
logging.info(f"[SoundBeatDetector] Successfully connected to MIDI TCP server.")
except (socket.error, socket.timeout) as e:
logging.error(f"[SoundBeatDetector] Failed to connect to MIDI TCP server: {e}")
self.connected_to_midi = False
# create and start the input stream
try:
inputStream = pa.open(format=pyaudio.paFloat32,
input=True,
channels=audioInputChannels,
input_device_index=audioInputDeviceIndex,
frames_per_buffer=bufferSize,
rate=audioInputSampleRate,
stream_callback=readAudioFrames)
# Removed _handle_control_client and _control_server (replaced by simple threaded server)
inputStream.start_stream()
print("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")
def readAudioFrames(self, in_data, frame_count, time_info, status):
signal = np.frombuffer(in_data, dtype=np.float32)
# Loop to keep the script running, allowing graceful shutdown
while inputStream.is_active():
sleep(0.1) # Small delay to prevent busy-waiting
beat = self.tempoDetection(signal)
if beat:
bpm = self.tempoDetection.get_bpm()
logging.debug(f"beat! (running with {bpm:.2f} bpm)") # Changed to debug
bpm_message = str(bpm)
except KeyboardInterrupt:
print("\nKeyboardInterrupt: Stopping script gracefully.")
except Exception as e:
print(f"An error occurred with the audio stream: {e}")
finally:
# Ensure streams and resources are closed
if 'inputStream' in locals() and inputStream.is_active():
inputStream.stop_stream()
if 'inputStream' in locals() and not inputStream.is_stopped():
inputStream.close()
pa.terminate()
if ws:
print("Closing WebSocket connection.")
ws.close()
if self.connected_to_midi and self.tcp_socket:
try:
message_bytes = (bpm_message + "\n").encode('utf-8')
self.tcp_socket.sendall(message_bytes)
logging.debug(f"[SoundBeatDetector] Sent BPM to MIDI TCP server: {bpm_message}") # Changed to debug
except socket.error as e:
logging.error(f"[SoundBeatDetector] Error sending BPM to MIDI TCP server: {e}. Attempting to reconnect...")
self.connected_to_midi = False
self._connect_to_midi_server()
elif not self.connected_to_midi:
logging.warning("[SoundBeatDetector] Not connected to MIDI TCP server, attempting to reconnect...") # Changed to warning
self._connect_to_midi_server()
else:
logging.warning("[SoundBeatDetector] TCP socket not initialized, cannot send BPM.") # Changed to warning
print("Script finished.")
return (in_data, pyaudio.paContinue)
def start_stream(self):
try:
self.inputStream = self.pa.open(format=pyaudio.paFloat32,
input=True,
channels=self.audioInputChannels,
input_device_index=self.audioInputDeviceIndex,
frames_per_buffer=self.bufferSize,
rate=self.audioInputSampleRate,
stream_callback=self.readAudioFrames)
self.inputStream.start_stream()
logging.info("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")
while self.inputStream.is_active():
sleep(0.1)
except KeyboardInterrupt:
logging.info("\nKeyboardInterrupt: Stopping script gracefully.")
except Exception as e:
logging.error(f"An error occurred with the audio stream: {e}")
finally:
self.stop_stream()
def stop_stream(self):
if self.inputStream and self.inputStream.is_active():
self.inputStream.stop_stream()
if self.inputStream and not self.inputStream.is_stopped():
self.inputStream.close()
self.pa.terminate()
if self.tcp_socket and self.connected_to_midi:
logging.info("[SoundBeatDetector] Closing TCP socket.")
self.tcp_socket.close()
self.connected_to_midi = False
logging.info("SoundBeatDetector stopped.")
# Removed async def run(self)
if __name__ == "__main__":
# TCP Server Configuration (should match midi.py)
MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT)
logging.info("Starting SoundBeatDetector...")
try:
sound_detector.start_stream()
except KeyboardInterrupt:
logging.info("\nProgram interrupted by user.")
except Exception as e:
logging.error(f"An error occurred during main execution: {e}")