"""Persist whether the audio beat detector should be running (survives process restarts).""" from __future__ import annotations import json import os from typing import Any, Dict, Optional def _db_path() -> str: base = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) return os.path.join(base, "db", "audio_run.json") def coerce_audio_device(device: Any) -> Optional[Any]: """Match ``/api/audio/start`` body coercion (None = host default input).""" if device in ("", None): return None try: return int(device) except (TypeError, ValueError): return device def read_audio_run_state() -> Dict[str, Any]: path = _db_path() try: with open(path, "r", encoding="utf-8") as f: raw = json.load(f) except (OSError, json.JSONDecodeError, TypeError): return {"enabled": False, "device": None} if not isinstance(raw, dict): return {"enabled": False, "device": None} enabled = bool(raw.get("enabled")) dev = raw.get("device", None) return {"enabled": enabled, "device": dev} def write_audio_run_state(*, enabled: bool, device: Any = None) -> None: """Write run intent. When ``enabled`` is false, keep ``device`` from the previous file for next start.""" path = _db_path() prev = read_audio_run_state() if enabled: data = {"enabled": True, "device": device} else: data = {"enabled": False, "device": prev.get("device")} try: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) except OSError as e: print(f"[audio_run_persist] save failed: {e!r}")