test: cover audio, sequences, pattern direction, and settings
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -112,12 +112,6 @@ def parse_args() -> argparse.Namespace:
|
||||
default=0.12,
|
||||
help="Aubio detection threshold",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--silence-gate-db",
|
||||
type=float,
|
||||
default=-58.0,
|
||||
help="Ignore beat triggers when frame RMS is below this dB level",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
@@ -131,6 +125,141 @@ def _estimate_bpm(beat_times: Deque[float]) -> float | None:
|
||||
return 60.0 / float(np.median(valid))
|
||||
|
||||
|
||||
def _is_plausible_ioi(
|
||||
last_trigger_s: float,
|
||||
beat_times: Deque[float],
|
||||
now_s: float,
|
||||
*,
|
||||
min_ratio: float = 0.42,
|
||||
max_ratio: float = 2.5,
|
||||
) -> bool:
|
||||
"""Reject double-time / half-time false triggers vs recent median interval."""
|
||||
if last_trigger_s <= 0 or len(beat_times) < 2:
|
||||
return True
|
||||
ioi = now_s - last_trigger_s
|
||||
if ioi <= 0:
|
||||
return False
|
||||
intervals = np.diff(np.array(list(beat_times)[-8:], dtype=np.float64))
|
||||
if intervals.size == 0:
|
||||
return True
|
||||
med = float(np.median(intervals))
|
||||
if med < 0.05:
|
||||
return True
|
||||
return (ioi >= med * min_ratio) and (ioi <= med * max_ratio)
|
||||
|
||||
|
||||
class BarPhaseTracker:
|
||||
"""Track beat-in-bar from downbeat counting (kick hints)."""
|
||||
|
||||
def __init__(self, beats_per_bar: int = 4, kick_conf_min: float = 1.15):
|
||||
self.beats_per_bar = max(1, int(beats_per_bar))
|
||||
self.kick_conf_min = float(kick_conf_min)
|
||||
self.bar_beat = 1
|
||||
self.is_downbeat = True
|
||||
self.confidence = 0.0
|
||||
self._last_downbeat_s = 0.0
|
||||
self._aligned_kicks = 0
|
||||
self._total_beats = 0
|
||||
|
||||
def reset(self) -> None:
|
||||
self.bar_beat = 1
|
||||
self.is_downbeat = True
|
||||
self.confidence = 0.0
|
||||
self._last_downbeat_s = 0.0
|
||||
self._aligned_kicks = 0
|
||||
self._total_beats = 0
|
||||
|
||||
def anchor_downbeat(self, now_s: float) -> None:
|
||||
self.bar_beat = 1
|
||||
self.is_downbeat = True
|
||||
self._last_downbeat_s = float(now_s)
|
||||
self.confidence = max(self.confidence, 0.85)
|
||||
|
||||
def _bar_duration_s(
|
||||
self, bpm: float | None, median_ioi: float | None
|
||||
) -> float | None:
|
||||
if bpm is not None and bpm > 0:
|
||||
return (60.0 / float(bpm)) * self.beats_per_bar
|
||||
if median_ioi is not None and median_ioi > 0:
|
||||
return float(median_ioi) * self.beats_per_bar
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _near_whole_bars(elapsed: float, bar_dur: float, tol: float = 0.14) -> bool:
|
||||
if bar_dur <= 0 or elapsed <= 0:
|
||||
return False
|
||||
n = elapsed / bar_dur
|
||||
nearest = max(1, round(n))
|
||||
return abs(n - nearest) <= tol
|
||||
|
||||
def on_beat(
|
||||
self,
|
||||
now_s: float,
|
||||
beat_type: str,
|
||||
beat_type_conf: float,
|
||||
*,
|
||||
bpm: float | None = None,
|
||||
median_ioi: float | None = None,
|
||||
) -> dict[str, int | float | bool | str]:
|
||||
self._total_beats += 1
|
||||
bar_dur = self._bar_duration_s(bpm, median_ioi)
|
||||
is_kick = (
|
||||
str(beat_type or "").lower() == "kick"
|
||||
and float(beat_type_conf or 0.0) >= self.kick_conf_min
|
||||
)
|
||||
|
||||
downbeat_locked = False
|
||||
if is_kick:
|
||||
if self._last_downbeat_s <= 0 or self._total_beats <= 2:
|
||||
downbeat_locked = True
|
||||
elif bar_dur and self._near_whole_bars(
|
||||
now_s - self._last_downbeat_s, bar_dur
|
||||
):
|
||||
downbeat_locked = True
|
||||
elif is_kick and self.bar_beat >= max(2, self.beats_per_bar - 1):
|
||||
downbeat_locked = True
|
||||
|
||||
prev_bar_beat = int(self.bar_beat)
|
||||
if downbeat_locked:
|
||||
self.bar_beat = 1
|
||||
self.is_downbeat = True
|
||||
self._last_downbeat_s = float(now_s)
|
||||
self._aligned_kicks += 1
|
||||
elif self._total_beats <= 1:
|
||||
self.bar_beat = 1
|
||||
self.is_downbeat = True
|
||||
else:
|
||||
self.bar_beat = (prev_bar_beat % self.beats_per_bar) + 1
|
||||
self.is_downbeat = self.bar_beat == 1
|
||||
|
||||
if self._total_beats >= self.beats_per_bar:
|
||||
bars_seen = max(1, self._total_beats // self.beats_per_bar)
|
||||
self.confidence = min(1.0, self._aligned_kicks / bars_seen)
|
||||
|
||||
return {
|
||||
"bar_beat": int(self.bar_beat),
|
||||
"beats_per_bar": int(self.beats_per_bar),
|
||||
"is_downbeat": bool(self.is_downbeat),
|
||||
"phase_confidence": round(float(self.confidence), 3),
|
||||
"bar_phase_readout": f"{int(self.bar_beat)}/{int(self.beats_per_bar)}",
|
||||
}
|
||||
|
||||
|
||||
def _resolve_bpm(
|
||||
beat_times: Deque[float],
|
||||
aubio_bpm: float | None,
|
||||
) -> float | None:
|
||||
estimated = _estimate_bpm(beat_times)
|
||||
if estimated is None:
|
||||
return aubio_bpm
|
||||
if aubio_bpm is None or aubio_bpm <= 0:
|
||||
return estimated
|
||||
ratio = float(aubio_bpm) / estimated
|
||||
if ratio > 1.75 or ratio < 0.57:
|
||||
return estimated
|
||||
return estimated
|
||||
|
||||
|
||||
def _load_aubio_if_needed(mode: str):
|
||||
if mode == "custom":
|
||||
return None
|
||||
@@ -170,6 +299,8 @@ class BeatDetectRuntime:
|
||||
)
|
||||
self.last_trigger_s = 0.0
|
||||
self.debounce_s = float(args.min_ioi_ms) / 1000.0
|
||||
bpb = int(getattr(args, "beats_per_bar", 4) or 4)
|
||||
self.bar_phase = BarPhaseTracker(beats_per_bar=bpb)
|
||||
|
||||
def setup(self, sample_rate: int):
|
||||
self.sample_rate = int(sample_rate)
|
||||
@@ -192,13 +323,37 @@ class BeatDetectRuntime:
|
||||
self.beat_times.clear()
|
||||
self.tempo = None
|
||||
if self.aubio is not None:
|
||||
self.tempo = self.aubio.tempo(
|
||||
self.args.aubio_method, win_size, self.frame_size, self.sample_rate
|
||||
)
|
||||
if hasattr(self.tempo, "set_threshold"):
|
||||
self.tempo.set_threshold(float(self.args.aubio_threshold))
|
||||
if hasattr(self.tempo, "set_minioi_ms"):
|
||||
self.tempo.set_minioi_ms(float(self.args.min_ioi_ms))
|
||||
self._init_aubio_tempo(win_size)
|
||||
|
||||
def _init_aubio_tempo(self, win_size: int):
|
||||
self.tempo = self.aubio.tempo(
|
||||
self.args.aubio_method, win_size, self.frame_size, self.sample_rate
|
||||
)
|
||||
if hasattr(self.tempo, "set_threshold"):
|
||||
self.tempo.set_threshold(float(self.args.aubio_threshold))
|
||||
if hasattr(self.tempo, "set_minioi_ms"):
|
||||
self.tempo.set_minioi_ms(float(self.args.min_ioi_ms))
|
||||
|
||||
def reset_tempo_state(self) -> None:
|
||||
"""Clear tempo/aubio history without losing bar phase."""
|
||||
self.baseline = 1e-6
|
||||
if self.prev_mag is not None:
|
||||
self.prev_mag[:] = 0.0
|
||||
self.beat_times.clear()
|
||||
self.last_trigger_s = 0.0
|
||||
if self.aubio is not None and self.sample_rate > 0:
|
||||
win_size = max(1024, self.frame_size * max(2, self.args.win_mult))
|
||||
self._init_aubio_tempo(win_size)
|
||||
|
||||
def reset_state(self):
|
||||
"""Full reset (manual): tempo history and bar phase."""
|
||||
self.reset_tempo_state()
|
||||
self.bar_phase.reset()
|
||||
|
||||
def anchor_bar_phase(self, now_s: float | None = None) -> None:
|
||||
if now_s is None:
|
||||
now_s = time.time()
|
||||
self.bar_phase.anchor_downbeat(now_s)
|
||||
|
||||
def _classify_hit(self, mag: np.ndarray):
|
||||
total = float(np.mean(mag) + 1e-9)
|
||||
@@ -227,8 +382,6 @@ class BeatDetectRuntime:
|
||||
f32 = frame.astype(np.float32)
|
||||
rms = float(np.sqrt(np.mean(f32 * f32) + 1e-12))
|
||||
db = 20.0 * np.log10(max(rms, 1e-12))
|
||||
if db < float(self.args.silence_gate_db):
|
||||
return None
|
||||
mag = np.abs(np.fft.rfft(f32 * self.window)).astype(np.float32)
|
||||
band_energy = float(np.mean(mag[self.band_mask]))
|
||||
flux = float(np.mean(np.maximum(0.0, mag - self.prev_mag)))
|
||||
@@ -260,14 +413,30 @@ class BeatDetectRuntime:
|
||||
should_trigger = aubio_hit
|
||||
else:
|
||||
should_trigger = custom_hit or aubio_hit
|
||||
if should_trigger and not _is_plausible_ioi(
|
||||
self.last_trigger_s, self.beat_times, now_s
|
||||
):
|
||||
should_trigger = False
|
||||
if not should_trigger:
|
||||
return None
|
||||
|
||||
self.last_trigger_s = now_s
|
||||
self.beat_times.append(now_s)
|
||||
bpm = aubio_bpm if aubio_bpm is not None else _estimate_bpm(self.beat_times)
|
||||
bpm = _resolve_bpm(self.beat_times, aubio_bpm)
|
||||
strength = score / max(1e-9, self.baseline)
|
||||
beat_type, beat_type_conf = self._classify_hit(mag)
|
||||
median_ioi = None
|
||||
if len(self.beat_times) >= 2:
|
||||
intervals = np.diff(np.array(self.beat_times, dtype=np.float64))
|
||||
if intervals.size > 0:
|
||||
median_ioi = float(np.median(intervals))
|
||||
phase = self.bar_phase.on_beat(
|
||||
now_s,
|
||||
beat_type,
|
||||
beat_type_conf,
|
||||
bpm=bpm,
|
||||
median_ioi=median_ioi,
|
||||
)
|
||||
if self.args.mode == "custom":
|
||||
src = "custom"
|
||||
elif self.args.mode == "aubio":
|
||||
@@ -288,6 +457,7 @@ class BeatDetectRuntime:
|
||||
"beat_type": beat_type,
|
||||
"beat_type_confidence": beat_type_conf,
|
||||
"db": db,
|
||||
**phase,
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user