# Serial-to-ESP-NOW bridge: JSON in both directions on UART + ESP-NOW. # # Pi → UART (two supported forms): # A) Legacy: 6 bytes destination MAC + UTF-8 JSON payload (one write = one frame). # B) Newline JSON: one object per line, UTF-8, ending with \n # - Multicast via ESP32: {"m":"split","peers":["12hex",...],"body":{...}} # - Unicast / broadcast: {"to":"12hex","v":"1",...} (all keys except to/dest go to peers) # # ESP-NOW → Pi: newline-delimited JSON, one object per packet: # {"dir":"espnow_rx","from":"<12hex>","payload":{...}} if body was JSON # {"dir":"espnow_rx","from":"<12hex>","payload_text":"..."} if UTF-8 not JSON # {"dir":"espnow_rx","from":"<12hex>","payload_b64":"..."} if binary from machine import Pin, UART import espnow import json import network import time import ubinascii UART_BAUD = 912000 BROADCAST = b"\xff\xff\xff\xff\xff\xff" MAX_PEERS = 20 WIFI_CHANNEL = 6 sta = network.WLAN(network.STA_IF) sta.active(True) sta.config(pm=network.WLAN.PM_NONE, channel=WIFI_CHANNEL) print("WiFi STA channel:", sta.config("channel"), "(WIFI_CHANNEL=%s)" % WIFI_CHANNEL) esp = espnow.ESPNow() esp.active(True) esp.add_peer(BROADCAST) uart = UART(1, UART_BAUD, tx=Pin(21), rx=Pin(6)) last_used = {BROADCAST: time.ticks_ms()} uart_rx_buf = b"" ESP_ERR_ESPNOW_EXIST = -12395 def ensure_peer(addr): peers = esp.get_peers() peer_macs = [p[0] for p in peers] if addr in peer_macs: return if len(peer_macs) >= MAX_PEERS: oldest_mac = None oldest_ts = time.ticks_ms() for mac in peer_macs: if mac == BROADCAST: continue ts = last_used.get(mac, 0) if ts <= oldest_ts: oldest_ts = ts oldest_mac = mac if oldest_mac is not None: esp.del_peer(oldest_mac) last_used.pop(oldest_mac, None) try: esp.add_peer(addr) except OSError as e: if e.args[0] != ESP_ERR_ESPNOW_EXIST: raise def try_apply_bridge_config(obj): """Pi sends {"m":"bridge","ch":1..11} — set STA channel only; do not ESP-NOW forward.""" if not isinstance(obj, dict) or obj.get("m") != "bridge": return False ch = obj.get("ch") if ch is None: ch = obj.get("wifi_channel") if ch is None: return True try: n = int(ch) if 1 <= n <= 11: sta.config(pm=network.WLAN.PM_NONE, channel=n) print("Bridge STA channel ->", n) except Exception as e: print("bridge config:", e) return True def send_split_from_obj(obj): """obj has m=split, peers=[12hex,...], body=dict.""" body = obj.get("body") if body is None: return try: out = json.dumps(body).encode("utf-8") except (TypeError, ValueError): return for peer in obj.get("peers") or []: if not isinstance(peer, str) or len(peer) != 12: continue try: mac = bytes.fromhex(peer) except ValueError: continue if len(mac) != 6: continue ensure_peer(mac) esp.send(mac, out) last_used[mac] = time.ticks_ms() def process_broadcast_payload_split_or_flood(payload): try: text = payload.decode("utf-8") obj = json.loads(text) except Exception: obj = None if isinstance(obj, dict) and try_apply_bridge_config(obj): return if ( isinstance(obj, dict) and obj.get("m") == "split" and isinstance(obj.get("peers"), list) ): send_split_from_obj(obj) return ensure_peer(BROADCAST) esp.send(BROADCAST, payload) last_used[BROADCAST] = time.ticks_ms() def process_legacy_uart_frame(data): if not data or len(data) < 6: return addr = data[:6] payload = data[6:] if addr == BROADCAST: process_broadcast_payload_split_or_flood(payload) return ensure_peer(addr) esp.send(addr, payload) last_used[addr] = time.ticks_ms() def handle_json_command_line(obj): if not isinstance(obj, dict): return if try_apply_bridge_config(obj): return if obj.get("m") == "split" and isinstance(obj.get("peers"), list): send_split_from_obj(obj) return to = obj.get("to") or obj.get("dest") if isinstance(to, str) and len(to) == 12: try: mac = bytes.fromhex(to) except ValueError: return if len(mac) != 6: return body = {k: v for k, v in obj.items() if k not in ("to", "dest")} if not body: return try: out = json.dumps(body).encode("utf-8") except (TypeError, ValueError): return ensure_peer(mac) esp.send(mac, out) last_used[mac] = time.ticks_ms() def drain_uart_json_lines(): """Parse leading newline-delimited JSON objects from uart_rx_buf; leave rest.""" global uart_rx_buf while True: s = uart_rx_buf.lstrip() if not s: uart_rx_buf = b"" return if s[0] != ord("{"): uart_rx_buf = s return nl = s.find(b"\n") if nl < 0: uart_rx_buf = s return line = s[:nl].strip() uart_rx_buf = s[nl + 1 :] if line: try: text = line.decode("utf-8") obj = json.loads(text) handle_json_command_line(obj) except Exception as e: print("UART JSON line error:", e) # continue; there may be another JSON line in buffer def drain_uart_legacy_frame(): """If buffer does not start with '{', treat whole buffer as one 6-byte MAC + JSON frame.""" global uart_rx_buf s = uart_rx_buf if not s or s[0] == ord("{"): return if len(s) < 6: return data = s uart_rx_buf = b"" process_legacy_uart_frame(data) def forward_espnow_to_uart(mac, msg): peer_hex = ubinascii.hexlify(mac).decode() try: text = msg.decode("utf-8") try: payload = json.loads(text) line_obj = {"dir": "espnow_rx", "from": peer_hex, "payload": payload} except ValueError: line_obj = {"dir": "espnow_rx", "from": peer_hex, "payload_text": text} except UnicodeDecodeError: line_obj = { "dir": "espnow_rx", "from": peer_hex, "payload_b64": ubinascii.b64encode(msg).decode(), } try: line = json.dumps(line_obj) + "\n" uart.write(line.encode("utf-8")) except Exception as e: print("UART TX error:", e) print("Starting ESP32 bridge (UART JSON + legacy MAC+JSON, ESP-NOW RX → UART JSON lines)") while True: idle = True if uart.any(): idle = False uart_rx_buf += uart.read() drain_uart_json_lines() drain_uart_legacy_frame() try: peer, msg = esp.recv(0) except OSError: peer, msg = None, None if peer is not None and msg is not None: idle = False if len(peer) == 6: forward_espnow_to_uart(peer, msg) if idle: time.sleep_ms(1)