#!/usr/bin/python
"""Detect beats on an audio input and forward a pulse command over WebSocket.

The script opens a PyAudio input stream, feeds every captured buffer to an
aubio tempo detector and, on each detected beat, sends a fixed JSON "pulse"
command to a WebSocket endpoint. It runs until the stream stops being active
or the user presses Ctrl+C.
"""
import json
import sys
from time import sleep

import aubio
import numpy as np
import pyaudio
import websocket  # pip install websocket-client

seconds = 10  # how long this script should run (if not using infinite loop)
bufferSize = 512
windowSizeMultiple = 2  # or 4 for higher accuracy, but more computational cost
audioInputDeviceIndex = 7  # use 'arecord -l' to check available audio devices
audioInputChannels = 1

pa = pyaudio.PyAudio()

# List every device of host API 0 that can capture audio, and remember whether
# the configured index is among them.
print("Available audio input devices:")
info = pa.get_host_api_info_by_index(0)
num_devices = info.get('deviceCount')
found_device = False
for i in range(num_devices):
    device_info = pa.get_device_info_by_host_api_device_index(0, i)
    if device_info.get('maxInputChannels') > 0:
        print(f"  Input Device id {i} - {device_info.get('name')}")
        if i == audioInputDeviceIndex:
            found_device = True

if not found_device:
    # Only a warning: the lookup below decides whether the script can go on.
    print(f"Warning: Audio input device index {audioInputDeviceIndex} not found or has no input channels.")

try:
    audioInputDevice = pa.get_device_info_by_index(audioInputDeviceIndex)
    audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
except Exception as e:
    print(f"Error getting audio device info for index {audioInputDeviceIndex}: {e}")
    pa.terminate()
    # sys.exit() instead of the site-injected exit(): it is always available,
    # and the non-zero status tells the caller that start-up failed.
    sys.exit(1)

# create the aubio tempo detection:
hopSize = bufferSize
winSize = hopSize * windowSizeMultiple
tempoDetection = aubio.tempo(method='default', buf_size=winSize, hop_size=hopSize, samplerate=audioInputSampleRate)

# --- WebSocket Setup ---
websocket_url = "ws://192.168.4.1:80/ws"
ws = None
try:
    ws = websocket.create_connection(websocket_url)
    print(f"Successfully connected to WebSocket at {websocket_url}")
except Exception as e:
    # Best effort: beat detection still runs without the WebSocket.
    print(f"Failed to connect to WebSocket: {e}. Data will not be sent over WebSocket.")
# --- End WebSocket Setup ---

# The command sent on every beat never changes, so it is serialised once here
# instead of being rebuilt inside the audio callback.
beat_message = json.dumps({
    "names": ["1"],
    "settings": {
        "pattern": "pulse",
        "delay": 10,
        "colors": ["#00ff00"],
        "brightness": 10,
        "num_leds": 200,
    },
})


def readAudioFrames(in_data, frame_count, time_info, status):
    """Process one captured audio buffer and signal detected beats.

    Called by the input stream as soon as enough samples have been collected.
    The buffer is interpreted as float32 samples and passed to the aubio tempo
    detector. When a beat is reported, the current BPM is printed and the
    pulse command is sent over the WebSocket, if one is connected.

    Args:
        in_data: Raw bytes of the captured audio buffer.
        frame_count: Number of frames in the buffer (unused).
        time_info: Timing information supplied by the stream (unused).
        status: Status flags supplied by the stream (unused).

    Returns:
        A ``(in_data, pyaudio.paContinue)`` tuple so the stream keeps running.
    """
    global ws  # the connection is dropped here when the peer closes it

    signal = np.frombuffer(in_data, dtype=np.float32)
    beat = tempoDetection(signal)
    if beat:
        bpm = tempoDetection.get_bpm()
        print(f"beat! (running with {bpm:.2f} bpm)")

        if ws:  # only send while a connection is established
            # NOTE(review): this send runs inside the audio callback; if it
            # blocks on a slow network it may delay audio processing.
            try:
                ws.send(beat_message)
            except websocket.WebSocketConnectionClosedException:
                print("WebSocket connection closed, attempting to reconnect...")
                # No reconnect is performed here; the connection is only
                # marked as gone so later beats skip the send.
                ws = None
            except Exception as e:
                print(f"Error sending over WebSocket: {e}")

    return (in_data, pyaudio.paContinue)


# create and start the input stream
inputStream = None  # stays None when pa.open() fails, so cleanup can tell
try:
    inputStream = pa.open(format=pyaudio.paFloat32,
                          input=True,
                          channels=audioInputChannels,
                          input_device_index=audioInputDeviceIndex,
                          frames_per_buffer=bufferSize,
                          rate=audioInputSampleRate,
                          stream_callback=readAudioFrames)
    inputStream.start_stream()
    print("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")

    # Keep the main thread alive while the stream is running; the short sleep
    # prevents busy-waiting.
    while inputStream.is_active():
        sleep(0.1)
except KeyboardInterrupt:
    print("\nKeyboardInterrupt: Stopping script gracefully.")
except Exception as e:
    print(f"An error occurred with the audio stream: {e}")
finally:
    try:
        if inputStream is not None:
            if inputStream.is_active():
                inputStream.stop_stream()
            # Always close an opened stream. The previous condition
            # (`not is_stopped()`) skipped close() right after stop_stream().
            inputStream.close()
    finally:
        # Release PortAudio and the WebSocket even if stream cleanup raised.
        pa.terminate()
        if ws:
            print("Closing WebSocket connection.")
            ws.close()

print("Script finished.")