160 lines
4.3 KiB
Python
160 lines
4.3 KiB
Python
"""Manual beat route: suppress duplicate select after sequence step change."""
|
|
|
|
import os
|
|
import sys
|
|
|
|
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
SRC_PATH = os.path.join(PROJECT_ROOT, "src")
|
|
if SRC_PATH not in sys.path:
|
|
sys.path.insert(0, SRC_PATH)
|
|
|
|
from util import beat_driver_route as bdr # noqa: E402
|
|
|
|
|
|
def _patch_delivery(monkeypatch):
|
|
delivered = []
|
|
|
|
async def fake_batch(pairs):
|
|
delivered.extend(pairs)
|
|
|
|
def fake_schedule(coro, _loop):
|
|
import asyncio
|
|
|
|
asyncio.run(coro)
|
|
|
|
monkeypatch.setattr(bdr, "_deliver_select_batch", fake_batch)
|
|
monkeypatch.setattr(bdr, "_main_loop", object())
|
|
monkeypatch.setattr("asyncio.run_coroutine_threadsafe", fake_schedule)
|
|
return delivered
|
|
|
|
|
|
def test_reset_manual_lane_strides_zeros_counters():
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"5",
|
|
{"p": "chase", "a": False, "manual_beat_n": 1},
|
|
)
|
|
with bdr._route_lock:
|
|
bdr._lane_manual[0]["beat_counter"] = 4
|
|
bdr._preset_session_beats = 3
|
|
bdr.reset_manual_lane_strides()
|
|
with bdr._route_lock:
|
|
assert bdr._lane_manual[0]["beat_counter"] == 0
|
|
assert bdr._preset_session_beats == 0
|
|
|
|
|
|
def test_suppress_next_notify_skips_one_select(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"5",
|
|
{"p": "chase", "a": False, "manual_beat_n": 1},
|
|
)
|
|
bdr.mark_sequence_manual_lane_select_sent(0)
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == []
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("5", None)]
|
|
|
|
|
|
def test_suppress_does_not_advance_beat_counter(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"5",
|
|
{"p": "chase", "a": False, "manual_beat_n": 2},
|
|
)
|
|
bdr.mark_sequence_manual_lane_select_sent(0)
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == []
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("5", None)]
|
|
|
|
delivered.clear()
|
|
bdr.notify_beat_detected()
|
|
assert delivered == []
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("5", None)]
|
|
|
|
|
|
def test_duplicate_lanes_dedupe_to_one_select_per_beat(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
body = {"p": "radiate", "a": False, "manual_beat_n": 1}
|
|
entry = {
|
|
"device_names": ["desk"],
|
|
"wire_preset_id": "42",
|
|
"pattern": "radiate",
|
|
"manual_beat_n": 1,
|
|
"beat_counter": 0,
|
|
}
|
|
with bdr._route_lock:
|
|
bdr._lane_manual.clear()
|
|
bdr._lane_manual[-1] = dict(entry)
|
|
bdr._lane_manual[0] = dict(entry)
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("42", None)]
|
|
|
|
|
|
def test_sequence_lane_manual_delivers_per_beat_select(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"42",
|
|
{"p": "radiate", "a": False, "manual_beat_n": 1},
|
|
)
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("42", None)]
|
|
|
|
|
|
def test_sequence_auto_lane_skips_per_beat_select(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"3",
|
|
{"p": "colour_cycle", "a": True, "manual_beat_n": 1},
|
|
)
|
|
with bdr._route_lock:
|
|
assert 0 not in bdr._lane_manual
|
|
bdr.notify_beat_detected()
|
|
assert delivered == []
|
|
|
|
|
|
def test_sequence_lane_chase_delivers_per_beat_select(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
bdr.set_sequence_manual_lane_route(
|
|
0,
|
|
["desk"],
|
|
"5",
|
|
{"p": "chase", "a": False, "manual_beat_n": 1},
|
|
)
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("5", None)]
|
|
|
|
|
|
def test_standalone_overlay_skipped_when_sequence_lane_covers(monkeypatch):
|
|
delivered = _patch_delivery(monkeypatch)
|
|
body = {"p": "chase", "a": False, "manual_beat_n": 1}
|
|
|
|
bdr.set_sequence_manual_lane_route(1, ["desk"], "5", body)
|
|
bdr._apply_manual_beat_route_standalone_overlay(["desk"], "5", body)
|
|
|
|
with bdr._route_lock:
|
|
assert -1 not in bdr._lane_manual
|
|
assert 1 in bdr._lane_manual
|
|
|
|
bdr.notify_beat_detected()
|
|
assert delivered == [("5", None)]
|