#!/usr/bin/env python3 """ LED Bar Configuration CLI Tool Command-line interface for downloading, editing, and uploading settings.json to/from MicroPython devices via mpremote. """ import json import argparse import subprocess import sys import time from typing import Dict, Any, List, Optional import tempfile import os from device import copy_file, DeviceConnection def resolve_flash_binary(path: str) -> Optional[str]: """Resolve binary path: full path or in bin/ directory.""" if os.path.isabs(path): return path if os.path.exists(path) else None if os.path.exists(path): return os.path.abspath(path) bin_path = os.path.join("bin", path) return os.path.abspath(bin_path) if os.path.exists(bin_path) else None def download_settings(device: str) -> dict: """ Download settings.json from the device. Returns an empty dict if the file does not exist so that the tool can be used on a fresh device before settings.json has been created. """ temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name temp_file.close() try: copy_file(from_device=True, device=device, remote_path="settings.json", local_path=temp_path) with open(temp_path, "r", encoding="utf-8") as f: return json.load(f) except (OSError, RuntimeError) as e: # If the file is missing on the device, treat it as empty settings msg = str(e).lower() if "errno 2" in msg or "enoent" in msg or "not found" in msg: return {} raise finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass def upload_settings(device: str, settings: dict) -> None: """Upload settings.json to the device and reset device.""" temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name try: json.dump(settings, temp_file, indent=2) temp_file.close() copy_file(from_device=False, device=device, remote_path="settings.json", local_path=temp_path) # Reset device (best effort) try: conn = DeviceConnection(device) conn.connect() conn.reset() conn.disconnect() except Exception: pass finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass PRESETS_REMOTE = "presets.json" def download_presets(device: str) -> dict: """Download presets.json from the device; missing file -> {}.""" temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name temp_file.close() try: copy_file(from_device=True, device=device, remote_path=PRESETS_REMOTE, local_path=temp_path) with open(temp_path, "r", encoding="utf-8") as f: return json.load(f) except (OSError, RuntimeError) as e: msg = str(e).lower() if "errno 2" in msg or "enoent" in msg or "not found" in msg: return {} raise finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass def upload_presets(device: str, presets: dict, *, reset: bool = True) -> None: """Upload presets.json; optional reset (skip when caller will reset via settings upload).""" temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) temp_path = temp_file.name try: json.dump(presets, temp_file, indent=2) temp_file.close() copy_file(from_device=False, device=device, remote_path=PRESETS_REMOTE, local_path=temp_path) if reset: try: conn = DeviceConnection(device) conn.connect() conn.reset() conn.disconnect() except Exception: pass finally: if os.path.exists(temp_path): try: os.unlink(temp_path) except OSError: pass def print_settings(settings: Dict[str, Any]) -> None: """Pretty print settings dictionary.""" print(json.dumps(settings, indent=2)) # Flags that consume the next argv as their value (for skipping during order scan) _FLAGS_WITH_VALUE = frozenset({ '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '--preset', '--pattern', '--default', '--transport', '--ssid', '--wifi-password', '--wifi-channel', }) def _get_ordered_actions(argv: List[str]) -> List[tuple]: """ Scan argv and return list of (action_name, value) in order of appearance. Actions: flash, pause, reset, upload, erase_all, rm, follow. """ actions = [] i = 1 while i < len(argv): arg = argv[i] if arg == '--pause': if i + 1 < len(argv): try: secs = float(argv[i + 1]) actions.append(('pause', secs)) except ValueError: raise ValueError("--pause requires a number (seconds)") i += 2 else: raise ValueError("--pause requires an argument") continue if arg == '--flash': if i + 1 < len(argv): actions.append(('flash', argv[i + 1])) i += 2 else: raise ValueError("--flash requires an argument") continue if arg in ('-r', '--reset'): actions.append(('reset', None)) i += 1 continue if arg in ('-u', '--upload'): vals = [] i += 1 while i < len(argv) and not argv[i].startswith('-'): vals.append(argv[i]) i += 1 if len(vals) >= 2: break if vals: actions.append(('upload', vals)) continue if arg == '--src': # Upload local DIR (default: ./src) to device root (:/ local_dir = "src" if i + 1 < len(argv) and not argv[i + 1].startswith('-'): local_dir = argv[i + 1] i += 2 else: i += 1 # Use empty string as remote_dir to map to root on device actions.append(('upload', [local_dir, ""])) continue if arg == '--lib': # Upload local DIR to /lib on device if i + 1 < len(argv): local_dir = argv[i + 1] actions.append(('upload', [local_dir, "lib"])) i += 2 else: raise ValueError("--lib requires a directory argument") continue if arg == '-e': actions.append(('erase_all', None)) i += 1 continue if arg == '--rm': if i + 1 < len(argv): actions.append(('rm', argv[i + 1])) i += 2 else: raise ValueError("--rm requires an argument") continue if arg in ('-f', '--follow'): # Optional duration in seconds: --follow [SECONDS] follow_secs = None if i + 1 < len(argv) and not argv[i + 1].startswith('-'): try: follow_secs = float(argv[i + 1]) i += 2 except ValueError: # Not a number, treat as flag-only i += 1 actions.append(('follow', follow_secs)) else: actions.append(('follow', None)) i += 1 continue # Skip non-action flags and their values if arg in _FLAGS_WITH_VALUE and i + 1 < len(argv): i += 2 else: i += 1 return actions def main() -> None: parser = argparse.ArgumentParser( description="LED Bar Configuration CLI Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Download and print current settings %(prog)s # Download, edit device name and brightness, then upload %(prog)s -n "LED-Strip-1" -b 128 # Set multiple parameters %(prog)s -n "MyLED" -b 200 --preset "rainbow" # Set number of LEDs, color order, and debug mode %(prog)s -l 60 -o rgb -d 1 # Reset the device %(prog)s -r # Follow device output %(prog)s -f # Reset and then follow output %(prog)s -r -f # Flash firmware binary (full path or filename in bin/) %(prog)s --flash firmware.bin %(prog)s -p /dev/ttyUSB0 --flash /path/to/firmware.bin # Reset, pause 2 seconds, then follow output %(prog)s --reset --pause 2 --follow # Set name, num_leds, default pattern, and upload %(prog)s --name "MyStrip" -l 60 --default rainbow """ ) parser.add_argument( "-p", "--port", default="/dev/ttyACM0", help="Serial port/device path (default: /dev/ttyACM0)" ) parser.add_argument( "-n", "--name", help="Device name" ) parser.add_argument( "--id", dest="device_id", type=int, metavar="0-255", help="Numeric device ID used for ESPNow (0-255)" ) parser.add_argument( "--pin", type=int, metavar="N", help="LED GPIO pin number (led_pin in settings.json)" ) parser.add_argument( "-b", "--brightness", type=int, metavar="0-255", help="Brightness (0-255)" ) parser.add_argument( "-l", "--leds", dest="leds", type=int, metavar="N", help="Number of LEDs (num_leds in settings.json)" ) parser.add_argument( "-d", "-debug", "--debug", dest="debug", type=int, choices=[0, 1], metavar="0|1", help="Debug mode (0 or 1)" ) parser.add_argument( "-o", "--order", dest="color_order", choices=["rgb", "rbg", "grb", "gbr", "brg", "bgr"], help="Color order (rgb, rbg, grb, gbr, brg, bgr)" ) parser.add_argument( "--preset", metavar="NAME", help="Create or replace preset NAME in presets.json (led-driver; use --pattern)", ) parser.add_argument( "--pattern", choices=[ "off", "on", "blink", "rainbow", "pulse", "transition", "chase", "circle", ], help="Pattern type for --preset (default: on)", ) parser.add_argument( "--default", metavar="NAME", help="Default/startup preset name in settings.json (which preset runs at boot)", ) parser.add_argument( "--transport", choices=["espnow", "wifi"], help="led-driver transport_type", ) parser.add_argument( "--ssid", help="led-driver ssid (Wi-Fi network in wifi mode)", ) parser.add_argument( "--wifi-password", dest="wifi_password", metavar="PASS", help="led-driver password (Wi-Fi PSK in wifi mode)", ) parser.add_argument( "--wifi-channel", dest="wifi_channel", type=int, metavar="1-11", help="led-driver wifi_channel (ESP-NOW mode)", ) parser.add_argument( "--pause", type=float, metavar="SECONDS", help="Pause for N seconds (use in action sequence, e.g. --reset --pause 2 --follow)" ) parser.add_argument( "-r", "--reset", action="store_true", help="Reset the device" ) parser.add_argument( "-f", "--follow", nargs="?", const=None, type=float, metavar="SECONDS", help="Follow device output continuously (like tail -f). Optionally specify SECONDS to limit follow duration." ) parser.add_argument( "-s", "--show", action="store_true", help="Display current settings from device" ) parser.add_argument( "-u", "--upload", nargs='+', metavar=("SRC", "DEST"), help="Upload a directory recursively to the device. Usage: -u SRC [DEST] (DEST is optional, defaults to basename of SRC)" ) parser.add_argument( "--src", nargs="?", const="src", metavar="DIR", help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src." ) parser.add_argument( "--lib", metavar="DIR", help="Upload DIR recursively to /lib on device" ) parser.add_argument( "-e", dest="erase_all", action="store_true", help="Erase all code on the device (delete all files except settings.json)" ) parser.add_argument( "--rm", metavar="PATH", help="Delete a file or directory on the device (recursive for directories)" ) parser.add_argument( "--flash", metavar="BINARY", help="Flash firmware binary to device. Path can be full path or filename in bin/" ) try: args = parser.parse_args() ordered_actions = _get_ordered_actions(sys.argv) except ValueError as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) port = args.port ran_reset = False # Execute actions in command-line order for action_name, value in ordered_actions: if action_name == 'pause': print(f"Pausing for {value} seconds...", file=sys.stderr) time.sleep(value) elif action_name == 'flash': resolved = resolve_flash_binary(value) if not resolved: print(f"Error: binary not found: {value} (checked bin/)", file=sys.stderr) sys.exit(1) try: print(f"Erasing flash on {port}...", file=sys.stderr) subprocess.run(["esptool", "-p", port, "erase_flash"], check=True) print(f"Writing {resolved} to flash at 0...", file=sys.stderr) subprocess.run(["esptool", "-p", port, "write_flash", "0", resolved], check=True) print("Flash complete.", file=sys.stderr) print("Waiting 2s for device to boot...", file=sys.stderr) time.sleep(2) except subprocess.CalledProcessError as e: print(f"Error flashing: {e}", file=sys.stderr) sys.exit(1) except FileNotFoundError: print("Error: esptool not found. Install it with: pip install esptool", file=sys.stderr) sys.exit(1) elif action_name == 'reset': ran_reset = True try: print(f"Resetting device on {port}...", file=sys.stderr) conn = DeviceConnection(port) conn.reset() print("Device reset.", file=sys.stderr) except Exception as e: print(f"Error resetting device: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'upload': if len(value) > 2: print("Error: -u accepts at most 2 arguments (source and optional destination)", file=sys.stderr) sys.exit(1) upload_dir = value[0] remote_dir = value[1] if len(value) > 1 else None if not os.path.exists(upload_dir): print(f"Error: Directory does not exist: {upload_dir}", file=sys.stderr) sys.exit(1) if not os.path.isdir(upload_dir): print(f"Error: Not a directory: {upload_dir}", file=sys.stderr) sys.exit(1) try: if remote_dir: print(f"Uploading {upload_dir} to {remote_dir} on device {port}...", file=sys.stderr) else: print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr) conn = DeviceConnection(port) files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir) print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr) except Exception as e: print(f"Error uploading directory: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'erase_all': try: print(f"Erasing all code on device {port}...", file=sys.stderr) conn = DeviceConnection(port) items = conn.list_files('') for name, is_dir, size in items: if name == "settings.json": continue conn.delete(name) print("Erase complete.", file=sys.stderr) except Exception as e: print(f"Error erasing device: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'rm': try: print(f"Deleting {value} on device {port}...", file=sys.stderr) conn = DeviceConnection(port) conn.delete(value) print(f"Successfully deleted: {value}", file=sys.stderr) except Exception as e: print(f"Error deleting {value}: {e}", file=sys.stderr) sys.exit(1) elif action_name == 'follow': if ran_reset: time.sleep(0.5) try: if value is None: print(f"Following output from {port}... (Press Ctrl+C to stop)", file=sys.stderr) else: print(f"Following output from {port} for {value} seconds...", file=sys.stderr) conn = DeviceConnection(port) conn.follow_output(value) except KeyboardInterrupt: print("\nStopped following.", file=sys.stderr) sys.exit(0) except Exception as e: print(f"Error following output: {e}", file=sys.stderr) sys.exit(1) return # follow blocks; when interrupted we're done # Collect all edit parameters edits: Dict[str, Any] = {} if args.name is not None: edits["name"] = args.name if args.pin is not None: edits["led_pin"] = args.pin if args.brightness is not None: edits["brightness"] = args.brightness if args.leds is not None: edits["num_leds"] = args.leds if args.debug is not None: edits["debug"] = bool(args.debug) if args.color_order is not None: edits["color_order"] = args.color_order if args.default is not None: edits["default"] = args.default if args.transport is not None: edits["transport_type"] = args.transport if args.ssid is not None: edits["ssid"] = args.ssid if args.wifi_password is not None: edits["password"] = args.wifi_password if args.wifi_channel is not None: edits["wifi_channel"] = max(1, min(11, args.wifi_channel)) if args.device_id is not None: # Clamp into single-byte range; store as int in settings.json edits["id"] = max(0, min(255, args.device_id)) # 1. Download: get current settings from device try: print(f"Downloading settings from {args.port}...", file=sys.stderr) settings = download_settings(args.port) print("Settings downloaded successfully.", file=sys.stderr) except Exception as e: if "timeout" in str(e).lower() or "connection" in str(e).lower(): print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) else: print(f"Error downloading settings: {e}", file=sys.stderr) sys.exit(1) # If --show only, print and exit (unless presets or settings are being written) if args.show: print_settings(settings) if not edits and args.preset is None: return # 2. Edit: apply edits to downloaded settings if edits: print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) settings.update(edits) print_settings(settings) # 3a. Presets file (led-driver presets.json) if args.preset is not None: pattern = args.pattern if args.pattern is not None else "on" try: print(f"Downloading presets from {args.port}...", file=sys.stderr) presets_data = download_presets(args.port) entry = presets_data.get(args.preset) if not isinstance(entry, dict): entry = {} entry["p"] = pattern presets_data[args.preset] = entry print( f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...", file=sys.stderr, ) upload_presets(args.port, presets_data, reset=not bool(edits)) print("Presets uploaded successfully.", file=sys.stderr) if not edits: print("Device will reset.", file=sys.stderr) except Exception as e: if "timeout" in str(e).lower() or "connection" in str(e).lower(): print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) else: print(f"Error uploading presets: {e}", file=sys.stderr) sys.exit(1) # 3b. Settings upload (resets device) if edits: try: print(f"\nUploading settings to {args.port}...", file=sys.stderr) upload_settings(args.port, settings) print("Settings uploaded successfully. Device will reset.", file=sys.stderr) except Exception as e: if "timeout" in str(e).lower() or "connection" in str(e).lower(): print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) else: print(f"Error uploading settings: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()