refactor(driver): simplify and harden espnow message handlers

This commit is contained in:
2026-03-22 02:53:23 +13:00
parent 044dd815dc
commit fb53f900fb

View File

@@ -14,18 +14,13 @@ presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
# Use the default preset name from settings (set via controller or defaults)
default_preset = settings.get("default")
if (
isinstance(default_preset, str)
and default_preset
and default_preset in presets.presets
):
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
presets.select(default_preset)
print(f"Selected startup preset: {default_preset}")
wdt = WDT(timeout=10000)
wdt.feed()
last_brightness_save = 0
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
@@ -35,87 +30,95 @@ e = ESPNow()
e.active(True)
def as_dict(value):
return value if isinstance(value, dict) else {}
def as_list(value):
return value if isinstance(value, list) else []
def receive_data(receiver):
"""Read one ESPNow message and decode JSON dict payload."""
try:
host, msg = receiver.recv()
data = json.loads(msg)
print(msg)
data = as_dict(data)
if data.get("v", "") != "1":
return None
return data
except (ValueError, TypeError):
return None
def apply_brightness(data):
"""Apply and persist global brightness from payload."""
try:
presets.b = max(0, min(255, int(data["b"])))
settings["brightness"] = presets.b
except (TypeError, ValueError):
pass
def apply_presets(data):
"""Create/update preset definitions from payload."""
presets_map = as_dict(data.get("presets"))
for id, preset_data in presets_map.items():
preset_data = as_dict(preset_data)
if not preset_data:
continue
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None:
try:
preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings
)
except (TypeError, ValueError, KeyError):
continue
presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
def apply_select(data):
"""Select preset for this device when addressed."""
select_map = as_dict(data.get("select"))
device_name = settings["name"]
# Case-sensitive: select key must match device name exactly.
select_list = as_list(select_map.get(device_name))
if not select_list:
return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
if isinstance(preset_name, str):
presets.select(preset_name, step=step)
def apply_default(data):
targets = as_list(data.get("targets"))
default_name = data.get("default", "")
if (
settings["name"] in targets
and isinstance(default_name, str)
and default_name in presets.presets
):
settings["default"] = default_name
while True:
wdt.feed()
presets.tick()
if e.any():
try:
host, msg = e.recv()
data = json.loads(msg)
print(msg)
except (ValueError, TypeError):
if (data := receive_data(e)) is None:
continue
if not isinstance(data, dict):
continue
# Only handle messages with the expected version.
if data.get("v") != "1":
continue
# print(data)
# Global brightness (0255) for this device
if "b" in data:
try:
presets.b = max(0, min(255, int(data["b"])))
settings["brightness"] = presets.b
now = utime.ticks_ms()
if utime.ticks_diff(now, last_brightness_save) >= 500:
settings.save()
last_brightness_save = now
except (TypeError, ValueError):
pass
if isinstance(data.get("presets"), dict):
for id, preset_data in data["presets"].items():
if not isinstance(preset_data, dict):
continue
# Convert hex color strings to RGB tuples and reorder based on device colour order.
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None:
try:
preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings
)
except (TypeError, ValueError):
continue
presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
if isinstance(data.get("select"), dict) and settings.get("name") in data["select"]:
select_list = data["select"][settings.get("name")]
# Select value is always a list: ["preset_name"] or ["preset_name", step]
if isinstance(select_list, list) and select_list:
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
if isinstance(preset_name, str):
presets.select(preset_name, step=step)
apply_brightness(data)
if "presets" in data:
apply_presets(data)
if "select" in data:
apply_select(data)
if "default" in data:
default_name = data["default"]
this_device_name = settings.get("name")
this_device_name_norm = (
this_device_name.strip().lower()
if isinstance(this_device_name, str)
else None
)
should_apply_default = True
if "targets" in data:
# When targets are present, default must only apply to named targets.
should_apply_default = False
targets = data.get("targets")
if isinstance(targets, list) and this_device_name_norm:
normalized_targets = [
target.strip().lower()
for target in targets
if isinstance(target, str) and target.strip()
]
should_apply_default = this_device_name_norm in normalized_targets
if not should_apply_default:
print("Ignored default: device not in targets")
if (
should_apply_default
and
isinstance(default_name, str)
and default_name
and default_name in presets.presets
):
settings["default"] = default_name
print(f"Set startup preset to: {default_name}")
settings.save()
if "save" in data:
apply_default(data)
if "save" in data and ("presets" in data or "default" in data):
presets.save()