"""Server-side routing of audio beats to LED drivers (no browser required)."""

from __future__ import annotations

import asyncio
import json
import os
import threading
from typing import Any, Dict, List, Optional, Set, Tuple

# Guards ``_lane_manual``, ``_beat_route``, ``_beat_counter`` and ``_preset_session_beats``.
_route_lock = threading.Lock()

# Lane key used by the legacy single-route entry (preset push / UI).
_LEGACY_LANE_KEY = -1
# Wire preset id used whenever a route does not carry one.
_DEFAULT_WIRE_PRESET_ID = "2"
# Upper clamp for ``manual_beat_n`` (the lower clamp is 1 = every beat).
_MAX_MANUAL_BEAT_N = 64

# Per-lane manual routes: key ``-1`` = legacy single-route (preset push / UI); keys ``0..n`` =
# zone sequence lanes so every manual lane gets its own stride counter and wire.
_lane_manual: Dict[int, Dict[str, Any]] = {}


def _disabled_public_route() -> Dict[str, Any]:
    """Return a fresh public route dict describing "no routing"."""
    return {
        "enabled": False,
        "device_names": [],
        "wire_preset_id": _DEFAULT_WIRE_PRESET_ID,
        "is_manual": False,
        "pattern": "",
        "manual_beat_n": 1,
    }


# Public mirror for ``get_beat_route`` / header UI (derived from lane table).
_beat_route: Dict[str, Any] = _disabled_public_route()

_beat_counter: int = 0
_preset_session_beats: int = 0
_main_loop: Optional[asyncio.AbstractEventLoop] = None

# ``((mtime_ns, size), parsed table)`` of the last successfully loaded ``db/pattern.json``.
_pattern_table_cache: Optional[Tuple[Tuple[int, int], Dict[str, Any]]] = None


def set_beat_route_main_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Remember the event loop that ``notify_beat_detected`` schedules deliveries on."""
    global _main_loop
    _main_loop = loop


def _pick_display_lane_key() -> Optional[int]:
    """Lane key used for header stride readout (prefer sequence lane 0).

    Order of preference: lane ``0``, the smallest non-negative lane, the legacy ``-1``
    lane, then the smallest key of whatever is left. Returns ``None`` when no lane is
    registered. Callers are expected to hold ``_route_lock``.
    """
    if not _lane_manual:
        return None
    if 0 in _lane_manual:
        return 0
    seq_keys = [k for k in _lane_manual if isinstance(k, int) and k >= 0]
    if seq_keys:
        return min(seq_keys)
    if _LEGACY_LANE_KEY in _lane_manual:
        return _LEGACY_LANE_KEY
    return min(_lane_manual.keys())


def _sync_public_beat_route_from_lane_table() -> None:
    """Mirror ``_lane_manual`` into legacy ``_beat_route`` shape for API consumers.

    Callers are expected to hold ``_route_lock``.
    """
    global _beat_route, _beat_counter
    pick = _pick_display_lane_key()
    if pick is None:
        _beat_route = _disabled_public_route()
        _beat_counter = 0
        return
    entry = _lane_manual[pick]
    _beat_route = {
        "enabled": True,
        # Copy so later edits of the lane entry do not leak into the mirror.
        "device_names": list(entry.get("device_names") or []),
        "wire_preset_id": str(entry.get("wire_preset_id") or _DEFAULT_WIRE_PRESET_ID),
        "is_manual": True,
        "pattern": str(entry.get("pattern") or ""),
        "manual_beat_n": int(entry.get("manual_beat_n") or 1),
    }
    _beat_counter = int(entry.get("beat_counter", 0))


def update_beat_route(payload: Dict[str, Any]) -> None:
    """Internal: set or clear routing from explicit fields (tests / future APIs).

    ``payload["enabled"] is False`` clears every lane and resets both counters, while
    the public mirror keeps its other fields (wire id, pattern, stride). Any other
    payload replaces all lanes with a single legacy lane built from ``device_names``,
    ``wire_preset_id``, ``pattern`` and ``manual_beat_n``. Non-dict payloads are ignored.
    """
    global _beat_route, _beat_counter, _preset_session_beats
    if not isinstance(payload, dict):
        return
    with _route_lock:
        if payload.get("enabled") is False:
            _lane_manual.clear()
            _beat_route = {
                **_beat_route,
                "enabled": False,
                "is_manual": False,
                "device_names": [],
            }
            _beat_counter = 0
            _preset_session_beats = 0
            return

        names = payload.get("device_names")
        if not isinstance(names, list):
            names = []
        new_wire = str(payload.get("wire_preset_id") or _DEFAULT_WIRE_PRESET_ID)
        old_wire = str(_beat_route.get("wire_preset_id") or _DEFAULT_WIRE_PRESET_ID)
        # The session beat count restarts when routing was off or the preset changed.
        if not _beat_route.get("enabled") or old_wire != new_wire:
            _preset_session_beats = 0

        _lane_manual.clear()
        _lane_manual[_LEGACY_LANE_KEY] = {
            "device_names": [str(n).strip() for n in names if str(n).strip()],
            "wire_preset_id": new_wire,
            "pattern": str(payload.get("pattern") or "").strip(),
            "manual_beat_n": _coerce_manual_beat_n(payload),
            "beat_counter": 0,
        }
        _sync_public_beat_route_from_lane_table()


def get_beat_route() -> Dict[str, Any]:
    """Return a snapshot of the public route mirror.

    The ``device_names`` list is copied as well, so callers cannot mutate the mirror.
    """
    with _route_lock:
        snapshot = dict(_beat_route)
        snapshot["device_names"] = list(snapshot.get("device_names") or [])
        return snapshot


def manual_beat_stride_status() -> Dict[str, Any]:
    """Audio-beat stride for a live manual preset (not sequence). For UI readout with BPM.

    ``beat_in_stride`` is always in ``1..stride_n`` when ``active`` (1-based within the stride).
    With multiple sequence manual lanes, reflects lane 0 (or the smallest lane index).
    When no lane is registered the result is ``{"active": False, ...}`` carrying the
    wire preset id last seen by the public mirror.
    """
    with _route_lock:
        pick = _pick_display_lane_key()
        if pick is None:
            wid = str(_beat_route.get("wire_preset_id") or "").strip()
            return {"active": False, "preset_session_beats": 0, "wire_preset_id": wid}
        entry = _lane_manual[pick]
        count = int(entry.get("beat_counter", 0))
        stride = _coerce_manual_beat_n(entry)
        # Before the first beat the readout sits on position 1.
        beat_in_stride = 1 if count <= 0 else ((count - 1) % stride) + 1
        return {
            "active": True,
            "beat_in_stride": beat_in_stride,
            "stride_n": stride,
            "preset_session_beats": int(_preset_session_beats),
            "wire_preset_id": str(entry.get("wire_preset_id") or "").strip(),
        }


def _coerce_manual_beat_n(body: Any) -> int:
    """Beats between audio-triggered selects (led-controller only); default 1 = every beat.

    Reads ``body["manual_beat_n"]``; anything missing or not convertible to ``int``
    yields 1, and the result is clamped to ``1.._MAX_MANUAL_BEAT_N``.
    """
    if not isinstance(body, dict):
        return 1
    raw = body.get("manual_beat_n")
    if raw is None:
        return 1
    try:
        n = int(raw)
    except (TypeError, ValueError):
        return 1
    return max(1, min(_MAX_MANUAL_BEAT_N, n))


def _coerce_auto_from_body(body: Any) -> bool:
    """Match JS ``coercePresetAuto`` / ``build_preset_dict`` (default: auto-run).

    Reads ``auto`` (falling back to ``a``). Booleans pass through, integers are
    compared against zero, and the strings ``false/0/no/off`` and ``true/1/yes/on``
    are recognised case-insensitively. Everything else counts as auto.
    """
    if not isinstance(body, dict):
        return True
    raw = body.get("auto", body.get("a", True))
    if isinstance(raw, bool):
        return raw
    if raw is None:
        return True
    if isinstance(raw, int):
        return raw != 0
    if isinstance(raw, str):
        lowered = raw.strip().lower()
        if lowered in ("false", "0", "no", "off"):
            return False
        if lowered in ("true", "1", "yes", "on"):
            return True
    return True


def _registry_names_for_macs(macs: Optional[List[str]]) -> List[str]:
    """Resolve push ``targets`` MAC list to registry device names (order preserved, de-duplicated)."""
    if not macs:
        return []
    from models.device import Device, normalize_mac

    devices = Device()
    out: List[str] = []
    seen: Set[str] = set()
    for raw in macs:
        mac = normalize_mac(str(raw))
        if not mac:
            continue
        doc = devices.read(mac) or {}
        name = str(doc.get("name") or "").strip()
        if name and name not in seen:
            seen.add(name)
            out.append(name)
    return out


def _single_manual_wire_preset(
    merged_presets: Dict[str, Any],
) -> tuple[Optional[str], Optional[Dict[str, Any]]]:
    """If exactly one manual (non-auto) preset is present, return its wire id and body.

    Returns ``(None, None)`` when there are zero or several manual presets.
    """
    manual: List[tuple[str, Dict[str, Any]]] = [
        (str(wid).strip(), body)
        for wid, body in merged_presets.items()
        if isinstance(body, dict) and not _coerce_auto_from_body(body)
    ]
    if len(manual) != 1:
        return None, None
    return manual[0]


def _manual_pattern_if_routable(preset_body: Any) -> Optional[str]:
    """Return the preset's pattern key if it may be beat-routed, else ``None``.

    A preset is routable when it is a dict, is manual (not auto), and its pattern
    (``pattern`` or ``p``, possibly empty) is not flagged as lacking manual support.
    """
    if not isinstance(preset_body, dict) or _coerce_auto_from_body(preset_body):
        return None
    pattern = str(preset_body.get("pattern") or preset_body.get("p") or "").strip()
    if pattern and not _pattern_supports_manual(pattern):
        return None
    return pattern


def _apply_manual_beat_route(
    device_names: List[str],
    wire_preset_id: str,
    preset_body: Any,
) -> None:
    """Enable audio→driver routing for one manual preset, or disable if invalid.

    All lanes are replaced: on success a single legacy lane remains, otherwise none.
    Names are stripped before the emptiness check, matching
    ``set_sequence_manual_lane_route``.
    """
    names = [str(n).strip() for n in (device_names or []) if str(n).strip()]
    # The pattern lookup may touch the disk, so do it before taking the lock.
    pattern = _manual_pattern_if_routable(preset_body) if names else None
    with _route_lock:
        _lane_manual.clear()
        if pattern is not None:
            _lane_manual[_LEGACY_LANE_KEY] = {
                "device_names": names,
                "wire_preset_id": str(wire_preset_id).strip(),
                "pattern": pattern,
                "manual_beat_n": _coerce_manual_beat_n(preset_body),
                "beat_counter": 0,
            }
        _sync_public_beat_route_from_lane_table()


def set_sequence_manual_lane_route(
    lane_index: int,
    device_names: List[str],
    wire_preset_id: str,
    preset_body: Any,
) -> None:
    """Register or update one sequence lane's manual beat route (parallel lanes, independent strides).

    An unusable request (no names, non-dict or auto preset, pattern without manual
    support) removes the lane instead. The lane's beat counter is kept when wire id,
    stride and device set are unchanged, and restarts at 0 otherwise.
    """
    names = [str(n).strip() for n in (device_names or []) if str(n).strip()]
    pattern = _manual_pattern_if_routable(preset_body) if names else None
    if pattern is None:
        clear_sequence_manual_lane_route(lane_index)
        return

    stride = _coerce_manual_beat_n(preset_body)
    wid = str(wire_preset_id).strip()
    with _route_lock:
        old = _lane_manual.get(lane_index)
        counter = 0
        if (
            old
            and str(old.get("wire_preset_id") or "") == wid
            and int(old.get("manual_beat_n") or 1) == stride
            and set(old.get("device_names") or []) == set(names)
        ):
            counter = int(old.get("beat_counter", 0))
        _lane_manual[lane_index] = {
            "device_names": names,
            "wire_preset_id": wid,
            "pattern": pattern,
            "manual_beat_n": stride,
            "beat_counter": counter,
        }
        _sync_public_beat_route_from_lane_table()


def clear_sequence_manual_lane_route(lane_index: int) -> None:
    """Remove beat routing for one sequence lane (e.g. step switched to auto)."""
    with _route_lock:
        _lane_manual.pop(lane_index, None)
        _sync_public_beat_route_from_lane_table()


def sync_beat_route_from_push_sequence(
    sequence: List[Any],
    target_macs: Optional[List[str]] = None,
    *,
    preserve_manual_beat_route_on_auto_select: bool = False,
) -> None:
    """
    Update beat routing from a ``/presets/push`` body ``sequence`` (list of v1 dicts).

    With a ``select`` map: use its keys as device names (existing behaviour).

    Without ``select`` (e.g. manual preset loaded without immediate select): if
    ``target_macs`` is set and the merged ``presets`` contain exactly one manual preset,
    enable routing using registry names for those MACs so the first advance is on the
    next audio beat.

    When ``preserve_manual_beat_route_on_auto_select`` is true (zone sequence playback),
    an auto preset in ``select`` does not clear manual routing — other lanes may still need
    ``notify_beat_detected`` for manual patterns in parallel.
    """
    merged_presets: Dict[str, Any] = {}
    last_select: Optional[Dict[str, Any]] = None
    for item in sequence:
        if isinstance(item, str):
            try:
                item = json.loads(item)
            except (TypeError, ValueError):
                continue
        if not isinstance(item, dict) or item.get("v") != "1":
            continue
        presets = item.get("presets")
        if isinstance(presets, dict):
            merged_presets.update(presets)
        select = item.get("select")
        if isinstance(select, dict) and select:
            last_select = select

    if not last_select:
        wire_id, body = _single_manual_wire_preset(merged_presets)
        if wire_id and body is not None:
            _apply_manual_beat_route(_registry_names_for_macs(target_macs), wire_id, body)
        else:
            update_beat_route({"enabled": False})
        return

    device_names: List[str] = []
    wire_ids: Set[str] = set()
    # Walk items() so each value stays paired with its own key even when the key
    # needs stripping (a lookup by the stripped name would miss such keys).
    for raw_name, val in last_select.items():
        name = str(raw_name).strip()
        if not name:
            continue
        device_names.append(name)
        if isinstance(val, list) and val:
            wire_ids.add(str(val[0]).strip())
        elif val is not None:
            wire_ids.add(str(val).strip())

    # Routing needs named devices that all select the same wire preset.
    if not device_names or len(wire_ids) != 1:
        update_beat_route({"enabled": False})
        return

    wire_preset_id = wire_ids.pop()
    preset_body = merged_presets.get(wire_preset_id)
    if preset_body is None:
        # Preset keys may carry whitespace or be non-strings; compare normalised.
        for key, value in merged_presets.items():
            if str(key).strip() == wire_preset_id:
                preset_body = value
                break
    if preset_body is None:
        update_beat_route({"enabled": False})
        return
    if _coerce_auto_from_body(preset_body):
        if not preserve_manual_beat_route_on_auto_select:
            update_beat_route({"enabled": False})
        return
    _apply_manual_beat_route(device_names, wire_preset_id, preset_body)


def _load_pattern_table() -> Optional[Dict[str, Any]]:
    """Return the parsed ``db/pattern.json`` table, or ``None`` if it is unusable.

    The file is looked up two directories above this module. The parsed table is
    cached against the file's ``(mtime_ns, size)`` so repeated calls (one per lane
    per beat from ``notify_beat_detected``) only pay for an ``os.stat``. A missing,
    unreadable, malformed or non-object file yields ``None``.
    """
    global _pattern_table_cache
    here = os.path.dirname(os.path.abspath(__file__))
    root = os.path.abspath(os.path.join(here, "..", ".."))
    path = os.path.join(root, "db", "pattern.json")
    try:
        st = os.stat(path)
        stamp = (st.st_mtime_ns, st.st_size)
        cached = _pattern_table_cache
        if cached is not None and cached[0] == stamp:
            return cached[1]
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(data, dict):
        return None
    _pattern_table_cache = (stamp, data)
    return data


def _pattern_supports_manual(pattern_key: str) -> bool:
    """Return ``False`` only when the pattern table marks ``supports_manual`` as false.

    The key is tried verbatim, then lower-cased. An empty key, an unknown pattern or
    an unusable pattern table all count as supported, so a broken ``db/pattern.json``
    never raises into the caller.
    """
    if not pattern_key:
        return True
    table = _load_pattern_table()
    if table is None:
        return True
    meta = table.get(pattern_key)
    if meta is None:
        meta = table.get(pattern_key.lower())
    if not isinstance(meta, dict):
        return True
    return meta.get("supports_manual") is not False


def remap_beat_route_device_name(old_name: str, new_name: str) -> None:
    """Update cached audio-beat target names after a device registry rename.

    No-op when either name is blank or both are equal after stripping.
    """
    old = str(old_name or "").strip()
    new = str(new_name or "").strip()
    if not old or not new or old == new:
        return
    with _route_lock:
        any_changed = False
        for entry in _lane_manual.values():
            names = entry.get("device_names") or []
            if not isinstance(names, list):
                continue
            if not any(str(item).strip() == old for item in names):
                continue
            entry["device_names"] = [
                new if str(item).strip() == old else str(item) for item in names
            ]
            any_changed = True
        if any_changed:
            _sync_public_beat_route_from_lane_table()


async def _deliver_select(device_names: List[str], wire_preset_id: str) -> None:
    """Send one v1 ``select`` message for ``wire_preset_id`` to the named devices.

    Names are resolved to MACs (order preserved, de-duplicated). The ``select`` map is
    keyed by each MAC's registry ``name``. Nothing is sent when there is no current
    sender, no MAC resolves, or no resolved device has a name. Delivery errors are
    printed rather than raised.
    """
    from models.device import Device
    from models.device import resolve_device_mac_for_select_routing
    from models.transport import get_current_sender
    from util.driver_delivery import deliver_json_messages

    sender = get_current_sender()
    if not sender:
        return
    devices = Device()
    seen_macs: List[str] = []
    seen_set: Set[str] = set()
    for name in device_names:
        mac = resolve_device_mac_for_select_routing(devices, name)
        if mac and mac not in seen_set:
            seen_set.add(mac)
            seen_macs.append(mac)
    if not seen_macs:
        return
    select: Dict[str, Any] = {}
    for mac in seen_macs:
        doc = devices.read(mac) or {}
        registry_name = str(doc.get("name") or "").strip()
        if registry_name:
            select[registry_name] = [wire_preset_id]
    if not select:
        return
    msg = json.dumps({"v": "1", "select": select}, separators=(",", ":"))
    try:
        await deliver_json_messages(sender, [msg], seen_macs, devices, delay_s=0.05)
    except Exception as e:
        print(f"[beat-route] deliver failed: {e}")


async def _deliver_select_batch(pairs: List[Tuple[List[str], str]]) -> None:
    """Deliver each ``(device_names, wire_preset_id)`` pair in order, one after another."""
    for names, pid in pairs:
        await _deliver_select(names, pid)


def notify_beat_detected() -> None:
    """Invoked from the audio thread when a beat is detected.

    Advances every routable lane's beat counter; lanes without device names or whose
    pattern lacks manual support are skipped untouched. A lane fires on the first beat
    of each stride. ``_preset_session_beats`` grows by one per beat on which at least
    one lane fires. Fired selects are scheduled on the loop registered through
    ``set_beat_route_main_loop``; without a loop they are dropped.
    """
    global _preset_session_beats
    work: List[Tuple[List[str], str]] = []
    with _route_lock:
        if not _lane_manual:
            return
        for key in sorted(_lane_manual.keys()):
            entry = _lane_manual[key]
            names = entry.get("device_names") or []
            if not isinstance(names, list) or not names:
                continue
            pattern = str(entry.get("pattern") or "")
            if pattern and not _pattern_supports_manual(pattern):
                continue
            stride = _coerce_manual_beat_n(entry)
            count = int(entry.get("beat_counter", 0)) + 1
            entry["beat_counter"] = count
            if (count - 1) % stride != 0:
                continue
            wire = str(entry.get("wire_preset_id") or _DEFAULT_WIRE_PRESET_ID)
            work.append((list(names), wire))
        if work:
            _preset_session_beats += 1
    if not work:
        return
    loop = _main_loop
    if loop is None:
        return
    coro = _deliver_select_batch(work)
    try:
        asyncio.run_coroutine_threadsafe(coro, loop)
    except Exception as e:
        # Scheduling failed (e.g. the loop is closed): close the coroutine so it does
        # not trigger a "never awaited" warning.
        coro.close()
        print(f"[beat-route] schedule failed: {e}")