Files
led-controller/src/controllers/led_tool.py
2026-04-15 00:46:31 +12:00

190 lines
4.9 KiB
Python

import json
import os
import subprocess
import sys
from microdot import Microdot
from serial.tools import list_ports
# Microdot application object; the route handlers in this module register on it.
controller = Microdot()
def _repo_root() -> str:
    """Return the absolute path two directory levels above this file's folder."""
    here = os.path.dirname(__file__)
    two_levels_up = os.path.join(here, "..", "..")
    return os.path.abspath(two_levels_up)
def _led_cli_path() -> str:
    """Return the path of ``led-tool/cli.py`` beneath the repository root."""
    root = _repo_root()
    return os.path.join(root, "led-tool", "cli.py")
def _build_led_cli_command(port: str, payload: dict):
    """Build the argument list that invokes the led-tool CLI for ``payload``.

    The command always starts with the current interpreter, the CLI script
    path and ``--port <port>``. Each recognised payload key is then appended
    as ``<flag> <value>``; keys whose value is ``None`` or whose string form
    is blank after stripping are left out.

    Args:
        port: Serial port passed to the CLI via ``--port``.
        payload: Mapping of setting names to values.

    Returns:
        The command as a list of strings.
    """
    # Payload key -> CLI flag. Insertion order fixes the order of the flags.
    option_flags = {
        "name": "--name",
        "led_pin": "--pin",
        "num_leds": "--leds",
        "brightness": "--brightness",
        "transport": "--transport",
        "ssid": "--ssid",
        "password": "--wifi-password",
        "wifi_channel": "--wifi-channel",
        "default": "--default",
    }
    argv = [sys.executable, _led_cli_path(), "--port", port]
    for field, option in option_flags.items():
        raw = payload.get(field)
        text = "" if raw is None else str(raw).strip()
        if text:
            argv += [option, text]
    return argv
def _run_led_cli_command(cmd, cli_path: str, timeout_s=180):
    """Run a led-tool CLI command and wrap the outcome as a response tuple.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell involved).
        cli_path: Path of the CLI script; its directory becomes the working
            directory of the child process.
        timeout_s: Seconds to wait for the process, or ``None`` to wait
            until it exits on its own.

    Returns:
        A ``(body, status, headers)`` tuple whose ``body`` is a JSON string:

        * ``200`` with ``ok``, ``returncode``, ``stdout``, ``stderr`` and
          ``command`` whenever the process ran to completion, including
          non-zero exits (``ok`` is then ``False``).
        * ``504`` with ``error`` when the timeout expired.
        * ``500`` with ``error`` for any other failure to run the process.

    NOTE(review): ``subprocess.run`` blocks until the child exits, and the
    async route handlers in this module call this function directly, so the
    event loop is stalled for the duration of the command.
    """
    json_headers = {"Content-Type": "application/json"}
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            # A bare file name has an empty dirname, which is not a valid
            # cwd; fall back to inheriting the current working directory.
            cwd=os.path.dirname(cli_path) or None,
        )
    except subprocess.TimeoutExpired as exc:
        # Report the timeout that actually expired, not a fixed number.
        message = f"led-tool command timed out after {exc.timeout:g} seconds"
        return json.dumps({"error": message}), 504, json_headers
    except Exception as exc:
        # Boundary handler: any launch failure becomes a JSON 500 response.
        return json.dumps({"error": str(exc)}), 500, json_headers

    # NOTE(review): "command" echoes the full argv, which includes the
    # --wifi-password value when one was supplied.
    outcome = {
        "ok": result.returncode == 0,
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "command": cmd,
    }
    return json.dumps(outcome), 200, json_headers
def _extract_settings_from_stdout(stdout: str):
    """Parse CLI output as JSON and return it only when it is a JSON object.

    Args:
        stdout: Captured standard output of the CLI; may be empty or ``None``.

    Returns:
        The decoded ``dict``, or ``None`` when the output is blank, is not
        valid JSON, or decodes to anything other than a dict.
    """
    payload = stdout.strip() if stdout else ""
    if not payload:
        return None
    try:
        decoded = json.loads(payload)
    except Exception:
        return None
    if isinstance(decoded, dict):
        return decoded
    return None
@controller.get("/ports")
async def list_serial_ports(request):
    """Respond with the serial ports reported by ``list_ports.comports()``.

    The JSON body holds ``ports`` (one ``device``/``description``/``hwid``
    entry per port) and ``led_cli_exists``, which tells whether the led-tool
    CLI script is present on disk.
    """
    detected = [
        {
            "device": entry.device,
            "description": entry.description,
            "hwid": entry.hwid,
        }
        for entry in list_ports.comports()
    ]
    payload = {
        "ports": detected,
        "led_cli_exists": os.path.exists(_led_cli_path()),
    }
    return json.dumps(payload), 200, {"Content-Type": "application/json"}
@controller.post("/settings")
async def apply_settings(request):
    """Apply settings from the JSON request body through the led-tool CLI.

    The body must carry a non-blank ``port``; the remaining keys are turned
    into CLI flags by ``_build_led_cli_command``. ``--follow`` is appended to
    the command and it is run without a timeout.

    Returns:
        ``400`` when ``port`` is missing or blank, ``500`` when the CLI
        script is not found, otherwise the response tuple produced by
        ``_run_led_cli_command``.
    """
    data = request.json
    # A JSON body that is not an object (e.g. a list or a string) has no
    # ``.get``; treat it like an empty body so the request is rejected with
    # the regular 400 validation error instead of raising AttributeError.
    if not isinstance(data, dict):
        data = {}
    port = str(data.get("port") or "").strip()
    if not port:
        return (
            json.dumps({"error": "port is required"}),
            400,
            {"Content-Type": "application/json"},
        )
    cli_path = _led_cli_path()
    if not os.path.exists(cli_path):
        return (
            json.dumps({"error": "led-tool/cli.py not found"}),
            500,
            {"Content-Type": "application/json"},
        )
    cmd = _build_led_cli_command(port, data) + ["--follow"]
    # timeout_s=None: wait until the CLI exits on its own.
    return _run_led_cli_command(cmd, cli_path, timeout_s=None)
@controller.post("/reset")
@controller.post("/reset/")
async def reset_device(request):
    """Run the led-tool CLI with ``--reset --follow`` for the requested port.

    The JSON request body must carry a non-blank ``port``. The command is
    run without a timeout.

    Returns:
        ``400`` when ``port`` is missing or blank, ``500`` when the CLI
        script is not found, otherwise the response tuple produced by
        ``_run_led_cli_command``.
    """
    data = request.json
    # A JSON body that is not an object (e.g. a list or a string) has no
    # ``.get``; treat it like an empty body so the request is rejected with
    # the regular 400 validation error instead of raising AttributeError.
    if not isinstance(data, dict):
        data = {}
    port = str(data.get("port") or "").strip()
    if not port:
        return (
            json.dumps({"error": "port is required"}),
            400,
            {"Content-Type": "application/json"},
        )
    cli_path = _led_cli_path()
    if not os.path.exists(cli_path):
        return (
            json.dumps({"error": "led-tool/cli.py not found"}),
            500,
            {"Content-Type": "application/json"},
        )
    cmd = [sys.executable, cli_path, "--port", port, "--reset", "--follow"]
    # timeout_s=None: wait until the CLI exits on its own.
    return _run_led_cli_command(cmd, cli_path, timeout_s=None)
@controller.get("/settings")
async def read_settings(request):
    """Run the led-tool CLI with ``--show`` and return its result.

    The port comes from the ``port`` query argument. On a 200 result from
    ``_run_led_cli_command`` the JSON body gains a ``settings`` key holding
    the CLI's stdout decoded as a JSON object, or ``None`` when it cannot be
    decoded as one. Non-200 results are passed through untouched.

    Returns:
        ``400`` when ``port`` is missing or blank, ``500`` when the CLI
        script is not found, otherwise the (possibly augmented) response
        tuple from ``_run_led_cli_command``.
    """
    json_headers = {"Content-Type": "application/json"}

    port = str(request.args.get("port") or "").strip()
    if port == "":
        return json.dumps({"error": "port is required"}), 400, json_headers

    cli_path = _led_cli_path()
    if not os.path.exists(cli_path):
        return (
            json.dumps({"error": "led-tool/cli.py not found"}),
            500,
            json_headers,
        )

    show_cmd = [sys.executable, cli_path, "--port", port, "--show"]
    body, status, headers = _run_led_cli_command(show_cmd, cli_path)
    if status == 200:
        result = json.loads(body)
        result["settings"] = _extract_settings_from_stdout(
            result.get("stdout") or ""
        )
        body = json.dumps(result)
    return body, status, headers