From 68eb547ec4c186355ddcb3fb298812e25fa18939 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Fri, 29 May 2026 16:00:54 +1200 Subject: [PATCH] feat(espnow): add debug logging and channel diagnostics Co-authored-by: Cursor --- src/espnow_transport.py | 32 +++++++-- tests/device_test_radio_diag.py | 116 ++++++++++++++++++++++++++++++++ tests/device_test_recv_ch5.py | 40 +++++++++++ tests/device_test_send_ch5.py | 47 +++++++++++++ 4 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 tests/device_test_radio_diag.py create mode 100644 tests/device_test_recv_ch5.py create mode 100644 tests/device_test_send_ch5.py diff --git a/src/espnow_transport.py b/src/espnow_transport.py index 78d7944..69e24c3 100644 --- a/src/espnow_transport.py +++ b/src/espnow_transport.py @@ -2,6 +2,7 @@ import asyncio import urandom +import ubinascii import espnow import network @@ -31,10 +32,17 @@ _PING_DELAY_MS_MAX = 500 _esp = None _groups_received = False +_debug = False + + +def _dlog(*parts): + if _debug: + print(*parts) def init_espnow(settings): - global _esp + global _esp, _debug + _debug = bool(settings.get("debug", False)) try: ch = int(settings.get("wifi_channel", WIFI_CHANNEL_DEFAULT)) except (TypeError, ValueError): @@ -43,13 +51,22 @@ def init_espnow(settings): sta = network.WLAN(network.STA_IF) sta.active(True) sta.config(pm=network.WLAN.PM_NONE) - sta.config(channel=ch) + try: + sta.config(channel=ch) + except Exception as e: + print("espnow sta channel set failed:", e) _esp = espnow.ESPNow() _esp.active(True) try: _esp.add_peer(BROADCAST_MAC) + _dlog("espnow add bcast ok") + except Exception as e: + print("espnow add bcast failed:", e) + try: + actual_ch = sta.config("channel") except Exception: - pass + actual_ch = "?" + print("espnow init ch", ch, "sta_ch", actual_ch, "debug", _debug) return _esp @@ -63,12 +80,13 @@ def _send_ping_rsp(host, settings, ping_id, delay_ms): try: try: _esp.add_peer(host) - except Exception: - pass + _dlog("espnow ping add_peer ok", ubinascii.hexlify(host).decode()) + except Exception as e: + _dlog("espnow ping add_peer skip", repr(e)) _esp.send(host, pkt) - print("espnow ping rsp", ping_id, delay_ms, "ms") + print("espnow ping rsp", ping_id, delay_ms, "ms", ubinascii.hexlify(host).decode()) except Exception as e: - print("espnow ping rsp failed:", e) + print("espnow ping rsp failed:", e, "host", ubinascii.hexlify(host).decode(), "len", len(pkt)) async def _send_ping_rsp_delayed(host, settings, ping_id): diff --git a/tests/device_test_radio_diag.py b/tests/device_test_radio_diag.py new file mode 100644 index 0000000..2a8f61f --- /dev/null +++ b/tests/device_test_radio_diag.py @@ -0,0 +1,116 @@ +"""Device-side radio diagnostic test (MicroPython). + +Checks: +1) STA/AP bring-up on channel 5 +2) ESP-NOW init and broadcast peer add +3) Broadcast TX test packet send +4) RX wait window to see any incoming ESP-NOW frames +""" + +import espnow +import machine +import network +import time +import ubinascii + + +CHANNEL = 5 +RX_WINDOW_MS = 3000 +WDT_TIMEOUT_MS = 10000 +BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff" +TEST_PAYLOAD = b"\x4c\x05\x01\x00\x00\x00" + + +def _mac_hex(mac_bytes): + try: + return ubinascii.hexlify(mac_bytes).decode() + except Exception: + return "?" + + +def run_diag(channel=CHANNEL, rx_window_ms=RX_WINDOW_MS): + wdt = machine.WDT(timeout=WDT_TIMEOUT_MS) + wdt.feed() + + print("diag start") + print("cpu freq", machine.freq()) + + sta = network.WLAN(network.STA_IF) + ap = network.WLAN(network.AP_IF) + + # Clean start + try: + sta.active(False) + ap.active(False) + time.sleep_ms(100) + except Exception as e: + print("wifi reset failed", repr(e)) + + # STA setup + try: + sta.active(True) + sta.config(pm=network.WLAN.PM_NONE) + sta.config(channel=channel) + print("sta ok ch", sta.config("channel"), "mac", _mac_hex(sta.config("mac"))) + except Exception as e: + print("sta setup failed", repr(e)) + + # AP setup + try: + ap.active(True) + try: + ap.config(essid="diag-ap", channel=channel, hidden=True) + except TypeError: + ap.config(essid="diag-ap", channel=channel) + print("ap ok ch", ap.config("channel"), "mac", _mac_hex(ap.config("mac"))) + except Exception as e: + print("ap setup failed", repr(e)) + + wdt.feed() + + # ESP-NOW setup + try: + e = espnow.ESPNow() + e.active(True) + print("espnow active ok") + except Exception as e_err: + print("espnow init failed", repr(e_err)) + return + + # Add broadcast peer + try: + e.add_peer(BROADCAST_MAC, channel=channel) + print("add bcast peer ok") + except TypeError: + try: + e.add_peer(BROADCAST_MAC) + print("add bcast peer ok (no channel arg)") + except Exception as e_err: + print("add bcast peer failed", repr(e_err)) + except Exception as e_err: + print("add bcast peer failed", repr(e_err)) + + # TX test + try: + ok = e.send(BROADCAST_MAC, TEST_PAYLOAD, True) + print("tx bcast", ok, "len", len(TEST_PAYLOAD)) + except Exception as e_err: + print("tx bcast failed", repr(e_err)) + + # RX window + print("rx window ms", rx_window_ms) + t_end = time.ticks_add(time.ticks_ms(), rx_window_ms) + rx_count = 0 + while time.ticks_diff(t_end, time.ticks_ms()) > 0: + wdt.feed() + host, msg = e.recv(100) + if host: + rx_count += 1 + print("rx", rx_count, _mac_hex(host), "len", len(msg)) + time.sleep_ms(5) + + print("diag done rx_count", rx_count) + + +if __name__ == "__main__": + run_diag() diff --git a/tests/device_test_recv_ch5.py b/tests/device_test_recv_ch5.py new file mode 100644 index 0000000..ebcef5e --- /dev/null +++ b/tests/device_test_recv_ch5.py @@ -0,0 +1,40 @@ +"""Device test: receive ESP-NOW packets on channel 5 (MicroPython).""" + +import espnow +import machine +import network +import ubinascii +import time + + +CHANNEL = 5 +TIMEOUT_MS = 1000 +WDT_TIMEOUT_MS = 10000 + + +def _set_channel(channel): + sta = network.WLAN(network.STA_IF) + sta.active(True) + sta.config(pm=network.WLAN.PM_NONE) + sta.config(channel=channel) + + +def recv_loop(channel=CHANNEL, timeout_ms=TIMEOUT_MS): + wdt = machine.WDT(timeout=WDT_TIMEOUT_MS) + _set_channel(channel) + e = espnow.ESPNow() + e.active(True) + print("recv ready ch", channel) + while True: + wdt.feed() + host, msg = e.recv(timeout_ms) + if host: + mac_hex = ubinascii.hexlify(host).decode() + print("rx", mac_hex, "len", len(msg), "hex", ubinascii.hexlify(msg).decode()) + else: + print("rx timeout") + time.sleep_ms(10) + + +if __name__ == "__main__": + recv_loop() diff --git a/tests/device_test_send_ch5.py b/tests/device_test_send_ch5.py new file mode 100644 index 0000000..4bc6c52 --- /dev/null +++ b/tests/device_test_send_ch5.py @@ -0,0 +1,47 @@ +"""Device test: send one ESP-NOW packet on channel 5 (MicroPython).""" + +import espnow +import machine +import network +import ubinascii + + +CHANNEL = 5 +DEST_HEX = "ffffffffffff" +PAYLOAD_HEX = "4c0501000000" +WDT_TIMEOUT_MS = 10000 + + +def _set_channel(channel): + sta = network.WLAN(network.STA_IF) + sta.active(True) + sta.config(pm=network.WLAN.PM_NONE) + sta.config(channel=channel) + + +def _add_peer(esp, dest, channel): + try: + esp.add_peer(dest, channel=channel) + except TypeError: + esp.add_peer(dest) + except OSError: + pass + + +def send_once(dest_hex=DEST_HEX, payload_hex=PAYLOAD_HEX, channel=CHANNEL): + wdt = machine.WDT(timeout=WDT_TIMEOUT_MS) + wdt.feed() + dest = ubinascii.unhexlify(dest_hex) + pkt = ubinascii.unhexlify(payload_hex) + _set_channel(channel) + e = espnow.ESPNow() + e.active(True) + _add_peer(e, dest, channel) + wdt.feed() + ok = e.send(dest, pkt, True) + print("sent", ok, "ch", channel, "dest", dest_hex, "len", len(pkt)) + return ok + + +if __name__ == "__main__": + send_once()