diff --git a/.gitignore b/.gitignore index 36b13f1..86bf2de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,176 +1,37 @@ -# ---> Python -# Byte-compiled / optimized / DLL files +# Build files +build/ +sdkconfig +sdkconfig.old + +# Binary files +*.bin + +# Python __pycache__/ *.py[cod] *$py.class - -# C extensions *.so - -# Distribution / packaging .Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -cover/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -.pybuilder/ -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv env/ venv/ -ENV/ -env.bak/ -venv.bak/ +*.egg-info/ +dist/ +build/ -# Spyder project settings -.spyderproject -.spyproject +# PyInstaller +*.spec +*.pyz -# Rope project settings -.ropeproject +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc +# OS +.DS_Store +Thumbs.db +# Pipenv +Pipfile.lock diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..1182f10 --- /dev/null +++ b/Pipfile @@ -0,0 +1,20 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +flask = "*" +pyserial = "*" + +[dev-packages] +pyinstaller = "*" + +[requires] +python_version = "3" + +[scripts] +web = "python web.py" +cli = "python cli.py" +build = "pyinstaller --clean led-cli.spec" +install = "pipenv install" diff --git a/README.md b/README.md index 8ebca23..322daa1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ # led-tool +- `-s, --show`: Display current settings from device +- `--no-download`: Don't download settings first (use empty settings) +**Default behavior:** Downloads settings and prints them. If any edit flags are provided, settings are modified and uploaded automatically (unless `--no-upload` is specified). +## Device Connection + +The tools use an integrated mpremote transport to communicate with MicroPython devices. Make sure your device is connected and accessible via the specified serial port.## LicenseSee LICENSE file for details. diff --git a/build.py b/build.py new file mode 100644 index 0000000..72f3c33 --- /dev/null +++ b/build.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +""" +Build script for creating binary executables from the CLI and web tools. +""" + +import subprocess +import sys +import os + +def build_binary(script_name, output_name=None): + """Build a binary from a Python script using PyInstaller.""" + if output_name is None: + output_name = script_name.replace('.py', '') + + print(f"Building {script_name} -> {output_name}...") + + cmd = [ + 'pyinstaller', + '--onefile', # Create a single executable file + '--name', output_name, + '--clean', # Clean PyInstaller cache + script_name + ] + + # Add hidden imports that might be needed + hidden_imports = [ + 'mpremote.transport_serial', + 'mpremote.transport', + 'mpremote.console', + 'mpremote.mp_errno', + 'serial', + 'serial.tools.list_ports', + ] + + for imp in hidden_imports: + cmd.extend(['--hidden-import', imp]) + + # Include the lib directory + cmd.extend(['--add-data', 'lib:lib']) + + result = subprocess.run(cmd, cwd=os.path.dirname(os.path.abspath(__file__))) + + if result.returncode == 0: + print(f"✓ Successfully built {output_name}") + print(f" Binary location: dist/{output_name}") + else: + print(f"✗ Failed to build {output_name}") + return False + + return True + +if __name__ == '__main__': + print("Building binaries for led-tool...") + print("=" * 60) + + success = True + + # Build CLI binary + if build_binary('cli.py', 'led-cli'): + print() + else: + success = False + + # Optionally build web binary (commented out as it's less common) + # if build_binary('web.py', 'led-tool-web'): + # print() + # else: + # success = False + + if success: + print("=" * 60) + print("Build complete! Binaries are in the 'dist' directory.") + else: + print("=" * 60) + print("Build failed. Check errors above.") + sys.exit(1) diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..d23b05e --- /dev/null +++ b/build.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# Build script for creating binaries + +set -e + +echo "Building led-tool binaries..." +echo "================================" + +# Check if we're in a pipenv environment or use pipenv run +if command -v pipenv &> /dev/null; then + echo "Using pipenv..." + pipenv install --dev + PYINSTALLER_CMD="pipenv run pyinstaller" +else + # Check if pyinstaller is available directly + if ! command -v pyinstaller &> /dev/null; then + echo "Installing PyInstaller..." + pip install pyinstaller + fi + PYINSTALLER_CMD="pyinstaller" +fi + +# Build CLI binary using the spec file +echo "" +echo "Building CLI binary..." +$PYINSTALLER_CMD --clean led-cli.spec + +echo "" +echo "================================" +echo "Build complete!" +echo "Binary location: dist/led-cli" +echo "" +echo "To test: ./dist/led-cli -h" +echo "" +echo "You can distribute the binary from the dist/ directory" +echo "without requiring Python to be installed on the target system." + diff --git a/cli.py b/cli.py index 1fa45bb..8d0091c 100755 --- a/cli.py +++ b/cli.py @@ -11,34 +11,32 @@ import argparse import sys from typing import Dict, Any, List -# Import functions from tool.py import tempfile -import subprocess import os - - -def _run_mpremote_copy(from_device: bool, device: str, temp_path: str) -> None: - """Run mpremote copy command.""" - if from_device: - cmd = ["mpremote", "connect", device, "cp", ":/settings.json", temp_path] - else: - cmd = ["mpremote", "connect", device, "cp", temp_path, ":/settings.json"] - - result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) - if result.returncode != 0: - raise RuntimeError(f"mpremote error: {result.stderr.strip() or result.stdout.strip()}") +from device import copy_file, DeviceConnection def download_settings(device: str) -> dict: - """Download settings.json from the device using mpremote.""" + """ + Download settings.json from the device. + + Returns an empty dict if the file does not exist so that the tool + can be used on a fresh device before settings.json has been created. + """ temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name temp_file.close() try: - _run_mpremote_copy(from_device=True, device=device, temp_path=temp_path) + copy_file(from_device=True, device=device, remote_path="settings.json", local_path=temp_path) with open(temp_path, "r", encoding="utf-8") as f: return json.load(f) + except (OSError, RuntimeError) as e: + # If the file is missing on the device, treat it as empty settings + msg = str(e).lower() + if "errno 2" in msg or "enoent" in msg or "not found" in msg: + return {} + raise finally: if os.path.exists(temp_path): try: @@ -48,7 +46,7 @@ def download_settings(device: str) -> dict: def upload_settings(device: str, settings: dict) -> None: - """Upload settings.json to the device using mpremote and reset device.""" + """Upload settings.json to the device and reset device.""" temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name @@ -56,26 +54,16 @@ def upload_settings(device: str, settings: dict) -> None: json.dump(settings, temp_file, indent=2) temp_file.close() - _run_mpremote_copy(from_device=False, device=device, temp_path=temp_path) + copy_file(from_device=False, device=device, remote_path="settings.json", local_path=temp_path) # Reset device (best effort) try: - import serial # type: ignore - - with serial.Serial(device, baudrate=115200) as ser: - ser.write(b"\x03\x03\x04") + conn = DeviceConnection(device) + conn.connect() + conn.reset() + conn.disconnect() except Exception: - reset_cmd = [ - "mpremote", - "connect", - device, - "exec", - "import machine; machine.reset()", - ] - try: - subprocess.run(reset_cmd, capture_output=True, text=True, timeout=5) - except subprocess.TimeoutExpired: - pass + pass finally: if os.path.exists(temp_path): try: @@ -130,6 +118,15 @@ Examples: # Set number of LEDs, color order, and debug mode %(prog)s -l 60 -o rgb -d 1 + + # Reset the device + %(prog)s -r + + # Follow device output + %(prog)s -f + + # Reset and then follow output + %(prog)s -r -f """ ) @@ -144,6 +141,12 @@ Examples: help="Device name" ) + parser.add_argument( + "--pin", + type=int, + help="LED GPIO pin number (updates 'led_pin' in settings.json)" + ) + parser.add_argument( "-b", "--brightness", type=int, @@ -199,14 +202,145 @@ Examples: help="Don't download settings first (use empty settings)" ) + parser.add_argument( + "-r", "--reset", + action="store_true", + help="Reset the device" + ) + + parser.add_argument( + "-f", "--follow", + action="store_true", + help="Follow device output continuously (like tail -f)" + ) + + parser.add_argument( + "-s", "--show", + action="store_true", + help="Display current settings from device" + ) + + parser.add_argument( + "-u", "--upload", + nargs='+', + metavar=("SRC", "DEST"), + help="Upload a directory recursively to the device. Usage: -u SRC [DEST] (DEST is optional, defaults to basename of SRC)" + ) + + parser.add_argument( + "-e", + dest="erase_all", + action="store_true", + help="Erase all code on the device (delete all files except settings.json)" + ) + + parser.add_argument( + "--rm", + metavar="PATH", + help="Delete a file or directory on the device (recursive for directories)" + ) + args = parser.parse_args() + # Handle reset option (can be combined with follow/erase/upload) + if args.reset: + try: + print(f"Resetting device on {args.port}...", file=sys.stderr) + conn = DeviceConnection(args.port) + conn.reset() + print("Device reset.", file=sys.stderr) + except Exception as e: + print(f"Error resetting device: {e}", file=sys.stderr) + sys.exit(1) + + # Handle upload option (can be combined with other operations) + if args.upload: + import os + if len(args.upload) > 2: + print("Error: -u accepts at most 2 arguments (source and optional destination)", file=sys.stderr) + sys.exit(1) + + upload_dir = args.upload[0] + remote_dir = args.upload[1] if len(args.upload) > 1 else None + + if not os.path.exists(upload_dir): + print(f"Error: Directory does not exist: {upload_dir}", file=sys.stderr) + sys.exit(1) + if not os.path.isdir(upload_dir): + print(f"Error: Not a directory: {upload_dir}", file=sys.stderr) + sys.exit(1) + + try: + if remote_dir: + print(f"Uploading {upload_dir} to {remote_dir} on device {args.port}...", file=sys.stderr) + else: + print(f"Uploading {upload_dir} to device on {args.port}...", file=sys.stderr) + conn = DeviceConnection(args.port) + files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir) + print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr) + except Exception as e: + print(f"Error uploading directory: {e}", file=sys.stderr) + sys.exit(1) + + # Handle erase-all option (can be combined with other operations) + if args.erase_all: + try: + print(f"Erasing all code on device {args.port}...", file=sys.stderr) + conn = DeviceConnection(args.port) + # List top-level items and delete everything except settings.json + items = conn.list_files('') + for name, is_dir, size in items: + if name == "settings.json": + continue + path = name + conn.delete(path) + print("Erase complete.", file=sys.stderr) + except Exception as e: + print(f"Error erasing device: {e}", file=sys.stderr) + sys.exit(1) + + # Handle targeted remove option (can be combined with other operations) + if args.rm: + try: + print(f"Deleting {args.rm} on device {args.port}...", file=sys.stderr) + conn = DeviceConnection(args.port) + conn.delete(args.rm) + print(f"Successfully deleted: {args.rm}", file=sys.stderr) + except Exception as e: + print(f"Error deleting {args.rm}: {e}", file=sys.stderr) + sys.exit(1) + + # Handle follow option (can be combined with reset) + if args.follow: + try: + if args.reset: + # Small delay after reset to let device boot + import time + time.sleep(0.5) + print(f"Following output from {args.port}... (Press Ctrl+C to stop)", file=sys.stderr) + conn = DeviceConnection(args.port) + conn.follow_output() + except KeyboardInterrupt: + print("\nStopped following.", file=sys.stderr) + sys.exit(0) + except Exception as e: + print(f"Error following output: {e}", file=sys.stderr) + sys.exit(1) + return # Don't continue with settings operations if following + + # If only reset was specified (without follow/other actions), we're done + if args.reset and not (args.upload or args.erase_all or args.rm or args.show): + return + # Collect all edit parameters edits: Dict[str, Any] = {} if args.name is not None: edits["name"] = args.name + if args.pin is not None: + edits["led_pin"] = args.pin + if args.brightness is not None: edits["brightness"] = args.brightness @@ -233,24 +367,27 @@ Examples: if value is not None: edits[attr_name] = value - # Download current settings (unless --no-download is specified) - if args.no_download: + # Download current settings (unless --no-download is specified and not showing) + if args.no_download and not args.show: settings = {} else: try: print(f"Downloading settings from {args.port}...", file=sys.stderr) settings = download_settings(args.port) print("Settings downloaded successfully.", file=sys.stderr) - except FileNotFoundError: - print("Error: mpremote not found. Install with: pip install mpremote", file=sys.stderr) - sys.exit(1) - except subprocess.TimeoutExpired: - print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) - sys.exit(1) except Exception as e: - print(f"Error downloading settings: {e}", file=sys.stderr) + if "timeout" in str(e).lower() or "connection" in str(e).lower(): + print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) + else: + print(f"Error downloading settings: {e}", file=sys.stderr) sys.exit(1) + # If --show is set, print current settings. If there are no edits, exit afterwards. + if args.show: + print_settings(settings) + if not edits: + return + # Apply edits if edits: print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) @@ -269,14 +406,11 @@ Examples: print(f"\nUploading settings to {args.port}...", file=sys.stderr) upload_settings(args.port, settings) print("Settings uploaded successfully. Device will reset.", file=sys.stderr) - except FileNotFoundError: - print("Error: mpremote not found. Install with: pip install mpremote", file=sys.stderr) - sys.exit(1) - except subprocess.TimeoutExpired: - print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) - sys.exit(1) except Exception as e: - print(f"Error uploading settings: {e}", file=sys.stderr) + if "timeout" in str(e).lower() or "connection" in str(e).lower(): + print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) + else: + print(f"Error uploading settings: {e}", file=sys.stderr) sys.exit(1) elif edits and args.no_upload: print("\nSettings modified but not uploaded (--no-upload specified).", file=sys.stderr) diff --git a/device.py b/device.py new file mode 100644 index 0000000..b935361 --- /dev/null +++ b/device.py @@ -0,0 +1,246 @@ +""" +Device communication wrapper using integrated mpremote transport. + +This module provides a simple interface for copying files to/from MicroPython devices +without requiring mpremote to be installed as a separate dependency. +""" + +import sys +import os +import time +import serial + +# Add lib directory to path - handle both normal execution and PyInstaller bundle +if getattr(sys, 'frozen', False): + # Running as a PyInstaller bundle + _this_dir = sys._MEIPASS +else: + # Running as a normal Python script + _this_dir = os.path.dirname(os.path.abspath(__file__)) + +lib_path = os.path.join(_this_dir, 'lib') +if lib_path not in sys.path and os.path.exists(lib_path): + sys.path.insert(0, lib_path) + +from mpremote.transport_serial import SerialTransport +from mpremote.transport import TransportError + + +class DeviceConnection: + """Wrapper for device communication.""" + + def __init__(self, device): + """Connect to a device.""" + self.device = device + self.transport = None + + def connect(self): + """Establish connection to device.""" + if self.transport is None: + self.transport = SerialTransport(self.device, baudrate=115200) + self.transport.enter_raw_repl() + + def disconnect(self): + """Close connection to device.""" + if self.transport: + try: + if self.transport.in_raw_repl: + self.transport.exit_raw_repl() + except Exception: + pass + try: + self.transport.close() + except Exception: + pass + self.transport = None + + def copy_from_device(self, remote_path, local_path): + """Copy a file from device to local filesystem.""" + self.connect() + try: + data = self.transport.fs_readfile(remote_path) + with open(local_path, 'wb') as f: + f.write(data) + finally: + self.disconnect() + + def copy_to_device(self, local_path, remote_path): + """Copy a file from local filesystem to device.""" + self.connect() + try: + with open(local_path, 'rb') as f: + data = f.read() + self.transport.fs_writefile(remote_path, data) + finally: + self.disconnect() + + def upload_directory(self, local_dir, remote_dir=None): + """ + Upload a directory recursively to the device. + + Args: + local_dir: Local directory path to upload + remote_dir: Remote directory path (default: root, uses basename of local_dir) + """ + import os + + if not os.path.isdir(local_dir): + raise ValueError(f"{local_dir} is not a directory") + + self.connect() + try: + # Determine remote directory name + if remote_dir is None: + remote_dir = os.path.basename(os.path.abspath(local_dir)) + + # Ensure remote directory exists + if not self.transport.fs_exists(remote_dir): + self.transport.fs_mkdir(remote_dir) + + # Walk through local directory and copy files + files_copied = 0 + dirs_created = 0 + + for root, dirs, files in os.walk(local_dir): + # Calculate relative path from local_dir + rel_path = os.path.relpath(root, local_dir) + + # Build remote path + if rel_path == '.': + remote_base = remote_dir.rstrip('/') or '/' + else: + rel_path_clean = rel_path.replace(os.sep, '/') + if remote_dir.rstrip('/') == '': + remote_base = '/' + rel_path_clean + else: + remote_base = '/'.join([remote_dir.rstrip('/'), rel_path_clean]) + + # Create remote directory if needed + if rel_path != '.' and not self.transport.fs_exists(remote_base): + print(f"Creating directory: {remote_base}", file=sys.stderr) + self.transport.fs_mkdir(remote_base) + dirs_created += 1 + + # Copy files + for file in files: + local_file = os.path.join(root, file) + # Handle root directory case properly + if remote_base == '/': + remote_file = '/' + file + else: + remote_file = '/'.join([remote_base, file]) + + print(f"Uploading: {remote_file}", file=sys.stderr) + with open(local_file, 'rb') as f: + data = f.read() + self.transport.fs_writefile(remote_file, data) + files_copied += 1 + + return files_copied, dirs_created + finally: + self.disconnect() + + def list_files(self, remote_path=''): + """ + List files and directories on the device. + + Args: + remote_path: Path on device to list (default: root directory) + + Returns: + List of (name, is_directory, size) tuples + """ + self.connect() + try: + if remote_path and not self.transport.fs_exists(remote_path): + raise ValueError(f"Path does not exist: {remote_path}") + + items = self.transport.fs_listdir(remote_path) + result = [] + for item in items: + is_dir = item.st_mode & 0o170000 == 0o040000 + result.append((item.name, is_dir, item.st_size)) + return result + finally: + self.disconnect() + + def delete(self, remote_path, disconnect=True): + """ + Delete a file or directory on the device. + + Args: + remote_path: Path on device to delete (file or directory) + disconnect: Whether to disconnect after deletion (default: True, set to False for recursive calls) + """ + self.connect() + try: + if not self.transport.fs_exists(remote_path): + raise ValueError(f"Path does not exist: {remote_path}") + + if self.transport.fs_isdir(remote_path): + # Delete directory recursively + # First, delete all files and subdirectories + items = self.transport.fs_listdir(remote_path) + for item in items: + item_path = '/'.join([remote_path.rstrip('/'), item.name]) + if item.st_mode & 0o170000 == 0o040000: # Directory + self.delete(item_path, disconnect=False) # Recursive delete, don't disconnect + else: # File + print(f"Deleting file: {item_path}", file=sys.stderr) + self.transport.fs_rmfile(item_path) + # Then delete the directory itself + print(f"Deleting directory: {remote_path}", file=sys.stderr) + self.transport.fs_rmdir(remote_path) + else: + # Delete file + print(f"Deleting file: {remote_path}", file=sys.stderr) + self.transport.fs_rmfile(remote_path) + finally: + if disconnect: + self.disconnect() + + def reset(self): + """Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D).""" + # Use direct serial connection like dev.py does + try: + with serial.Serial(self.device, baudrate=115200) as ser: + ser.write(b'\x03\x03\x04') # Ctrl+C, Ctrl+C, Ctrl+D + except Exception as e: + raise TransportError(f"Failed to reset device: {e}") from e + + def follow_output(self): + """Follow device output continuously (like tail -f).""" + # Use direct serial connection like dev.py does + try: + with serial.Serial(self.device, baudrate=115200) as ser: + while True: + if ser.in_waiting > 0: + data = ser.readline().decode('utf-8', errors='replace').strip() + if data: + print(data) + else: + time.sleep(0.01) # Small delay to avoid busy-waiting + except KeyboardInterrupt: + print("\nStopped following output.", file=sys.stderr) + except Exception as e: + raise TransportError(f"Failed to follow output: {e}") from e + + +def copy_file(from_device, device, remote_path, local_path): + """ + Copy a file to/from device. + + Args: + from_device: True to copy from device, False to copy to device + device: Device path (e.g., '/dev/ttyACM0') + remote_path: Path on device (e.g., 'settings.json') + local_path: Path on local filesystem + """ + conn = DeviceConnection(device) + try: + if from_device: + conn.copy_from_device(remote_path, local_path) + else: + conn.copy_to_device(local_path, remote_path) + except TransportError as e: + raise RuntimeError(f"Device communication error: {e}") from e diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..45311d2 --- /dev/null +++ b/install.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# Install script - installs dependencies and copies binary to ~/.local/bin + +set -e + +echo "Installing dependencies..." +pipenv install "$@" + +# Build binary if it doesn't exist or if source is newer +if [ ! -f "dist/led-cli" ] || [ "cli.py" -nt "dist/led-cli" ] || [ "device.py" -nt "dist/led-cli" ]; then + echo "" + echo "Building binary..." + pipenv run pyinstaller --clean led-cli.spec +fi + +# Ensure ~/.local/bin exists +mkdir -p ~/.local/bin + +# Copy binary to ~/.local/bin +if [ -f "dist/led-cli" ]; then + echo "" + echo "Installing binary to ~/.local/bin/led-cli..." + cp dist/led-cli ~/.local/bin/led-cli + chmod +x ~/.local/bin/led-cli + echo "✓ Binary installed successfully!" + echo "" + echo "You can now run 'led-cli' from anywhere." + echo "Make sure ~/.local/bin is in your PATH." +else + echo "Error: Binary not found at dist/led-cli" >&2 + exit 1 +fi diff --git a/lib/mpremote/__init__.py b/lib/mpremote/__init__.py new file mode 100644 index 0000000..577d28c --- /dev/null +++ b/lib/mpremote/__init__.py @@ -0,0 +1 @@ +# Empty __init__.py to make this a package diff --git a/lib/mpremote/console.py b/lib/mpremote/console.py new file mode 100644 index 0000000..399f979 --- /dev/null +++ b/lib/mpremote/console.py @@ -0,0 +1,4 @@ +# Minimal console module for mpremote compatibility +# Only exports VT_ENABLED which is used by transport_serial + +VT_ENABLED = False # Disable VT for non-interactive use diff --git a/lib/mpremote/mp_errno.py b/lib/mpremote/mp_errno.py new file mode 100644 index 0000000..e2554ef --- /dev/null +++ b/lib/mpremote/mp_errno.py @@ -0,0 +1,55 @@ +import errno +import platform + +# This table maps numeric values defined by `py/mperrno.h` to host errno code. +MP_ERRNO_TABLE = { + 1: errno.EPERM, + 2: errno.ENOENT, + 3: errno.ESRCH, + 4: errno.EINTR, + 5: errno.EIO, + 6: errno.ENXIO, + 7: errno.E2BIG, + 8: errno.ENOEXEC, + 9: errno.EBADF, + 10: errno.ECHILD, + 11: errno.EAGAIN, + 12: errno.ENOMEM, + 13: errno.EACCES, + 14: errno.EFAULT, + 16: errno.EBUSY, + 17: errno.EEXIST, + 18: errno.EXDEV, + 19: errno.ENODEV, + 20: errno.ENOTDIR, + 21: errno.EISDIR, + 22: errno.EINVAL, + 23: errno.ENFILE, + 24: errno.EMFILE, + 25: errno.ENOTTY, + 26: errno.ETXTBSY, + 27: errno.EFBIG, + 28: errno.ENOSPC, + 29: errno.ESPIPE, + 30: errno.EROFS, + 31: errno.EMLINK, + 32: errno.EPIPE, + 33: errno.EDOM, + 34: errno.ERANGE, + 95: errno.EOPNOTSUPP, + 97: errno.EAFNOSUPPORT, + 98: errno.EADDRINUSE, + 103: errno.ECONNABORTED, + 104: errno.ECONNRESET, + 105: errno.ENOBUFS, + 106: errno.EISCONN, + 107: errno.ENOTCONN, + 110: errno.ETIMEDOUT, + 111: errno.ECONNREFUSED, + 113: errno.EHOSTUNREACH, + 114: errno.EALREADY, + 115: errno.EINPROGRESS, + 125: errno.ECANCELED, +} +if platform.system() != "Windows": + MP_ERRNO_TABLE[15] = errno.ENOTBLK diff --git a/lib/mpremote/transport.py b/lib/mpremote/transport.py new file mode 100644 index 0000000..e738ade --- /dev/null +++ b/lib/mpremote/transport.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +# +# This file is part of the MicroPython project, http://micropython.org/ +# +# The MIT License (MIT) +# +# Copyright (c) 2023 Jim Mussared +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import ast, errno, hashlib, os, re, sys +from collections import namedtuple +from mpremote.mp_errno import MP_ERRNO_TABLE + + +def stdout_write_bytes(b): + b = b.replace(b"\x04", b"") + if hasattr(sys.stdout, "buffer"): + sys.stdout.buffer.write(b) + sys.stdout.buffer.flush() + else: + text = b.decode(sys.stdout.encoding, "strict") + sys.stdout.write(text) + + +class TransportError(Exception): + pass + + +class TransportExecError(TransportError): + def __init__(self, status_code, error_output): + self.status_code = status_code + self.error_output = error_output + super().__init__(error_output) + + +listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) + + +# Takes a Transport error (containing the text of an OSError traceback) and +# raises it as the corresponding OSError-derived exception. +def _convert_filesystem_error(e, info): + if "OSError" in e.error_output: + for code, estr in [ + *errno.errorcode.items(), + (errno.EOPNOTSUPP, "EOPNOTSUPP"), + ]: + if estr in e.error_output: + return OSError(code, info) + + # Some targets don't render OSError with the name of the errno, so in these + # cases support an explicit mapping of errnos to known numeric codes. + error_lines = e.error_output.splitlines() + match = re.match(r"OSError: (\d+)$", error_lines[-1]) + if match: + value = int(match.group(1), 10) + if value in MP_ERRNO_TABLE: + return OSError(MP_ERRNO_TABLE[value], info) + + return e + + +class Transport: + def fs_listdir(self, src=""): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = "import os\nfor f in os.ilistdir(%s):\n print(repr(f), end=',')" % ( + ("'%s'" % src) if src else "" + ) + try: + buf.extend(b"[") + self.exec(cmd, data_consumer=repr_consumer) + buf.extend(b"]") + except TransportExecError as e: + raise _convert_filesystem_error(e, src) from None + + return [ + listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,))) + for f in ast.literal_eval(buf.decode()) + ] + + def fs_stat(self, src): + try: + self.exec("import os") + return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src))) + except TransportExecError as e: + raise _convert_filesystem_error(e, src) from None + + def fs_exists(self, src): + try: + self.fs_stat(src) + return True + except OSError: + return False + + def fs_isdir(self, src): + try: + mode = self.fs_stat(src).st_mode + return (mode & 0x4000) != 0 + except OSError: + # Match CPython, a non-existent path is not a directory. + return False + + def fs_printfile(self, src, chunk_size=256): + cmd = ( + "with open('%s') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + try: + self.exec(cmd, data_consumer=stdout_write_bytes) + except TransportExecError as e: + raise _convert_filesystem_error(e, src) from None + + def fs_readfile(self, src, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = self.fs_stat(src).st_size + + contents = bytearray() + + try: + self.exec("f=open('%s','rb')\nr=f.read" % src) + while True: + chunk = self.eval("r({})".format(chunk_size)) + if not chunk: + break + contents.extend(chunk) + if progress_callback: + progress_callback(len(contents), src_size) + self.exec("f.close()") + except TransportExecError as e: + raise _convert_filesystem_error(e, src) from None + + return contents + + def fs_writefile(self, dest, data, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = len(data) + written = 0 + + try: + self.exec("f=open('%s','wb')\nw=f.write" % dest) + while data: + chunk = data[:chunk_size] + self.exec("w(" + repr(chunk) + ")") + data = data[len(chunk) :] + if progress_callback: + written += len(chunk) + progress_callback(written, src_size) + self.exec("f.close()") + except TransportExecError as e: + raise _convert_filesystem_error(e, dest) from None + + def fs_mkdir(self, path): + try: + self.exec("import os\nos.mkdir('%s')" % path) + except TransportExecError as e: + raise _convert_filesystem_error(e, path) from None + + def fs_rmdir(self, path): + try: + self.exec("import os\nos.rmdir('%s')" % path) + except TransportExecError as e: + raise _convert_filesystem_error(e, path) from None + + def fs_rmfile(self, path): + try: + self.exec("import os\nos.remove('%s')" % path) + except TransportExecError as e: + raise _convert_filesystem_error(e, path) from None + + def fs_touchfile(self, path): + try: + self.exec("f=open('%s','a')\nf.close()" % path) + except TransportExecError as e: + raise _convert_filesystem_error(e, path) from None + + def fs_hashfile(self, path, algo, chunk_size=256): + try: + self.exec("import hashlib\nh = hashlib.{algo}()".format(algo=algo)) + except TransportExecError: + # hashlib (or hashlib.{algo}) not available on device. Do the hash locally. + data = self.fs_readfile(path, chunk_size=chunk_size) + return getattr(hashlib, algo)(data).digest() + try: + self.exec( + "buf = memoryview(bytearray({chunk_size}))\nwith open('{path}', 'rb') as f:\n while True:\n n = f.readinto(buf)\n if n == 0:\n break\n h.update(buf if n == {chunk_size} else buf[:n])\n".format( + chunk_size=chunk_size, path=path + ) + ) + return self.eval("h.digest()") + except TransportExecError as e: + raise _convert_filesystem_error(e, path) from None diff --git a/lib/mpremote/transport_serial.py b/lib/mpremote/transport_serial.py new file mode 100644 index 0000000..370c9d7 --- /dev/null +++ b/lib/mpremote/transport_serial.py @@ -0,0 +1,1053 @@ +#!/usr/bin/env python3 +# +# This file is part of the MicroPython project, http://micropython.org/ +# +# The MIT License (MIT) +# +# Copyright (c) 2014-2021 Damien P. George +# Copyright (c) 2017 Paul Sokolovsky +# Copyright (c) 2023 Jim Mussared +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# This is based on the serial-only parts of tools/pyboard.py, with Python 2 +# support removed, and is currently in the process of being refactored to +# support multiple transports (webrepl, socket, BLE, etc). At the moment, +# SerialTransport is just the old Pyboard+PyboardExtended class without any +# of this refactoring. The API is going to change significantly. + +# Once the API is stabilised, the idea is that mpremote can be used both +# as a command line tool and a library for interacting with devices. + +import ast, io, os, re, struct, sys, time +import serial +import serial.tools.list_ports +from errno import EPERM, ENOTTY +from mpremote.console import VT_ENABLED +from mpremote.transport import TransportError, TransportExecError, Transport + + +VID_SILICON_LABS = 0x10C4 + + +class SerialTransport(Transport): + fs_hook_mount = "/remote" # MUST match the mount point in fs_hook_code + + def __init__(self, device, baudrate=115200, wait=0, exclusive=True, timeout=None): + self.in_raw_repl = False + self.use_raw_paste = True + self.device_name = device + self.mounted = False + + # Set options, and exclusive if pyserial supports it + serial_kwargs = { + "baudrate": baudrate, + "timeout": timeout, + "interCharTimeout": 1, + } + if serial.__version__ >= "3.3": + serial_kwargs["exclusive"] = exclusive + + delayed = False + for attempt in range(wait + 1): + try: + self.serial = serial.serial_for_url(device, do_not_open=True, **serial_kwargs) + if os.name == "nt": + portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore + if portinfo and getattr(portinfo[0], "vid", None) == VID_SILICON_LABS: + # Silicon Labs CP210x driver on Windows has a quirk + # where after a power on reset it will set DTR and RTS + # at different times when the port is opened (it doesn't + # happen on subsequent openings). + # + # To avoid issues with spurious reset on Espressif boards we clear DTR and RTS, + # open the port, and then set them in an order which prevents triggering a reset. + self.serial.dtr = False + self.serial.rts = False + self.serial.open() + self.serial.dtr = True + self.serial.rts = True + + # On all other host/driver combinations we keep the default + # behaviour (pyserial will set DTR and RTS automatically on open) + if not self.serial.isOpen(): + self.serial.open() + break + except OSError: + if wait == 0: + continue + if attempt == 0: + sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) + delayed = True + time.sleep(1) + sys.stdout.write(".") + sys.stdout.flush() + else: + if delayed: + print("") + raise TransportError("failed to access " + device) + if delayed: + print("") + + def close(self): + # ESP Windows quirk: Prevent target from resetting when Windows clears DTR before RTS + try: + self.serial.rts = False + self.serial.dtr = False + except OSError as er: + if er.errno == ENOTTY: + # Some devices (like QEMU pts) don't support RTS/DTR control + pass + else: + raise er + self.serial.close() + + def read_until( + self, min_num_bytes, ending, timeout=10, data_consumer=None, timeout_overall=None + ): + """ + min_num_bytes: Obsolete. + ending: Return if 'ending' matches. + timeout [s]: Return if timeout between characters. None: Infinite timeout. + timeout_overall [s]: Return not later than timeout_overall. None: Infinite timeout. + data_consumer: Use callback for incoming characters. + If data_consumer is used then data is not accumulated and the ending must be 1 byte long + + It is not visible to the caller why the function returned. It could be ending or timeout. + """ + assert data_consumer is None or len(ending) == 1 + assert isinstance(timeout, (type(None), int, float)) + assert isinstance(timeout_overall, (type(None), int, float)) + + data = b"" + begin_overall_s = begin_char_s = time.monotonic() + while True: + if data.endswith(ending): + break + elif self.serial.inWaiting() > 0: + new_data = self.serial.read(1) + if data_consumer: + data_consumer(new_data) + data = new_data + else: + data = data + new_data + begin_char_s = time.monotonic() + else: + if timeout is not None and time.monotonic() >= begin_char_s + timeout: + break + if ( + timeout_overall is not None + and time.monotonic() >= begin_overall_s + timeout_overall + ): + break + time.sleep(0.01) + return data + + def enter_raw_repl(self, soft_reset=True, timeout_overall=10): + self.serial.write(b"\r\x03") # ctrl-C: interrupt any running program + + # flush input (without relying on serial.flushInput()) + n = self.serial.inWaiting() + while n > 0: + self.serial.read(n) + n = self.serial.inWaiting() + + self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL + + if soft_reset: + data = self.read_until( + 1, b"raw REPL; CTRL-B to exit\r\n>", timeout_overall=timeout_overall + ) + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): + print(data) + raise TransportError("could not enter raw repl") + + self.serial.write(b"\x04") # ctrl-D: soft reset + + # Waiting for "soft reboot" independently to "raw REPL" (done below) + # allows boot.py to print, which will show up after "soft reboot" + # and before "raw REPL". + data = self.read_until(1, b"soft reboot\r\n", timeout_overall=timeout_overall) + if not data.endswith(b"soft reboot\r\n"): + print(data) + raise TransportError("could not enter raw repl") + + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n", timeout_overall=timeout_overall) + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): + print(data) + raise TransportError("could not enter raw repl") + + self.in_raw_repl = True + + def exit_raw_repl(self): + self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL + self.in_raw_repl = False + + def follow(self, timeout, data_consumer=None): + # wait for normal output + data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b"\x04"): + raise TransportError("timeout waiting for first EOF reception") + data = data[:-1] + + # wait for error output + data_err = self.read_until(1, b"\x04", timeout=timeout) + if not data_err.endswith(b"\x04"): + raise TransportError("timeout waiting for second EOF reception") + data_err = data_err[:-1] + + # return normal and error output + return data, data_err + + def raw_paste_write(self, command_bytes): + # Read initial header, with window size. + data = self.serial.read(2) + window_size = struct.unpack("") + if not data.endswith(b">"): + raise TransportError("could not enter raw repl") + + if self.use_raw_paste: + # Try to enter raw-paste mode. + self.serial.write(b"\x05A\x01") + data = self.serial.read(2) + if data == b"R\x00": + # Device understood raw-paste command but doesn't support it. + pass + elif data == b"R\x01": + # Device supports raw-paste mode, write out the command using this mode. + return self.raw_paste_write(command_bytes) + else: + # Device doesn't support raw-paste, fall back to normal raw REPL. + data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): + print(data) + raise TransportError("could not enter raw repl") + # Don't try to use raw-paste mode again for this connection. + self.use_raw_paste = False + + # Write command using standard raw REPL, 256 bytes every 10ms. + for i in range(0, len(command_bytes), 256): + self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) + time.sleep(0.01) + self.serial.write(b"\x04") + + # check if we could exec command + data = self.serial.read(2) + if data != b"OK": + raise TransportError("could not exec command (response: %r)" % data) + + def exec_raw(self, command, timeout=10, data_consumer=None): + self.exec_raw_no_follow(command) + return self.follow(timeout, data_consumer) + + def eval(self, expression, parse=True): + if parse: + ret = self.exec("print(repr({}))".format(expression)) + ret = ret.strip() + return ast.literal_eval(ret.decode()) + else: + ret = self.exec("print({})".format(expression)) + ret = ret.strip() + return ret + + def exec(self, command, data_consumer=None): + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) + if ret_err: + raise TransportExecError(ret, ret_err.decode()) + return ret + + def execfile(self, filename): + with open(filename, "rb") as f: + pyfile = f.read() + return self.exec(pyfile) + + def mount_local(self, path, unsafe_links=False): + fout = self.serial + if not self.eval('"RemoteFS" in globals()'): + self.exec(fs_hook_code) + self.exec("__mount()") + self.mounted = True + self.cmd = PyboardCommand(self.serial, fout, path, unsafe_links=unsafe_links) + self.serial = SerialIntercept(self.serial, self.cmd) + + def write_ctrl_d(self, out_callback): + self.serial.write(b"\x04") + if not self.mounted: + return + + # Read response from the device until it is quiet (with a timeout). + INITIAL_TIMEOUT = 0.5 + BANNER_TIMEOUT = 2 + QUIET_TIMEOUT = 0.1 + FULL_TIMEOUT = 5 + t_start = t_last_activity = time.monotonic() + data_all = b"" + soft_reboot_started = False + soft_reboot_banner = False + while True: + t = time.monotonic() + n = self.serial.inWaiting() + if n > 0: + data = self.serial.read(n) + out_callback(data) + data_all += data + t_last_activity = t + else: + if len(data_all) == 0: + if t - t_start > INITIAL_TIMEOUT: + return + else: + if t - t_start > FULL_TIMEOUT: + if soft_reboot_started: + break + return + + next_data_timeout = QUIET_TIMEOUT + + if not soft_reboot_started and data_all.find(b"MPY: soft reboot") != -1: + soft_reboot_started = True + + if soft_reboot_started and not soft_reboot_banner: + # Once soft reboot has been initiated, give some more time for the startup + # banner to be shown + if data_all.find(b"\nMicroPython ") != -1: + soft_reboot_banner = True + elif data_all.find(b"\nraw REPL; CTRL-B to exit\r\n") != -1: + soft_reboot_banner = True + else: + next_data_timeout = BANNER_TIMEOUT + + if t - t_last_activity > next_data_timeout: + break + + if not soft_reboot_started: + return + + if not soft_reboot_banner: + out_callback(b"Warning: Could not remount local filesystem\r\n") + return + + # Determine type of prompt + if data_all.endswith(b">"): + in_friendly_repl = False + prompt = b">" + else: + in_friendly_repl = True + prompt = data_all.rsplit(b"\r\n", 1)[-1] + + # Clear state while board remounts, it will be re-set once mounted. + self.mounted = False + self.serial = self.serial.orig_serial + + # Provide a message about the remount. + out_callback( + bytes( + f"\r\nRemount local directory {self.cmd.root} at {self.fs_hook_mount}\r\n", "utf8" + ) + ) + + # Enter raw REPL and re-mount the remote filesystem. + self.serial.write(b"\x01") + self.exec(fs_hook_code) + self.exec("__mount()") + self.mounted = True + + # Exit raw REPL if needed, and wait for the friendly REPL prompt. + if in_friendly_repl: + self.exit_raw_repl() + self.read_until(len(prompt), prompt) + out_callback(prompt) + self.serial = SerialIntercept(self.serial, self.cmd) + + def umount_local(self): + if self.mounted: + self.exec(f'os.umount("{self.fs_hook_mount}")') + self.mounted = False + self.serial = self.serial.orig_serial + + +fs_hook_cmds = { + "CMD_STAT": 1, + "CMD_ILISTDIR_START": 2, + "CMD_ILISTDIR_NEXT": 3, + "CMD_OPEN": 4, + "CMD_CLOSE": 5, + "CMD_READ": 6, + "CMD_READLINE": 7, + "CMD_WRITE": 8, + "CMD_SEEK": 9, + "CMD_REMOVE": 10, + "CMD_RENAME": 11, + "CMD_MKDIR": 12, + "CMD_RMDIR": 13, +} + +fs_hook_code = f"""\ +import os, io, struct, micropython + +SEEK_SET = 0 + +class RemoteCommand: + def __init__(self): + import select, sys + self.buf4 = bytearray(4) + self.fout = sys.stdout.buffer + self.fin = sys.stdin.buffer + self.poller = select.poll() + self.poller.register(self.fin, select.POLLIN) + + def poll_in(self): + for _ in self.poller.ipoll(1000): + return + self.end() + raise Exception('timeout waiting for remote') + + def rd(self, n): + buf = bytearray(n) + self.rd_into(buf, n) + return buf + + def rd_into(self, buf, n): + # implement reading with a timeout in case other side disappears + if n == 0: + return + self.poll_in() + r = self.fin.readinto(buf, n) + if r < n: + mv = memoryview(buf) + while r < n: + self.poll_in() + r += self.fin.readinto(mv[r:], n - r) + + def begin(self, type): + micropython.kbd_intr(-1) + buf4 = self.buf4 + buf4[0] = 0x18 + buf4[1] = type + self.fout.write(buf4, 2) + # Wait for sync byte 0x18, but don't get stuck forever + for i in range(30): + self.poller.poll(1000) + self.fin.readinto(buf4, 1) + if buf4[0] == 0x18: + break + + def end(self): + micropython.kbd_intr(3) + + def rd_s8(self): + self.rd_into(self.buf4, 1) + n = self.buf4[0] + if n & 0x80: + n -= 0x100 + return n + + def rd_s32(self): + buf4 = self.buf4 + self.rd_into(buf4, 4) + n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 + if buf4[3] & 0x80: + n -= 0x100000000 + return n + + def rd_u32(self): + buf4 = self.buf4 + self.rd_into(buf4, 4) + return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24 + + def rd_bytes(self, buf): + # TODO if n is large (eg >256) then we may miss bytes on stdin + n = self.rd_s32() + if buf is None: + ret = buf = bytearray(n) + else: + ret = n + self.rd_into(buf, n) + return ret + + def rd_str(self): + n = self.rd_s32() + if n == 0: + return '' + else: + return str(self.rd(n), 'utf8') + + def wr_s8(self, i): + self.buf4[0] = i + self.fout.write(self.buf4, 1) + + def wr_s32(self, i): + struct.pack_into(' {len(buf)}") + + def do_readline(self): + fd = self.rd_s8() + buf = self.data_files[fd][0].readline() + if self.data_files[fd][1]: + buf = bytes(buf, "utf8") + self.wr_bytes(buf) + # self.log_cmd(f"readline {fd} -> {len(buf)}") + + def do_seek(self): + fd = self.rd_s8() + n = self.rd_s32() + whence = self.rd_s8() + # self.log_cmd(f"seek {fd} {n}") + try: + n = self.data_files[fd][0].seek(n, whence) + except io.UnsupportedOperation: + n = -1 + self.wr_s32(n) + + def do_write(self): + fd = self.rd_s8() + buf = self.rd_bytes() + if self.data_files[fd][1]: + buf = str(buf, "utf8", errors="backslashreplace") + n = self.data_files[fd][0].write(buf) + self.wr_s32(n) + # self.log_cmd(f"write {fd} {len(buf)} -> {n}") + + def do_remove(self): + path = self.root + self.rd_str() + # self.log_cmd(f"remove {path}") + try: + self.path_check(path) + os.remove(path) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + def do_rename(self): + old = self.root + self.rd_str() + new = self.root + self.rd_str() + # self.log_cmd(f"rename {old} {new}") + try: + self.path_check(old) + self.path_check(new) + os.rename(old, new) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + def do_mkdir(self): + path = self.root + self.rd_str() + # self.log_cmd(f"mkdir {path}") + try: + self.path_check(path) + os.mkdir(path) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + def do_rmdir(self): + path = self.root + self.rd_str() + # self.log_cmd(f"rmdir {path}") + try: + self.path_check(path) + os.rmdir(path) + ret = 0 + except OSError as er: + ret = -abs(er.errno) + self.wr_s32(ret) + + cmd_table = { + fs_hook_cmds["CMD_STAT"]: do_stat, + fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start, + fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next, + fs_hook_cmds["CMD_OPEN"]: do_open, + fs_hook_cmds["CMD_CLOSE"]: do_close, + fs_hook_cmds["CMD_READ"]: do_read, + fs_hook_cmds["CMD_READLINE"]: do_readline, + fs_hook_cmds["CMD_WRITE"]: do_write, + fs_hook_cmds["CMD_SEEK"]: do_seek, + fs_hook_cmds["CMD_REMOVE"]: do_remove, + fs_hook_cmds["CMD_RENAME"]: do_rename, + fs_hook_cmds["CMD_MKDIR"]: do_mkdir, + fs_hook_cmds["CMD_RMDIR"]: do_rmdir, + } + + +class SerialIntercept: + def __init__(self, serial, cmd): + self.orig_serial = serial + self.cmd = cmd + self.buf = b"" + self.orig_serial.timeout = 5.0 + + def _check_input(self, blocking): + if blocking or self.orig_serial.inWaiting() > 0: + c = self.orig_serial.read(1) + if c == b"\x18": + # a special command + c = self.orig_serial.read(1)[0] + self.orig_serial.write(b"\x18") # Acknowledge command + PyboardCommand.cmd_table[c](self.cmd) + elif not VT_ENABLED and c == b"\x1b": + # ESC code, ignore these on windows + esctype = self.orig_serial.read(1) + if esctype == b"[": # CSI + while not (0x40 < self.orig_serial.read(1)[0] < 0x7E): + # Looking for "final byte" of escape sequence + pass + else: + self.buf += c + + @property + def fd(self): + return self.orig_serial.fd + + def close(self): + self.orig_serial.close() + + def inWaiting(self): + self._check_input(False) + return len(self.buf) + + def read(self, n): + while len(self.buf) < n: + self._check_input(True) + out = self.buf[:n] + self.buf = self.buf[n:] + return out + + def write(self, buf): + self.orig_serial.write(buf) diff --git a/test_imports.py b/test_imports.py new file mode 100644 index 0000000..0525449 --- /dev/null +++ b/test_imports.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +"""Test script to verify imports work""" +import sys +import os + +# Test 1: Check if file exists +transport_serial_path = 'lib/mpremote/transport_serial.py' +print(f"1. Checking if {transport_serial_path} exists...") +if os.path.exists(transport_serial_path): + print(f" ✓ File exists ({os.path.getsize(transport_serial_path)} bytes)") +else: + print(f" ✗ File does NOT exist!") + sys.exit(1) + +# Test 2: Add lib to path and import +print("2. Testing imports...") +sys.path.insert(0, 'lib') +try: + from mpremote.transport_serial import SerialTransport + print(" ✓ transport_serial import works") +except Exception as e: + print(f" ✗ transport_serial import failed: {e}") + sys.exit(1) + +# Test 3: Import device module +try: + import device + print(" ✓ device module import works") +except Exception as e: + print(f" ✗ device module import failed: {e}") + sys.exit(1) + +# Test 4: Import cli module +try: + import cli + print(" ✓ cli module import works") +except Exception as e: + print(f" ✗ cli module import failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + +print("\n✓ All imports successful!") diff --git a/test_led_simple.py b/test_led_simple.py new file mode 100644 index 0000000..37fe581 --- /dev/null +++ b/test_led_simple.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +"""Simple test for LED count update""" +import json +import sys +import os + +# Add current directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Test the edit logic +edits = {} +args_leds = 100 + +if args_leds is not None: + edits["num_leds"] = args_leds + +print("Edits:", edits) + +# Simulate --no-download +settings = {} + +# Apply edits +if edits: + print(f"Applying {len(edits)} edit(s)...") + settings.update(edits) + +# Print settings (this is what print_settings does) +print("\nOutput JSON:") +print(json.dumps(settings, indent=2)) + +# Verify +if settings.get("num_leds") == 100: + print("\n✓ SUCCESS: num_leds is correctly set to 100") +else: + print(f"\n✗ FAILED: num_leds is {settings.get('num_leds')}, expected 100") diff --git a/test_leds.py b/test_leds.py new file mode 100644 index 0000000..0de2b1d --- /dev/null +++ b/test_leds.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Test script for updating number of LEDs""" +import sys +import json +import subprocess + +print("Testing CLI with -l (number of LEDs) option...") +print("=" * 60) + +# Test 1: Check help output +print("\n1. Testing help output:") +result = subprocess.run( + [sys.executable, "cli.py", "-h"], + capture_output=True, + text=True, + cwd="/home/jimmy/projects/led-tool" +) +if "-l" in result.stdout or "--leds" in result.stdout: + print(" ✓ Help shows -l/--leds option") +else: + print(" ✗ Help does not show -l/--leds option") + print(f" Output: {result.stdout[:200]}") + +# Test 2: Test with --no-download and --no-upload (won't connect to device) +print("\n2. Testing -l 100 with --no-download --no-upload:") +result = subprocess.run( + [sys.executable, "cli.py", "-l", "100", "--no-download", "--no-upload"], + capture_output=True, + text=True, + cwd="/home/jimmy/projects/led-tool" +) + +output = result.stdout +print(f" Return code: {result.returncode}") +print(f" Output:\n{output}") + +# Check if num_leds is in the output +if "num_leds" in output or '"num_leds": 100' in output: + print(" ✓ num_leds is set to 100 in output") + # Try to parse as JSON + try: + # Extract JSON from output (it should be the last part) + lines = output.strip().split('\n') + json_str = '\n'.join([l for l in lines if l.strip().startswith('{') or l.strip().startswith('"')]) + if not json_str: + json_str = output.strip() + settings = json.loads(json_str) + if settings.get("num_leds") == 100: + print(" ✓ JSON is valid and num_leds = 100") + else: + print(f" ✗ num_leds is {settings.get('num_leds')}, expected 100") + except json.JSONDecodeError as e: + print(f" ⚠ Could not parse as JSON: {e}") + print(f" Raw output: {output[:500]}") +else: + print(" ✗ num_leds not found in output") + print(f" Output: {output[:500]}") + +# Test 3: Test with multiple options including LEDs +print("\n3. Testing -l 60 with other options:") +result = subprocess.run( + [sys.executable, "cli.py", "-l", "60", "-b", "128", "-n", "TestLED", "--no-download", "--no-upload"], + capture_output=True, + text=True, + cwd="/home/jimmy/projects/led-tool" +) + +output = result.stdout +if result.returncode == 0: + try: + settings = json.loads(output.strip()) + if settings.get("num_leds") == 60: + print(" ✓ num_leds is set to 60") + if settings.get("brightness") == 128: + print(" ✓ brightness is set to 128") + if settings.get("name") == "TestLED": + print(" ✓ name is set to TestLED") + print(f" Settings: {json.dumps(settings, indent=2)}") + except json.JSONDecodeError: + print(f" ⚠ Could not parse output as JSON") + print(f" Output: {output[:500]}") +else: + print(f" ✗ Command failed with return code {result.returncode}") + print(f" Error: {result.stderr}") + +print("\n" + "=" * 60) +print("Test complete!") diff --git a/web.py b/web.py new file mode 100644 index 0000000..bd77045 --- /dev/null +++ b/web.py @@ -0,0 +1,707 @@ +#!/usr/bin/env python3 +""" +LED Bar Configuration Web App + +Flask-based web UI for downloading, editing, and uploading settings.json +to/from MicroPython devices via mpremote. +""" + +import json +import tempfile +import os +from pathlib import Path +from device import copy_file, DeviceConnection + +from flask import ( + Flask, + render_template_string, + request, + redirect, + url_for, + flash, +) + + +app = Flask(__name__) +app.secret_key = "change-me-in-production" + + +SETTINGS_CONFIG = [ + ("led_pin", "LED Pin", "number"), + ("num_leds", "Number of LEDs", "number"), + ("color_order", "Color Order", "choice", ["rgb", "rbg", "grb", "gbr", "brg", "bgr"]), + ("name", "Device Name", "text"), + ("pattern", "Pattern", "text"), + ("delay", "Delay (ms)", "number"), + ("brightness", "Brightness", "number"), + ("n1", "N1", "number"), + ("n2", "N2", "number"), + ("n3", "N3", "number"), + ("n4", "N4", "number"), + ("n5", "N5", "number"), + ("n6", "N6", "number"), + ("ap_password", "AP Password", "text"), + ("id", "ID", "number"), + ("debug", "Debug Mode", "choice", ["True", "False"]), +] + + +def download_settings(device: str) -> dict: + """Download settings.json from the device.""" + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) + temp_path = temp_file.name + temp_file.close() + + try: + copy_file(from_device=True, device=device, remote_path="settings.json", local_path=temp_path) + with open(temp_path, "r", encoding="utf-8") as f: + return json.load(f) + finally: + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except OSError: + pass + + +def upload_settings(device: str, settings: dict) -> None: + """Upload settings.json to the device and reset device.""" + temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) + temp_path = temp_file.name + + try: + json.dump(settings, temp_file, indent=2) + temp_file.close() + + copy_file(from_device=False, device=device, remote_path="settings.json", local_path=temp_path) + + # Reset device (best effort) + try: + conn = DeviceConnection(device) + conn.connect() + conn.reset() + conn.disconnect() + except Exception: + pass + finally: + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except OSError: + pass + + +def parse_settings_from_form(form) -> dict: + settings = {} + for cfg in SETTINGS_CONFIG: + key = cfg[0] + raw = (form.get(key) or "").strip() + if raw == "": + continue + + if key in ["led_pin", "num_leds", "delay", "brightness", "id", "n1", "n2", "n3", "n4", "n5", "n6"]: + try: + settings[key] = int(raw) + except ValueError: + settings[key] = raw + elif key == "debug": + settings[key] = raw == "True" + else: + settings[key] = raw + return settings + + +TEMPLATE = """ + + + + + LED Bar Configuration + + + + +
+
+
+

+ LED Bar Configuration + Web Console +

+
+ + + Raspberry Pi · MicroPython + + settings.json live editor +
+
+
+ Device: {{ device or "/dev/ttyACM0" }} +
+
+ +
+
+
+
+

Device Connection

+ Connect to your MicroPython LED controller and sync configuration +
+
+ +
+ +
+ + + +
+ +
+ + {{ status or "Ready" }} +
+ +
+ Tip: + Download from device → tweak parameters → Upload and reboot. +
+ +
+ +
+
+

LED Settings

+ Edit all fields before uploading back to your controller +
+
+ +
+ {% for field in settings_config %} + {% set key, label, field_type = field[0], field[1], field[2] %} +
+ + {% if field_type == 'choice' %} + {% set choices = field[3] %} + + {% else %} + + {% endif %} +
+ {% endfor %} +
+ +
+ + +
+
+
+ +
+
+
+

Raw JSON

+ For advanced editing, paste or copy the full settings.json +
+
+ +
+ + + + +
+ + +
+
+
+
+
+ +
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +
+ {{ message }} +
+ {% endfor %} + {% endif %} + {% endwith %} +
+ + +""" + + +@app.route("/", methods=["GET"]) +def index(): + return render_template_string( + TEMPLATE, + device="/dev/ttyACM0", + settings={}, + settings_config=SETTINGS_CONFIG, + status="Ready", + status_type="ok", + raw_json="{}", + ) + + +@app.route("/", methods=["POST"]) +def handle_action(): + action = request.form.get("action") or "" + device = (request.form.get("device") or "/dev/ttyACM0").strip() + raw_json = (request.form.get("raw_json") or "").strip() + + settings = {} + status = "Ready" + status_type = "ok" + + if action == "download": + if not device: + flash("Please specify a device.", "error") + status, status_type = "Missing device.", "error" + else: + try: + settings = download_settings(device) + raw_json = json.dumps(settings, indent=2) + flash(f"Settings downloaded from {device}.", "success") + status = f"Settings downloaded from {device}" + except Exception as exc: # pylint: disable=broad-except + if "timeout" in str(exc).lower() or "connection" in str(exc).lower(): + flash("Connection timeout. Check device connection.", "error") + status, status_type = "Connection timeout.", "error" + else: + flash(f"Failed to download settings: {exc}", "error") + status, status_type = "Download failed.", "error" + + elif action == "upload": + if not device: + flash("Please specify a device.", "error") + status, status_type = "Missing device.", "error" + else: + # Take current form fields as source of truth, falling back to JSON if present + if raw_json: + try: + settings = json.loads(raw_json) + except json.JSONDecodeError: + flash("Raw JSON is invalid; using form values instead.", "error") + settings = {} + form_settings = parse_settings_from_form(request.form) + settings.update(form_settings) + + if not settings: + flash("No settings to upload. Download or provide settings first.", "error") + status, status_type = "No settings to upload.", "error" + else: + try: + upload_settings(device, settings) + raw_json = json.dumps(settings, indent=2) + flash(f"Settings uploaded and device reset on {device}.", "success") + status = f"Settings uploaded and device reset on {device}" + except Exception as exc: # pylint: disable=broad-except + if "timeout" in str(exc).lower() or "connection" in str(exc).lower(): + flash("Connection timeout. Check device connection.", "error") + status, status_type = "Connection timeout.", "error" + else: + flash(f"Failed to upload settings: {exc}", "error") + status, status_type = "Upload failed.", "error" + + elif action == "from_json": + # No-op here, JSON is just edited in the side panel + form_settings = parse_settings_from_form(request.form) + settings.update(form_settings) + if raw_json: + try: + settings.update(json.loads(raw_json)) + flash("JSON merged into form values.", "success") + status = "JSON merged into form." + except json.JSONDecodeError: + flash("Invalid JSON; keeping previous form values.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "to_form": + if raw_json: + try: + settings = json.loads(raw_json) + flash("Form fields updated from JSON.", "success") + status = "Form fields updated from JSON." + except json.JSONDecodeError: + flash("Invalid JSON; could not update form fields.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "pretty": + if raw_json: + try: + parsed = json.loads(raw_json) + raw_json = json.dumps(parsed, indent=2) + settings = parsed if isinstance(parsed, dict) else {} + flash("JSON pretty-printed.", "success") + status = "JSON pretty-printed." + except json.JSONDecodeError: + flash("Invalid JSON; cannot pretty-print.", "error") + status, status_type = "JSON parse error.", "error" + + elif action == "clear": + settings = {} + raw_json = "{}" + flash("Form cleared.", "success") + status = "Form cleared." + + else: + # Unknown / initial action: just reflect form values back + settings = parse_settings_from_form(request.form) + if raw_json and not settings: + try: + settings = json.loads(raw_json) + except json.JSONDecodeError: + pass + + return render_template_string( + TEMPLATE, + device=device, + settings=settings, + settings_config=SETTINGS_CONFIG, + status=status, + status_type=status_type, + raw_json=raw_json or json.dumps(settings or {}, indent=2), + ) + + +def main() -> None: + # Bind to all interfaces so you can reach it from your LAN: + # python web_app.py + # Then open: http://:5000/ + app.run(host="0.0.0.0", port=5000, debug=False) + + +if __name__ == "__main__": + main()