#!/usr/bin/env python3 """ LED Bar Configuration CLI Tool Command-line interface for downloading, editing, and uploading settings.json to/from MicroPython devices via mpremote. """ import json import argparse import subprocess import sys import time from typing import Dict, Any, List, Optional import tempfile import os from device import copy_file, DeviceConnection def resolve_flash_binary(path: str) -> Optional[str]: """Resolve binary path: full path or in bin/ directory.""" if os.path.isabs(path): return path if os.path.exists(path) else None if os.path.exists(path): return os.path.abspath(path) bin_path = os.path.join("bin", path) return os.path.abspath(bin_path) if os.path.exists(bin_path) else None def download_settings(device: str) -> dict: """ Download settings.json from the device. Returns an empty dict if the file does not exist so that the tool can be used on a fresh device before settings.json has been created. """ temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name temp_file.close() try: copy_file(from_device=True, device=device, remote_path="settings.json", local_path=temp_path) with open(temp_path, "r", encoding="utf-8") as f: return json.load(f) except (OSError, RuntimeError) as e: # If the file is missing on the device, treat it as empty settings msg = str(e).lower() if "errno 2" in msg or "enoent" in msg or "not found" in msg: return {} raise finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass def upload_settings(device: str, settings: dict) -> None: """Upload settings.json to the device and reset device.""" temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name try: json.dump(settings, temp_file, indent=2) temp_file.close() copy_file(from_device=False, device=device, remote_path="settings.json", local_path=temp_path) # Reset device (best effort) try: conn = DeviceConnection(device) conn.connect() conn.reset() conn.disconnect() except Exception: pass finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass def print_settings(settings: Dict[str, Any]) -> None: """Pretty print settings dictionary.""" print(json.dumps(settings, indent=2)) # Flags that consume the next argv as their value (for skipping during order scan) _FLAGS_WITH_VALUE = frozenset({ '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '--preset', '--default', }) def _get_ordered_actions(argv: List[str]) -> List[tuple]: """ Scan argv and return list of (action_name, value) in order of appearance. Actions: flash, pause, reset, upload, erase_all, rm, follow. """ actions = [] i = 1 while i < len(argv): arg = argv[i] if arg == '--pause': if i + 1 < len(argv): try: secs = float(argv[i + 1]) actions.append(('pause', secs)) except ValueError: raise ValueError("--pause requires a number (seconds)") i += 2 else: raise ValueError("--pause requires an argument") continue if arg == '--flash': if i + 1 < len(argv): actions.append(('flash', argv[i + 1])) i += 2 else: raise ValueError("--flash requires an argument") continue if arg in ('-r', '--reset'): actions.append(('reset', None)) i += 1 continue if arg in ('-u', '--upload'): vals = [] i += 1 while i < len(argv) and not argv[i].startswith('-'): vals.append(argv[i]) i += 1 if len(vals) >= 2: break if vals: actions.append(('upload', vals)) continue if arg == '--src': # Upload local DIR (default: ./src) to device root (:/ local_dir = "src" if i + 1 < len(argv) and not argv[i + 1].startswith('-'): local_dir = argv[i + 1] i += 2 else: i += 1 # Use empty string as remote_dir to map to root on device actions.append(('upload', [local_dir, ""])) continue if arg == '--lib': # Upload local DIR to /lib on device if i + 1 < len(argv): local_dir = argv[i + 1] actions.append(('upload', [local_dir, "lib"])) i += 2 else: raise ValueError("--lib requires a directory argument") continue if arg == '-e': actions.append(('erase_all', None)) i += 1 continue if arg == '--rm': if i + 1 < len(argv): actions.append(('rm', argv[i + 1])) i += 2 else: raise ValueError("--rm requires an argument") continue if arg in ('-f', '--follow'): # Optional duration in seconds: --follow [SECONDS] follow_secs = None if i + 1 < len(argv) and not argv[i + 1].startswith('-'): try: follow_secs = float(argv[i + 1]) i += 2 except ValueError: # Not a number, treat as flag-only i += 1 actions.append(('follow', follow_secs)) else: actions.append(('follow', None)) i += 1 continue # Skip non-action flags and their values if arg in _FLAGS_WITH_VALUE and i + 1 < len(argv): i += 2 else: i += 1 return actions def main() -> None: parser = argparse.ArgumentParser( description="LED Bar Configuration CLI Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Download and print current settings %(prog)s # Download, edit device name and brightness, then upload %(prog)s -n "LED-Strip-1" -b 128 # Set multiple parameters %(prog)s -n "MyLED" -b 200 --preset "rainbow" # Set number of LEDs, color order, and debug mode %(prog)s -l 60 -o rgb -d 1 # Reset the device %(prog)s -r # Follow device output %(prog)s -f # Reset and then follow output %(prog)s -r -f # Flash firmware binary (full path or filename in bin/) %(prog)s --flash firmware.bin %(prog)s -p /dev/ttyUSB0 --flash /path/to/firmware.bin # Reset, pause 2 seconds, then follow output %(prog)s --reset --pause 2 --follow # Set name, num_leds, default pattern, and upload %(prog)s --name "MyStrip" -l 60 --default rainbow """ ) parser.add_argument( "-p", "--port", default="/dev/ttyACM0", help="Serial port/device path (default: /dev/ttyACM0)" ) parser.add_argument( "-n", "--name", help="Device name" ) parser.add_argument( "--pin", type=int, metavar="N", help="LED GPIO pin number (led_pin in settings.json)" ) parser.add_argument( "-b", "--brightness", type=int, metavar="0-255", help="Brightness (0-255)" ) parser.add_argument( "-l", "--leds", dest="leds", type=int, metavar="N", help="Number of LEDs (num_leds in settings.json)" ) parser.add_argument( "-d", "-debug", "--debug", dest="debug", type=int, choices=[0, 1], metavar="0|1", help="Debug mode (0 or 1)" ) parser.add_argument( "-o", "--order", dest="color_order", choices=["rgb", "rbg", "grb", "gbr", "brg", "bgr"], help="Color order (rgb, rbg, grb, gbr, brg, bgr)" ) parser.add_argument( "--preset", help="Pattern preset name" ) parser.add_argument( "--default", metavar="PATTERN", help="Default/startup pattern (stored as 'default' in settings.json)" ) parser.add_argument( "--pause", type=float, metavar="SECONDS", help="Pause for N seconds (use in action sequence, e.g. --reset --pause 2 --follow)" ) parser.add_argument( "-r", "--reset", action="store_true", help="Reset the device" ) parser.add_argument( "-f", "--follow", nargs="?", const=None, type=float, metavar="SECONDS", help="Follow device output continuously (like tail -f). Optionally specify SECONDS to limit follow duration." ) parser.add_argument( "-s", "--show", action="store_true", help="Display current settings from device" ) parser.add_argument( "-u", "--upload", nargs='+', metavar=("SRC", "DEST"), help="Upload a directory recursively to the device. Usage: -u SRC [DEST] (DEST is optional, defaults to basename of SRC)" ) parser.add_argument( "--src", nargs="?", const="src", metavar="DIR", help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src." ) parser.add_argument( "--lib", metavar="DIR", help="Upload DIR recursively to /lib on device" ) parser.add_argument( "-e", dest="erase_all", action="store_true", help="Erase all code on the device (delete all files except settings.json)" ) parser.add_argument( "--rm", metavar="PATH", help="Delete a file or directory on the device (recursive for directories)" ) parser.add_argument( "--flash", metavar="BINARY", help="Flash firmware binary to device. Path can be full path or filename in bin/" ) try: args = parser.parse_args() ordered_actions = _get_ordered_actions(sys.argv) except ValueError as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) port = args.port ran_reset = False # Execute actions in command-line order for action_name, value in ordered_actions: if action_name == 'pause': print(f"Pausing for {value} seconds...", file=sys.stderr) time.sleep(value) elif action_name == 'flash': resolved = resolve_flash_binary(value) if not resolved: print(f"Error: binary not found: {value} (checked bin/)", file=sys.stderr) sys.exit(1) try: print(f"Erasing flash on {port}...", file=sys.stderr) subprocess.run(["esptool", "-p", port, "erase_flash"], check=True) print(f"Writing {resolved} to flash at 0...", file=sys.stderr) subprocess.run(["esptool", "-p", port, "write_flash", "0", resolved], check=True) print("Flash complete.", file=sys.stderr) print("Waiting 2s for device to boot...", file=sys.stderr) time.sleep(2) except subprocess.CalledProcessError as e: print(f"Error flashing: {e}", file=sys.stderr) sys.exit(1) except FileNotFoundError: print("Error: esptool not found. Install it with: pip install esptool", file=sys.stderr) sys.exit(1) elif action_name == 'reset': ran_reset = True try: print(f"Resetting device on {port}...", file=sys.stderr) conn = DeviceConnection(port) conn.reset() print("Device reset.", file=sys.stderr) except Exception as e: print(f"Error resetting device: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'upload': if len(value) > 2: print("Error: -u accepts at most 2 arguments (source and optional destination)", file=sys.stderr) sys.exit(1) upload_dir = value[0] remote_dir = value[1] if len(value) > 1 else None if not os.path.exists(upload_dir): print(f"Error: Directory does not exist: {upload_dir}", file=sys.stderr) sys.exit(1) if not os.path.isdir(upload_dir): print(f"Error: Not a directory: {upload_dir}", file=sys.stderr) sys.exit(1) try: if remote_dir: print(f"Uploading {upload_dir} to {remote_dir} on device {port}...", file=sys.stderr) else: print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr) conn = DeviceConnection(port) files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir) print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr) except Exception as e: print(f"Error uploading directory: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'erase_all': try: print(f"Erasing all code on device {port}...", file=sys.stderr) conn = DeviceConnection(port) items = conn.list_files('') for name, is_dir, size in items: if name == "settings.json": continue conn.delete(name) print("Erase complete.", file=sys.stderr) except Exception as e: print(f"Error erasing device: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'rm': try: print(f"Deleting {value} on device {port}...", file=sys.stderr) conn = DeviceConnection(port) conn.delete(value) print(f"Successfully deleted: {value}", file=sys.stderr) except Exception as e: print(f"Error deleting {value}: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'follow': if ran_reset: time.sleep(0.5) try: if value is None: print(f"Following output from {port}... (Press Ctrl+C to stop)", file=sys.stderr) else: print(f"Following output from {port} for {value} seconds...", file=sys.stderr) conn = DeviceConnection(port) conn.follow_output(value) except KeyboardInterrupt: print("\nStopped following.", file=sys.stderr) sys.exit(0) except Exception as e: print(f"Error following output: {e}", file=sys.stderr) sys.exit(1) return # follow blocks; when interrupted we're done # Collect all edit parameters edits: Dict[str, Any] = {} if args.name is not None: edits["name"] = args.name if args.pin is not None: edits["led_pin"] = args.pin if args.brightness is not None: edits["brightness"] = args.brightness if args.leds is not None: edits["num_leds"] = args.leds if args.debug is not None: edits["debug"] = bool(args.debug) if args.color_order is not None: edits["color_order"] = args.color_order if args.preset is not None: edits["pattern"] = args.preset if args.default is not None: edits["default"] = args.default # 1. Download: get current settings from device try: print(f"Downloading settings from {args.port}...", file=sys.stderr) settings = download_settings(args.port) print("Settings downloaded successfully.", file=sys.stderr) except Exception as e: if "timeout" in str(e).lower() or "connection" in str(e).lower(): print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) else: print(f"Error downloading settings: {e}", file=sys.stderr) sys.exit(1) # If --show only, print and exit if args.show: print_settings(settings) if not edits: return # 2. Edit: apply edits to downloaded settings if edits: print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) settings.update(edits) print_settings(settings) # 3. Upload: write settings back to device when edits were made if edits: try: print(f"\nUploading settings to {args.port}...", file=sys.stderr) upload_settings(args.port, settings) print("Settings uploaded successfully. Device will reset.", file=sys.stderr) except Exception as e: if "timeout" in str(e).lower() or "connection" in str(e).lower(): print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) else: print(f"Error uploading settings: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()