feat(sequences): multi-lane playback and per-lane manual beats

- Add sequence_playback with beat and time advance, zone targeting fixes
- Per-lane manual beat routing in beat_driver_route (parallel lanes)
- Sequence API, editor JS, fix sequence model filename, tests

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-05-13 00:44:08 +12:00
parent 0ae39ab94b
commit cad0aa7e59
9 changed files with 2750 additions and 169 deletions

View File

@@ -1,51 +1,207 @@
from microdot import Microdot
from models.squence import Sequence
from microdot.session import with_session
from models.sequence import Sequence
from models.profile import Profile
from models.transport import get_current_sender
import json
controller = Microdot()
sequences = Sequence()
profiles = Profile()
@controller.get('')
async def list_sequences(request):
"""List all sequences."""
return json.dumps(sequences), 200, {'Content-Type': 'application/json'}
@controller.get('/<id>')
async def get_sequence(request, id):
"""Get a specific sequence by ID."""
sequence = sequences.read(id)
if sequence:
return json.dumps(sequence), 200, {'Content-Type': 'application/json'}
def get_current_profile_id(session=None):
"""Get the current active profile ID from session or fallback to first."""
profile_list = profiles.list()
session_profile = None
if session is not None:
session_profile = session.get("current_profile")
if session_profile and session_profile in profile_list:
return session_profile
if profile_list:
return profile_list[0]
return None
@controller.get("")
@with_session
async def list_sequences(request, session):
"""List sequences for the current profile."""
current_profile_id = get_current_profile_id(session)
if not current_profile_id:
return json.dumps({}), 200, {"Content-Type": "application/json"}
scoped = {
sid: sdata
for sid, sdata in sequences.items()
if isinstance(sdata, dict)
and str(sdata.get("profile_id")) == str(current_profile_id)
}
return json.dumps(scoped), 200, {"Content-Type": "application/json"}
@controller.get("/<id>")
@with_session
async def get_sequence(request, session, id):
"""Get a specific sequence by ID (current profile only)."""
current_profile_id = get_current_profile_id(session)
seq = sequences.read(id)
if (
seq
and current_profile_id
and str(seq.get("profile_id")) == str(current_profile_id)
):
return json.dumps(seq), 200, {"Content-Type": "application/json"}
return json.dumps({"error": "Sequence not found"}), 404
@controller.post('')
async def create_sequence(request):
"""Create a new sequence."""
try:
data = request.json or {}
group_name = data.get("group_name", "")
preset_names = data.get("presets", None)
sequence_id = sequences.create(group_name, preset_names)
if data:
sequences.update(sequence_id, data)
return json.dumps(sequences.read(sequence_id)), 201, {'Content-Type': 'application/json'}
except Exception as e:
return json.dumps({"error": str(e)}), 400
@controller.put('/<id>')
async def update_sequence(request, id):
"""Update an existing sequence."""
@controller.post("")
@with_session
async def create_sequence(request, session):
"""Create a new sequence for the current profile."""
try:
try:
data = request.json or {}
except Exception:
return (
json.dumps({"error": "Invalid JSON"}),
400,
{"Content-Type": "application/json"},
)
current_profile_id = get_current_profile_id(session)
if not current_profile_id:
return (
json.dumps({"error": "No profile available"}),
404,
{"Content-Type": "application/json"},
)
sequence_id = sequences.create(current_profile_id)
if not isinstance(data, dict):
data = {}
data = dict(data)
data["profile_id"] = str(current_profile_id)
if sequences.update(sequence_id, data):
seq_data = sequences.read(sequence_id)
return (
json.dumps({sequence_id: seq_data}),
201,
{"Content-Type": "application/json"},
)
return (
json.dumps({"error": "Failed to create sequence"}),
400,
{"Content-Type": "application/json"},
)
except Exception as e:
return json.dumps({"error": str(e)}), 400, {"Content-Type": "application/json"}
@controller.put("/<id>")
@with_session
async def update_sequence(request, session, id):
"""Update an existing sequence (current profile only)."""
try:
current_profile_id = get_current_profile_id(session)
seq = sequences.read(id)
if not seq or str(seq.get("profile_id")) != str(current_profile_id):
return json.dumps({"error": "Sequence not found"}), 404
data = request.json
if not isinstance(data, dict):
return (
json.dumps({"error": "Invalid JSON"}),
400,
{"Content-Type": "application/json"},
)
data = dict(data)
data["profile_id"] = str(current_profile_id)
if sequences.update(id, data):
return json.dumps(sequences.read(id)), 200, {'Content-Type': 'application/json'}
try:
from util.sequence_playback import stop_if_playing_sequence
stop_if_playing_sequence(str(id))
except Exception:
pass
return json.dumps(sequences.read(id)), 200, {"Content-Type": "application/json"}
return json.dumps({"error": "Sequence not found"}), 404
except Exception as e:
return json.dumps({"error": str(e)}), 400
return json.dumps({"error": str(e)}), 400, {"Content-Type": "application/json"}
@controller.delete('/<id>')
async def delete_sequence(request, id):
"""Delete a sequence."""
@controller.delete("/<id>")
@with_session
async def delete_sequence(request, session, id):
"""Delete a sequence (current profile only)."""
current_profile_id = get_current_profile_id(session)
seq = sequences.read(id)
if not seq or str(seq.get("profile_id")) != str(current_profile_id):
return json.dumps({"error": "Sequence not found"}), 404
try:
from util.sequence_playback import stop_if_playing_sequence
stop_if_playing_sequence(str(id))
except Exception:
pass
if sequences.delete(id):
return json.dumps({"message": "Sequence deleted successfully"}), 200
return (
json.dumps({"message": "Sequence deleted successfully"}),
200,
{"Content-Type": "application/json"},
)
return json.dumps({"error": "Sequence not found"}), 404
@controller.post("/stop")
@with_session
async def stop_sequence_playback(request, session):
"""Stop server-driven zone sequence playback."""
_ = request
try:
from util.sequence_playback import stop
stop()
return json.dumps({"ok": True}), 200, {"Content-Type": "application/json"}
except Exception as e:
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}
@controller.post("/<id>/play")
@with_session
async def play_sequence(request, session, id):
"""Start server-driven playback for a sequence in a zone (body: {\"zone_id\": \"...\"})."""
if not get_current_sender():
return (
json.dumps({"error": "Transport not configured"}),
503,
{"Content-Type": "application/json"},
)
current_profile_id = get_current_profile_id(session)
if not current_profile_id:
return (
json.dumps({"error": "No profile available"}),
404,
{"Content-Type": "application/json"},
)
try:
data = request.json or {}
except Exception:
data = {}
if not isinstance(data, dict):
data = {}
zone_id = data.get("zone_id") or data.get("zoneId")
if zone_id is None or str(zone_id).strip() == "":
return (
json.dumps({"error": "zone_id required"}),
400,
{"Content-Type": "application/json"},
)
zone_id = str(zone_id).strip()
try:
from util.sequence_playback import start
await start(zone_id, str(id), str(current_profile_id))
return json.dumps({"ok": True}), 200, {"Content-Type": "application/json"}
except ValueError as e:
return json.dumps({"error": str(e)}), 400, {"Content-Type": "application/json"}
except RuntimeError as e:
return json.dumps({"error": str(e)}), 503, {"Content-Type": "application/json"}
except Exception as e:
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}

148
src/models/sequence.py Normal file
View File

@@ -0,0 +1,148 @@
from models.model import Model
class Sequence(Model):
def load(self):
super().load()
self._migrate_after_load()
def _migrate_after_load(self):
try:
from models.profile import Profile
profiles = Profile()
profile_list = profiles.list()
default_profile_id = profile_list[0] if profile_list else None
except Exception:
default_profile_id = None
changed = False
for _sid, doc in list(self.items()):
if not isinstance(doc, dict):
continue
if not isinstance(doc.get("steps"), list):
presets = doc.get("presets")
if isinstance(presets, list) and presets:
doc["steps"] = [
{"preset_id": str(p), "group_ids": []} for p in presets
]
else:
doc["steps"] = []
changed = True
if "step_duration_ms" not in doc:
dur = doc.get("sequence_duration")
doc["step_duration_ms"] = (
int(dur) if isinstance(dur, (int, float)) else 3000
)
changed = True
if "loop" not in doc:
doc["loop"] = bool(doc.get("sequence_loop", False))
changed = True
if "name" not in doc:
doc["name"] = str(doc.get("group_name") or "")
changed = True
if "profile_id" not in doc and default_profile_id is not None:
doc["profile_id"] = str(default_profile_id)
changed = True
if not isinstance(doc.get("lanes"), list):
steps = doc.get("steps")
if isinstance(steps, list) and steps:
doc["lanes"] = [list(steps)]
else:
doc["lanes"] = [[]]
changed = True
if "group_ids" not in doc or not isinstance(doc.get("group_ids"), list):
doc["group_ids"] = []
changed = True
if doc.get("advance_mode") not in ("time", "beats"):
doc["advance_mode"] = "time"
changed = True
if "sequence_transition" not in doc:
doc["sequence_transition"] = 500
changed = True
# Ensure each step has beats (beat-based advance); default 1
for lane in doc.get("lanes") or []:
if not isinstance(lane, list):
continue
for step in lane:
if not isinstance(step, dict):
continue
if "beats" not in step:
step["beats"] = 1
changed = True
# Per-lane group ids (parallel to ``lanes``)
lanes_list = [x for x in (doc.get("lanes") or []) if isinstance(x, list)]
n_lanes = len(lanes_list)
lg = doc.get("lanes_group_ids")
if n_lanes and (not isinstance(lg, list) or len(lg) != n_lanes):
shared = doc.get("group_ids") if isinstance(doc.get("group_ids"), list) else []
shared_s = [str(x).strip() for x in shared if x is not None and str(x).strip()]
if n_lanes == 1 and lanes_list[0]:
first = lanes_list[0][0] if isinstance(lanes_list[0][0], dict) else {}
step_g = (
first.get("group_ids")
if isinstance(first.get("group_ids"), list)
else []
)
step_s = [
str(x).strip() for x in step_g if x is not None and str(x).strip()
]
doc["lanes_group_ids"] = [step_s if step_s else list(shared_s)]
else:
doc["lanes_group_ids"] = [list(shared_s) for _ in range(n_lanes)]
changed = True
if changed:
self.save()
def create(self, profile_id=None):
next_id = self.get_next_id()
self[next_id] = {
"name": "",
"profile_id": str(profile_id) if profile_id is not None else None,
"group_ids": [],
"lanes": [[]],
"lanes_group_ids": [[]],
"advance_mode": "time",
"steps": [],
"step_duration_ms": 3000,
"sequence_transition": 500,
"loop": True,
}
self.save()
return next_id
def read(self, id):
id_str = str(id)
return self.get(id_str, None)
def update(self, id, data):
id_str = str(id)
if id_str not in self:
return False
if not isinstance(data, dict):
return False
data = dict(data)
steps = data.get("steps")
lanes = data.get("lanes")
if isinstance(steps, list) and steps:
lanes_ok = (
isinstance(lanes, list)
and lanes
and any(isinstance(x, list) and len(x) > 0 for x in lanes)
)
if not lanes_ok:
data["lanes"] = [list(steps)]
self[id_str].update(data)
self.save()
return True
def delete(self, id):
id_str = str(id)
if id_str not in self:
return False
self.pop(id_str)
self.save()
return True
def list(self):
return list(self.keys())

View File

@@ -1,44 +0,0 @@
from models.model import Model
class Sequence(Model):
def __init__(self):
super().__init__()
def create(self, group_name="", preset_names=None):
next_id = self.get_next_id()
self[next_id] = {
"group_name": group_name,
"presets": preset_names if preset_names else [],
"sequence_duration": 3000, # Duration per preset in ms
"sequence_transition": 500, # Transition time in ms
"sequence_loop": False,
"sequence_repeat_count": 0, # 0 = infinite
"sequence_active": False,
"sequence_index": 0,
"sequence_start_time": 0
}
self.save()
return next_id
def read(self, id):
id_str = str(id)
return self.get(id_str, None)
def update(self, id, data):
id_str = str(id)
if id_str not in self:
return False
self[id_str].update(data)
self.save()
return True
def delete(self, id):
id_str = str(id)
if id_str not in self:
return False
self.pop(id_str)
self.save()
return True
def list(self):
return list(self.keys())

1115
src/static/sequences.js Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -6,9 +6,13 @@ import asyncio
import json
import os
import threading
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple
_route_lock = threading.Lock()
# Per-lane manual routes: key ``-1`` = legacy single-route (preset push / UI); keys ``0..n`` =
# zone sequence lanes so every manual lane gets its own stride counter and wire.
_lane_manual: Dict[int, Dict[str, Any]] = {}
# Public mirror for ``get_beat_route`` / header UI (derived from lane table).
_beat_route: Dict[str, Any] = {
"enabled": False,
"device_names": [],
@@ -18,6 +22,7 @@ _beat_route: Dict[str, Any] = {
"manual_beat_n": 1,
}
_beat_counter: int = 0
_preset_session_beats: int = 0
_main_loop: Optional[asyncio.AbstractEventLoop] = None
@@ -26,16 +31,65 @@ def set_beat_route_main_loop(loop: asyncio.AbstractEventLoop) -> None:
_main_loop = loop
def _pick_display_lane_key() -> Optional[int]:
"""Lane key used for header stride readout (prefer sequence lane 0)."""
if not _lane_manual:
return None
if 0 in _lane_manual:
return 0
seq_keys = [k for k in _lane_manual if isinstance(k, int) and k >= 0]
if seq_keys:
return min(seq_keys)
if -1 in _lane_manual:
return -1
return min(_lane_manual.keys())
def _sync_public_beat_route_from_lane_table() -> None:
"""Mirror ``_lane_manual`` into legacy ``_beat_route`` shape for API consumers."""
global _beat_route, _beat_counter
pick = _pick_display_lane_key()
if pick is None:
_beat_route = {
"enabled": False,
"device_names": [],
"wire_preset_id": "2",
"is_manual": False,
"pattern": "",
"manual_beat_n": 1,
}
_beat_counter = 0
return
e = _lane_manual[pick]
_beat_route = {
"enabled": True,
"device_names": list(e.get("device_names") or []),
"wire_preset_id": str(e.get("wire_preset_id") or "2"),
"is_manual": True,
"pattern": str(e.get("pattern") or ""),
"manual_beat_n": int(e.get("manual_beat_n") or 1),
}
_beat_counter = int(e.get("beat_counter", 0))
def update_beat_route(payload: Dict[str, Any]) -> None:
"""Internal: set or clear routing from explicit fields (tests / future APIs)."""
global _beat_route, _beat_counter
global _lane_manual, _beat_route, _beat_counter, _preset_session_beats
if not isinstance(payload, dict):
return
with _route_lock:
if payload.get("enabled") is False:
_beat_route = {**_beat_route, "enabled": False}
_lane_manual.clear()
_beat_route = {
**_beat_route,
"enabled": False,
"is_manual": False,
"device_names": [],
}
_beat_counter = 0
_preset_session_beats = 0
return
old = dict(_beat_route)
names = payload.get("device_names")
if not isinstance(names, list):
names = []
@@ -44,15 +98,20 @@ def update_beat_route(payload: Dict[str, Any]) -> None:
except (TypeError, ValueError):
n_raw = 1
manual_n = max(1, min(64, n_raw))
_beat_route = {
"enabled": bool(payload.get("enabled", False)),
"device_names": [str(n).strip() for n in names if str(n).strip()],
"wire_preset_id": str(payload.get("wire_preset_id") or "2"),
"is_manual": bool(payload.get("is_manual", False)),
new_wire = str(payload.get("wire_preset_id") or "2")
old_wire = str(old.get("wire_preset_id") or "2")
if not old.get("enabled") or old_wire != new_wire:
_preset_session_beats = 0
clean_names = [str(n).strip() for n in names if str(n).strip()]
_lane_manual.clear()
_lane_manual[-1] = {
"device_names": clean_names,
"wire_preset_id": new_wire,
"pattern": str(payload.get("pattern") or "").strip(),
"manual_beat_n": manual_n,
"beat_counter": 0,
}
_beat_counter = 0
_sync_public_beat_route_from_lane_table()
def get_beat_route() -> Dict[str, Any]:
@@ -60,6 +119,44 @@ def get_beat_route() -> Dict[str, Any]:
return dict(_beat_route)
def manual_beat_stride_status() -> Dict[str, Any]:
"""Audio-beat stride for a live manual preset (not sequence). For UI readout with BPM.
``beat_in_stride`` is always in ``1..stride_n`` when ``active`` (1-based within the stride).
With multiple sequence manual lanes, reflects lane 0 (or the smallest lane index).
"""
with _route_lock:
pick = _pick_display_lane_key()
if pick is None or pick not in _lane_manual:
wid = str(_beat_route.get("wire_preset_id") or "").strip()
return {"active": False, "preset_session_beats": 0, "wire_preset_id": wid}
e = _lane_manual[pick]
c = int(e.get("beat_counter", 0))
psb = int(_preset_session_beats)
wid = str(e.get("wire_preset_id") or "").strip()
try:
n = int(e.get("manual_beat_n") or 1)
except (TypeError, ValueError):
n = 1
n = max(1, min(64, n))
if c <= 0:
return {
"active": True,
"beat_in_stride": 1,
"stride_n": n,
"preset_session_beats": psb,
"wire_preset_id": wid,
}
beat_in_stride = ((c - 1) % n) + 1
return {
"active": True,
"beat_in_stride": beat_in_stride,
"stride_n": n,
"preset_session_beats": psb,
"wire_preset_id": wid,
}
def _coerce_manual_beat_n(body: Any) -> int:
"""Beats between audio-triggered selects (led-controller only); default 1 = every beat."""
if not isinstance(body, dict):
@@ -137,33 +234,99 @@ def _apply_manual_beat_route(
preset_body: Any,
) -> None:
"""Enable audio→driver routing for one manual preset, or disable if invalid."""
global _lane_manual
if not device_names:
update_beat_route({"enabled": False})
with _route_lock:
_lane_manual.clear()
_sync_public_beat_route_from_lane_table()
return
if not isinstance(preset_body, dict):
update_beat_route({"enabled": False})
with _route_lock:
_lane_manual.clear()
_sync_public_beat_route_from_lane_table()
return
if _coerce_auto_from_body(preset_body):
update_beat_route({"enabled": False})
with _route_lock:
_lane_manual.clear()
_sync_public_beat_route_from_lane_table()
return
pattern = str(preset_body.get("pattern") or preset_body.get("p") or "").strip()
if pattern and not _pattern_supports_manual(pattern):
update_beat_route({"enabled": False})
with _route_lock:
_lane_manual.clear()
_sync_public_beat_route_from_lane_table()
return
update_beat_route(
{
"enabled": True,
"device_names": device_names,
"wire_preset_id": wire_preset_id,
"is_manual": True,
names = [str(n).strip() for n in device_names if str(n).strip()]
with _route_lock:
_lane_manual.clear()
_lane_manual[-1] = {
"device_names": names,
"wire_preset_id": str(wire_preset_id).strip(),
"pattern": pattern,
"manual_beat_n": _coerce_manual_beat_n(preset_body),
"beat_counter": 0,
}
)
_sync_public_beat_route_from_lane_table()
def set_sequence_manual_lane_route(
lane_index: int,
device_names: List[str],
wire_preset_id: str,
preset_body: Any,
) -> None:
"""Register or update one sequence lane's manual beat route (parallel lanes, independent strides)."""
global _lane_manual
names = [str(n).strip() for n in (device_names or []) if str(n).strip()]
if not names or not isinstance(preset_body, dict) or _coerce_auto_from_body(preset_body):
with _route_lock:
if lane_index in _lane_manual:
del _lane_manual[lane_index]
_sync_public_beat_route_from_lane_table()
return
pattern = str(preset_body.get("pattern") or preset_body.get("p") or "").strip()
if pattern and not _pattern_supports_manual(pattern):
with _route_lock:
if lane_index in _lane_manual:
del _lane_manual[lane_index]
_sync_public_beat_route_from_lane_table()
return
mn = _coerce_manual_beat_n(preset_body)
wid = str(wire_preset_id).strip()
with _route_lock:
old = _lane_manual.get(lane_index)
bc = 0
if (
old
and str(old.get("wire_preset_id") or "") == wid
and int(old.get("manual_beat_n") or 1) == mn
and set(old.get("device_names") or []) == set(names)
):
bc = int(old.get("beat_counter", 0))
_lane_manual[lane_index] = {
"device_names": names,
"wire_preset_id": wid,
"pattern": pattern,
"manual_beat_n": mn,
"beat_counter": bc,
}
_sync_public_beat_route_from_lane_table()
def clear_sequence_manual_lane_route(lane_index: int) -> None:
"""Remove beat routing for one sequence lane (e.g. step switched to auto)."""
global _lane_manual
with _route_lock:
if lane_index in _lane_manual:
del _lane_manual[lane_index]
_sync_public_beat_route_from_lane_table()
def sync_beat_route_from_push_sequence(
sequence: List[Any], target_macs: Optional[List[str]] = None
sequence: List[Any],
target_macs: Optional[List[str]] = None,
*,
preserve_manual_beat_route_on_auto_select: bool = False,
) -> None:
"""
Update beat routing from a ``/presets/push`` body ``sequence`` (list of v1 dicts).
@@ -173,6 +336,10 @@ def sync_beat_route_from_push_sequence(
Without ``select`` (e.g. manual preset loaded without immediate select): if ``target_macs``
is set and the merged ``presets`` contain exactly one manual preset, enable routing using
registry names for those MACs so the first advance is on the next audio beat.
When ``preserve_manual_beat_route_on_auto_select`` is true (zone sequence playback), an
auto preset in ``select`` does not clear manual routing — other lanes may still need
``notify_beat_detected`` for manual patterns in parallel.
"""
merged_presets: Dict[str, Any] = {}
last_select: Optional[Dict[str, Any]] = None
@@ -214,6 +381,13 @@ def sync_beat_route_from_push_sequence(
if str(k).strip() == wire_preset_id:
preset_body = v
break
if preset_body is None:
update_beat_route({"enabled": False})
return
if _coerce_auto_from_body(preset_body):
if not preserve_manual_beat_route_on_auto_select:
update_beat_route({"enabled": False})
return
_apply_manual_beat_route(device_names, wire_preset_id, preset_body)
return
@@ -247,25 +421,30 @@ def _pattern_supports_manual(pattern_key: str) -> bool:
def remap_beat_route_device_name(old_name: str, new_name: str) -> None:
"""Update cached audio-beat target names after a device registry rename."""
global _beat_route
global _lane_manual
o = str(old_name or "").strip()
n = str(new_name or "").strip()
if not o or not n or o == n:
return
with _route_lock:
if not _beat_route.get("enabled"):
return
names = _beat_route.get("device_names") or []
new_list: List[str] = []
changed = False
for item in names:
if str(item).strip() == o:
new_list.append(n)
changed = True
else:
new_list.append(str(item))
if changed:
_beat_route = {**_beat_route, "device_names": new_list}
any_changed = False
for e in _lane_manual.values():
names = e.get("device_names") or []
if not isinstance(names, list):
continue
new_list: List[str] = []
row_changed = False
for item in names:
if str(item).strip() == o:
new_list.append(n)
row_changed = True
else:
new_list.append(str(item))
if row_changed:
e["device_names"] = new_list
any_changed = True
if any_changed:
_sync_public_beat_route_from_lane_table()
async def _deliver_select(device_names: List[str], wire_preset_id: str) -> None:
@@ -302,35 +481,45 @@ async def _deliver_select(device_names: List[str], wire_preset_id: str) -> None:
print(f"[beat-route] deliver failed: {e}")
async def _deliver_select_batch(pairs: List[Tuple[List[str], str]]) -> None:
for names, pid in pairs:
await _deliver_select(names, pid)
def notify_beat_detected() -> None:
"""Invoked from the audio thread when a beat is detected."""
global _beat_counter
global _preset_session_beats
work: List[Tuple[List[str], str]] = []
with _route_lock:
r = dict(_beat_route)
if not r.get("enabled"):
if not _lane_manual:
return
if not r.get("is_manual"):
return
pattern = r.get("pattern") or ""
if pattern and not _pattern_supports_manual(pattern):
return
names = r.get("device_names") or []
if not names:
return
try:
n = int(r.get("manual_beat_n") or 1)
except (TypeError, ValueError):
n = 1
n = max(1, min(64, n))
_beat_counter += 1
if ((_beat_counter - 1) % n) != 0:
return
preset_id = str(r.get("wire_preset_id") or "2")
names_copy = list(names)
work = []
for key in sorted(_lane_manual.keys()):
e = _lane_manual[key]
names = e.get("device_names") or []
if not isinstance(names, list) or not names:
continue
pattern = str(e.get("pattern") or "")
if pattern and not _pattern_supports_manual(pattern):
continue
try:
n = int(e.get("manual_beat_n") or 1)
except (TypeError, ValueError):
n = 1
n = max(1, min(64, n))
e["beat_counter"] = int(e.get("beat_counter", 0)) + 1
c = int(e["beat_counter"])
if (c - 1) % n != 0:
continue
work.append((list(names), str(e.get("wire_preset_id") or "2")))
if work:
_preset_session_beats += 1
if not work:
return
loop = _main_loop
if loop is None:
return
try:
asyncio.run_coroutine_threadsafe(_deliver_select(names_copy, preset_id), loop)
asyncio.run_coroutine_threadsafe(_deliver_select_batch(work), loop)
except Exception as e:
print(f"[beat-route] schedule failed: {e}")

View File

@@ -0,0 +1,996 @@
"""Server-side zone sequence playback (time or audio-beat advance).
The browser selects a sequence and zone; this module delivers preset pushes to drivers.
Sequence start sends one v1 message with every preset body used in the sequence; auto steps
then send select-only updates. Manual steps rely on the bulk load and only update beat routing.
"""
from __future__ import annotations
import asyncio
import json
import queue
import threading
from typing import Any, Dict, List, Optional, Tuple
_thread_beat_queue: "queue.Queue[int]" = queue.Queue(maxsize=256)
_beat_consumer_started = False
_beat_consumer_lock = threading.Lock()
_time_task: Optional[asyncio.Task] = None
_time_lock = asyncio.Lock()
_beat_run: Optional[Dict[str, Any]] = None
_beat_run_lock = threading.Lock()
def _norm_mac(raw: Any) -> Optional[str]:
from models.device import normalize_mac
return normalize_mac(raw)
def _normalize_sequence_lanes(doc: Dict[str, Any]) -> List[List[Dict[str, Any]]]:
lanes_raw = doc.get("lanes") if isinstance(doc.get("lanes"), list) else []
lanes = [x for x in lanes_raw if isinstance(x, list)]
has_any = any(len(x) > 0 for x in lanes)
steps = doc.get("steps")
if (not lanes or not has_any) and isinstance(steps, list) and steps:
lanes = [list(steps)]
if not lanes:
lanes = [[]]
out: List[List[Dict[str, Any]]] = []
for lane in lanes:
row: List[Dict[str, Any]] = []
for s in lane:
if not isinstance(s, dict):
continue
pid = s.get("preset_id", s.get("presetId"))
try:
b_raw = s.get("beats")
b_n = int(b_raw) if b_raw is not None else 1
except (TypeError, ValueError):
b_n = 1
row.append(
{
"preset_id": str(pid).strip() if pid is not None else "",
"beats": max(1, b_n),
"group_ids": [
str(x).strip()
for x in (s.get("group_ids") or [])
if x is not None and str(x).strip()
],
}
)
out.append(row)
return out
def _group_ids_for_lane_step(
sequence_doc: Dict[str, Any], step: Dict[str, Any], lane_index: int, num_lanes: int
) -> List[str]:
lgs = sequence_doc.get("lanes_group_ids")
if isinstance(lgs, list) and lane_index < len(lgs):
for_lane = lgs[lane_index]
if isinstance(for_lane, list):
return [str(x).strip() for x in for_lane if x is not None and str(x).strip()]
# Multi-lane doc with a shorter ``lanes_group_ids``: do not fall back to ``group_ids``
# (editor stores lane 0's groups there; applying it to other lanes targets the wrong groups).
if num_lanes > 1 and isinstance(lgs, list) and lane_index >= len(lgs):
return []
shared = sequence_doc.get("group_ids")
if isinstance(shared, list) and shared:
return [str(x).strip() for x in shared if x is not None and str(x).strip()]
if num_lanes == 1:
sg = step.get("group_ids")
if isinstance(sg, list) and sg:
return [str(x).strip() for x in sg if x is not None and str(x).strip()]
return []
def _compute_zone_targets(
zone_doc: Dict[str, Any], devices: Any, groups: Any
) -> Tuple[List[str], List[str]]:
gids = zone_doc.get("group_ids")
gids = [str(x).strip() for x in gids if isinstance(gids, list) and x is not None and str(x).strip()]
names: List[str] = []
macs: List[str] = []
if gids:
seen: set = set()
for gid in gids:
g = groups.read(gid) if hasattr(groups, "read") else None
if not isinstance(g, dict):
continue
devs = g.get("devices")
if not isinstance(devs, list):
continue
for raw in devs:
m = _norm_mac(raw)
if not m or m in seen:
continue
seen.add(m)
doc = devices.read(m) or {}
nm = str(doc.get("name") or "").strip() or m
names.append(nm)
macs.append(m)
return names, macs
zone_names = zone_doc.get("names")
if not isinstance(zone_names, list):
zone_names = []
name_to_mac: Dict[str, str] = {}
for did in devices.list():
m = _norm_mac(did)
if not m:
continue
doc = devices.read(did) or {}
nm = str(doc.get("name") or "").strip()
if nm:
name_to_mac[nm] = m
for zn in zone_names:
z = str(zn).strip()
if not z:
continue
m = name_to_mac.get(z)
if m and m not in macs:
names.append(z)
macs.append(m)
return names, macs
def _sequence_referenced_group_ids(sequence_doc: Dict[str, Any]) -> List[str]:
"""Group ids mentioned on the sequence (shared, per-lane, per-step, legacy steps)."""
seen: set = set()
out: List[str] = []
def add(raw: Any) -> None:
if raw is None:
return
s = str(raw).strip()
if not s or s in seen:
return
seen.add(s)
out.append(s)
g0 = sequence_doc.get("group_ids")
if isinstance(g0, list):
for x in g0:
add(x)
lgs = sequence_doc.get("lanes_group_ids")
if isinstance(lgs, list):
for row in lgs:
if isinstance(row, list):
for x in row:
add(x)
for lane_key in ("lanes", "steps"):
lanes_raw = sequence_doc.get(lane_key)
if not isinstance(lanes_raw, list):
continue
for lane in lanes_raw:
if lane_key == "steps":
step = lane if isinstance(lane, dict) else None
if step:
sg = step.get("group_ids")
if isinstance(sg, list):
for x in sg:
add(x)
continue
if not isinstance(lane, list):
continue
for step in lane:
if not isinstance(step, dict):
continue
sg = step.get("group_ids")
if isinstance(sg, list):
for x in sg:
add(x)
return out
def _extend_mac_scope_for_sequence_groups(
zone_mac_set: set,
zone_name_by_mac: Dict[str, str],
sequence_doc: Dict[str, Any],
devices: Any,
groups: Any,
) -> None:
"""Include MACs from any group the sequence references so per-lane groups can differ from the zone tab."""
for gid in _sequence_referenced_group_ids(sequence_doc):
g = groups.read(gid) if hasattr(groups, "read") else None
if not isinstance(g, dict):
continue
for raw in g.get("devices") or []:
m = _norm_mac(raw)
if not m:
continue
zone_mac_set.add(m)
if m not in zone_name_by_mac:
doc = devices.read(m) if hasattr(devices, "read") else None
if isinstance(doc, dict):
nm = str(doc.get("name") or "").strip() or m
else:
nm = m
zone_name_by_mac[m] = nm
def _resolve_step_device_names(
zone_doc: Dict[str, Any],
step_group_ids: List[str],
devices: Any,
groups: Any,
*,
sequence_doc: Optional[Dict[str, Any]] = None,
) -> List[str]:
z_names, z_macs = _compute_zone_targets(zone_doc, devices, groups)
if not step_group_ids:
return list(z_names)
zone_mac_set = {m for m in (_norm_mac(x) for x in z_macs) if m}
zone_name_by_mac: Dict[str, str] = {}
for i, m in enumerate(z_macs):
mn = _norm_mac(m)
if mn and mn not in zone_name_by_mac:
zone_name_by_mac[mn] = z_names[i] if i < len(z_names) else mn
if sequence_doc is not None:
_extend_mac_scope_for_sequence_groups(
zone_mac_set, zone_name_by_mac, sequence_doc, devices, groups
)
step_macs: set = set()
for gid in step_group_ids:
g = groups.read(gid) if hasattr(groups, "read") else None
if not isinstance(g, dict):
continue
for raw in g.get("devices") or []:
m = _norm_mac(raw)
if m and m in zone_mac_set:
step_macs.add(m)
out: List[str] = []
for m in step_macs:
n = zone_name_by_mac.get(m)
if n:
out.append(n)
return out
def _lane_has_non_empty_lanes_group_ids(sequence_doc: Dict[str, Any], lane_index: int) -> bool:
"""True when this lane's targets come from ``lanes_group_ids[lane]`` (already lane-scoped)."""
lgs = sequence_doc.get("lanes_group_ids")
if not isinstance(lgs, list) or lane_index < 0 or lane_index >= len(lgs):
return False
for_lane = lgs[lane_index]
if not isinstance(for_lane, list) or not for_lane:
return False
return any(x is not None and str(x).strip() for x in for_lane)
def _split_device_names_for_lane(
all_names: List[str],
lane_index: int,
num_lanes: int,
*,
partition_shared_zone: bool = True,
) -> List[str]:
names = [n for n in all_names if n and str(n).strip()]
if num_lanes <= 1 or not partition_shared_zone:
return names
if len(names) >= num_lanes:
n = names[lane_index]
return [n] if n else []
return names
def _resolve_colors_with_palette_refs(
colors: Any, palette_refs: Any, palette_colors: List[Any]
) -> List[Any]:
base = list(colors) if isinstance(colors, list) else []
refs = list(palette_refs) if isinstance(palette_refs, list) else []
pal = list(palette_colors) if isinstance(palette_colors, list) else []
out: List[Any] = []
for idx, color in enumerate(base):
ref_raw = refs[idx] if idx < len(refs) else None
try:
ref = int(ref_raw) if ref_raw is not None else None
except (TypeError, ValueError):
ref = None
if isinstance(ref, int) and 0 <= ref < len(pal) and pal[ref]:
out.append(pal[ref])
else:
out.append(color)
return out
def _ordered_unique_preset_ids_from_lanes(lanes: List[List[Dict[str, Any]]]) -> List[str]:
seen: set = set()
out: List[str] = []
for lane in lanes:
for step in lane:
if not isinstance(step, dict):
continue
pid = str(step.get("preset_id") or "").strip()
if not pid or pid in seen:
continue
seen.add(pid)
out.append(pid)
return out
def _display_preset_for_step(
preset_id: str,
presets_map: Dict[str, Any],
palette_colors: List[Any],
) -> Optional[Dict[str, Any]]:
preset = presets_map.get(preset_id)
if not isinstance(preset, dict):
return None
base_colors = preset.get("colors") or preset.get("c") or ["#FFFFFF"]
colors = _resolve_colors_with_palette_refs(
base_colors if isinstance(base_colors, list) else [base_colors],
preset.get("palette_refs"),
palette_colors,
)
return {**preset, "colors": colors}
def _preset_inner_from_display_preset(display_preset: Dict[str, Any]) -> Dict[str, Any]:
from util.espnow_message import build_preset_dict
body = dict(display_preset)
inner = build_preset_dict(body)
mb = body.get("manual_beat_n", body.get("manualBeatN"))
if mb is not None:
try:
n = int(mb)
if 1 <= n <= 64:
inner["manual_beat_n"] = n
except (TypeError, ValueError):
pass
return inner
def _device_names_to_macs(device_names: List[str], devices: Any) -> List[str]:
macs: List[str] = []
seen: set = set()
for nm in device_names:
key = str(nm).strip()
if not key:
continue
m = None
for did in devices.list():
doc = devices.read(did) or {}
if str(doc.get("name") or "").strip() == key:
m = _norm_mac(did)
break
if not m and key.startswith("led-"):
m = _norm_mac(key[4:])
if m and m not in seen:
seen.add(m)
macs.append(m)
return macs
def _union_macs_for_sequence(ctx: Dict[str, Any]) -> List[str]:
"""MACs that appear on any lane/step (union); falls back to full zone targets."""
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
sequence_doc: Dict[str, Any] = ctx["sequence_doc"]
zone_doc: Dict[str, Any] = ctx["zone_doc"]
devices = ctx["devices"]
groups = ctx["groups"]
num_lanes = int(ctx["num_lanes"])
seen: set = set()
out: List[str] = []
for lane_index, lane in enumerate(lanes):
for step in lane:
if not isinstance(step, dict):
continue
gids = _group_ids_for_lane_step(sequence_doc, step, lane_index, num_lanes)
device_names = _resolve_step_device_names(
zone_doc, gids, devices, groups, sequence_doc=sequence_doc
)
device_names = _split_device_names_for_lane(
device_names,
lane_index,
num_lanes,
partition_shared_zone=not _lane_has_non_empty_lanes_group_ids(
sequence_doc, lane_index
),
)
if gids and not device_names:
continue
for m in _device_names_to_macs(device_names, devices):
if m and m not in seen:
seen.add(m)
out.append(m)
if out:
return out
_, z_macs = _compute_zone_targets(zone_doc, devices, groups)
return list(z_macs)
def _build_sequence_wire_presets_map(ctx: Dict[str, Any]) -> Dict[str, Any]:
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
presets_map: Dict[str, Any] = ctx["presets_map"]
palette_colors: List[Any] = ctx["palette_colors"]
inner_by_wire: Dict[str, Any] = {}
for pid in _ordered_unique_preset_ids_from_lanes(lanes):
disp = _display_preset_for_step(pid, presets_map, palette_colors)
if not disp:
continue
inner_by_wire[str(pid)] = _preset_inner_from_display_preset(disp)
return inner_by_wire
async def _deliver_sequence_presets_bulk(ctx: Dict[str, Any]) -> None:
"""Push all preset definitions used in the sequence once; step advances use select (auto) only."""
from models.transport import get_current_sender
from util.driver_delivery import deliver_json_messages
inner_by_wire = _build_sequence_wire_presets_map(ctx)
ctx["_sequence_wire_presets"] = inner_by_wire
if not inner_by_wire:
return
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
macs = _union_macs_for_sequence(ctx)
if not macs:
return
msg = json.dumps({"v": "1", "presets": inner_by_wire}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], macs, ctx["devices"], delay_s=0.05)
def _coerce_auto(preset: Dict[str, Any]) -> bool:
raw = preset.get("auto", preset.get("a", True))
if isinstance(raw, bool):
return raw
if raw is None:
return True
if isinstance(raw, int):
return raw != 0
if isinstance(raw, str):
lo = raw.strip().lower()
if lo in ("false", "0", "no", "off"):
return False
if lo in ("true", "1", "yes", "on"):
return True
return True
def _load_palette_colors(profile_id: str) -> List[Any]:
from models.profile import Profile
from models.pallet import Palette
prof = Profile().read(profile_id)
if not isinstance(prof, dict):
return []
pid = prof.get("palette_id") or prof.get("paletteId")
if not pid:
return []
return Palette().read(str(pid)) or []
async def _deliver_preset_for_devices(
preset_id: str,
preset_doc: Dict[str, Any],
device_names: List[str],
devices: Any,
*,
lane_index: Optional[int] = None,
) -> None:
from models.transport import get_current_sender
from util.driver_delivery import deliver_json_messages
from util.beat_driver_route import sync_beat_route_from_push_sequence
from util.espnow_message import build_preset_dict
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
macs: List[str] = []
seen: set = set()
for nm in device_names:
key = str(nm).strip()
if not key:
continue
m = None
for did in devices.list():
doc = devices.read(did) or {}
if str(doc.get("name") or "").strip() == key:
m = _norm_mac(did)
break
if not m and key.startswith("led-"):
m = _norm_mac(key[4:])
if m and m not in seen:
seen.add(m)
macs.append(m)
if not macs:
return
body = dict(preset_doc)
auto = _coerce_auto(body)
inner = build_preset_dict(body)
mb = body.get("manual_beat_n", body.get("manualBeatN"))
if mb is not None:
try:
n = int(mb)
if 1 <= n <= 64:
inner["manual_beat_n"] = n
except (TypeError, ValueError):
pass
wire = str(preset_id)
seq_list: List[Dict[str, Any]] = [{"v": "1", "presets": {wire: inner}}]
if auto and device_names:
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if sel:
seq_list.append({"v": "1", "select": sel})
messages = [json.dumps(x, separators=(",", ":")) for x in seq_list]
await deliver_json_messages(sender, messages, macs, devices, delay_s=0.05)
if not auto:
if lane_index is not None:
from util.beat_driver_route import set_sequence_manual_lane_route
set_sequence_manual_lane_route(lane_index, device_names, wire, inner)
else:
sync_beat_route_from_push_sequence(
seq_list, target_macs=macs, preserve_manual_beat_route_on_auto_select=True
)
async def _send_lane(
lane_index: int,
st: Dict[str, Any],
ctx: Dict[str, Any],
) -> None:
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
sequence_doc: Dict[str, Any] = ctx["sequence_doc"]
presets_map: Dict[str, Any] = ctx["presets_map"]
zone_doc: Dict[str, Any] = ctx["zone_doc"]
devices = ctx["devices"]
groups = ctx["groups"]
palette_colors: List[Any] = ctx["palette_colors"]
num_lanes = ctx["num_lanes"]
if st.get("done"):
return
lane_steps = lanes[lane_index]
idx = int(st.get("stepIdx", 0))
if idx < 0 or idx >= len(lane_steps):
return
step = lane_steps[idx]
preset_id = str(step.get("preset_id") or "").strip()
if not preset_id:
return
display_preset = _display_preset_for_step(preset_id, presets_map, palette_colors)
if not display_preset:
return
gids = _group_ids_for_lane_step(sequence_doc, step, lane_index, num_lanes)
device_names = _resolve_step_device_names(
zone_doc, gids, devices, groups, sequence_doc=sequence_doc
)
device_names = _split_device_names_for_lane(
device_names,
lane_index,
num_lanes,
partition_shared_zone=not _lane_has_non_empty_lanes_group_ids(sequence_doc, lane_index),
)
if gids and not device_names:
return
from models.transport import get_current_sender
from util.beat_driver_route import (
clear_sequence_manual_lane_route,
set_sequence_manual_lane_route,
)
from util.driver_delivery import deliver_json_messages
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
macs = _device_names_to_macs(device_names, devices)
if not macs:
return
bulk = ctx.get("_sequence_wire_presets")
if isinstance(bulk, dict) and bulk:
auto = _coerce_auto(display_preset)
inner = _preset_inner_from_display_preset(display_preset)
wire = str(preset_id)
if auto:
clear_sequence_manual_lane_route(lane_index)
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if not sel:
return
msg = json.dumps({"v": "1", "select": sel}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], macs, devices, delay_s=0.05)
else:
set_sequence_manual_lane_route(lane_index, device_names, wire, inner)
return
await _deliver_preset_for_devices(
preset_id, display_preset, device_names, devices, lane_index=lane_index
)
async def _send_all_lanes(ctx: Dict[str, Any]) -> None:
lane_states: List[Dict[str, Any]] = ctx["lane_states"]
num_lanes = ctx["num_lanes"]
for i in range(num_lanes):
if lane_states[i].get("done"):
continue
await _send_lane(i, lane_states[i], ctx)
def _sequence_advance_beats(sequence_doc: Dict[str, Any]) -> bool:
raw = sequence_doc.get("advance_mode")
return isinstance(raw, str) and raw.strip().lower() == "beats"
def _build_ctx(
sequence_doc: Dict[str, Any],
zone_doc: Dict[str, Any],
presets_map: Dict[str, Any],
profile_id: str,
) -> Optional[Dict[str, Any]]:
from models.device import Device
from models.group import Group
lanes = [x for x in _normalize_sequence_lanes(sequence_doc) if len(x) > 0]
if not lanes:
return None
devices = Device()
groups = Group()
palette_colors = _load_palette_colors(profile_id)
num_lanes = len(lanes)
lane_states = [{"stepIdx": 0, "beatCount": 0, "done": False} for _ in range(num_lanes)]
return {
"lanes": lanes,
"lane_states": lane_states,
"num_lanes": num_lanes,
"sequence_doc": sequence_doc,
"zone_doc": zone_doc,
"presets_map": presets_map,
"devices": devices,
"groups": groups,
"palette_colors": palette_colors,
"loop": True,
"advance_mode": "beats" if _sequence_advance_beats(sequence_doc) else "time",
}
def playback_status() -> Dict[str, Any]:
"""Snapshot for UI (e.g. audio status poll): lane 0 step + beats within step, total steps sum."""
with _beat_run_lock:
ctx = _beat_run
if not ctx:
return {"active": False, "beat_readout": ""}
lanes: List[List[Dict[str, Any]]] = ctx.get("lanes") or []
lane_states: List[Dict[str, Any]] = ctx.get("lane_states") or []
num_lanes = int(ctx.get("num_lanes") or 0)
total_steps = sum(len(l) for l in lanes)
lane0_steps = len(lanes[0]) if lanes else 0
beat_count = 0
beats_per_step = 1
step_1based = 0
lane0 = lanes[0] if lanes else []
sequence_beats_per_pass = 0
for step in lane0:
sequence_beats_per_pass += max(1, int((step or {}).get("beats") or 1))
sequence_beat_at = 0
if lane_states and lane0_steps > 0:
st0 = lane_states[0]
idx = int(st0.get("stepIdx", 0))
advance_mode = str(ctx.get("advance_mode") or "").strip().lower()
if st0.get("done"):
step_1based = lane0_steps
sequence_beat_at = sequence_beats_per_pass
else:
step_1based = idx + 1
if 0 <= idx < len(lanes[0]):
step = lanes[0][idx]
beats_per_step = max(1, int(step.get("beats") or 1))
beat_count_raw = int(st0.get("beatCount", 0))
# Internal beatCount resets to 0 on step rollover; expose 1..beats_per_step in beats mode.
if advance_mode == "beats":
bt = max(1, int(beats_per_step))
beat_count = min(bt, max(1, beat_count_raw if beat_count_raw > 0 else 1))
else:
beat_count = beat_count_raw
for j in range(min(idx, len(lane0))):
sequence_beat_at += max(1, int((lane0[j] or {}).get("beats") or 1))
sequence_beat_at += beat_count
lane0_preset_id = ""
lane0_preset_name = ""
pm_raw = ctx.get("presets_map")
presets_map_status: Dict[str, Any] = pm_raw if isinstance(pm_raw, dict) else {}
if lane_states and lane0_steps > 0 and lane0:
st_preset = lane_states[0]
if not st_preset.get("done"):
ix = int(st_preset.get("stepIdx", 0))
if 0 <= ix < len(lane0):
stp = lane0[ix] or {}
pid = str(stp.get("preset_id") or "").strip()
lane0_preset_id = pid
if pid:
pdoc = presets_map_status.get(pid)
if isinstance(pdoc, dict):
nm = str(pdoc.get("name") or "").strip()
lane0_preset_name = nm or pid
else:
lane0_preset_name = pid
beat_readout = ""
adv_m = str(ctx.get("advance_mode") or "").strip().lower()
if (
adv_m == "beats"
and sequence_beats_per_pass > 0
and lane_states
and lane0_steps > 0
and lane_states[0]
and not lane_states[0].get("done")
):
tot = max(1, int(sequence_beats_per_pass))
at = int(sequence_beat_at)
# Pass position within this run: inclusive 1..tot
sp = min(tot, max(1, at if at > 0 else 1))
beat_readout = f"{sp}/{tot}"
return {
"active": True,
"advance_mode": ctx.get("advance_mode"),
"sequence_id": ctx.get("sequence_id"),
"zone_id": ctx.get("zone_id"),
"num_lanes": num_lanes,
"total_sequence_steps": total_steps,
"lane0_current_step": step_1based,
"lane0_lane_length": lane0_steps,
"lane0_beat_in_step": beat_count,
"lane0_beats_per_step": beats_per_step,
"lane0_preset_id": lane0_preset_id,
"lane0_preset_name": lane0_preset_name,
"sequence_beat_at": sequence_beat_at,
"sequence_beats_per_pass": sequence_beats_per_pass,
"sequence_loop_beat": int(ctx.get("sequence_loop_beat", 0)),
"beat_readout": beat_readout,
}
async def process_active_beat_advance() -> None:
with _beat_run_lock:
ctx = _beat_run
if not ctx or ctx.get("advance_mode") != "beats":
return
lane_states: List[Dict[str, Any]] = ctx["lane_states"]
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
loop = bool(ctx.get("loop"))
lane0_looped = False
for i in range(ctx["num_lanes"]):
st = lane_states[i]
if st.get("done"):
continue
lane_steps = lanes[i]
if not lane_steps:
continue
st["beatCount"] = int(st.get("beatCount", 0)) + 1
step = lane_steps[int(st.get("stepIdx", 0))]
need = max(1, int(step.get("beats") or 1))
if int(st["beatCount"]) >= need:
st["beatCount"] = 0
if int(st.get("stepIdx", 0)) + 1 >= len(lane_steps):
if loop:
if i == 0:
lane0_looped = True
st["stepIdx"] = 0
await _send_lane(i, st, ctx)
else:
st["done"] = True
else:
st["stepIdx"] = int(st.get("stepIdx", 0)) + 1
await _send_lane(i, st, ctx)
if lane0_looped:
# First beat of the next loop (was 0 here so single-step / first wrap never left 0).
ctx["sequence_loop_beat"] = 1
else:
ctx["sequence_loop_beat"] = int(ctx.get("sequence_loop_beat", 0)) + 1
if all(s.get("done") for s in lane_states):
stop()
def push_thread_beat() -> None:
try:
_thread_beat_queue.put_nowait(1)
except queue.Full:
pass
async def beat_consumer_loop() -> None:
while True:
n = 0
try:
while True:
_thread_beat_queue.get_nowait()
n += 1
except queue.Empty:
pass
if n:
from util.beat_driver_route import notify_beat_detected
for _ in range(n):
try:
await process_active_beat_advance()
except Exception as e:
print(f"[sequence-playback] beat advance: {e}")
try:
notify_beat_detected()
except Exception as e:
print(f"[sequence-playback] notify_beat_detected: {e}")
else:
await asyncio.sleep(0.012)
def ensure_beat_consumer_started() -> None:
global _beat_consumer_started
with _beat_consumer_lock:
if _beat_consumer_started:
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
_beat_consumer_started = True
loop.create_task(beat_consumer_loop())
_time_token = 0
async def _time_loop(ctx: Dict[str, Any], token: int) -> None:
sequence_doc = ctx["sequence_doc"]
raw_dur = sequence_doc.get("step_duration_ms", 3000)
try:
duration = max(200, int(raw_dur))
except (TypeError, ValueError):
duration = 3000
raw_tr = sequence_doc.get("sequence_transition")
try:
tr_in = int(raw_tr) if raw_tr is not None else 0
except (TypeError, ValueError):
tr_in = 0
transition_ms = min(60000, max(0, tr_in))
min_step = 200
time_sleep_tr = min(transition_ms, max(0, duration - min_step))
time_tick_lead = max(min_step, duration - time_sleep_tr)
await _send_all_lanes(ctx)
my = token
while True:
await asyncio.sleep(time_tick_lead / 1000.0)
with _beat_run_lock:
cur = _time_token
if cur != my:
return
if time_sleep_tr > 0:
await asyncio.sleep(time_sleep_tr / 1000.0)
with _beat_run_lock:
cur = _time_token
if cur != my:
return
lane_states = ctx["lane_states"]
lanes = ctx["lanes"]
loop = bool(ctx.get("loop"))
lane0_looped = False
for i in range(ctx["num_lanes"]):
st = lane_states[i]
if st.get("done"):
continue
ln = len(lanes[i])
if int(st.get("stepIdx", 0)) + 1 >= ln:
if loop:
if i == 0:
lane0_looped = True
st["stepIdx"] = 0
else:
st["done"] = True
else:
st["stepIdx"] = int(st.get("stepIdx", 0)) + 1
if lane0_looped:
ctx["sequence_loop_beat"] = 1
else:
ctx["sequence_loop_beat"] = int(ctx.get("sequence_loop_beat", 0)) + 1
if all(s.get("done") for s in lane_states):
stop()
return
await _send_all_lanes(ctx)
def stop() -> None:
global _beat_run, _time_task, _time_token
with _beat_run_lock:
_beat_run = None
_time_token += 1
t = _time_task
_time_task = None
if t and not t.done():
t.cancel()
def stop_if_playing_sequence(sequence_id: str) -> bool:
"""If zone sequence playback is running this sequence id, stop it (e.g. after save/delete)."""
sid = str(sequence_id).strip()
if not sid:
return False
with _beat_run_lock:
ctx = _beat_run
if not ctx:
return False
cur = ctx.get("sequence_id")
if cur is None or str(cur).strip() != sid:
return False
stop()
return True
async def start(zone_id: str, sequence_id: str, profile_id: str) -> None:
global _beat_run, _time_task, _time_token
from models.preset import Preset
from models.profile import Profile
from models.sequence import Sequence
from models.zone import Zone
stop()
seq_m = Sequence()
zone_m = Zone()
prof_m = Profile()
sequence_doc = seq_m.read(sequence_id)
zone_doc = zone_m.read(zone_id)
if not sequence_doc or str(sequence_doc.get("profile_id")) != str(profile_id):
raise ValueError("sequence not found")
if not zone_doc:
raise ValueError("zone not found")
prof = prof_m.read(profile_id)
if not prof:
raise ValueError("profile not found")
presets_map: Dict[str, Any] = {}
pr = Preset()
for pid in pr.list():
doc = pr.read(pid)
if isinstance(doc, dict) and str(doc.get("profile_id")) == str(profile_id):
presets_map[str(pid)] = doc
ctx = _build_ctx(sequence_doc, zone_doc, presets_map, profile_id)
if not ctx:
raise ValueError("sequence has no steps")
ctx["sequence_id"] = str(sequence_id)
ctx["zone_id"] = str(zone_id)
ctx["sequence_loop_beat"] = 0
await _deliver_sequence_presets_bulk(ctx)
advance = ctx["advance_mode"]
if advance == "beats":
from util.beat_driver_route import update_beat_route
update_beat_route({"enabled": False})
with _beat_run_lock:
_beat_run = ctx
await _send_all_lanes(ctx)
else:
with _beat_run_lock:
_beat_run = ctx
_time_token += 1
my = _time_token
async def _run() -> None:
try:
await _time_loop(ctx, my)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"[sequence-playback] time loop: {e}")
loop = asyncio.get_running_loop()
_time_task = loop.create_task(_run())