feat(ui): pattern modes, bundles, and zone content kind
Add profile/preset/sequence JSON import and export; map preset mode to wire n6 with a mode dropdown for multi-mode patterns; zone edit shows presets or sequences only with content_kind on save; update catalogue and tests for merged pattern names. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -43,6 +43,8 @@ import json
|
||||
import struct
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from util.espnow_message import wire_n6
|
||||
|
||||
BINARY_ENVELOPE_VERSION_1 = 1
|
||||
BINARY_ENVELOPE_VERSION_2 = 2
|
||||
HEADER_LEN = 5
|
||||
@@ -108,7 +110,7 @@ def _pack_preset_dict(name: str, preset: Dict[str, Any]) -> bytes:
|
||||
n3 = _clamp_i16(preset.get("n3", 0))
|
||||
n4 = _clamp_i16(preset.get("n4", 0))
|
||||
n5 = _clamp_i16(preset.get("n5", 0))
|
||||
n6 = _clamp_i16(preset.get("n6", 0))
|
||||
n6 = _clamp_i16(wire_n6(preset))
|
||||
parts.append(
|
||||
struct.pack(
|
||||
"<HBBhhhhhh",
|
||||
|
||||
@@ -113,6 +113,21 @@ def resolve_preset_background_hex(preset_data, palette_colors=None):
|
||||
return _hex_from_background_raw(bg_raw)
|
||||
|
||||
|
||||
def wire_n6(preset_data, default=0):
|
||||
"""Resolve style mode for the wire (``n6``); preset may store ``mode`` or ``n6``."""
|
||||
if not isinstance(preset_data, dict):
|
||||
return default
|
||||
if preset_data.get("mode") is not None:
|
||||
try:
|
||||
return max(0, int(preset_data["mode"]))
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
return max(0, int(preset_data.get("n6", default) or 0))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def build_preset_dict(preset_data, palette_colors=None):
|
||||
"""
|
||||
Convert preset data to API-compliant format.
|
||||
@@ -188,7 +203,7 @@ def build_preset_dict(preset_data, palette_colors=None):
|
||||
"n3": preset_data.get("n3", 0),
|
||||
"n4": preset_data.get("n4", 0),
|
||||
"n5": preset_data.get("n5", 0),
|
||||
"n6": preset_data.get("n6", 0)
|
||||
"n6": wire_n6(preset_data),
|
||||
}
|
||||
|
||||
return preset
|
||||
|
||||
441
src/util/profile_bundle.py
Normal file
441
src/util/profile_bundle.py
Normal file
@@ -0,0 +1,441 @@
|
||||
"""Export/import profile bundles (profile, zones, presets, sequences, palette)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
||||
|
||||
BUNDLE_VERSION = 1
|
||||
KIND_PROFILE = "profile"
|
||||
KIND_PRESET = "preset"
|
||||
KIND_SEQUENCE = "sequence"
|
||||
|
||||
|
||||
def _allocate_id(model, cache: Dict[str, int]) -> str:
|
||||
if "next" not in cache:
|
||||
max_id = max((int(k) for k in model.keys() if str(k).isdigit()), default=0)
|
||||
cache["next"] = max_id + 1
|
||||
next_id = str(cache["next"])
|
||||
cache["next"] += 1
|
||||
return next_id
|
||||
|
||||
|
||||
def _palette_colors(palette_model, palette_id) -> List:
|
||||
if not palette_id:
|
||||
return []
|
||||
try:
|
||||
colors = palette_model.read(str(palette_id))
|
||||
except Exception:
|
||||
colors = None
|
||||
if isinstance(colors, list):
|
||||
return list(colors)
|
||||
if isinstance(colors, dict) and isinstance(colors.get("colors"), list):
|
||||
return list(colors["colors"])
|
||||
return []
|
||||
|
||||
|
||||
def _walk_preset_refs(value, out: Set[str]) -> None:
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
_walk_preset_refs(item, out)
|
||||
elif value is not None and value != "":
|
||||
out.add(str(value))
|
||||
|
||||
|
||||
def _preset_ids_in_zone(zone: Dict[str, Any]) -> Set[str]:
|
||||
ids: Set[str] = set()
|
||||
if not isinstance(zone, dict):
|
||||
return ids
|
||||
_walk_preset_refs(zone.get("presets"), ids)
|
||||
_walk_preset_refs(zone.get("presets_flat"), ids)
|
||||
if zone.get("default_preset") not in (None, ""):
|
||||
ids.add(str(zone["default_preset"]))
|
||||
return ids
|
||||
|
||||
|
||||
def _preset_ids_in_sequence(seq: Dict[str, Any]) -> Set[str]:
|
||||
ids: Set[str] = set()
|
||||
if not isinstance(seq, dict):
|
||||
return ids
|
||||
for lane in seq.get("lanes") or []:
|
||||
if not isinstance(lane, list):
|
||||
continue
|
||||
for step in lane:
|
||||
if isinstance(step, dict) and step.get("preset_id") not in (None, ""):
|
||||
ids.add(str(step["preset_id"]))
|
||||
for step in seq.get("steps") or []:
|
||||
if isinstance(step, dict) and step.get("preset_id") not in (None, ""):
|
||||
ids.add(str(step["preset_id"]))
|
||||
return ids
|
||||
|
||||
|
||||
def _map_preset_container(
|
||||
value,
|
||||
id_map: Dict[str, str],
|
||||
preset_cache: Dict[str, int],
|
||||
new_profile_id: str,
|
||||
new_presets: Dict[str, Dict[str, Any]],
|
||||
presets_model,
|
||||
) -> Any:
|
||||
if isinstance(value, list):
|
||||
return [
|
||||
_map_preset_container(v, id_map, preset_cache, new_profile_id, new_presets, presets_model)
|
||||
for v in value
|
||||
]
|
||||
if value is None:
|
||||
return None
|
||||
preset_id = str(value)
|
||||
if preset_id in id_map:
|
||||
return id_map[preset_id]
|
||||
preset_data = presets_model.read(preset_id)
|
||||
if not preset_data:
|
||||
return None
|
||||
new_preset_id = _allocate_id(presets_model, preset_cache)
|
||||
clone_data = dict(preset_data)
|
||||
clone_data["profile_id"] = str(new_profile_id)
|
||||
new_presets[new_preset_id] = clone_data
|
||||
id_map[preset_id] = new_preset_id
|
||||
return new_preset_id
|
||||
|
||||
|
||||
def _map_sequence_lanes(
|
||||
seq: Dict[str, Any],
|
||||
preset_id_map: Dict[str, str],
|
||||
) -> Dict[str, Any]:
|
||||
out = copy.deepcopy(seq)
|
||||
lanes = out.get("lanes")
|
||||
if isinstance(lanes, list):
|
||||
for lane in lanes:
|
||||
if not isinstance(lane, list):
|
||||
continue
|
||||
for step in lane:
|
||||
if not isinstance(step, dict):
|
||||
continue
|
||||
pid = step.get("preset_id")
|
||||
if pid is not None and str(pid) in preset_id_map:
|
||||
step["preset_id"] = preset_id_map[str(pid)]
|
||||
steps = out.get("steps")
|
||||
if isinstance(steps, list):
|
||||
for step in steps:
|
||||
if not isinstance(step, dict):
|
||||
continue
|
||||
pid = step.get("preset_id")
|
||||
if pid is not None and str(pid) in preset_id_map:
|
||||
step["preset_id"] = preset_id_map[str(pid)]
|
||||
return out
|
||||
|
||||
|
||||
def export_profile_bundle(
|
||||
profile_id: str,
|
||||
profiles_model,
|
||||
zones_model,
|
||||
presets_model,
|
||||
sequences_model,
|
||||
palette_model,
|
||||
) -> Dict[str, Any]:
|
||||
source = profiles_model.read(profile_id)
|
||||
if not source:
|
||||
raise ValueError("Profile not found")
|
||||
|
||||
zone_ids = source.get("zones")
|
||||
if not isinstance(zone_ids, list) or not zone_ids:
|
||||
zone_ids = source.get("zone_order") or []
|
||||
zone_ids = [str(z) for z in zone_ids if z is not None]
|
||||
|
||||
zones_out: Dict[str, Any] = {}
|
||||
preset_ids: Set[str] = set()
|
||||
sequence_ids: Set[str] = set()
|
||||
|
||||
for zid in zone_ids:
|
||||
zone = zones_model.read(zid)
|
||||
if not zone:
|
||||
continue
|
||||
zones_out[zid] = copy.deepcopy(zone)
|
||||
preset_ids |= _preset_ids_in_zone(zone)
|
||||
for sid in zone.get("sequence_ids") or []:
|
||||
if sid is not None and str(sid).strip():
|
||||
sequence_ids.add(str(sid))
|
||||
|
||||
sequences_out: Dict[str, Any] = {}
|
||||
for sid in sequence_ids:
|
||||
seq = sequences_model.read(sid)
|
||||
if not seq or str(seq.get("profile_id")) != str(profile_id):
|
||||
continue
|
||||
sequences_out[sid] = copy.deepcopy(seq)
|
||||
preset_ids |= _preset_ids_in_sequence(seq)
|
||||
|
||||
presets_out: Dict[str, Any] = {}
|
||||
for pid in preset_ids:
|
||||
pdata = presets_model.read(pid)
|
||||
if pdata and str(pdata.get("profile_id")) == str(profile_id):
|
||||
presets_out[pid] = copy.deepcopy(pdata)
|
||||
|
||||
profile_doc = copy.deepcopy(source)
|
||||
profile_doc.pop("palette", None)
|
||||
|
||||
return {
|
||||
"version": BUNDLE_VERSION,
|
||||
"kind": KIND_PROFILE,
|
||||
"profile": profile_doc,
|
||||
"palette": {"colors": _palette_colors(palette_model, source.get("palette_id"))},
|
||||
"zones": zones_out,
|
||||
"presets": presets_out,
|
||||
"sequences": sequences_out,
|
||||
}
|
||||
|
||||
|
||||
def import_profile_bundle(
|
||||
bundle: Dict[str, Any],
|
||||
profiles_model,
|
||||
zones_model,
|
||||
presets_model,
|
||||
sequences_model,
|
||||
palette_model,
|
||||
*,
|
||||
name: Optional[str] = None,
|
||||
) -> Tuple[str, Dict[str, Any]]:
|
||||
if not isinstance(bundle, dict):
|
||||
raise ValueError("Invalid bundle")
|
||||
if bundle.get("version") not in (BUNDLE_VERSION, str(BUNDLE_VERSION)):
|
||||
raise ValueError("Unsupported bundle version")
|
||||
if bundle.get("kind") not in (KIND_PROFILE, None):
|
||||
raise ValueError("Not a profile bundle")
|
||||
|
||||
source_profile = bundle.get("profile")
|
||||
if not isinstance(source_profile, dict):
|
||||
raise ValueError("Bundle missing profile")
|
||||
|
||||
source_name = source_profile.get("name") or "Imported profile"
|
||||
new_name = (name or source_name).strip() or source_name
|
||||
profile_type = source_profile.get("type", "zones")
|
||||
|
||||
profile_cache: Dict[str, int] = {}
|
||||
palette_cache: Dict[str, int] = {}
|
||||
zone_cache: Dict[str, int] = {}
|
||||
preset_cache: Dict[str, int] = {}
|
||||
sequence_cache: Dict[str, int] = {}
|
||||
|
||||
new_profile_id = _allocate_id(profiles_model, profile_cache)
|
||||
new_palette_id = _allocate_id(palette_model, palette_cache)
|
||||
|
||||
palette_in = bundle.get("palette") or {}
|
||||
palette_colors = palette_in.get("colors") if isinstance(palette_in, dict) else []
|
||||
if not isinstance(palette_colors, list):
|
||||
palette_colors = []
|
||||
|
||||
preset_id_map: Dict[str, str] = {}
|
||||
new_presets: Dict[str, Dict[str, Any]] = {}
|
||||
new_zones: Dict[str, Dict[str, Any]] = {}
|
||||
new_sequences: Dict[str, Dict[str, Any]] = {}
|
||||
sequence_id_map: Dict[str, str] = {}
|
||||
|
||||
zones_in = bundle.get("zones") if isinstance(bundle.get("zones"), dict) else {}
|
||||
presets_in = bundle.get("presets") if isinstance(bundle.get("presets"), dict) else {}
|
||||
sequences_in = bundle.get("sequences") if isinstance(bundle.get("sequences"), dict) else {}
|
||||
|
||||
for old_pid, pdata in presets_in.items():
|
||||
if not isinstance(pdata, dict):
|
||||
continue
|
||||
new_pid = _allocate_id(presets_model, preset_cache)
|
||||
clone = copy.deepcopy(pdata)
|
||||
clone["profile_id"] = str(new_profile_id)
|
||||
new_presets[new_pid] = clone
|
||||
preset_id_map[str(old_pid)] = new_pid
|
||||
|
||||
for old_sid, sdata in sequences_in.items():
|
||||
if not isinstance(sdata, dict):
|
||||
continue
|
||||
new_sid = _allocate_id(sequences_model, sequence_cache)
|
||||
clone = _map_sequence_lanes(sdata, preset_id_map)
|
||||
clone["profile_id"] = str(new_profile_id)
|
||||
new_sequences[new_sid] = clone
|
||||
sequence_id_map[str(old_sid)] = new_sid
|
||||
|
||||
source_zone_order = source_profile.get("zones")
|
||||
if not isinstance(source_zone_order, list):
|
||||
source_zone_order = list(zones_in.keys())
|
||||
|
||||
cloned_zone_ids: List[str] = []
|
||||
for old_zid in source_zone_order:
|
||||
zone = zones_in.get(str(old_zid))
|
||||
if not isinstance(zone, dict):
|
||||
continue
|
||||
new_zid = _allocate_id(zones_model, zone_cache)
|
||||
clone_data: Dict[str, Any] = {
|
||||
"name": zone.get("name") or f"Zone {old_zid}",
|
||||
"names": list(zone.get("names") or []),
|
||||
}
|
||||
mapped_presets = _map_preset_container(
|
||||
zone.get("presets"),
|
||||
preset_id_map,
|
||||
preset_cache,
|
||||
new_profile_id,
|
||||
new_presets,
|
||||
presets_model,
|
||||
)
|
||||
if mapped_presets is not None:
|
||||
clone_data["presets"] = mapped_presets
|
||||
extra = {
|
||||
k: v
|
||||
for k, v in zone.items()
|
||||
if k not in ("name", "names", "presets")
|
||||
}
|
||||
if "presets_flat" in extra:
|
||||
extra["presets_flat"] = _map_preset_container(
|
||||
extra.get("presets_flat"),
|
||||
preset_id_map,
|
||||
preset_cache,
|
||||
new_profile_id,
|
||||
new_presets,
|
||||
presets_model,
|
||||
)
|
||||
if "default_preset" in extra and extra["default_preset"] is not None:
|
||||
old_dp = str(extra["default_preset"])
|
||||
if old_dp in preset_id_map:
|
||||
extra["default_preset"] = preset_id_map[old_dp]
|
||||
if "sequence_ids" in extra and isinstance(extra.get("sequence_ids"), list):
|
||||
extra["sequence_ids"] = [
|
||||
sequence_id_map.get(str(s), str(s))
|
||||
for s in extra["sequence_ids"]
|
||||
if s is not None
|
||||
]
|
||||
clone_data.update(extra)
|
||||
new_zones[new_zid] = clone_data
|
||||
cloned_zone_ids.append(new_zid)
|
||||
|
||||
new_profile_data = {
|
||||
"name": new_name,
|
||||
"type": profile_type,
|
||||
"zones": cloned_zone_ids,
|
||||
"scenes": list(source_profile.get("scenes", []))
|
||||
if isinstance(source_profile.get("scenes"), list)
|
||||
else [],
|
||||
"palette_id": str(new_palette_id),
|
||||
}
|
||||
|
||||
palette_model[str(new_palette_id)] = list(palette_colors)
|
||||
for pid, pdata in new_presets.items():
|
||||
presets_model[pid] = pdata
|
||||
for zid, zdata in new_zones.items():
|
||||
zones_model[zid] = zdata
|
||||
for sid, sdata in new_sequences.items():
|
||||
sequences_model[sid] = sdata
|
||||
profiles_model[str(new_profile_id)] = new_profile_data
|
||||
|
||||
palette_model.save()
|
||||
presets_model.save()
|
||||
zones_model.save()
|
||||
sequences_model.save()
|
||||
profiles_model.save()
|
||||
|
||||
return str(new_profile_id), new_profile_data
|
||||
|
||||
|
||||
def export_preset_bundle(preset_id: str, presets_model) -> Dict[str, Any]:
|
||||
preset = presets_model.read(preset_id)
|
||||
if not preset:
|
||||
raise ValueError("Preset not found")
|
||||
return {
|
||||
"version": BUNDLE_VERSION,
|
||||
"kind": KIND_PRESET,
|
||||
"preset": copy.deepcopy(preset),
|
||||
}
|
||||
|
||||
|
||||
def import_preset_bundle(
|
||||
bundle: Dict[str, Any],
|
||||
presets_model,
|
||||
profile_id: str,
|
||||
) -> Tuple[str, Dict[str, Any]]:
|
||||
if not isinstance(bundle, dict):
|
||||
raise ValueError("Invalid bundle")
|
||||
if bundle.get("kind") != KIND_PRESET:
|
||||
raise ValueError("Not a preset bundle")
|
||||
preset = bundle.get("preset")
|
||||
if not isinstance(preset, dict):
|
||||
raise ValueError("Bundle missing preset")
|
||||
new_id = presets_model.create(profile_id)
|
||||
data = copy.deepcopy(preset)
|
||||
data["profile_id"] = str(profile_id)
|
||||
presets_model.update(new_id, data)
|
||||
return str(new_id), presets_model.read(new_id) or data
|
||||
|
||||
|
||||
def export_sequence_bundle(
|
||||
sequence_id: str,
|
||||
sequences_model,
|
||||
presets_model,
|
||||
*,
|
||||
profile_id: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
seq = sequences_model.read(sequence_id)
|
||||
if not seq:
|
||||
raise ValueError("Sequence not found")
|
||||
if profile_id is not None and str(seq.get("profile_id")) != str(profile_id):
|
||||
raise ValueError("Sequence not found")
|
||||
|
||||
pid = str(profile_id or seq.get("profile_id") or "")
|
||||
preset_ids = _preset_ids_in_sequence(seq)
|
||||
presets_out: Dict[str, Any] = {}
|
||||
for old_pid in preset_ids:
|
||||
pdata = presets_model.read(old_pid)
|
||||
if pdata and (not pid or str(pdata.get("profile_id")) == pid):
|
||||
presets_out[old_pid] = copy.deepcopy(pdata)
|
||||
|
||||
return {
|
||||
"version": BUNDLE_VERSION,
|
||||
"kind": KIND_SEQUENCE,
|
||||
"sequence": copy.deepcopy(seq),
|
||||
"presets": presets_out,
|
||||
}
|
||||
|
||||
|
||||
def import_sequence_bundle(
|
||||
bundle: Dict[str, Any],
|
||||
sequences_model,
|
||||
presets_model,
|
||||
profile_id: str,
|
||||
) -> Tuple[str, Dict[str, Any]]:
|
||||
if not isinstance(bundle, dict):
|
||||
raise ValueError("Invalid bundle")
|
||||
if bundle.get("kind") != KIND_SEQUENCE:
|
||||
raise ValueError("Not a sequence bundle")
|
||||
seq = bundle.get("sequence")
|
||||
if not isinstance(seq, dict):
|
||||
raise ValueError("Bundle missing sequence")
|
||||
|
||||
preset_cache: Dict[str, int] = {}
|
||||
preset_id_map: Dict[str, str] = {}
|
||||
new_presets: Dict[str, Dict[str, Any]] = {}
|
||||
presets_in = bundle.get("presets") if isinstance(bundle.get("presets"), dict) else {}
|
||||
|
||||
for old_pid, pdata in presets_in.items():
|
||||
if not isinstance(pdata, dict):
|
||||
continue
|
||||
new_pid = _allocate_id(presets_model, preset_cache)
|
||||
clone = copy.deepcopy(pdata)
|
||||
clone["profile_id"] = str(profile_id)
|
||||
new_presets[new_pid] = clone
|
||||
preset_id_map[str(old_pid)] = new_pid
|
||||
|
||||
for old_pid in _preset_ids_in_sequence(seq):
|
||||
op = str(old_pid)
|
||||
if op not in preset_id_map:
|
||||
pdata = presets_model.read(op)
|
||||
if pdata:
|
||||
new_pid = _allocate_id(presets_model, preset_cache)
|
||||
clone = copy.deepcopy(pdata)
|
||||
clone["profile_id"] = str(profile_id)
|
||||
new_presets[new_pid] = clone
|
||||
preset_id_map[op] = new_pid
|
||||
|
||||
for pid, pdata in new_presets.items():
|
||||
presets_model[pid] = pdata
|
||||
if new_presets:
|
||||
presets_model.save()
|
||||
|
||||
new_seq_id = sequences_model.create(profile_id)
|
||||
mapped = _map_sequence_lanes(seq, preset_id_map)
|
||||
mapped["profile_id"] = str(profile_id)
|
||||
sequences_model.update(new_seq_id, mapped)
|
||||
return str(new_seq_id), sequences_model.read(new_seq_id) or mapped
|
||||
Reference in New Issue
Block a user