Files
led-controller/tests/test_simulated_beat_continuity.py
Jimmy ace5770b3a refactor(api): complete fastapi migration and related features
Finish native FastAPI controllers, drop vendored microdot, and add
Wi-Fi driver runtime, beat SSE, simulated BPM, sequence playback
improvements, bridge ESP-NOW sources, UI updates, and tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-11 22:55:28 +12:00

157 lines
4.8 KiB
Python

"""Background simulated beat clock vs live audio."""
import asyncio
import os
import sys
import time
# Import-path bootstrap: put <project>/src at the front of sys.path so the
# `util` imports that follow this block can be resolved.
# PROJECT_ROOT is the parent of the directory that contains this file.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SRC_PATH = os.path.join(PROJECT_ROOT, "src")
# Only insert once so repeated imports of this module do not add duplicates.
if SRC_PATH not in sys.path:
    sys.path.insert(0, SRC_PATH)
from util import sequence_playback as sp # noqa: E402
from util.audio_detector import AudioBeatDetector, set_shared_beat_detector # noqa: E402
def _loop_ctx():
return {
"lanes": [[{"preset_id": "1", "beats": 99}]],
"lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}],
"num_lanes": 1,
"loop": True,
"sequence_loop_beat": 0,
}
async def _run_background_beats(
    *, bpm: float, seconds: float, audio_running: bool
) -> tuple[int, int]:
    """Run the beat consumer for ``seconds`` and report how far it advanced.

    Installs a fresh ``AudioBeatDetector`` as the shared detector, seeds
    ``sp._beat_run`` with the context from ``_loop_ctx()``, starts the beat
    consumer, and then sleeps while ``sp._simulated_bpm_from_settings`` is
    replaced by a stub that returns ``bpm``.

    Args:
        bpm: Value returned by the stubbed ``_simulated_bpm_from_settings``;
            also written to the detector's ``_status["bpm"]`` when
            ``audio_running`` is true.
        seconds: How long to sleep (letting the event loop run) before the
            counters are sampled.
        audio_running: Value written to the detector's ``_running`` flag and
            ``_status["running"]`` entry.

    Returns:
        ``(beat_count, tick)`` where ``beat_count`` is lane 0's ``beatCount``
        from ``sp._beat_run`` (0 if no run is set) and ``tick`` is the result
        of ``sp.simulated_beat_tick()``, both read after the sleep.
    """
    sp.stop()
    sp.clear_pending_play()
    det = AudioBeatDetector()
    set_shared_beat_detector(det)
    try:
        try:
            with det._lock:
                det._running = bool(audio_running)
                det._status["running"] = bool(audio_running)
                if audio_running:
                    det._status["bpm"] = float(bpm)
        except Exception:
            # Deliberately best-effort: these are private detector attributes.
            # NOTE(review): the other tests in this file set the same fields
            # without a guard, so this handler may be unnecessary - confirm.
            pass

        ctx = _loop_ctx()
        with sp._beat_run_lock:
            sp._beat_run = ctx
            # Reset the consumer bookkeeping so the call below starts it anew.
            sp._beat_consumer_started = False
            sp._background_beat_task = None
        sp.ensure_beat_consumer_started()

        def fake_bpm():
            return bpm

        orig = sp._simulated_bpm_from_settings
        sp._simulated_bpm_from_settings = fake_bpm  # type: ignore[method-assign]
        try:
            await asyncio.sleep(seconds)
        finally:
            # Always restore the real BPM lookup, even if the sleep is cancelled.
            sp._simulated_bpm_from_settings = orig  # type: ignore[method-assign]

        with sp._beat_run_lock:
            st = sp._beat_run["lane_states"][0] if sp._beat_run else {}
            beat_count = int(st.get("beatCount", 0))
        tick = sp.simulated_beat_tick()
        return beat_count, tick
    finally:
        # Teardown runs on every exit path so a failure here cannot leave the
        # playback module running or the shared detector installed.
        sp.stop()
        set_shared_beat_detector(None)
def test_background_beats_continue_past_four_with_audio_off():
    """Beat count and tick must both exceed 4 when the detector is not running.

    Drives ``_run_background_beats`` for 2.5 s with a stubbed BPM of 200 and
    ``audio_running=False``.
    """
    scenario = _run_background_beats(bpm=200.0, seconds=2.5, audio_running=False)
    outcome = asyncio.run(scenario)
    beat_count, tick = outcome

    assert beat_count > 4, f"expected more than 4 beats, got {beat_count}"
    assert tick > 4, f"expected tick past 4, got {tick}"
def test_background_advances_sequence_when_audio_on_without_beats():
    """Beat count and tick must both exceed 4 when audio runs but records no beats.

    Drives ``_run_background_beats`` for 2.5 s with a stubbed BPM of 200 and
    ``audio_running=True``; no beat is ever recorded on the detector.
    """
    scenario = _run_background_beats(bpm=200.0, seconds=2.5, audio_running=True)
    outcome = asyncio.run(scenario)
    beat_count, tick = outcome

    assert beat_count > 4, f"sim should fill when audio is on but not clocking, got {beat_count}"
    assert tick > 4, f"background tick should still count, got {tick}"
def test_holdover_fills_beats_between_sparse_real_detections():
    """After one recorded beat, lane 0's beat count must exceed 2 within 2.2 s.

    The detector is marked running, a single ``_record_beat(120.0)`` call is
    made, and the sequence is then left alone for 2.2 s before ``beatCount``
    is sampled. The shared detector is always cleared afterwards.
    """
    detector = AudioBeatDetector()

    async def _drive():
        # Start from a clean playback state with a fresh run context.
        sp.stop()
        sp.clear_pending_play()
        run_ctx = _loop_ctx()
        with sp._beat_run_lock:
            sp._beat_run = run_ctx
            sp._beat_consumer_started = False
            sp._background_beat_task = None
        sp.ensure_beat_consumer_started()

        detector._record_beat(120.0)
        await asyncio.sleep(2.2)

        with sp._beat_run_lock:
            lane_state = run_ctx["lane_states"][0]
            counted = int(lane_state.get("beatCount", 0))
        sp.stop()
        return counted

    set_shared_beat_detector(detector)
    try:
        with detector._lock:
            detector._running = True
            detector._status["running"] = True
        beat_count = asyncio.run(_drive())
        assert beat_count > 2, f"holdover should advance between kicks, got {beat_count}"
    finally:
        set_shared_beat_detector(None)
def test_live_audio_advances_sequence_when_running():
    """Eight recorded beats on a running detector must push beatCount past 4.

    Each ``_record_beat(400.0)`` call is followed by a sleep slightly longer
    (+0.01 s) than ``sp._min_processed_beat_gap_s()``. The shared detector is
    always cleared afterwards.
    """
    detector = AudioBeatDetector()

    async def _drive():
        # Start from a clean playback state with a fresh run context.
        sp.stop()
        sp.clear_pending_play()
        run_ctx = _loop_ctx()
        with sp._beat_run_lock:
            sp._beat_run = run_ctx
            sp._beat_consumer_started = False
            sp._background_beat_task = None
        sp.ensure_beat_consumer_started()

        spacing = sp._min_processed_beat_gap_s() + 0.01
        remaining = 8
        while remaining > 0:
            detector._record_beat(400.0)
            await asyncio.sleep(spacing)
            remaining -= 1

        with sp._beat_run_lock:
            lane_state = run_ctx["lane_states"][0]
            counted = int(lane_state.get("beatCount", 0))
        sp.stop()
        return counted

    set_shared_beat_detector(detector)
    try:
        with detector._lock:
            detector._running = True
            detector._status["running"] = True
        beat_count = asyncio.run(_drive())
        assert beat_count > 4, f"audio should drive sequence, got {beat_count}"
    finally:
        set_shared_beat_detector(None)
def test_beat_dedupe_drops_double_fire():
    """A second beat right after the first is rejected; a later one is accepted.

    Calls ``sp._accept_thread_beat_now()`` twice back to back and expects the
    second call to return ``False``. After sleeping slightly longer (+0.02 s)
    than ``sp._min_processed_beat_gap_s()``, the next call must return ``True``.
    """
    sp.stop()
    sp.clear_pending_play()
    try:
        # First call only primes the dedupe window; its result is not checked.
        sp._accept_thread_beat_now()
        assert sp._accept_thread_beat_now() is False
        time.sleep(sp._min_processed_beat_gap_s() + 0.02)
        assert sp._accept_thread_beat_now() is True
    finally:
        # Run teardown even when an assertion fails so playback state does
        # not carry over into whichever test runs next.
        sp.stop()