feat(audio-sequences): beat phase sync and aligned playback

Add bar-phase tracking, audio reset/anchor APIs, BPM holdover, beat-phase
sequence switching, sync-phase endpoint, and sample sequence data.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-05-17 18:32:10 +12:00
parent 7ecb5c3b3e
commit 964cfc6d91
14 changed files with 1117 additions and 292 deletions

View File

@@ -4,6 +4,12 @@ import os
import queue
import threading
import time
from typing import Any
_HOLDOVER_BPM_MIN = 30.0
_HOLDOVER_BPM_MAX = 300.0
_HOLDOVER_MAX_S = 300.0
class AudioBeatDetector:
@@ -13,6 +19,11 @@ class AudioBeatDetector:
self._stream = None
self._running = False
self._stop_event = threading.Event()
self._runtime = None
self._pending_reset = False
self._holdover_thread: threading.Thread | None = None
self._holdover_stop = threading.Event()
self._holdover_active = False
self._status = {
"running": False,
"bpm": None,
@@ -20,6 +31,11 @@ class AudioBeatDetector:
"beat_seq": 0,
"beat_type": "unknown",
"beat_type_confidence": 0.0,
"bar_beat": 1,
"beats_per_bar": 4,
"is_downbeat": False,
"phase_confidence": 0.0,
"bar_phase_readout": "1/4",
"error": None,
"device": None,
}
@@ -100,6 +116,11 @@ class AudioBeatDetector:
"beat_seq": 0,
"beat_type": "unknown",
"beat_type_confidence": 0.0,
"bar_beat": 1,
"beats_per_bar": 4,
"is_downbeat": False,
"phase_confidence": 0.0,
"bar_phase_readout": "1/4",
"error": None,
"device": device,
}
@@ -111,6 +132,7 @@ class AudioBeatDetector:
self._thread.start()
def stop(self):
self._stop_bpm_holdover()
with self._lock:
self._stop_event.set()
t = self._thread
@@ -139,11 +161,159 @@ class AudioBeatDetector:
self._running = False
self._thread = None
self._stream = None
self._pending_reset = False
self._status["running"] = False
def status(self):
with self._lock:
return dict(self._status)
st = dict(self._status)
holdover = self._holdover_active
last = st.get("last_beat_ts")
if st.get("running") and last is not None and not holdover:
try:
if (time.time() - float(last)) > 4.0:
st["bpm"] = None
except (TypeError, ValueError):
pass
return st
def _apply_tracking_reset_status(self) -> None:
"""Refresh published status after a tracking reset (lock must be held)."""
bpb = max(1, int(self._status.get("beats_per_bar") or 4))
self._status.update(
{
"running": True,
"beat_type": "unknown",
"beat_type_confidence": 0.0,
"bar_beat": 1,
"is_downbeat": True,
"phase_confidence": 0.0,
"bar_phase_readout": f"1/{bpb}",
}
)
def _clamp_holdover_bpm(self, bpm: Any) -> float | None:
try:
v = float(bpm)
except (TypeError, ValueError):
return None
if not (_HOLDOVER_BPM_MIN <= v <= _HOLDOVER_BPM_MAX):
return None
return v
def _holdover_interval_s(self, bpm: float) -> float:
return 60.0 / max(_HOLDOVER_BPM_MIN, min(_HOLDOVER_BPM_MAX, float(bpm)))
def _stop_bpm_holdover(self) -> None:
with self._lock:
self._holdover_active = False
self._holdover_stop.set()
t = self._holdover_thread
if t and t.is_alive() and t is not threading.current_thread():
t.join(timeout=2.0)
with self._lock:
if self._holdover_thread is t:
self._holdover_thread = None
def _advance_holdover_bar_phase_locked(self) -> dict:
"""Advance bar phase for one synthetic beat (lock must be held)."""
bpb = max(1, int(self._status.get("beats_per_bar") or 4))
prev = int(self._status.get("bar_beat") or 1)
bar_beat = (prev % bpb) + 1
is_downbeat = bar_beat == 1
bar_readout = f"{bar_beat}/{bpb}"
self._status["bar_beat"] = bar_beat
self._status["is_downbeat"] = is_downbeat
self._status["bar_phase_readout"] = bar_readout
return {
"bar_beat": bar_beat,
"beats_per_bar": bpb,
"is_downbeat": is_downbeat,
"bar_phase_readout": bar_readout,
}
def _emit_holdover_beat(self, bpm: float) -> None:
now = time.time()
with self._lock:
if not self._running or not self._holdover_active:
return
self._advance_holdover_bar_phase_locked()
self._status["last_beat_ts"] = now
self._status["bpm"] = float(bpm)
self._status["beat_type"] = "holdover"
self._status["beat_type_confidence"] = 0.0
self._status["beat_seq"] = int(self._status.get("beat_seq", 0)) + 1
try:
from util import sequence_playback as seq_pb
seq_pb.push_thread_beat()
except Exception as e:
print(f"[audio] holdover beat queue: {e}")
def _holdover_loop(self, bpm: float, started_at: float) -> None:
interval = self._holdover_interval_s(bpm)
while not self._holdover_stop.is_set():
with self._lock:
if not self._running or not self._holdover_active:
return
if (time.time() - started_at) > _HOLDOVER_MAX_S:
self._holdover_active = False
return
last = self._status.get("last_beat_ts")
if last is not None:
try:
delay = max(0.02, float(last) + interval - time.time())
except (TypeError, ValueError):
delay = interval
else:
delay = interval
if self._holdover_stop.wait(delay):
return
self._emit_holdover_beat(bpm)
def _start_bpm_holdover(self, bpm: float) -> None:
bpm_v = self._clamp_holdover_bpm(bpm)
if bpm_v is None:
return
self._stop_bpm_holdover()
self._holdover_stop.clear()
started_at = time.time()
with self._lock:
self._holdover_active = True
self._holdover_thread = threading.Thread(
target=self._holdover_loop,
args=(bpm_v, started_at),
name="audio-bpm-holdover",
daemon=True,
)
t = self._holdover_thread
t.start()
def _process_pending_reset(self, runtime) -> None:
"""Run ``reset_state`` on the audio thread (safe for aubio tempo)."""
with self._lock:
if not self._pending_reset:
return
self._pending_reset = False
try:
runtime.reset_state()
with self._lock:
self._apply_tracking_reset_status()
except Exception as e:
print(f"[audio] pending reset: {e}")
def reset_tracking(self) -> bool:
"""Clear detector tempo history without stopping the input stream."""
holdover_bpm = None
with self._lock:
if not self._running or self._runtime is None:
return False
holdover_bpm = self._clamp_holdover_bpm(self._status.get("bpm"))
self._pending_reset = True
self._apply_tracking_reset_status()
if holdover_bpm is not None:
self._start_bpm_holdover(holdover_bpm)
return True
def _set_error(self, msg):
print(f"[audio] {msg}")
@@ -152,7 +322,28 @@ class AudioBeatDetector:
self._status["running"] = False
self._running = False
def _record_beat(self, bpm, beat_type="unknown", beat_type_confidence=0.0):
def anchor_bar_phase(self) -> bool:
"""Mark the current moment as bar beat 1 (downbeat), e.g. after manual sync."""
with self._lock:
rt = self._runtime
if rt is None:
return False
try:
rt.anchor_bar_phase(time.time())
with self._lock:
self._status["bar_beat"] = 1
self._status["is_downbeat"] = True
self._status["bar_phase_readout"] = f"1/{int(self._status.get('beats_per_bar') or 4)}"
self._status["phase_confidence"] = max(
float(self._status.get("phase_confidence") or 0.0), 0.85
)
return True
except Exception as e:
print(f"[audio] anchor_bar_phase: {e}")
return False
def _record_beat(self, bpm, beat_type="unknown", beat_type_confidence=0.0, **phase_fields):
self._stop_bpm_holdover()
now = time.time()
with self._lock:
self._status["last_beat_ts"] = now
@@ -160,6 +351,16 @@ class AudioBeatDetector:
self._status["beat_type"] = beat_type
self._status["beat_type_confidence"] = float(beat_type_confidence or 0.0)
self._status["beat_seq"] = int(self._status.get("beat_seq", 0)) + 1
if phase_fields.get("bar_beat") is not None:
self._status["bar_beat"] = int(phase_fields["bar_beat"])
if phase_fields.get("beats_per_bar") is not None:
self._status["beats_per_bar"] = int(phase_fields["beats_per_bar"])
if phase_fields.get("is_downbeat") is not None:
self._status["is_downbeat"] = bool(phase_fields["is_downbeat"])
if phase_fields.get("phase_confidence") is not None:
self._status["phase_confidence"] = float(phase_fields["phase_confidence"])
if phase_fields.get("bar_phase_readout"):
self._status["bar_phase_readout"] = str(phase_fields["bar_phase_readout"])
try:
from util import sequence_playback as seq_pb
@@ -210,15 +411,17 @@ class AudioBeatDetector:
flux_weight=0.3,
threshold_multiplier=1.35,
ema_alpha=0.08,
min_ioi_ms=85.0,
min_ioi_ms=100.0,
bpm_window=8,
post_url="",
aubio_method="default",
aubio_threshold=0.12,
silence_gate_db=-58.0,
aubio_threshold=0.14,
beats_per_bar=4,
)
runtime = beat_mod.BeatDetectRuntime(args)
runtime.setup(sample_rate=sample_rate)
with self._lock:
self._runtime = runtime
hop_size = runtime.frame_size
audio_q = queue.Queue(maxsize=64)
@@ -243,10 +446,12 @@ class AudioBeatDetector:
stream.start()
try:
while not self._stop_event.is_set():
self._process_pending_reset(runtime)
try:
frame = audio_q.get(timeout=0.1)
except queue.Empty:
continue
self._process_pending_reset(runtime)
if frame.shape[0] != hop_size:
if frame.shape[0] > hop_size:
frame = frame[:hop_size]
@@ -260,6 +465,11 @@ class AudioBeatDetector:
bpm,
beat_type=event.get("beat_type", "unknown"),
beat_type_confidence=event.get("beat_type_confidence", 0.0),
bar_beat=event.get("bar_beat"),
beats_per_bar=event.get("beats_per_bar"),
is_downbeat=event.get("is_downbeat"),
phase_confidence=event.get("phase_confidence"),
bar_phase_readout=event.get("bar_phase_readout"),
)
finally:
try:
@@ -280,6 +490,7 @@ class AudioBeatDetector:
with self._lock:
self._running = False
self._status["running"] = False
self._runtime = None
# Set from ``main`` so sequence playback can tell real audio from simulated beats.
@@ -299,3 +510,25 @@ def shared_beat_detector_running():
return bool(d.status().get("running"))
except Exception:
return False
def shared_beat_status_snapshot() -> dict:
"""Thread-safe copy of live detector status, or {} if audio is off."""
d = _shared_beat_detector
if d is None:
return {}
try:
return dict(d.status())
except Exception:
return {}
def anchor_shared_bar_phase() -> bool:
"""Anchor bar phase on the shared detector (no-op if audio is off)."""
d = _shared_beat_detector
if d is None:
return False
try:
return bool(d.anchor_bar_phase())
except Exception:
return False

View File

@@ -30,20 +30,64 @@ def read_audio_run_state() -> Dict[str, Any]:
except (OSError, json.JSONDecodeError, TypeError):
return {"enabled": False, "device": None}
if not isinstance(raw, dict):
return {"enabled": False, "device": None}
return {
"enabled": False,
"device": None,
"device_override": "",
"device_select": "",
}
enabled = bool(raw.get("enabled"))
dev = raw.get("device", None)
return {"enabled": enabled, "device": dev}
return {
"enabled": enabled,
"device": dev,
"device_override": str(raw.get("device_override") or ""),
"device_select": str(raw.get("device_select") or ""),
}
def write_audio_run_state(*, enabled: bool, device: Any = None) -> None:
"""Write run intent. When ``enabled`` is false, keep ``device`` from the previous file for next start."""
def write_audio_run_state(
*,
enabled: bool,
device: Any = None,
device_override: str | None = None,
device_select: str | None = None,
) -> None:
"""Write run intent. When ``enabled`` is false, keep device fields from the previous file."""
path = _db_path()
prev = read_audio_run_state()
if enabled:
data = {"enabled": True, "device": device}
data = {
"enabled": True,
"device": device,
"device_override": (
str(device_override)
if device_override is not None
else str(prev.get("device_override") or "")
),
"device_select": (
str(device_select)
if device_select is not None
else str(prev.get("device_select") or "")
),
}
if device_select is None and device is not None:
data["device_select"] = str(device)
else:
data = {"enabled": False, "device": prev.get("device")}
data = {
"enabled": False,
"device": prev.get("device"),
"device_override": (
str(device_override)
if device_override is not None
else str(prev.get("device_override") or "")
),
"device_select": (
str(device_select)
if device_select is not None
else str(prev.get("device_select") or "")
),
}
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:

View File

@@ -299,6 +299,10 @@ def _apply_manual_beat_route_standalone_overlay(
return
names = [str(n).strip() for n in device_names if str(n).strip()]
with _route_lock:
if _sequence_lane_covers_standalone_overlay(names, str(wire_preset_id).strip()):
_lane_manual.pop(-1, None)
_sync_public_beat_route_from_lane_table()
return
_lane_manual[-1] = {
"device_names": names,
"wire_preset_id": str(wire_preset_id).strip(),
@@ -350,6 +354,11 @@ def set_sequence_manual_lane_route(
"manual_beat_n": mn,
"beat_counter": bc,
}
overlay = _lane_manual.get(-1)
if overlay and _lane_route_targets_key(names, wid) == _lane_route_targets_key(
overlay.get("device_names") or [], str(overlay.get("wire_preset_id") or "")
):
_lane_manual.pop(-1, None)
_sync_public_beat_route_from_lane_table()
@@ -362,6 +371,49 @@ def clear_sequence_manual_lane_route(lane_index: int) -> None:
_sync_public_beat_route_from_lane_table()
def _lane_route_targets_key(device_names: List[str], wire_preset_id: str) -> Tuple[Tuple[str, ...], str]:
names = tuple(sorted({str(n).strip() for n in (device_names or []) if str(n).strip()}))
return names, str(wire_preset_id or "").strip()
def _sequence_lane_covers_standalone_overlay(device_names: List[str], wire_preset_id: str) -> bool:
"""True when a sequence lane (0..n) already routes the same device(s) and wire preset."""
key = _lane_route_targets_key(device_names, wire_preset_id)
for lane_key, entry in _lane_manual.items():
if not isinstance(lane_key, int) or lane_key < 0:
continue
other = _lane_route_targets_key(
entry.get("device_names") or [], str(entry.get("wire_preset_id") or "")
)
if other == key:
return True
return False
def mark_manual_select_sent_for_targets(
device_names: List[str], wire_preset_id: str
) -> None:
"""A ``select`` was just sent for these targets; skip one duplicate on the next beat."""
key = _lane_route_targets_key(device_names, wire_preset_id)
with _route_lock:
for entry in _lane_manual.values():
if not isinstance(entry, dict):
continue
other = _lane_route_targets_key(
entry.get("device_names") or [], str(entry.get("wire_preset_id") or "")
)
if other == key:
entry["suppress_next_notify"] = True
def mark_sequence_manual_lane_select_sent(lane_index: int) -> None:
"""A ``select`` was just sent for this lane; skip one duplicate on the next beat."""
with _route_lock:
e = _lane_manual.get(lane_index)
if e is not None:
e["suppress_next_notify"] = True
def sync_beat_route_from_push_sequence(
sequence: List[Any],
target_macs: Optional[List[str]] = None,
@@ -438,6 +490,7 @@ def sync_beat_route_from_push_sequence(
)
else:
_apply_manual_beat_route(device_names, wire_preset_id, preset_body)
mark_manual_select_sent_for_targets(device_names, wire_preset_id)
return
wire_id, body = _single_manual_wire_preset(merged_presets)
@@ -547,6 +600,7 @@ def notify_beat_detected() -> None:
if not _lane_manual:
return
work = []
seen_targets: Set[Tuple[Tuple[str, ...], str]] = set()
for key in sorted(_lane_manual.keys()):
e = _lane_manual[key]
names = e.get("device_names") or []
@@ -555,6 +609,8 @@ def notify_beat_detected() -> None:
pattern = str(e.get("pattern") or "")
if pattern and not _pattern_supports_manual(pattern):
continue
if e.pop("suppress_next_notify", False):
continue
try:
n = int(e.get("manual_beat_n") or 1)
except (TypeError, ValueError):
@@ -564,7 +620,12 @@ def notify_beat_detected() -> None:
c = int(e["beat_counter"])
if (c - 1) % n != 0:
continue
work.append((list(names), str(e.get("wire_preset_id") or "2")))
wire = str(e.get("wire_preset_id") or "2")
target_key = _lane_route_targets_key(names, wire)
if target_key in seen_targets:
continue
seen_targets.add(target_key)
work.append((list(names), wire))
if work:
_preset_session_beats += 1
if not work:

View File

@@ -24,6 +24,16 @@ _sim_beat_token = 0
_beat_run: Optional[Dict[str, Any]] = None
_beat_run_lock = threading.Lock()
_pending_play: Optional[Dict[str, Any]] = None
_pending_play_lock = threading.Lock()
_pending_beat_task: Optional[asyncio.Task] = None
_pending_beat_token = 0
_last_thread_beat_phase: Dict[str, Any] = {
"is_downbeat": True,
"bar_beat": 1,
}
_sim_beat_counter = 0
def _norm_mac(raw: Any) -> Optional[str]:
from models.device import normalize_mac
@@ -299,21 +309,6 @@ def _resolve_colors_with_palette_refs(
return out
def _ordered_unique_preset_ids_from_lanes(lanes: List[List[Dict[str, Any]]]) -> List[str]:
seen: set = set()
out: List[str] = []
for lane in lanes:
for step in lane:
if not isinstance(step, dict):
continue
pid = str(step.get("preset_id") or "").strip()
if not pid or pid in seen:
continue
seen.add(pid)
out.append(pid)
return out
def _display_preset_for_step(
preset_id: str,
presets_map: Dict[str, Any],
@@ -348,6 +343,129 @@ def _preset_inner_from_display_preset(display_preset: Dict[str, Any]) -> Dict[st
return inner
def _ordered_unique_preset_ids_in_lane(lane: List[Dict[str, Any]]) -> List[str]:
seen: set = set()
out: List[str] = []
for step in lane:
if not isinstance(step, dict):
continue
pid = str(step.get("preset_id") or "").strip()
if not pid or pid in seen:
continue
seen.add(pid)
out.append(pid)
return out
def _resolve_lane_device_names(lane_index: int, ctx: Dict[str, Any]) -> List[str]:
"""Device names for one lane (lane groups / whole zone), after lane partition."""
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
sequence_doc: Dict[str, Any] = ctx["sequence_doc"]
zone_doc: Dict[str, Any] = ctx["zone_doc"]
devices = ctx["devices"]
groups = ctx["groups"]
num_lanes = int(ctx["num_lanes"])
lane = lanes[lane_index] if 0 <= lane_index < len(lanes) else []
if not lane:
return []
gids = _group_ids_for_lane_step(sequence_doc, lane[0], lane_index, num_lanes)
device_names = _resolve_step_device_names(
zone_doc, gids, devices, groups, sequence_doc=sequence_doc
)
return _split_device_names_for_lane(
device_names,
lane_index,
num_lanes,
partition_shared_zone=not _lane_has_non_empty_lanes_group_ids(sequence_doc, lane_index),
)
def _build_lane_wire_presets_map(lane_index: int, ctx: Dict[str, Any]) -> Dict[str, Any]:
"""All preset wire bodies for one lane, keyed by preset id."""
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
presets_map: Dict[str, Any] = ctx["presets_map"]
palette_colors: List[Any] = ctx["palette_colors"]
lane = lanes[lane_index] if 0 <= lane_index < len(lanes) else []
inner_by_wire: Dict[str, Any] = {}
for pid in _ordered_unique_preset_ids_in_lane(lane):
disp = _display_preset_for_step(pid, presets_map, palette_colors)
if not disp:
continue
inner_by_wire[str(pid)] = _preset_inner_from_display_preset(disp)
return inner_by_wire
async def _prime_lane(lane_index: int, ctx: Dict[str, Any]) -> None:
"""Upload all lane presets and select step 0 in one message (driver applies presets before select)."""
from models.transport import get_current_sender
from util.beat_driver_route import (
clear_sequence_manual_lane_route,
mark_sequence_manual_lane_select_sent,
set_sequence_manual_lane_route,
)
from util.driver_delivery import deliver_json_messages
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
presets_map: Dict[str, Any] = ctx["presets_map"]
palette_colors: List[Any] = ctx["palette_colors"]
lane_steps = lanes[lane_index] if 0 <= lane_index < len(lanes) else []
if not lane_steps:
return
inner_by_wire = _build_lane_wire_presets_map(lane_index, ctx)
if not inner_by_wire:
return
step0 = lane_steps[0]
preset_id = str(step0.get("preset_id") or "").strip()
if not preset_id:
return
display_preset = _display_preset_for_step(preset_id, presets_map, palette_colors)
if not display_preset:
return
device_names = _resolve_lane_device_names(lane_index, ctx)
macs = _device_names_to_macs(device_names, ctx["devices"])
if not macs:
return
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
zone_doc = ctx.get("zone_doc") if isinstance(ctx.get("zone_doc"), dict) else {}
devices_model = ctx["devices"]
wire = str(preset_id)
auto = _coerce_auto(display_preset)
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
delay_s = 0.05
for mac in macs:
body: Dict[str, Any] = {"v": "1", "presets": dict(inner_by_wire)}
if sel:
body["select"] = sel
msg = json.dumps(body, separators=(",", ":"))
await deliver_json_messages(sender, [msg], [mac], devices_model, delay_s=delay_s)
if auto:
clear_sequence_manual_lane_route(lane_index)
else:
inner = _preset_inner_from_display_preset(display_preset)
set_sequence_manual_lane_route(lane_index, device_names, wire, inner)
mark_sequence_manual_lane_select_sent(lane_index)
async def _prime_all_lanes(ctx: Dict[str, Any]) -> None:
"""One-shot preset upload + first-step select per lane (to each lane's groups)."""
for i in range(int(ctx["num_lanes"])):
await _prime_lane(i, ctx)
ctx["_presets_delivered"] = True
ctx["_sequence_primed"] = True
def _parse_zone_brightness_value(zone_doc: Any) -> int:
"""Zone slider value stored on the zone row (0255); default 255 if unset."""
from util.brightness_combine import clamp255
@@ -363,37 +481,33 @@ def _parse_zone_brightness_value(zone_doc: Any) -> int:
return 255
def _inner_wire_b_with_sequence_zone_brightness(
inner: Dict[str, Any],
zone_doc: Dict[str, Any],
*,
target_mac: Optional[str],
settings_obj: Any,
groups_model: Any,
devices_model: Any,
) -> Dict[str, Any]:
"""Combine preset wire ``b`` with zone brightness (and global/group/device when ``target_mac`` is set)."""
from util.brightness_combine import (
clamp255,
multiply_brightness_factors,
effective_brightness_for_mac,
)
async def _deliver_zone_brightness_for_sequence(ctx: Dict[str, Any]) -> None:
"""Apply zone/global/group/device brightness like the zone slider (not inside preset ``b``)."""
from models.transport import get_current_sender
from util.brightness_combine import effective_brightness_for_mac
from util.driver_delivery import deliver_json_messages
out = dict(inner)
base = clamp255(out.get("b", 127))
sender = get_current_sender()
if not sender:
return
macs = _union_macs_for_sequence(ctx)
if not macs:
return
zone_doc = ctx.get("zone_doc") if isinstance(ctx.get("zone_doc"), dict) else {}
zb = _parse_zone_brightness_value(zone_doc)
if target_mac and settings_obj is not None and groups_model is not None and devices_model is not None:
settings_obj = ctx.get("settings")
groups_model = ctx.get("groups")
devices_model = ctx.get("devices")
for mac in macs:
eff = effective_brightness_for_mac(
settings_obj,
groups_model,
devices_model,
target_mac,
mac,
zone_brightness=zb,
)
out["b"] = multiply_brightness_factors([base, eff])
else:
out["b"] = multiply_brightness_factors([base, zb])
return out
msg = json.dumps({"v": "1", "b": eff, "save": True}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], [mac], devices_model, delay_s=0.05)
def _device_names_to_macs(device_names: List[str], devices: Any) -> List[str]:
@@ -455,52 +569,21 @@ def _union_macs_for_sequence(ctx: Dict[str, Any]) -> List[str]:
return list(z_macs)
def _build_sequence_wire_presets_map(ctx: Dict[str, Any]) -> Dict[str, Any]:
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
presets_map: Dict[str, Any] = ctx["presets_map"]
palette_colors: List[Any] = ctx["palette_colors"]
inner_by_wire: Dict[str, Any] = {}
for pid in _ordered_unique_preset_ids_from_lanes(lanes):
disp = _display_preset_for_step(pid, presets_map, palette_colors)
if not disp:
continue
inner_by_wire[str(pid)] = _preset_inner_from_display_preset(disp)
return inner_by_wire
async def _deliver_sequence_presets_bulk(ctx: Dict[str, Any]) -> None:
"""Push all preset definitions used in the sequence once; step advances use select (auto) only."""
from models.transport import get_current_sender
from util.driver_delivery import deliver_json_messages
inner_by_wire = _build_sequence_wire_presets_map(ctx)
ctx["_sequence_wire_presets"] = inner_by_wire
if not inner_by_wire:
return
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
macs = _union_macs_for_sequence(ctx)
if not macs:
return
zone_doc = ctx.get("zone_doc") if isinstance(ctx.get("zone_doc"), dict) else {}
settings_obj = ctx.get("settings")
groups_model = ctx.get("groups")
devices_model = ctx.get("devices")
delay_s = 0.05
for mac in macs:
adjusted: Dict[str, Any] = {}
for wire_pid, inner in inner_by_wire.items():
adjusted[wire_pid] = _inner_wire_b_with_sequence_zone_brightness(
inner,
zone_doc,
target_mac=mac,
settings_obj=settings_obj,
groups_model=groups_model,
devices_model=devices_model,
)
msg = json.dumps({"v": "1", "presets": adjusted}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], [mac], devices_model, delay_s=delay_s)
def _coerce_loop(sequence_doc: Dict[str, Any]) -> bool:
raw = sequence_doc.get("loop", sequence_doc.get("sequence_loop", True))
if isinstance(raw, bool):
return raw
if raw is None:
return True
if isinstance(raw, int):
return raw != 0
if isinstance(raw, str):
lo = raw.strip().lower()
if lo in ("false", "0", "no", "off"):
return False
if lo in ("true", "1", "yes", "on"):
return True
return True
def _coerce_auto(preset: Dict[str, Any]) -> bool:
@@ -533,119 +616,17 @@ def _load_palette_colors(profile_id: str) -> List[Any]:
return Palette().read(str(pid)) or []
async def _deliver_preset_for_devices(
preset_id: str,
preset_doc: Dict[str, Any],
device_names: List[str],
devices: Any,
*,
lane_index: Optional[int] = None,
zone_doc: Optional[Dict[str, Any]] = None,
settings_obj: Any = None,
groups_model: Any = None,
) -> None:
from models.transport import get_current_sender
from util.driver_delivery import deliver_json_messages
from util.beat_driver_route import sync_beat_route_from_push_sequence
from util.espnow_message import build_preset_dict
sender = get_current_sender()
if not sender:
raise RuntimeError("Transport not configured")
macs: List[str] = []
seen: set = set()
for nm in device_names:
key = str(nm).strip()
if not key:
continue
m = None
for did in devices.list():
doc = devices.read(did) or {}
if str(doc.get("name") or "").strip() == key:
m = _norm_mac(did)
break
if not m and key.startswith("led-"):
m = _norm_mac(key[4:])
if m and m not in seen:
seen.add(m)
macs.append(m)
if not macs:
return
body = dict(preset_doc)
auto = _coerce_auto(body)
inner_base = build_preset_dict(body)
mb = body.get("manual_beat_n", body.get("manualBeatN"))
if mb is not None:
try:
n = int(mb)
if 1 <= n <= 64:
inner_base["manual_beat_n"] = n
except (TypeError, ValueError):
pass
wire = str(preset_id)
zone_use = zone_doc if isinstance(zone_doc, dict) else {}
sel_append: Optional[Dict[str, Any]] = None
if auto and device_names:
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if sel:
sel_append = {"v": "1", "select": sel}
for mac in macs:
inner = _inner_wire_b_with_sequence_zone_brightness(
inner_base,
zone_use,
target_mac=mac,
settings_obj=settings_obj,
groups_model=groups_model,
devices_model=devices,
)
seq_list: List[Dict[str, Any]] = [{"v": "1", "presets": {wire: inner}}]
if sel_append:
seq_list.append(dict(sel_append))
messages = [json.dumps(x, separators=(",", ":")) for x in seq_list]
await deliver_json_messages(sender, messages, [mac], devices, delay_s=0.05)
if not auto:
manual_inner = _inner_wire_b_with_sequence_zone_brightness(
inner_base,
zone_use,
target_mac=macs[0] if len(macs) == 1 else None,
settings_obj=settings_obj,
groups_model=groups_model,
devices_model=devices,
)
if lane_index is not None:
from util.beat_driver_route import set_sequence_manual_lane_route
set_sequence_manual_lane_route(lane_index, device_names, wire, manual_inner)
else:
seq_one = [{"v": "1", "presets": {wire: manual_inner}}]
if sel_append:
seq_one.append(dict(sel_append))
sync_beat_route_from_push_sequence(
seq_one, target_macs=macs, preserve_parallel_lane_routes=True
)
async def _send_lane(
lane_index: int,
st: Dict[str, Any],
ctx: Dict[str, Any],
) -> None:
"""Apply the current step (select or manual route). Presets must already be on devices."""
lanes: List[List[Dict[str, Any]]] = ctx["lanes"]
sequence_doc: Dict[str, Any] = ctx["sequence_doc"]
presets_map: Dict[str, Any] = ctx["presets_map"]
zone_doc: Dict[str, Any] = ctx["zone_doc"]
devices = ctx["devices"]
groups = ctx["groups"]
palette_colors: List[Any] = ctx["palette_colors"]
num_lanes = ctx["num_lanes"]
devices = ctx["devices"]
if st.get("done"):
return
@@ -660,14 +641,14 @@ async def _send_lane(
display_preset = _display_preset_for_step(preset_id, presets_map, palette_colors)
if not display_preset:
return
gids = _group_ids_for_lane_step(sequence_doc, step, lane_index, num_lanes)
gids = _group_ids_for_lane_step(sequence_doc, step, lane_index, int(ctx["num_lanes"]))
device_names = _resolve_step_device_names(
zone_doc, gids, devices, groups, sequence_doc=sequence_doc
ctx["zone_doc"], gids, devices, ctx["groups"], sequence_doc=sequence_doc
)
device_names = _split_device_names_for_lane(
device_names,
lane_index,
num_lanes,
int(ctx["num_lanes"]),
partition_shared_zone=not _lane_has_non_empty_lanes_group_ids(sequence_doc, lane_index),
)
if gids and not device_names:
@@ -676,6 +657,7 @@ async def _send_lane(
from models.transport import get_current_sender
from util.beat_driver_route import (
clear_sequence_manual_lane_route,
mark_sequence_manual_lane_select_sent,
set_sequence_manual_lane_route,
)
from util.driver_delivery import deliver_json_messages
@@ -688,44 +670,29 @@ async def _send_lane(
if not macs:
return
bulk = ctx.get("_sequence_wire_presets")
if isinstance(bulk, dict) and bulk:
auto = _coerce_auto(display_preset)
wire = str(preset_id)
auto = _coerce_auto(display_preset)
if auto:
clear_sequence_manual_lane_route(lane_index)
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if not sel:
return
msg = json.dumps({"v": "1", "select": sel}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], macs, devices, delay_s=0.05)
else:
inner = _preset_inner_from_display_preset(display_preset)
zone_use = ctx.get("zone_doc") if isinstance(ctx.get("zone_doc"), dict) else {}
inner = _inner_wire_b_with_sequence_zone_brightness(
inner,
zone_use,
target_mac=macs[0] if len(macs) == 1 else None,
settings_obj=ctx.get("settings"),
groups_model=ctx.get("groups"),
devices_model=devices,
)
wire = str(preset_id)
if auto:
clear_sequence_manual_lane_route(lane_index)
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if not sel:
return
set_sequence_manual_lane_route(lane_index, device_names, wire, inner)
sel: Dict[str, Any] = {}
for n in device_names:
if n:
sel[str(n)] = [wire]
if sel:
msg = json.dumps({"v": "1", "select": sel}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], macs, devices, delay_s=0.05)
else:
set_sequence_manual_lane_route(lane_index, device_names, wire, inner)
return
await _deliver_preset_for_devices(
preset_id,
display_preset,
device_names,
devices,
lane_index=lane_index,
zone_doc=zone_doc,
settings_obj=ctx.get("settings"),
groups_model=groups,
)
mark_sequence_manual_lane_select_sent(lane_index)
async def _send_all_lanes(ctx: Dict[str, Any]) -> None:
@@ -745,7 +712,7 @@ def _build_ctx(
) -> Optional[Dict[str, Any]]:
from models.device import Device
from models.group import Group
from settings import Settings
from settings import get_settings
lanes = [x for x in _normalize_sequence_lanes(sequence_doc) if len(x) > 0]
if not lanes:
@@ -764,9 +731,9 @@ def _build_ctx(
"presets_map": presets_map,
"devices": devices,
"groups": groups,
"settings": Settings(),
"settings": get_settings(),
"palette_colors": palette_colors,
"loop": True,
"loop": _coerce_loop(sequence_doc),
"advance_mode": "beats",
}
@@ -897,7 +864,294 @@ async def process_active_beat_advance() -> None:
else:
ctx["sequence_loop_beat"] = int(ctx.get("sequence_loop_beat", 0)) + 1
if all(s.get("done") for s in lane_states):
stop()
await stop_playback(clear_devices=True)
return
async def _clear_devices_after_sequence(ctx: Dict[str, Any]) -> None:
"""Stop beat routing and clear driver presets for devices used by this sequence run."""
from models.transport import get_current_sender
from util.beat_driver_route import clear_sequence_manual_lane_route, update_beat_route
from util.driver_delivery import deliver_json_messages
num_lanes = int(ctx.get("num_lanes") or 0)
for i in range(num_lanes):
clear_sequence_manual_lane_route(i)
update_beat_route({"enabled": False})
sender = get_current_sender()
if not sender:
return
devices = ctx.get("devices")
macs = _union_macs_for_sequence(ctx)
if not macs:
return
msg = json.dumps({"v": "1", "clear_presets": True, "save": True}, separators=(",", ":"))
await deliver_json_messages(sender, [msg], macs, devices, delay_s=0.05)
def _halt_playback_state() -> Optional[Dict[str, Any]]:
"""Drop active run state and cancel simulated beats; return the previous ctx."""
global _beat_run, _sim_beat_task, _sim_beat_token
ctx: Optional[Dict[str, Any]] = None
with _beat_run_lock:
ctx = _beat_run
_beat_run = None
_sim_beat_token += 1
st = _sim_beat_task
_sim_beat_task = None
if st and not st.done():
st.cancel()
return ctx
async def stop_playback(*, clear_devices: bool = True) -> None:
"""Stop sequence playback; optionally clear presets on targeted devices."""
clear_pending_play()
ctx = _halt_playback_state()
if clear_devices and ctx:
await _clear_devices_after_sequence(ctx)
def apply_beat_phase_sync(ctx: Dict[str, Any], mode: str) -> Tuple[bool, bool]:
"""Align beat counters to music.
``step`` (default): beat 1 of the current step on the next counted beat.
``pass``: restart from step 1 of the sequence pass and re-apply presets.
Returns ``(ok, resend_lanes)`` — caller should ``await _send_all_lanes(ctx)`` when resend is true.
"""
if not ctx:
return False, False
mode_norm = str(mode or "step").strip().lower()
lane_states: List[Dict[str, Any]] = ctx.get("lane_states") or []
if mode_norm in ("pass", "sequence", "restart"):
for st in lane_states:
st["stepIdx"] = 0
st["beatCount"] = 0
st["done"] = False
ctx["sequence_loop_beat"] = 0
return True, True
for st in lane_states:
if not st.get("done"):
st["beatCount"] = 0
return True, False
async def sync_beat_phase(mode: str = "step") -> bool:
"""Public entry: align active sequence playback to a musical phase."""
with _beat_run_lock:
ctx = _beat_run
if not ctx:
return False
ok, resend = apply_beat_phase_sync(ctx, mode)
if not ok:
return False
if resend:
await _send_all_lanes(ctx)
return True
def _drain_beat_queue() -> None:
try:
while True:
_thread_beat_queue.get_nowait()
except queue.Empty:
pass
def _reset_beat_side_effects() -> None:
"""Clear manual routes and queued beats so startup cannot select before presets land."""
from util.beat_driver_route import update_beat_route
update_beat_route({"enabled": False})
_drain_beat_queue()
def _sequence_switch_wait_from_settings() -> str:
try:
from settings import get_settings
raw = get_settings().get("sequence_switch_wait", "beat")
mode = _normalize_wait_for({"wait_for": raw}) or "beat"
if mode == "phrase":
return "beat"
return mode
except Exception:
return "beat"
def _normalize_wait_for(play_options: Optional[Dict[str, Any]]) -> Optional[str]:
"""``beat`` | ``downbeat`` | None (immediate)."""
if not isinstance(play_options, dict):
return None
raw = play_options.get("wait_for")
if raw is None:
raw = play_options.get("start_on")
if raw is None:
return None
s = str(raw).strip().lower()
if s in ("beat", "next_beat"):
return "beat"
if s in ("downbeat", "next_downbeat"):
return "downbeat"
return None
def _play_options_without_wait(play_options: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not isinstance(play_options, dict):
return play_options
out = dict(play_options)
out.pop("wait_for", None)
out.pop("start_on", None)
return out
def _cancel_pending_beat_waiter() -> None:
global _pending_beat_task, _pending_beat_token
_pending_beat_token += 1
t = _pending_beat_task
_pending_beat_task = None
if t and not t.done():
t.cancel()
def clear_pending_play() -> None:
"""Drop a queued sequence start (e.g. user stop)."""
global _pending_play
with _pending_play_lock:
_pending_play = None
_cancel_pending_beat_waiter()
def pending_play_status() -> Dict[str, Any]:
with _pending_play_lock:
p = _pending_play
if not p:
return {"pending": False}
return {
"pending": True,
"wait_for": p.get("wait_for"),
"sequence_id": p.get("sequence_id"),
"zone_id": p.get("zone_id"),
}
def _beat_phase_from_sources() -> Dict[str, Any]:
from util import audio_detector as ad_mod
if ad_mod.shared_beat_detector_running():
st = ad_mod.shared_beat_status_snapshot()
if st:
return dict(st)
return dict(_last_thread_beat_phase)
def _beat_is_downbeat_from_sources() -> bool:
return bool(_beat_phase_from_sources().get("is_downbeat"))
def _mark_simulated_beat_phase(*, beats_per_bar: int = 4) -> None:
global _sim_beat_counter, _last_thread_beat_phase
bpb = max(1, int(beats_per_bar))
_sim_beat_counter += 1
bar_beat = ((_sim_beat_counter - 1) % bpb) + 1
is_downbeat = bar_beat == 1
_last_thread_beat_phase = {
"bar_beat": bar_beat,
"is_downbeat": is_downbeat,
}
def _queue_pending_start(
zone_id: str,
sequence_id: str,
profile_id: str,
play_options: Optional[Dict[str, Any]],
wait_for: str,
*,
bpm: float,
) -> None:
global _pending_play
clear_pending_play()
with _pending_play_lock:
_pending_play = {
"zone_id": str(zone_id),
"sequence_id": str(sequence_id),
"profile_id": str(profile_id),
"play_options": _play_options_without_wait(play_options),
"wait_for": wait_for,
}
_ensure_pending_beat_waiter(bpm)
def _ensure_pending_beat_waiter(bpm: float) -> None:
"""When nothing is playing and audio is off, emit synthetic beats until pending starts."""
from util import audio_detector as ad_mod
if ad_mod.shared_beat_detector_running():
return
with _beat_run_lock:
if _beat_run:
return
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
global _pending_beat_task, _pending_beat_token
t = _pending_beat_task
if t and not t.done():
t.cancel()
_pending_beat_token += 1
my_tok = _pending_beat_token
_pending_beat_task = loop.create_task(_pending_beat_wait_loop(bpm, my_tok))
async def _pending_beat_wait_loop(bpm: float, my_token: int) -> None:
from util import audio_detector as ad_mod
interval = 60.0 / max(30.0, min(300.0, float(bpm)))
while True:
with _pending_play_lock:
if _pending_beat_token != my_token or _pending_play is None:
return
if ad_mod.shared_beat_detector_running():
return
await asyncio.sleep(interval)
with _pending_play_lock:
if _pending_beat_token != my_token or _pending_play is None:
return
if ad_mod.shared_beat_detector_running():
return
_mark_simulated_beat_phase()
push_thread_beat()
async def _try_consume_pending_play(*, is_downbeat: bool) -> bool:
global _pending_play
with _pending_play_lock:
pending = _pending_play
if not pending:
return False
wait_for = str(pending.get("wait_for") or "beat").strip().lower()
if wait_for == "downbeat" and not is_downbeat:
return False
_pending_play = None
_cancel_pending_beat_waiter()
await _start_immediate(
pending["zone_id"],
pending["sequence_id"],
pending["profile_id"],
pending.get("play_options"),
)
return True
def stop() -> None:
"""Stop server playback state without sending device clear (e.g. before starting another run)."""
clear_pending_play()
_halt_playback_state()
_reset_beat_side_effects()
def push_thread_beat() -> None:
@@ -920,6 +1174,12 @@ async def beat_consumer_loop() -> None:
from util.beat_driver_route import notify_beat_detected
for _ in range(n):
phase = _beat_phase_from_sources()
is_down = bool(phase.get("is_downbeat"))
try:
await _try_consume_pending_play(is_downbeat=is_down)
except Exception as e:
print(f"[sequence-playback] pending start: {e}")
try:
await process_active_beat_advance()
except Exception as e:
@@ -981,20 +1241,10 @@ async def _simulated_beat_loop(ctx: Dict[str, Any], my_token: int, bpm: float) -
return
if ad_mod.shared_beat_detector_running():
continue
_mark_simulated_beat_phase()
push_thread_beat()
def stop() -> None:
global _beat_run, _sim_beat_task, _sim_beat_token
with _beat_run_lock:
_beat_run = None
_sim_beat_token += 1
st = _sim_beat_task
_sim_beat_task = None
if st and not st.done():
st.cancel()
def stop_if_playing_sequence(sequence_id: str) -> bool:
"""If zone sequence playback is running this sequence id, stop it (e.g. after save/delete)."""
sid = str(sequence_id).strip()
@@ -1007,7 +1257,11 @@ def stop_if_playing_sequence(sequence_id: str) -> bool:
cur = ctx.get("sequence_id")
if cur is None or str(cur).strip() != sid:
return False
stop()
try:
loop = asyncio.get_running_loop()
loop.create_task(stop_playback(clear_devices=True))
except RuntimeError:
stop()
return True
@@ -1016,6 +1270,29 @@ async def start(
sequence_id: str,
profile_id: str,
play_options: Optional[Dict[str, Any]] = None,
) -> None:
"""Start immediately, or queue until the next beat / downbeat (``wait_for`` in *play_options*)."""
from models.sequence import Sequence
seq_m = Sequence()
sequence_doc = seq_m.read(sequence_id)
if not sequence_doc or str(sequence_doc.get("profile_id")) != str(profile_id):
raise ValueError("sequence not found")
wait_for = _sequence_switch_wait_from_settings()
if wait_for:
bpm = _coerce_simulated_bpm(sequence_doc, play_options)
_queue_pending_start(
zone_id, sequence_id, profile_id, play_options, wait_for, bpm=bpm
)
return
await _start_immediate(zone_id, sequence_id, profile_id, play_options)
async def _start_immediate(
zone_id: str,
sequence_id: str,
profile_id: str,
play_options: Optional[Dict[str, Any]] = None,
) -> None:
global _beat_run, _sim_beat_task, _sim_beat_token
from models.preset import Preset
@@ -1052,14 +1329,11 @@ async def start(
ctx["zone_id"] = str(zone_id)
ctx["sequence_loop_beat"] = 0
await _deliver_sequence_presets_bulk(ctx)
from util.beat_driver_route import update_beat_route
update_beat_route({"enabled": False})
_reset_beat_side_effects()
await _prime_all_lanes(ctx)
await _deliver_zone_brightness_for_sequence(ctx)
with _beat_run_lock:
_beat_run = ctx
await _send_all_lanes(ctx)
bpm = _coerce_simulated_bpm(sequence_doc, play_options)
loop = asyncio.get_running_loop()