Update main application and dependencies

- Update main.py and run_web.py for local development
- Update microdot session handling
- Update wifi utility
This commit is contained in:
2026-01-27 13:05:07 +13:00
parent 3ed435824c
commit e74ef6d64f
4 changed files with 168 additions and 16 deletions

View File

@@ -1,4 +1,25 @@
import jwt
try:
import jwt
HAS_JWT = True
except ImportError:
HAS_JWT = False
try:
import ubinascii
except ImportError:
import binascii as ubinascii
try:
import uhashlib as hashlib
except ImportError:
import hashlib
try:
import uhmac as hmac
except ImportError:
try:
import hmac
except ImportError:
hmac = None
import json
from microdot.microdot import invoke_handler
from microdot.helpers import wraps
@@ -125,16 +146,61 @@ class Session:
return response
def encode(self, payload, secret_key=None):
return jwt.encode(payload, secret_key or self.secret_key,
algorithm='HS256')
"""Encode session data using JWT if available, otherwise use simple HMAC."""
if HAS_JWT:
return jwt.encode(payload, secret_key or self.secret_key,
algorithm='HS256')
else:
# Simple encoding for MicroPython: base64(json) + HMAC signature
key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key)
payload_json = json.dumps(payload)
payload_b64 = ubinascii.b2a_base64(payload_json.encode()).decode().strip()
# Create HMAC signature
if hmac:
# Use hmac module if available
h = hmac.new(key, payload_json.encode(), hashlib.sha256)
else:
# Fallback: simple SHA256(key + message)
h = hashlib.sha256(key + payload_json.encode())
signature = ubinascii.b2a_base64(h.digest()).decode().strip()
return f"{payload_b64}.{signature}"
def decode(self, session, secret_key=None):
try:
payload = jwt.decode(session, secret_key or self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
return {}
return payload
"""Decode session data using JWT if available, otherwise use simple HMAC."""
if HAS_JWT:
try:
payload = jwt.decode(session, secret_key or self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
return {}
return payload
else:
try:
# Simple decoding for MicroPython
if '.' not in session:
return {}
payload_b64, signature = session.rsplit('.', 1)
payload_json = ubinascii.a2b_base64(payload_b64).decode()
# Verify HMAC signature
key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key)
if hmac:
# Use hmac module if available
h = hmac.new(key, payload_json.encode(), hashlib.sha256)
else:
# Fallback: simple SHA256(key + message)
h = hashlib.sha256(key + payload_json.encode())
expected_signature = ubinascii.b2a_base64(h.digest()).decode().strip()
if signature != expected_signature:
return {}
return json.loads(payload_json)
except Exception:
return {}
def with_session(f):

View File

@@ -87,6 +87,8 @@ async def run_local():
from microdot import Microdot, send_file
from microdot.websocket import with_websocket
from microdot.session import Session
import controllers.preset as preset
import controllers.profile as profile
import controllers.group as group
@@ -94,9 +96,15 @@ async def run_local():
import controllers.tab as tab
import controllers.palette as palette
import controllers.scene as scene
import controllers.pattern as pattern
import controllers.settings as settings_controller
app = Microdot()
# Initialize sessions with a secret key from settings
secret_key = settings.get('session_secret_key', 'led-controller-secret-key-change-in-production')
Session(app, secret_key=secret_key)
# Mount model controllers as subroutes
app.mount(preset.controller, '/presets')
app.mount(profile.controller, '/profiles')
@@ -105,6 +113,8 @@ async def run_local():
app.mount(tab.controller, '/tabs')
app.mount(palette.controller, '/palettes')
app.mount(scene.controller, '/scenes')
app.mount(pattern.controller, '/patterns')
app.mount(settings_controller.controller, '/settings')
# Serve index.html at root
@app.route('/')
@@ -112,6 +122,12 @@ async def run_local():
"""Serve the main web UI."""
return send_file('src/templates/index.html')
# Serve settings page
@app.route('/settings')
def settings_page(request):
"""Serve the settings page."""
return send_file('src/templates/settings.html')
# Static file route
@app.route("/static/<path:path>")
def static_handler(request, path):

View File

@@ -16,6 +16,7 @@ import controllers.tab as tab
import controllers.palette as palette
import controllers.scene as scene
import controllers.pattern as pattern
import controllers.settings as settings_controller
async def main(port=80):
@@ -56,6 +57,7 @@ async def main(port=80):
app.mount(palette.controller, '/palettes')
app.mount(scene.controller, '/scenes')
app.mount(pattern.controller, '/patterns')
app.mount(settings_controller.controller, '/settings')
# Serve index.html at root
@app.route('/')
@@ -63,6 +65,12 @@ async def main(port=80):
"""Serve the main web UI."""
return send_file('templates/index.html')
# Serve settings page
@app.route('/settings')
def settings_page(request):
"""Serve the settings page."""
return send_file('templates/settings.html')
# Static file route
@app.route("/static/<path:path>")
def static_handler(request, path):
@@ -87,13 +95,13 @@ async def main(port=80):
server = asyncio.create_task(app.start_server(host="0.0.0.0", port=port))
wdt = machine.WDT(timeout=10000)
wdt.feed()
#wdt = machine.WDT(timeout=10000)
#wdt.feed()
while True:
gc.collect()
for i in range(60):
wdt.feed()
#wdt.feed()
await asyncio.sleep_ms(500)
# cleanup before ending the application

View File

@@ -2,9 +2,11 @@ import network
from time import sleep
def connect(ssid, password, ip, gateway):
if ssid is None or password is None:
print("Missing ssid or password")
if ssid is None:
print("Missing ssid")
return None
if password is None:
password = ''
try:
sta_if = network.WLAN(network.STA_IF)
if ip is not None and gateway is not None:
@@ -23,12 +25,15 @@ def connect(ssid, password, ip, gateway):
return None
def ap(ssid, password):
def ap(ssid, password, channel=None):
ap_if = network.WLAN(network.AP_IF)
ap_mac = ap_if.config('mac')
print(ssid)
ap_if.active(True)
ap_if.config(essid=ssid, password=password)
if channel is not None:
ap_if.config(essid=ssid, password=password, channel=channel)
else:
ap_if.config(essid=ssid, password=password)
ap_if.active(False)
ap_if.active(True)
print(ap_if.ifconfig())
@@ -36,3 +41,60 @@ def ap(ssid, password):
def get_mac():
ap_if = network.WLAN(network.AP_IF)
return ap_if.config('mac')
def get_ap_config():
"""Get current AP configuration."""
try:
ap_if = network.WLAN(network.AP_IF)
if ap_if.active():
config = ap_if.ifconfig()
return {
'ssid': ap_if.config('essid'),
'channel': ap_if.config('channel'),
'ip': config[0] if config else None,
'active': True
}
return {
'ssid': None,
'channel': None,
'ip': None,
'active': False
}
except Exception as e:
print(f"Error getting AP config: {e}")
return None
def get_sta_status():
"""Get current station connection status."""
try:
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
if sta_if.isconnected():
config = sta_if.ifconfig()
return {
'connected': True,
'ssid': sta_if.config('essid'),
'ip': config[0] if config else None,
'gateway': config[2] if len(config) > 2 else None,
'netmask': config[1] if len(config) > 1 else None,
'dns': config[3] if len(config) > 3 else None
}
return {
'connected': False,
'ssid': None,
'ip': None,
'gateway': None,
'netmask': None,
'dns': None
}
return {
'connected': False,
'ssid': None,
'ip': None,
'gateway': None,
'netmask': None,
'dns': None
}
except Exception as e:
print(f"Error getting STA status: {e}")
return None