Files
led-controller/esp32/main.py

72 lines
2.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Serial-to-ESP-NOW bridge: receives from Pi on UART, forwards to ESP-NOW peers.
# Wire format: first 6 bytes = destination MAC, rest = payload. Address is always 6 bytes.
from machine import Pin, UART
import espnow
import network
import time
# Baud rate of the UART link to the Pi.
# NOTE(review): 912000 is not one of the usual UART rates (921600 is the
# common one) — confirm this matches the rate configured on the Pi side.
UART_BAUD = 912000
# All-ones MAC address, used as the ESP-NOW broadcast destination.
BROADCAST = b"\xff\xff\xff\xff\xff\xff"
# Peer-list size at which ensure_peer() starts evicting the least recently
# used peer before adding a new one.
MAX_PEERS = 20
# Must match the wifi_channel default in the led-driver / controller settings.
# NOTE(review): this comment previously cited 111, which contradicts the value
# below and is not a valid 2.4 GHz channel — confirm the intended default is 6.
WIFI_CHANNEL = 6
# Bring up the station interface with power management disabled and the
# channel set to WIFI_CHANNEL.
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(pm=network.WLAN.PM_NONE, channel=WIFI_CHANNEL)
# Print the channel the interface reports next to the requested one.
print("WiFi STA channel:", sta.config("channel"), "(WIFI_CHANNEL=%s)" % WIFI_CHANNEL)
# Start ESP-NOW and register the broadcast address as a peer.
esp = espnow.ESPNow()
esp.active(True)
esp.add_peer(BROADCAST)
# UART 1 (TX on pin 21, RX on pin 6) carries the frames coming from the Pi.
uart = UART(1, UART_BAUD, tx=Pin(21), rx=Pin(6))
# Track last send time per peer for LRU eviction (remove oldest when at limit).
# Values are time.ticks_ms() readings keyed by peer MAC.
last_used = {BROADCAST: time.ticks_ms()}
# ESP_ERR_ESPNOW_EXIST: peer already registered (ignore when adding).
ESP_ERR_ESPNOW_EXIST = -12395
def ensure_peer(addr):
    """Ensure addr is registered as an ESP-NOW peer, evicting the LRU peer if full.

    If addr is already in the peer list nothing happens. Otherwise, when the
    list already holds MAX_PEERS entries, the non-broadcast peer that was
    sent to least recently (per last_used) is removed first, and then addr
    is added. A peer with no last_used entry counts as the oldest.

    Args:
        addr: 6-byte destination MAC address (bytes).

    Raises:
        OSError: any add_peer failure other than ESP_ERR_ESPNOW_EXIST.
    """
    peer_macs = [p[0] for p in esp.get_peers()]
    if addr in peer_macs:
        return

    if len(peer_macs) >= MAX_PEERS:
        # ticks_ms() wraps around, so raw timestamps cannot be compared
        # directly; compare ages computed with ticks_diff instead.
        now = time.ticks_ms()
        oldest_mac = None
        oldest_age = 0
        for mac in peer_macs:
            if mac == BROADCAST:
                # The broadcast peer is never evicted.
                continue
            ts = last_used.get(mac)
            if ts is None:
                # Never successfully sent to: evict this one first.
                oldest_mac = mac
                break
            age = time.ticks_diff(now, ts)
            if oldest_mac is None or age >= oldest_age:
                oldest_age = age
                oldest_mac = mac
        if oldest_mac is not None:
            esp.del_peer(oldest_mac)
            last_used.pop(oldest_mac, None)

    try:
        esp.add_peer(addr)
    except OSError as e:
        # "Already registered" is harmless here; anything else is a real error.
        if e.args[0] != ESP_ERR_ESPNOW_EXIST:
            raise
print("Starting ESP32 main.py")
# Bridge loop: take whatever the UART has buffered, treat the first 6 bytes as
# the destination MAC and forward the remainder over ESP-NOW.
# NOTE(review): each uart.read() result is handled as exactly one frame —
# confirm the Pi side never splits or coalesces frames.
while True:
    if not uart.any():
        continue
    data = uart.read()
    # A frame needs at least the 6-byte destination MAC; drop anything shorter.
    if not data or len(data) < 6:
        continue
    print(f"Received data: {data}")
    addr = data[:6]
    payload = data[6:]
    try:
        ensure_peer(addr)
        esp.send(addr, payload)
    except (OSError, ValueError) as e:
        # A failed registration or send must not stop the bridge: report it,
        # drop this frame and keep forwarding later ones.
        print("ESP-NOW send failed:", e)
        continue
    # Record the send time only after a send that did not raise, for LRU eviction.
    last_used[addr] = time.ticks_ms()