Move to src. Add midi and sound

This commit is contained in:
2025-09-07 21:15:42 +12:00
parent b77b29415c
commit 9fc58a827b
9 changed files with 568 additions and 7 deletions

105
src/midi.py Normal file
View File

@@ -0,0 +1,105 @@
import mido
import asyncio
import time
import networking # <--- This will now correctly import your module
async def midi_to_websocket_listener(midi_port_index: int, websocket_uri: str):
"""
Listens to a specific MIDI port and sends data to a WebSocket server
when Note 32 (and 33) is pressed.
"""
delay = 100 # Default delay value
# 1. Get MIDI port name
port_names = mido.get_input_names()
if not port_names:
print("No MIDI input ports found. Please connect your device.")
return
if not (0 <= midi_port_index < len(port_names)):
print(f"Error: MIDI port index {midi_port_index} out of range. Available ports: {port_names}")
print("Available ports:")
for i, name in enumerate(port_names):
print(f" {i}: {name}")
return
midi_port_name = port_names[midi_port_index]
print(f"Selected MIDI input port: {midi_port_name}")
# 2. Initialize WebSocket client (using your actual networking.py)
ws_client = networking.WebSocketClient(websocket_uri)
try:
# 3. Connect WebSocket
await ws_client.connect()
print(f"WebSocket client connected to {ws_client.uri}")
# 4. Open MIDI port and start listening loop
with mido.open_input(midi_port_name) as port:
print(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.")
while True:
msg = port.receive(block=False) # Non-blocking read
if msg:
match msg.type:
case 'note_on':
print(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}")
match msg.note:
case 32:
await ws_client.send_data({
"names": ["1"],
"settings": {
"pattern": "pulse",
"delay": delay,
"colors": ["#00ff00"],
"brightness": 100,
"num_leds": 200,
}
})
case 33:
await ws_client.send_data({
"names": ["2"],
"settings": {
"pattern": "chase",
"speed": 10,
"color": "#00FFFF",
}
})
case 'control_change':
match msg.control:
case 36:
delay = msg.value * 4
print(f"Delay set to {delay} ms")
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
except mido.PortsError as e:
print(f"Error opening MIDI port '{midi_port_name}': {e}")
except asyncio.CancelledError:
print(f"MIDI listener cancelled.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
# 5. Disconnect WebSocket and clean up
# This assumes your WebSocketClient has a ._connected attribute or similar way to check state.
# If your client's disconnect method is safe to call even if not connected, you can simplify.
await ws_client.close()
print("MIDI listener stopped and cleaned up.")
async def main():
# --- Configuration ---
MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
# --- End Configuration ---
try:
await midi_to_websocket_listener(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
except KeyboardInterrupt:
print("\nProgram interrupted by user.")
finally:
print("Main program finished.")
if __name__ == "__main__":
asyncio.run(main())