From e74ef6d64feb1b29416c9a7209839f34fd723749 Mon Sep 17 00:00:00 2001 From: jimmy Date: Tue, 27 Jan 2026 13:05:07 +1300 Subject: [PATCH] Update main application and dependencies - Update main.py and run_web.py for local development - Update microdot session handling - Update wifi utility --- lib/microdot/session.py | 84 ++++++++++++++++++++++++++++++++++++----- run_web.py | 16 ++++++++ src/main.py | 14 +++++-- src/util/wifi.py | 70 ++++++++++++++++++++++++++++++++-- 4 files changed, 168 insertions(+), 16 deletions(-) diff --git a/lib/microdot/session.py b/lib/microdot/session.py index 041b290..78ce2e6 100644 --- a/lib/microdot/session.py +++ b/lib/microdot/session.py @@ -1,4 +1,25 @@ -import jwt +try: + import jwt + HAS_JWT = True +except ImportError: + HAS_JWT = False + try: + import ubinascii + except ImportError: + import binascii as ubinascii + try: + import uhashlib as hashlib + except ImportError: + import hashlib + try: + import uhmac as hmac + except ImportError: + try: + import hmac + except ImportError: + hmac = None + import json + from microdot.microdot import invoke_handler from microdot.helpers import wraps @@ -125,16 +146,61 @@ class Session: return response def encode(self, payload, secret_key=None): - return jwt.encode(payload, secret_key or self.secret_key, - algorithm='HS256') + """Encode session data using JWT if available, otherwise use simple HMAC.""" + if HAS_JWT: + return jwt.encode(payload, secret_key or self.secret_key, + algorithm='HS256') + else: + # Simple encoding for MicroPython: base64(json) + HMAC signature + key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key) + payload_json = json.dumps(payload) + payload_b64 = ubinascii.b2a_base64(payload_json.encode()).decode().strip() + + # Create HMAC signature + if hmac: + # Use hmac module if available + h = hmac.new(key, payload_json.encode(), hashlib.sha256) + else: + # Fallback: simple SHA256(key + message) + h = hashlib.sha256(key + payload_json.encode()) + signature = ubinascii.b2a_base64(h.digest()).decode().strip() + + return f"{payload_b64}.{signature}" def decode(self, session, secret_key=None): - try: - payload = jwt.decode(session, secret_key or self.secret_key, - algorithms=['HS256']) - except jwt.exceptions.PyJWTError: # pragma: no cover - return {} - return payload + """Decode session data using JWT if available, otherwise use simple HMAC.""" + if HAS_JWT: + try: + payload = jwt.decode(session, secret_key or self.secret_key, + algorithms=['HS256']) + except jwt.exceptions.PyJWTError: # pragma: no cover + return {} + return payload + else: + try: + # Simple decoding for MicroPython + if '.' not in session: + return {} + + payload_b64, signature = session.rsplit('.', 1) + payload_json = ubinascii.a2b_base64(payload_b64).decode() + + # Verify HMAC signature + key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key) + if hmac: + # Use hmac module if available + h = hmac.new(key, payload_json.encode(), hashlib.sha256) + else: + # Fallback: simple SHA256(key + message) + h = hashlib.sha256(key + payload_json.encode()) + expected_signature = ubinascii.b2a_base64(h.digest()).decode().strip() + + if signature != expected_signature: + return {} + + return json.loads(payload_json) + except Exception: + return {} def with_session(f): diff --git a/run_web.py b/run_web.py index 6a0fe0f..8bbc975 100644 --- a/run_web.py +++ b/run_web.py @@ -87,6 +87,8 @@ async def run_local(): from microdot import Microdot, send_file from microdot.websocket import with_websocket + from microdot.session import Session + import controllers.preset as preset import controllers.profile as profile import controllers.group as group @@ -94,9 +96,15 @@ async def run_local(): import controllers.tab as tab import controllers.palette as palette import controllers.scene as scene + import controllers.pattern as pattern + import controllers.settings as settings_controller app = Microdot() + # Initialize sessions with a secret key from settings + secret_key = settings.get('session_secret_key', 'led-controller-secret-key-change-in-production') + Session(app, secret_key=secret_key) + # Mount model controllers as subroutes app.mount(preset.controller, '/presets') app.mount(profile.controller, '/profiles') @@ -105,6 +113,8 @@ async def run_local(): app.mount(tab.controller, '/tabs') app.mount(palette.controller, '/palettes') app.mount(scene.controller, '/scenes') + app.mount(pattern.controller, '/patterns') + app.mount(settings_controller.controller, '/settings') # Serve index.html at root @app.route('/') @@ -112,6 +122,12 @@ async def run_local(): """Serve the main web UI.""" return send_file('src/templates/index.html') + # Serve settings page + @app.route('/settings') + def settings_page(request): + """Serve the settings page.""" + return send_file('src/templates/settings.html') + # Static file route @app.route("/static/") def static_handler(request, path): diff --git a/src/main.py b/src/main.py index fd6189a..aa7d395 100644 --- a/src/main.py +++ b/src/main.py @@ -16,6 +16,7 @@ import controllers.tab as tab import controllers.palette as palette import controllers.scene as scene import controllers.pattern as pattern +import controllers.settings as settings_controller async def main(port=80): @@ -56,6 +57,7 @@ async def main(port=80): app.mount(palette.controller, '/palettes') app.mount(scene.controller, '/scenes') app.mount(pattern.controller, '/patterns') + app.mount(settings_controller.controller, '/settings') # Serve index.html at root @app.route('/') @@ -63,6 +65,12 @@ async def main(port=80): """Serve the main web UI.""" return send_file('templates/index.html') + # Serve settings page + @app.route('/settings') + def settings_page(request): + """Serve the settings page.""" + return send_file('templates/settings.html') + # Static file route @app.route("/static/") def static_handler(request, path): @@ -87,13 +95,13 @@ async def main(port=80): server = asyncio.create_task(app.start_server(host="0.0.0.0", port=port)) - wdt = machine.WDT(timeout=10000) - wdt.feed() + #wdt = machine.WDT(timeout=10000) + #wdt.feed() while True: gc.collect() for i in range(60): - wdt.feed() + #wdt.feed() await asyncio.sleep_ms(500) # cleanup before ending the application diff --git a/src/util/wifi.py b/src/util/wifi.py index 5839234..cdae6d6 100644 --- a/src/util/wifi.py +++ b/src/util/wifi.py @@ -2,9 +2,11 @@ import network from time import sleep def connect(ssid, password, ip, gateway): - if ssid is None or password is None: - print("Missing ssid or password") + if ssid is None: + print("Missing ssid") return None + if password is None: + password = '' try: sta_if = network.WLAN(network.STA_IF) if ip is not None and gateway is not None: @@ -23,12 +25,15 @@ def connect(ssid, password, ip, gateway): return None -def ap(ssid, password): +def ap(ssid, password, channel=None): ap_if = network.WLAN(network.AP_IF) ap_mac = ap_if.config('mac') print(ssid) ap_if.active(True) - ap_if.config(essid=ssid, password=password) + if channel is not None: + ap_if.config(essid=ssid, password=password, channel=channel) + else: + ap_if.config(essid=ssid, password=password) ap_if.active(False) ap_if.active(True) print(ap_if.ifconfig()) @@ -36,3 +41,60 @@ def ap(ssid, password): def get_mac(): ap_if = network.WLAN(network.AP_IF) return ap_if.config('mac') + +def get_ap_config(): + """Get current AP configuration.""" + try: + ap_if = network.WLAN(network.AP_IF) + if ap_if.active(): + config = ap_if.ifconfig() + return { + 'ssid': ap_if.config('essid'), + 'channel': ap_if.config('channel'), + 'ip': config[0] if config else None, + 'active': True + } + return { + 'ssid': None, + 'channel': None, + 'ip': None, + 'active': False + } + except Exception as e: + print(f"Error getting AP config: {e}") + return None + +def get_sta_status(): + """Get current station connection status.""" + try: + sta_if = network.WLAN(network.STA_IF) + if sta_if.active(): + if sta_if.isconnected(): + config = sta_if.ifconfig() + return { + 'connected': True, + 'ssid': sta_if.config('essid'), + 'ip': config[0] if config else None, + 'gateway': config[2] if len(config) > 2 else None, + 'netmask': config[1] if len(config) > 1 else None, + 'dns': config[3] if len(config) > 3 else None + } + return { + 'connected': False, + 'ssid': None, + 'ip': None, + 'gateway': None, + 'netmask': None, + 'dns': None + } + return { + 'connected': False, + 'ssid': None, + 'ip': None, + 'gateway': None, + 'netmask': None, + 'dns': None + } + except Exception as e: + print(f"Error getting STA status: {e}") + return None