From fb53f900fbcd6731431c890774cdda2dc57ea736 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Sun, 22 Mar 2026 02:53:23 +1300 Subject: [PATCH] refactor(driver): simplify and harden espnow message handlers --- src/main.py | 169 ++++++++++++++++++++++++++-------------------------- 1 file changed, 86 insertions(+), 83 deletions(-) diff --git a/src/main.py b/src/main.py index 4ebe71e..bf3b4b4 100644 --- a/src/main.py +++ b/src/main.py @@ -14,18 +14,13 @@ presets = Presets(settings["led_pin"], settings["num_leds"]) presets.load(settings) presets.b = settings.get("brightness", 255) # Use the default preset name from settings (set via controller or defaults) -default_preset = settings.get("default") -if ( - isinstance(default_preset, str) - and default_preset - and default_preset in presets.presets -): +default_preset = settings.get("default", "") +if default_preset and default_preset in presets.presets: presets.select(default_preset) print(f"Selected startup preset: {default_preset}") wdt = WDT(timeout=10000) wdt.feed() -last_brightness_save = 0 sta_if = network.WLAN(network.STA_IF) sta_if.active(True) @@ -35,87 +30,95 @@ e = ESPNow() e.active(True) +def as_dict(value): + return value if isinstance(value, dict) else {} + + +def as_list(value): + return value if isinstance(value, list) else [] + + +def receive_data(receiver): + """Read one ESPNow message and decode JSON dict payload.""" + try: + host, msg = receiver.recv() + data = json.loads(msg) + print(msg) + data = as_dict(data) + if data.get("v", "") != "1": + return None + return data + except (ValueError, TypeError): + return None + + +def apply_brightness(data): + """Apply and persist global brightness from payload.""" + try: + presets.b = max(0, min(255, int(data["b"]))) + settings["brightness"] = presets.b + except (TypeError, ValueError): + pass + + +def apply_presets(data): + """Create/update preset definitions from payload.""" + presets_map = as_dict(data.get("presets")) + for id, preset_data in presets_map.items(): + preset_data = as_dict(preset_data) + if not preset_data: + continue + color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) + if color_key is not None: + try: + preset_data[color_key] = convert_and_reorder_colors( + preset_data[color_key], settings + ) + except (TypeError, ValueError, KeyError): + continue + presets.edit(id, preset_data) + print(f"Edited preset {id}: {preset_data.get('name', '')}") + + +def apply_select(data): + """Select preset for this device when addressed.""" + select_map = as_dict(data.get("select")) + device_name = settings["name"] + + # Case-sensitive: select key must match device name exactly. + select_list = as_list(select_map.get(device_name)) + if not select_list: + return + preset_name = select_list[0] + step = select_list[1] if len(select_list) > 1 else None + if isinstance(preset_name, str): + presets.select(preset_name, step=step) + + +def apply_default(data): + targets = as_list(data.get("targets")) + default_name = data.get("default", "") + if ( + settings["name"] in targets + and isinstance(default_name, str) + and default_name in presets.presets + ): + settings["default"] = default_name + + while True: wdt.feed() presets.tick() if e.any(): - try: - host, msg = e.recv() - data = json.loads(msg) - print(msg) - except (ValueError, TypeError): + if (data := receive_data(e)) is None: continue - if not isinstance(data, dict): - continue - # Only handle messages with the expected version. - if data.get("v") != "1": - continue - # print(data) - # Global brightness (0–255) for this device if "b" in data: - try: - presets.b = max(0, min(255, int(data["b"]))) - settings["brightness"] = presets.b - now = utime.ticks_ms() - if utime.ticks_diff(now, last_brightness_save) >= 500: - settings.save() - last_brightness_save = now - except (TypeError, ValueError): - pass - if isinstance(data.get("presets"), dict): - for id, preset_data in data["presets"].items(): - if not isinstance(preset_data, dict): - continue - # Convert hex color strings to RGB tuples and reorder based on device colour order. - color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None) - if color_key is not None: - try: - preset_data[color_key] = convert_and_reorder_colors( - preset_data[color_key], settings - ) - except (TypeError, ValueError): - continue - presets.edit(id, preset_data) - print(f"Edited preset {id}: {preset_data.get('name', '')}") - if isinstance(data.get("select"), dict) and settings.get("name") in data["select"]: - select_list = data["select"][settings.get("name")] - # Select value is always a list: ["preset_name"] or ["preset_name", step] - if isinstance(select_list, list) and select_list: - preset_name = select_list[0] - step = select_list[1] if len(select_list) > 1 else None - if isinstance(preset_name, str): - presets.select(preset_name, step=step) + apply_brightness(data) + if "presets" in data: + apply_presets(data) + if "select" in data: + apply_select(data) if "default" in data: - default_name = data["default"] - this_device_name = settings.get("name") - this_device_name_norm = ( - this_device_name.strip().lower() - if isinstance(this_device_name, str) - else None - ) - should_apply_default = True - if "targets" in data: - # When targets are present, default must only apply to named targets. - should_apply_default = False - targets = data.get("targets") - if isinstance(targets, list) and this_device_name_norm: - normalized_targets = [ - target.strip().lower() - for target in targets - if isinstance(target, str) and target.strip() - ] - should_apply_default = this_device_name_norm in normalized_targets - if not should_apply_default: - print("Ignored default: device not in targets") - if ( - should_apply_default - and - isinstance(default_name, str) - and default_name - and default_name in presets.presets - ): - settings["default"] = default_name - print(f"Set startup preset to: {default_name}") - settings.save() - if "save" in data: + apply_default(data) + if "save" in data and ("presets" in data or "default" in data): presets.save()