4 Commits

Author SHA1 Message Date
ccc215acbd fix(cli): lazy settings fetch and full device erase
Download settings only for --show or when applying edits; skip upload
when unchanged; erase entire device root including settings.json.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 21:14:54 +12:00
580fd11aca fix(cli): skip redundant settings uploads when unchanged
Only upload settings when edited values differ from current device settings to avoid unnecessary resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:48:54 +12:00
pi
d6331a105c feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
2f3db9272b feat(cli): extend upload flags for src, patterns, lib, and --all
Support --patterns/--paterns, --all, --erase, and src upload excluding patterns/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:13 +12:00
3 changed files with 287 additions and 61 deletions

View File

@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) | | `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
| `-s`, `--show` | Print current settings from the device | | `-s`, `--show` | Print current settings from the device |
| `-n`, `--name` | Device **name** | | `-n`, `--name` | Device **name** |
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
| `--id` | Numeric device id (ESP-NOW, 0255) | | `--id` | Numeric device id (ESP-NOW, 0255) |
| `--pin` | LED GPIO (`led_pin`) | | `--pin` | LED GPIO (`led_pin`) |
| `-b`, `--brightness` | Brightness 0255 | | `-b`, `--brightness` | Brightness 0255 |
@@ -26,11 +27,11 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `--pause` | Sleep N seconds (for chained actions) | | `--pause` | Sleep N seconds (for chained actions) |
| `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` | | `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` |
| `--src`, `--lib` | Upload `src/` or a tree to `/lib` | | `--src`, `--lib` | Upload `src/` or a tree to `/lib` |
| `-e` | Erase device code (keeps `settings.json`) | | `-e`, `--erase` | Erase everything at device root (including `settings.json` and `presets.json`) |
| `--rm` | Remove a path on the device | | `--rm` | Remove a path on the device |
| `--flash` | Flash a firmware binary (uses **esptool** on the host) | | `--flash` | Flash a firmware binary (uses **esptool** on the host) |
**Default behaviour:** the tool always downloads `settings.json`, prints the merged view, and uploads again **only** when you pass setting edits (`--name`, `--leds`, …), **`--preset`**, or the relevant upload/flash/erase actions in order. **Settings I/O:** With no arguments, **`led-cli`** prints help (no device access). **`--show`** downloads `settings.json` and prints it (read-only). Setting fields (`--name`, `-b`, …) download once, merge, print, and **upload only when a value actually changed**. **`--preset`** alone only touches **`presets.json`** (no `settings.json` download). Ordered actions (`--reset`, `--upload`, `-e`, …) run without touching settings unless you also pass **`--show`** or setting / preset edits as above.
Run **`python cli.py -h`** for the full epilog and argument list. Run **`python cli.py -h`** for the full epilog and argument list.

210
cli.py
View File

@@ -2,8 +2,10 @@
""" """
LED Bar Configuration CLI Tool LED Bar Configuration CLI Tool
Command-line interface for downloading, editing, and uploading settings.json Command-line interface for editing settings.json on MicroPython devices via mpremote.
to/from MicroPython devices via mpremote. Settings are downloaded from the device only when displaying (--show) or when applying
setting changes; uploads occur only when something changed. Use --erase to clear the
device filesystem (including settings).
""" """
import json import json
@@ -11,11 +13,12 @@ import argparse
import subprocess import subprocess
import sys import sys
import time import time
import shutil
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import tempfile import tempfile
import os import os
from device import copy_file, DeviceConnection from device import copy_file, DeviceConnection, firmware_default_device_name
def resolve_flash_binary(path: str) -> Optional[str]: def resolve_flash_binary(path: str) -> Optional[str]:
@@ -147,7 +150,7 @@ _FLAGS_WITH_VALUE = frozenset({
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
'--preset', '--pattern', '--default', '--transport', '--ssid', '--preset', '--pattern', '--default', '--transport', '--ssid',
'--wifi-password', '--wifi-channel', '--wifi-password', '--wifi-channel', '--src', '--lib', '--patterns', '--paterns',
}) })
@@ -201,8 +204,24 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
i += 2 i += 2
else: else:
i += 1 i += 1
# Use empty string as remote_dir to map to root on device # Upload source tree excluding patterns/ (handled by --patterns).
actions.append(('upload', [local_dir, ""])) actions.append(('upload_src_no_patterns', local_dir))
continue
if arg in ('--patterns', '--paterns'):
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
local_dir = os.path.join("src", "patterns")
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "patterns"]))
continue
if arg == '--all':
actions.append(('upload_src_no_patterns', "src"))
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
actions.append(('upload', ["lib", "lib"]))
i += 1
continue continue
if arg == '--lib': if arg == '--lib':
# Upload local DIR (default: ./lib) to /lib on device # Upload local DIR (default: ./lib) to /lib on device
@@ -222,6 +241,10 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
actions.append(('erase_all', None)) actions.append(('erase_all', None))
i += 1 i += 1
continue continue
if arg == '--erase':
actions.append(('erase_all', None))
i += 1
continue
if arg == '--rm': if arg == '--rm':
if i + 1 < len(argv): if i + 1 < len(argv):
actions.append(('rm', argv[i + 1])) actions.append(('rm', argv[i + 1]))
@@ -258,10 +281,13 @@ def main() -> None:
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
Examples: Examples:
# Download and print current settings # Show help (no device I/O)
%(prog)s %(prog)s
# Download, edit device name and brightness, then upload # Show current settings.json from device (read-only)
%(prog)s --show
# Edit device name and brightness, then upload only if values changed
%(prog)s -n "LED-Strip-1" -b 128 %(prog)s -n "LED-Strip-1" -b 128
# Set multiple parameters # Set multiple parameters
@@ -288,6 +314,9 @@ Examples:
# Set name, num_leds, default pattern, and upload # Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow %(prog)s --name "MyStrip" -l 60 --default rainbow
# Reset logical device name to firmware default (STA MAC based)
%(prog)s --reset-device-name
""" """
) )
@@ -302,6 +331,12 @@ Examples:
help="Device name" help="Device name"
) )
parser.add_argument(
"--reset-device-name",
action="store_true",
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
)
parser.add_argument( parser.add_argument(
"--id", "--id",
dest="device_id", dest="device_id",
@@ -434,7 +469,7 @@ Examples:
nargs="?", nargs="?",
const="src", const="src",
metavar="DIR", metavar="DIR",
help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src." help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
) )
parser.add_argument( parser.add_argument(
@@ -445,6 +480,21 @@ Examples:
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib." help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
) )
parser.add_argument(
"--all",
action="store_true",
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
)
parser.add_argument(
"--patterns", "--paterns",
dest="patterns_dir",
nargs="?",
const=os.path.join("src", "patterns"),
metavar="DIR",
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
)
parser.add_argument( parser.add_argument(
"--ls", "--ls",
action="store_true", action="store_true",
@@ -452,10 +502,10 @@ Examples:
) )
parser.add_argument( parser.add_argument(
"-e", "-e", "--erase",
dest="erase_all", dest="erase_all",
action="store_true", action="store_true",
help="Erase all code on the device (delete all files except settings.json)" help="Erase the device filesystem: delete all files and directories at device root (including settings.json and presets.json)",
) )
parser.add_argument( parser.add_argument(
@@ -471,6 +521,9 @@ Examples:
) )
try: try:
if len(sys.argv) <= 1:
parser.print_help()
return
args = parser.parse_args() args = parser.parse_args()
ordered_actions = _get_ordered_actions(sys.argv) ordered_actions = _get_ordered_actions(sys.argv)
except ValueError as e: except ValueError as e:
@@ -539,6 +592,38 @@ Examples:
except Exception as e: except Exception as e:
print(f"Error uploading directory: {e}", file=sys.stderr) print(f"Error uploading directory: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
elif action_name == 'upload_src_no_patterns':
src_dir = value
if not os.path.exists(src_dir):
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(src_dir):
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
sys.exit(1)
try:
with tempfile.TemporaryDirectory() as temp_src:
for entry in sorted(os.listdir(src_dir)):
if entry == "patterns":
continue
src_entry = os.path.join(src_dir, entry)
dst_entry = os.path.join(temp_src, entry)
if os.path.isdir(src_entry):
shutil.copytree(src_entry, dst_entry)
else:
shutil.copy2(src_entry, dst_entry)
print(
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
file=sys.stderr,
)
conn = DeviceConnection(port)
files_copied, dirs_created = conn.upload_directory(temp_src, "")
print(
f"Upload complete: {files_copied} files, {dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e:
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'ls': elif action_name == 'ls':
try: try:
@@ -554,14 +639,12 @@ Examples:
elif action_name == 'erase_all': elif action_name == 'erase_all':
try: try:
print(f"Erasing all code on device {port}...", file=sys.stderr) print(f"Erasing device filesystem on {port}...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
items = conn.list_files('') items = conn.list_files('')
for name, is_dir, size in items: for name, is_dir, size in items:
if name == "settings.json":
continue
conn.delete(name) conn.delete(name)
print("Erase complete.", file=sys.stderr) print("Erase complete (device root is empty).", file=sys.stderr)
except Exception as e: except Exception as e:
print(f"Error erasing device: {e}", file=sys.stderr) print(f"Error erasing device: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
@@ -594,11 +677,35 @@ Examples:
sys.exit(1) sys.exit(1)
return # follow blocks; when interrupted we're done return # follow blocks; when interrupted we're done
default_name_from_device: Optional[str] = None
if args.reset_device_name:
if args.name is not None:
print(
"Error: use either --name or --reset-device-name, not both.",
file=sys.stderr,
)
sys.exit(1)
try:
print(
f"Reading firmware default device name (STA MAC) on {port}...",
file=sys.stderr,
)
default_name_from_device = firmware_default_device_name(port)
print(
f"Default name will be {default_name_from_device!r}.",
file=sys.stderr,
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Collect all edit parameters # Collect all edit parameters
edits: Dict[str, Any] = {} edits: Dict[str, Any] = {}
if args.name is not None: if args.name is not None:
edits["name"] = args.name edits["name"] = args.name
elif default_name_from_device is not None:
edits["name"] = default_name_from_device
if args.pin is not None: if args.pin is not None:
edits["led_pin"] = args.pin edits["led_pin"] = args.pin
@@ -634,32 +741,42 @@ Examples:
# Clamp into single-byte range; store as int in settings.json # Clamp into single-byte range; store as int in settings.json
edits["id"] = max(0, min(255, args.device_id)) edits["id"] = max(0, min(255, args.device_id))
# 1. Download: get current settings from device settings_work = args.show or bool(edits)
if settings_work:
try: try:
print(f"Downloading settings from {args.port}...", file=sys.stderr) print(f"Downloading settings from {args.port}...", file=sys.stderr)
settings = download_settings(args.port) settings = download_settings(args.port)
print("Settings downloaded successfully.", file=sys.stderr) print("Settings downloaded successfully.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error downloading settings: {e}", file=sys.stderr) print(f"Error downloading settings: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
# If --show only, print and exit (unless presets or settings are being written)
if args.show: if args.show:
print_settings(settings) print_settings(settings)
if not edits and args.preset is None: if not edits and args.preset is None:
return return
# 2. Edit: apply edits to downloaded settings changed_edits: Dict[str, Any] = {}
if edits: for key, value in edits.items():
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) if settings.get(key) != value:
settings.update(edits) changed_edits[key] = value
if edits and not changed_edits:
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
if changed_edits:
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
settings.update(changed_edits)
print_settings(settings) print_settings(settings)
# 3a. Presets file (led-driver presets.json)
if args.preset is not None: if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on" pattern = args.pattern if args.pattern is not None else "on"
try: try:
@@ -680,23 +797,62 @@ Examples:
print("Device will reset.", file=sys.stderr) print("Device will reset.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error uploading presets: {e}", file=sys.stderr) print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
# 3b. Settings upload (resets device) if changed_edits:
if edits:
try: try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr) print(f"\nUploading settings to {args.port}...", file=sys.stderr)
upload_settings(args.port, settings) upload_settings(args.port, settings)
print("Settings uploaded successfully. Device will reset.", file=sys.stderr) print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error uploading settings: {e}", file=sys.stderr) print(f"Error uploading settings: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
return
if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on"
try:
print(f"Downloading presets from {args.port}...", file=sys.stderr)
presets_data = download_presets(args.port)
entry = presets_data.get(args.preset)
if not isinstance(entry, dict):
entry = {}
entry["p"] = pattern
presets_data[args.preset] = entry
print(
f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...",
file=sys.stderr,
)
upload_presets(args.port, presets_data, reset=True)
print("Presets uploaded successfully.", file=sys.stderr)
print("Device will reset.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else:
print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1)
return
if ordered_actions:
return
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -23,12 +23,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path) sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError from mpremote.transport import TransportError, TransportExecError
class DeviceConnection: class DeviceConnection:
"""Wrapper for device communication.""" """Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device): def __init__(self, device):
"""Connect to a device.""" """Connect to a device."""
self.device = device self.device = device
@@ -54,6 +57,38 @@ class DeviceConnection:
pass pass
self.transport = None self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path): def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem.""" """Copy a file from device to local filesystem."""
self.connect() self.connect()
@@ -70,7 +105,7 @@ class DeviceConnection:
try: try:
with open(local_path, 'rb') as f: with open(local_path, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_path, data) self._fs_writefile_with_wdt(remote_path, data)
finally: finally:
self.disconnect() self.disconnect()
@@ -102,7 +137,8 @@ class DeviceConnection:
dirs_created = 0 dirs_created = 0
for root, dirs, files in os.walk(local_dir): for root, dirs, files in os.walk(local_dir):
# Calculate relative path from local_dir # Never upload Python bytecode trees (MicroPython does not use them).
dirs[:] = [d for d in dirs if d != "__pycache__"]
rel_path = os.path.relpath(root, local_dir) rel_path = os.path.relpath(root, local_dir)
# Build remote path # Build remote path
@@ -121,8 +157,10 @@ class DeviceConnection:
self.transport.fs_mkdir(remote_base) self.transport.fs_mkdir(remote_base)
dirs_created += 1 dirs_created += 1
# Copy files # Copy files (skip bytecode; __pycache__ dirs are pruned above)
for file in files: for file in files:
if file.endswith((".pyc", ".pyo")):
continue
local_file = os.path.join(root, file) local_file = os.path.join(root, file)
# Handle root directory case properly # Handle root directory case properly
if remote_base == '/': if remote_base == '/':
@@ -133,7 +171,7 @@ class DeviceConnection:
print(f"Uploading: {remote_file}", file=sys.stderr) print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f: with open(local_file, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_file, data) self._fs_writefile_with_wdt(remote_file, data)
files_copied += 1 files_copied += 1
return files_copied, dirs_created return files_copied, dirs_created
@@ -235,6 +273,37 @@ class DeviceConnection:
raise TransportError(f"Failed to follow output: {e}") from e raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path): def copy_file(from_device, device, remote_path, local_path):
""" """
Copy a file to/from device. Copy a file to/from device.