Use init_espnow for channel alignment; route wire CMD/GROUPS and JSON v1 payloads to process_data from the poll loop. Co-authored-by: Cursor <cursoragent@cursor.com>
72 lines
1.6 KiB
Python
72 lines
1.6 KiB
Python
import print_timestamp # noqa: F401
|
|
from settings import Settings
|
|
import machine
|
|
import asyncio
|
|
import gc
|
|
import json
|
|
import network
|
|
import espnow
|
|
from presets import Presets
|
|
from controller_messages import apply_startup_pattern, process_data
|
|
from espnow_transport import _handle_packet, init_espnow
|
|
from espnow_wire import BROADCAST_MAC, WIRE_MAGIC
|
|
|
|
# --- Board bring-up ---------------------------------------------------------
# Arm the hardware watchdog (timeout is given in milliseconds) and feed it
# right away so the rest of start-up begins with a full countdown.
wdt = machine.WDT(timeout=10000)
wdt.feed()

# Run the CPU at 160 MHz.
machine.freq(160000000)

# --- Configuration and LED presets ------------------------------------------
settings = Settings()
gc.collect()  # reclaim whatever loading the settings left behind

presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False))
gc.collect()

apply_startup_pattern(settings, presets)

# --- ESP-NOW ----------------------------------------------------------------
# NOTE(review): init_espnow is expected to align the radio channel and return
# the ESP-NOW interface used below -- confirm against espnow_transport.
esp = init_espnow(settings)
print(network.WLAN(network.STA_IF).config("channel"))

# Announce this device with a small JSON "hello" sent to the broadcast address.
hello = json.dumps({
    "v": "1",
    "name": settings.get("name", "led"),
    "type": "led",
})
try:
    esp.send(BROADCAST_MAC, hello)
    print("espnow hello", len(hello), "B")
except Exception as exc:
    # Best effort: a failed announcement is logged and start-up continues.
    print("espnow hello failed:", exc)
|
|
|
|
|
|
def _on_espnow_message(msg):
    """Dispatch one received ESP-NOW payload based on its first byte.

    - Empty / falsy ``msg``: ignored.
    - First byte equals ``WIRE_MAGIC``: handed to ``_handle_packet``.
    - First byte is ``{``: handed to ``process_data``
      (presumably a JSON payload -- confirm against controller_messages).
    - Anything else: silently dropped.

    Both handlers receive the module-level ``settings`` and ``presets``.
    """
    if not msg:
        return

    if msg[0] == WIRE_MAGIC:
        _handle_packet(msg, settings, presets)
    elif msg[0:1] == b"{":
        process_data(msg, settings, presets)
|
|
|
|
|
|
def _poll_espnow():
    """Read at most one pending ESP-NOW message and dispatch it.

    Does nothing when no message is waiting. Errors raised while handling
    a message are printed and swallowed so the caller's loop keeps running.
    """
    if not esp.any():
        return

    # Zero timeout: return immediately instead of blocking the loop.
    sender, payload = esp.recv(0)
    if not (sender and payload):
        return

    print(sender, len(payload), "B")
    try:
        _on_espnow_message(payload)
    except Exception as exc:
        print("espnow rx error:", exc)


async def main():
    """Run the device loop forever.

    Each iteration advances the LED presets, feeds the watchdog, services
    at most one incoming ESP-NOW message, then yields to the scheduler.
    """
    while True:
        presets.tick()
        wdt.feed()
        _poll_espnow()
        await asyncio.sleep(0)


# Start the loop only when this file is executed as the main script.
if __name__ == "__main__":
    asyncio.run(main())
|