feat(audio): move beat routing server-side and extend presets
Route beat-triggered manual selects from the controller server, add preset background and beat-counter UI support, and bump led-driver to include the matching pattern/runtime fixes. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
375
tests/beat_detect.py
Normal file
375
tests/beat_detect.py
Normal file
@@ -0,0 +1,375 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Live beat detection utility with custom/aubio/hybrid modes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import collections
|
||||
import queue
|
||||
import sys
|
||||
import time
|
||||
from typing import Deque
|
||||
|
||||
try:
|
||||
import numpy as np
|
||||
except ImportError as exc:
|
||||
raise SystemExit(
|
||||
"Missing dependency: numpy. Install with `pip install numpy`."
|
||||
) from exc
|
||||
|
||||
try:
|
||||
import sounddevice as sd
|
||||
except ImportError as exc:
|
||||
raise SystemExit(
|
||||
"Missing dependency: sounddevice. Install with `pip install sounddevice`."
|
||||
) from exc
|
||||
|
||||
try:
|
||||
import requests
|
||||
except ImportError:
|
||||
requests = None
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Beat detector utility")
|
||||
parser.add_argument(
|
||||
"--mode",
|
||||
choices=("custom", "aubio", "hybrid"),
|
||||
default="aubio",
|
||||
help="Detection mode",
|
||||
)
|
||||
parser.add_argument("--device", default=None, help="Input device name or index")
|
||||
parser.add_argument(
|
||||
"--sample-rate",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Audio sample rate (0 = use selected device default)",
|
||||
)
|
||||
parser.add_argument("--hop-size", type=int, default=256, help="Frame hop size in samples")
|
||||
parser.add_argument("--win-mult", type=int, default=2, help="Aubio window size multiplier")
|
||||
parser.add_argument(
|
||||
"--min-band-hz",
|
||||
type=float,
|
||||
default=45.0,
|
||||
help="Low frequency bound used for beat energy",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-band-hz",
|
||||
type=float,
|
||||
default=180.0,
|
||||
help="High frequency bound used for beat energy",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--energy-weight",
|
||||
type=float,
|
||||
default=0.7,
|
||||
help="Weight for low-band energy component (0..1)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--flux-weight",
|
||||
type=float,
|
||||
default=0.3,
|
||||
help="Weight for spectral flux component (0..1)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--threshold-multiplier",
|
||||
type=float,
|
||||
default=1.35,
|
||||
help="Custom-mode threshold multiplier vs adaptive baseline",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ema-alpha",
|
||||
type=float,
|
||||
default=0.08,
|
||||
help="Adaptive baseline smoothing (higher reacts faster)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--min-ioi-ms",
|
||||
type=float,
|
||||
default=85.0,
|
||||
help="Minimum time between beats in milliseconds",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--bpm-window",
|
||||
type=int,
|
||||
default=8,
|
||||
help="How many recent beat intervals to use for BPM estimate",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--post-url",
|
||||
default="",
|
||||
help="Optional HTTP URL to POST beat events",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--aubio-method",
|
||||
default="default",
|
||||
choices=("default", "specdiff", "hfc", "complex", "phase", "energy"),
|
||||
help="Aubio tempo method",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--aubio-threshold",
|
||||
type=float,
|
||||
default=0.12,
|
||||
help="Aubio detection threshold",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--silence-gate-db",
|
||||
type=float,
|
||||
default=-58.0,
|
||||
help="Ignore beat triggers when frame RMS is below this dB level",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def _estimate_bpm(beat_times: Deque[float]) -> float | None:
|
||||
if len(beat_times) < 3:
|
||||
return None
|
||||
intervals = np.diff(np.array(beat_times, dtype=np.float64))
|
||||
valid = intervals[(intervals > 0.2) & (intervals < 2.0)]
|
||||
if valid.size == 0:
|
||||
return None
|
||||
return 60.0 / float(np.median(valid))
|
||||
|
||||
|
||||
def _load_aubio_if_needed(mode: str):
|
||||
if mode == "custom":
|
||||
return None
|
||||
try:
|
||||
import aubio
|
||||
return aubio
|
||||
except ImportError:
|
||||
dist_packages = "/usr/lib/python3/dist-packages"
|
||||
if dist_packages not in sys.path:
|
||||
sys.path.append(dist_packages)
|
||||
try:
|
||||
import aubio
|
||||
return aubio
|
||||
except ImportError:
|
||||
raise SystemExit("aubio not installed; use --mode custom or install aubio")
|
||||
|
||||
|
||||
class BeatDetectRuntime:
|
||||
"""Reusable detector runtime so web and CLI can share logic."""
|
||||
|
||||
def __init__(self, args):
|
||||
self.args = args
|
||||
self.aubio = _load_aubio_if_needed(args.mode)
|
||||
self.sample_rate = 0
|
||||
self.frame_size = 0
|
||||
self.tempo = None
|
||||
self.band_mask = None
|
||||
self.freqs = None
|
||||
self.window = None
|
||||
self.prev_mag = None
|
||||
self.kick_mask = None
|
||||
self.snare_mask = None
|
||||
self.hat_mask = None
|
||||
self.baseline = 1e-6
|
||||
self.beat_times: Deque[float] = collections.deque(
|
||||
maxlen=max(2, args.bpm_window + 1)
|
||||
)
|
||||
self.last_trigger_s = 0.0
|
||||
self.debounce_s = float(args.min_ioi_ms) / 1000.0
|
||||
|
||||
def setup(self, sample_rate: int):
|
||||
self.sample_rate = int(sample_rate)
|
||||
self.frame_size = max(128, int(self.args.hop_size))
|
||||
win_size = max(1024, self.frame_size * max(2, self.args.win_mult))
|
||||
freqs = np.fft.rfftfreq(self.frame_size, d=1.0 / self.sample_rate)
|
||||
self.freqs = freqs
|
||||
self.band_mask = (freqs >= self.args.min_band_hz) & (
|
||||
freqs <= self.args.max_band_hz
|
||||
)
|
||||
self.kick_mask = (freqs >= 40.0) & (freqs <= 140.0)
|
||||
self.snare_mask = (freqs >= 140.0) & (freqs <= 3000.0)
|
||||
self.hat_mask = (freqs >= 5000.0) & (freqs <= 12000.0)
|
||||
if not np.any(self.band_mask):
|
||||
raise ValueError("Invalid band range for current sample rate")
|
||||
self.window = np.hanning(self.frame_size).astype(np.float32)
|
||||
self.prev_mag = np.zeros(freqs.shape[0], dtype=np.float32)
|
||||
self.baseline = 1e-6
|
||||
self.last_trigger_s = 0.0
|
||||
self.beat_times.clear()
|
||||
self.tempo = None
|
||||
if self.aubio is not None:
|
||||
self.tempo = self.aubio.tempo(
|
||||
self.args.aubio_method, win_size, self.frame_size, self.sample_rate
|
||||
)
|
||||
if hasattr(self.tempo, "set_threshold"):
|
||||
self.tempo.set_threshold(float(self.args.aubio_threshold))
|
||||
if hasattr(self.tempo, "set_minioi_ms"):
|
||||
self.tempo.set_minioi_ms(float(self.args.min_ioi_ms))
|
||||
|
||||
def _classify_hit(self, mag: np.ndarray):
|
||||
total = float(np.mean(mag) + 1e-9)
|
||||
kick = float(np.mean(mag[self.kick_mask])) / total if np.any(self.kick_mask) else 0.0
|
||||
snare = float(np.mean(mag[self.snare_mask])) / total if np.any(self.snare_mask) else 0.0
|
||||
hat = float(np.mean(mag[self.hat_mask])) / total if np.any(self.hat_mask) else 0.0
|
||||
scores = {
|
||||
"kick": kick,
|
||||
"snare": snare,
|
||||
"hat": hat,
|
||||
}
|
||||
label, value = max(scores.items(), key=lambda kv: kv[1])
|
||||
if value < 1.15:
|
||||
return "unknown", value
|
||||
return label, value
|
||||
|
||||
def process_frame(self, frame: np.ndarray, now_s: float | None = None):
|
||||
if self.window is None or self.band_mask is None:
|
||||
raise RuntimeError("Runtime not setup")
|
||||
if frame.shape[0] != self.frame_size:
|
||||
if frame.shape[0] > self.frame_size:
|
||||
frame = frame[: self.frame_size]
|
||||
else:
|
||||
frame = np.pad(frame, (0, self.frame_size - frame.shape[0]))
|
||||
|
||||
f32 = frame.astype(np.float32)
|
||||
rms = float(np.sqrt(np.mean(f32 * f32) + 1e-12))
|
||||
db = 20.0 * np.log10(max(rms, 1e-12))
|
||||
if db < float(self.args.silence_gate_db):
|
||||
return None
|
||||
mag = np.abs(np.fft.rfft(f32 * self.window)).astype(np.float32)
|
||||
band_energy = float(np.mean(mag[self.band_mask]))
|
||||
flux = float(np.mean(np.maximum(0.0, mag - self.prev_mag)))
|
||||
self.prev_mag[:] = mag
|
||||
|
||||
weight_sum = max(1e-6, self.args.energy_weight + self.args.flux_weight)
|
||||
score = ((self.args.energy_weight * band_energy) + (self.args.flux_weight * flux)) / weight_sum
|
||||
self.baseline = ((1.0 - self.args.ema_alpha) * self.baseline) + (
|
||||
self.args.ema_alpha * score
|
||||
)
|
||||
threshold = self.baseline * self.args.threshold_multiplier
|
||||
custom_hit = score > threshold
|
||||
|
||||
aubio_hit = False
|
||||
aubio_bpm = None
|
||||
if self.tempo is not None:
|
||||
aubio_hit = bool(self.tempo(f32)[0])
|
||||
val = float(self.tempo.get_bpm())
|
||||
aubio_bpm = val if val > 0 else None
|
||||
|
||||
if now_s is None:
|
||||
now_s = time.time()
|
||||
if (now_s - self.last_trigger_s) < self.debounce_s:
|
||||
return None
|
||||
|
||||
if self.args.mode == "custom":
|
||||
should_trigger = custom_hit
|
||||
elif self.args.mode == "aubio":
|
||||
should_trigger = aubio_hit
|
||||
else:
|
||||
should_trigger = custom_hit or aubio_hit
|
||||
if not should_trigger:
|
||||
return None
|
||||
|
||||
self.last_trigger_s = now_s
|
||||
self.beat_times.append(now_s)
|
||||
bpm = aubio_bpm if aubio_bpm is not None else _estimate_bpm(self.beat_times)
|
||||
strength = score / max(1e-9, self.baseline)
|
||||
beat_type, beat_type_conf = self._classify_hit(mag)
|
||||
if self.args.mode == "custom":
|
||||
src = "custom"
|
||||
elif self.args.mode == "aubio":
|
||||
src = "aubio"
|
||||
elif custom_hit and aubio_hit:
|
||||
src = "both"
|
||||
elif custom_hit:
|
||||
src = "custom"
|
||||
else:
|
||||
src = "aubio"
|
||||
return {
|
||||
"ts": now_s,
|
||||
"bpm": bpm,
|
||||
"src": src,
|
||||
"score": score,
|
||||
"threshold": threshold,
|
||||
"strength": strength,
|
||||
"beat_type": beat_type,
|
||||
"beat_type_confidence": beat_type_conf,
|
||||
"db": db,
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
runtime = BeatDetectRuntime(args)
|
||||
|
||||
if args.post_url and requests is None:
|
||||
raise SystemExit("`requests` is required for --post-url (pip install requests)")
|
||||
|
||||
if args.sample_rate > 0:
|
||||
sample_rate = args.sample_rate
|
||||
else:
|
||||
dev_info = sd.query_devices(args.device, "input")
|
||||
sample_rate = int(dev_info["default_samplerate"])
|
||||
|
||||
runtime.setup(sample_rate=sample_rate)
|
||||
frame_size = runtime.frame_size
|
||||
audio_q: "queue.Queue[np.ndarray]" = queue.Queue(maxsize=64)
|
||||
|
||||
def audio_callback(indata, frames, _time_info, status):
|
||||
_ = frames
|
||||
if status:
|
||||
print(f"audio status: {status}")
|
||||
mono = np.asarray(indata[:, 0], dtype=np.float32)
|
||||
if not audio_q.full():
|
||||
audio_q.put_nowait(mono)
|
||||
|
||||
print(
|
||||
"Listening... Ctrl+C to stop. "
|
||||
f"mode={args.mode} sr={sample_rate} hop={frame_size} "
|
||||
f"band={args.min_band_hz:.0f}-{args.max_band_hz:.0f}Hz "
|
||||
f"custom_th={args.threshold_multiplier:.2f} aubio_th={args.aubio_threshold:.2f} "
|
||||
f"min_ioi={args.min_ioi_ms:.0f}ms"
|
||||
)
|
||||
|
||||
with sd.InputStream(
|
||||
device=args.device,
|
||||
channels=1,
|
||||
samplerate=sample_rate,
|
||||
blocksize=frame_size,
|
||||
callback=audio_callback,
|
||||
):
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
frame = audio_q.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
if frame.shape[0] != frame_size:
|
||||
if frame.shape[0] > frame_size:
|
||||
frame = frame[:frame_size]
|
||||
else:
|
||||
frame = np.pad(frame, (0, frame_size - frame.shape[0]))
|
||||
|
||||
event = runtime.process_frame(frame, now_s=time.time())
|
||||
if event is None:
|
||||
continue
|
||||
now_s = event["ts"]
|
||||
bpm = event["bpm"]
|
||||
bpm_text = f"{bpm:.1f}" if isinstance(bpm, (float, int)) else "--"
|
||||
src = event["src"]
|
||||
print(
|
||||
f"[{args.mode}] BEAT bpm={bpm_text} src={src} type={event['beat_type']} "
|
||||
f"type_conf={event['beat_type_confidence']:.2f} strength={event['strength']:.2f} "
|
||||
f"db={event['db']:.1f} "
|
||||
f"score={event['score']:.3e} threshold={event['threshold']:.3e}"
|
||||
)
|
||||
|
||||
if args.post_url and requests is not None:
|
||||
try:
|
||||
requests.post(
|
||||
args.post_url,
|
||||
json={"beat": True, "source": src, "ts": now_s, "bpm": bpm},
|
||||
timeout=0.5,
|
||||
)
|
||||
except Exception as exc:
|
||||
print(f"post failed: {exc}")
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopped.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
75
tests/make_bpm_test_audio.py
Normal file
75
tests/make_bpm_test_audio.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate a click-track WAV file at a known BPM.
|
||||
|
||||
Example:
|
||||
python tests/make_bpm_test_audio.py --bpm 128 --seconds 60 --output tests/audio_128bpm.wav
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import math
|
||||
import struct
|
||||
import wave
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Generate known-BPM click track")
|
||||
parser.add_argument("--bpm", type=float, required=True, help="Target BPM (e.g. 120)")
|
||||
parser.add_argument("--seconds", type=float, default=30.0, help="Audio duration in seconds")
|
||||
parser.add_argument("--sample-rate", type=int, default=44100, help="Sample rate")
|
||||
parser.add_argument("--click-ms", type=float, default=25.0, help="Click length in milliseconds")
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
default="tests/bpm_test.wav",
|
||||
help="Output WAV path",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
if args.bpm <= 0:
|
||||
raise SystemExit("--bpm must be > 0")
|
||||
if args.seconds <= 0:
|
||||
raise SystemExit("--seconds must be > 0")
|
||||
|
||||
sample_rate = int(args.sample_rate)
|
||||
total_samples = int(args.seconds * sample_rate)
|
||||
beat_interval = 60.0 / float(args.bpm)
|
||||
click_samples = max(1, int((args.click_ms / 1000.0) * sample_rate))
|
||||
|
||||
data = [0.0] * total_samples
|
||||
beat_index = 0
|
||||
t = 0.0
|
||||
while t < args.seconds:
|
||||
start = int(t * sample_rate)
|
||||
# Slight accent every 4 beats to help human counting.
|
||||
freq = 1760.0 if beat_index % 4 == 0 else 1320.0
|
||||
amp = 0.9 if beat_index % 4 == 0 else 0.6
|
||||
for i in range(click_samples):
|
||||
idx = start + i
|
||||
if idx >= total_samples:
|
||||
break
|
||||
env = math.exp(-8.0 * (i / click_samples))
|
||||
s = amp * env * math.sin((2.0 * math.pi * freq * i) / sample_rate)
|
||||
data[idx] += s
|
||||
t += beat_interval
|
||||
beat_index += 1
|
||||
|
||||
with wave.open(args.output, "wb") as wf:
|
||||
wf.setnchannels(1)
|
||||
wf.setsampwidth(2)
|
||||
wf.setframerate(sample_rate)
|
||||
frames = bytearray()
|
||||
for s in data:
|
||||
clipped = max(-1.0, min(1.0, s))
|
||||
frames.extend(struct.pack("<h", int(clipped * 32767)))
|
||||
wf.writeframes(bytes(frames))
|
||||
|
||||
print(f"Wrote {args.output} at {args.bpm:.2f} BPM for {args.seconds:.1f}s")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
171
tests/play_varying_click_track.py
Normal file
171
tests/play_varying_click_track.py
Normal file
@@ -0,0 +1,171 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Play a click track with tempo variation for BPM detector testing.
|
||||
|
||||
Examples:
|
||||
python tests/play_varying_click_track.py --start-bpm 90 --end-bpm 150 --seconds 60
|
||||
python tests/play_varying_click_track.py --pattern steps --step-bpms 100,120,140,160 --step-seconds 8
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import math
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Play varying-BPM click track")
|
||||
parser.add_argument("--device", default=None, help="Output audio device name or index")
|
||||
parser.add_argument("--sample-rate", type=int, default=44100, help="Output sample rate")
|
||||
parser.add_argument("--seconds", type=float, default=60.0, help="Playback duration")
|
||||
parser.add_argument(
|
||||
"--stabilize-seconds",
|
||||
type=float,
|
||||
default=5.0,
|
||||
help="Play initial BPM for this many seconds before varying",
|
||||
)
|
||||
parser.add_argument("--pattern", choices=("sweep", "steps"), default="sweep")
|
||||
|
||||
# Sweep options
|
||||
parser.add_argument("--start-bpm", type=float, default=90.0, help="Sweep start BPM")
|
||||
parser.add_argument("--end-bpm", type=float, default=150.0, help="Sweep end BPM")
|
||||
|
||||
# Step options
|
||||
parser.add_argument(
|
||||
"--step-bpms",
|
||||
default="100,120,140,160",
|
||||
help="Comma-separated BPMs for step mode",
|
||||
)
|
||||
parser.add_argument("--step-seconds", type=float, default=8.0, help="Seconds per BPM step")
|
||||
|
||||
# Click tone options
|
||||
parser.add_argument("--click-ms", type=float, default=25.0, help="Click duration")
|
||||
parser.add_argument("--accent-every", type=int, default=4, help="Accent every N beats")
|
||||
parser.add_argument(
|
||||
"--hold-beats",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Hold BPM for this many beats before recalculating",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def bpm_for_time(t_s: float, args: argparse.Namespace, step_bpms: list[float]) -> float:
|
||||
if t_s < args.stabilize_seconds:
|
||||
return args.start_bpm if args.pattern == "sweep" else (step_bpms[0] if step_bpms else 120.0)
|
||||
|
||||
adj_t = t_s - args.stabilize_seconds
|
||||
if args.pattern == "sweep":
|
||||
active_seconds = max(1e-6, args.seconds - args.stabilize_seconds)
|
||||
if active_seconds <= 0:
|
||||
return args.start_bpm
|
||||
alpha = min(1.0, max(0.0, adj_t / active_seconds))
|
||||
return args.start_bpm + (args.end_bpm - args.start_bpm) * alpha
|
||||
|
||||
if not step_bpms:
|
||||
return 120.0
|
||||
if args.step_seconds <= 0:
|
||||
return step_bpms[0]
|
||||
idx = int(adj_t // args.step_seconds) % len(step_bpms)
|
||||
return step_bpms[idx]
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
if args.seconds <= 0:
|
||||
raise SystemExit("--seconds must be > 0")
|
||||
if args.sample_rate <= 0:
|
||||
raise SystemExit("--sample-rate must be > 0")
|
||||
|
||||
step_bpms = [float(x.strip()) for x in args.step_bpms.split(",") if x.strip()]
|
||||
click_samples = max(1, int(args.click_ms * args.sample_rate / 1000.0))
|
||||
|
||||
state = {
|
||||
"t": 0.0, # playback time in seconds
|
||||
"next_beat": 0.0,
|
||||
"beat_idx": 0,
|
||||
"current_bpm": 0.0,
|
||||
"held_bpm": 0.0,
|
||||
"hold_counter": 0,
|
||||
"click_remaining": 0,
|
||||
"click_phase": 0,
|
||||
"click_freq": 1320.0,
|
||||
"click_amp": 0.6,
|
||||
}
|
||||
|
||||
def callback(outdata, frames, _time_info, status):
|
||||
if status:
|
||||
print(f"audio status: {status}")
|
||||
block = np.zeros(frames, dtype=np.float32)
|
||||
for i in range(frames):
|
||||
t = state["t"]
|
||||
dynamic_bpm = bpm_for_time(t, args, step_bpms)
|
||||
state["current_bpm"] = dynamic_bpm
|
||||
bpm = state["held_bpm"] if state["held_bpm"] > 0 else dynamic_bpm
|
||||
beat_interval = 60.0 / max(1e-6, bpm)
|
||||
|
||||
if t >= state["next_beat"]:
|
||||
hold_beats = max(1, int(args.hold_beats))
|
||||
if state["hold_counter"] <= 0:
|
||||
state["held_bpm"] = dynamic_bpm
|
||||
state["hold_counter"] = hold_beats
|
||||
state["hold_counter"] -= 1
|
||||
bpm = state["held_bpm"]
|
||||
beat_interval = 60.0 / max(1e-6, bpm)
|
||||
state["beat_idx"] += 1
|
||||
state["next_beat"] = t + beat_interval
|
||||
state["click_remaining"] = click_samples
|
||||
state["click_phase"] = 0
|
||||
accented = (
|
||||
args.accent_every > 0 and state["beat_idx"] % args.accent_every == 1
|
||||
)
|
||||
state["click_freq"] = 1760.0 if accented else 1320.0
|
||||
state["click_amp"] = 0.9 if accented else 0.6
|
||||
|
||||
if state["click_remaining"] > 0:
|
||||
p = state["click_phase"]
|
||||
env = math.exp(-8.0 * (p / click_samples))
|
||||
sample = state["click_amp"] * env * math.sin(
|
||||
2.0 * math.pi * state["click_freq"] * (p / args.sample_rate)
|
||||
)
|
||||
block[i] = sample
|
||||
state["click_phase"] += 1
|
||||
state["click_remaining"] -= 1
|
||||
|
||||
state["t"] += 1.0 / args.sample_rate
|
||||
|
||||
outdata[:, 0] = block
|
||||
|
||||
print(
|
||||
f"Playing varying click track for {args.seconds:.1f}s ({args.pattern}), "
|
||||
f"stabilize={args.stabilize_seconds:.1f}s"
|
||||
)
|
||||
with sd.OutputStream(
|
||||
samplerate=args.sample_rate,
|
||||
channels=1,
|
||||
dtype="float32",
|
||||
callback=callback,
|
||||
device=args.device,
|
||||
blocksize=0,
|
||||
):
|
||||
start = time.time()
|
||||
last_printed_beat = 0
|
||||
while (time.time() - start) < args.seconds:
|
||||
beat_idx = int(state["beat_idx"])
|
||||
if beat_idx != last_printed_beat:
|
||||
last_printed_beat = beat_idx
|
||||
print(
|
||||
f"beat={beat_idx:04d} bpm={state['held_bpm']:.2f} "
|
||||
f"(target={state['current_bpm']:.2f})"
|
||||
)
|
||||
time.sleep(0.05)
|
||||
|
||||
print("Done.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user