import mido import asyncio import networking import socket import json import logging # Added logging import import time # Added for initial state read import tkinter as tk from tkinter import ttk, messagebox # Import messagebox for confirmations from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS # Pattern name mapping for shorter JSON payloads PATTERN_NAMES = { "flicker": "f", "fill_range": "fr", "n_chase": "nc", "alternating": "a", "pulse": "p", "rainbow": "r", "specto": "s", "radiate": "rd", } # Configure logging DEBUG_MODE = True # Set to False for INFO level logging logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # TCP Server Configuration TCP_HOST = "127.0.0.1" TCP_PORT = 65432 # Sound Control Server Configuration (for sending reset) SOUND_CONTROL_HOST = "127.0.0.1" SOUND_CONTROL_PORT = 65433 class MidiHandler: def __init__(self, midi_port_index: int, websocket_uri: str): self.midi_port_index = midi_port_index self.websocket_uri = websocket_uri self.ws_client = networking.WebSocketClient(websocket_uri) self.delay = 100 # Default delay value, controlled by MIDI controller self.brightness = 100 # Default brightness value, controlled by MIDI controller self.tcp_host = TCP_HOST self.tcp_port = TCP_PORT self.beat_sending_enabled = True # New: Local flag for beat sending self.sound_control_host = SOUND_CONTROL_HOST self.sound_control_port = SOUND_CONTROL_PORT # RGB controlled by CC 30/31/32 (default green) self.color_r = 0 self.color_g = 255 self.color_b = 0 # Generic parameters controlled via CC # Raw CC-driven parameters (0-127) self.n1 = 10 self.n2 = 10 self.n3 = 1 # Additional knobs (CC38-45) self.knob1 = 0 self.knob2 = 0 self.knob3 = 0 self.knob4 = 0 self.knob5 = 0 self.knob6 = 0 self.knob7 = 0 self.knob8 = 0 # Current state for GUI display self.current_bpm: float | None = None self.current_pattern: str = "" self.beat_index: int = 0 # Rate limiting for parameter updates self.last_param_update: float = 0.0 self.param_update_interval: float = 0.1 # 100ms minimum between updates self.pending_param_update: bool = False # Sequential pulse pattern state self.sequential_pulse_enabled: bool = False self.sequential_pulse_step: int = 0 def _current_color_rgb(self) -> tuple: r = max(0, min(255, int(self.color_r))) g = max(0, min(255, int(self.color_g))) b = max(0, min(255, int(self.color_b))) return (r, g, b) async def _handle_sequential_pulse(self): """Handle sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored""" from bar_config import LEFT_BARS, RIGHT_BARS # Calculate which bar should pulse based on beat (1 beat per bar) bar_index = self.beat_index % 4 # 0-3, cycles every 4 beats # Create minimal payload - defaults to off payload = { "d": { # Defaults - off for all bars "t": "b", # Message type: beat "pt": "o", # off } } # Set specific bars to pulse left_bar = LEFT_BARS[bar_index] right_bar = RIGHT_BARS[bar_index] payload[left_bar] = {"pt": "p"} # pulse payload[right_bar] = {"pt": "p"} # pulse # logging.debug(f"[Sequential Pulse] Beat {self.beat_index}, pulsing bars {left_bar} and {right_bar}") await self.ws_client.send_data(payload) async def _handle_alternating_phase(self): """Handle alternating pattern with phase offset: every second bar uses different step""" from bar_config import LED_BAR_NAMES # Create minimal payload - same n1/n2 for all bars payload = { "d": { # Defaults - pattern and n1/n2 "t": "b", # Message type: beat "pt": "a", # alternating "n1": self.n1, "n2": self.n2, "s": self.beat_index % 2, # Default step for in-phase bars } } # Set step offset for every second bar (bars 101, 103, 105, 107) swap_bars = ["101", "103", "105", "107"] for bar_name in LED_BAR_NAMES: if bar_name in swap_bars: # Send step offset for out-of-phase bars payload[bar_name] = {"s": (self.beat_index + 1) % 2} else: # In-phase bars use defaults (no override needed) payload[bar_name] = {} # logging.debug(f"[Alternating Phase] Beat {self.beat_index}, step offset for bars {swap_bars}") await self.ws_client.send_data(payload) async def _send_full_parameters(self): """Send all parameters to bars - may require multiple packets due to size limit""" from bar_config import LED_BAR_NAMES # Calculate packet size for full parameters full_payload = { "d": { "t": "u", # Message type: update "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "dl": self.delay, "cl": [self._current_color_rgb()], "br": self.brightness, "n1": self.n1, "n2": self.n2, "n3": self.n3, "s": self.beat_index % 256, # Use full range for rainbow patterns } } # Estimate size: ~200 bytes for defaults + 8 bars * 2 bytes = ~216 bytes # This should fit in one packet, but let's be safe payload_size = len(str(full_payload)) if payload_size > 200: # Split into 2 packets if too large # Packet 1: Pattern and timing parameters payload1 = { "d": { "t": "u", # Message type: update "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "dl": self.delay, "br": self.brightness, } } for bar_name in LED_BAR_NAMES: payload1[bar_name] = {} # Packet 2: Color and pattern parameters payload2 = { "d": { "t": "u", # Message type: update "cl": [self._current_color_rgb()], "n1": self.n1, "n2": self.n2, "n3": self.n3, "s": self.beat_index % 2, # Keep step small (0 or 1) for alternating patterns } } for bar_name in LED_BAR_NAMES: payload2[bar_name] = {} # logging.debug(f"[Full Params] Sending in 2 packets due to size ({payload_size} bytes)") await self.ws_client.send_data(payload1) await asyncio.sleep(0.01) # Small delay between packets await self.ws_client.send_data(payload2) else: # Single packet for bar_name in LED_BAR_NAMES: full_payload[bar_name] = {} # logging.debug(f"[Full Params] Sending single packet ({payload_size} bytes)") await self.ws_client.send_data(full_payload) async def _request_param_update(self): """Request a parameter update with rate limiting""" import time current_time = time.time() if current_time - self.last_param_update >= self.param_update_interval: # Can send immediately self.last_param_update = current_time await self._send_full_parameters() # logging.debug("[Rate Limit] Parameter update sent immediately") else: # Rate limited - mark as pending self.pending_param_update = True # logging.debug("[Rate Limit] Parameter update queued (rate limited)") async def _send_normal_pattern(self): """Send normal pattern to all bars - include required parameters""" # Patterns that need specific parameters patterns_needing_params = ["alternating", "flicker", "n_chase", "rainbow", "radiate"] payload = { "d": { # Defaults "t": "b", # Message type: beat "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), } } # Add required parameters for patterns that need them if self.current_pattern in patterns_needing_params: payload["d"].update({ "n1": self.n1, "n2": self.n2, "n3": self.n3, "dl": self.delay, "s": self.beat_index % 256, # Use full range for rainbow patterns }) # Add empty entries for each bar (they'll use defaults) for bar_name in LED_BAR_NAMES: payload[bar_name] = {} # logging.debug(f"[Beat] Triggering '{self.current_pattern}' for {len(LED_BAR_NAMES)} bars") await self.ws_client.send_data(payload) async def _send_reset_to_sound(self): try: reader, writer = await asyncio.open_connection(self.sound_control_host, self.sound_control_port) cmd = "RESET_TEMPO\n".encode('utf-8') writer.write(cmd) await writer.drain() resp = await reader.read(100) logging.info(f"[MidiHandler - Control] Sent RESET_TEMPO, response: {resp.decode().strip()}") writer.close() await writer.wait_closed() except Exception as e: logging.error(f"[MidiHandler - Control] Failed to send RESET_TEMPO: {e}") async def _handle_tcp_client(self, reader, writer): addr = writer.get_extra_info('peername') logging.info(f"[MidiHandler - TCP Server] Connected by {addr}") # Changed to info try: while True: data = await reader.read(4096) # Read up to 4KB of data if not data: logging.info(f"[MidiHandler - TCP Server] Client {addr} disconnected.") # Changed to info break message = data.decode().strip() # logging.debug(f"[MidiHandler - TCP Server] Received from {addr}: {message}") # Changed to debug if self.beat_sending_enabled: try: # Attempt to parse as float (BPM) from sound.py bpm_value = float(message) self.current_bpm = bpm_value # On each beat, trigger currently selected pattern(s) if not self.current_pattern: pass # No pattern selected yet; ignoring beat else: self.beat_index = (self.beat_index + 1) % 1000000 # Send periodic parameter updates every 8 beats if self.beat_index % 8 == 0: await self._send_full_parameters() # Check for pending parameter updates (rate limited) if self.pending_param_update: import time current_time = time.time() if current_time - self.last_param_update >= self.param_update_interval: self.last_param_update = current_time self.pending_param_update = False await self._send_full_parameters() # logging.debug("[Rate Limit] Pending parameter update sent") if self.current_pattern == "sequential_pulse": # Sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored await self._handle_sequential_pulse() elif self.current_pattern == "alternating_phase": # Alternating pattern with phase offset: every second bar is out of phase await self._handle_alternating_phase() elif self.current_pattern: # Normal pattern mode - run on all bars await self._send_normal_pattern() except ValueError: logging.warning(f"[MidiHandler - TCP Server] Received non-BPM message from {addr}, not forwarding: {message}") # Changed to warning except Exception as e: logging.error(f"[MidiHandler - TCP Server] Error processing received message from {addr}: {e}") # Changed to error else: pass # Beat sending disabled except asyncio.CancelledError: logging.info(f"[MidiHandler - TCP Server] Client handler for {addr} cancelled.") # Changed to info except Exception as e: logging.error(f"[MidiHandler - TCP Server] Error handling client {addr}: {e}") # Changed to error finally: logging.info(f"[MidiHandler - TCP Server] Closing connection for {addr}") # Changed to info writer.close() await writer.wait_closed() async def _midi_tcp_server(self): server = await asyncio.start_server( lambda r, w: self._handle_tcp_client(r, w), self.tcp_host, self.tcp_port) addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) logging.info(f"[MidiHandler - TCP Server] Serving on {addrs}") # Changed to info async with server: await server.serve_forever() async def _read_initial_cc_state(self, port, timeout_s: float = 0.5): """Read initial CC values from the MIDI device for a short period to populate state.""" start = time.time() while time.time() - start < timeout_s: msg = port.receive(block=False) if msg and msg.type == 'control_change': if msg.control == 36: self.n3 = max(1, msg.value) logging.info(f"[Init] n3 set to {self.n3} from CC36") elif msg.control == 37: self.delay = msg.value * 4 logging.info(f"[Init] Delay set to {self.delay} ms from CC37") elif msg.control == 39: self.delay = msg.value * 4 logging.info(f"[Init] Delay set to {self.delay} ms from CC39") elif msg.control == 33: self.brightness = round((msg.value / 127) * 100) logging.info(f"[Init] Brightness set to {self.brightness} from CC33") elif msg.control == 30: self.color_r = round((msg.value / 127) * 255) logging.info(f"[Init] Red set to {self.color_r} from CC30") elif msg.control == 31: self.color_g = round((msg.value / 127) * 255) logging.info(f"[Init] Green set to {self.color_g} from CC31") elif msg.control == 32: self.color_b = round((msg.value / 127) * 255) logging.info(f"[Init] Blue set to {self.color_b} from CC32") elif msg.control == 34: self.n1 = int(msg.value) logging.info(f"[Init] n1 set to {self.n1} from CC34") elif msg.control == 35: self.n2 = int(msg.value) logging.info(f"[Init] n2 set to {self.n2} from CC35") elif msg.control == 27: self.beat_sending_enabled = (msg.value == 127) logging.info(f"[Init] Beat sending {'ENABLED' if self.beat_sending_enabled else 'DISABLED'} from CC27") await asyncio.sleep(0.001) async def _midi_listener(self): logging.info("Midi function") # Changed to info """ Listens to a specific MIDI port and sends data to a WebSocket server when Note 32 (and 33) is pressed. """ # 1. Get MIDI port name port_names = mido.get_input_names() if not port_names: logging.warning("No MIDI input ports found. Please connect your device.") # Changed to warning return if not (0 <= self.midi_port_index < len(port_names)): logging.error(f"Error: MIDI port index {self.midi_port_index} out of range. Available ports: {port_names}") # Changed to error logging.info("Available ports:") # Changed to info for i, name in enumerate(port_names): logging.info(f" {i}: {name}") # Changed to info return midi_port_name = port_names[self.midi_port_index] logging.info(f"Selected MIDI input port: {midi_port_name}") # Changed to info try: with mido.open_input(midi_port_name) as port: logging.info(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.") # Changed to info # Read initial controller state briefly await self._read_initial_cc_state(port) while True: msg = port.receive(block=False) # Non-blocking read if msg: # logging.debug(msg) # Changed to debug match msg.type: case 'note_on': # logging.debug(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}") # Changed to debug # Bank1 patterns starting at MIDI note 36 pattern_bindings: list[str] = [ # Pulse patterns (row 1) "pulse", "sequential_pulse", # Alternating patterns (row 2) "alternating", "alternating_phase", # Chase/movement patterns (row 3) "n_chase", "rainbow", # Effect patterns (row 4) "flicker", "radiate", ] idx = msg.note - 36 if 0 <= idx < len(pattern_bindings): pattern_name = pattern_bindings[idx] self.current_pattern = pattern_name logging.info(f"[Select] Pattern selected via note {msg.note}: {self.current_pattern} (n1={self.n1}, n2={self.n2})") # Send full parameters when pattern changes await self._send_full_parameters() else: pass # Note not bound to patterns case 'control_change': match msg.control: case 36: self.n3 = max(1, msg.value) # Update n3 step rate logging.info(f"n3 set to {self.n3} by MIDI controller (CC36)") await self._request_param_update() case 37: self.delay = msg.value * 4 # Update instance delay logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC37)") await self._request_param_update() case 38: self.n1 = msg.value # pulse n1 for pulse patterns logging.info(f"Pulse n1 set to {self.n1} by MIDI controller (CC38)") await self._request_param_update() case 39: self.n2 = msg.value # pulse n2 for pulse patterns logging.info(f"Pulse n2 set to {self.n2} by MIDI controller (CC39)") await self._request_param_update() case 40: self.n1 = msg.value # n1 for alternating patterns logging.info(f"Alternating n1 set to {self.n1} by MIDI controller (CC40)") await self._request_param_update() case 41: self.n2 = msg.value # n2 for alternating patterns logging.info(f"Alternating n2 set to {self.n2} by MIDI controller (CC41)") await self._request_param_update() case 42: self.n1 = msg.value # radiate n1 for radiate patterns logging.info(f"Radiate n1 set to {self.n1} by MIDI controller (CC42)") await self._request_param_update() case 43: self.delay = msg.value * 4 # delay for radiate patterns logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC43)") await self._request_param_update() case 44: self.knob7 = msg.value logging.info(f"Knob7 set to {self.knob7} by MIDI controller (CC44)") await self._request_param_update() case 45: self.knob8 = msg.value logging.info(f"Knob8 set to {self.knob8} by MIDI controller (CC45)") await self._request_param_update() case 27: if msg.value == 127: self.beat_sending_enabled = True logging.info("[MidiHandler - Listener] Beat sending ENABLED by MIDI control.") # Changed to info elif msg.value == 0: self.beat_sending_enabled = False logging.info("[MidiHandler - Listener] Beat sending DISABLED by MIDI control.") # Changed to info case 29: if msg.value == 127: logging.info("[MidiHandler - Listener] RESET_TEMPO requested by control 29.") await self._send_reset_to_sound() case 33: # Map 0-127 to 0-100 brightness scale self.brightness = round((msg.value / 127) * 100) logging.info(f"Brightness set to {self.brightness} by MIDI controller (CC33)") await self._request_param_update() case 30: # Red 0-127 -> 0-255 self.color_r = round((msg.value / 127) * 255) logging.info(f"Red set to {self.color_r}") await self._request_param_update() case 31: # Green 0-127 -> 0-255 self.color_g = round((msg.value / 127) * 255) logging.info(f"Green set to {self.color_g}") await self._request_param_update() case 32: # Blue 0-127 -> 0-255 self.color_b = round((msg.value / 127) * 255) logging.info(f"Blue set to {self.color_b}") await self._request_param_update() case 34: self.n1 = int(msg.value) logging.info(f"n1 set to {self.n1} by MIDI controller (CC34)") await self._request_param_update() case 35: self.n2 = int(msg.value) logging.info(f"n2 set to {self.n2} by MIDI controller (CC35)") await self._request_param_update() await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop except mido.PortsError as e: logging.error(f"Error opening MIDI port '{midi_port_name}': {e}") # Changed to error except asyncio.CancelledError: logging.info(f"MIDI listener cancelled.") # Changed to info except Exception as e: logging.error(f"An unexpected error occurred in MIDI listener: {e}") # Changed to error async def run(self): try: await self.ws_client.connect() logging.info(f"[MidiHandler] WebSocket client connected to {self.ws_client.uri}") # Changed to info # List available MIDI ports for debugging print(f"Available MIDI input ports: {mido.get_input_names()}") print(f"Trying to open MIDI port index {self.midi_port_index}") await asyncio.gather( self._midi_listener(), self._midi_tcp_server() ) except mido.PortsError as e: logging.error(f"[MidiHandler] Error opening MIDI port: {e}") # Changed to error print(f"MIDI Port Error: {e}") print(f"Available MIDI ports: {mido.get_input_names()}") print("Please check your MIDI device connection and port index") except asyncio.CancelledError: logging.info("[MidiHandler] Tasks cancelled due to program shutdown.") # Changed to info except KeyboardInterrupt: logging.info("\n[MidiHandler] Program interrupted by user.") # Changed to info finally: logging.info("[MidiHandler] Main program finished. Closing WebSocket client...") # Changed to info await self.ws_client.close() logging.info("[MidiHandler] WebSocket client closed.") # Changed to info def print_midi_ports(): logging.info("\n--- Available MIDI Input Ports ---") # Changed to info port_names = mido.get_input_names() if not port_names: logging.warning("No MIDI input ports found.") # Changed to warning else: for i, name in enumerate(port_names): logging.info(f" {i}: {name}") # Changed to info logging.info("----------------------------------") # Changed to info async def main(): print_midi_ports() # --- Configuration --- MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws" # --- End Configuration --- midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI) await midi_handler.run() if __name__ == "__main__": asyncio.run(main())