343 lines
12 KiB
Python
Executable File
343 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Local development web server - imports and runs src.main with port 5000
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import signal
|
|
|
|
# Add project root, src, and lib to path
|
|
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
src_path = os.path.join(project_root, 'src')
|
|
lib_path = os.path.join(project_root, 'lib')
|
|
|
|
# Add to path in the right order - src must be first so 'models' and 'controllers' can be imported
|
|
# This ensures imports like 'from models.preset import Preset' work
|
|
sys.path.insert(0, src_path)
|
|
sys.path.insert(0, lib_path)
|
|
sys.path.insert(0, project_root)
|
|
|
|
# Mock MicroPython modules before importing main
|
|
class MockMachine:
|
|
class WDT:
|
|
def __init__(self, timeout):
|
|
pass
|
|
def feed(self):
|
|
pass
|
|
|
|
class MockESPNow:
|
|
def __init__(self):
|
|
self.active_value = False
|
|
self.peers = []
|
|
self.websocket_client = None # Store single WebSocket connection
|
|
def active(self, value):
|
|
self.active_value = value
|
|
print(f"[MOCK] ESPNow active: {value}")
|
|
def add_peer(self, peer):
|
|
self.peers.append(peer)
|
|
if hasattr(peer, 'hex'):
|
|
print(f"[MOCK] Added peer: {peer.hex()}")
|
|
else:
|
|
print(f"[MOCK] Added peer: {peer}")
|
|
def register_websocket(self, ws):
|
|
"""Register a WebSocket connection to forward ESPNow data to."""
|
|
self.websocket_client = ws
|
|
print(f"[MOCK] Registered WebSocket client")
|
|
def unregister_websocket(self, ws):
|
|
"""Unregister a WebSocket connection."""
|
|
if self.websocket_client == ws:
|
|
self.websocket_client = None
|
|
print(f"[MOCK] Unregistered WebSocket client")
|
|
async def asend(self, peer, data):
|
|
if hasattr(peer, 'hex'):
|
|
print(f"[MOCK] Would send to {peer.hex()}: {data}")
|
|
else:
|
|
print(f"[MOCK] Would send to {peer}: {data}")
|
|
|
|
# Forward data to the connected WebSocket client
|
|
if self.websocket_client:
|
|
try:
|
|
await self.websocket_client.send(data)
|
|
print(f"[MOCK] Forwarded to WebSocket client")
|
|
except Exception as e:
|
|
print(f"[MOCK] WebSocket client disconnected: {e}")
|
|
self.websocket_client = None
|
|
|
|
class MockAIOESPNow:
|
|
def __init__(self):
|
|
self.espnow = MockESPNow()
|
|
def active(self, value):
|
|
self.espnow.active(value)
|
|
return self.espnow
|
|
def add_peer(self, peer):
|
|
self.espnow.add_peer(peer)
|
|
async def asend(self, peer, data):
|
|
await self.espnow.asend(peer, data)
|
|
|
|
# Store reference to mock instance for WebSocket registration
|
|
@property
|
|
def mock_instance(self):
|
|
return self.espnow
|
|
|
|
# Create mock ESPNow instance and store reference for WebSocket registration
|
|
mock_espnow_instance = MockESPNow()
|
|
mock_aioespnow = MockAIOESPNow()
|
|
mock_aioespnow.espnow = mock_espnow_instance # Use the shared instance
|
|
|
|
# Create mock ESPNow instance and store reference for WebSocket registration
|
|
mock_espnow_instance = MockESPNow()
|
|
mock_aioespnow = MockAIOESPNow()
|
|
mock_aioespnow.espnow = mock_espnow_instance # Use the shared instance
|
|
|
|
# Install mocks in sys.modules before any imports
|
|
sys.modules['machine'] = MockMachine()
|
|
# Store the mock instance in the module so it can be accessed
|
|
aioespnow_module = type('module', (), {'AIOESPNow': MockAIOESPNow, '_mock_instance': mock_espnow_instance})()
|
|
sys.modules['aioespnow'] = aioespnow_module
|
|
class MockWLAN:
|
|
def __init__(self, interface):
|
|
self.interface = interface
|
|
def active(self, value):
|
|
print(f"[MOCK] WLAN({self.interface}) active: {value}")
|
|
|
|
sys.modules['network'] = type('module', (), {
|
|
'WLAN': MockWLAN,
|
|
'STA_IF': 0
|
|
})()
|
|
|
|
# Mock asyncio.sleep_ms for regular Python
|
|
_original_sleep = asyncio.sleep
|
|
async def sleep_ms(ms):
|
|
await _original_sleep(ms / 1000.0)
|
|
|
|
# Patch asyncio.sleep_ms
|
|
asyncio.sleep_ms = sleep_ms
|
|
|
|
# Patch sys.print_exception for regular Python (MicroPython has this, regular Python doesn't)
|
|
if not hasattr(sys, 'print_exception'):
|
|
import traceback
|
|
sys.print_exception = lambda e, file=None: traceback.print_exception(type(e), e, e.__traceback__, file=file)
|
|
|
|
# Patch builtins.open to redirect /db/ paths to project db directory
|
|
import builtins
|
|
_original_open = builtins.open
|
|
def patched_open(file, mode='r', *args, **kwargs):
|
|
if isinstance(file, str):
|
|
if file.startswith('/db/'):
|
|
# Redirect to project db directory
|
|
filename = os.path.basename(file)
|
|
file = os.path.join(project_root, 'db', filename)
|
|
elif not os.path.isabs(file):
|
|
# For relative paths starting with templates/ or static/,
|
|
# always resolve to src/ directory
|
|
if file.startswith('templates/') or file.startswith('static/'):
|
|
file = os.path.join(src_path, file)
|
|
# For other relative paths, check if they exist in current dir
|
|
# If not, try src/ directory
|
|
elif not os.path.exists(file):
|
|
src_file = os.path.join(src_path, file)
|
|
if os.path.exists(src_file):
|
|
file = src_file
|
|
return _original_open(file, mode, *args, **kwargs)
|
|
builtins.open = patched_open
|
|
|
|
# Also patch os.mkdir to handle /db path
|
|
original_mkdir = os.mkdir
|
|
def patched_mkdir(path):
|
|
if path == "/db":
|
|
# Use project db directory instead
|
|
db_path = os.path.join(project_root, "db")
|
|
if not os.path.exists(db_path):
|
|
os.makedirs(db_path, exist_ok=True)
|
|
else:
|
|
original_mkdir(path)
|
|
os.mkdir = patched_mkdir
|
|
|
|
# Create a flag to stop the infinite loop
|
|
_stop_flag = False
|
|
|
|
# Patch gc.collect to check stop flag
|
|
import gc as gc_module
|
|
_original_collect = gc_module.collect
|
|
def collect():
|
|
global _stop_flag
|
|
if _stop_flag:
|
|
raise KeyboardInterrupt("Stop requested")
|
|
return _original_collect()
|
|
gc_module.collect = collect
|
|
|
|
# Change to src directory for file paths (where templates and static are)
|
|
# main.py expects templates/ and static/ to be relative to the working directory
|
|
os.chdir(src_path)
|
|
|
|
# Override settings path for local development
|
|
# Import settings module and patch the path before main imports it
|
|
import importlib.util
|
|
spec = importlib.util.spec_from_file_location("settings", os.path.join(src_path, "settings.py"))
|
|
settings_module = importlib.util.module_from_spec(spec)
|
|
sys.modules['settings'] = settings_module
|
|
spec.loader.exec_module(settings_module)
|
|
settings_module.Settings.SETTINGS_FILE = os.path.join(project_root, 'settings.json')
|
|
|
|
# Patch the Model class file path before importing
|
|
# We need to monkey-patch the model.py file's behavior
|
|
import importlib.util
|
|
model_spec = importlib.util.spec_from_file_location("models.model", os.path.join(src_path, "models", "model.py"))
|
|
model_module = importlib.util.module_from_spec(model_spec)
|
|
|
|
# Patch os.mkdir in the model module's context
|
|
original_mkdir = os.mkdir
|
|
def patched_mkdir(path):
|
|
if path == "/db":
|
|
db_path = os.path.join(project_root, "db")
|
|
if not os.path.exists(db_path):
|
|
os.makedirs(db_path, exist_ok=True)
|
|
else:
|
|
original_mkdir(path)
|
|
|
|
# Set up the module's namespace with patched os
|
|
model_module.__dict__['os'] = type('os', (), {'mkdir': patched_mkdir, 'path': os.path})()
|
|
model_spec.loader.exec_module(model_module)
|
|
sys.modules['models.model'] = model_module
|
|
|
|
# Now patch the Model class to fix file paths
|
|
# The issue is that Model.__init__ sets self.file and immediately calls load()
|
|
# before we can patch it. We need to replace __init__ completely.
|
|
# Also clear any existing singleton instances
|
|
Model = model_module.Model
|
|
# Clear singleton instances for all Model subclasses
|
|
for attr_name in dir(model_module):
|
|
attr = getattr(model_module, attr_name)
|
|
if isinstance(attr, type) and issubclass(attr, Model) and attr != Model:
|
|
if hasattr(attr, '_instance'):
|
|
delattr(attr, '_instance')
|
|
|
|
original_save = Model.save
|
|
original_load = Model.load
|
|
original_set_defaults = Model.set_defaults
|
|
|
|
def patched_init(self):
|
|
# Only initialize once (check if already initialized)
|
|
if hasattr(self, '_initialized'):
|
|
return
|
|
|
|
# Create db directory if it doesn't exist (use project db, not /db)
|
|
db_path = os.path.join(project_root, "db")
|
|
if not os.path.exists(db_path):
|
|
os.makedirs(db_path, exist_ok=True)
|
|
|
|
self.class_name = self.__class__.__name__
|
|
# Set file path to project db directory from the start
|
|
self.file = os.path.join(project_root, 'db', f"{self.class_name.lower()}.json")
|
|
super(Model, self).__init__()
|
|
|
|
# Now call load with the correct path already set
|
|
# Call the patched load method (defined below)
|
|
Model.load(self)
|
|
self._initialized = True
|
|
|
|
def patched_save(self):
|
|
# Ensure file path is correct before saving (this will also fix print statements)
|
|
if hasattr(self, 'file') and self.file.startswith('/db/'):
|
|
filename = os.path.basename(self.file)
|
|
self.file = os.path.join(project_root, 'db', filename)
|
|
# Also ensure the directory exists
|
|
db_dir = os.path.dirname(self.file)
|
|
if not os.path.exists(db_dir):
|
|
os.makedirs(db_dir, exist_ok=True)
|
|
return original_save(self)
|
|
|
|
def patched_load(self):
|
|
# Ensure file path is correct before loading
|
|
if hasattr(self, 'file') and self.file.startswith('/db/'):
|
|
filename = os.path.basename(self.file)
|
|
self.file = os.path.join(project_root, 'db', filename)
|
|
try:
|
|
with open(self.file, 'r') as file:
|
|
import json
|
|
loaded_settings = json.load(file)
|
|
# Use dict.update() directly, not the subclass's update() method
|
|
dict.update(self, loaded_settings)
|
|
print(f"{self.class_name} loaded successfully.")
|
|
except FileNotFoundError:
|
|
# File doesn't exist yet - this is normal on first run
|
|
print(f"No existing {self.class_name} file found, creating defaults.")
|
|
self.set_defaults()
|
|
self.save()
|
|
except Exception as e:
|
|
# Other errors - log and create defaults
|
|
print(f"Error loading {self.class_name}: {type(e).__name__}: {e}")
|
|
self.set_defaults()
|
|
self.save()
|
|
|
|
# Apply patches - load must be patched before init uses it
|
|
Model.load = patched_load
|
|
Model.__init__ = patched_init
|
|
Model.save = patched_save
|
|
|
|
# Patch with_websocket decorator before importing main to register WebSocket connections
|
|
from microdot.websocket import with_websocket as original_with_websocket
|
|
|
|
def patched_with_websocket(f):
|
|
"""Patched with_websocket decorator that registers connections with mock ESPNow."""
|
|
@original_with_websocket
|
|
async def wrapped_handler(request, ws):
|
|
# Register WebSocket connection with mock ESPNow
|
|
mock_espnow_instance.register_websocket(ws)
|
|
try:
|
|
# Call original handler
|
|
await f(request, ws)
|
|
finally:
|
|
# Unregister when connection closes
|
|
mock_espnow_instance.unregister_websocket(ws)
|
|
return wrapped_handler
|
|
|
|
# Now import main (which will use the patched settings module and model)
|
|
# Import as a module file directly to avoid package import issues
|
|
main_spec = importlib.util.spec_from_file_location("main", os.path.join(src_path, "main.py"))
|
|
main_module = importlib.util.module_from_spec(main_spec)
|
|
|
|
# Patch with_websocket in the main module before executing it
|
|
main_module.__dict__['with_websocket'] = patched_with_websocket
|
|
|
|
main_spec.loader.exec_module(main_module)
|
|
main = main_module.main
|
|
|
|
def signal_handler(sig, frame):
|
|
"""Handle Ctrl+C gracefully."""
|
|
global _stop_flag
|
|
print("\nShutting down server...")
|
|
_stop_flag = True
|
|
# Force exit since main has an infinite loop
|
|
sys.exit(0)
|
|
|
|
async def run_web():
|
|
"""Run main with port 5000."""
|
|
print("Starting LED Controller Web Server (Local Development)")
|
|
print("=" * 60)
|
|
print(f"Server will run on http://localhost:5000")
|
|
print("Press Ctrl+C to stop")
|
|
print("=" * 60)
|
|
|
|
# Set up signal handler
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
try:
|
|
# Call main with port 5000
|
|
await main(port=5000)
|
|
except KeyboardInterrupt:
|
|
print("\nShutting down server...")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
raise
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(run_web())
|
|
except KeyboardInterrupt:
|
|
print("\nExiting...")
|
|
except SystemExit:
|
|
pass
|