"""
LED driver registry persisted in ``db/device.json``.

Storage key and **id** field are the device **MAC**: 12 lowercase hex characters
(no colons). **name** is for ``select`` / tabs (not unique). **address** is the
reachability hint: same as MAC for ESP-NOW, or IP/hostname for Wi-Fi.
"""

from models.model import Model

DEVICE_TYPES = frozenset({"led"})
DEVICE_TRANSPORTS = frozenset({"wifi", "espnow"})


def validate_device_type(value):
    """
    Return the canonical (trimmed, lowercase) device type.

    A falsy ``value`` defaults to ``"led"``. Non-string values are stringified
    first so that bad input raises ``ValueError`` rather than ``AttributeError``.

    Raises:
        ValueError: if the value is not a member of ``DEVICE_TYPES``.
    """
    t = str(value or "led").strip().lower()
    if t not in DEVICE_TYPES:
        raise ValueError(f"type must be one of: {', '.join(sorted(DEVICE_TYPES))}")
    return t


def validate_device_transport(value):
    """
    Return the canonical (trimmed, lowercase) transport name.

    A falsy ``value`` defaults to ``"espnow"``. Non-string values are
    stringified first so that bad input raises ``ValueError`` rather than
    ``AttributeError``.

    Raises:
        ValueError: if the value is not a member of ``DEVICE_TRANSPORTS``.
    """
    tr = str(value or "espnow").strip().lower()
    if tr not in DEVICE_TRANSPORTS:
        raise ValueError(
            f"transport must be one of: {', '.join(sorted(DEVICE_TRANSPORTS))}"
        )
    return tr


def normalize_mac(mac):
    """Normalise to 12-char lowercase hex or None.

    Accepts any value; it is stringified, trimmed, lowercased and stripped of
    ``:`` and ``-`` separators before validation.
    """
    if mac is None:
        return None
    s = str(mac).strip().lower().replace(":", "").replace("-", "")
    if len(s) == 12 and all(c in "0123456789abcdef" for c in s):
        return s
    return None


def derive_device_mac(mac=None, address=None, transport="espnow"):
    """
    Resolve the device MAC used as storage id.

    Explicit ``mac`` wins. For ESP-NOW, ``address`` is the peer MAC.
    For Wi-Fi, ``mac`` must be supplied (``address`` is typically an IP).

    Returns:
        The normalised 12-hex MAC, or ``None`` when none can be derived.

    Raises:
        ValueError: if ``mac`` is not usable and ``transport`` is invalid.
    """
    m = normalize_mac(mac)
    if m:
        return m
    tr = validate_device_transport(transport)
    if tr == "espnow":
        return normalize_mac(address)
    return None


def normalize_address_for_transport(addr, transport):
    """ESP-NOW → 12 hex or None; Wi-Fi → trimmed string or None.

    Raises:
        ValueError: if ``transport`` is invalid.
    """
    tr = validate_device_transport(transport)
    if tr == "espnow":
        return normalize_mac(addr)
    if addr is None:
        return None
    s = str(addr).strip()
    return s if s else None


class Device(Model):
    """
    Registry of LED driver devices keyed by MAC.

    NOTE(review): relies on ``Model`` behaving like a dict (``items``, ``keys``,
    ``get``, ``pop``, ``in``, item assignment) and providing ``load()`` /
    ``save()`` — confirm against ``models/model.py``.
    """

    def __init__(self):
        super().__init__()

    def load(self):
        """Load the registry, migrate legacy rows, and save if anything changed."""
        super().load()
        changed = False
        # Snapshot the items: _migrate_record mutates the row dicts in place.
        for sid, doc in list(self.items()):
            if not isinstance(doc, dict):
                continue
            if self._migrate_record(str(sid), doc):
                changed = True
        if self._rekey_legacy_ids():
            changed = True
        if changed:
            self.save()

    def _migrate_record(self, storage_id, doc):
        """
        Bring one stored row up to the current schema, mutating ``doc`` in place.

        Defaults invalid ``type`` / ``transport``, collapses a legacy
        ``addresses`` list into a single ``address``, normalises ``address`` for
        the transport, aligns ``id`` with the storage key and drops any legacy
        ``mac`` field.

        Returns:
            True if ``doc`` was modified (so the caller knows to save).
        """
        changed = False
        if doc.get("type") not in DEVICE_TYPES:
            doc["type"] = "led"
            changed = True
        if doc.get("transport") not in DEVICE_TRANSPORTS:
            doc["transport"] = "espnow"
            changed = True

        raw_list = doc.get("addresses")
        if isinstance(raw_list, list) and raw_list:
            # Legacy multi-address rows: keep the first entry that is a valid MAC.
            picked = None
            for item in raw_list:
                n = normalize_mac(item)
                if n:
                    picked = n
                    break
            if picked:
                doc["address"] = picked
            del doc["addresses"]
            changed = True
        elif "addresses" in doc:
            del doc["addresses"]
            changed = True

        tr = doc["transport"]
        norm = normalize_address_for_transport(doc.get("address"), tr)
        if doc.get("address") != norm:
            doc["address"] = norm
            changed = True

        mac_key = normalize_mac(storage_id)
        if mac_key and mac_key == storage_id and str(doc.get("id") or "") != mac_key:
            doc["id"] = mac_key
            changed = True
        elif str(doc.get("id") or "").strip() != storage_id:
            doc["id"] = storage_id
            changed = True

        # A separate "mac" field is dropped; the removal must be reported as a
        # change, otherwise it is never written back to disk.
        if "mac" in doc:
            del doc["mac"]
            changed = True
        return changed

    def _rekey_legacy_ids(self):
        """Move numeric-keyed rows to MAC keys when ESP-NOW MAC is known.

        If a row already exists under the target MAC, the legacy row only fills
        in fields that are missing or empty (``None``, ``""``, ``[]``) there.

        Returns:
            True if any row was moved.
        """
        changed = False
        moves = []
        for sid in list(self.keys()):
            doc = self.get(sid)
            if not isinstance(doc, dict):
                continue
            if normalize_mac(sid) == sid:
                continue  # already keyed by a canonical MAC
            if not str(sid).isdigit():
                continue  # only numeric legacy ids are rekeyed
            tr = doc.get("transport", "espnow")
            cand = None
            if tr == "espnow":
                cand = normalize_mac(doc.get("address"))
            if not cand:
                continue
            moves.append((sid, cand))

        for old, mac in moves:
            if old not in self:
                continue
            doc = self.pop(old)
            if mac in self:
                existing = dict(self[mac])
                for k, v in doc.items():
                    if k not in existing or existing[k] in (None, "", []):
                        existing[k] = v
                doc = existing
            doc["id"] = mac
            self[mac] = doc
            changed = True
        return changed

    def _storage_key(self, id):
        """Return the key under which ``id`` is stored, or None if absent.

        Prefers the normalised MAC; falls back to the raw ``str(id)`` so rows
        still keyed by a legacy id remain reachable.
        """
        mac = normalize_mac(id)
        if mac is not None and mac in self:
            return mac
        raw = str(id)
        return raw if raw in self else None

    def create(
        self,
        name="",
        address=None,
        mac=None,
        default_pattern=None,
        tabs=None,
        device_type="led",
        transport="espnow",
    ):
        """
        Add a new device and save the registry.

        For ESP-NOW the stored ``address`` is always the MAC itself; for Wi-Fi
        it is the trimmed ``address`` (or ``None``).

        Returns:
            The 12-hex MAC used as storage key.

        Raises:
            ValueError: on invalid type/transport, when no MAC can be derived,
                or when a device with that MAC already exists.
        """
        dt = validate_device_type(device_type)
        tr = validate_device_transport(transport)
        mac_hex = derive_device_mac(mac=mac, address=address, transport=tr)
        if not mac_hex:
            raise ValueError(
                "mac is required (12 hex characters); for Wi-Fi pass mac separately from IP address"
            )
        if mac_hex in self:
            raise ValueError("device with this mac already exists")
        addr = normalize_address_for_transport(address, tr)
        if tr == "espnow":
            addr = mac_hex
        self[mac_hex] = {
            "id": mac_hex,
            "name": name,
            "type": dt,
            "transport": tr,
            "address": addr,
            "default_pattern": default_pattern if default_pattern else None,
            "tabs": list(tabs) if tabs else [],
        }
        self.save()
        return mac_hex

    def read(self, id):
        """Return the stored row for ``id`` (MAC in any accepted form, or a legacy key), or None."""
        key = self._storage_key(id)
        if key is None:
            return None
        return self.get(key)

    def update(self, id, data):
        """
        Merge ``data`` into an existing row and save.

        ``id``, ``addresses`` and ``mac`` in ``data`` are never stored. A ``mac``
        that differs from the row's key is rejected.

        Returns:
            True on success, False if no such device exists.

        Raises:
            ValueError: if ``data`` tries to change the MAC, or the merged
                type/transport is invalid.
        """
        key = self._storage_key(id)
        if key is None:
            return False
        incoming = dict(data or {})
        incoming.pop("id", None)
        incoming.pop("addresses", None)
        in_mac = normalize_mac(incoming.pop("mac", None))
        if in_mac is not None and in_mac != key:
            raise ValueError("cannot change device mac; delete and re-add")

        merged = dict(self[key])
        merged.update(incoming)
        merged["type"] = validate_device_type(merged.get("type"))
        merged["transport"] = validate_device_transport(merged.get("transport"))
        tr = merged["transport"]
        merged["address"] = normalize_address_for_transport(merged.get("address"), tr)
        # For ESP-NOW the address is the MAC. Only force it when the key really
        # is a canonical MAC: a legacy non-MAC key must not overwrite the peer
        # MAC held in "address", which _rekey_legacy_ids needs later.
        if tr == "espnow" and normalize_mac(key) == key:
            merged["address"] = key
        merged["id"] = key
        self[key] = merged
        self.save()
        return True

    def delete(self, id):
        """Remove the device and save. Returns False if it does not exist."""
        key = self._storage_key(id)
        if key is None:
            return False
        self.pop(key)
        self.save()
        return True

    def list(self):
        """Return all storage keys as a list."""
        return list(self.keys())

    def upsert_wifi_tcp_client(self, device_name, peer_ip, mac, device_type=None):
        """
        Register or update a Wi-Fi client by **MAC** (storage id).

        Updates **name**, **address** (peer IP), and optionally **type** from the
        client hello when valid.

        Returns:
            The 12-hex MAC on success, or None when the MAC, name or peer IP is
            missing/invalid.
        """
        mac_hex = normalize_mac(mac)
        if not mac_hex:
            return None
        name = (device_name or "").strip()
        if not name:
            return None
        ip = normalize_address_for_transport(peer_ip, "wifi")
        if not ip:
            return None

        # An unrecognised type in the hello is ignored rather than rejected.
        resolved_type = None
        if device_type is not None:
            try:
                resolved_type = validate_device_type(device_type)
            except ValueError:
                resolved_type = None

        if mac_hex in self:
            merged = dict(self[mac_hex])
            merged["name"] = name
            if resolved_type is not None:
                merged["type"] = resolved_type
            else:
                merged["type"] = validate_device_type(merged.get("type"))
            merged["transport"] = "wifi"
            merged["address"] = ip
            merged["id"] = mac_hex
            self[mac_hex] = merged
            self.save()
            return mac_hex

        self[mac_hex] = {
            "id": mac_hex,
            "name": name,
            "type": resolved_type or "led",
            "transport": "wifi",
            "address": ip,
            "default_pattern": None,
            "tabs": [],
        }
        self.save()
        return mac_hex