import mido import asyncio import networking import socket import json import logging # Added logging import import time # Added for initial state read import tkinter as tk from tkinter import ttk, messagebox # Import messagebox for confirmations from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS # Pattern name mapping for shorter JSON payloads PATTERN_NAMES = { "flicker": "f", "fill_range": "fr", "n_chase": "nc", "alternating": "a", "pulse": "p", "rainbow": "r", "specto": "s", "radiate": "rd", } # Configure logging DEBUG_MODE = True # Set to False for INFO level logging logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # TCP Server Configuration TCP_HOST = "127.0.0.1" TCP_PORT = 65432 # Sound Control Server Configuration (for sending reset) SOUND_CONTROL_HOST = "127.0.0.1" SOUND_CONTROL_PORT = 65433 class MidiHandler: def __init__(self, midi_port_index: int, websocket_uri: str): self.midi_port_index = midi_port_index self.websocket_uri = websocket_uri self.ws_client = networking.WebSocketClient(websocket_uri) self.delay = 100 # Default delay value, controlled by MIDI controller self.brightness = 100 # Default brightness value, controlled by MIDI controller self.tcp_host = TCP_HOST self.tcp_port = TCP_PORT self.beat_sending_enabled = True # New: Local flag for beat sending self.sound_control_host = SOUND_CONTROL_HOST self.sound_control_port = SOUND_CONTROL_PORT # RGB controlled by CC 30/31/32 (default green) self.color_r = 0 self.color_g = 255 self.color_b = 0 # Generic parameters controlled via CC # Raw CC-driven parameters (0-127) self.n1 = 10 self.n2 = 10 self.n3 = 1 # Current state for GUI display self.current_bpm: float | None = None self.current_pattern: str = "" self.beat_index: int = 0 # Sequential pulse pattern state self.sequential_pulse_enabled: bool = False self.sequential_pulse_step: int = 0 def _current_color_rgb(self) -> tuple: r = max(0, min(255, int(self.color_r))) g = max(0, min(255, int(self.color_g))) b = max(0, min(255, int(self.color_b))) return (r, g, b) async def _handle_sequential_pulse(self): """Handle sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored""" from bar_config import LEFT_BARS, RIGHT_BARS # Calculate which bar should pulse based on beat (1 beat per bar) bar_index = self.beat_index % 4 # 0-3, cycles every 4 beats # Create minimal payload - only send pattern payload = { "d": { # Defaults - only pattern "pt": "p", # pulse } } # Set specific bars to pulse left_bar = LEFT_BARS[bar_index] right_bar = RIGHT_BARS[bar_index] payload[left_bar] = {"pt": "p"} # pulse payload[right_bar] = {"pt": "p"} # pulse logging.debug(f"[Sequential Pulse] Beat {self.beat_index}, pulsing bars {left_bar} and {right_bar}") await self.ws_client.send_data(payload) async def _handle_alternating_phase(self): """Handle alternating pattern with phase offset: every second bar swaps n1 and n2""" from bar_config import LED_BAR_NAMES # Create minimal payload - only send what changes payload = { "d": { # Defaults - only pattern and n1/n2 "pt": "a", # alternating "n1": self.n1, "n2": self.n2, } } # Set n1/n2 swap for every second bar (bars 101, 103, 105, 107) swap_bars = ["101", "103", "105", "107"] for bar_name in LED_BAR_NAMES: if bar_name in swap_bars: # Swap n1 and n2 for out-of-phase bars payload[bar_name] = {"n1": self.n2, "n2": self.n1} else: # In-phase bars use defaults (no override needed) payload[bar_name] = {} logging.debug(f"[Alternating Phase] Beat {self.beat_index}, n1/n2 swap for bars {swap_bars}") await self.ws_client.send_data(payload) async def _send_full_parameters(self): """Send all parameters to bars - may require multiple packets due to size limit""" from bar_config import LED_BAR_NAMES # Calculate packet size for full parameters full_payload = { "d": { "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "dl": self.delay, "cl": [self._current_color_rgb()], "br": self.brightness, "n1": self.n1, "n2": self.n2, "n3": self.n3, } } # Estimate size: ~200 bytes for defaults + 8 bars * 2 bytes = ~216 bytes # This should fit in one packet, but let's be safe payload_size = len(str(full_payload)) if payload_size > 200: # Split into 2 packets if too large # Packet 1: Pattern and timing parameters payload1 = { "d": { "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), "dl": self.delay, "br": self.brightness, } } for bar_name in LED_BAR_NAMES: payload1[bar_name] = {} # Packet 2: Color and pattern parameters payload2 = { "d": { "cl": [self._current_color_rgb()], "n1": self.n1, "n2": self.n2, "n3": self.n3, } } for bar_name in LED_BAR_NAMES: payload2[bar_name] = {} logging.debug(f"[Full Params] Sending in 2 packets due to size ({payload_size} bytes)") await self.ws_client.send_data(payload1) await asyncio.sleep(0.01) # Small delay between packets await self.ws_client.send_data(payload2) else: # Single packet for bar_name in LED_BAR_NAMES: full_payload[bar_name] = {} logging.debug(f"[Full Params] Sending single packet ({payload_size} bytes)") await self.ws_client.send_data(full_payload) async def _send_normal_pattern(self): """Send normal pattern to all bars - minimal payload for beats""" payload = { "d": { # Defaults - only pattern "pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern), } } # Add empty entries for each bar (they'll use defaults) for bar_name in LED_BAR_NAMES: payload[bar_name] = {} logging.debug(f"[Beat] Triggering '{self.current_pattern}' for {len(LED_BAR_NAMES)} bars using defaults") await self.ws_client.send_data(payload) async def _send_reset_to_sound(self): try: reader, writer = await asyncio.open_connection(self.sound_control_host, self.sound_control_port) cmd = "RESET_TEMPO\n".encode('utf-8') writer.write(cmd) await writer.drain() resp = await reader.read(100) logging.info(f"[MidiHandler - Control] Sent RESET_TEMPO, response: {resp.decode().strip()}") writer.close() await writer.wait_closed() except Exception as e: logging.error(f"[MidiHandler - Control] Failed to send RESET_TEMPO: {e}") async def _handle_tcp_client(self, reader, writer): addr = writer.get_extra_info('peername') logging.info(f"[MidiHandler - TCP Server] Connected by {addr}") # Changed to info try: while True: data = await reader.read(4096) # Read up to 4KB of data if not data: logging.info(f"[MidiHandler - TCP Server] Client {addr} disconnected.") # Changed to info break message = data.decode().strip() logging.debug(f"[MidiHandler - TCP Server] Received from {addr}: {message}") # Changed to debug if self.beat_sending_enabled: try: # Attempt to parse as float (BPM) from sound.py bpm_value = float(message) self.current_bpm = bpm_value # On each beat, trigger currently selected pattern(s) if not self.current_pattern: logging.debug("[Beat] No pattern selected yet; ignoring beat") else: self.beat_index = (self.beat_index + 1) % 1000000 # Send periodic parameter updates every 8 beats if self.beat_index % 8 == 0: await self._send_full_parameters() if self.current_pattern == "sequential_pulse": # Sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored await self._handle_sequential_pulse() elif self.current_pattern == "alternating_phase": # Alternating pattern with phase offset: every second bar is out of phase await self._handle_alternating_phase() elif self.current_pattern: # Normal pattern mode - run on all bars await self._send_normal_pattern() except ValueError: logging.warning(f"[MidiHandler - TCP Server] Received non-BPM message from {addr}, not forwarding: {message}") # Changed to warning except Exception as e: logging.error(f"[MidiHandler - TCP Server] Error processing received message from {addr}: {e}") # Changed to error else: logging.debug(f"[MidiHandler - TCP Server] Beat received from {addr} but sending to WebSocket is disabled: {message}") # Changed to debug except asyncio.CancelledError: logging.info(f"[MidiHandler - TCP Server] Client handler for {addr} cancelled.") # Changed to info except Exception as e: logging.error(f"[MidiHandler - TCP Server] Error handling client {addr}: {e}") # Changed to error finally: logging.info(f"[MidiHandler - TCP Server] Closing connection for {addr}") # Changed to info writer.close() await writer.wait_closed() async def _midi_tcp_server(self): server = await asyncio.start_server( lambda r, w: self._handle_tcp_client(r, w), self.tcp_host, self.tcp_port) addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) logging.info(f"[MidiHandler - TCP Server] Serving on {addrs}") # Changed to info async with server: await server.serve_forever() async def _read_initial_cc_state(self, port, timeout_s: float = 0.5): """Read initial CC values from the MIDI device for a short period to populate state.""" start = time.time() while time.time() - start < timeout_s: msg = port.receive(block=False) if msg and msg.type == 'control_change': if msg.control == 36: self.n3 = max(1, msg.value) logging.info(f"[Init] n3 set to {self.n3} from CC36") elif msg.control == 37: self.delay = msg.value * 4 logging.info(f"[Init] Delay set to {self.delay} ms from CC37") elif msg.control == 33: self.brightness = round((msg.value / 127) * 100) logging.info(f"[Init] Brightness set to {self.brightness} from CC33") elif msg.control == 30: self.color_r = round((msg.value / 127) * 255) logging.info(f"[Init] Red set to {self.color_r} from CC30") elif msg.control == 31: self.color_g = round((msg.value / 127) * 255) logging.info(f"[Init] Green set to {self.color_g} from CC31") elif msg.control == 32: self.color_b = round((msg.value / 127) * 255) logging.info(f"[Init] Blue set to {self.color_b} from CC32") elif msg.control == 34: self.n1 = int(msg.value) logging.info(f"[Init] n1 set to {self.n1} from CC34") elif msg.control == 35: self.n2 = int(msg.value) logging.info(f"[Init] n2 set to {self.n2} from CC35") elif msg.control == 27: self.beat_sending_enabled = (msg.value == 127) logging.info(f"[Init] Beat sending {'ENABLED' if self.beat_sending_enabled else 'DISABLED'} from CC27") await asyncio.sleep(0.001) async def _midi_listener(self): logging.info("Midi function") # Changed to info """ Listens to a specific MIDI port and sends data to a WebSocket server when Note 32 (and 33) is pressed. """ # 1. Get MIDI port name port_names = mido.get_input_names() if not port_names: logging.warning("No MIDI input ports found. Please connect your device.") # Changed to warning return if not (0 <= self.midi_port_index < len(port_names)): logging.error(f"Error: MIDI port index {self.midi_port_index} out of range. Available ports: {port_names}") # Changed to error logging.info("Available ports:") # Changed to info for i, name in enumerate(port_names): logging.info(f" {i}: {name}") # Changed to info return midi_port_name = port_names[self.midi_port_index] logging.info(f"Selected MIDI input port: {midi_port_name}") # Changed to info try: with mido.open_input(midi_port_name) as port: logging.info(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.") # Changed to info # Read initial controller state briefly await self._read_initial_cc_state(port) while True: msg = port.receive(block=False) # Non-blocking read if msg: logging.debug(msg) # Changed to debug match msg.type: case 'note_on': logging.debug(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}") # Changed to debug # Bank1 patterns starting at MIDI note 36 pattern_bindings: list[tuple[str, dict]] = [ ("pulse", {"n1": 120, "n2": 120}), ("flicker", {}), ("alternating", {"n1": 6, "n2": 6}), ("n_chase", {"n1": 5, "n2": 5}), ("rainbow", {}), ("radiate", {"n1": 8}), ("sequential_pulse", {}), ("alternating_phase", {}), ] idx = msg.note - 36 if 0 <= idx < len(pattern_bindings): pattern_name, extra = pattern_bindings[idx] self.current_pattern = pattern_name # Apply any immediate param tweaks from binding to local state if "n1" in extra: self.n1 = extra["n1"] if "n2" in extra: self.n2 = extra["n2"] logging.info(f"[Select] Pattern selected via note {msg.note}: {self.current_pattern} (n1={self.n1}, n2={self.n2})") # Send full parameters when pattern changes await self._send_full_parameters() else: logging.debug(f"Note {msg.note} not bound to patterns") case 'control_change': match msg.control: case 36: self.n3 = max(1, msg.value) # Update n3 step rate logging.info(f"n3 set to {self.n3} by MIDI controller (CC36)") case 37: self.delay = msg.value * 4 # Update instance delay logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC37)") case 27: if msg.value == 127: self.beat_sending_enabled = True logging.info("[MidiHandler - Listener] Beat sending ENABLED by MIDI control.") # Changed to info elif msg.value == 0: self.beat_sending_enabled = False logging.info("[MidiHandler - Listener] Beat sending DISABLED by MIDI control.") # Changed to info case 29: if msg.value == 127: logging.info("[MidiHandler - Listener] RESET_TEMPO requested by control 29.") await self._send_reset_to_sound() case 33: # Map 0-127 to 0-100 brightness scale self.brightness = round((msg.value / 127) * 100) logging.info(f"Brightness set to {self.brightness} by MIDI controller (CC33)") case 30: # Red 0-127 -> 0-255 self.color_r = round((msg.value / 127) * 255) logging.info(f"Red set to {self.color_r}") case 31: # Green 0-127 -> 0-255 self.color_g = round((msg.value / 127) * 255) logging.info(f"Green set to {self.color_g}") case 32: # Blue 0-127 -> 0-255 self.color_b = round((msg.value / 127) * 255) logging.info(f"Blue set to {self.color_b}") case 34: self.n1 = int(msg.value) logging.info(f"n1 set to {self.n1} by MIDI controller (CC34)") case 35: self.n2 = int(msg.value) logging.info(f"n2 set to {self.n2} by MIDI controller (CC35)") await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop except mido.PortsError as e: logging.error(f"Error opening MIDI port '{midi_port_name}': {e}") # Changed to error except asyncio.CancelledError: logging.info(f"MIDI listener cancelled.") # Changed to info except Exception as e: logging.error(f"An unexpected error occurred in MIDI listener: {e}") # Changed to error async def run(self): try: await self.ws_client.connect() logging.info(f"[MidiHandler] WebSocket client connected to {self.ws_client.uri}") # Changed to info await asyncio.gather( self._midi_listener(), self._midi_tcp_server() ) except mido.PortsError as e: logging.error(f"[MidiHandler] Error opening MIDI port: {e}") # Changed to error except asyncio.CancelledError: logging.info("[MidiHandler] Tasks cancelled due to program shutdown.") # Changed to info except KeyboardInterrupt: logging.info("\n[MidiHandler] Program interrupted by user.") # Changed to info finally: logging.info("[MidiHandler] Main program finished. Closing WebSocket client...") # Changed to info await self.ws_client.close() logging.info("[MidiHandler] WebSocket client closed.") # Changed to info def print_midi_ports(): logging.info("\n--- Available MIDI Input Ports ---") # Changed to info port_names = mido.get_input_names() if not port_names: logging.warning("No MIDI input ports found.") # Changed to warning else: for i, name in enumerate(port_names): logging.info(f" {i}: {name}") # Changed to info logging.info("----------------------------------") # Changed to info async def main(): print_midi_ports() # --- Configuration --- MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws" # --- End Configuration --- midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI) await midi_handler.run() if __name__ == "__main__": asyncio.run(main())