Files
led-controller/src/main.py
Jimmy ac9fca8d4b Pi port: serial transport, addressed ESP-NOW bridge, port 80
- Run app on Raspberry Pi: serial to ESP32 bridge at 912000 baud, /dev/ttyS0
- Remove ESP-NOW/MicroPython-only code from src (espnow, p2p, wifi, machine/Pin)
- Transport: always send 6-byte MAC + payload; optional to/destination_mac in API and WebSocket
- Settings and model DB use project paths (no root); fix sys.print_exception for CPython
- Preset/settings controllers use get_current_sender(); template paths for cwd=src
- Pipfile: run from src, PORT from env; scripts for port 80 (setcap) and test
- ESP32 bridge: receive 6-byte addr + payload, LRU peer management (20 max), handle ESP_ERR_ESPNOW_EXIST
- Add esp32/main.py, esp32/benchmark_peers.py, scripts/setup-port80.sh, scripts/test-port80.sh

Made-with: Cursor
2026-03-15 17:16:07 +13:00

128 lines
4.1 KiB
Python

import asyncio
import json
import os
from microdot import Microdot, send_file
from microdot.websocket import with_websocket
from microdot.session import Session
from settings import Settings
import controllers.preset as preset
import controllers.profile as profile
import controllers.group as group
import controllers.sequence as sequence
import controllers.tab as tab
import controllers.palette as palette
import controllers.scene as scene
import controllers.pattern as pattern
import controllers.settings as settings_controller
from models.transport import get_sender, set_sender
async def main(port=80):
    """Configure the transport and web application, then run the server.

    Loads settings, installs the transport sender, builds the Microdot app
    (sessions, controller sub-apps, page/static routes and the ``/ws``
    WebSocket bridge) and serves on all interfaces until the server stops.

    Args:
        port: TCP port the HTTP server listens on.

    Raises:
        Any exception raised by ``app.start_server`` (for example an
        ``OSError`` if the port cannot be bound) propagates to the caller.
    """
    settings = Settings()
    print(settings)
    print("Starting")

    # Initialize transport (serial to ESP32 bridge)
    sender = get_sender(settings)
    set_sender(sender)

    app = Microdot()

    # Initialize sessions with a secret key from settings.
    # NOTE(review): the fallback key is a hard-coded placeholder; deployments
    # should set 'session_secret_key' in settings.
    secret_key = settings.get(
        'session_secret_key',
        'led-controller-secret-key-change-in-production',
    )
    Session(app, secret_key=secret_key)

    # Mount model controllers as subroutes
    app.mount(preset.controller, '/presets')
    app.mount(profile.controller, '/profiles')
    app.mount(group.controller, '/groups')
    app.mount(sequence.controller, '/sequences')
    app.mount(tab.controller, '/tabs')
    app.mount(palette.controller, '/palettes')
    app.mount(scene.controller, '/scenes')
    app.mount(pattern.controller, '/patterns')
    app.mount(settings_controller.controller, '/settings')

    async def notify_send_failed(socket):
        """Best-effort error reply; the client may already be gone."""
        try:
            await socket.send(json.dumps({"error": "Send failed"}))
        except Exception:
            # Nothing more can be done if the error report itself fails.
            pass

    # Serve index.html at root (cwd is src/ when run via pipenv run run)
    @app.route('/')
    def index(request):
        """Serve the main web UI."""
        return send_file('templates/index.html')

    # Serve settings page
    @app.route('/settings')
    def settings_page(request):
        """Serve the settings page."""
        return send_file('templates/settings.html')

    # Favicon: avoid 404 in browser console (no file needed)
    @app.route('/favicon.ico')
    def favicon(request):
        """Answer favicon requests with an empty 204 response."""
        return '', 204

    # Static file route
    @app.route("/static/<path:path>")
    def static_handler(request, path):
        """Serve static files."""
        if '..' in path:
            # Directory traversal is not allowed
            return 'Not found', 404
        return send_file('static/' + path)

    @app.route('/ws')
    @with_websocket
    async def ws(request, ws):
        """Forward each WebSocket message to the transport sender.

        JSON messages may carry an optional "to" key, which is removed and
        passed to the sender as ``addr``. Messages that are not valid JSON
        are sent unchanged without an address. A falsy message ends the loop.
        """
        while True:
            data = await ws.receive()
            print(data)
            if not data:
                break
            try:
                try:
                    parsed = json.loads(data)
                except json.JSONDecodeError:
                    # Not JSON: send raw with default address
                    await sender.send(data)
                else:
                    print("WS received JSON:", parsed)
                    # Optional "to": 12-char hex MAC; rest is payload
                    # (sent with that address).
                    addr = parsed.pop("to", None)
                    # If nothing is left after removing "to", forward the
                    # original message text as-is.
                    payload = json.dumps(parsed) if parsed else data
                    await sender.send(payload, addr=addr)
            except Exception:
                # Covers send failures and JSON values that are not objects.
                await notify_send_failed(ws)

    # Await the server directly so a startup failure (e.g. the port cannot be
    # bound) propagates to the caller instead of being lost in an unobserved
    # background task while this coroutine sleeps forever.
    await app.start_server(host="0.0.0.0", port=port)
# Script entry point: listen on $PORT when set, otherwise on port 80.
if __name__ == "__main__":
    # `os` is already imported at the top of the file; no local re-import.
    port = int(os.environ.get("PORT", 80))
    asyncio.run(main(port=port))