feat(devices): wifi tcp registry, device API/UI, tests; bump led-tool
Made-with: Cursor
This commit is contained in:
@@ -1,49 +1,229 @@
|
||||
"""
|
||||
LED driver registry persisted in ``db/device.json``.
|
||||
|
||||
Storage key and **id** field are the device **MAC**: 12 lowercase hex characters
|
||||
(no colons). **name** is for ``select`` / tabs (not unique). **address** is the
|
||||
reachability hint: same as MAC for ESP-NOW, or IP/hostname for Wi-Fi.
|
||||
"""
|
||||
|
||||
from models.model import Model
|
||||
|
||||
DEVICE_TYPES = frozenset({"led"})
|
||||
DEVICE_TRANSPORTS = frozenset({"wifi", "espnow"})
|
||||
|
||||
def _normalize_address(addr):
|
||||
"""Normalize 6-byte ESP32 address to 12-char lowercase hex (no colons)."""
|
||||
if addr is None:
|
||||
|
||||
def validate_device_type(value):
|
||||
t = (value or "led").strip().lower()
|
||||
if t not in DEVICE_TYPES:
|
||||
raise ValueError(f"type must be one of: {', '.join(sorted(DEVICE_TYPES))}")
|
||||
return t
|
||||
|
||||
|
||||
def validate_device_transport(value):
|
||||
tr = (value or "espnow").strip().lower()
|
||||
if tr not in DEVICE_TRANSPORTS:
|
||||
raise ValueError(
|
||||
f"transport must be one of: {', '.join(sorted(DEVICE_TRANSPORTS))}"
|
||||
)
|
||||
return tr
|
||||
|
||||
|
||||
def normalize_mac(mac):
|
||||
"""Normalise to 12-char lowercase hex or None."""
|
||||
if mac is None:
|
||||
return None
|
||||
s = str(addr).strip().lower().replace(":", "").replace("-", "")
|
||||
s = str(mac).strip().lower().replace(":", "").replace("-", "")
|
||||
if len(s) == 12 and all(c in "0123456789abcdef" for c in s):
|
||||
return s
|
||||
return None
|
||||
|
||||
|
||||
def derive_device_mac(mac=None, address=None, transport="espnow"):
|
||||
"""
|
||||
Resolve the device MAC used as storage id.
|
||||
|
||||
Explicit ``mac`` wins. For ESP-NOW, ``address`` is the peer MAC. For Wi-Fi,
|
||||
``mac`` must be supplied (``address`` is typically an IP).
|
||||
"""
|
||||
m = normalize_mac(mac)
|
||||
if m:
|
||||
return m
|
||||
tr = validate_device_transport(transport)
|
||||
if tr == "espnow":
|
||||
return normalize_mac(address)
|
||||
return None
|
||||
|
||||
|
||||
def normalize_address_for_transport(addr, transport):
|
||||
"""ESP-NOW → 12 hex or None; Wi-Fi → trimmed string or None."""
|
||||
tr = validate_device_transport(transport)
|
||||
if tr == "espnow":
|
||||
return normalize_mac(addr)
|
||||
if addr is None:
|
||||
return None
|
||||
s = str(addr).strip()
|
||||
return s if s else None
|
||||
|
||||
|
||||
class Device(Model):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, name="", address=None, default_pattern=None, tabs=None):
|
||||
next_id = self.get_next_id()
|
||||
addr = _normalize_address(address)
|
||||
self[next_id] = {
|
||||
def load(self):
|
||||
super().load()
|
||||
changed = False
|
||||
for sid, doc in list(self.items()):
|
||||
if not isinstance(doc, dict):
|
||||
continue
|
||||
if self._migrate_record(str(sid), doc):
|
||||
changed = True
|
||||
if self._rekey_legacy_ids():
|
||||
changed = True
|
||||
if changed:
|
||||
self.save()
|
||||
|
||||
def _migrate_record(self, storage_id, doc):
|
||||
changed = False
|
||||
if doc.get("type") not in DEVICE_TYPES:
|
||||
doc["type"] = "led"
|
||||
changed = True
|
||||
if doc.get("transport") not in DEVICE_TRANSPORTS:
|
||||
doc["transport"] = "espnow"
|
||||
changed = True
|
||||
raw_list = doc.get("addresses")
|
||||
if isinstance(raw_list, list) and raw_list:
|
||||
picked = None
|
||||
for item in raw_list:
|
||||
n = normalize_mac(item)
|
||||
if n:
|
||||
picked = n
|
||||
break
|
||||
if picked:
|
||||
doc["address"] = picked
|
||||
del doc["addresses"]
|
||||
changed = True
|
||||
elif "addresses" in doc:
|
||||
del doc["addresses"]
|
||||
changed = True
|
||||
tr = doc["transport"]
|
||||
norm = normalize_address_for_transport(doc.get("address"), tr)
|
||||
if doc.get("address") != norm:
|
||||
doc["address"] = norm
|
||||
changed = True
|
||||
mac_key = normalize_mac(storage_id)
|
||||
if mac_key and mac_key == storage_id and str(doc.get("id") or "") != mac_key:
|
||||
doc["id"] = mac_key
|
||||
changed = True
|
||||
elif str(doc.get("id") or "").strip() != storage_id:
|
||||
doc["id"] = storage_id
|
||||
changed = True
|
||||
doc.pop("mac", None)
|
||||
return changed
|
||||
|
||||
def _rekey_legacy_ids(self):
|
||||
"""Move numeric-keyed rows to MAC keys when ESP-NOW MAC is known."""
|
||||
changed = False
|
||||
moves = []
|
||||
for sid in list(self.keys()):
|
||||
doc = self.get(sid)
|
||||
if not isinstance(doc, dict):
|
||||
continue
|
||||
if normalize_mac(sid) == sid:
|
||||
continue
|
||||
if not str(sid).isdigit():
|
||||
continue
|
||||
tr = doc.get("transport", "espnow")
|
||||
cand = None
|
||||
if tr == "espnow":
|
||||
cand = normalize_mac(doc.get("address"))
|
||||
if not cand:
|
||||
continue
|
||||
moves.append((sid, cand))
|
||||
for old, mac in moves:
|
||||
if old not in self:
|
||||
continue
|
||||
doc = self.pop(old)
|
||||
if mac in self:
|
||||
existing = dict(self[mac])
|
||||
for k, v in doc.items():
|
||||
if k not in existing or existing[k] in (None, "", []):
|
||||
existing[k] = v
|
||||
doc = existing
|
||||
doc["id"] = mac
|
||||
self[mac] = doc
|
||||
changed = True
|
||||
return changed
|
||||
|
||||
def create(
|
||||
self,
|
||||
name="",
|
||||
address=None,
|
||||
mac=None,
|
||||
default_pattern=None,
|
||||
tabs=None,
|
||||
device_type="led",
|
||||
transport="espnow",
|
||||
):
|
||||
dt = validate_device_type(device_type)
|
||||
tr = validate_device_transport(transport)
|
||||
mac_hex = derive_device_mac(mac=mac, address=address, transport=tr)
|
||||
if not mac_hex:
|
||||
raise ValueError(
|
||||
"mac is required (12 hex characters); for Wi-Fi pass mac separately from IP address"
|
||||
)
|
||||
if mac_hex in self:
|
||||
raise ValueError("device with this mac already exists")
|
||||
addr = normalize_address_for_transport(address, tr)
|
||||
if tr == "espnow":
|
||||
addr = mac_hex
|
||||
self[mac_hex] = {
|
||||
"id": mac_hex,
|
||||
"name": name,
|
||||
"type": dt,
|
||||
"transport": tr,
|
||||
"address": addr,
|
||||
"default_pattern": default_pattern if default_pattern else None,
|
||||
"tabs": list(tabs) if tabs else [],
|
||||
}
|
||||
self.save()
|
||||
return next_id
|
||||
return mac_hex
|
||||
|
||||
def read(self, id):
|
||||
id_str = str(id)
|
||||
return self.get(id_str, None)
|
||||
m = normalize_mac(id)
|
||||
if m is not None and m in self:
|
||||
return self.get(m)
|
||||
return self.get(str(id), None)
|
||||
|
||||
def update(self, id, data):
|
||||
id_str = str(id)
|
||||
id_str = normalize_mac(id)
|
||||
if id_str is None:
|
||||
id_str = str(id)
|
||||
if id_str not in self:
|
||||
return False
|
||||
if "address" in data and data["address"] is not None:
|
||||
data = dict(data)
|
||||
data["address"] = _normalize_address(data["address"])
|
||||
self[id_str].update(data)
|
||||
incoming = dict(data)
|
||||
incoming.pop("id", None)
|
||||
incoming.pop("addresses", None)
|
||||
in_mac = normalize_mac(incoming.get("mac"))
|
||||
if in_mac is not None and in_mac != id_str:
|
||||
raise ValueError("cannot change device mac; delete and re-add")
|
||||
incoming.pop("mac", None)
|
||||
merged = dict(self[id_str])
|
||||
merged.update(incoming)
|
||||
merged["type"] = validate_device_type(merged.get("type"))
|
||||
merged["transport"] = validate_device_transport(merged.get("transport"))
|
||||
tr = merged["transport"]
|
||||
merged["address"] = normalize_address_for_transport(merged.get("address"), tr)
|
||||
if tr == "espnow":
|
||||
merged["address"] = id_str
|
||||
merged["id"] = id_str
|
||||
self[id_str] = merged
|
||||
self.save()
|
||||
return True
|
||||
|
||||
def delete(self, id):
|
||||
id_str = str(id)
|
||||
id_str = normalize_mac(id)
|
||||
if id_str is None:
|
||||
id_str = str(id)
|
||||
if id_str not in self:
|
||||
return False
|
||||
self.pop(id_str)
|
||||
@@ -52,3 +232,39 @@ class Device(Model):
|
||||
|
||||
def list(self):
|
||||
return list(self.keys())
|
||||
|
||||
def upsert_wifi_tcp_client(self, device_name, peer_ip, mac):
|
||||
"""
|
||||
Register or update a Wi-Fi client by **MAC** (storage id). Updates **name**
|
||||
and **address** (peer IP) on each connect.
|
||||
"""
|
||||
mac_hex = normalize_mac(mac)
|
||||
if not mac_hex:
|
||||
return None
|
||||
name = (device_name or "").strip()
|
||||
if not name:
|
||||
return None
|
||||
ip = normalize_address_for_transport(peer_ip, "wifi")
|
||||
if not ip:
|
||||
return None
|
||||
if mac_hex in self:
|
||||
merged = dict(self[mac_hex])
|
||||
merged["name"] = name
|
||||
merged["type"] = validate_device_type(merged.get("type"))
|
||||
merged["transport"] = "wifi"
|
||||
merged["address"] = ip
|
||||
merged["id"] = mac_hex
|
||||
self[mac_hex] = merged
|
||||
self.save()
|
||||
return mac_hex
|
||||
self[mac_hex] = {
|
||||
"id": mac_hex,
|
||||
"name": name,
|
||||
"type": "led",
|
||||
"transport": "wifi",
|
||||
"address": ip,
|
||||
"default_pattern": None,
|
||||
"tabs": [],
|
||||
}
|
||||
self.save()
|
||||
return mac_hex
|
||||
|
||||
Reference in New Issue
Block a user