"""Export/import profile bundles (profile, zones, presets, sequences, palette).""" from __future__ import annotations import copy from typing import Any, Callable, Dict, List, Optional, Set, Tuple BUNDLE_VERSION = 1 KIND_PROFILE = "profile" KIND_PRESET = "preset" KIND_SEQUENCE = "sequence" def _allocate_id(model, cache: Dict[str, int]) -> str: if "next" not in cache: max_id = max((int(k) for k in model.keys() if str(k).isdigit()), default=0) cache["next"] = max_id + 1 next_id = str(cache["next"]) cache["next"] += 1 return next_id def _palette_colors(palette_model, palette_id) -> List: if not palette_id: return [] try: colors = palette_model.read(str(palette_id)) except Exception: colors = None if isinstance(colors, list): return list(colors) if isinstance(colors, dict) and isinstance(colors.get("colors"), list): return list(colors["colors"]) return [] def _walk_preset_refs(value, out: Set[str]) -> None: if isinstance(value, list): for item in value: _walk_preset_refs(item, out) elif value is not None and value != "": out.add(str(value)) def _preset_ids_in_zone(zone: Dict[str, Any]) -> Set[str]: ids: Set[str] = set() if not isinstance(zone, dict): return ids _walk_preset_refs(zone.get("presets"), ids) _walk_preset_refs(zone.get("presets_flat"), ids) if zone.get("default_preset") not in (None, ""): ids.add(str(zone["default_preset"])) return ids def _preset_ids_in_sequence(seq: Dict[str, Any]) -> Set[str]: ids: Set[str] = set() if not isinstance(seq, dict): return ids for lane in seq.get("lanes") or []: if not isinstance(lane, list): continue for step in lane: if isinstance(step, dict) and step.get("preset_id") not in (None, ""): ids.add(str(step["preset_id"])) for step in seq.get("steps") or []: if isinstance(step, dict) and step.get("preset_id") not in (None, ""): ids.add(str(step["preset_id"])) return ids def _map_preset_container( value, id_map: Dict[str, str], preset_cache: Dict[str, int], new_profile_id: str, new_presets: Dict[str, Dict[str, Any]], presets_model, ) -> Any: if isinstance(value, list): return [ _map_preset_container(v, id_map, preset_cache, new_profile_id, new_presets, presets_model) for v in value ] if value is None: return None preset_id = str(value) if preset_id in id_map: return id_map[preset_id] preset_data = presets_model.read(preset_id) if not preset_data: return None new_preset_id = _allocate_id(presets_model, preset_cache) clone_data = dict(preset_data) clone_data["profile_id"] = str(new_profile_id) new_presets[new_preset_id] = clone_data id_map[preset_id] = new_preset_id return new_preset_id def _map_sequence_lanes( seq: Dict[str, Any], preset_id_map: Dict[str, str], ) -> Dict[str, Any]: out = copy.deepcopy(seq) lanes = out.get("lanes") if isinstance(lanes, list): for lane in lanes: if not isinstance(lane, list): continue for step in lane: if not isinstance(step, dict): continue pid = step.get("preset_id") if pid is not None and str(pid) in preset_id_map: step["preset_id"] = preset_id_map[str(pid)] steps = out.get("steps") if isinstance(steps, list): for step in steps: if not isinstance(step, dict): continue pid = step.get("preset_id") if pid is not None and str(pid) in preset_id_map: step["preset_id"] = preset_id_map[str(pid)] return out def export_profile_bundle( profile_id: str, profiles_model, zones_model, presets_model, sequences_model, palette_model, ) -> Dict[str, Any]: source = profiles_model.read(profile_id) if not source: raise ValueError("Profile not found") zone_ids = source.get("zones") if not isinstance(zone_ids, list) or not zone_ids: zone_ids = source.get("zone_order") or [] zone_ids = [str(z) for z in zone_ids if z is not None] zones_out: Dict[str, Any] = {} preset_ids: Set[str] = set() sequence_ids: Set[str] = set() for zid in zone_ids: zone = zones_model.read(zid) if not zone: continue zones_out[zid] = copy.deepcopy(zone) preset_ids |= _preset_ids_in_zone(zone) for sid in zone.get("sequence_ids") or []: if sid is not None and str(sid).strip(): sequence_ids.add(str(sid)) sequences_out: Dict[str, Any] = {} for sid in sequence_ids: seq = sequences_model.read(sid) if not seq or str(seq.get("profile_id")) != str(profile_id): continue sequences_out[sid] = copy.deepcopy(seq) preset_ids |= _preset_ids_in_sequence(seq) presets_out: Dict[str, Any] = {} for pid in preset_ids: pdata = presets_model.read(pid) if pdata and str(pdata.get("profile_id")) == str(profile_id): presets_out[pid] = copy.deepcopy(pdata) profile_doc = copy.deepcopy(source) profile_doc.pop("palette", None) return { "version": BUNDLE_VERSION, "kind": KIND_PROFILE, "profile": profile_doc, "palette": {"colors": _palette_colors(palette_model, source.get("palette_id"))}, "zones": zones_out, "presets": presets_out, "sequences": sequences_out, } def import_profile_bundle( bundle: Dict[str, Any], profiles_model, zones_model, presets_model, sequences_model, palette_model, *, name: Optional[str] = None, ) -> Tuple[str, Dict[str, Any]]: if not isinstance(bundle, dict): raise ValueError("Invalid bundle") if bundle.get("version") not in (BUNDLE_VERSION, str(BUNDLE_VERSION)): raise ValueError("Unsupported bundle version") if bundle.get("kind") not in (KIND_PROFILE, None): raise ValueError("Not a profile bundle") source_profile = bundle.get("profile") if not isinstance(source_profile, dict): raise ValueError("Bundle missing profile") source_name = source_profile.get("name") or "Imported profile" new_name = (name or source_name).strip() or source_name profile_type = source_profile.get("type", "zones") profile_cache: Dict[str, int] = {} palette_cache: Dict[str, int] = {} zone_cache: Dict[str, int] = {} preset_cache: Dict[str, int] = {} sequence_cache: Dict[str, int] = {} new_profile_id = _allocate_id(profiles_model, profile_cache) new_palette_id = _allocate_id(palette_model, palette_cache) palette_in = bundle.get("palette") or {} palette_colors = palette_in.get("colors") if isinstance(palette_in, dict) else [] if not isinstance(palette_colors, list): palette_colors = [] preset_id_map: Dict[str, str] = {} new_presets: Dict[str, Dict[str, Any]] = {} new_zones: Dict[str, Dict[str, Any]] = {} new_sequences: Dict[str, Dict[str, Any]] = {} sequence_id_map: Dict[str, str] = {} zones_in = bundle.get("zones") if isinstance(bundle.get("zones"), dict) else {} presets_in = bundle.get("presets") if isinstance(bundle.get("presets"), dict) else {} sequences_in = bundle.get("sequences") if isinstance(bundle.get("sequences"), dict) else {} for old_pid, pdata in presets_in.items(): if not isinstance(pdata, dict): continue new_pid = _allocate_id(presets_model, preset_cache) clone = copy.deepcopy(pdata) clone["profile_id"] = str(new_profile_id) new_presets[new_pid] = clone preset_id_map[str(old_pid)] = new_pid for old_sid, sdata in sequences_in.items(): if not isinstance(sdata, dict): continue new_sid = _allocate_id(sequences_model, sequence_cache) clone = _map_sequence_lanes(sdata, preset_id_map) clone["profile_id"] = str(new_profile_id) new_sequences[new_sid] = clone sequence_id_map[str(old_sid)] = new_sid source_zone_order = source_profile.get("zones") if not isinstance(source_zone_order, list): source_zone_order = list(zones_in.keys()) cloned_zone_ids: List[str] = [] for old_zid in source_zone_order: zone = zones_in.get(str(old_zid)) if not isinstance(zone, dict): continue new_zid = _allocate_id(zones_model, zone_cache) clone_data: Dict[str, Any] = { "name": zone.get("name") or f"Zone {old_zid}", "names": list(zone.get("names") or []), } mapped_presets = _map_preset_container( zone.get("presets"), preset_id_map, preset_cache, new_profile_id, new_presets, presets_model, ) if mapped_presets is not None: clone_data["presets"] = mapped_presets extra = { k: v for k, v in zone.items() if k not in ("name", "names", "presets") } if "presets_flat" in extra: extra["presets_flat"] = _map_preset_container( extra.get("presets_flat"), preset_id_map, preset_cache, new_profile_id, new_presets, presets_model, ) if "default_preset" in extra and extra["default_preset"] is not None: old_dp = str(extra["default_preset"]) if old_dp in preset_id_map: extra["default_preset"] = preset_id_map[old_dp] if "sequence_ids" in extra and isinstance(extra.get("sequence_ids"), list): extra["sequence_ids"] = [ sequence_id_map.get(str(s), str(s)) for s in extra["sequence_ids"] if s is not None ] clone_data.update(extra) new_zones[new_zid] = clone_data cloned_zone_ids.append(new_zid) new_profile_data = { "name": new_name, "type": profile_type, "zones": cloned_zone_ids, "scenes": list(source_profile.get("scenes", [])) if isinstance(source_profile.get("scenes"), list) else [], "palette_id": str(new_palette_id), } palette_model[str(new_palette_id)] = list(palette_colors) for pid, pdata in new_presets.items(): presets_model[pid] = pdata for zid, zdata in new_zones.items(): zones_model[zid] = zdata for sid, sdata in new_sequences.items(): sequences_model[sid] = sdata profiles_model[str(new_profile_id)] = new_profile_data palette_model.save() presets_model.save() zones_model.save() sequences_model.save() profiles_model.save() return str(new_profile_id), new_profile_data def export_preset_bundle(preset_id: str, presets_model) -> Dict[str, Any]: preset = presets_model.read(preset_id) if not preset: raise ValueError("Preset not found") return { "version": BUNDLE_VERSION, "kind": KIND_PRESET, "preset": copy.deepcopy(preset), } def import_preset_bundle( bundle: Dict[str, Any], presets_model, profile_id: str, ) -> Tuple[str, Dict[str, Any]]: if not isinstance(bundle, dict): raise ValueError("Invalid bundle") if bundle.get("kind") != KIND_PRESET: raise ValueError("Not a preset bundle") preset = bundle.get("preset") if not isinstance(preset, dict): raise ValueError("Bundle missing preset") new_id = presets_model.create(profile_id) data = copy.deepcopy(preset) data["profile_id"] = str(profile_id) presets_model.update(new_id, data) return str(new_id), presets_model.read(new_id) or data def export_sequence_bundle( sequence_id: str, sequences_model, presets_model, *, profile_id: Optional[str] = None, ) -> Dict[str, Any]: seq = sequences_model.read(sequence_id) if not seq: raise ValueError("Sequence not found") if profile_id is not None and str(seq.get("profile_id")) != str(profile_id): raise ValueError("Sequence not found") pid = str(profile_id or seq.get("profile_id") or "") preset_ids = _preset_ids_in_sequence(seq) presets_out: Dict[str, Any] = {} for old_pid in preset_ids: pdata = presets_model.read(old_pid) if pdata and (not pid or str(pdata.get("profile_id")) == pid): presets_out[old_pid] = copy.deepcopy(pdata) return { "version": BUNDLE_VERSION, "kind": KIND_SEQUENCE, "sequence": copy.deepcopy(seq), "presets": presets_out, } def import_sequence_bundle( bundle: Dict[str, Any], sequences_model, presets_model, profile_id: str, ) -> Tuple[str, Dict[str, Any]]: if not isinstance(bundle, dict): raise ValueError("Invalid bundle") if bundle.get("kind") != KIND_SEQUENCE: raise ValueError("Not a sequence bundle") seq = bundle.get("sequence") if not isinstance(seq, dict): raise ValueError("Bundle missing sequence") preset_cache: Dict[str, int] = {} preset_id_map: Dict[str, str] = {} new_presets: Dict[str, Dict[str, Any]] = {} presets_in = bundle.get("presets") if isinstance(bundle.get("presets"), dict) else {} for old_pid, pdata in presets_in.items(): if not isinstance(pdata, dict): continue new_pid = _allocate_id(presets_model, preset_cache) clone = copy.deepcopy(pdata) clone["profile_id"] = str(profile_id) new_presets[new_pid] = clone preset_id_map[str(old_pid)] = new_pid for old_pid in _preset_ids_in_sequence(seq): op = str(old_pid) if op not in preset_id_map: pdata = presets_model.read(op) if pdata: new_pid = _allocate_id(presets_model, preset_cache) clone = copy.deepcopy(pdata) clone["profile_id"] = str(profile_id) new_presets[new_pid] = clone preset_id_map[op] = new_pid for pid, pdata in new_presets.items(): presets_model[pid] = pdata if new_presets: presets_model.save() new_seq_id = sequences_model.create(profile_id) mapped = _map_sequence_lanes(seq, preset_id_map) mapped["profile_id"] = str(profile_id) sequences_model.update(new_seq_id, mapped) return str(new_seq_id), sequences_model.read(new_seq_id) or mapped