chore(release): beta-1.03
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -94,12 +94,85 @@ def _coerce_auto_from_body(body: Any) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def sync_beat_route_from_push_sequence(sequence: List[Any]) -> None:
|
||||
def _registry_names_for_macs(macs: Optional[List[str]]) -> List[str]:
|
||||
"""Resolve push ``targets`` MAC list to registry device names (order preserved, de-duplicated)."""
|
||||
if not macs:
|
||||
return []
|
||||
from models.device import Device, normalize_mac
|
||||
|
||||
devices = Device()
|
||||
out: List[str] = []
|
||||
seen: Set[str] = set()
|
||||
for raw in macs:
|
||||
m = normalize_mac(str(raw))
|
||||
if not m:
|
||||
continue
|
||||
doc = devices.read(m) or {}
|
||||
nm = str(doc.get("name") or "").strip()
|
||||
if nm and nm not in seen:
|
||||
seen.add(nm)
|
||||
out.append(nm)
|
||||
return out
|
||||
|
||||
|
||||
def _single_manual_wire_preset(
|
||||
merged_presets: Dict[str, Any],
|
||||
) -> tuple[Optional[str], Optional[Dict[str, Any]]]:
|
||||
"""If exactly one manual (non-auto) preset is present, return its wire id and body."""
|
||||
manual: List[tuple[str, Dict[str, Any]]] = []
|
||||
for wid, body in merged_presets.items():
|
||||
if not isinstance(body, dict):
|
||||
continue
|
||||
if _coerce_auto_from_body(body):
|
||||
continue
|
||||
manual.append((str(wid).strip(), body))
|
||||
if len(manual) != 1:
|
||||
return None, None
|
||||
return manual[0][0], manual[0][1]
|
||||
|
||||
|
||||
def _apply_manual_beat_route(
|
||||
device_names: List[str],
|
||||
wire_preset_id: str,
|
||||
preset_body: Any,
|
||||
) -> None:
|
||||
"""Enable audio→driver routing for one manual preset, or disable if invalid."""
|
||||
if not device_names:
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
if not isinstance(preset_body, dict):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
if _coerce_auto_from_body(preset_body):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
pattern = str(preset_body.get("pattern") or preset_body.get("p") or "").strip()
|
||||
if pattern and not _pattern_supports_manual(pattern):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
update_beat_route(
|
||||
{
|
||||
"enabled": True,
|
||||
"device_names": device_names,
|
||||
"wire_preset_id": wire_preset_id,
|
||||
"is_manual": True,
|
||||
"pattern": pattern,
|
||||
"manual_beat_n": _coerce_manual_beat_n(preset_body),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def sync_beat_route_from_push_sequence(
|
||||
sequence: List[Any], target_macs: Optional[List[str]] = None
|
||||
) -> None:
|
||||
"""
|
||||
Update beat routing from a ``/presets/push`` body ``sequence`` (list of v1 dicts).
|
||||
|
||||
When the batch includes a ``select`` and preset bodies, and the selected preset is
|
||||
manual (auto off), enables the route; otherwise disables it.
|
||||
With a ``select`` map: use its keys as device names (existing behaviour).
|
||||
|
||||
Without ``select`` (e.g. manual preset loaded without immediate select): if ``target_macs``
|
||||
is set and the merged ``presets`` contain exactly one manual preset, enable routing using
|
||||
registry names for those MACs so the first advance is on the next audio beat.
|
||||
"""
|
||||
merged_presets: Dict[str, Any] = {}
|
||||
last_select: Optional[Dict[str, Any]] = None
|
||||
@@ -117,54 +190,40 @@ def sync_beat_route_from_push_sequence(sequence: List[Any]) -> None:
|
||||
sel = item.get("select")
|
||||
if isinstance(sel, dict) and sel:
|
||||
last_select = sel
|
||||
if not last_select:
|
||||
|
||||
if last_select:
|
||||
device_names = [str(k).strip() for k in last_select.keys() if str(k).strip()]
|
||||
if not device_names:
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
|
||||
wire_ids: Set[str] = set()
|
||||
for name in device_names:
|
||||
val = last_select.get(name)
|
||||
if isinstance(val, list) and val:
|
||||
wire_ids.add(str(val[0]).strip())
|
||||
elif val is not None:
|
||||
wire_ids.add(str(val).strip())
|
||||
if len(wire_ids) != 1:
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
wire_preset_id = wire_ids.pop()
|
||||
preset_body = merged_presets.get(wire_preset_id)
|
||||
if preset_body is None:
|
||||
for k, v in merged_presets.items():
|
||||
if str(k).strip() == wire_preset_id:
|
||||
preset_body = v
|
||||
break
|
||||
_apply_manual_beat_route(device_names, wire_preset_id, preset_body)
|
||||
return
|
||||
|
||||
device_names = [str(k).strip() for k in last_select.keys() if str(k).strip()]
|
||||
if not device_names:
|
||||
update_beat_route({"enabled": False})
|
||||
wire_id, body = _single_manual_wire_preset(merged_presets)
|
||||
if wire_id and body is not None:
|
||||
names = _registry_names_for_macs(target_macs)
|
||||
_apply_manual_beat_route(names, wire_id, body)
|
||||
return
|
||||
|
||||
wire_ids: Set[str] = set()
|
||||
for name in device_names:
|
||||
val = last_select.get(name)
|
||||
if isinstance(val, list) and val:
|
||||
wire_ids.add(str(val[0]).strip())
|
||||
elif val is not None:
|
||||
wire_ids.add(str(val).strip())
|
||||
if len(wire_ids) != 1:
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
wire_preset_id = wire_ids.pop()
|
||||
preset_body = merged_presets.get(wire_preset_id)
|
||||
if preset_body is None:
|
||||
for k, v in merged_presets.items():
|
||||
if str(k).strip() == wire_preset_id:
|
||||
preset_body = v
|
||||
break
|
||||
if not isinstance(preset_body, dict):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
|
||||
if _coerce_auto_from_body(preset_body):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
|
||||
pattern = str(preset_body.get("pattern") or preset_body.get("p") or "").strip()
|
||||
if pattern and not _pattern_supports_manual(pattern):
|
||||
update_beat_route({"enabled": False})
|
||||
return
|
||||
|
||||
update_beat_route(
|
||||
{
|
||||
"enabled": True,
|
||||
"device_names": device_names,
|
||||
"wire_preset_id": wire_preset_id,
|
||||
"is_manual": True,
|
||||
"pattern": pattern,
|
||||
"manual_beat_n": _coerce_manual_beat_n(preset_body),
|
||||
}
|
||||
)
|
||||
update_beat_route({"enabled": False})
|
||||
|
||||
|
||||
def _pattern_supports_manual(pattern_key: str) -> bool:
|
||||
@@ -186,45 +245,59 @@ def _pattern_supports_manual(pattern_key: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _macs_for_registry_names(device_names: List[str]) -> List[str]:
|
||||
from models.device import Device
|
||||
|
||||
want = {str(n).strip() for n in device_names if str(n).strip()}
|
||||
if not want:
|
||||
return []
|
||||
devices = Device()
|
||||
macs: List[str] = []
|
||||
seen = set()
|
||||
for did in devices.list():
|
||||
doc = devices.read(did) or {}
|
||||
nm = str(doc.get("name") or "").strip()
|
||||
if nm not in want:
|
||||
continue
|
||||
key = str(did).strip().lower().replace(":", "").replace("-", "")
|
||||
if len(key) == 12 and key not in seen:
|
||||
seen.add(key)
|
||||
macs.append(key)
|
||||
return macs
|
||||
def remap_beat_route_device_name(old_name: str, new_name: str) -> None:
|
||||
"""Update cached audio-beat target names after a device registry rename."""
|
||||
global _beat_route
|
||||
o = str(old_name or "").strip()
|
||||
n = str(new_name or "").strip()
|
||||
if not o or not n or o == n:
|
||||
return
|
||||
with _route_lock:
|
||||
if not _beat_route.get("enabled"):
|
||||
return
|
||||
names = _beat_route.get("device_names") or []
|
||||
new_list: List[str] = []
|
||||
changed = False
|
||||
for item in names:
|
||||
if str(item).strip() == o:
|
||||
new_list.append(n)
|
||||
changed = True
|
||||
else:
|
||||
new_list.append(str(item))
|
||||
if changed:
|
||||
_beat_route = {**_beat_route, "device_names": new_list}
|
||||
|
||||
|
||||
async def _deliver_select(device_names: List[str], wire_preset_id: str) -> None:
|
||||
from models.device import Device
|
||||
from models.device import resolve_device_mac_for_select_routing
|
||||
from models.transport import get_current_sender
|
||||
from util.driver_delivery import deliver_json_messages
|
||||
|
||||
sender = get_current_sender()
|
||||
if not sender:
|
||||
return
|
||||
select = {str(n).strip(): [wire_preset_id] for n in device_names if str(n).strip()}
|
||||
devices = Device()
|
||||
seen_macs: List[str] = []
|
||||
seen_set: Set[str] = set()
|
||||
for n in device_names:
|
||||
mac = resolve_device_mac_for_select_routing(devices, n)
|
||||
if mac and mac not in seen_set:
|
||||
seen_set.add(mac)
|
||||
seen_macs.append(mac)
|
||||
if not seen_macs:
|
||||
return
|
||||
select: Dict[str, Any] = {}
|
||||
for mac in seen_macs:
|
||||
doc = devices.read(mac) or {}
|
||||
nm = str(doc.get("name") or "").strip()
|
||||
if nm:
|
||||
select[nm] = [wire_preset_id]
|
||||
if not select:
|
||||
return
|
||||
msg = json.dumps({"v": "1", "select": select}, separators=(",", ":"))
|
||||
macs = _macs_for_registry_names(list(select.keys()))
|
||||
if not macs:
|
||||
return
|
||||
devices = Device()
|
||||
try:
|
||||
await deliver_json_messages(sender, [msg], macs, devices, delay_s=0.05)
|
||||
await deliver_json_messages(sender, [msg], seen_macs, devices, delay_s=0.05)
|
||||
except Exception as e:
|
||||
print(f"[beat-route] deliver failed: {e}")
|
||||
|
||||
|
||||
86
src/util/brightness_combine.py
Normal file
86
src/util/brightness_combine.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""
|
||||
Combine global, group, device (and optional zone) brightness into one 0–255 wire value.
|
||||
|
||||
Formula: ``(f1/255) * (f2/255) * ... * (fn/255) * 255`` with integer rounding — one ``b`` sent to the driver.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from models.device import normalize_mac
|
||||
|
||||
|
||||
def clamp255(value) -> int:
|
||||
try:
|
||||
v = int(value)
|
||||
except (TypeError, ValueError):
|
||||
return 255
|
||||
return max(0, min(255, v))
|
||||
|
||||
|
||||
def multiply_brightness_factors(factors: list) -> int:
|
||||
"""Product ``(f1/255)*(f2/255)*...*255``; each factor clamped to 0..255."""
|
||||
if not factors:
|
||||
return 255
|
||||
fs = [clamp255(f) for f in factors]
|
||||
if len(fs) == 1:
|
||||
return fs[0]
|
||||
num = 1
|
||||
for f in fs:
|
||||
num *= f
|
||||
den = 255 ** (len(fs) - 1)
|
||||
return max(0, min(255, (num + den // 2) // den))
|
||||
|
||||
|
||||
def _mac_in_device_list(raw_list, target_mac: str) -> bool:
|
||||
tm = normalize_mac(target_mac)
|
||||
if not tm:
|
||||
return False
|
||||
if not isinstance(raw_list, list):
|
||||
return False
|
||||
for raw in raw_list:
|
||||
if normalize_mac(str(raw)) == tm:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def effective_brightness_for_mac(
|
||||
settings_obj,
|
||||
groups_model,
|
||||
devices_model,
|
||||
mac: str,
|
||||
*,
|
||||
zone_brightness=None,
|
||||
) -> int:
|
||||
"""
|
||||
Factors (each 0..255): Pi **global_brightness**, each group's **output_brightness**
|
||||
(neutral 255 if the device is in no group), device **output_brightness** (default 255),
|
||||
optional **zone_brightness** from the zone slider when applying live.
|
||||
"""
|
||||
m = normalize_mac(mac)
|
||||
if not m:
|
||||
return 255
|
||||
|
||||
g_global = clamp255(settings_obj.get("global_brightness", 255))
|
||||
|
||||
dev_doc = devices_model.read(m)
|
||||
if dev_doc is not None and dev_doc.get("output_brightness") is not None:
|
||||
d_b = clamp255(dev_doc.get("output_brightness"))
|
||||
else:
|
||||
d_b = 255
|
||||
|
||||
group_factors = []
|
||||
for _gid, gdoc in groups_model.items():
|
||||
if not isinstance(gdoc, dict):
|
||||
continue
|
||||
if not _mac_in_device_list(gdoc.get("devices"), m):
|
||||
continue
|
||||
group_factors.append(clamp255(gdoc.get("output_brightness", 255)))
|
||||
|
||||
if not group_factors:
|
||||
group_factors = [255]
|
||||
|
||||
factors = [g_global, *group_factors, d_b]
|
||||
if zone_brightness is not None:
|
||||
factors.append(clamp255(zone_brightness))
|
||||
|
||||
return multiply_brightness_factors(factors)
|
||||
Reference in New Issue
Block a user