From 301e1c64bf90282d435906bb4639022373b8c323 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Sun, 17 May 2026 18:32:12 +1200 Subject: [PATCH] test: cover audio, sequences, pattern direction, and settings Co-authored-by: Cursor --- tests/beat_detect.py | 202 +++++++++++++++++++++-- tests/test_audio_reset_tracking.py | 61 +++++++ tests/test_bar_phase.py | 70 ++++++++ tests/test_beat_detect_ioi.py | 28 ++++ tests/test_beat_driver_route_suppress.py | 105 ++++++++++++ tests/test_endpoints_pytest.py | 16 +- tests/test_pattern_direction.py | 36 ++++ tests/test_sequence_beat_phase_sync.py | 43 +++++ tests/test_sequence_pending_start.py | 88 ++++++++++ tests/test_sequence_playback_loop.py | 30 ++++ tests/test_ui_settings.py | 22 +++ 11 files changed, 681 insertions(+), 20 deletions(-) create mode 100644 tests/test_audio_reset_tracking.py create mode 100644 tests/test_bar_phase.py create mode 100644 tests/test_beat_detect_ioi.py create mode 100644 tests/test_beat_driver_route_suppress.py create mode 100644 tests/test_pattern_direction.py create mode 100644 tests/test_sequence_beat_phase_sync.py create mode 100644 tests/test_sequence_pending_start.py create mode 100644 tests/test_sequence_playback_loop.py create mode 100644 tests/test_ui_settings.py diff --git a/tests/beat_detect.py b/tests/beat_detect.py index 51c654d..3a712a4 100644 --- a/tests/beat_detect.py +++ b/tests/beat_detect.py @@ -112,12 +112,6 @@ def parse_args() -> argparse.Namespace: default=0.12, help="Aubio detection threshold", ) - parser.add_argument( - "--silence-gate-db", - type=float, - default=-58.0, - help="Ignore beat triggers when frame RMS is below this dB level", - ) return parser.parse_args() @@ -131,6 +125,141 @@ def _estimate_bpm(beat_times: Deque[float]) -> float | None: return 60.0 / float(np.median(valid)) +def _is_plausible_ioi( + last_trigger_s: float, + beat_times: Deque[float], + now_s: float, + *, + min_ratio: float = 0.42, + max_ratio: float = 2.5, +) -> bool: + """Reject double-time / half-time false triggers vs recent median interval.""" + if last_trigger_s <= 0 or len(beat_times) < 2: + return True + ioi = now_s - last_trigger_s + if ioi <= 0: + return False + intervals = np.diff(np.array(list(beat_times)[-8:], dtype=np.float64)) + if intervals.size == 0: + return True + med = float(np.median(intervals)) + if med < 0.05: + return True + return (ioi >= med * min_ratio) and (ioi <= med * max_ratio) + + +class BarPhaseTracker: + """Track beat-in-bar from downbeat counting (kick hints).""" + + def __init__(self, beats_per_bar: int = 4, kick_conf_min: float = 1.15): + self.beats_per_bar = max(1, int(beats_per_bar)) + self.kick_conf_min = float(kick_conf_min) + self.bar_beat = 1 + self.is_downbeat = True + self.confidence = 0.0 + self._last_downbeat_s = 0.0 + self._aligned_kicks = 0 + self._total_beats = 0 + + def reset(self) -> None: + self.bar_beat = 1 + self.is_downbeat = True + self.confidence = 0.0 + self._last_downbeat_s = 0.0 + self._aligned_kicks = 0 + self._total_beats = 0 + + def anchor_downbeat(self, now_s: float) -> None: + self.bar_beat = 1 + self.is_downbeat = True + self._last_downbeat_s = float(now_s) + self.confidence = max(self.confidence, 0.85) + + def _bar_duration_s( + self, bpm: float | None, median_ioi: float | None + ) -> float | None: + if bpm is not None and bpm > 0: + return (60.0 / float(bpm)) * self.beats_per_bar + if median_ioi is not None and median_ioi > 0: + return float(median_ioi) * self.beats_per_bar + return None + + @staticmethod + def _near_whole_bars(elapsed: float, bar_dur: float, tol: float = 0.14) -> bool: + if bar_dur <= 0 or elapsed <= 0: + return False + n = elapsed / bar_dur + nearest = max(1, round(n)) + return abs(n - nearest) <= tol + + def on_beat( + self, + now_s: float, + beat_type: str, + beat_type_conf: float, + *, + bpm: float | None = None, + median_ioi: float | None = None, + ) -> dict[str, int | float | bool | str]: + self._total_beats += 1 + bar_dur = self._bar_duration_s(bpm, median_ioi) + is_kick = ( + str(beat_type or "").lower() == "kick" + and float(beat_type_conf or 0.0) >= self.kick_conf_min + ) + + downbeat_locked = False + if is_kick: + if self._last_downbeat_s <= 0 or self._total_beats <= 2: + downbeat_locked = True + elif bar_dur and self._near_whole_bars( + now_s - self._last_downbeat_s, bar_dur + ): + downbeat_locked = True + elif is_kick and self.bar_beat >= max(2, self.beats_per_bar - 1): + downbeat_locked = True + + prev_bar_beat = int(self.bar_beat) + if downbeat_locked: + self.bar_beat = 1 + self.is_downbeat = True + self._last_downbeat_s = float(now_s) + self._aligned_kicks += 1 + elif self._total_beats <= 1: + self.bar_beat = 1 + self.is_downbeat = True + else: + self.bar_beat = (prev_bar_beat % self.beats_per_bar) + 1 + self.is_downbeat = self.bar_beat == 1 + + if self._total_beats >= self.beats_per_bar: + bars_seen = max(1, self._total_beats // self.beats_per_bar) + self.confidence = min(1.0, self._aligned_kicks / bars_seen) + + return { + "bar_beat": int(self.bar_beat), + "beats_per_bar": int(self.beats_per_bar), + "is_downbeat": bool(self.is_downbeat), + "phase_confidence": round(float(self.confidence), 3), + "bar_phase_readout": f"{int(self.bar_beat)}/{int(self.beats_per_bar)}", + } + + +def _resolve_bpm( + beat_times: Deque[float], + aubio_bpm: float | None, +) -> float | None: + estimated = _estimate_bpm(beat_times) + if estimated is None: + return aubio_bpm + if aubio_bpm is None or aubio_bpm <= 0: + return estimated + ratio = float(aubio_bpm) / estimated + if ratio > 1.75 or ratio < 0.57: + return estimated + return estimated + + def _load_aubio_if_needed(mode: str): if mode == "custom": return None @@ -170,6 +299,8 @@ class BeatDetectRuntime: ) self.last_trigger_s = 0.0 self.debounce_s = float(args.min_ioi_ms) / 1000.0 + bpb = int(getattr(args, "beats_per_bar", 4) or 4) + self.bar_phase = BarPhaseTracker(beats_per_bar=bpb) def setup(self, sample_rate: int): self.sample_rate = int(sample_rate) @@ -192,13 +323,37 @@ class BeatDetectRuntime: self.beat_times.clear() self.tempo = None if self.aubio is not None: - self.tempo = self.aubio.tempo( - self.args.aubio_method, win_size, self.frame_size, self.sample_rate - ) - if hasattr(self.tempo, "set_threshold"): - self.tempo.set_threshold(float(self.args.aubio_threshold)) - if hasattr(self.tempo, "set_minioi_ms"): - self.tempo.set_minioi_ms(float(self.args.min_ioi_ms)) + self._init_aubio_tempo(win_size) + + def _init_aubio_tempo(self, win_size: int): + self.tempo = self.aubio.tempo( + self.args.aubio_method, win_size, self.frame_size, self.sample_rate + ) + if hasattr(self.tempo, "set_threshold"): + self.tempo.set_threshold(float(self.args.aubio_threshold)) + if hasattr(self.tempo, "set_minioi_ms"): + self.tempo.set_minioi_ms(float(self.args.min_ioi_ms)) + + def reset_tempo_state(self) -> None: + """Clear tempo/aubio history without losing bar phase.""" + self.baseline = 1e-6 + if self.prev_mag is not None: + self.prev_mag[:] = 0.0 + self.beat_times.clear() + self.last_trigger_s = 0.0 + if self.aubio is not None and self.sample_rate > 0: + win_size = max(1024, self.frame_size * max(2, self.args.win_mult)) + self._init_aubio_tempo(win_size) + + def reset_state(self): + """Full reset (manual): tempo history and bar phase.""" + self.reset_tempo_state() + self.bar_phase.reset() + + def anchor_bar_phase(self, now_s: float | None = None) -> None: + if now_s is None: + now_s = time.time() + self.bar_phase.anchor_downbeat(now_s) def _classify_hit(self, mag: np.ndarray): total = float(np.mean(mag) + 1e-9) @@ -227,8 +382,6 @@ class BeatDetectRuntime: f32 = frame.astype(np.float32) rms = float(np.sqrt(np.mean(f32 * f32) + 1e-12)) db = 20.0 * np.log10(max(rms, 1e-12)) - if db < float(self.args.silence_gate_db): - return None mag = np.abs(np.fft.rfft(f32 * self.window)).astype(np.float32) band_energy = float(np.mean(mag[self.band_mask])) flux = float(np.mean(np.maximum(0.0, mag - self.prev_mag))) @@ -260,14 +413,30 @@ class BeatDetectRuntime: should_trigger = aubio_hit else: should_trigger = custom_hit or aubio_hit + if should_trigger and not _is_plausible_ioi( + self.last_trigger_s, self.beat_times, now_s + ): + should_trigger = False if not should_trigger: return None self.last_trigger_s = now_s self.beat_times.append(now_s) - bpm = aubio_bpm if aubio_bpm is not None else _estimate_bpm(self.beat_times) + bpm = _resolve_bpm(self.beat_times, aubio_bpm) strength = score / max(1e-9, self.baseline) beat_type, beat_type_conf = self._classify_hit(mag) + median_ioi = None + if len(self.beat_times) >= 2: + intervals = np.diff(np.array(self.beat_times, dtype=np.float64)) + if intervals.size > 0: + median_ioi = float(np.median(intervals)) + phase = self.bar_phase.on_beat( + now_s, + beat_type, + beat_type_conf, + bpm=bpm, + median_ioi=median_ioi, + ) if self.args.mode == "custom": src = "custom" elif self.args.mode == "aubio": @@ -288,6 +457,7 @@ class BeatDetectRuntime: "beat_type": beat_type, "beat_type_confidence": beat_type_conf, "db": db, + **phase, } diff --git a/tests/test_audio_reset_tracking.py b/tests/test_audio_reset_tracking.py new file mode 100644 index 0000000..93c094f --- /dev/null +++ b/tests/test_audio_reset_tracking.py @@ -0,0 +1,61 @@ +"""Reset detector must not stop the stream or clear ``running``.""" + +import os +import sys +import time + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util.audio_detector import AudioBeatDetector # noqa: E402 + + +class _FakeRuntime: + def __init__(self): + self.reset_calls = 0 + + def reset_state(self): + self.reset_calls += 1 + + +def test_reset_tracking_false_when_not_running(): + det = AudioBeatDetector() + assert det.reset_tracking() is False + + +def test_reset_tracking_queues_on_audio_thread(): + det = AudioBeatDetector() + rt = _FakeRuntime() + with det._lock: + det._running = True + det._runtime = rt + det._status["running"] = True + det._status["bpm"] = 128.0 + det._status["beat_seq"] = 7 + + assert det.reset_tracking() is True + assert rt.reset_calls == 0 + assert det._pending_reset is True + + st = det.status() + assert st["running"] is True + assert st["bpm"] == 128.0 + assert st["beat_seq"] == 7 + + det._process_pending_reset(rt) + assert rt.reset_calls == 1 + assert det._pending_reset is False + assert det.status()["running"] is True + + +def test_status_keeps_bpm_during_holdover(): + det = AudioBeatDetector() + with det._lock: + det._running = True + det._holdover_active = True + det._status["running"] = True + det._status["bpm"] = 128.0 + det._status["last_beat_ts"] = time.time() - 10.0 + assert det.status()["bpm"] == 128.0 diff --git a/tests/test_bar_phase.py b/tests/test_bar_phase.py new file mode 100644 index 0000000..0728b40 --- /dev/null +++ b/tests/test_bar_phase.py @@ -0,0 +1,70 @@ +"""Bar phase (beat-in-bar) tracking for audio beat detection.""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) + +from tests.beat_detect import BarPhaseTracker # noqa: E402 + + +def test_bar_phase_increments_on_non_kick_beats(): + tr = BarPhaseTracker(beats_per_bar=4) + r1 = tr.on_beat(1.0, "snare", 1.3, bpm=120.0) + assert r1["bar_beat"] == 1 + r2 = tr.on_beat(1.5, "snare", 1.2, bpm=120.0) + assert r2["bar_beat"] == 2 + r3 = tr.on_beat(2.0, "hat", 1.1, bpm=120.0) + assert r3["bar_beat"] == 3 + + +def test_kick_near_bar_boundary_resets_to_downbeat(): + tr = BarPhaseTracker(beats_per_bar=4) + tr.on_beat(0.0, "kick", 1.4, bpm=120.0) + tr.on_beat(0.5, "snare", 1.2, bpm=120.0) + tr.on_beat(1.0, "snare", 1.2, bpm=120.0) + tr.on_beat(1.5, "snare", 1.2, bpm=120.0) + r = tr.on_beat(2.0, "kick", 1.5, bpm=120.0) + assert r["bar_beat"] == 1 + assert r["is_downbeat"] is True + + +def test_anchor_downbeat_sets_confidence(): + tr = BarPhaseTracker(beats_per_bar=4) + tr.anchor_downbeat(10.0) + assert tr.bar_beat == 1 + assert tr.confidence >= 0.85 + + +def test_reset_tempo_preserves_bar_phase(): + from argparse import Namespace + + from tests.beat_detect import BeatDetectRuntime # noqa: E402 + + args = Namespace( + mode="custom", + hop_size=256, + win_mult=2, + min_band_hz=45.0, + max_band_hz=180.0, + energy_weight=0.7, + flux_weight=0.3, + threshold_multiplier=1.35, + ema_alpha=0.08, + min_ioi_ms=100.0, + bpm_window=8, + aubio_method="default", + aubio_threshold=0.12, + beats_per_bar=4, + ) + rt = BeatDetectRuntime(args) + rt.setup(44100) + rt.bar_phase.on_beat(0.0, "kick", 1.5, bpm=120.0) + rt.bar_phase.on_beat(0.5, "snare", 1.2, bpm=120.0) + assert rt.bar_phase.bar_beat == 2 + rt.reset_tempo_state() + assert rt.bar_phase.bar_beat == 2 + rt.reset_state() + assert rt.bar_phase.bar_beat == 1 diff --git a/tests/test_beat_detect_ioi.py b/tests/test_beat_detect_ioi.py new file mode 100644 index 0000000..7d432b4 --- /dev/null +++ b/tests/test_beat_detect_ioi.py @@ -0,0 +1,28 @@ +"""Beat interval plausibility helpers (audio detector).""" + +import os +import sys +from collections import deque + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) + +from tests.beat_detect import _is_plausible_ioi, _resolve_bpm # noqa: E402 + + +def test_is_plausible_ioi_rejects_double_time(): + times = deque([0.0, 0.5, 1.0]) + assert _is_plausible_ioi(1.0, times, 1.15) is False + + +def test_is_plausible_ioi_accepts_steady_grid(): + times = deque([0.0, 0.5, 1.0]) + assert _is_plausible_ioi(1.0, times, 1.5) is True + + +def test_resolve_bpm_prefers_intervals_over_wrong_aubio(): + times = deque([0.0, 0.5, 1.0, 1.5, 2.0]) + bpm = _resolve_bpm(times, 70.0) + assert bpm is not None + assert abs(bpm - 120.0) < 5.0 diff --git a/tests/test_beat_driver_route_suppress.py b/tests/test_beat_driver_route_suppress.py new file mode 100644 index 0000000..b14d635 --- /dev/null +++ b/tests/test_beat_driver_route_suppress.py @@ -0,0 +1,105 @@ +"""Manual beat route: suppress duplicate select after sequence step change.""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util import beat_driver_route as bdr # noqa: E402 + + +def _patch_delivery(monkeypatch): + delivered = [] + + async def fake_batch(pairs): + delivered.extend(pairs) + + def fake_schedule(coro, _loop): + import asyncio + + asyncio.run(coro) + + monkeypatch.setattr(bdr, "_deliver_select_batch", fake_batch) + monkeypatch.setattr(bdr, "_main_loop", object()) + monkeypatch.setattr("asyncio.run_coroutine_threadsafe", fake_schedule) + return delivered + + +def test_suppress_next_notify_skips_one_select(monkeypatch): + delivered = _patch_delivery(monkeypatch) + + bdr.set_sequence_manual_lane_route( + 0, + ["desk"], + "5", + {"p": "chase", "a": False, "manual_beat_n": 1}, + ) + bdr.mark_sequence_manual_lane_select_sent(0) + + bdr.notify_beat_detected() + assert delivered == [] + + bdr.notify_beat_detected() + assert delivered == [(["desk"], "5")] + + +def test_suppress_does_not_advance_beat_counter(monkeypatch): + delivered = _patch_delivery(monkeypatch) + + bdr.set_sequence_manual_lane_route( + 0, + ["desk"], + "42", + {"p": "radiate", "a": False, "manual_beat_n": 2}, + ) + bdr.mark_sequence_manual_lane_select_sent(0) + + bdr.notify_beat_detected() + assert delivered == [] + + bdr.notify_beat_detected() + assert delivered == [(["desk"], "42")] + + delivered.clear() + bdr.notify_beat_detected() + assert delivered == [] + + bdr.notify_beat_detected() + assert delivered == [(["desk"], "42")] + + +def test_duplicate_lanes_dedupe_to_one_select_per_beat(monkeypatch): + delivered = _patch_delivery(monkeypatch) + body = {"p": "radiate", "a": False, "manual_beat_n": 1} + entry = { + "device_names": ["desk"], + "wire_preset_id": "42", + "pattern": "radiate", + "manual_beat_n": 1, + "beat_counter": 0, + } + with bdr._route_lock: + bdr._lane_manual.clear() + bdr._lane_manual[-1] = dict(entry) + bdr._lane_manual[0] = dict(entry) + + bdr.notify_beat_detected() + assert delivered == [(["desk"], "42")] + + +def test_standalone_overlay_skipped_when_sequence_lane_covers(monkeypatch): + delivered = _patch_delivery(monkeypatch) + body = {"p": "radiate", "a": False, "manual_beat_n": 1} + + bdr.set_sequence_manual_lane_route(1, ["desk"], "42", body) + bdr._apply_manual_beat_route_standalone_overlay(["desk"], "42", body) + + with bdr._route_lock: + assert -1 not in bdr._lane_manual + assert 1 in bdr._lane_manual + + bdr.notify_beat_detected() + assert delivered == [(["desk"], "42")] diff --git a/tests/test_endpoints_pytest.py b/tests/test_endpoints_pytest.py index 81f408a..9be096a 100644 --- a/tests/test_endpoints_pytest.py +++ b/tests/test_endpoints_pytest.py @@ -352,19 +352,27 @@ def test_settings_controller(server): ) assert resp.status_code == 400 - resp = c.put(f"{base_url}/settings/settings", json={"wifi_channel": 11}) + resp = c.put(f"{base_url}/settings", json={"wifi_channel": 11}) assert resp.status_code == 200 - resp = c.put(f"{base_url}/settings/settings", json={"wifi_channel": 12}) + resp = c.put(f"{base_url}/settings", json={"wifi_channel": 12}) assert resp.status_code == 400 - resp = c.put(f"{base_url}/settings/settings", json={"global_brightness": 42}) + resp = c.put(f"{base_url}/settings", json={"global_brightness": 42}) assert resp.status_code == 200 resp = c.get(f"{base_url}/settings") assert resp.status_code == 200 assert resp.json().get("global_brightness") == 42 - resp = c.put(f"{base_url}/settings/settings", json={"global_brightness": 300}) + resp = c.put( + f"{base_url}/settings", + json={"sequence_switch_wait": "downbeat"}, + ) + assert resp.status_code == 200 + resp = c.get(f"{base_url}/settings") + assert resp.json().get("sequence_switch_wait") == "downbeat" + + resp = c.put(f"{base_url}/settings", json={"global_brightness": 300}) assert resp.status_code == 400 diff --git a/tests/test_pattern_direction.py b/tests/test_pattern_direction.py new file mode 100644 index 0000000..cad28d6 --- /dev/null +++ b/tests/test_pattern_direction.py @@ -0,0 +1,36 @@ +"""LED strip reverse (n5) mapping for upside-down installs.""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +DRIVER_SRC = os.path.join(PROJECT_ROOT, "led-driver", "src") +if DRIVER_SRC not in sys.path: + sys.path.insert(0, DRIVER_SRC) + +from patterns.pattern_direction import is_reversed, led_i, signed # noqa: E402 +from preset import Preset # noqa: E402 + + +class _FakeDriver: + num_leds = 10 + + +def test_preset_reverse_sets_n5(): + p = Preset({"p": "chase", "reverse": True}) + assert p.n5 == 1 + assert is_reversed(p) is True + + +def test_led_i_mirrors_index(): + drv = _FakeDriver() + p = Preset({"p": "chase", "n5": 1}) + assert led_i(drv, p, 0) == 9 + assert led_i(drv, p, 9) == 0 + assert led_i(drv, p, 3) == 6 + + +def test_signed_negates_when_reversed(): + p = Preset({"p": "chase", "n5": 1}) + assert signed(p, 4) == -4 + assert signed(Preset({"p": "chase", "n5": 0}), 4) == 4 diff --git a/tests/test_sequence_beat_phase_sync.py b/tests/test_sequence_beat_phase_sync.py new file mode 100644 index 0000000..2a459fa --- /dev/null +++ b/tests/test_sequence_beat_phase_sync.py @@ -0,0 +1,43 @@ +"""Sequence beat phase alignment (sync to musical downbeat).""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util.sequence_playback import apply_beat_phase_sync # noqa: E402 + + +def _ctx(lane_states): + return {"lane_states": lane_states, "sequence_loop_beat": 5} + + +def test_apply_beat_phase_sync_step_resets_beat_count_only(): + ctx = _ctx( + [ + {"stepIdx": 2, "beatCount": 3, "done": False}, + {"stepIdx": 1, "beatCount": 1, "done": True}, + ] + ) + ok, resend = apply_beat_phase_sync(ctx, "step") + assert ok is True + assert resend is False + assert ctx["lane_states"][0]["stepIdx"] == 2 + assert ctx["lane_states"][0]["beatCount"] == 0 + assert ctx["lane_states"][1]["beatCount"] == 1 + assert ctx["sequence_loop_beat"] == 5 + + +def test_apply_beat_phase_sync_pass_restarts_pass(): + ctx = _ctx([{"stepIdx": 2, "beatCount": 3, "done": False}]) + ok, resend = apply_beat_phase_sync(ctx, "pass") + assert ok is True + assert resend is True + st = ctx["lane_states"][0] + assert st["stepIdx"] == 0 + assert st["beatCount"] == 0 + assert st["done"] is False + assert ctx["sequence_loop_beat"] == 0 diff --git a/tests/test_sequence_pending_start.py b/tests/test_sequence_pending_start.py new file mode 100644 index 0000000..398bfb8 --- /dev/null +++ b/tests/test_sequence_pending_start.py @@ -0,0 +1,88 @@ +"""Deferred sequence start on beat / downbeat.""" + +import asyncio +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util import sequence_playback as sp # noqa: E402 + + +def test_normalize_wait_for(): + assert sp._normalize_wait_for({"wait_for": "beat"}) == "beat" + assert sp._normalize_wait_for({"start_on": "downbeat"}) == "downbeat" + assert sp._normalize_wait_for({"wait_for": "next_beat"}) == "beat" + assert sp._normalize_wait_for({}) is None + assert sp._play_options_without_wait({"wait_for": "beat", "zone_id": "1"}) == {"zone_id": "1"} + + +def test_pending_play_status_empty(): + sp.clear_pending_play() + assert sp.pending_play_status() == {"pending": False} + + +def test_queue_and_clear_pending(): + sp.clear_pending_play() + sp._queue_pending_start("z1", "s1", "p1", {"simulated_bpm": 120}, "beat", bpm=120.0) + st = sp.pending_play_status() + assert st["pending"] is True + assert st["wait_for"] == "beat" + assert st["sequence_id"] == "s1" + sp.clear_pending_play() + assert sp.pending_play_status()["pending"] is False + + +def test_try_consume_pending_beat(): + sp.clear_pending_play() + sp._queue_pending_start("z1", "s1", "p1", None, "beat", bpm=120.0) + + async def fake_start(*_a, **_k): + return None + + sp._start_immediate = fake_start # type: ignore[method-assign] + assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is True + assert sp.pending_play_status()["pending"] is False + + +def test_try_consume_pending_downbeat_skips_upbeat(): + sp.clear_pending_play() + sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0) + assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is False + assert sp.pending_play_status()["pending"] is True + + async def fake_start(*_a, **_k): + return None + + sp._start_immediate = fake_start # type: ignore[method-assign] + assert asyncio.run(sp._try_consume_pending_play(is_downbeat=True)) is True + sp.clear_pending_play() + + +def test_downbeat_start_counts_trigger_beat(monkeypatch): + """The downbeat that starts playback is beat 1 of the step, not beat 0.""" + sp.clear_pending_play() + sp.stop() + + async def fake_start(_z, _s, _p, _opts): + sp._beat_run = { + "lanes": [[{"preset_id": "1", "beats": 4}]], + "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}], + "num_lanes": 1, + "sequence_loop_beat": 0, + } + + monkeypatch.setattr(sp, "_start_immediate", fake_start) + sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0) + + async def run(): + assert await sp._try_consume_pending_play(is_downbeat=True) is True + await sp.process_active_beat_advance() + + asyncio.run(run()) + assert sp._beat_run["lane_states"][0]["beatCount"] == 1 + sp.stop() + diff --git a/tests/test_sequence_playback_loop.py b/tests/test_sequence_playback_loop.py new file mode 100644 index 0000000..80c2503 --- /dev/null +++ b/tests/test_sequence_playback_loop.py @@ -0,0 +1,30 @@ +"""Sequence playback loop flag coercion.""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util.sequence_playback import ( # noqa: E402 + _coerce_loop, + _ordered_unique_preset_ids_in_lane, +) + + +def test_coerce_loop(): + assert _coerce_loop({"loop": True}) is True + assert _coerce_loop({"loop": False}) is False + assert _coerce_loop({"sequence_loop": 0}) is False + assert _coerce_loop({}) is True + + +def test_ordered_unique_preset_ids_in_lane(): + lane = [ + {"preset_id": "6", "beats": 1}, + {"preset_id": "4", "beats": 2}, + {"preset_id": "6", "beats": 1}, + ] + assert _ordered_unique_preset_ids_in_lane(lane) == ["6", "4"] diff --git a/tests/test_ui_settings.py b/tests/test_ui_settings.py new file mode 100644 index 0000000..34adbe5 --- /dev/null +++ b/tests/test_ui_settings.py @@ -0,0 +1,22 @@ +"""Server-owned UI settings (no browser localStorage).""" + +import os +import sys + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SRC_PATH = os.path.join(PROJECT_ROOT, "src") +if SRC_PATH not in sys.path: + sys.path.insert(0, SRC_PATH) + +from util.audio_run_persist import read_audio_run_state # noqa: E402 +from util.sequence_playback import _sequence_switch_wait_from_settings # noqa: E402 + + +def test_audio_run_state_includes_device_form_fields(): + st = read_audio_run_state() + assert "device_override" in st + assert "device_select" in st + + +def test_sequence_switch_wait_from_settings(): + assert _sequence_switch_wait_from_settings() in ("beat", "downbeat")