- Run app on Raspberry Pi: serial to ESP32 bridge at 912000 baud, /dev/ttyS0
- Remove ESP-NOW/MicroPython-only code from src (espnow, p2p, wifi, machine/Pin)
- Transport: always send 6-byte MAC + payload; optional to/destination_mac in API and WebSocket
- Settings and model DB use project paths (no root); fix sys.print_exception for CPython
- Preset/settings controllers use get_current_sender(); template paths for cwd=src
- Pipfile: run from src, PORT from env; scripts for port 80 (setcap) and test
- ESP32 bridge: receive 6-byte addr + payload, LRU peer management (20 max), handle ESP_ERR_ESPNOW_EXIST
- Add esp32/main.py, esp32/benchmark_peers.py, scripts/setup-port80.sh, scripts/test-port80.sh

Made-with: Cursor
63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
# Serial-to-ESP-NOW bridge: receives from Pi on UART, forwards to ESP-NOW peers.
|
|
# Wire format: first 6 bytes = destination MAC, rest = payload. Address is always 6 bytes.
|
|
from machine import Pin, UART
|
|
import espnow
|
|
import network
|
|
import time
|
|
|
|
# ---- Configuration ----------------------------------------------------------

# Baud rate of the UART link to the Pi.
# NOTE(review): 921600 is the usual standard rate; 912000 must match the
# sender's setting exactly -- confirm against the Pi side before changing.
UART_BAUD = 912000

# All-ones MAC: the ESP-NOW broadcast address.
BROADCAST = b"\xff\xff\xff\xff\xff\xff"

# Upper bound on registered peers (the broadcast entry counts towards it).
MAX_PEERS = 20

# Value found in OSError.args[0] when add_peer() is given a MAC that is
# already registered; ensure_peer() treats that case as success.
ESP_ERR_ESPNOW_EXIST = -12395

# ---- Hardware bring-up ------------------------------------------------------

# The station interface is activated before ESP-NOW is started.
network.WLAN(network.STA_IF).active(True)

esp = espnow.ESPNow()
esp.active(True)
esp.add_peer(BROADCAST)

# UART1 carries the frames coming from the Pi (TX on GPIO21, RX on GPIO6).
uart = UART(1, baudrate=UART_BAUD, tx=Pin(21), rx=Pin(6))

# ---- LRU bookkeeping --------------------------------------------------------

# Maps peer MAC -> ticks_ms() of the most recent send; consulted when the peer
# table is full to decide which entry to drop.
last_used = {}
last_used[BROADCAST] = time.ticks_ms()
|
|
|
|
|
|
def ensure_peer(addr):
    """Make sure ``addr`` is registered as an ESP-NOW peer.

    If the peer table already holds MAX_PEERS entries, the least recently
    used peer (according to ``last_used``) is removed first. The broadcast
    address is never evicted.

    Args:
        addr: 6-byte destination MAC address.

    Raises:
        OSError: if the driver rejects the add/delete for any reason other
            than the peer already being registered.
    """
    peer_macs = [p[0] for p in esp.get_peers()]
    if addr in peer_macs:
        return

    if len(peer_macs) >= MAX_PEERS:
        # ticks_ms() values wrap around, so raw timestamps must not be
        # compared with < / <=. Compare ages via ticks_diff() instead, which
        # stays correct across a wrap (for ages below half the tick period).
        now = time.ticks_ms()
        oldest_mac = None
        oldest_age = -1
        for mac in peer_macs:
            if mac == BROADCAST:
                continue
            ts = last_used.get(mac)
            if ts is None:
                # Registered but never successfully sent to: best candidate.
                oldest_mac = mac
                break
            age = time.ticks_diff(now, ts)
            if age > oldest_age:
                oldest_age = age
                oldest_mac = mac
        if oldest_mac is not None:
            esp.del_peer(oldest_mac)
            last_used.pop(oldest_mac, None)

    try:
        esp.add_peer(addr)
    except OSError as e:
        # "Already exists" means the goal is met; anything else is a real error.
        if e.args[0] != ESP_ERR_ESPNOW_EXIST:
            raise
|
|
|
|
|
|
# Main bridge loop: poll the UART and forward each chunk read as one frame.
# NOTE(review): there is no length prefix or delimiter, so this assumes one
# uart.read() returns exactly one complete frame -- confirm with the sender.
while True:
    if not uart.any():
        continue

    data = uart.read()
    # A frame needs at least the 6-byte destination MAC.
    if not data or len(data) < 6:
        continue

    addr = data[:6]
    payload = data[6:]

    # A single bad frame (e.g. a payload ESP-NOW refuses, or a driver error)
    # must not terminate the bridge: report it and keep serving later frames.
    try:
        ensure_peer(addr)
        esp.send(addr, payload)
    except (OSError, ValueError) as e:
        print("bridge: dropped frame:", e)
        continue

    # Record the send time so ensure_peer() can pick an LRU eviction victim.
    last_used[addr] = time.ticks_ms()