Compare commits

2 Commits

5 changed files with 11 additions and 139 deletions

View File

@@ -79,6 +79,7 @@ def apply_default(data, settings, presets):
and default_name in presets.presets
):
settings["default"] = default_name
settings.save()
def _parse_http_url(url):

View File

@@ -58,7 +58,7 @@ async def ws_handler(request, ws):
async def presets_loop():
while True:
await presets.tick()
presets.tick()
wdt.feed()
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)

View File

@@ -1,136 +0,0 @@
import os
import sys
from settings import Settings
from machine import WDT
import network
import utime
import asyncio
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets
from controller_messages import process_data
from hello import broadcast_hello_udp
settings = Settings()
print(settings)
presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings)
presets.b = settings.get("brightness", 255)
default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets:
if presets.select(default_preset):
print(f"Selected startup preset: {default_preset}")
else:
print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000)
wdt.feed()
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected():
utime.sleep(1)
wdt.feed()
app = Microdot()
def _simulator_register_microdot_app():
"""led-simulator sets LED_SIM_ROOT so Stop can shutdown() the same Microdot instance."""
root = os.environ.get("LED_SIM_ROOT")
if not root:
return
sys.path.insert(0, root)
try:
import led_driver_sim_hook as _sim_hook
except ImportError:
return
_sim_hook.register_app(app)
_simulator_register_microdot_app()
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
print("WS client connected")
try:
while True:
data = await ws.receive()
if not data:
print("WS client disconnected (closed)")
break
print(data)
process_data(data, settings, presets)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
async def presets_loop():
while True:
await presets.tick()
wdt.feed()
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
"""Hello must run after the HTTP server binds, or discovery clients time out on /ws."""
await asyncio.sleep(1)
print("UDP hello: broadcasting…")
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
async def main(port=80):
t_presets = asyncio.create_task(presets_loop())
t_hello = asyncio.create_task(_udp_hello_after_http_ready())
try:
await app.start_server(host="0.0.0.0", port=port)
finally:
for t in (t_presets, t_hello):
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
def _simulator_apply_pattern_from_env():
"""led-simulator sets LED_SIM_PATTERN to a patterns/ module name (no .py)."""
mod = os.environ.get("LED_SIM_PATTERN", "").strip()
if not mod:
return
presets.reload_patterns()
presets.edit(
"_sim",
{
"p": mod,
"d": 200,
"b": 255,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
},
)
if not presets.select("_sim"):
print("LED_SIM_PATTERN: could not select pattern:", mod)
if __name__ == "__main__":
_simulator_apply_pattern_from_env()
_port = int(os.environ.get("LED_SIM_PORT", "80"))
asyncio.run(main(port=_port))

View File

@@ -40,7 +40,7 @@ class Presets:
return loaded
for filename in files:
if not filename.endswith(".py") or filename == "__init__.py":
if not filename.endswith(".py") or filename in ("__init__.py", "main.py"):
continue
module_basename = filename[:-3]
module_name = "patterns." + module_basename

View File

@@ -12,11 +12,18 @@ class Settings(dict):
self.color_order = self.get_color_order(self["color_order"])
def set_defaults(self):
self["led_pin"] = 10
self["num_leds"] = 119
self["color_order"] = "rgb"
self["name"] = "a"
sta = network.WLAN(network.STA_IF)
sta.active(True)
#use led-mac for name
mac = sta.config("mac")
mac = ubinascii.hexlify(mac).decode().lower()
self["name"] = "led-" + mac
self["debug"] = False
self["default"] = "on"