Files
led-tool/cli.py
2026-03-10 22:50:00 +13:00

496 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
LED Bar Configuration CLI Tool
Command-line interface for downloading, editing, and uploading settings.json
to/from MicroPython devices via mpremote.
"""
import json
import argparse
import subprocess
import sys
import time
from typing import Dict, Any, List, Optional
import tempfile
import os
from device import copy_file, DeviceConnection
def resolve_flash_binary(path: str) -> Optional[str]:
    """
    Resolve a firmware binary path to an absolute path.

    An absolute ``path`` is used as given. A relative ``path`` is tried
    relative to the current working directory first and then inside the
    ``bin/`` directory.

    Only regular files are accepted: a directory with a matching name (or an
    empty ``path``, which would otherwise resolve to ``bin/`` itself) is
    rejected instead of being handed to the flasher.

    Returns:
        The resolved path of the binary, or None if no such file exists.
    """
    if os.path.isabs(path):
        return path if os.path.isfile(path) else None

    # Relative path: current directory takes precedence over bin/.
    for candidate in (path, os.path.join("bin", path)):
        if os.path.isfile(candidate):
            return os.path.abspath(candidate)
    return None
def download_settings(device: str) -> dict:
    """
    Fetch settings.json from the device and return its parsed JSON content.

    A missing settings.json is not treated as an error: an empty dict is
    returned so the tool can be used on a device that has no settings yet.
    "Missing" is detected from the text of the OSError/RuntimeError raised
    during the copy; any other error of those types is re-raised.
    """
    # Only the path of the scratch file is needed; the handle is closed as
    # soon as the with-block ends and the file is kept (delete=False).
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as scratch:
        temp_path = scratch.name

    missing_markers = ("errno 2", "enoent", "not found")
    try:
        copy_file(from_device=True, device=device, remote_path="settings.json", local_path=temp_path)
        with open(temp_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, RuntimeError) as err:
        lowered = str(err).lower()
        if any(marker in lowered for marker in missing_markers):
            # File is absent on the device: behave as if settings were empty.
            return {}
        raise
    finally:
        # Always remove the scratch file; failure to do so is not fatal.
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass
def upload_settings(device: str, settings: dict) -> None:
    """
    Write ``settings`` to settings.json on the device, then reset the device.

    The settings are serialized to a local temporary file which is copied to
    the device and removed again afterwards. Errors raised while serializing
    or copying propagate to the caller.

    The reset that follows is best effort: if it fails, a warning is printed
    to stderr and the function still returns normally, because the settings
    file has already been written at that point.
    """
    temp_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    )
    temp_path = temp_file.name
    try:
        # The with-block closes the handle even if serialization fails, so
        # the file is never unlinked while still open.
        with temp_file:
            json.dump(settings, temp_file, indent=2)
        copy_file(from_device=False, device=device, remote_path="settings.json", local_path=temp_path)

        # Reset device (best effort)
        try:
            conn = DeviceConnection(device)
            conn.connect()
            try:
                conn.reset()
            finally:
                # Release the connection even when the reset itself fails.
                conn.disconnect()
        except Exception as e:
            print(f"Warning: could not reset device: {e}", file=sys.stderr)
    finally:
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass
def print_settings(settings: Dict[str, Any]) -> None:
    """Print the settings to stdout as JSON indented by two spaces."""
    rendered = json.dumps(settings, indent=2)
    print(rendered)
# Non-action options whose value occupies the next argv slot. The action-order
# scan steps over that slot so the value is never mistaken for an action flag.
_FLAGS_WITH_VALUE = frozenset((
    '-p', '--port',
    '-n', '--name',
    '--pin',
    '-b', '--brightness',
    '-l', '--leds',
    '-d', '-debug', '--debug',
    '-o', '--order',
    '--preset',
    '--default',
))
def _get_ordered_actions(argv: List[str]) -> List[tuple]:
    """
    Scan argv and return the requested actions in order of appearance.

    The parsed argparse namespace does not keep the relative order of
    options, so argv is rescanned here to recover the sequence in which the
    actions should run.

    Each entry is an ``(action_name, value)`` tuple:

    * ``('pause', float)``   -- ``--pause SECONDS``
    * ``('flash', str)``     -- ``--flash BINARY``
    * ``('rm', str)``        -- ``--rm PATH``
    * ``('upload', list)``   -- ``-u/--upload SRC [DEST]`` (one or two strings)
    * ``('reset', None)``    -- ``-r/--reset``
    * ``('erase_all', None)``-- ``-e``
    * ``('follow', None)``   -- ``-f/--follow``

    Both ``--flag VALUE`` and ``--flag=VALUE`` spellings are recognized, as
    argparse accepts both. Flags are matched by their exact name; abbreviated
    long options are not recognized by this scan.

    Raises:
        ValueError: if a value-taking action flag is the last token, or the
            value given to ``--pause`` is not a number.
    """
    # Action flags that take exactly one value -> action name.
    value_actions = {'--pause': 'pause', '--flash': 'flash', '--rm': 'rm'}
    # Action flags that take no value -> action name.
    plain_actions = {
        '-r': 'reset', '--reset': 'reset',
        '-e': 'erase_all',
        '-f': 'follow', '--follow': 'follow',
    }

    actions: List[tuple] = []
    i = 1  # argv[0] is the program name
    while i < len(argv):
        arg = argv[i]

        # Split "flag=value" so inline values are not silently ignored.
        if arg.startswith('-') and '=' in arg:
            flag, _, inline = arg.partition('=')
        else:
            flag, inline = arg, None

        if flag in value_actions:
            if inline is not None:
                raw = inline
                i += 1
            elif i + 1 < len(argv):
                raw = argv[i + 1]
                i += 2
            else:
                raise ValueError(f"{flag} requires an argument")
            if flag == '--pause':
                try:
                    actions.append(('pause', float(raw)))
                except ValueError:
                    raise ValueError("--pause requires a number (seconds)") from None
            else:
                actions.append((value_actions[flag], raw))
            continue

        if inline is None and flag in plain_actions:
            actions.append((plain_actions[flag], None))
            i += 1
            continue

        if flag in ('-u', '--upload'):
            i += 1
            if inline is not None:
                # With an inline value argparse takes only that single value.
                actions.append(('upload', [inline]))
                continue
            # Collect SRC and optional DEST; stop at the next flag.
            vals = []
            while i < len(argv) and len(vals) < 2 and not argv[i].startswith('-'):
                vals.append(argv[i])
                i += 1
            if vals:
                actions.append(('upload', vals))
            continue

        # Skip non-action flags and their values
        if arg in _FLAGS_WITH_VALUE and i + 1 < len(argv):
            i += 2
        else:
            i += 1
    return actions
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the argument parser with every option and action flag of the tool."""
    parser = argparse.ArgumentParser(
        description="LED Bar Configuration CLI Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Download and print current settings
%(prog)s
# Download, edit device name and brightness, then upload
%(prog)s -n "LED-Strip-1" -b 128
# Set multiple parameters
%(prog)s -n "MyLED" -b 200 --preset "rainbow"
# Set number of LEDs, color order, and debug mode
%(prog)s -l 60 -o rgb -d 1
# Reset the device
%(prog)s -r
# Follow device output
%(prog)s -f
# Reset and then follow output
%(prog)s -r -f
# Flash firmware binary (full path or filename in bin/)
%(prog)s --flash firmware.bin
%(prog)s -p /dev/ttyUSB0 --flash /path/to/firmware.bin
# Reset, pause 2 seconds, then follow output
%(prog)s --reset --pause 2 --follow
# Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow
"""
    )
    # --- Connection -------------------------------------------------------
    parser.add_argument(
        "-p", "--port",
        default="/dev/ttyACM0",
        help="Serial port/device path (default: /dev/ttyACM0)"
    )
    # --- Settings edits (written to settings.json) --------------------------
    parser.add_argument(
        "-n", "--name",
        help="Device name"
    )
    parser.add_argument(
        "--pin",
        type=int,
        metavar="N",
        help="LED GPIO pin number (led_pin in settings.json)"
    )
    parser.add_argument(
        "-b", "--brightness",
        type=int,
        metavar="0-255",
        help="Brightness (0-255)"
    )
    parser.add_argument(
        "-l", "--leds",
        dest="leds",
        type=int,
        metavar="N",
        help="Number of LEDs (num_leds in settings.json)"
    )
    parser.add_argument(
        "-d", "-debug", "--debug",
        dest="debug",
        type=int,
        choices=[0, 1],
        metavar="0|1",
        help="Debug mode (0 or 1)"
    )
    parser.add_argument(
        "-o", "--order",
        dest="color_order",
        choices=["rgb", "rbg", "grb", "gbr", "brg", "bgr"],
        help="Color order (rgb, rbg, grb, gbr, brg, bgr)"
    )
    parser.add_argument(
        "--preset",
        help="Pattern preset name"
    )
    parser.add_argument(
        "--default",
        metavar="PATTERN",
        help="Default/startup pattern (startup_preset in settings.json)"
    )
    # --- Actions (executed in command-line order) ---------------------------
    parser.add_argument(
        "--pause",
        type=float,
        metavar="SECONDS",
        help="Pause for N seconds (use in action sequence, e.g. --reset --pause 2 --follow)"
    )
    parser.add_argument(
        "-r", "--reset",
        action="store_true",
        help="Reset the device"
    )
    parser.add_argument(
        "-f", "--follow",
        action="store_true",
        help="Follow device output continuously (like tail -f)"
    )
    parser.add_argument(
        "-s", "--show",
        action="store_true",
        help="Display current settings from device"
    )
    parser.add_argument(
        "-u", "--upload",
        nargs='+',
        metavar=("SRC", "DEST"),
        help="Upload a directory recursively to the device. Usage: -u SRC [DEST] (DEST is optional, defaults to basename of SRC)"
    )
    parser.add_argument(
        "-e",
        dest="erase_all",
        action="store_true",
        help="Erase all code on the device (delete all files except settings.json)"
    )
    parser.add_argument(
        "--rm",
        metavar="PATH",
        help="Delete a file or directory on the device (recursive for directories)"
    )
    parser.add_argument(
        "--flash",
        metavar="BINARY",
        help="Flash firmware binary to device. Path can be full path or filename in bin/"
    )
    return parser


def _flash_firmware(port: str, binary: str) -> None:
    """Erase the flash and write ``binary`` at offset 0 via esptool; exit(1) on failure."""
    resolved = resolve_flash_binary(binary)
    if not resolved:
        print(f"Error: binary not found: {binary} (checked bin/)", file=sys.stderr)
        sys.exit(1)
    try:
        print(f"Erasing flash on {port}...", file=sys.stderr)
        subprocess.run(["esptool", "-p", port, "erase_flash"], check=True)
        print(f"Writing {resolved} to flash at 0...", file=sys.stderr)
        subprocess.run(["esptool", "-p", port, "write_flash", "0", resolved], check=True)
        print("Flash complete.", file=sys.stderr)
        print("Waiting 2s for device to boot...", file=sys.stderr)
        time.sleep(2)
    except subprocess.CalledProcessError as e:
        print(f"Error flashing: {e}", file=sys.stderr)
        sys.exit(1)
    except FileNotFoundError:
        # subprocess raises this when the esptool executable is not on PATH.
        print("Error: esptool not found. Install it with: pip install esptool", file=sys.stderr)
        sys.exit(1)


def _upload_directory(port: str, paths: List[str]) -> None:
    """Upload local directory ``paths[0]`` to the device (optional target ``paths[1]``); exit(1) on failure."""
    upload_dir = paths[0]
    remote_dir = paths[1] if len(paths) > 1 else None
    if not os.path.exists(upload_dir):
        print(f"Error: Directory does not exist: {upload_dir}", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(upload_dir):
        print(f"Error: Not a directory: {upload_dir}", file=sys.stderr)
        sys.exit(1)
    try:
        if remote_dir:
            print(f"Uploading {upload_dir} to {remote_dir} on device {port}...", file=sys.stderr)
        else:
            print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr)
        conn = DeviceConnection(port)
        files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir)
        print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr)
    except Exception as e:
        print(f"Error uploading directory: {e}", file=sys.stderr)
        sys.exit(1)


def _erase_all_code(port: str) -> None:
    """Delete every entry returned by ``list_files('')`` except settings.json; exit(1) on failure."""
    try:
        print(f"Erasing all code on device {port}...", file=sys.stderr)
        conn = DeviceConnection(port)
        # NOTE(review): list_files('') presumably lists the device root -- confirm in device.py.
        for name, _is_dir, _size in conn.list_files(''):
            if name == "settings.json":
                continue
            conn.delete(name)
        print("Erase complete.", file=sys.stderr)
    except Exception as e:
        print(f"Error erasing device: {e}", file=sys.stderr)
        sys.exit(1)


def _run_actions(ordered_actions: List[tuple], port: str) -> None:
    """
    Execute the scanned actions in command-line order.

    Any failing action prints an error and exits with status 1. A ``follow``
    action blocks until interrupted and ends the sequence: actions listed
    after it are not executed.
    """
    ran_reset = False
    for action_name, value in ordered_actions:
        if action_name == 'pause':
            print(f"Pausing for {value} seconds...", file=sys.stderr)
            time.sleep(value)
        elif action_name == 'flash':
            _flash_firmware(port, value)
        elif action_name == 'reset':
            ran_reset = True
            try:
                print(f"Resetting device on {port}...", file=sys.stderr)
                conn = DeviceConnection(port)
                conn.reset()
                print("Device reset.", file=sys.stderr)
            except Exception as e:
                print(f"Error resetting device: {e}", file=sys.stderr)
                sys.exit(1)
        elif action_name == 'upload':
            _upload_directory(port, value)
        elif action_name == 'erase_all':
            _erase_all_code(port)
        elif action_name == 'rm':
            try:
                print(f"Deleting {value} on device {port}...", file=sys.stderr)
                conn = DeviceConnection(port)
                conn.delete(value)
                print(f"Successfully deleted: {value}", file=sys.stderr)
            except Exception as e:
                print(f"Error deleting {value}: {e}", file=sys.stderr)
                sys.exit(1)
        elif action_name == 'follow':
            if ran_reset:
                # Short delay after a reset before attaching to the output.
                time.sleep(0.5)
            try:
                print(f"Following output from {port}... (Press Ctrl+C to stop)", file=sys.stderr)
                conn = DeviceConnection(port)
                conn.follow_output()
            except KeyboardInterrupt:
                print("\nStopped following.", file=sys.stderr)
                sys.exit(0)
            except Exception as e:
                print(f"Error following output: {e}", file=sys.stderr)
                sys.exit(1)
            return  # follow blocks; when interrupted we're done


def _collect_edits(args: argparse.Namespace) -> Dict[str, Any]:
    """Map the edit options that were given on the command line to their settings.json keys."""
    option_to_key = (
        ("name", "name"),
        ("pin", "led_pin"),
        ("brightness", "brightness"),
        ("leds", "num_leds"),
        ("color_order", "color_order"),
        ("preset", "pattern"),
        ("default", "startup_preset"),
    )
    edits: Dict[str, Any] = {}
    for option, key in option_to_key:
        given = getattr(args, option)
        if given is not None:
            edits[key] = given
    if args.debug is not None:
        # Stored as a boolean, although it is entered as 0/1.
        edits["debug"] = bool(args.debug)
    return edits


def _exit_settings_error(verb: str, port: str, exc: Exception) -> None:
    """Report a failed settings transfer (``verb`` is 'downloading' or 'uploading') and exit(1)."""
    text = str(exc).lower()
    if "timeout" in text or "connection" in text:
        print(f"Error: Connection timeout. Check device connection on {port}", file=sys.stderr)
    else:
        print(f"Error {verb} settings: {exc}", file=sys.stderr)
    sys.exit(1)


def main() -> None:
    """
    Entry point of the LED bar configuration CLI.

    Two modes exist:

    * Action mode -- if any action flag (flash, pause, reset, upload, erase,
      rm, follow) is present, the actions run in command-line order and the
      program ends; settings edits given alongside are not applied.
    * Settings mode -- otherwise settings.json is downloaded, the requested
      edits are applied, the result is printed, and it is uploaded again when
      at least one edit was made.

    Invalid arguments terminate the program with a non-zero exit status.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # The documented brightness range is 0-255; reject anything else before
    # it can be written to the device.
    if args.brightness is not None and not 0 <= args.brightness <= 255:
        parser.error("argument -b/--brightness: must be between 0 and 255")

    # argparse accepts any number of values for -u, while the action scan
    # only picks up the first two; reject extras instead of dropping them.
    if args.upload is not None and len(args.upload) > 2:
        print("Error: -u accepts at most 2 arguments (source and optional destination)", file=sys.stderr)
        sys.exit(1)

    try:
        ordered_actions = _get_ordered_actions(sys.argv)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    # Execute actions in command-line order; action mode ends here.
    if ordered_actions:
        _run_actions(ordered_actions, args.port)
        return

    # Collect all edit parameters
    edits = _collect_edits(args)

    # 1. Download: get current settings from device
    try:
        print(f"Downloading settings from {args.port}...", file=sys.stderr)
        settings = download_settings(args.port)
        print("Settings downloaded successfully.", file=sys.stderr)
    except Exception as e:
        _exit_settings_error("downloading", args.port, e)

    # If --show only, print and exit
    if args.show:
        print_settings(settings)
        if not edits:
            return

    # 2. Edit: apply edits to downloaded settings
    if edits:
        print(f"Applying {len(edits)} edit(s)...", file=sys.stderr)
        settings.update(edits)
    print_settings(settings)

    # 3. Upload: write settings back to device when edits were made
    if edits:
        try:
            print(f"\nUploading settings to {args.port}...", file=sys.stderr)
            upload_settings(args.port, settings)
            print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
        except Exception as e:
            _exit_settings_error("uploading", args.port, e)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()