import collections
import importlib.util
import os
import queue
import threading
import time


class AudioBeatDetector:
    """Background beat detector fed by a ``sounddevice`` input stream.

    ``start`` spawns a daemon thread that opens an input stream, pushes the
    captured audio through a ``BeatDetectRuntime`` loaded from
    ``tests/beat_detect.py`` and publishes every detected beat into a status
    dict that ``status`` returns as a snapshot.  All shared state is guarded
    by ``self._lock``.
    """

    def __init__(self):
        # Non-reentrant lock: never call a method that takes it while holding it.
        self._lock = threading.Lock()
        self._thread = None
        self._stream = None
        self._running = False
        self._stop_event = threading.Event()
        self._status = {
            "running": False,
            "bpm": None,
            "last_beat_ts": None,
            "beat_seq": 0,
            "beat_type": "unknown",
            "beat_type_confidence": 0.0,
            "error": None,
            "device": None,
        }

    def list_input_devices(self):
        """Return a list of dicts describing the selectable input devices.

        A device is listed when it reports at least one input channel or when
        its name contains "monitor" (case-insensitive).  Each entry carries
        the keys ``id``, ``name``, ``label``, ``max_input_channels``,
        ``default_samplerate``, ``is_default`` and ``hostapi``.

        Raises whatever ``import sounddevice`` or its query functions raise.
        """
        import sounddevice as sd

        devices = sd.query_devices()
        hostapis = sd.query_hostapis()
        default_input_idx = None
        try:
            default_input_idx = int(sd.default.device[0])
        except Exception:
            default_input_idx = None

        out = []
        for idx, dev in enumerate(devices):
            name = str(dev.get("name", f"Input {idx}"))
            chans = int(dev.get("max_input_channels", 0))
            is_monitor_named = "monitor" in name.lower()
            # Keep monitor-named devices even when they report zero inputs.
            if chans <= 0 and not is_monitor_named:
                continue
            sr = int(dev.get("default_samplerate", 44100))
            hostapi_idx = int(dev.get("hostapi", -1))
            hostapi_name = (
                str(hostapis[hostapi_idx].get("name", "unknown"))
                if 0 <= hostapi_idx < len(hostapis)
                else "unknown"
            )
            is_default = default_input_idx is not None and idx == default_input_idx
            ch_label = f"{chans}ch" if chans > 0 else "0ch?"
            label = f"[{idx}] {name} ({ch_label} @ {sr}Hz, {hostapi_name})"
            if is_default:
                label = f"{label} [default]"
            if is_monitor_named:
                label = f"{label} [monitor]"
            out.append(
                {
                    "id": idx,
                    "name": name,
                    "label": label,
                    "max_input_channels": chans,
                    "default_samplerate": sr,
                    "is_default": is_default,
                    "hostapi": hostapi_name,
                }
            )
        return out

    def diagnostics(self):
        """Return the raw ``sounddevice`` view of the audio setup.

        The result has the keys ``default_input``, ``hostapis`` and
        ``devices``; the values are whatever ``sounddevice`` returned,
        unmodified.
        """
        import sounddevice as sd

        devices = sd.query_devices()
        hostapis = sd.query_hostapis()
        default_input = None
        try:
            default_input = sd.default.device[0]
        except Exception:
            default_input = None
        return {
            "default_input": default_input,
            "hostapis": hostapis,
            "devices": devices,
        }

    def start(self, device=None):
        """Start detection on ``device``, restarting if already running.

        ``device`` is handed to the worker thread; ``None`` makes the worker
        fall back to the default input device.  The status dict is reset and
        the requested ``device`` is recorded in it.
        """
        should_restart = False
        with self._lock:
            should_restart = self._running
        # stop() takes the lock itself, so it must run outside the block above.
        if should_restart:
            self.stop()
        with self._lock:
            self._stop_event.clear()
            self._status.update(
                {
                    "running": True,
                    "bpm": None,
                    "last_beat_ts": None,
                    "beat_seq": 0,
                    "beat_type": "unknown",
                    "beat_type_confidence": 0.0,
                    "error": None,
                    "device": device,
                }
            )
            self._running = True
            self._thread = threading.Thread(
                target=self._run_loop, args=(device,), daemon=True
            )
            self._thread.start()

    def stop(self):
        """Signal the worker to stop, tear down the stream and wait for it.

        Teardown is best-effort: every stream call is individually guarded so
        one failure does not prevent the others, and the join is bounded to
        three seconds.
        """
        with self._lock:
            self._stop_event.set()
            t = self._thread
            stream = self._stream
        try:
            import sounddevice as sd

            sd.stop(ignore_errors=True)
        except Exception:
            pass
        if stream is not None:
            try:
                stream.abort()
            except Exception:
                pass
            try:
                stream.stop()
            except Exception:
                pass
            try:
                stream.close()
            except Exception:
                pass
        # Join outside the lock: the worker's cleanup needs to acquire it.
        if t and t.is_alive():
            t.join(timeout=3.0)
        with self._lock:
            self._running = False
            self._thread = None
            self._stream = None
            self._status["running"] = False

    def status(self):
        """Return a shallow copy of the current status dict."""
        with self._lock:
            return dict(self._status)

    def _set_error(self, msg):
        """Log ``msg``, store it as the status error and mark as not running."""
        print(f"[audio] {msg}")
        with self._lock:
            self._status["error"] = msg
            self._status["running"] = False
            self._running = False

    def _record_beat(self, bpm, beat_type="unknown", beat_type_confidence=0.0):
        """Publish one detected beat and notify sequence playback.

        Updates ``last_beat_ts`` (wall-clock seconds), ``bpm``, ``beat_type``
        and ``beat_type_confidence`` (falsy values become ``0.0``) and
        increments ``beat_seq``.  The notification to
        ``util.sequence_playback`` is best-effort; failures are only logged.
        """
        now = time.time()
        with self._lock:
            self._status["last_beat_ts"] = now
            self._status["bpm"] = bpm
            self._status["beat_type"] = beat_type
            self._status["beat_type_confidence"] = float(beat_type_confidence or 0.0)
            self._status["beat_seq"] = int(self._status.get("beat_seq", 0)) + 1
        try:
            from util import sequence_playback as seq_pb

            seq_pb.push_thread_beat()
        except Exception as e:
            print(f"[audio] sequence beat queue: {e}")

    @staticmethod
    def _make_audio_callback(np, audio_q):
        """Build the stream callback that queues mono frames on ``audio_q``.

        ``np`` is the already-imported numpy module (imported lazily by the
        caller).  The returned callback takes the first channel of each input
        block, copies it as ``float32`` and enqueues it; when the queue is
        full the block is dropped so the audio thread never blocks.
        """

        def callback(indata, frames, _time_info, status):
            if status:
                print(f"[audio] status: {status}")
            # ``indata[:, 0]`` is only a view of the driver-owned buffer, which
            # may be reused once this callback returns; np.array() copies so
            # the consumer thread sees stable samples.
            mono = np.array(indata[:, 0], dtype=np.float32)
            try:
                audio_q.put_nowait(mono)
            except queue.Full:
                pass  # consumer is behind: drop the newest block

        return callback

    def _run_loop(self, device):
        """Worker thread body: capture audio and feed the beat runtime.

        Loads ``tests/beat_detect.py`` (resolved two directories above this
        file), opens an input stream on ``device`` (the default input when
        ``None``) and processes frames until the stop event is set.  Any
        failure is reported through ``_set_error``; on every exit path the
        detector is marked as not running.
        """
        try:
            import argparse

            import numpy as np
            import sounddevice as sd
        except Exception as e:
            self._set_error(f"audio deps unavailable: {e}")
            return

        try:
            root_dir = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "..", "..")
            )
            beat_detect_path = os.path.join(root_dir, "tests", "beat_detect.py")
            spec = importlib.util.spec_from_file_location(
                "beat_detect_runtime", beat_detect_path
            )
            if spec is None or spec.loader is None:
                raise RuntimeError("cannot load tests/beat_detect.py")
            beat_mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(beat_mod)

            if device is None:
                try:
                    device = int(sd.default.device[0])
                except Exception:
                    device = -1
            if device is None or device < 0:
                raise RuntimeError(
                    "no default input device; open Audio, pick an input, then Start"
                )

            dev_info = sd.query_devices(device, "input")
            sample_rate = int(dev_info["default_samplerate"])
            args = argparse.Namespace(
                mode="aubio",
                device=device,
                sample_rate=sample_rate,
                hop_size=256,
                win_mult=2,
                min_band_hz=45.0,
                max_band_hz=180.0,
                energy_weight=0.7,
                flux_weight=0.3,
                threshold_multiplier=1.35,
                ema_alpha=0.08,
                min_ioi_ms=85.0,
                bpm_window=8,
                post_url="",
                aubio_method="default",
                aubio_threshold=0.12,
                silence_gate_db=-58.0,
            )
            runtime = beat_mod.BeatDetectRuntime(args)
            runtime.setup(sample_rate=sample_rate)
            # The runtime decides the frame length; the stream block size and
            # the trim/pad below both follow it.
            hop_size = runtime.frame_size
            audio_q = queue.Queue(maxsize=64)

            stream = sd.InputStream(
                device=device,
                channels=1,
                samplerate=sample_rate,
                blocksize=hop_size,
                callback=self._make_audio_callback(np, audio_q),
            )
            with self._lock:
                self._stream = stream
            try:
                # Started inside the try so a failing start() still closes
                # the stream and clears ``self._stream``.
                stream.start()
                while not self._stop_event.is_set():
                    try:
                        # Short timeout keeps the loop responsive to stop().
                        frame = audio_q.get(timeout=0.1)
                    except queue.Empty:
                        continue
                    if frame.shape[0] != hop_size:
                        if frame.shape[0] > hop_size:
                            frame = frame[:hop_size]
                        else:
                            frame = np.pad(frame, (0, hop_size - frame.shape[0]))
                    event = runtime.process_frame(frame, now_s=time.time())
                    if event is None:
                        continue
                    bpm = event.get("bpm")
                    self._record_beat(
                        bpm,
                        beat_type=event.get("beat_type", "unknown"),
                        beat_type_confidence=event.get("beat_type_confidence", 0.0),
                    )
            finally:
                try:
                    stream.stop()
                except Exception:
                    pass
                try:
                    stream.close()
                except Exception:
                    pass
                with self._lock:
                    # Only clear the slot if it still refers to our stream.
                    if self._stream is stream:
                        self._stream = None
        except Exception as e:
            self._set_error(f"detector failed: {e}")
            return
        finally:
            with self._lock:
                self._running = False
                self._status["running"] = False


# Set from ``main`` so sequence playback can tell real audio from simulated beats.
_shared_beat_detector = None


def set_shared_beat_detector(det):
    """Register ``det`` as the process-wide shared beat detector."""
    global _shared_beat_detector
    _shared_beat_detector = det


def shared_beat_detector_running():
    """Return True when a shared detector is registered and reports running.

    Any exception raised while querying the detector is treated as
    "not running".
    """
    d = _shared_beat_detector
    if d is None:
        return False
    try:
        return bool(d.status().get("running"))
    except Exception:
        return False