# Source listing header (file-viewer metadata, not program code):
#   Files
#   led-driver/src/main.py
#
#   91 lines
#   2.4 KiB
#   Python

from settings import Settings
from machine import WDT
import network
import utime
import asyncio
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets
from controller_messages import process_data
from hello import broadcast_hello_udp
# Load the device configuration and echo it to the console.
# NOTE(review): settings is also indexed with "password" further down, so
# this print presumably shows the Wi-Fi password on the console - confirm
# that is acceptable.
settings = Settings()
print(settings)

# Build the preset engine from the configured LED pin and LED count, let it
# load its presets from the settings, then apply the stored brightness
# (255 when the setting is absent).
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)

# Activate the configured startup preset, if one is named and known.
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
    if not presets.select(default_preset):
        print("Startup preset failed (invalid pattern?):", default_preset)
    else:
        print(f"Selected startup preset: {default_preset}")
# Arm the hardware watchdog (timeout is given in milliseconds) and feed it
# once straight away.
wdt = WDT(timeout=10000)
wdt.feed()

# Bring up the station interface with Wi-Fi power management disabled and
# start connecting with the configured credentials.
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])

# Block until the link is up. The watchdog is fed once per second, so the
# board keeps waiting here instead of resetting; there is no timeout.
while not sta_if.isconnected():
    utime.sleep(1)
    wdt.feed()

print(sta_if.ifconfig())
# Microdot application instance; the /ws route below is registered on it and
# main() starts its server.
app = Microdot()
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
    """Serve one WebSocket client on /ws.

    Every received message is printed and passed to process_data() together
    with the global settings and presets. The handler returns when a falsy
    message is received or when the connection fails with WebSocketError or
    OSError; both cases are only logged to the console.

    Args:
        request: The HTTP request that was upgraded (not used here).
        ws: The WebSocket connection to read messages from.
    """
    print("WS client connected")
    try:
        while True:
            message = await ws.receive()
            if message:
                print(message)
                process_data(message, settings, presets)
            else:
                print("WS client disconnected (closed)")
                break
    except WebSocketError as err:
        print("WS client disconnected:", err)
    except OSError as err:
        print("WS client dropped (OSError):", err)
async def presets_loop():
    """Run the active preset forever, feeding the watchdog on every pass.

    Each iteration calls presets.tick(), feeds the watchdog and then yields
    to the scheduler. The coroutine never returns.
    """
    while True:
        presets.tick()
        wdt.feed()
        # tick() is synchronous, so hand control back to the event loop here;
        # otherwise the UDP hello and the HTTP/WebSocket server would starve.
        await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
    """Announce this device over UDP after a one-second delay.

    The hello has to go out only once the HTTP server has bound its port;
    otherwise discovery clients time out when they connect to /ws. Any
    failure of the broadcast is logged and otherwise ignored (best effort).
    """
    # Give the HTTP server time to bind before advertising the device.
    await asyncio.sleep(1)
    print("UDP hello: broadcasting…")
    try:
        device_name = settings.get("name", "")
        broadcast_hello_udp(
            sta_if,
            device_name,
            wait_reply=False,
            wdt=wdt,
            dual_destinations=True,
        )
    except Exception as err:
        print("UDP hello broadcast failed:", err)
async def main(port=80):
    """Start the background tasks, then serve HTTP/WebSocket requests.

    Schedules the preset animation loop and the delayed UDP hello, then runs
    the Microdot server on all interfaces until it stops.

    Args:
        port: TCP port the server listens on (default 80).
    """
    # Keep references to the tasks for as long as the server runs. The
    # asyncio documentation advises saving the result of create_task() so a
    # task cannot be garbage-collected before it finishes.
    background_tasks = [
        asyncio.create_task(presets_loop()),
        asyncio.create_task(_udp_hello_after_http_ready()),
    ]
    await app.start_server(host="0.0.0.0", port=port)
# Start the firmware only when this file is run as the main script.
if __name__ == "__main__":
    server_coro = main(port=80)
    asyncio.run(server_coro)