diff --git a/.gitignore b/.gitignore index 36b13f1..36f2ffa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,176 +1,30 @@ -# ---> Python -# Byte-compiled / optimized / DLL files +# Build files +build/ +sdkconfig +sdkconfig.old + +# Binary files +*.bin + +# Python __pycache__/ *.py[cod] *$py.class - -# C extensions *.so - -# Distribution / packaging .Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv env/ venv/ -ENV/ -env.bak/ -venv.bak/ +*.egg-info/ +dist/ +build/ -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ +# OS +.DS_Store +Thumbs.db diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..ce0c926 --- /dev/null +++ b/Pipfile @@ -0,0 +1,22 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +mpremote = "*" +pyserial = "*" +esptool = "*" +watchfiles = "*" +fastapi = "*" +uvicorn = "*" +flask = "*" + +[dev-packages] + +[requires] +python_version = "3" + +[scripts] +dev = 'watchfiles "./dev.py /dev/ttyACM0 src reset follow"' +web = "uvicorn tool:app --host 0.0.0.0 --port 8080" diff --git a/README.md b/README.md index dc029af..2c43c4b 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,37 @@ -# led-driver +# LED Driver - MicroPython +MicroPython-based LED driver application for ESP32 microcontrollers. + +## Prerequisites + +- MicroPython firmware installed on ESP32 +- USB cable for programming +- Python 3 with pipenv + +## Setup + +1. Install dependencies: + ```bash + pipenv install + ``` + +2. Deploy to device: + ```bash + pipenv run dev + ``` + +## Project Structure + +``` +led-driver/ +├── src/ +│ ├── main.py # Main application code +│ ├── patterns.py # LED pattern implementations +│ ├── patterns_base.py # Base pattern class +│ ├── settings.py # Settings management +│ └── p2p.py # Peer-to-peer communication +├── test/ # Pattern tests +├── web_app.py # Web interface +├── dev.py # Development tools +└── Pipfile # Python dependencies +``` diff --git a/dev.py b/dev.py new file mode 100755 index 0000000..920df5e --- /dev/null +++ b/dev.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +import subprocess +import serial +import sys +import glob + +def upload_src(port): + subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", ".", ":"], cwd="src") + +def upload_lib(port): + subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", "lib", ":"]) + +def list_files(port): + subprocess.call(["mpremote", "connect", port, "fs", "ls", ":"]) + +def reset_device(port): + with serial.Serial(port, baudrate=115200) as ser: + ser.write(b'\x03\x03\x04') + +def follow_serial(port): + with serial.Serial(port, baudrate=115200) as ser: + while True: + if ser.in_waiting > 0: + data = ser.readline().decode('utf-8').strip() + print(data) + +def clean_settings(port): + subprocess.call(["mpremote", "connect", port, "fs", "rm", ":/settings.json"]) + +def flash_firmware(port): + # Find MicroPython firmware binary + firmware_files = glob.glob("*.bin") + if not firmware_files: + print("Error: No .bin firmware file found in current directory") + print("Please download MicroPython firmware and place it in the project directory") + sys.exit(1) + + firmware = firmware_files[0] + if len(firmware_files) > 1: + print(f"Warning: Multiple .bin files found, using: {firmware}") + + print(f"Flashing MicroPython firmware: {firmware}") + print("Erasing flash...") + subprocess.call(["esptool.py", "--port", port, "erase_flash"]) + + print(f"Writing firmware to flash...") + subprocess.call([ + "esptool.py", + "--port", port, + "--baud", "460800", + "write_flash", "0", + firmware + ]) + print("Flash complete!") + +def main(): + port = "/dev/ttyACM0" + commands = [] + i = 1 + + # Parse arguments manually to preserve order + while i < len(sys.argv): + arg = sys.argv[i] + if arg in ["-p", "--port"]: + if i + 1 < len(sys.argv): + port = sys.argv[i + 1] + i += 2 + else: + print(f"Error: {arg} requires a port argument") + sys.exit(1) + elif arg in ["-s", "--src"]: + commands.append(("src", upload_src)) + i += 1 + elif arg in ["-r", "--reset"]: + commands.append(("reset", reset_device)) + i += 1 + elif arg in ["-f", "--follow"]: + commands.append(("follow", follow_serial)) + i += 1 + elif arg == "--lib": + commands.append(("lib", upload_lib)) + i += 1 + elif arg == "--ls": + commands.append(("ls", list_files)) + i += 1 + elif arg == "--clean": + commands.append(("clean", clean_settings)) + i += 1 + elif arg == "--flash": + commands.append(("flash", flash_firmware)) + i += 1 + elif arg in ["-h", "--help"]: + print("LED Driver development tools") + print("\nUsage:") + print(" ./dev.py [-p PORT] [FLAGS...]") + print("\nFlags:") + print(" -p, --port PORT Serial port (default: /dev/ttyACM0)") + print(" -s, --src Upload src directory") + print(" -r, --reset Reset device") + print(" -f, --follow Follow serial output") + print(" --lib Upload lib directory") + print(" --ls List files on device") + print(" --clean Remove settings.json from device") + print(" --flash Flash MicroPython firmware") + print("\nExamples:") + print(" ./dev.py -p /dev/ttyACM0 -s -r -f") + print(" ./dev.py --flash -s -r") + sys.exit(0) + else: + print(f"Error: Unknown argument: {arg}") + print("Use -h or --help for usage information") + sys.exit(1) + + # Execute commands in the order they were given + if not commands: + print("No commands specified. Use -h or --help for usage information.") + sys.exit(1) + + for cmd_name, cmd_func in commands: + if cmd_name == "reset": + print("Resetting device...") + elif cmd_name == "follow": + print("Following serial output (Ctrl+C to exit)...") + elif cmd_name == "flash": + pass # flash_firmware prints its own messages + else: + print(f"{cmd_name.capitalize()}...") + cmd_func(port) + +if __name__ == "__main__": + main() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..c87bc4d --- /dev/null +++ b/src/main.py @@ -0,0 +1,34 @@ +from settings import Settings +from machine import WDT +from espnow import ESPNow +import network +from patterns import Patterns +import json + +settings = Settings() +print(settings) + +patterns = Patterns(settings["led_pin"], settings["num_leds"], selected=settings["pattern"]) +patterns.colors = [(8,0,0)] +patterns.select("rainbow") + +wdt = WDT(timeout=10000) +wdt.feed() + +sta_if = network.WLAN(network.STA_IF) +sta_if.active(True) +sta_if.disconnect() +sta_if.config(channel=1) +e = ESPNow() +e.active(True) + + +while True: + wdt.feed() + patterns.tick() + if e.any(): + host, msg = e.recv() + data = json.loads(msg) + if settings.get("name") in data.get("names", []): + settings.set_settings(data.get("settings", {}), patterns, data.get("save", False)) + diff --git a/src/p2p.py b/src/p2p.py new file mode 100644 index 0000000..a00f6a2 --- /dev/null +++ b/src/p2p.py @@ -0,0 +1,16 @@ +import asyncio +import aioespnow +import json + +async def p2p(settings, patterns): + e = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support + e.active(True) + async for mac, msg in e: + try: + data = json.loads(msg) + except: + print(f"Failed to load espnow data {msg}") + continue + + if "names" not in data or settings.get("name") in data.get("names", []): + await settings.set_settings(data.get("settings", {}), patterns, data.get("save", False)) \ No newline at end of file diff --git a/src/patterns.py b/src/patterns.py new file mode 100644 index 0000000..75d6032 --- /dev/null +++ b/src/patterns.py @@ -0,0 +1,322 @@ +import utime +from patterns_base import Patterns as PatternsBase + +class Patterns(PatternsBase): + def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="off", delay=100): + super().__init__(pin, num_leds, color1, color2, brightness, selected, delay) + self.auto = True + self.step = 0 + self.patterns = { + "off": self.off, + "on" : self.on, + "blink": self.blink, + "rainbow": self.rainbow, + "pulse": self.pulse, + "transition": self.transition, + "chase": self.chase, + "circle": self.circle, + } + + + def blink(self): + state = True # True = on, False = off + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, last_update) >= self.delay: + if state: + self.fill(self.apply_brightness(self.colors[0])) + else: + self.fill((0, 0, 0)) + state = not state + last_update = current_time + # Yield once per tick so other logic can run + yield + + + def rainbow(self): + step = self.step % 256 + step_amount = max(1, int(self.n1)) # n1 controls step increment + + # If auto is False, run a single step and then stop + if not self.auto: + for i in range(self.num_leds): + rc_index = (i * 256 // self.num_leds) + step + self.n[i] = self.apply_brightness(self.wheel(rc_index & 255)) + self.n.write() + # Increment step by n1 for next manual call + self.step = (step + step_amount) % 256 + # Allow tick() to advance the generator once + yield + return + + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + sleep_ms = max(1, int(self.delay)) # Access delay directly + if utime.ticks_diff(current_time, last_update) >= sleep_ms: + for i in range(self.num_leds): + rc_index = (i * 256 // self.num_leds) + step + self.n[i] = self.apply_brightness(self.wheel(rc_index & 255)) + self.n.write() + step = (step + step_amount) % 256 + self.step = step + last_update = current_time + # Yield once per tick so other logic can run + yield + + + def pulse(self): + self.off() + + # Ensure we have at least one color + if not self.colors: + self.colors = [(255, 255, 255)] + + color_index = 0 + cycle_start = utime.ticks_ms() + + # State machine based pulse using a single generator loop + while True: + # Read current timing parameters each cycle so they can be changed live + attack_ms = max(0, int(self.n1)) # Attack time in ms + hold_ms = max(0, int(self.n2)) # Hold time in ms + decay_ms = max(0, int(self.n3)) # Decay time in ms + delay_ms = max(0, int(self.delay)) + + total_ms = attack_ms + hold_ms + decay_ms + delay_ms + if total_ms <= 0: + total_ms = 1 + + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, cycle_start) + + base_color = self.colors[color_index % len(self.colors)] + + if elapsed < attack_ms and attack_ms > 0: + # Attack: fade 0 -> 1 + factor = elapsed / attack_ms + color = tuple(int(c * factor) for c in base_color) + self.fill(self.apply_brightness(color)) + elif elapsed < attack_ms + hold_ms: + # Hold: full brightness + self.fill(self.apply_brightness(base_color)) + elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0: + # Decay: fade 1 -> 0 + dec_elapsed = elapsed - attack_ms - hold_ms + factor = max(0.0, 1.0 - (dec_elapsed / decay_ms)) + color = tuple(int(c * factor) for c in base_color) + self.fill(self.apply_brightness(color)) + elif elapsed < total_ms: + # Delay phase: LEDs off between pulses + self.fill((0, 0, 0)) + else: + # End of cycle, move to next color and restart timing + color_index += 1 + cycle_start = now + if not self.auto: + break + # Skip drawing this tick, start next cycle + yield + continue + + # Yield once per tick + yield + + def transition(self): + """Transition between colors, blending over `delay` ms.""" + if not self.colors: + self.off() + yield + return + + # Only one color: just keep it on + if len(self.colors) == 1: + while True: + self.fill(self.apply_brightness(self.colors[0])) + yield + return + + color_index = 0 + start_time = utime.ticks_ms() + + while True: + if not self.colors: + break + + # Get current and next color based on live list + c1 = self.colors[color_index % len(self.colors)] + c2 = self.colors[(color_index + 1) % len(self.colors)] + + duration = max(10, int(self.delay)) # At least 10ms + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, start_time) + + if elapsed >= duration: + # End of this transition step + if not self.auto and color_index >= 0: + # One-shot: transition from first to second color only + self.fill(self.apply_brightness(c2)) + break + # Auto: move to next pair + color_index = (color_index + 1) % len(self.colors) + start_time = now + yield + continue + + # Interpolate between c1 and c2 + factor = elapsed / duration + interpolated = tuple( + int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3) + ) + self.fill(self.apply_brightness(interpolated)) + + yield + + def chase(self): + """Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating. + Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)""" + if len(self.colors) < 1: + # Need at least 1 color + return + + segment_length = 0 # Will be calculated in loop + position = 0 # Current position offset + step_count = 0 # Track which step we're on + + last_update = utime.ticks_ms() + + while True: + # Access colors, delay, and n values directly for live updates + if not self.colors: + break + # If only one color provided, use it for both colors + if len(self.colors) < 2: + color0 = self.colors[0] + color1 = self.colors[0] + else: + color0 = self.colors[0] + color1 = self.colors[1] + + color0 = self.apply_brightness(color0) + color1 = self.apply_brightness(color1) + + n1 = max(1, int(self.n1)) # LEDs of color 0 + n2 = max(1, int(self.n2)) # LEDs of color 1 + n3 = int(self.n3) # Step movement on odd steps (can be negative) + n4 = int(self.n4) # Step movement on even steps (can be negative) + + segment_length = n1 + n2 + transition_duration = max(10, int(self.delay)) + + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, last_update) >= transition_duration: + # Clear all LEDs + self.n.fill((0, 0, 0)) + + # Draw repeating pattern starting at position + for i in range(self.num_leds): + # Calculate position in the repeating segment + relative_pos = (i - position) % segment_length + if relative_pos < 0: + relative_pos = (relative_pos + segment_length) % segment_length + + # Determine which color based on position in segment + if relative_pos < n1: + self.n[i] = color0 + else: + self.n[i] = color1 + + self.n.write() + + # Move position by n3 or n4 on alternate steps + if step_count % 2 == 0: + position = position + n3 + else: + position = position + n4 + + # Wrap position to keep it reasonable + max_pos = self.num_leds + segment_length + position = position % max_pos + if position < 0: + position += max_pos + + step_count += 1 + last_update = current_time + + # Yield once per tick so other logic can run + yield + + def circle(self): + """Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4""" + head = 0 + tail = 0 + + # Calculate timing + head_rate = max(1, int(self.n1)) # n1 = head moves per second + tail_rate = max(1, int(self.n3)) # n3 = tail moves per second + max_length = max(1, int(self.n2)) # n2 = max length + min_length = max(0, int(self.n4)) # n4 = min length + + head_delay = 1000 // head_rate # ms between head movements + tail_delay = 1000 // tail_rate # ms between tail movements + + last_head_move = utime.ticks_ms() + last_tail_move = utime.ticks_ms() + + phase = "growing" # "growing", "shrinking", or "off" + + while True: + current_time = utime.ticks_ms() + + # Clear all LEDs + self.n.fill((0, 0, 0)) + + # Calculate segment length + segment_length = (head - tail) % self.num_leds + if segment_length == 0 and head != tail: + segment_length = self.num_leds + + # Draw segment from tail to head + color = self.apply_brightness(self.colors[0]) + for i in range(segment_length + 1): + led_pos = (tail + i) % self.num_leds + self.n[led_pos] = color + + # Move head continuously at n1 LEDs per second + if utime.ticks_diff(current_time, last_head_move) >= head_delay: + head = (head + 1) % self.num_leds + last_head_move = current_time + + # Tail behavior based on phase + if phase == "growing": + # Growing phase: tail stays at 0 until max length reached + if segment_length >= max_length: + phase = "shrinking" + elif phase == "shrinking": + # Shrinking phase: move tail forward at n3 LEDs per second + if utime.ticks_diff(current_time, last_tail_move) >= tail_delay: + tail = (tail + 1) % self.num_leds + last_tail_move = current_time + + # Check if we've reached min length + current_length = (head - tail) % self.num_leds + if current_length == 0 and head != tail: + current_length = self.num_leds + + # For min_length = 0, we need at least 1 LED (the head) + if min_length == 0 and current_length <= 1: + phase = "off" # All LEDs off for 1 step + elif min_length > 0 and current_length <= min_length: + phase = "growing" # Cycle repeats + else: # phase == "off" + # Off phase: all LEDs off for 1 step, then restart + tail = head # Reset tail to head position to start fresh + phase = "growing" + + self.n.write() + + # Yield once per tick so other logic can run + yield \ No newline at end of file diff --git a/src/patterns_base.py b/src/patterns_base.py new file mode 100644 index 0000000..2108700 --- /dev/null +++ b/src/patterns_base.py @@ -0,0 +1,145 @@ +from machine import Pin +from neopixel import NeoPixel +import utime + + + + +# Short-key parameter mapping for convenience setters +param_mapping = { + "pt": "selected", + "pa": "selected", + "cl": "colors", + "br": "brightness", + "dl": "delay", + "nl": "num_leds", + "co": "color_order", + "lp": "led_pin", + "n1": "n1", + "n2": "n2", + "n3": "n3", + "n4": "n4", + "n5": "n5", + "n6": "n6", + "auto": "auto", +} + +class Patterns: + def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="off", delay=100): + self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) + self.num_leds = num_leds + self.pattern_step = 0 + self.last_update = utime.ticks_ms() + self.delay = delay + self.brightness = brightness + self.auto = False + self.patterns = {} + self.selected = selected + # Ensure colors list always starts with at least two for robust transition handling + self.colors = [color1, color2] if color1 != color2 else [color1, (255, 255, 255)] # Fallback if initial colors are same + if not self.colors: # Ensure at least one color exists + self.colors = [(0, 0, 0)] + + self.transition_duration = delay * 50 # Default transition duration + self.hold_duration = delay * 10 # Default hold duration at each color + self.transition_step = 0 # Current step in the transition + self.current_color_idx = 0 # Index of the color currently being held/transitioned from + self.current_color = self.colors[self.current_color_idx] # The actual blended color + + self.hold_start_time = utime.ticks_ms() # Time when the current color hold started + + # New attributes for scanner patterns + self.scanner_direction = 1 # 1 for forward, -1 for backward + self.scanner_tail_length = 3 # Number of trailing pixels + + self.n1 = 0 + self.n2 = 0 + self.n3 = 0 + self.n4 = 0 + self.n5 = 0 + self.n6 = 0 + + self.generator = None + self.select(self.selected) + + + def tick(self): + if self.generator is None: + return + try: + next(self.generator) + except StopIteration: + self.generator = None + + def select(self, pattern): + if pattern in self.patterns: + self.selected = pattern + self.generator = self.patterns[pattern]() + print(f"Selected pattern: {pattern}") + return True + # If pattern doesn't exist, default to "off" + return False + + def set_param(self, key, value): + if key in param_mapping: + setattr(self, param_mapping[key], value) + return True + print(f"Invalid parameter: {key}") + return False + + def update_num_leds(self, pin, num_leds): + self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) + self.num_leds = num_leds + self.pattern_step = 0 + + + def set_color(self, num, color): + # Changed: More robust index check + if 0 <= num < len(self.colors): + self.colors[num] = color + # If the changed color is part of the current or next transition, + # restart the transition for smoother updates + return True + elif num == len(self.colors): # Allow setting a new color at the end + self.colors.append(color) + return True + return False + + + def del_color(self, num): + # Changed: More robust index check and using del for lists + if 0 <= num < len(self.colors): + del self.colors[num] + return True + return False + + def apply_brightness(self, color, brightness_override=None): + effective_brightness = brightness_override if brightness_override is not None else self.brightness + return tuple(int(c * effective_brightness / 255) for c in color) + + def fill(self, color=None): + fill_color = color if color is not None else self.colors[0] + for i in range(self.num_leds): + self.n[i] = fill_color + self.n.write() + + def off(self): + self.fill((0, 0, 0)) + + def on(self): + self.fill(self.apply_brightness(self.colors[0])) + + + + + def wheel(self, pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + \ No newline at end of file diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..4b4f464 --- /dev/null +++ b/src/settings.py @@ -0,0 +1,131 @@ +import json +import ubinascii +import machine +import network + +class Settings(dict): + SETTINGS_FILE = "/settings.json" + + def __init__(self): + super().__init__() + self.load() # Load settings from file during initialization + self.color_order = self.get_color_order(self["color_order"]) + + def set_defaults(self): + self["led_pin"] = 10 + self["num_leds"] = 50 + self["pattern"] = "on" + self["delay"] = 100 + self["brightness"] = 10 + self["color_order"] = "rgb" + self["name"] = f"led-{ubinascii.hexlify(network.WLAN(network.AP_IF).config('mac')).decode()}" + self["ap_password"] = "" + self["id"] = 0 + self["debug"] = False + + def save(self): + try: + j = json.dumps(self) + with open(self.SETTINGS_FILE, 'w') as file: + file.write(j) + print("Settings saved successfully.") + except Exception as e: + print(f"Error saving settings: {e}") + + def load(self): + try: + with open(self.SETTINGS_FILE, 'r') as file: + loaded_settings = json.load(file) + self.update(loaded_settings) + print("Settings loaded successfully.") + except Exception as e: + print(f"Error loading settings") + self.set_defaults() + self.save() + + def set_settings(self, data, patterns, save): + try: + print(f"Setting settings: {data}") + for key, value in data.items(): + print(key, value) + if key == "colors": + buff = [] + for color in value: + buff.append(tuple(int(color[i:i+2], 16) for i in self.color_order)) + patterns.colors = buff + elif key == "num_leds": + patterns.update_num_leds(self["led_pin"], value) + elif key == "pattern": + if not patterns.select(value): + return "Pattern doesn't exist", 400 + elif key == "delay": + delay = int(data["delay"]) + patterns.delay = delay + elif key == "brightness": + brightness = int(data["brightness"]) + patterns.brightness = brightness + elif key == "n1": + patterns.n1 = value + elif key == "n2": + patterns.n2 = value + elif key == "n3": + patterns.n3 = value + elif key == "n4": + patterns.n4 = value + elif key == "n5": + patterns.n5 = value + elif key == "n6": + patterns.n6 = value + elif key == "name": + self[key] = value + self.save() + machine.reset() + elif key == "color_order": + self["color_order"] = value + self.color_order = self.get_color_order(value) + pass + elif key == "id": + pass + elif key == "led_pin": + patterns.update_num_leds(value, self["num_leds"]) + else: + return "Invalid key", 400 + self[key] = value + #print(self) + if save: + self.save() + print(self) + return "OK", 200 + except (KeyError, ValueError): + return "Bad request", 400 + + def get_color_order(self, color_order): + """Convert color order string to tuple of hex string indices.""" + color_orders = { + "rgb": (1, 3, 5), + "rbg": (1, 5, 3), + "grb": (3, 1, 5), + "gbr": (3, 5, 1), + "brg": (5, 1, 3), + "bgr": (5, 3, 1) + } + return color_orders.get(color_order.lower(), (1, 3, 5)) # Default to RGB + +# Example usage +def main(): + settings = Settings() + print(f"Number of LEDs: {settings['num_leds']}") + settings['num_leds'] = 100 + print(f"Updated number of LEDs: {settings['num_leds']}") + settings.save() + + # Create a new Settings object to test loading + new_settings = Settings() + print(f"Loaded number of LEDs: {new_settings['num_leds']}") + print(settings) + + + +# Run the example +if __name__ == "__main__": + main() diff --git a/test/patterns/blink.py b/test/patterns/blink.py new file mode 100644 index 0000000..105f940 --- /dev/null +++ b/test/patterns/blink.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + p.set_param("br", 64) + p.set_param("dl", 200) + p.set_param("cl", [(255, 0, 0), (0, 0, 255)]) + p.select("blink") + + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 1500: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/test/patterns/chase.py b/test/patterns/chase.py new file mode 100644 index 0000000..5cbd44a --- /dev/null +++ b/test/patterns/chase.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1) + print("Test 1: Basic chase (n1=5, n2=5, n3=1, n4=1)") + p.set_param("br", 255) + p.set_param("dl", 200) + p.set_param("n1", 5) + p.set_param("n2", 5) + p.set_param("n3", 1) + p.set_param("n4", 1) + p.set_param("cl", [(255, 0, 0), (0, 255, 0)]) + p.select("chase") + run_for(p, wdt, 3000) + + # Test 2: Forward and backward (n3=2, n4=-1) + print("Test 2: Forward and backward (n3=2, n4=-1)") + p.set_param("n1", 3) + p.set_param("n2", 3) + p.set_param("n3", 2) + p.set_param("n4", -1) + p.set_param("dl", 150) + p.set_param("cl", [(0, 0, 255), (255, 255, 0)]) + p.select("chase") + run_for(p, wdt, 3000) + + # Test 3: Large segments (n1=10, n2=5) + print("Test 3: Large segments (n1=10, n2=5, n3=3, n4=3)") + p.set_param("n1", 10) + p.set_param("n2", 5) + p.set_param("n3", 3) + p.set_param("n4", 3) + p.set_param("dl", 200) + p.set_param("cl", [(255, 128, 0), (128, 0, 255)]) + p.select("chase") + run_for(p, wdt, 3000) + + # Test 4: Fast movement (n3=5, n4=5) + print("Test 4: Fast movement (n3=5, n4=5)") + p.set_param("n1", 4) + p.set_param("n2", 4) + p.set_param("n3", 5) + p.set_param("n4", 5) + p.set_param("dl", 100) + p.set_param("cl", [(255, 0, 255), (0, 255, 255)]) + p.select("chase") + run_for(p, wdt, 2000) + + # Test 5: Backward movement (n3=-2, n4=-2) + print("Test 5: Backward movement (n3=-2, n4=-2)") + p.set_param("n1", 6) + p.set_param("n2", 4) + p.set_param("n3", -2) + p.set_param("n4", -2) + p.set_param("dl", 200) + p.set_param("cl", [(255, 255, 255), (0, 0, 0)]) + p.select("chase") + run_for(p, wdt, 3000) + + # Test 6: Alternating forward/backward (n3=3, n4=-2) + print("Test 6: Alternating forward/backward (n3=3, n4=-2)") + p.set_param("n1", 5) + p.set_param("n2", 5) + p.set_param("n3", 3) + p.set_param("n4", -2) + p.set_param("dl", 250) + p.set_param("cl", [(255, 0, 0), (0, 255, 0)]) + p.select("chase") + run_for(p, wdt, 4000) + + # Cleanup + print("Test complete, turning off") + p.select("off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/test/patterns/circle.py b/test/patterns/circle.py new file mode 100644 index 0000000..aed7bf4 --- /dev/null +++ b/test/patterns/circle.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0) + print("Test 1: Basic circle (n1=50, n2=100, n3=200, n4=0)") + p.set_param("br", 255) + p.set_param("n1", 50) # Head moves 50 LEDs/second + p.set_param("n2", 100) # Max length 100 LEDs + p.set_param("n3", 200) # Tail moves 200 LEDs/second + p.set_param("n4", 0) # Min length 0 LEDs + p.set_param("cl", [(255, 0, 0)]) # Red + p.select("circle") + run_for(p, wdt, 5000) + + # Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0) + print("Test 2: Slow growth, fast shrink (n1=20, n2=50, n3=100, n4=0)") + p.set_param("n1", 20) + p.set_param("n2", 50) + p.set_param("n3", 100) + p.set_param("n4", 0) + p.set_param("cl", [(0, 255, 0)]) # Green + p.select("circle") + run_for(p, wdt, 5000) + + # Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0) + print("Test 3: Fast growth, slow shrink (n1=100, n2=30, n3=20, n4=0)") + p.set_param("n1", 100) + p.set_param("n2", 30) + p.set_param("n3", 20) + p.set_param("n4", 0) + p.set_param("cl", [(0, 0, 255)]) # Blue + p.select("circle") + run_for(p, wdt, 5000) + + # Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10) + print("Test 4: With minimum length (n1=50, n2=40, n3=100, n4=10)") + p.set_param("n1", 50) + p.set_param("n2", 40) + p.set_param("n3", 100) + p.set_param("n4", 10) + p.set_param("cl", [(255, 255, 0)]) # Yellow + p.select("circle") + run_for(p, wdt, 5000) + + # Test 5: Very fast (n1=200, n2=20, n3=200, n4=0) + print("Test 5: Very fast (n1=200, n2=20, n3=200, n4=0)") + p.set_param("n1", 200) + p.set_param("n2", 20) + p.set_param("n3", 200) + p.set_param("n4", 0) + p.set_param("cl", [(255, 0, 255)]) # Magenta + p.select("circle") + run_for(p, wdt, 3000) + + # Test 6: Very slow (n1=10, n2=25, n3=10, n4=0) + print("Test 6: Very slow (n1=10, n2=25, n3=10, n4=0)") + p.set_param("n1", 10) + p.set_param("n2", 25) + p.set_param("n3", 10) + p.set_param("n4", 0) + p.set_param("cl", [(0, 255, 255)]) # Cyan + p.select("circle") + run_for(p, wdt, 5000) + + # Cleanup + print("Test complete, turning off") + p.select("off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/test/patterns/off.py b/test/patterns/off.py new file mode 100644 index 0000000..c0df8f9 --- /dev/null +++ b/test/patterns/off.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + p.select("off") + + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 200: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/test/patterns/on.py b/test/patterns/on.py new file mode 100644 index 0000000..09d3379 --- /dev/null +++ b/test/patterns/on.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + p.set_param("br", 64) + p.set_param("dl", 120) + p.set_param("cl", [(255, 0, 0), (0, 0, 255)]) + + # ON phase + p.select("on") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 800: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + # OFF phase + p.select("off") + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < 100: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +if __name__ == "__main__": + main() + + diff --git a/test/patterns/pulse.py b/test/patterns/pulse.py new file mode 100644 index 0000000..bf6912d --- /dev/null +++ b/test/patterns/pulse.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Simple single-color pulse + print("Test 1: Single-color pulse (attack=500, hold=500, decay=500, delay=500)") + p.set_param("br", 255) + p.set_param("cl", [(255, 0, 0)]) # Red + p.set_param("n1", 500) # attack ms + p.set_param("n2", 500) # hold ms + p.set_param("n3", 500) # decay ms + p.set_param("dl", 500) # delay ms between pulses + p.set_param("auto", True) + p.select("pulse") + run_for(p, wdt, 5000) + + # Test 2: Faster pulse + print("Test 2: Fast pulse (attack=100, hold=100, decay=100, delay=100)") + p.set_param("n1", 100) + p.set_param("n2", 100) + p.set_param("n3", 100) + p.set_param("dl", 100) + p.set_param("cl", [(0, 255, 0)]) # Green + p.select("pulse") + run_for(p, wdt, 4000) + + # Test 3: Multi-color pulse cycle + print("Test 3: Multi-color pulse (red -> green -> blue)") + p.set_param("n1", 300) + p.set_param("n2", 300) + p.set_param("n3", 300) + p.set_param("dl", 200) + p.set_param("cl", [(255, 0, 0), (0, 255, 0), (0, 0, 255)]) + p.set_param("auto", True) + p.select("pulse") + run_for(p, wdt, 6000) + + # Test 4: One-shot pulse (auto=False) + print("Test 4: Single pulse, auto=False") + p.set_param("n1", 400) + p.set_param("n2", 0) + p.set_param("n3", 400) + p.set_param("dl", 0) + p.set_param("cl", [(255, 255, 255)]) + p.set_param("auto", False) + p.select("pulse") + # Run long enough to allow one full pulse cycle + run_for(p, wdt, 1500) + + # Cleanup + print("Test complete, turning off") + p.select("off") + run_for(p, wdt, 200) + + +if __name__ == "__main__": + main() + + diff --git a/test/patterns/rainbow.py b/test/patterns/rainbow.py new file mode 100644 index 0000000..8e4791d --- /dev/null +++ b/test/patterns/rainbow.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Basic rainbow with auto=True (continuous) + print("Test 1: Basic rainbow (auto=True, n1=1)") + p.set_param("br", 255) + p.set_param("dl", 100) # Delay affects animation speed + p.set_param("n1", 1) # Step increment of 1 + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 3000) + + # Test 2: Fast rainbow + print("Test 2: Fast rainbow (low delay, n1=1)") + p.set_param("dl", 50) + p.set_param("n1", 1) + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 2000) + + # Test 3: Slow rainbow + print("Test 3: Slow rainbow (high delay, n1=1)") + p.set_param("dl", 500) + p.set_param("n1", 1) + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 3000) + + # Test 4: Low brightness rainbow + print("Test 4: Low brightness rainbow (n1=1)") + p.set_param("br", 64) + p.set_param("dl", 100) + p.set_param("n1", 1) + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 2000) + + # Test 5: Single-step rainbow (auto=False) + print("Test 5: Single-step rainbow (auto=False, n1=1)") + p.set_param("br", 255) + p.set_param("dl", 100) + p.set_param("n1", 1) + p.set_param("auto", False) + p.step = 0 + for i in range(10): + p.select("rainbow") + # One tick advances the generator one frame when auto=False + p.tick() + utime.sleep_ms(100) + wdt.feed() + + # Test 6: Verify step updates correctly + print("Test 6: Verify step updates (auto=False, n1=1)") + p.set_param("n1", 1) + p.set_param("auto", False) + initial_step = p.step + p.select("rainbow") + p.tick() + final_step = p.step + print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)") + + # Test 7: Fast step increment (n1=5) + print("Test 7: Fast rainbow (n1=5, auto=True)") + p.set_param("br", 255) + p.set_param("dl", 100) + p.set_param("n1", 5) + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 2000) + + # Test 8: Very fast step increment (n1=10) + print("Test 8: Very fast rainbow (n1=10, auto=True)") + p.set_param("n1", 10) + p.set_param("auto", True) + p.select("rainbow") + run_for(p, wdt, 2000) + + # Test 9: Verify n1 controls step increment (auto=False) + print("Test 9: Verify n1 step increment (auto=False, n1=5)") + p.set_param("n1", 5) + p.set_param("auto", False) + p.step = 0 + initial_step = p.step + p.select("rainbow") + p.tick() + final_step = p.step + expected_step = (initial_step + 5) % 256 + print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})") + if final_step == expected_step: + print("✓ n1 step increment working correctly") + else: + print(f"✗ Step increment mismatch! Expected {expected_step}, got {final_step}") + + # Cleanup + print("Test complete, turning off") + p.select("off") + run_for(p, wdt, 100) + + +if __name__ == "__main__": + main() + diff --git a/test/patterns/transition.py b/test/patterns/transition.py new file mode 100644 index 0000000..7e2f95b --- /dev/null +++ b/test/patterns/transition.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +import utime +from machine import WDT +from settings import Settings +from patterns import Patterns + + +def run_for(p, wdt, ms): + """Helper: run current pattern for given ms using tick().""" + start = utime.ticks_ms() + while utime.ticks_diff(utime.ticks_ms(), start) < ms: + wdt.feed() + p.tick() + utime.sleep_ms(10) + + +def main(): + s = Settings() + pin = s.get("led_pin", 10) + num = s.get("num_leds", 30) + + p = Patterns(pin=pin, num_leds=num) + wdt = WDT(timeout=10000) + + # Test 1: Simple two-color transition + print("Test 1: Two-color transition (red <-> blue, delay=1000)") + p.set_param("br", 255) + p.set_param("dl", 1000) # transition duration + p.set_param("cl", [(255, 0, 0), (0, 0, 255)]) + p.set_param("auto", True) + p.select("transition") + run_for(p, wdt, 6000) + + # Test 2: Multi-color transition + print("Test 2: Multi-color transition (red -> green -> blue -> white)") + p.set_param("dl", 800) + p.set_param("cl", [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 255)]) + p.set_param("auto", True) + p.select("transition") + run_for(p, wdt, 8000) + + # Test 3: One-shot transition (auto=False) + print("Test 3: One-shot transition (auto=False)") + p.set_param("dl", 1000) + p.set_param("cl", [(255, 0, 0), (0, 255, 0)]) + p.set_param("auto", False) + p.select("transition") + # Run long enough for a single transition step + run_for(p, wdt, 2000) + + # Test 4: Single-color behavior (should just stay on) + print("Test 4: Single-color transition (should hold color)") + p.set_param("cl", [(0, 0, 255)]) + p.set_param("dl", 500) + p.set_param("auto", True) + p.select("transition") + run_for(p, wdt, 3000) + + # Cleanup + print("Test complete, turning off") + p.select("off") + run_for(p, wdt, 200) + + +if __name__ == "__main__": + main() + + diff --git a/tool.py b/tool.py new file mode 100644 index 0000000..a4754cf --- /dev/null +++ b/tool.py @@ -0,0 +1,734 @@ +#!/usr/bin/env python3 +""" +LED Bar Configuration Web App + +Flask-based web UI for downloading, editing, and uploading settings.json +to/from MicroPython devices via mpremote. +""" + +import json +import tempfile +import subprocess +import os +from pathlib import Path + +from flask import ( + Flask, + render_template_string, + request, + redirect, + url_for, + flash, +) + + +app = Flask(__name__) +app.secret_key = "change-me-in-production" + + +SETTINGS_CONFIG = [ + ("led_pin", "LED Pin", "number"), + ("num_leds", "Number of LEDs", "number"), + ("color_order", "Color Order", "choice", ["rgb", "rbg", "grb", "gbr", "brg", "bgr"]), + ("name", "Device Name", "text"), + ("pattern", "Pattern", "text"), + ("delay", "Delay (ms)", "number"), + ("brightness", "Brightness", "number"), + ("n1", "N1", "number"), + ("n2", "N2", "number"), + ("n3", "N3", "number"), + ("n4", "N4", "number"), + ("n5", "N5", "number"), + ("n6", "N6", "number"), + ("ap_password", "AP Password", "text"), + ("id", "ID", "number"), + ("debug", "Debug Mode", "choice", ["True", "False"]), +] + + +def _run_mpremote_copy(from_device: bool, device: str, temp_path: str) -> None: + if from_device: + cmd = ["mpremote", "connect", device, "cp", ":/settings.json", temp_path] + else: + cmd = ["mpremote", "connect", device, "cp", temp_path, ":/settings.json"] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode != 0: + raise RuntimeError(f"mpremote error: {result.stderr.strip() or result.stdout.strip()}") + + +def download_settings(device: str) -> dict: + """Download settings.json from the device using mpremote.""" + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) + temp_path = temp_file.name + temp_file.close() + + try: + _run_mpremote_copy(from_device=True, device=device, temp_path=temp_path) + with open(temp_path, "r", encoding="utf-8") as f: + return json.load(f) + finally: + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except OSError: + pass + + +def upload_settings(device: str, settings: dict) -> None: + """Upload settings.json to the device using mpremote and reset device.""" + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) + temp_path = temp_file.name + + try: + json.dump(settings, temp_file, indent=2) + temp_file.close() + + _run_mpremote_copy(from_device=False, device=device, temp_path=temp_path) + + # Reset device (best effort) + try: + import serial # type: ignore + + with serial.Serial(device, baudrate=115200) as ser: + ser.write(b"\x03\x03\x04") + except Exception: + reset_cmd = [ + "mpremote", + "connect", + device, + "exec", + "import machine; machine.reset()", + ] + try: + subprocess.run(reset_cmd, capture_output=True, text=True, timeout=5) + except subprocess.TimeoutExpired: + pass + finally: + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except OSError: + pass + + +def parse_settings_from_form(form) -> dict: + settings = {} + for cfg in SETTINGS_CONFIG: + key = cfg[0] + raw = (form.get(key) or "").strip() + if raw == "": + continue + + if key in ["led_pin", "num_leds", "delay", "brightness", "id", "n1", "n2", "n3", "n4", "n5", "n6"]: + try: + settings[key] = int(raw) + except ValueError: + settings[key] = raw + elif key == "debug": + settings[key] = raw == "True" + else: + settings[key] = raw + return settings + + +TEMPLATE = """ + + + + + LED Bar Configuration + + + + +
+
+
+

+ LED Bar Configuration + Web Console +

+
+ + + Raspberry Pi · MicroPython + + settings.json live editor +
+
+
+ Device: {{ device or "/dev/ttyACM0" }} +
+
+ +
+
+
+
+

Device Connection

+ Connect to your MicroPython LED controller and sync configuration +
+
+ +
+ +
+ + + +
+ +
+ + {{ status or "Ready" }} +
+ +
+ Tip: + Download from device → tweak parameters → Upload and reboot. +
+ +
+ +
+
+

LED Settings

+ Edit all fields before uploading back to your controller +
+
+ +
+ {% for field in settings_config %} + {% set key, label, field_type = field[0], field[1], field[2] %} +
+ + {% if field_type == 'choice' %} + {% set choices = field[3] %} + + {% else %} + + {% endif %} +
+ {% endfor %} +
+ +
+ + +
+
+
+ +
+
+
+

Raw JSON

+ For advanced editing, paste or copy the full settings.json +
+
+ +
+ + + + +
+ + +
+
+
+
+
+ +
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +
+ {{ message }} +
+ {% endfor %} + {% endif %} + {% endwith %} +
+ + +""" + + +@app.route("/", methods=["GET"]) +def index(): + return render_template_string( + TEMPLATE, + device="/dev/ttyACM0", + settings={}, + settings_config=SETTINGS_CONFIG, + status="Ready", + status_type="ok", + raw_json="{}", + ) + + +@app.route("/", methods=["POST"]) +def handle_action(): + action = request.form.get("action") or "" + device = (request.form.get("device") or "/dev/ttyACM0").strip() + raw_json = (request.form.get("raw_json") or "").strip() + + settings = {} + status = "Ready" + status_type = "ok" + + if action == "download": + if not device: + flash("Please specify a device.", "error") + status, status_type = "Missing device.", "error" + else: + try: + settings = download_settings(device) + raw_json = json.dumps(settings, indent=2) + flash(f"Settings downloaded from {device}.", "success") + status = f"Settings downloaded from {device}" + except subprocess.TimeoutExpired: + flash("Connection timeout. Check device connection.", "error") + status, status_type = "Connection timeout.", "error" + except FileNotFoundError: + flash("mpremote not found. Install with: pip install mpremote", "error") + status, status_type = "mpremote not found.", "error" + except Exception as exc: # pylint: disable=broad-except + flash(f"Failed to download settings: {exc}", "error") + status, status_type = "Download failed.", "error" + + elif action == "upload": + if not device: + flash("Please specify a device.", "error") + status, status_type = "Missing device.", "error" + else: + # Take current form fields as source of truth, falling back to JSON if present + if raw_json: + try: + settings = json.loads(raw_json) + except json.JSONDecodeError: + flash("Raw JSON is invalid; using form values instead.", "error") + settings = {} + form_settings = parse_settings_from_form(request.form) + settings.update(form_settings) + + if not settings: + flash("No settings to upload. Download or provide settings first.", "error") + status, status_type = "No settings to upload.", "error" + else: + try: + upload_settings(device, settings) + raw_json = json.dumps(settings, indent=2) + flash(f"Settings uploaded and device reset on {device}.", "success") + status = f"Settings uploaded and device reset on {device}" + except subprocess.TimeoutExpired: + flash("Connection timeout. Check device connection.", "error") + status, status_type = "Connection timeout.", "error" + except FileNotFoundError: + flash("mpremote not found. Install with: pip install mpremote", "error") + status, status_type = "mpremote not found.", "error" + except Exception as exc: # pylint: disable=broad-except + flash(f"Failed to upload settings: {exc}", "error") + status, status_type = "Upload failed.", "error" + + elif action == "from_json": + # No-op here, JSON is just edited in the side panel + form_settings = parse_settings_from_form(request.form) + settings.update(form_settings) + if raw_json: + try: + settings.update(json.loads(raw_json)) + flash("JSON merged into form values.", "success") + status = "JSON merged into form." + except json.JSONDecodeError: + flash("Invalid JSON; keeping previous form values.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "to_form": + if raw_json: + try: + settings = json.loads(raw_json) + flash("Form fields updated from JSON.", "success") + status = "Form fields updated from JSON." + except json.JSONDecodeError: + flash("Invalid JSON; could not update form fields.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "pretty": + if raw_json: + try: + parsed = json.loads(raw_json) + raw_json = json.dumps(parsed, indent=2) + settings = parsed if isinstance(parsed, dict) else {} + flash("JSON pretty-printed.", "success") + status = "JSON pretty-printed." + except json.JSONDecodeError: + flash("Invalid JSON; cannot pretty-print.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "clear": + settings = {} + raw_json = "{}" + flash("Form cleared.", "success") + status = "Form cleared." + + else: + # Unknown / initial action: just reflect form values back + settings = parse_settings_from_form(request.form) + if raw_json and not settings: + try: + settings = json.loads(raw_json) + except json.JSONDecodeError: + pass + + return render_template_string( + TEMPLATE, + device=device, + settings=settings, + settings_config=SETTINGS_CONFIG, + status=status, + status_type=status_type, + raw_json=raw_json or json.dumps(settings or {}, indent=2), + ) + + +def main() -> None: + # Bind to all interfaces so you can reach it from your LAN: + # python web_app.py + # Then open: http://:5000/ + app.run(host="0.0.0.0", port=5000, debug=False) + + +if __name__ == "__main__": + main() + +