"""Deferred sequence start on beat / downbeat.""" import asyncio import json import os import sys PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) SRC_PATH = os.path.join(PROJECT_ROOT, "src") if SRC_PATH not in sys.path: sys.path.insert(0, SRC_PATH) from util import sequence_playback as sp # noqa: E402 def test_normalize_wait_for(): assert sp._normalize_wait_for({"wait_for": "beat"}) == "beat" assert sp._normalize_wait_for({"start_on": "downbeat"}) == "downbeat" assert sp._normalize_wait_for({"wait_for": "next_beat"}) == "beat" assert sp._normalize_wait_for({}) is None assert sp._play_options_without_wait({"wait_for": "beat", "zone_id": "1"}) == {"zone_id": "1"} def test_pending_play_status_empty(): sp.clear_pending_play() assert sp.pending_play_status() == {"pending": False} def test_queue_and_clear_pending(): sp.clear_pending_play() sp._queue_pending_start("z1", "s1", "p1", {"simulated_bpm": 120}, "beat", bpm=120.0) st = sp.pending_play_status() assert st["pending"] is True assert st["wait_for"] == "beat" assert st["sequence_id"] == "s1" sp.clear_pending_play() assert sp.pending_play_status()["pending"] is False def test_try_consume_pending_beat(): sp.clear_pending_play() sp._queue_pending_start("z1", "s1", "p1", None, "beat", bpm=120.0) async def fake_start(*_a, **_k): return None sp._start_immediate = fake_start # type: ignore[method-assign] assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is True assert sp.pending_play_status()["pending"] is False def test_try_consume_pending_downbeat_skips_upbeat(): sp.clear_pending_play() sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0) assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is False assert sp.pending_play_status()["pending"] is True async def fake_start(*_a, **_k): return None sp._start_immediate = fake_start # type: ignore[method-assign] assert asyncio.run(sp._try_consume_pending_play(is_downbeat=True)) is True sp.clear_pending_play() def test_downbeat_start_counts_trigger_beat(monkeypatch): """The downbeat that starts playback is beat 1 of the step, not beat 0.""" sp.clear_pending_play() sp.stop() async def fake_start(_z, _s, _p, _opts): sp._beat_run = { "lanes": [[{"preset_id": "1", "beats": 4}]], "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}], "num_lanes": 1, "sequence_loop_beat": 0, } monkeypatch.setattr(sp, "_start_immediate", fake_start) sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0) async def run(): assert await sp._try_consume_pending_play(is_downbeat=True) is True await sp.process_active_beat_advance() asyncio.run(run()) assert sp._beat_run["lane_states"][0]["beatCount"] == 1 sp.stop() def _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log): import types async def fake_deliver(_bridge, messages, _macs, _devices, **_kw): for raw in messages: deliver_log.append(json.loads(raw)) return ([], 0) def fake_clear(lane_index): route_log.append(("clear", lane_index)) monkeypatch.setattr(sp, "_resolve_lane_device_names", lambda _i, _c: ["dev-a"]) monkeypatch.setattr( "util.driver_delivery.deliver_json_messages", fake_deliver, ) fake_transport = types.ModuleType("models.transport") fake_transport.get_current_bridge = lambda: object() monkeypatch.setitem(sys.modules, "models.transport", fake_transport) monkeypatch.setattr( sp, "_reset_after_sequence_change", lambda: reset_log.extend(["route", "audio"]), ) monkeypatch.setattr( "util.beat_driver_route.clear_sequence_manual_lane_route", fake_clear, ) def test_prime_all_lanes_delivery_order(monkeypatch): """Sequence start: step-0 presets, select, rest presets, then beat/route reset.""" deliver_log: list = [] route_log: list = [] reset_log: list = [] _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log) ctx = { "lanes": [ [ {"preset_id": "p1", "beats": 2}, {"preset_id": "p2", "beats": 2}, ] ], "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}], "num_lanes": 1, "sequence_doc": { "lanes": [ [ {"preset_id": "p1", "beats": 2}, {"preset_id": "p2", "beats": 2}, ] ], "group_ids": ["g1"], }, "zone_doc": {"group_ids": ["g1"], "brightness": 200}, "presets_map": { "p1": { "pattern": "solid", "colors": ["#FF0000"], "auto": True, }, "p2": { "pattern": "solid", "colors": ["#00FF00"], "auto": True, }, }, "devices": object(), "groups": object(), "settings": {}, "palette_colors": [], } asyncio.run(sp._prime_all_lanes(ctx)) assert len(deliver_log) == 3 step0_msg = deliver_log[0] select_msg = deliver_log[1] rest_msg = deliver_log[2] assert set(step0_msg["presets"]) == {"p1"} assert select_msg["select"] == ["p1"] assert set(rest_msg["presets"]) == {"p2"} for body in deliver_log: assert not ("presets" in body and "select" in body) assert route_log == [("clear", 0)] assert reset_log == ["route", "audio"] assert ctx.get("_sequence_primed") is True def test_prime_all_lanes_single_preset(monkeypatch): """One preset in the lane: step-0 presets, select, reset (no rest phase).""" deliver_log: list = [] route_log: list = [] reset_log: list = [] _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log) ctx = { "lanes": [[{"preset_id": "p1", "beats": 2}]], "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}], "num_lanes": 1, "sequence_doc": { "lanes": [[{"preset_id": "p1", "beats": 2}]], "group_ids": ["g1"], }, "zone_doc": {"group_ids": ["g1"], "brightness": 200}, "presets_map": { "p1": { "pattern": "solid", "colors": ["#FF0000"], "auto": True, } }, "devices": object(), "groups": object(), "settings": {}, "palette_colors": [], } asyncio.run(sp._prime_all_lanes(ctx)) assert len(deliver_log) == 2 assert set(deliver_log[0]["presets"]) == {"p1"} assert deliver_log[1]["select"] == ["p1"] assert route_log == [("clear", 0)] assert reset_log == ["route", "audio"] def test_prime_all_lanes_merges_same_groups(monkeypatch): """Lanes sharing group ids get one step-0 broadcast, not one per lane.""" deliver_log: list = [] route_log: list = [] reset_log: list = [] _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log) ctx = { "lanes": [ [{"preset_id": "p1", "beats": 2}], [{"preset_id": "p2", "beats": 2}], ], "lane_states": [ {"stepIdx": 0, "beatCount": 0, "done": False}, {"stepIdx": 0, "beatCount": 0, "done": False}, ], "num_lanes": 2, "sequence_doc": { "lanes": [ [{"preset_id": "p1", "beats": 2}], [{"preset_id": "p2", "beats": 2}], ], "lanes_group_ids": [["g1"], ["g1"]], }, "zone_doc": {"group_ids": ["g1"], "brightness": 200}, "presets_map": { "p1": {"pattern": "solid", "colors": ["#FF0000"], "auto": True}, "p2": {"pattern": "solid", "colors": ["#00FF00"], "auto": True}, }, "devices": object(), "groups": object(), "settings": {}, "palette_colors": [], } asyncio.run(sp._prime_all_lanes(ctx)) preset_msgs = [b for b in deliver_log if "presets" in b] select_msgs = [b for b in deliver_log if "select" in b] assert len(preset_msgs) == 1 assert set(preset_msgs[0]["presets"]) == {"p1", "p2"} assert preset_msgs[0]["groups"] == ["g1"] assert len(select_msgs) == 2 assert route_log == [("clear", 0), ("clear", 1)]