Files
led-controller/src/main.py

102 lines
2.8 KiB
Python

import asyncio
from settings import Settings
import gc
import machine
from microdot import Microdot, send_file
from microdot.websocket import with_websocket
from microdot.session import Session
import aioespnow
import network
import controllers.preset as preset
import controllers.profile as profile
import controllers.group as group
import controllers.sequence as sequence
import controllers.tab as tab
import controllers.palette as palette
import controllers.scene as scene
import controllers.pattern as pattern
async def main(port=80):
settings = Settings()
print("Starting")
network.WLAN(network.STA_IF).active(True)
e = aioespnow.AIOESPNow()
e.active(True)
e.add_peer(b"\xbb\xbb\xbb\xbb\xbb\xbb")
app = Microdot()
# Initialize sessions with a secret key from settings
secret_key = settings.get('session_secret_key', 'led-controller-secret-key-change-in-production')
Session(app, secret_key=secret_key)
# Mount model controllers as subroutes
# Verify controllers are Microdot instances before mounting
controllers_to_mount = [
('/presets', preset, 'preset'),
('/profiles', profile, 'profile'),
('/groups', group, 'group'),
('/sequences', sequence, 'sequence'),
('/tabs', tab, 'tab'),
('/palettes', palette, 'palette'),
('/scenes', scene, 'scene'),
]
# Mount model controllers as subroutes
app.mount(preset.controller, '/presets')
app.mount(profile.controller, '/profiles')
app.mount(group.controller, '/groups')
app.mount(sequence.controller, '/sequences')
app.mount(tab.controller, '/tabs')
app.mount(palette.controller, '/palettes')
app.mount(scene.controller, '/scenes')
app.mount(pattern.controller, '/patterns')
# Serve index.html at root
@app.route('/')
def index(request):
"""Serve the main web UI."""
return send_file('templates/index.html')
# Static file route
@app.route("/static/<path:path>")
def static_handler(request, path):
"""Serve static files."""
if '..' in path:
# Directory traversal is not allowed
return 'Not found', 404
return send_file('static/' + path)
@app.route('/ws')
@with_websocket
async def ws(request, ws):
while True:
data = await ws.receive()
if data:
await e.asend(b"\xbb\xbb\xbb\xbb\xbb\xbb", data)
print(data)
else:
break
server = asyncio.create_task(app.start_server(host="0.0.0.0", port=port))
wdt = machine.WDT(timeout=10000)
wdt.feed()
while True:
gc.collect()
for i in range(60):
wdt.feed()
await asyncio.sleep_ms(500)
# cleanup before ending the application
if __name__ == "__main__":
asyncio.run(main())