test(endpoints): add pytest coverage for all Microdot routes
This commit is contained in:
182
tests/async_tcp_server.py
Normal file
182
tests/async_tcp_server.py
Normal file
@@ -0,0 +1,182 @@
|
||||
#!/usr/bin/env python3
|
||||
# Standalone async TCP server (stdlib only). Multiple simultaneous clients.
|
||||
# No watchdog: runs on a full host (e.g. Raspberry Pi); ESP32 clients may use WDT.
|
||||
# For RTT latency, clients may send lines like ``rtt 12345`` (ticks); they are echoed back.
|
||||
#
|
||||
# Run from anywhere (default: all IPv4 interfaces, port 9000):
|
||||
# python3 async_tcp_server.py
|
||||
# python3 async_tcp_server.py --port 9000
|
||||
# Localhost only:
|
||||
# python3 async_tcp_server.py --host 127.0.0.1
|
||||
#
|
||||
# Or from this directory:
|
||||
# chmod +x async_tcp_server.py && ./async_tcp_server.py
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
|
||||
class _ClientRegistry:
|
||||
"""Track writers and broadcast newline-terminated lines to all clients."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._writers: set[asyncio.StreamWriter] = set()
|
||||
|
||||
def add(self, writer: asyncio.StreamWriter) -> None:
|
||||
self._writers.add(writer)
|
||||
|
||||
def remove(self, writer: asyncio.StreamWriter) -> None:
|
||||
self._writers.discard(writer)
|
||||
|
||||
def count(self) -> int:
|
||||
return len(self._writers)
|
||||
|
||||
async def broadcast_line(self, line: str) -> None:
|
||||
data = (line.rstrip("\r\n") + "\n").encode("utf-8")
|
||||
for writer in list(self._writers):
|
||||
try:
|
||||
writer.write(data)
|
||||
await writer.drain()
|
||||
except Exception as e:
|
||||
print(f"[tcp] broadcast failed, dropping client: {e}")
|
||||
self._writers.discard(writer)
|
||||
try:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
async def _periodic_broadcast(
    registry: _ClientRegistry,
    interval_sec: float,
    message: str,
) -> None:
    """Broadcast *message* to all clients every *interval_sec* seconds.

    ``{t}`` in *message* is substituted with the current unix time.
    Sleeps first, skips the send while no client is connected, and runs
    until the surrounding task is cancelled.
    """
    while True:
        await asyncio.sleep(interval_sec)
        if not registry.count():
            # Nobody connected; nothing to send this round.
            continue
        line = message.format(t=time.time())
        print(f"[tcp] broadcast to {registry.count()} client(s): {line!r}")
        await registry.broadcast_line(line)
|
||||
|
||||
|
||||
async def _handle_client(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    registry: _ClientRegistry,
) -> None:
    """Serve one client: echo every newline-delimited line back.

    The writer is registered for broadcasts on connect and is always
    unregistered and closed on disconnect. ``rtt <ticks>`` lines are echoed
    unchanged, with the host-side echo latency logged for diagnostics.
    """
    peer = writer.get_extra_info("peername")
    print(f"[tcp] connected: {peer}")
    registry.add(writer)
    try:
        while not reader.at_eof():
            raw = await reader.readline()
            if not raw:
                break
            message = raw.decode("utf-8", errors="replace").rstrip("\r\n")
            # Echo newline-delimited lines (simple test harness behaviour);
            # ``rtt`` lines additionally get a latency measurement.
            start = time.perf_counter()
            writer.write((message + "\n").encode("utf-8"))
            await writer.drain()
            if message.startswith("rtt "):
                elapsed_ms = (time.perf_counter() - start) * 1000.0
                print(
                    f"[tcp] echoed rtt from {peer} "
                    f"(host write+drain ~{elapsed_ms:.2f} ms)"
                )
    finally:
        registry.remove(writer)
        writer.close()
        await writer.wait_closed()
        print(f"[tcp] disconnected: {peer}")
|
||||
|
||||
|
||||
def _make_client_handler(registry: _ClientRegistry):
    """Bind *registry* into a connection callback for ``asyncio.start_server``."""

    async def _bound(
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        # Delegate to the shared per-client handler with the captured registry.
        await _handle_client(reader, writer, registry)

    return _bound
|
||||
|
||||
|
||||
async def _run(
    host: str,
    port: int,
    broadcast_interval: float | None,
    broadcast_message: str,
) -> None:
    """Serve forever on *host*:*port*, optionally broadcasting periodically.

    When *broadcast_interval* is a positive number, a background task sends
    *broadcast_message* to every connected client at that cadence; the task
    is cancelled and awaited on shutdown.
    """
    registry = _ClientRegistry()
    server = await asyncio.start_server(_make_client_handler(registry), host, port)
    print(f"[tcp] listening on {host}:{port} (Ctrl+C to stop)")

    want_broadcast = broadcast_interval is not None and broadcast_interval > 0
    if want_broadcast:
        print(
            f"[tcp] periodic broadcast every {broadcast_interval}s "
            f"(use {{t}} in --message for unix time)"
        )

    async with server:
        bg_tasks: list[asyncio.Task] = []
        if want_broadcast:
            bg_tasks.append(
                asyncio.create_task(
                    _periodic_broadcast(registry, broadcast_interval, broadcast_message),
                    name="broadcast",
                )
            )
        try:
            # gather() with no extra tasks degrades to plain serve_forever().
            await asyncio.gather(server.serve_forever(), *bg_tasks)
        finally:
            # Tear down the broadcast task cleanly on exit/cancellation.
            for task in bg_tasks:
                task.cancel()
            for task in bg_tasks:
                try:
                    await task
                except asyncio.CancelledError:
                    pass
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments and serve until Ctrl+C."""
    parser = argparse.ArgumentParser(
        description="Standalone asyncio TCP server (multiple connections).",
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="bind address (default: all IPv4 interfaces)",
    )
    parser.add_argument("--port", type=int, default=9000, help="bind port")
    parser.add_argument(
        "--interval",
        type=float,
        default=5.0,
        metavar="SEC",
        help="seconds between broadcast lines to all clients (default: 5)",
    )
    parser.add_argument(
        "--message",
        default="ping {t:.0f}",
        help='broadcast line (newline added); use "{t}" for time.time() (default: %(default)s)',
    )
    parser.add_argument(
        "--no-broadcast",
        action="store_true",
        help="disable periodic broadcast (echo-only)",
    )
    opts = parser.parse_args()

    # --no-broadcast wins over any --interval value.
    interval = None if opts.no_broadcast else opts.interval
    try:
        asyncio.run(_run(opts.host, opts.port, interval, opts.message))
    except KeyboardInterrupt:
        print("\n[tcp] stopped")


if __name__ == "__main__":
    main()
|
||||
12
tests/conftest.py
Normal file
12
tests/conftest.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from pathlib import Path
import sys


# Put the repo's src/, lib/ and root at the front of sys.path so tests
# resolve the in-tree modules rather than any installed copies.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_PATH = PROJECT_ROOT / "src"
LIB_PATH = PROJECT_ROOT / "lib"

for entry in (str(SRC_PATH), str(LIB_PATH), str(PROJECT_ROOT)):
    # De-duplicate before pushing to the front so ordering is deterministic.
    if entry in sys.path:
        sys.path.remove(entry)
    sys.path.insert(0, entry)
|
||||
|
||||
@@ -7,6 +7,15 @@ On Pi OS Lite (no desktop) these tests are skipped unless headless Chromium
|
||||
and chromedriver are installed (e.g. chromium-browser chromium-chromedriver).
|
||||
"""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
if os.environ.get("LED_CONTROLLER_RUN_BROWSER_TESTS") != "1":
|
||||
pytest.skip(
|
||||
"Legacy device browser automation script; enable explicitly to run.",
|
||||
allow_module_level=True,
|
||||
)
|
||||
|
||||
import sys
|
||||
import time
|
||||
import requests
|
||||
|
||||
@@ -4,6 +4,15 @@ Endpoint tests that mimic web browser requests.
|
||||
Tests run against the device at 192.168.4.1
|
||||
"""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
if os.environ.get("LED_CONTROLLER_RUN_DEVICE_ENDPOINT_TESTS") != "1":
|
||||
pytest.skip(
|
||||
"Legacy device integration endpoint tests; enable explicitly to run.",
|
||||
allow_module_level=True,
|
||||
)
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
|
||||
594
tests/test_endpoints_pytest.py
Normal file
594
tests/test_endpoints_pytest.py
Normal file
@@ -0,0 +1,594 @@
|
||||
import asyncio
|
||||
import builtins
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
# Ensure imports resolve to the repo's `src/` + `lib/` code.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
SRC_PATH = PROJECT_ROOT / "src"
LIB_PATH = PROJECT_ROOT / "lib"

for _entry in (str(SRC_PATH), str(LIB_PATH), str(PROJECT_ROOT)):
    # Move (or add) each entry to the front so repo code shadows installed copies.
    if _entry in sys.path:
        sys.path.remove(_entry)
    sys.path.insert(0, _entry)
|
||||
|
||||
from microdot import Microdot, send_file # noqa: E402
|
||||
from microdot.session import Session # noqa: E402
|
||||
from microdot.websocket import with_websocket # noqa: E402
|
||||
|
||||
|
||||
class DummySender:
    """In-memory stand-in for the transport sender used by /presets/send.

    Records every payload (bytes decoded to ``str``) together with its
    target address so tests can assert on what would have been transmitted.
    """

    def __init__(self):
        # (payload, address) pairs, in send order.
        self.sent: list[tuple[str, Optional[str]]] = []

    async def send(self, data: Any, addr: Optional[str] = None):
        """Record *data* and report success; bytes are decoded leniently."""
        payload = data
        if isinstance(payload, (bytes, bytearray)):
            payload = bytes(payload).decode(errors="ignore")
        self.sent.append((payload, addr))
        return True
||||
|
||||
|
||||
def _json(resp: requests.Response) -> Dict[str, Any]:
    """Decode *resp* as JSON.

    Endpoints are expected to set Content-Type already; this helper exists
    as a single tolerant decode point should that ever change.
    """
    return resp.json()  # pragma: no cover
|
||||
|
||||
|
||||
def _find_id_by_field(list_resp_json: Dict[str, Any], field: str, value: str) -> str:
|
||||
for obj_id, data in list_resp_json.items():
|
||||
if isinstance(data, dict) and data.get(field) == value:
|
||||
return str(obj_id)
|
||||
raise AssertionError(f"Could not find id for {field}={value!r}")
|
||||
|
||||
|
||||
def _start_microdot_server(app: Microdot, host: str, port: int):
    """
    Run *app* on a daemon thread with its own event loop.

    Polls until the listening socket is bound (so ``port=0`` resolves to a
    real ephemeral port) and returns ``(thread, chosen_port)``. Raises
    RuntimeError if the server does not come up within five seconds.
    """

    def _serve():
        # Each thread needs its own event loop for Microdot's async server.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(app.start_server(host=host, port=port))
        finally:
            try:
                loop.close()
            except Exception:
                pass

    worker = threading.Thread(target=_serve, daemon=True)
    worker.start()

    # Wait for app.server to expose a bound socket and read the real port.
    bound_port = None
    deadline = time.time() + 5.0
    while time.time() < deadline:
        srv = getattr(app, "server", None)
        if srv and getattr(srv, "sockets", None):
            bound_sockets = srv.sockets or []
            if bound_sockets:
                bound_port = bound_sockets[0].getsockname()[1]
                break
        time.sleep(0.05)

    if bound_port is None:
        raise RuntimeError("Microdot server failed to start in time")

    return worker, bound_port
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def server(monkeypatch, tmp_path_factory):
    """
    Start the Microdot app in-process and return a test client.

    Yields a dict with ``base_url``, a ``requests.Session`` (``client``),
    the ``DummySender`` capturing outbound transport traffic (``sender``),
    the server ``thread`` and the Microdot ``app``.

    The setup order below matters: settings/db paths are patched BEFORE the
    controllers are (re)imported, because controllers instantiate their
    model singletons at import time.
    """

    tmp_root = tmp_path_factory.mktemp("endpoint-tests")
    tmp_db_dir = tmp_root / "db"
    tmp_settings_file = tmp_root / "settings.json"

    # Be defensive: pytest runners can sometimes alter sys.path ordering.
    for p in (str(SRC_PATH), str(LIB_PATH), str(PROJECT_ROOT)):
        if p in sys.path:
            sys.path.remove(p)
        sys.path.insert(0, p)

    # Patch Settings so endpoint tests never touch real `settings.json`.
    import settings as settings_mod  # noqa: E402

    settings_mod.Settings.SETTINGS_FILE = str(tmp_settings_file)

    # Patch the Model db directory so endpoint CRUD is isolated.
    import models.model as model_mod  # noqa: E402

    monkeypatch.setattr(model_mod, "_db_dir", lambda: str(tmp_db_dir))

    # Reset model singletons (controllers instantiate model classes at import time).
    # Import the classes first so we can delete their `_instance` attribute if present.
    import models.preset as models_preset  # noqa: E402
    import models.profile as models_profile  # noqa: E402
    import models.group as models_group  # noqa: E402
    import models.tab as models_tab  # noqa: E402
    import models.pallet as models_pallet  # noqa: E402
    import models.scene as models_scene  # noqa: E402
    import models.pattern as models_pattern  # noqa: E402
    # NOTE(review): module file is spelled "squence" in the repo — confirm intentional.
    import models.squence as models_sequence  # noqa: E402

    for cls in (
        models_preset.Preset,
        models_profile.Profile,
        models_group.Group,
        models_tab.Tab,
        models_pallet.Palette,
        models_scene.Scene,
        models_pattern.Pattern,
        models_sequence.Sequence,
    ):
        if hasattr(cls, "_instance"):
            delattr(cls, "_instance")

    # Patch open() so pattern definitions work after we `chdir` into src/.
    orig_open = builtins.open

    def patched_open(file, *args, **kwargs):
        if isinstance(file, str):
            # Pattern controller loads definitions from a relative db/ path.
            if file in {"db/pattern.json", "pattern.json", "/db/pattern.json"}:
                file = str(PROJECT_ROOT / "db" / "pattern.json")
        return orig_open(file, *args, **kwargs)

    monkeypatch.setattr(builtins, "open", patched_open)

    # chdir into src/ so template/static paths resolve like on the device.
    old_cwd = os.getcwd()
    os.chdir(str(SRC_PATH))

    dummy_sender = DummySender()

    try:
        # Ensure controllers are imported fresh after our patching.
        for mod_name in (
            "controllers.preset",
            "controllers.profile",
            "controllers.group",
            "controllers.sequence",
            "controllers.tab",
            "controllers.palette",
            "controllers.scene",
            "controllers.pattern",
            "controllers.settings",
        ):
            sys.modules.pop(mod_name, None)

        # Import controllers after patching db/settings/model singletons.
        import controllers.preset as preset_ctl  # noqa: E402
        import controllers.profile as profile_ctl  # noqa: E402
        import controllers.group as group_ctl  # noqa: E402
        import controllers.sequence as sequence_ctl  # noqa: E402
        import controllers.tab as tab_ctl  # noqa: E402
        import controllers.palette as palette_ctl  # noqa: E402
        import controllers.scene as scene_ctl  # noqa: E402
        import controllers.pattern as pattern_ctl  # noqa: E402
        import controllers.settings as settings_ctl  # noqa: E402

        # Configure transport sender used by /presets/send.
        from models.transport import set_sender  # noqa: E402

        set_sender(dummy_sender)

        app = Microdot()

        # Session secret key comes from settings (patched to tmp).
        settings = settings_mod.Settings()
        secret_key = settings.get(
            "session_secret_key",
            "led-controller-secret-key-change-in-production",
        )
        Session(app, secret_key=secret_key)

        # Mount model controllers under their public prefixes.
        app.mount(preset_ctl.controller, "/presets")
        app.mount(profile_ctl.controller, "/profiles")
        app.mount(group_ctl.controller, "/groups")
        app.mount(sequence_ctl.controller, "/sequences")
        app.mount(tab_ctl.controller, "/tabs")
        app.mount(palette_ctl.controller, "/palettes")
        app.mount(scene_ctl.controller, "/scenes")
        app.mount(pattern_ctl.controller, "/patterns")
        app.mount(settings_ctl.controller, "/settings")

        @app.route("/")
        def index(request):
            return send_file("templates/index.html")

        # NOTE(review): "/settings" is also a mounted controller prefix above;
        # looks like the mounted route wins (registration order) — confirm this
        # handler is still reachable.
        @app.route("/settings")
        def settings_page(request):
            return send_file("templates/settings.html")

        @app.route("/favicon.ico")
        def favicon(request):
            return "", 204

        @app.route("/static/<path:path>")
        def static_handler(request, path):
            if ".." in path:
                return "Not found", 404
            return send_file("static/" + path)

        @app.route("/ws")
        @with_websocket
        async def ws(request, ws):
            # Minimal websocket handler: forward raw JSON/text payloads to dummy sender.
            while True:
                data = await ws.receive()
                if not data:
                    break
                try:
                    parsed = json.loads(data)
                    addr = parsed.pop("to", None)
                    payload = json.dumps(parsed) if parsed else data
                    await dummy_sender.send(payload, addr=addr)
                except Exception:
                    await dummy_sender.send(data)

        # port=0 lets the OS pick a free port; _start_microdot_server reports it.
        thread, chosen_port = _start_microdot_server(app, host="127.0.0.1", port=0)
        base_url = f"http://127.0.0.1:{chosen_port}"

        client = requests.Session()
        client.headers.update(
            {
                "User-Agent": "pytest/requests",
                "Accept": "application/json",
            }
        )

        yield {
            "base_url": base_url,
            "client": client,
            "sender": dummy_sender,
            "thread": thread,
            "app": app,
        }
    finally:
        # Stop server cleanly. locals().get guards against setup failing
        # before `app`/`thread` were ever bound.
        try:
            app = locals().get("app")
            if app is not None:
                app.shutdown()
        except Exception:
            pass

        # Give it a moment to close sockets.
        time.sleep(0.1)
        try:
            thread = locals().get("thread")
            if thread is not None:
                thread.join(timeout=5)
        except Exception:
            pass

        os.chdir(old_cwd)
|
||||
|
||||
|
||||
def test_main_routes(server):
    """Smoke-test the non-CRUD routes: index, favicon, static, settings page, ws."""
    client: requests.Session = server["client"]
    base: str = server["base_url"]

    index_resp = client.get(f"{base}/")
    assert index_resp.status_code == 200
    assert "LED Controller" in index_resp.text

    assert client.get(f"{base}/favicon.ico").status_code == 204

    assert client.get(f"{base}/static/style.css").status_code == 200

    page_resp = client.get(f"{base}/settings/page")
    assert page_resp.status_code == 200
    assert "LED Controller" in page_resp.text

    ws_resp = client.get(f"{base}/ws")
    # WebSocket endpoints should reject non-upgraded HTTP requests.
    assert ws_resp.status_code != 200
    assert ws_resp.status_code in {400, 401, 403, 404, 405, 426}
|
||||
|
||||
|
||||
def test_settings_controller(server):
    """Exercise /settings reads and writes, including channel validation."""
    client: requests.Session = server["client"]
    base: str = server["base_url"]

    settings_resp = client.get(f"{base}/settings")
    assert settings_resp.status_code == 200
    data = settings_resp.json()
    assert isinstance(data, dict)
    assert "wifi_channel" in data

    ap_resp = client.get(f"{base}/settings/wifi/ap")
    assert ap_resp.status_code == 200
    ap = ap_resp.json()
    assert "saved_ssid" in ap
    assert "active" in ap

    unique_ssid = f"pytest-ssid-{uuid.uuid4().hex[:8]}"
    ok_resp = client.post(
        f"{base}/settings/wifi/ap",
        json={"ssid": unique_ssid, "password": "secret", "channel": 1},
    )
    assert ok_resp.status_code == 200
    msg = ok_resp.json()
    assert msg["ssid"] == unique_ssid
    assert msg["channel"] == 1

    # Channel 12 is expected to be rejected by the endpoint's validation.
    bad_resp = client.post(
        f"{base}/settings/wifi/ap",
        json={"ssid": "bad-ssid", "password": "secret", "channel": 12},
    )
    assert bad_resp.status_code == 400

    put_ok = client.put(f"{base}/settings/settings", json={"wifi_channel": 11})
    assert put_ok.status_code == 200

    put_bad = client.put(f"{base}/settings/settings", json={"wifi_channel": 12})
    assert put_bad.status_code == 400
|
||||
|
||||
|
||||
def test_profiles_presets_tabs_endpoints(server):
    """End-to-end CRUD over /profiles, /presets and /tabs.

    The steps are strictly order-dependent: a profile is created and applied
    first so that the preset/tab CRUD that follows is scoped to the current
    profile session; everything created is deleted again at the end.
    """
    c: requests.Session = server["client"]
    base_url: str = server["base_url"]
    sender: DummySender = server["sender"]

    unique_profile_name = f"pytest-profile-{uuid.uuid4().hex[:8]}"

    # Create a profile; the response maps new id -> profile data.
    resp = c.post(f"{base_url}/profiles", json={"name": unique_profile_name})
    assert resp.status_code == 201
    created = resp.json()
    assert isinstance(created, dict)
    profile_id = next(iter(created.keys()))

    # Apply it so it becomes the session's current profile.
    resp = c.post(f"{base_url}/profiles/{profile_id}/apply")
    assert resp.status_code == 200

    resp = c.get(f"{base_url}/profiles/current")
    assert resp.status_code == 200
    current = resp.json()
    assert str(current["id"]) == str(profile_id)

    # Presets CRUD (scoped to current profile session).
    resp = c.get(f"{base_url}/presets")
    assert resp.status_code == 200
    presets = resp.json()
    assert isinstance(presets, dict)
    assert presets  # seeded presets should exist

    first_preset_id = next(iter(presets.keys()))
    resp = c.get(f"{base_url}/presets/{first_preset_id}")
    assert resp.status_code == 200
    assert resp.json()  # dict

    unique_preset_name = f"pytest-preset-{uuid.uuid4().hex[:8]}"
    resp = c.post(
        f"{base_url}/presets",
        json={
            "name": unique_preset_name,
            "pattern": "on",
            "colors": ["#ff0000"],
            "brightness": 123,
            "delay": 100,
        },
    )
    assert resp.status_code == 201
    created_preset = resp.json()
    new_preset_id = next(iter(created_preset.keys()))
    # New presets must be bound to the applied (current) profile.
    assert created_preset[new_preset_id]["profile_id"] == str(profile_id)

    resp = c.put(
        f"{base_url}/presets/{new_preset_id}",
        json={"brightness": 77},
    )
    assert resp.status_code == 200
    assert resp.json()["brightness"] == 77

    # /presets/send should go through the patched transport (DummySender).
    sender.sent.clear()
    resp = c.post(
        f"{base_url}/presets/send",
        json={"preset_ids": [new_preset_id], "save": False},
    )
    assert resp.status_code == 200
    sent_result = resp.json()
    assert sent_result["presets_sent"] >= 1
    assert len(sender.sent) >= 1

    resp = c.delete(f"{base_url}/presets/{new_preset_id}")
    assert resp.status_code == 200
    resp = c.get(f"{base_url}/presets/{new_preset_id}")
    assert resp.status_code == 404

    # Tabs CRUD (scoped to current profile session).
    unique_tab_name = f"pytest-tab-{uuid.uuid4().hex[:8]}"
    resp = c.post(
        f"{base_url}/tabs",
        json={"name": unique_tab_name, "names": ["1", "2"]},
    )
    assert resp.status_code == 201
    created_tabs = resp.json()
    tab_id = next(iter(created_tabs.keys()))

    resp = c.get(f"{base_url}/tabs/{tab_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == unique_tab_name

    resp = c.post(f"{base_url}/tabs/{tab_id}/set-current")
    assert resp.status_code == 200

    resp = c.get(f"{base_url}/tabs/current")
    assert resp.status_code == 200
    assert resp.json()["tab_id"] == str(tab_id)

    resp = c.put(
        f"{base_url}/tabs/{tab_id}",
        json={"name": f"{unique_tab_name}-updated", "names": ["3"]},
    )
    assert resp.status_code == 200
    assert resp.json()["names"] == ["3"]

    resp = c.post(f"{base_url}/tabs/{tab_id}/clone", json={"name": "pytest-tab-clone"})
    assert resp.status_code == 201
    clone_payload = resp.json()
    clone_id = next(iter(clone_payload.keys()))

    resp = c.get(f"{base_url}/tabs/{clone_id}")
    assert resp.status_code == 200

    resp = c.delete(f"{base_url}/tabs/{clone_id}")
    assert resp.status_code == 200

    resp = c.delete(f"{base_url}/tabs/{tab_id}")
    assert resp.status_code == 200

    # Profile clone + update endpoints.
    clone_name = f"pytest-profile-clone-{uuid.uuid4().hex[:8]}"
    resp = c.post(f"{base_url}/profiles/{profile_id}/clone", json={"name": clone_name})
    assert resp.status_code == 201
    cloned = resp.json()
    clone_profile_id = next(iter(cloned.keys()))

    resp = c.post(f"{base_url}/profiles/{clone_profile_id}/apply")
    assert resp.status_code == 200

    resp = c.put(
        f"{base_url}/profiles/current",
        json={"name": f"{clone_name}-updated"},
    )
    assert resp.status_code == 200

    resp = c.put(
        f"{base_url}/profiles/{clone_profile_id}",
        json={"name": f"{clone_name}-updated-2"},
    )
    assert resp.status_code == 200

    # Cleanup: remove the clone and the original profile.
    resp = c.delete(f"{base_url}/profiles/{clone_profile_id}")
    assert resp.status_code == 200

    resp = c.delete(f"{base_url}/profiles/{profile_id}")
    assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_groups_sequences_scenes_palettes_patterns_endpoints(server):
    """CRUD round-trips for /groups, /sequences, /scenes, /palettes, /patterns.

    Each section creates one uniquely-named object, reads and updates it,
    then deletes it, so the sections are independent of each other. Groups,
    sequences and scenes don't return the new id on create, so the id is
    recovered from the list endpoint via `_find_id_by_field`.
    """
    c: requests.Session = server["client"]
    base_url: str = server["base_url"]

    # Groups.
    unique_group_name = f"pytest-group-{uuid.uuid4().hex[:8]}"
    resp = c.post(f"{base_url}/groups", json={"name": unique_group_name})
    assert resp.status_code == 201
    groups_list = c.get(f"{base_url}/groups").json()
    group_id = _find_id_by_field(groups_list, "name", unique_group_name)

    resp = c.get(f"{base_url}/groups/{group_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == unique_group_name

    resp = c.put(f"{base_url}/groups/{group_id}", json={"brightness": 10})
    assert resp.status_code == 200
    assert resp.json()["brightness"] == 10

    resp = c.delete(f"{base_url}/groups/{group_id}")
    assert resp.status_code == 200

    # Sequences.
    unique_seq_group_name = f"pytest-seq-group-{uuid.uuid4().hex[:8]}"
    resp = c.post(
        f"{base_url}/sequences",
        json={"group_name": unique_seq_group_name, "presets": []},
    )
    assert resp.status_code == 201
    sequences_list = c.get(f"{base_url}/sequences").json()
    seq_id = _find_id_by_field(sequences_list, "group_name", unique_seq_group_name)

    resp = c.get(f"{base_url}/sequences/{seq_id}")
    assert resp.status_code == 200

    resp = c.put(f"{base_url}/sequences/{seq_id}", json={"sequence_duration": 1234})
    assert resp.status_code == 200
    assert resp.json()["sequence_duration"] == 1234

    resp = c.delete(f"{base_url}/sequences/{seq_id}")
    assert resp.status_code == 200

    # Scenes.
    unique_scene_name = f"pytest-scene-{uuid.uuid4().hex[:8]}"
    resp = c.post(f"{base_url}/scenes", json={"name": unique_scene_name})
    assert resp.status_code == 201
    scenes_list = c.get(f"{base_url}/scenes").json()
    scene_id = _find_id_by_field(scenes_list, "name", unique_scene_name)

    resp = c.get(f"{base_url}/scenes/{scene_id}")
    assert resp.status_code == 200

    resp = c.put(f"{base_url}/scenes/{scene_id}", json={"name": unique_scene_name + "-updated"})
    assert resp.status_code == 200
    assert resp.json()["name"].endswith("-updated")

    resp = c.delete(f"{base_url}/scenes/{scene_id}")
    assert resp.status_code == 200

    # Palettes. Unlike the sections above, create returns the new id directly.
    colors = ["#112233", "#445566"]
    resp = c.post(f"{base_url}/palettes", json={"colors": colors})
    assert resp.status_code == 201
    palette_payload = resp.json()
    palette_id = str(palette_payload["id"])

    resp = c.get(f"{base_url}/palettes/{palette_id}")
    assert resp.status_code == 200
    assert resp.json()["id"] == palette_id

    resp = c.put(f"{base_url}/palettes/{palette_id}", json={"colors": ["#000000"]})
    assert resp.status_code == 200
    assert resp.json()["colors"] == ["#000000"]

    resp = c.delete(f"{base_url}/palettes/{palette_id}")
    assert resp.status_code == 200

    # Patterns. Keyed by name: the POSTed "name" doubles as the id.
    resp = c.get(f"{base_url}/patterns/definitions")
    assert resp.status_code == 200
    definitions = resp.json()
    assert isinstance(definitions, dict)

    pattern_id = f"pytest_pattern_{uuid.uuid4().hex[:8]}"
    resp = c.post(
        f"{base_url}/patterns",
        json={"name": pattern_id, "data": {"foo": "bar"}},
    )
    assert resp.status_code == 201
    assert resp.json()["foo"] == "bar"

    resp = c.get(f"{base_url}/patterns")
    assert resp.status_code == 200
    patterns_list = resp.json()
    assert pattern_id in patterns_list

    resp = c.get(f"{base_url}/patterns/{pattern_id}")
    assert resp.status_code == 200
    assert resp.json()["foo"] == "bar"

    resp = c.put(f"{base_url}/patterns/{pattern_id}", json={"baz": 1})
    assert resp.status_code == 200
    assert resp.json()["baz"] == 1

    resp = c.delete(f"{base_url}/patterns/{pattern_id}")
    assert resp.status_code == 200
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
import pytest
|
||||
|
||||
pytest.skip("Legacy manual server script (not a pytest suite).", allow_module_level=True)
|
||||
|
||||
from microdot import Microdot
|
||||
from src.profile import profile_app
|
||||
|
||||
|
||||
Reference in New Issue
Block a user