# 263 lines · 8.4 KiB · Python
"""Deferred sequence start on beat / downbeat."""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
# The project root is the parent of the directory holding this file; its
# "src" folder is put at the front of sys.path (once) so the imports that
# follow can resolve packages living under it.
PROJECT_ROOT = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)
SRC_PATH = os.path.join(PROJECT_ROOT, "src")
if SRC_PATH not in sys.path:
    sys.path.insert(0, SRC_PATH)
|
|
|
|
from util import sequence_playback as sp # noqa: E402
|
|
|
|
|
|
def test_normalize_wait_for():
    """Check wait-mode normalisation and removal of the wait key from play options."""
    expectations = (
        ({"wait_for": "beat"}, "beat"),
        ({"start_on": "downbeat"}, "downbeat"),
        ({"wait_for": "next_beat"}, "beat"),
    )
    for options, expected_mode in expectations:
        assert sp._normalize_wait_for(options) == expected_mode

    # No wait key at all must yield None, not a default mode.
    assert sp._normalize_wait_for({}) is None

    remaining = sp._play_options_without_wait({"wait_for": "beat", "zone_id": "1"})
    assert remaining == {"zone_id": "1"}
|
|
|
|
|
|
def test_pending_play_status_empty():
    """After clearing, the status payload is exactly ``{"pending": False}``."""
    sp.clear_pending_play()
    status = sp.pending_play_status()
    assert status == {"pending": False}
|
|
|
|
|
|
def test_queue_and_clear_pending():
    """A queued start shows up in the status and disappears once cleared."""
    sp.clear_pending_play()
    sp._queue_pending_start(
        "z1", "s1", "p1", {"simulated_bpm": 120}, "beat", bpm=120.0
    )

    queued = sp.pending_play_status()
    assert queued["pending"] is True
    reported = {key: queued[key] for key in ("wait_for", "sequence_id")}
    assert reported == {"wait_for": "beat", "sequence_id": "s1"}

    sp.clear_pending_play()
    cleared = sp.pending_play_status()
    assert cleared["pending"] is False
|
|
|
|
|
|
def test_try_consume_pending_beat():
    """A start queued for ``"beat"`` is consumed by a non-downbeat tick.

    ``sp._start_immediate`` is replaced by a no-op coroutine for the
    duration of the test and restored afterwards, so the stub cannot leak
    into tests that run later in the same session.
    """
    sp.clear_pending_play()
    sp._queue_pending_start("z1", "s1", "p1", None, "beat", bpm=120.0)

    async def fake_start(*_a, **_k):
        return None

    real_start = sp._start_immediate
    sp._start_immediate = fake_start  # type: ignore[method-assign]
    try:
        assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is True
        assert sp.pending_play_status()["pending"] is False
    finally:
        # Always put the real implementation back, even if an assert failed.
        sp._start_immediate = real_start  # type: ignore[method-assign]
|
|
|
|
|
|
def test_try_consume_pending_downbeat_skips_upbeat():
    """A start queued for ``"downbeat"`` ignores a plain beat and fires on a downbeat.

    ``sp._start_immediate`` is stubbed only for the downbeat call. The real
    function is restored and the pending slot cleared in ``finally`` so a
    failing assertion cannot leave module state behind for later tests.
    """
    sp.clear_pending_play()
    sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0)

    # A non-downbeat tick must leave the queued start untouched.
    assert asyncio.run(sp._try_consume_pending_play(is_downbeat=False)) is False
    assert sp.pending_play_status()["pending"] is True

    async def fake_start(*_a, **_k):
        return None

    real_start = sp._start_immediate
    sp._start_immediate = fake_start  # type: ignore[method-assign]
    try:
        assert asyncio.run(sp._try_consume_pending_play(is_downbeat=True)) is True
    finally:
        sp._start_immediate = real_start  # type: ignore[method-assign]
        sp.clear_pending_play()
|
|
|
|
|
|
def test_downbeat_start_counts_trigger_beat(monkeypatch):
    """The downbeat that starts playback is beat 1 of the step, not beat 0."""
    sp.clear_pending_play()
    sp.stop()

    async def fake_start(_z, _s, _p, _opts):
        # Stand in for the real start: install a one-lane, one-step run
        # state in which no beat has been counted yet.
        lane_state = {"stepIdx": 0, "beatCount": 0, "done": False}
        sp._beat_run = dict(
            lanes=[[{"preset_id": "1", "beats": 4}]],
            lane_states=[lane_state],
            num_lanes=1,
            sequence_loop_beat=0,
        )

    monkeypatch.setattr(sp, "_start_immediate", fake_start)
    sp._queue_pending_start("z1", "s1", "p1", None, "downbeat", bpm=120.0)

    async def consume_then_advance():
        started = await sp._try_consume_pending_play(is_downbeat=True)
        assert started is True
        await sp.process_active_beat_advance()

    asyncio.run(consume_then_advance())

    first_lane = sp._beat_run["lane_states"][0]
    assert first_lane["beatCount"] == 1
    sp.stop()
|
|
|
|
|
|
def _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log):
    """Install recording fakes for the collaborators patched in the ``_prime_all_lanes`` tests.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture used for every patch.
        deliver_log: list that receives each delivered message, JSON-decoded.
        route_log: list that receives a ``("clear", lane_index)`` tuple per
            call to the patched ``clear_sequence_manual_lane_route``.
        reset_log: list extended with ``["route", "audio"]`` each time the
            patched ``sp._reset_after_sequence_change`` is called.
    """
    import types

    async def fake_deliver(_bridge, messages, _macs, _devices, **_kw):
        deliver_log.extend(json.loads(raw) for raw in messages)
        return ([], 0)

    def fake_clear(lane_index):
        route_log.append(("clear", lane_index))

    def fake_device_names(_i, _c):
        return ["dev-a"]

    def fake_reset():
        reset_log.extend(["route", "audio"])

    def fake_bridge():
        return object()

    monkeypatch.setattr(sp, "_resolve_lane_device_names", fake_device_names)
    monkeypatch.setattr("util.driver_delivery.deliver_json_messages", fake_deliver)

    # Swap in a stub ``models.transport`` module exposing only
    # ``get_current_bridge``.
    fake_transport = types.ModuleType("models.transport")
    fake_transport.get_current_bridge = fake_bridge
    monkeypatch.setitem(sys.modules, "models.transport", fake_transport)

    monkeypatch.setattr(sp, "_reset_after_sequence_change", fake_reset)
    monkeypatch.setattr(
        "util.beat_driver_route.clear_sequence_manual_lane_route", fake_clear
    )
|
|
|
|
|
|
def test_prime_all_lanes_delivery_order(monkeypatch):
    """Sequence start: step-0 presets, select, rest presets, then beat/route reset."""
    deliver_log: list = []
    route_log: list = []
    reset_log: list = []
    _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log)

    def two_step_lane():
        # Built fresh on every call so the two "lanes" entries below never
        # share list or dict objects.
        return [
            {"preset_id": "p1", "beats": 2},
            {"preset_id": "p2", "beats": 2},
        ]

    def solid_preset(hex_color):
        return {"pattern": "solid", "colors": [hex_color], "auto": True}

    ctx = {
        "lanes": [two_step_lane()],
        "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}],
        "num_lanes": 1,
        "sequence_doc": {
            "lanes": [two_step_lane()],
            "group_ids": ["g1"],
        },
        "zone_doc": {"group_ids": ["g1"], "brightness": 200},
        "presets_map": {
            "p1": solid_preset("#FF0000"),
            "p2": solid_preset("#00FF00"),
        },
        "devices": object(),
        "groups": object(),
        "settings": {},
        "palette_colors": [],
    }

    asyncio.run(sp._prime_all_lanes(ctx))

    assert len(deliver_log) == 3
    step0_msg, select_msg, rest_msg = deliver_log
    assert set(step0_msg["presets"]) == {"p1"}
    assert select_msg["select"] == ["p1"]
    assert set(rest_msg["presets"]) == {"p2"}

    # No single message may carry both a preset upload and a select.
    for body in deliver_log:
        assert "presets" not in body or "select" not in body

    assert route_log == [("clear", 0)]
    assert reset_log == ["route", "audio"]
    assert ctx.get("_sequence_primed") is True
|
|
|
|
|
|
def test_prime_all_lanes_single_preset(monkeypatch):
    """One preset in the lane: step-0 presets, select, reset (no rest phase)."""
    deliver_log: list = []
    route_log: list = []
    reset_log: list = []
    _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log)

    def one_step_lane():
        # A new list/dict per call keeps the two "lanes" entries independent.
        return [{"preset_id": "p1", "beats": 2}]

    red_preset = {
        "pattern": "solid",
        "colors": ["#FF0000"],
        "auto": True,
    }
    ctx = {
        "lanes": [one_step_lane()],
        "lane_states": [{"stepIdx": 0, "beatCount": 0, "done": False}],
        "num_lanes": 1,
        "sequence_doc": {
            "lanes": [one_step_lane()],
            "group_ids": ["g1"],
        },
        "zone_doc": {"group_ids": ["g1"], "brightness": 200},
        "presets_map": {"p1": red_preset},
        "devices": object(),
        "groups": object(),
        "settings": {},
        "palette_colors": [],
    }

    asyncio.run(sp._prime_all_lanes(ctx))

    assert len(deliver_log) == 2
    presets_msg, select_msg = deliver_log
    assert set(presets_msg["presets"]) == {"p1"}
    assert select_msg["select"] == ["p1"]
    assert route_log == [("clear", 0)]
    assert reset_log == ["route", "audio"]
|
|
|
|
|
|
def test_prime_all_lanes_merges_same_groups(monkeypatch):
    """Lanes sharing group ids get one step-0 broadcast, not one per lane."""
    deliver_log: list = []
    route_log: list = []
    reset_log: list = []
    _prime_all_lanes_test_mocks(monkeypatch, deliver_log, route_log, reset_log)

    def two_lanes():
        # Fresh objects per call: ctx["lanes"] and sequence_doc["lanes"]
        # must not alias each other.
        return [
            [{"preset_id": "p1", "beats": 2}],
            [{"preset_id": "p2", "beats": 2}],
        ]

    def idle_lane_state():
        return {"stepIdx": 0, "beatCount": 0, "done": False}

    ctx = {
        "lanes": two_lanes(),
        "lane_states": [idle_lane_state(), idle_lane_state()],
        "num_lanes": 2,
        "sequence_doc": {
            "lanes": two_lanes(),
            "lanes_group_ids": [["g1"], ["g1"]],
        },
        "zone_doc": {"group_ids": ["g1"], "brightness": 200},
        "presets_map": {
            "p1": {"pattern": "solid", "colors": ["#FF0000"], "auto": True},
            "p2": {"pattern": "solid", "colors": ["#00FF00"], "auto": True},
        },
        "devices": object(),
        "groups": object(),
        "settings": {},
        "palette_colors": [],
    }

    asyncio.run(sp._prime_all_lanes(ctx))

    preset_msgs = []
    select_msgs = []
    for body in deliver_log:
        if "presets" in body:
            preset_msgs.append(body)
        if "select" in body:
            select_msgs.append(body)

    assert len(preset_msgs) == 1
    merged = preset_msgs[0]
    assert set(merged["presets"]) == {"p1", "p2"}
    assert merged["groups"] == ["g1"]
    assert len(select_msgs) == 2
    assert route_log == [("clear", 0), ("clear", 1)]
|