diff --git a/docs/API.md b/docs/API.md index 54576a4..9e7c803 100644 --- a/docs/API.md +++ b/docs/API.md @@ -15,8 +15,8 @@ All JSON APIs use `Content-Type: application/json` for bodies and responses unle The main UI has two modes controlled by the mode toggle: -- **Run mode**: optimized for operation (tab/preset selection, profile apply, and **Devices** registry for LED driver names/MACs). -- **Edit mode**: shows editing/management controls (tabs, presets, patterns, colour palette, send presets, profile management actions, and related tools). +- **Run mode**: optimized for operation (tab/preset selection and profile apply). +- **Edit mode**: shows editing/management controls (tabs, presets, patterns, colour palette, send presets, profile management actions, **Devices** registry for LED driver names/MACs, and related tools). Profiles are available in both modes, but behavior differs: diff --git a/esp32/main.py b/esp32/main.py index 8126444..da091db 100644 --- a/esp32/main.py +++ b/esp32/main.py @@ -1,14 +1,25 @@ -# Serial-to-ESP-NOW bridge: receives from Pi on UART, forwards to ESP-NOW peers. -# Wire format: first 6 bytes = destination MAC, rest = payload. Address is always 6 bytes. +# Serial-to-ESP-NOW bridge: JSON in both directions on UART + ESP-NOW. +# +# Pi → UART (two supported forms): +# A) Legacy: 6 bytes destination MAC + UTF-8 JSON payload (one write = one frame). +# B) Newline JSON: one object per line, UTF-8, ending with \n +# - Multicast via ESP32: {"m":"split","peers":["12hex",...],"body":{...}} +# - Unicast / broadcast: {"to":"12hex","v":"1",...} (all keys except to/dest go to peers) +# +# ESP-NOW → Pi: newline-delimited JSON, one object per packet: +# {"dir":"espnow_rx","from":"<12hex>","payload":{...}} if body was JSON +# {"dir":"espnow_rx","from":"<12hex>","payload_text":"..."} if UTF-8 not JSON +# {"dir":"espnow_rx","from":"<12hex>","payload_b64":"..."} if binary from machine import Pin, UART import espnow +import json import network import time +import ubinascii UART_BAUD = 912000 BROADCAST = b"\xff\xff\xff\xff\xff\xff" MAX_PEERS = 20 -# Match led-driver / controller default settings wifi_channel (1–11) WIFI_CHANNEL = 6 sta = network.WLAN(network.STA_IF) @@ -22,22 +33,18 @@ esp.add_peer(BROADCAST) uart = UART(1, UART_BAUD, tx=Pin(21), rx=Pin(6)) -# Track last send time per peer for LRU eviction (remove oldest when at limit). last_used = {BROADCAST: time.ticks_ms()} +uart_rx_buf = b"" - -# ESP_ERR_ESPNOW_EXIST: peer already registered (ignore when adding). ESP_ERR_ESPNOW_EXIST = -12395 def ensure_peer(addr): - """Ensure addr is in the peer list. When at 20 peers, remove the oldest-used (LRU).""" peers = esp.get_peers() peer_macs = [p[0] for p in peers] if addr in peer_macs: return if len(peer_macs) >= MAX_PEERS: - # Remove the peer we used least recently (oldest). oldest_mac = None oldest_ts = time.ticks_ms() for mac in peer_macs: @@ -57,16 +64,190 @@ def ensure_peer(addr): raise -print("Starting ESP32 main.py") +def try_apply_bridge_config(obj): + """Pi sends {"m":"bridge","ch":1..11} — set STA channel only; do not ESP-NOW forward.""" + if not isinstance(obj, dict) or obj.get("m") != "bridge": + return False + ch = obj.get("ch") + if ch is None: + ch = obj.get("wifi_channel") + if ch is None: + return True + try: + n = int(ch) + if 1 <= n <= 11: + sta.config(pm=network.WLAN.PM_NONE, channel=n) + print("Bridge STA channel ->", n) + except Exception as e: + print("bridge config:", e) + return True + + +def send_split_from_obj(obj): + """obj has m=split, peers=[12hex,...], body=dict.""" + body = obj.get("body") + if body is None: + return + try: + out = json.dumps(body).encode("utf-8") + except (TypeError, ValueError): + return + for peer in obj.get("peers") or []: + if not isinstance(peer, str) or len(peer) != 12: + continue + try: + mac = bytes.fromhex(peer) + except ValueError: + continue + if len(mac) != 6: + continue + ensure_peer(mac) + esp.send(mac, out) + last_used[mac] = time.ticks_ms() + + +def process_broadcast_payload_split_or_flood(payload): + try: + text = payload.decode("utf-8") + obj = json.loads(text) + except Exception: + obj = None + if isinstance(obj, dict) and try_apply_bridge_config(obj): + return + if ( + isinstance(obj, dict) + and obj.get("m") == "split" + and isinstance(obj.get("peers"), list) + ): + send_split_from_obj(obj) + return + ensure_peer(BROADCAST) + esp.send(BROADCAST, payload) + last_used[BROADCAST] = time.ticks_ms() + + +def process_legacy_uart_frame(data): + if not data or len(data) < 6: + return + addr = data[:6] + payload = data[6:] + if addr == BROADCAST: + process_broadcast_payload_split_or_flood(payload) + return + ensure_peer(addr) + esp.send(addr, payload) + last_used[addr] = time.ticks_ms() + + +def handle_json_command_line(obj): + if not isinstance(obj, dict): + return + if try_apply_bridge_config(obj): + return + if obj.get("m") == "split" and isinstance(obj.get("peers"), list): + send_split_from_obj(obj) + return + to = obj.get("to") or obj.get("dest") + if isinstance(to, str) and len(to) == 12: + try: + mac = bytes.fromhex(to) + except ValueError: + return + if len(mac) != 6: + return + body = {k: v for k, v in obj.items() if k not in ("to", "dest")} + if not body: + return + try: + out = json.dumps(body).encode("utf-8") + except (TypeError, ValueError): + return + ensure_peer(mac) + esp.send(mac, out) + last_used[mac] = time.ticks_ms() + + +def drain_uart_json_lines(): + """Parse leading newline-delimited JSON objects from uart_rx_buf; leave rest.""" + global uart_rx_buf + while True: + s = uart_rx_buf.lstrip() + if not s: + uart_rx_buf = b"" + return + if s[0] != ord("{"): + uart_rx_buf = s + return + nl = s.find(b"\n") + if nl < 0: + uart_rx_buf = s + return + line = s[:nl].strip() + uart_rx_buf = s[nl + 1 :] + if line: + try: + text = line.decode("utf-8") + obj = json.loads(text) + handle_json_command_line(obj) + except Exception as e: + print("UART JSON line error:", e) + # continue; there may be another JSON line in buffer + + +def drain_uart_legacy_frame(): + """If buffer does not start with '{', treat whole buffer as one 6-byte MAC + JSON frame.""" + global uart_rx_buf + s = uart_rx_buf + if not s or s[0] == ord("{"): + return + if len(s) < 6: + return + data = s + uart_rx_buf = b"" + process_legacy_uart_frame(data) + + +def forward_espnow_to_uart(mac, msg): + peer_hex = ubinascii.hexlify(mac).decode() + try: + text = msg.decode("utf-8") + try: + payload = json.loads(text) + line_obj = {"dir": "espnow_rx", "from": peer_hex, "payload": payload} + except ValueError: + line_obj = {"dir": "espnow_rx", "from": peer_hex, "payload_text": text} + except UnicodeDecodeError: + line_obj = { + "dir": "espnow_rx", + "from": peer_hex, + "payload_b64": ubinascii.b64encode(msg).decode(), + } + try: + line = json.dumps(line_obj) + "\n" + uart.write(line.encode("utf-8")) + except Exception as e: + print("UART TX error:", e) + + +print("Starting ESP32 bridge (UART JSON + legacy MAC+JSON, ESP-NOW RX → UART JSON lines)") while True: + idle = True if uart.any(): - data = uart.read() - if not data or len(data) < 6: - continue - print(f"Received data: {data}") - addr = data[:6] - payload = data[6:] - ensure_peer(addr) - esp.send(addr, payload) - last_used[addr] = time.ticks_ms() \ No newline at end of file + idle = False + uart_rx_buf += uart.read() + drain_uart_json_lines() + drain_uart_legacy_frame() + + try: + peer, msg = esp.recv(0) + except OSError: + peer, msg = None, None + + if peer is not None and msg is not None: + idle = False + if len(peer) == 6: + forward_espnow_to_uart(peer, msg) + + if idle: + time.sleep_ms(1) diff --git a/led-driver b/led-driver index 7e3aca4..cef9e00 160000 --- a/led-driver +++ b/led-driver @@ -1 +1 @@ -Subproject commit 7e3aca491cc0c0995628712709dd6b2d7d2deccb +Subproject commit cef9e00819dc1a95f088a6d2ee8538a6f83552b0 diff --git a/src/controllers/device.py b/src/controllers/device.py index 3aa9d39..48834dd 100644 --- a/src/controllers/device.py +++ b/src/controllers/device.py @@ -5,29 +5,104 @@ from models.device import ( validate_device_transport, validate_device_type, ) +from models.transport import get_current_sender +from models.tcp_clients import ( + normalize_tcp_peer_ip, + send_json_line_to_ip, + tcp_client_connected, +) +from util.espnow_message import build_message +import asyncio import json +# Ephemeral driver preset name (never written to Pi preset store; ``save`` not set on wire). +_IDENTIFY_PRESET_KEY = "__identify" + +# Short-key payload: 10 Hz full cycle = 50 ms on + 50 ms off (driver ``blink`` toggles each ``d`` ms). +_IDENTIFY_DRIVER_PRESET = { + "p": "blink", + "c": ["#ff0000"], + "d": 50, + "b": 128, + "a": True, + "n1": 0, + "n2": 0, + "n3": 0, + "n4": 0, + "n5": 0, + "n6": 0, +} + + +def _compact_v1_json(*, presets=None, select=None, save=False): + """Single-line v1 object; compact so serial/ESP-NOW stays small.""" + body = {"v": "1"} + if presets is not None: + body["presets"] = presets + if save: + body["save"] = True + if select is not None: + body["select"] = select + return json.dumps(body, separators=(",", ":")) + +# Seconds after identify blink before selecting built-in ``off`` (tests may monkeypatch). +IDENTIFY_OFF_DELAY_S = 2.0 + controller = Microdot() devices = Device() +def _device_live_connected(dev_dict): + """ + Wi-Fi: whether a TCP client is registered for this device's address (IP). + ESP-NOW: None (no TCP session on the Pi for that transport). + """ + tr = (dev_dict.get("transport") or "espnow").strip().lower() + if tr != "wifi": + return None + ip = normalize_tcp_peer_ip(dev_dict.get("address") or "") + if not ip: + return False + return tcp_client_connected(ip) + + +def _device_json_with_live_status(dev_dict): + row = dict(dev_dict) + row["connected"] = _device_live_connected(dev_dict) + return row + + +async def _identify_send_off_after_delay(sender, transport, wifi_ip, dev_id, name): + try: + await asyncio.sleep(IDENTIFY_OFF_DELAY_S) + off_msg = build_message(select={name: ["off"]}) + if transport == "wifi": + await send_json_line_to_ip(wifi_ip, off_msg) + else: + await sender.send(off_msg, addr=dev_id) + except Exception: + pass + + @controller.get("") async def list_devices(request): - """List all devices.""" + """List all devices (includes ``connected`` for live Wi-Fi TCP presence).""" devices_data = {} for dev_id in devices.list(): d = devices.read(dev_id) if d: - devices_data[dev_id] = d + devices_data[dev_id] = _device_json_with_live_status(d) return json.dumps(devices_data), 200, {"Content-Type": "application/json"} @controller.get("/") async def get_device(request, id): - """Get a device by ID.""" + """Get a device by ID (includes ``connected`` for live Wi-Fi TCP presence).""" dev = devices.read(id) if dev: - return json.dumps(dev), 200, {"Content-Type": "application/json"} + return json.dumps(_device_json_with_live_status(dev)), 200, { + "Content-Type": "application/json", + } return json.dumps({"error": "Device not found"}), 404, { "Content-Type": "application/json", } @@ -91,6 +166,7 @@ async def update_device(request, id): data = dict(raw) data.pop("id", None) data.pop("addresses", None) + data.pop("connected", None) if "name" in data: n = (data.get("name") or "").strip() if not n: @@ -127,3 +203,59 @@ async def delete_device(request, id): return json.dumps({"error": "Device not found"}), 404, { "Content-Type": "application/json", } + + +@controller.post("//identify") +async def identify_device(request, id): + """ + One v1 JSON object: ``presets.__identify`` (``d``=50 ms → 10 Hz blink) plus ``select`` for + this device name — same combined shape as profile sends the driver already accepts over TCP + / ESP-NOW. No ``save``. After ``IDENTIFY_OFF_DELAY_S``, a background task selects ``off``. + """ + dev = devices.read(id) + if not dev: + return json.dumps({"error": "Device not found"}), 404, { + "Content-Type": "application/json", + } + sender = get_current_sender() + if not sender: + return json.dumps({"error": "Transport not configured"}), 503, { + "Content-Type": "application/json", + } + name = str(dev.get("name") or "").strip() + if not name: + return json.dumps({"error": "Device must have a name to identify"}), 400, { + "Content-Type": "application/json", + } + + transport = dev.get("transport") or "espnow" + wifi_ip = None + if transport == "wifi": + wifi_ip = dev.get("address") + if not wifi_ip: + return json.dumps({"error": "Device has no IP address"}), 400, { + "Content-Type": "application/json", + } + + try: + msg = _compact_v1_json( + presets={_IDENTIFY_PRESET_KEY: dict(_IDENTIFY_DRIVER_PRESET)}, + select={name: [_IDENTIFY_PRESET_KEY]}, + ) + if transport == "wifi": + ok = await send_json_line_to_ip(wifi_ip, msg) + if not ok: + return json.dumps({"error": "Wi-Fi driver not connected"}), 503, { + "Content-Type": "application/json", + } + else: + await sender.send(msg, addr=id) + + asyncio.create_task( + _identify_send_off_after_delay(sender, transport, wifi_ip, id, name) + ) + except Exception as e: + return json.dumps({"error": str(e)}), 503, {"Content-Type": "application/json"} + return json.dumps({"message": "Identify sent"}), 200, { + "Content-Type": "application/json", + } diff --git a/src/controllers/preset.py b/src/controllers/preset.py index 8fd142b..599f976 100644 --- a/src/controllers/preset.py +++ b/src/controllers/preset.py @@ -2,9 +2,10 @@ from microdot import Microdot from microdot.session import with_session from models.preset import Preset from models.profile import Profile +from models.device import Device, normalize_mac from models.transport import get_current_sender +from util.driver_delivery import deliver_json_messages, deliver_preset_broadcast_then_per_device from util.espnow_message import build_message, build_preset_dict -import asyncio import json controller = Microdot() @@ -125,13 +126,17 @@ async def delete_preset(request, *args, **kwargs): @with_session async def send_presets(request, session): """ - Send one or more presets to the LED driver (via serial transport). + Send one or more presets to LED drivers (serial/ESP-NOW and/or TCP Wi-Fi clients). Body JSON: {"preset_ids": ["1", "2", ...]} or {"ids": ["1", "2", ...]} + Optional "targets": ["aabbccddeeff", ...] — registry MACs. When set: preset + chunks are ESP-NOW broadcast once each; Wi-Fi drivers get the same chunks + over TCP; if "default" is set, each target then gets a unicast default + message (serial or TCP) with that device name in "targets". + Omit targets for broadcast-only serial (legacy). - The controller looks up each preset, converts to API format, chunks into - <= 240-byte messages, and sends them over the configured transport. + Optional "destination_mac" / "to": single MAC when targets is omitted. """ try: data = request.json or {} @@ -144,7 +149,6 @@ async def send_presets(request, session): save_flag = data.get('save', True) save_flag = bool(save_flag) default_id = data.get('default') - # Optional 12-char hex MAC to send to one device; omit for default (e.g. broadcast). destination_mac = data.get('destination_mac') or data.get('to') # Build API-compliant preset map keyed by preset ID, include name @@ -171,23 +175,13 @@ async def send_presets(request, session): if not sender: return json.dumps({"error": "Transport not configured"}), 503, {'Content-Type': 'application/json'} - async def send_chunk(chunk_presets, is_last): - # Save/default should only be sent with the final presets chunk. - msg = build_message( - presets=chunk_presets, - save=save_flag and is_last, - default=default_id if is_last else None, - ) - await sender.send(msg, addr=destination_mac) - MAX_BYTES = 240 send_delay_s = 0.1 entries = list(presets_by_name.items()) total_presets = len(entries) - messages_sent = 0 batch = {} - last_msg = None + chunk_messages = [] for name, preset_obj in entries: test_batch = dict(batch) test_batch[name] = preset_obj @@ -196,28 +190,133 @@ async def send_presets(request, session): if size <= MAX_BYTES or not batch: batch = test_batch - last_msg = test_msg else: - try: - await send_chunk(batch, False) - except Exception: - return json.dumps({"error": "Send failed"}), 503, {'Content-Type': 'application/json'} - await asyncio.sleep(send_delay_s) - messages_sent += 1 + chunk_messages.append( + build_message( + presets=dict(batch), + save=False, + default=None, + ) + ) batch = {name: preset_obj} - last_msg = build_message(presets=batch, save=save_flag, default=default_id) if batch: - try: - await send_chunk(batch, True) - except Exception: - return json.dumps({"error": "Send failed"}), 503, {'Content-Type': 'application/json'} - await asyncio.sleep(send_delay_s) - messages_sent += 1 + chunk_messages.append( + build_message( + presets=dict(batch), + save=save_flag, + default=default_id, + ) + ) + + target_list = None + raw_targets = data.get("targets") + if isinstance(raw_targets, list) and raw_targets: + target_list = [] + for t in raw_targets: + m = normalize_mac(str(t)) + if m: + target_list.append(m) + target_list = list(dict.fromkeys(target_list)) + if not target_list: + target_list = None + elif destination_mac: + dm = normalize_mac(str(destination_mac)) + target_list = [dm] if dm else None + + try: + if target_list: + deliveries = await deliver_preset_broadcast_then_per_device( + sender, + chunk_messages, + target_list, + Device(), + str(default_id) if default_id is not None else None, + delay_s=send_delay_s, + ) + else: + deliveries, _chunks = await deliver_json_messages( + sender, + chunk_messages, + None, + Device(), + delay_s=send_delay_s, + ) + except Exception: + return json.dumps({"error": "Send failed"}), 503, {'Content-Type': 'application/json'} return json.dumps({ "message": "Presets sent", "presets_sent": total_presets, - "messages_sent": messages_sent + "messages_sent": deliveries, + }), 200, {'Content-Type': 'application/json'} + + +@controller.post('/push') +@with_session +async def push_driver_messages(request, session): + """ + Deliver one or more raw v1 JSON objects to devices (ESP-NOW and/or TCP). + + Body: + {"sequence": [{ "v": "1", ... }, ...], "targets": ["mac", ...]} + or a single {"payload": {...}, "targets": [...]}. + """ + try: + data = request.json or {} + except Exception: + return json.dumps({"error": "Invalid JSON"}), 400, {'Content-Type': 'application/json'} + + seq = data.get("sequence") + if not seq and data.get("payload") is not None: + seq = [data["payload"]] + if not isinstance(seq, list) or not seq: + return json.dumps({"error": "sequence or payload required"}), 400, {'Content-Type': 'application/json'} + + raw_targets = data.get("targets") + target_list = None + if isinstance(raw_targets, list) and raw_targets: + target_list = [] + for t in raw_targets: + m = normalize_mac(str(t)) + if m: + target_list.append(m) + target_list = list(dict.fromkeys(target_list)) + if not target_list: + target_list = None + + sender = get_current_sender() + if not sender: + return json.dumps({"error": "Transport not configured"}), 503, {'Content-Type': 'application/json'} + + messages = [] + for item in seq: + if isinstance(item, dict): + messages.append(json.dumps(item)) + elif isinstance(item, str): + messages.append(item) + else: + return json.dumps({"error": "sequence items must be objects or strings"}), 400, {'Content-Type': 'application/json'} + + delay_s = data.get("delay_s", 0.05) + try: + delay_s = float(delay_s) + except (TypeError, ValueError): + delay_s = 0.05 + + try: + deliveries, _chunks = await deliver_json_messages( + sender, + messages, + target_list, + Device(), + delay_s=delay_s, + ) + except Exception: + return json.dumps({"error": "Send failed"}), 503, {'Content-Type': 'application/json'} + + return json.dumps({ + "message": "Delivered", + "deliveries": deliveries, }), 200, {'Content-Type': 'application/json'} diff --git a/src/main.py b/src/main.py index c2da8bc..8c9fb05 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,8 @@ import asyncio +import errno import json import os +import socket import threading import traceback from microdot import Microdot, send_file @@ -20,15 +22,101 @@ import controllers.settings as settings_controller import controllers.device as device_controller from models.transport import get_sender, set_sender, get_current_sender from models.device import Device, normalize_mac +from models import tcp_clients as tcp_client_registry +from util.device_status_broadcaster import ( + broadcast_device_tcp_snapshot_to, + broadcast_device_tcp_status, + register_device_status_ws, + unregister_device_status_ws, +) _tcp_device_lock = threading.Lock() +# Wi-Fi drivers send one hello line then stay quiet; periodic outbound data makes dead peers +# fail drain() within this interval (keepalive alone is often slow or ineffective). +TCP_LIVENESS_PING_INTERVAL_S = 12.0 -def _register_tcp_device_sync(device_name: str, peer_ip: str, mac) -> None: +# Keepalive or lossy Wi-Fi can still surface OSError(110) / TimeoutError on recv or wait_closed. +_TCP_PEER_GONE = ( + BrokenPipeError, + ConnectionResetError, + ConnectionAbortedError, + ConnectionRefusedError, + TimeoutError, + OSError, +) + + +def _tcp_socket_from_writer(writer): + sock = writer.get_extra_info("socket") + if sock is not None: + return sock + transport = getattr(writer, "transport", None) + if transport is not None: + return transport.get_extra_info("socket") + return None + + +def _enable_tcp_keepalive(writer) -> None: + """ + Detect vanished peers (power off, Wi-Fi drop) without waiting for a send() failure. + Linux: shorten time before the first keepalive probe; other platforms: SO_KEEPALIVE only. + """ + sock = _tcp_socket_from_writer(writer) + if sock is None: + return + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + except OSError: + return + if hasattr(socket, "TCP_KEEPIDLE"): + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120) + except OSError: + pass + if hasattr(socket, "TCP_KEEPINTVL"): + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) + except OSError: + pass + if hasattr(socket, "TCP_KEEPCNT"): + try: + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) + except OSError: + pass + # Do not set TCP_USER_TIMEOUT: a short value causes Errno 110 on recv for Wi-Fi peers + # when ACKs are delayed (ESP power save, lossy links). Liveness pings already clear dead + # sessions via drain(). + + +async def _tcp_liveness_ping_loop(writer, peer_ip: str) -> None: + """Send a bare newline so ``drain()`` fails soon after the peer disappears.""" + while True: + await asyncio.sleep(TCP_LIVENESS_PING_INTERVAL_S) + if writer.is_closing(): + return + try: + writer.write(b"\n") + await writer.drain() + except Exception as exc: + print(f"[TCP] liveness ping failed {peer_ip!r}: {exc!r}") + tcp_client_registry.unregister_tcp_writer(peer_ip, writer) + try: + writer.close() + except Exception: + pass + return + + +def _register_tcp_device_sync( + device_name: str, peer_ip: str, mac, device_type=None +) -> None: with _tcp_device_lock: try: d = Device() - did = d.upsert_wifi_tcp_client(device_name, peer_ip, mac) + did = d.upsert_wifi_tcp_client( + device_name, peer_ip, mac, device_type=device_type + ) if did: print( f"TCP device registered: mac={did} name={device_name!r} ip={peer_ip!r}" @@ -44,11 +132,21 @@ async def _handle_tcp_client(reader, writer): peer_ip = peer[0] if peer else "" peer_label = f"{peer_ip}:{peer[1]}" if peer and len(peer) > 1 else peer_ip or "?" print(f"[TCP] client connected {peer_label}") + _enable_tcp_keepalive(writer) + tcp_client_registry.register_tcp_writer(peer_ip, writer) + ping_task = asyncio.create_task(_tcp_liveness_ping_loop(writer, peer_ip)) sender = get_current_sender() buf = b"" try: while True: - chunk = await reader.read(4096) + try: + chunk = await reader.read(4096) + except asyncio.CancelledError: + raise + except _TCP_PEER_GONE as e: + print(f"[TCP] read ended ({peer_label}): {e!r}") + tcp_client_registry.unregister_tcp_writer(peer_ip, writer) + break if not chunk: break buf += chunk @@ -77,8 +175,11 @@ async def _handle_tcp_client(reader, writer): if isinstance(parsed, dict): dns = str(parsed.get("device_name") or "").strip() mac = parsed.get("mac") or parsed.get("device_mac") or parsed.get("sta_mac") + device_type = parsed.get("type") or parsed.get("device_type") if dns and normalize_mac(mac): - _register_tcp_device_sync(dns, peer_ip, mac) + _register_tcp_device_sync( + dns, peer_ip, mac, device_type=device_type + ) addr = parsed.pop("to", None) payload = json.dumps(parsed) if parsed else "{}" if sender: @@ -92,12 +193,40 @@ async def _handle_tcp_client(reader, writer): except Exception: pass finally: - print(f"[TCP] client disconnected {peer_label}") + # Drop registry + broadcast connected:false before awaiting ping/close so the UI + # does not stay green if ping or wait_closed blocks on a timed-out peer. + outcome = tcp_client_registry.unregister_tcp_writer(peer_ip, writer) + if outcome == "superseded": + print( + f"[TCP] TCP session ended (same IP already has a newer connection): {peer_label}" + ) + ping_task.cancel() + try: + await ping_task + except asyncio.CancelledError: + pass try: writer.close() await writer.wait_closed() - except Exception: - pass + except asyncio.CancelledError: + raise + except _TCP_PEER_GONE: + tcp_client_registry.unregister_tcp_writer(peer_ip, writer) + + +async def _send_bridge_wifi_channel(settings, sender): + """Tell the serial ESP32 bridge to set STA channel (settings wifi_channel); not forwarded as ESP-NOW.""" + try: + ch = int(settings.get("wifi_channel", 6)) + except (TypeError, ValueError): + ch = 6 + ch = max(1, min(11, ch)) + payload = json.dumps({"m": "bridge", "ch": ch}, separators=(",", ":")) + try: + await sender.send(payload, addr="ffffffffffff") + print(f"[startup] bridge Wi-Fi channel -> {ch}") + except Exception as e: + print(f"[startup] bridge channel message failed: {e}") async def _run_tcp_server(settings): @@ -149,7 +278,9 @@ async def main(port=80): app.mount(pattern.controller, '/patterns') app.mount(settings_controller.controller, '/settings') app.mount(device_controller.controller, '/devices') - + + tcp_client_registry.set_tcp_status_broadcaster(broadcast_device_tcp_status) + # Serve index.html at root (cwd is src/ when run via pipenv run run) @app.route('/') def index(request): @@ -179,44 +310,63 @@ async def main(port=80): @app.route('/ws') @with_websocket async def ws(request, ws): - while True: - data = await ws.receive() - print(data) - if data: - try: - parsed = json.loads(data) - print("WS received JSON:", parsed) - # Optional "to": 12-char hex MAC; rest is payload (sent with that address). - addr = parsed.pop("to", None) - payload = json.dumps(parsed) if parsed else data - await sender.send(payload, addr=addr) - except json.JSONDecodeError: - # Not JSON: send raw with default address + await register_device_status_ws(ws) + await broadcast_device_tcp_snapshot_to(ws) + try: + while True: + data = await ws.receive() + print(data) + if data: try: - await sender.send(data) + parsed = json.loads(data) + print("WS received JSON:", parsed) + # Optional "to": 12-char hex MAC; rest is payload (sent with that address). + addr = parsed.pop("to", None) + payload = json.dumps(parsed) if parsed else data + await sender.send(payload, addr=addr) + except json.JSONDecodeError: + # Not JSON: send raw with default address + try: + await sender.send(data) + except Exception: + try: + await ws.send(json.dumps({"error": "Send failed"})) + except Exception: + pass except Exception: try: await ws.send(json.dumps({"error": "Send failed"})) except Exception: pass - except Exception: - try: - await ws.send(json.dumps({"error": "Send failed"})) - except Exception: - pass - else: - break + else: + break + finally: + await unregister_device_status_ws(ws) - server = asyncio.create_task(app.start_server(host="0.0.0.0", port=port)) # Touch Device singleton early so db/device.json exists before first TCP hello. Device() - tcp_task = asyncio.create_task(_run_tcp_server(settings)) + await _send_bridge_wifi_channel(settings, sender) - while True: - await asyncio.sleep(30) - # cleanup before ending the application + # Await HTTP + driver TCP together so bind failures (e.g. port 80 in use) surface + # here instead of as an unretrieved Task exception; the UI WebSocket drops if HTTP + # never starts, which clears Wi-Fi presence dots. + try: + await asyncio.gather( + app.start_server(host="0.0.0.0", port=port), + _run_tcp_server(settings), + ) + except OSError as e: + if e.errno == errno.EADDRINUSE: + tcp_p = int(settings.get("tcp_port", 8765)) + print( + f"[server] bind failed (address already in use): {e!s}\n" + f"[server] HTTP is configured for port {port} (env PORT); " + f"Wi-Fi LED drivers use tcp_port {tcp_p}. " + f"Stop the other process or use a free port, e.g. PORT=8080 pipenv run run" + ) + raise if __name__ == "__main__": import os diff --git a/src/models/device.py b/src/models/device.py index c651e8d..258ce9b 100644 --- a/src/models/device.py +++ b/src/models/device.py @@ -233,10 +233,10 @@ class Device(Model): def list(self): return list(self.keys()) - def upsert_wifi_tcp_client(self, device_name, peer_ip, mac): + def upsert_wifi_tcp_client(self, device_name, peer_ip, mac, device_type=None): """ - Register or update a Wi-Fi client by **MAC** (storage id). Updates **name** - and **address** (peer IP) on each connect. + Register or update a Wi-Fi client by **MAC** (storage id). Updates **name**, + **address** (peer IP), and optionally **type** from the client hello when valid. """ mac_hex = normalize_mac(mac) if not mac_hex: @@ -247,10 +247,19 @@ class Device(Model): ip = normalize_address_for_transport(peer_ip, "wifi") if not ip: return None + resolved_type = None + if device_type is not None: + try: + resolved_type = validate_device_type(device_type) + except ValueError: + resolved_type = None if mac_hex in self: merged = dict(self[mac_hex]) merged["name"] = name - merged["type"] = validate_device_type(merged.get("type")) + if resolved_type is not None: + merged["type"] = resolved_type + else: + merged["type"] = validate_device_type(merged.get("type")) merged["transport"] = "wifi" merged["address"] = ip merged["id"] = mac_hex @@ -260,7 +269,7 @@ class Device(Model): self[mac_hex] = { "id": mac_hex, "name": name, - "type": "led", + "type": resolved_type or "led", "transport": "wifi", "address": ip, "default_pattern": None, diff --git a/src/models/tcp_clients.py b/src/models/tcp_clients.py new file mode 100644 index 0000000..cb85288 --- /dev/null +++ b/src/models/tcp_clients.py @@ -0,0 +1,115 @@ +"""Track connected Wi-Fi LED drivers (TCP clients) for outbound JSON lines.""" + +import asyncio + +_writers = {} + + +def prune_stale_tcp_writers() -> None: + """Remove writers that are already closing so the UI does not stay online.""" + stale = [(ip, w) for ip, w in list(_writers.items()) if w.is_closing()] + for ip, w in stale: + unregister_tcp_writer(ip, w) + + +def normalize_tcp_peer_ip(ip: str) -> str: + """Match asyncio peer addresses to registry IPs (strip IPv4-mapped IPv6 prefix).""" + s = str(ip).strip() + if s.lower().startswith("::ffff:"): + s = s[7:] + return s +# Optional ``async def (ip: str, connected: bool) -> None`` set from ``main``. +_tcp_status_broadcast = None + + +def set_tcp_status_broadcaster(coro) -> None: + global _tcp_status_broadcast + _tcp_status_broadcast = coro + + +def _schedule_tcp_status_broadcast(ip: str, connected: bool) -> None: + fn = _tcp_status_broadcast + if not fn: + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + try: + loop.create_task(fn(ip, connected)) + except Exception: + pass + + +def register_tcp_writer(peer_ip: str, writer) -> None: + if not peer_ip: + return + key = normalize_tcp_peer_ip(peer_ip) + if not key: + return + old = _writers.get(key) + _writers[key] = writer + _schedule_tcp_status_broadcast(key, True) + if old is not None and old is not writer: + try: + old.close() + except Exception: + pass + + +def unregister_tcp_writer(peer_ip: str, writer=None) -> str: + """ + Remove the writer for peer_ip. If ``writer`` is given, only pop when it is still + the registered instance (avoids a replaced TCP session removing the new one). + + Returns ``removed`` (cleared live session + UI offline), ``noop`` (already gone), + or ``superseded`` (this writer is not the registered one for that IP). + """ + if not peer_ip: + return "noop" + key = normalize_tcp_peer_ip(peer_ip) + if not key: + return "noop" + current = _writers.get(key) + if writer is not None: + if current is None: + return "noop" + if current is not writer: + return "superseded" + had = key in _writers + if had: + _writers.pop(key, None) + _schedule_tcp_status_broadcast(key, False) + print(f"[TCP] device disconnected: {key}") + return "removed" + return "noop" + + +def list_connected_ips(): + """IPs with an active TCP writer (for UI snapshot).""" + prune_stale_tcp_writers() + return list(_writers.keys()) + + +def tcp_client_connected(ip: str) -> bool: + """True if a Wi-Fi driver is connected on this IP (TCP writer registered).""" + prune_stale_tcp_writers() + key = normalize_tcp_peer_ip(ip) + return bool(key and key in _writers) + + +async def send_json_line_to_ip(ip: str, json_str: str) -> bool: + """Send one newline-terminated JSON message to a connected TCP client.""" + ip = normalize_tcp_peer_ip(ip) + writer = _writers.get(ip) + if not writer: + return False + try: + line = json_str if json_str.endswith("\n") else json_str + "\n" + writer.write(line.encode("utf-8")) + await writer.drain() + return True + except Exception as exc: + print(f"[TCP] send to {ip} failed: {exc}") + unregister_tcp_writer(ip, writer) + return False diff --git a/src/models/transport.py b/src/models/transport.py index 30abca4..44a29e6 100644 --- a/src/models/transport.py +++ b/src/models/transport.py @@ -39,11 +39,13 @@ class SerialSender: self._serial = serial.Serial(port, baudrate=baudrate, timeout=1) self._default_addr = _parse_mac(default_addr) + self._write_lock = asyncio.Lock() async def send(self, data, addr=None): mac = _parse_mac(addr) if addr is not None else self._default_addr payload = _encode_payload(data) - await _to_thread(self._serial.write, mac + payload) + async with self._write_lock: + await _to_thread(self._serial.write, mac + payload) return True diff --git a/src/util/device_status_broadcaster.py b/src/util/device_status_broadcaster.py new file mode 100644 index 0000000..5d52a7c --- /dev/null +++ b/src/util/device_status_broadcaster.py @@ -0,0 +1,52 @@ +"""Push Wi-Fi TCP connect/disconnect updates to browser WebSocket clients.""" + +import json +import threading +from typing import Any, Set + +# Threading lock: safe across asyncio tasks and avoids binding asyncio.Lock to the wrong loop. +_clients_lock = threading.Lock() +_clients: Set[Any] = set() + + +async def register_device_status_ws(ws: Any) -> None: + with _clients_lock: + _clients.add(ws) + + +async def unregister_device_status_ws(ws: Any) -> None: + with _clients_lock: + _clients.discard(ws) + + +async def broadcast_device_tcp_status(ip: str, connected: bool) -> None: + from models.tcp_clients import normalize_tcp_peer_ip + + ip = normalize_tcp_peer_ip(ip) + if not ip: + return + msg = json.dumps({"type": "device_tcp", "ip": ip, "connected": bool(connected)}) + with _clients_lock: + targets = list(_clients) + dead = [] + for ws in targets: + try: + await ws.send(msg) + except Exception as exc: + dead.append(ws) + print(f"[device_status_broadcaster] ws.send failed: {exc!r}") + if dead: + with _clients_lock: + for ws in dead: + _clients.discard(ws) + + +async def broadcast_device_tcp_snapshot_to(ws: Any) -> None: + from models import tcp_clients as tcp + + ips = tcp.list_connected_ips() + msg = json.dumps({"type": "device_tcp_snapshot", "connected_ips": ips}) + try: + await ws.send(msg) + except Exception as exc: + print(f"[device_status_broadcaster] snapshot send failed: {exc!r}") diff --git a/src/util/driver_delivery.py b/src/util/driver_delivery.py new file mode 100644 index 0000000..582a1c4 --- /dev/null +++ b/src/util/driver_delivery.py @@ -0,0 +1,168 @@ +"""Deliver driver JSON messages over serial (ESP-NOW) and/or TCP (Wi-Fi clients).""" + +import asyncio +import json + +from models.device import normalize_mac +from models.tcp_clients import send_json_line_to_ip + +# Serial bridge (ESP32): broadcast MAC + this envelope → firmware unicasts ``body`` to each peer. +_SPLIT_MODE = "split" +_BROADCAST_MAC_HEX = "ffffffffffff" + + +def _split_serial_envelope(inner_json_str, peer_hex_list): + """One UART frame: broadcast dest + JSON {m:split, peers:[hex,...], body:}.""" + body = json.loads(inner_json_str) + env = {"m": _SPLIT_MODE, "peers": list(peer_hex_list), "body": body} + return json.dumps(env, separators=(",", ":")) + + +async def deliver_preset_broadcast_then_per_device( + sender, + chunk_messages, + target_macs, + devices_model, + default_id, + delay_s=0.1, +): + """ + Send preset definition chunks: ESP-NOW broadcast once per chunk; same chunk to each + Wi-Fi driver over TCP. If default_id is set, send a per-target default message + (unicast serial or TCP) with targets=[device name] for each registry entry. + """ + if not chunk_messages: + return 0 + + seen = set() + ordered = [] + for raw in target_macs: + m = normalize_mac(str(raw)) if raw else None + if not m or m in seen: + continue + seen.add(m) + ordered.append(m) + + wifi_ips = [] + for mac in ordered: + doc = devices_model.read(mac) + if doc and doc.get("transport") == "wifi" and doc.get("address"): + wifi_ips.append(str(doc["address"]).strip()) + + deliveries = 0 + for msg in chunk_messages: + tasks = [sender.send(msg, addr=_BROADCAST_MAC_HEX)] + for ip in wifi_ips: + if ip: + tasks.append(send_json_line_to_ip(ip, msg)) + results = await asyncio.gather(*tasks, return_exceptions=True) + if results and results[0] is True: + deliveries += 1 + for r in results[1:]: + if r is True: + deliveries += 1 + await asyncio.sleep(delay_s) + + if default_id: + did = str(default_id) + for mac in ordered: + doc = devices_model.read(mac) or {} + name = str(doc.get("name") or "").strip() or mac + body = {"v": "1", "default": did, "save": True, "targets": [name]} + out = json.dumps(body, separators=(",", ":")) + if doc.get("transport") == "wifi" and doc.get("address"): + ip = str(doc["address"]).strip() + try: + if await send_json_line_to_ip(ip, out): + deliveries += 1 + except Exception as e: + print(f"[driver_delivery] default TCP failed: {e!r}") + else: + try: + await sender.send(out, addr=mac) + deliveries += 1 + except Exception as e: + print(f"[driver_delivery] default serial failed: {e!r}") + await asyncio.sleep(delay_s) + + return deliveries + + +async def deliver_json_messages(sender, messages, target_macs, devices_model, delay_s=0.1): + """ + Send each message string to the bridge and/or TCP clients. + + If target_macs is None or empty: one serial send per message (default/broadcast address). + Otherwise: Wi-Fi uses TCP in parallel. Multiple ESP-NOW peers are sent in **one** serial + write to the ESP32 (broadcast + split envelope); the bridge unicasts ``body`` to each + peer. A single ESP-NOW peer still uses one unicast serial frame. Wi-Fi and serial + tasks run together in one asyncio.gather. + + Returns (delivery_count, chunk_count) where chunk_count is len(messages). + """ + if not messages: + return 0, 0 + + if not target_macs: + deliveries = 0 + for msg in messages: + await sender.send(msg) + deliveries += 1 + await asyncio.sleep(delay_s) + return deliveries, len(messages) + + seen = set() + ordered_macs = [] + for raw in target_macs: + m = normalize_mac(str(raw)) if raw else None + if not m or m in seen: + continue + seen.add(m) + ordered_macs.append(m) + + deliveries = 0 + for msg in messages: + wifi_tasks = [] + espnow_hex = [] + for mac in ordered_macs: + doc = devices_model.read(mac) + if doc and doc.get("transport") == "wifi": + ip = doc.get("address") + if ip: + wifi_tasks.append(send_json_line_to_ip(ip, msg)) + else: + espnow_hex.append(mac) + + tasks = [] + espnow_peer_count = 0 + if len(espnow_hex) > 1: + tasks.append( + sender.send( + _split_serial_envelope(msg, espnow_hex), + addr=_BROADCAST_MAC_HEX, + ) + ) + espnow_peer_count = len(espnow_hex) + elif len(espnow_hex) == 1: + tasks.append(sender.send(msg, addr=espnow_hex[0])) + espnow_peer_count = 1 + + tasks.extend(wifi_tasks) + + if tasks: + results = await asyncio.gather(*tasks, return_exceptions=True) + n_serial = len(tasks) - len(wifi_tasks) + for i, r in enumerate(results): + if i < n_serial: + if r is True: + deliveries += espnow_peer_count + elif isinstance(r, Exception): + print(f"[driver_delivery] serial delivery failed: {r!r}") + else: + if r is True: + deliveries += 1 + elif isinstance(r, Exception): + print(f"[driver_delivery] Wi-Fi delivery failed: {r!r}") + + await asyncio.sleep(delay_s) + return deliveries, len(messages) diff --git a/tests/models/test_device.py b/tests/models/test_device.py index a0aa1e9..6840a3c 100644 --- a/tests/models/test_device.py +++ b/tests/models/test_device.py @@ -102,6 +102,7 @@ def test_upsert_wifi_tcp_client(): assert i1 == m1 d = devices.read(i1) assert d["name"] == "kitchen" + assert d["type"] == "led" assert d["transport"] == "wifi" assert d["address"] == "192.168.1.20" @@ -115,6 +116,14 @@ def test_upsert_wifi_tcp_client(): assert again == m1 assert devices.read(m1)["address"] == "192.168.1.99" + assert ( + devices.upsert_wifi_tcp_client( + "kitchen", "192.168.1.100", m1, device_type="bogus" + ) + == m1 + ) + assert devices.read(m1)["type"] == "led" + i3 = devices.upsert_wifi_tcp_client("hall", "10.0.0.5", "deadbeefcafe") assert i3 == "deadbeefcafe" assert len(devices.list()) == 3 diff --git a/tests/tcp_test_server.py b/tests/tcp_test_server.py index d30a6c3..d321b06 100644 --- a/tests/tcp_test_server.py +++ b/tests/tcp_test_server.py @@ -6,8 +6,8 @@ Listens on the same TCP port used by led-driver WiFi transport and every 5 seconds sends a newline-delimited JSON message with v="1". Clients talking to the real Pi registry should send a first line JSON object -that includes device_name and mac (12 hex) so the controller can register -the device by MAC. +that includes device_name, mac (12 hex), and type (e.g. led) so the controller +can register the device by MAC. """ import asyncio diff --git a/tests/test_browser.py b/tests/test_browser.py index a5f145c..6ffc0ec 100644 --- a/tests/test_browser.py +++ b/tests/test_browser.py @@ -349,9 +349,7 @@ def test_tabs_ui(browser: BrowserTest) -> bool: # Fill in tab name if browser.fill_input(By.ID, 'new-tab-name', 'Browser Test Tab'): print(" ✓ Filled tab name") - # Fill in device IDs - if browser.fill_input(By.ID, 'new-tab-ids', '1,2,3'): - print(" ✓ Filled device IDs") + # Devices default from registry or placeholder name "1" # Click create button if browser.click_element(By.ID, 'create-tab-btn'): print(" ✓ Clicked create button") @@ -790,7 +788,6 @@ def test_preset_drag_and_drop(browser: BrowserTest) -> bool: if tabs_list and 'No tabs found' in tabs_list.text: # Create a tab browser.fill_input(By.ID, 'new-tab-name', 'Drag Test Tab') - browser.fill_input(By.ID, 'new-tab-ids', '1') browser.click_element(By.ID, 'create-tab-btn') time.sleep(1) @@ -848,7 +845,7 @@ def test_preset_drag_and_drop(browser: BrowserTest) -> bool: print("✓ Created 3 presets for drag test") passed += 1 - # Test 4: Add presets to the tab (via Edit Tab modal – Select buttons in list) + # Test 4: Add presets to the tab (via Edit Tab modal – Add buttons in list) total += 1 try: tab_id = browser.driver.execute_script( @@ -864,12 +861,12 @@ def test_preset_drag_and_drop(browser: BrowserTest) -> bool: time.sleep(1) list_el = browser.wait_for_element(By.ID, 'edit-tab-presets-list', timeout=5) if list_el: - select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Select']") + select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Add']") if len(select_buttons) >= 2: browser.driver.execute_script("arguments[0].click();", select_buttons[0]) time.sleep(1.5) browser.handle_alert(accept=True, timeout=1) - select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Select']") + select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Add']") if len(select_buttons) >= 1: browser.driver.execute_script("arguments[0].click();", select_buttons[0]) time.sleep(1.5) @@ -949,7 +946,7 @@ def test_preset_drag_and_drop(browser: BrowserTest) -> bool: tab_id ) time.sleep(1) - select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Select']") + select_buttons = browser.driver.find_elements(By.XPATH, "//div[@id='edit-tab-presets-list']//button[text()='Add']") if select_buttons: print(" Attempting to add another preset...") browser.driver.execute_script("arguments[0].click();", select_buttons[0]) @@ -973,7 +970,7 @@ def test_preset_drag_and_drop(browser: BrowserTest) -> bool: else: print(f" ✗ Still only {len(draggable_presets)} preset(s) after adding") else: - print(" ✗ No Select buttons found in Edit Tab modal") + print(" ✗ No Add buttons found in Edit Tab modal") else: print(f"✗ No presets found in tab (found {len(draggable_presets)})") else: diff --git a/tests/test_endpoints_pytest.py b/tests/test_endpoints_pytest.py index ffb10d9..c156b7f 100644 --- a/tests/test_endpoints_pytest.py +++ b/tests/test_endpoints_pytest.py @@ -348,11 +348,15 @@ def test_settings_controller(server): assert resp.status_code == 400 -def test_profiles_presets_tabs_endpoints(server): +def test_profiles_presets_tabs_endpoints(server, monkeypatch): c: requests.Session = server["client"] base_url: str = server["base_url"] sender: DummySender = server["sender"] + import controllers.device as device_ctl + + monkeypatch.setattr(device_ctl, "IDENTIFY_OFF_DELAY_S", 0.05) + unique_profile_name = f"pytest-profile-{uuid.uuid4().hex[:8]}" resp = c.post(f"{base_url}/profiles", json={"name": unique_profile_name}) @@ -593,6 +597,28 @@ def test_groups_sequences_scenes_palettes_patterns_endpoints(server): assert resp.status_code == 200 assert resp.json()["name"] == "pytest-dev" assert resp.json()["type"] == "led" + assert resp.json().get("connected") is None + + resp = c.get(f"{base_url}/devices") + assert resp.status_code == 200 + assert resp.json()[dev_id].get("connected") is None + + sender.sent.clear() + resp = c.post(f"{base_url}/devices/{dev_id}/identify") + assert resp.status_code == 200 + assert resp.json().get("message") + assert len(sender.sent) >= 1 + first = json.loads(sender.sent[0][0]) + assert "presets" in first and "select" in first + assert first["presets"]["__identify"]["p"] == "blink" + assert first["presets"]["__identify"]["d"] == 50 + assert first["select"]["pytest-dev"] == ["__identify"] + deadline = time.monotonic() + 2.0 + while len(sender.sent) < 2 and time.monotonic() < deadline: + time.sleep(0.02) + assert len(sender.sent) >= 2 + second = json.loads(sender.sent[1][0]) + assert second.get("select") == {"pytest-dev": ["off"]} resp = c.post( f"{base_url}/devices", @@ -610,6 +636,10 @@ def test_groups_sequences_scenes_palettes_patterns_endpoints(server): assert resp.json()[wid]["transport"] == "wifi" assert resp.json()[wid]["address"] == "192.168.50.10" + resp = c.get(f"{base_url}/devices/{wid}") + assert resp.status_code == 200 + assert resp.json().get("connected") is False + resp = c.post( f"{base_url}/devices", json={