Compare commits

...

3 Commits

Author SHA1 Message Date
580fd11aca fix(cli): skip redundant settings uploads when unchanged
Only upload settings when edited values differ from current device settings to avoid unnecessary resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:48:54 +12:00
pi
d6331a105c feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
2f3db9272b feat(cli): extend upload flags for src, patterns, lib, and --all
Support --patterns/--paterns, --all, --erase, and src upload excluding patterns/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:13 +12:00
3 changed files with 192 additions and 16 deletions

View File

@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) | | `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
| `-s`, `--show` | Print current settings from the device | | `-s`, `--show` | Print current settings from the device |
| `-n`, `--name` | Device **name** | | `-n`, `--name` | Device **name** |
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
| `--id` | Numeric device id (ESP-NOW, 0255) | | `--id` | Numeric device id (ESP-NOW, 0255) |
| `--pin` | LED GPIO (`led_pin`) | | `--pin` | LED GPIO (`led_pin`) |
| `-b`, `--brightness` | Brightness 0255 | | `-b`, `--brightness` | Brightness 0255 |

131
cli.py
View File

@@ -11,11 +11,12 @@ import argparse
import subprocess import subprocess
import sys import sys
import time import time
import shutil
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import tempfile import tempfile
import os import os
from device import copy_file, DeviceConnection from device import copy_file, DeviceConnection, firmware_default_device_name
def resolve_flash_binary(path: str) -> Optional[str]: def resolve_flash_binary(path: str) -> Optional[str]:
@@ -147,7 +148,7 @@ _FLAGS_WITH_VALUE = frozenset({
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
'--preset', '--pattern', '--default', '--transport', '--ssid', '--preset', '--pattern', '--default', '--transport', '--ssid',
'--wifi-password', '--wifi-channel', '--wifi-password', '--wifi-channel', '--src', '--lib', '--patterns', '--paterns',
}) })
@@ -201,8 +202,24 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
i += 2 i += 2
else: else:
i += 1 i += 1
# Use empty string as remote_dir to map to root on device # Upload source tree excluding patterns/ (handled by --patterns).
actions.append(('upload', [local_dir, ""])) actions.append(('upload_src_no_patterns', local_dir))
continue
if arg in ('--patterns', '--paterns'):
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
local_dir = os.path.join("src", "patterns")
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "patterns"]))
continue
if arg == '--all':
actions.append(('upload_src_no_patterns', "src"))
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
actions.append(('upload', ["lib", "lib"]))
i += 1
continue continue
if arg == '--lib': if arg == '--lib':
# Upload local DIR (default: ./lib) to /lib on device # Upload local DIR (default: ./lib) to /lib on device
@@ -222,6 +239,10 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
actions.append(('erase_all', None)) actions.append(('erase_all', None))
i += 1 i += 1
continue continue
if arg == '--erase':
actions.append(('erase_all', None))
i += 1
continue
if arg == '--rm': if arg == '--rm':
if i + 1 < len(argv): if i + 1 < len(argv):
actions.append(('rm', argv[i + 1])) actions.append(('rm', argv[i + 1]))
@@ -288,6 +309,9 @@ Examples:
# Set name, num_leds, default pattern, and upload # Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow %(prog)s --name "MyStrip" -l 60 --default rainbow
# Reset logical device name to firmware default (STA MAC based)
%(prog)s --reset-device-name
""" """
) )
@@ -302,6 +326,12 @@ Examples:
help="Device name" help="Device name"
) )
parser.add_argument(
"--reset-device-name",
action="store_true",
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
)
parser.add_argument( parser.add_argument(
"--id", "--id",
dest="device_id", dest="device_id",
@@ -434,7 +464,7 @@ Examples:
nargs="?", nargs="?",
const="src", const="src",
metavar="DIR", metavar="DIR",
help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src." help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
) )
parser.add_argument( parser.add_argument(
@@ -445,6 +475,21 @@ Examples:
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib." help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
) )
parser.add_argument(
"--all",
action="store_true",
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
)
parser.add_argument(
"--patterns", "--paterns",
dest="patterns_dir",
nargs="?",
const=os.path.join("src", "patterns"),
metavar="DIR",
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
)
parser.add_argument( parser.add_argument(
"--ls", "--ls",
action="store_true", action="store_true",
@@ -452,7 +497,7 @@ Examples:
) )
parser.add_argument( parser.add_argument(
"-e", "-e", "--erase",
dest="erase_all", dest="erase_all",
action="store_true", action="store_true",
help="Erase all code on the device (delete all files except settings.json)" help="Erase all code on the device (delete all files except settings.json)"
@@ -539,6 +584,38 @@ Examples:
except Exception as e: except Exception as e:
print(f"Error uploading directory: {e}", file=sys.stderr) print(f"Error uploading directory: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
elif action_name == 'upload_src_no_patterns':
src_dir = value
if not os.path.exists(src_dir):
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(src_dir):
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
sys.exit(1)
try:
with tempfile.TemporaryDirectory() as temp_src:
for entry in sorted(os.listdir(src_dir)):
if entry == "patterns":
continue
src_entry = os.path.join(src_dir, entry)
dst_entry = os.path.join(temp_src, entry)
if os.path.isdir(src_entry):
shutil.copytree(src_entry, dst_entry)
else:
shutil.copy2(src_entry, dst_entry)
print(
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
file=sys.stderr,
)
conn = DeviceConnection(port)
files_copied, dirs_created = conn.upload_directory(temp_src, "")
print(
f"Upload complete: {files_copied} files, {dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e:
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'ls': elif action_name == 'ls':
try: try:
@@ -594,11 +671,35 @@ Examples:
sys.exit(1) sys.exit(1)
return # follow blocks; when interrupted we're done return # follow blocks; when interrupted we're done
default_name_from_device: Optional[str] = None
if args.reset_device_name:
if args.name is not None:
print(
"Error: use either --name or --reset-device-name, not both.",
file=sys.stderr,
)
sys.exit(1)
try:
print(
f"Reading firmware default device name (STA MAC) on {port}...",
file=sys.stderr,
)
default_name_from_device = firmware_default_device_name(port)
print(
f"Default name will be {default_name_from_device!r}.",
file=sys.stderr,
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Collect all edit parameters # Collect all edit parameters
edits: Dict[str, Any] = {} edits: Dict[str, Any] = {}
if args.name is not None: if args.name is not None:
edits["name"] = args.name edits["name"] = args.name
elif default_name_from_device is not None:
edits["name"] = default_name_from_device
if args.pin is not None: if args.pin is not None:
edits["led_pin"] = args.pin edits["led_pin"] = args.pin
@@ -652,10 +753,18 @@ Examples:
if not edits and args.preset is None: if not edits and args.preset is None:
return return
# 2. Edit: apply edits to downloaded settings # 2. Edit: only apply/upload settings when values actually change
if edits: changed_edits: Dict[str, Any] = {}
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) for key, value in edits.items():
settings.update(edits) if settings.get(key) != value:
changed_edits[key] = value
if edits and not changed_edits:
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
if changed_edits:
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
settings.update(changed_edits)
print_settings(settings) print_settings(settings)
@@ -686,7 +795,7 @@ Examples:
sys.exit(1) sys.exit(1)
# 3b. Settings upload (resets device) # 3b. Settings upload (resets device)
if edits: if changed_edits:
try: try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr) print(f"\nUploading settings to {args.port}...", file=sys.stderr)
upload_settings(args.port, settings) upload_settings(args.port, settings)

View File

@@ -23,12 +23,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path) sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError from mpremote.transport import TransportError, TransportExecError
class DeviceConnection: class DeviceConnection:
"""Wrapper for device communication.""" """Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device): def __init__(self, device):
"""Connect to a device.""" """Connect to a device."""
self.device = device self.device = device
@@ -54,6 +57,38 @@ class DeviceConnection:
pass pass
self.transport = None self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path): def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem.""" """Copy a file from device to local filesystem."""
self.connect() self.connect()
@@ -70,7 +105,7 @@ class DeviceConnection:
try: try:
with open(local_path, 'rb') as f: with open(local_path, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_path, data) self._fs_writefile_with_wdt(remote_path, data)
finally: finally:
self.disconnect() self.disconnect()
@@ -133,7 +168,7 @@ class DeviceConnection:
print(f"Uploading: {remote_file}", file=sys.stderr) print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f: with open(local_file, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_file, data) self._fs_writefile_with_wdt(remote_file, data)
files_copied += 1 files_copied += 1
return files_copied, dirs_created return files_copied, dirs_created
@@ -235,6 +270,37 @@ class DeviceConnection:
raise TransportError(f"Failed to follow output: {e}") from e raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path): def copy_file(from_device, device, remote_path, local_path):
""" """
Copy a file to/from device. Copy a file to/from device.