Compare commits

...

3 Commits

Author SHA1 Message Date
580fd11aca fix(cli): skip redundant settings uploads when unchanged
Only upload settings when edited values differ from current device settings to avoid unnecessary resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:48:54 +12:00
pi
d6331a105c feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
2f3db9272b feat(cli): extend upload flags for src, patterns, lib, and --all
Support --patterns/--paterns, --all, --erase, and src upload excluding patterns/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:13 +12:00
3 changed files with 192 additions and 16 deletions

View File

@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
| `-s`, `--show` | Print current settings from the device |
| `-n`, `--name` | Device **name** |
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
| `--id` | Numeric device id (ESP-NOW, 0255) |
| `--pin` | LED GPIO (`led_pin`) |
| `-b`, `--brightness` | Brightness 0255 |

131
cli.py
View File

@@ -11,11 +11,12 @@ import argparse
import subprocess
import sys
import time
import shutil
from typing import Dict, Any, List, Optional
import tempfile
import os
from device import copy_file, DeviceConnection
from device import copy_file, DeviceConnection, firmware_default_device_name
def resolve_flash_binary(path: str) -> Optional[str]:
@@ -147,7 +148,7 @@ _FLAGS_WITH_VALUE = frozenset({
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
'--preset', '--pattern', '--default', '--transport', '--ssid',
'--wifi-password', '--wifi-channel',
'--wifi-password', '--wifi-channel', '--src', '--lib', '--patterns', '--paterns',
})
@@ -201,8 +202,24 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
i += 2
else:
i += 1
# Use empty string as remote_dir to map to root on device
actions.append(('upload', [local_dir, ""]))
# Upload source tree excluding patterns/ (handled by --patterns).
actions.append(('upload_src_no_patterns', local_dir))
continue
if arg in ('--patterns', '--paterns'):
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
local_dir = os.path.join("src", "patterns")
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "patterns"]))
continue
if arg == '--all':
actions.append(('upload_src_no_patterns', "src"))
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
actions.append(('upload', ["lib", "lib"]))
i += 1
continue
if arg == '--lib':
# Upload local DIR (default: ./lib) to /lib on device
@@ -222,6 +239,10 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
actions.append(('erase_all', None))
i += 1
continue
if arg == '--erase':
actions.append(('erase_all', None))
i += 1
continue
if arg == '--rm':
if i + 1 < len(argv):
actions.append(('rm', argv[i + 1]))
@@ -288,6 +309,9 @@ Examples:
# Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow
# Reset logical device name to firmware default (STA MAC based)
%(prog)s --reset-device-name
"""
)
@@ -302,6 +326,12 @@ Examples:
help="Device name"
)
parser.add_argument(
"--reset-device-name",
action="store_true",
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
)
parser.add_argument(
"--id",
dest="device_id",
@@ -434,7 +464,7 @@ Examples:
nargs="?",
const="src",
metavar="DIR",
help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src."
help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
)
parser.add_argument(
@@ -445,6 +475,21 @@ Examples:
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
)
parser.add_argument(
"--all",
action="store_true",
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
)
parser.add_argument(
"--patterns", "--paterns",
dest="patterns_dir",
nargs="?",
const=os.path.join("src", "patterns"),
metavar="DIR",
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
)
parser.add_argument(
"--ls",
action="store_true",
@@ -452,7 +497,7 @@ Examples:
)
parser.add_argument(
"-e",
"-e", "--erase",
dest="erase_all",
action="store_true",
help="Erase all code on the device (delete all files except settings.json)"
@@ -539,6 +584,38 @@ Examples:
except Exception as e:
print(f"Error uploading directory: {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'upload_src_no_patterns':
src_dir = value
if not os.path.exists(src_dir):
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(src_dir):
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
sys.exit(1)
try:
with tempfile.TemporaryDirectory() as temp_src:
for entry in sorted(os.listdir(src_dir)):
if entry == "patterns":
continue
src_entry = os.path.join(src_dir, entry)
dst_entry = os.path.join(temp_src, entry)
if os.path.isdir(src_entry):
shutil.copytree(src_entry, dst_entry)
else:
shutil.copy2(src_entry, dst_entry)
print(
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
file=sys.stderr,
)
conn = DeviceConnection(port)
files_copied, dirs_created = conn.upload_directory(temp_src, "")
print(
f"Upload complete: {files_copied} files, {dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e:
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'ls':
try:
@@ -594,11 +671,35 @@ Examples:
sys.exit(1)
return # follow blocks; when interrupted we're done
default_name_from_device: Optional[str] = None
if args.reset_device_name:
if args.name is not None:
print(
"Error: use either --name or --reset-device-name, not both.",
file=sys.stderr,
)
sys.exit(1)
try:
print(
f"Reading firmware default device name (STA MAC) on {port}...",
file=sys.stderr,
)
default_name_from_device = firmware_default_device_name(port)
print(
f"Default name will be {default_name_from_device!r}.",
file=sys.stderr,
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Collect all edit parameters
edits: Dict[str, Any] = {}
if args.name is not None:
edits["name"] = args.name
elif default_name_from_device is not None:
edits["name"] = default_name_from_device
if args.pin is not None:
edits["led_pin"] = args.pin
@@ -652,10 +753,18 @@ Examples:
if not edits and args.preset is None:
return
# 2. Edit: apply edits to downloaded settings
if edits:
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr)
settings.update(edits)
# 2. Edit: only apply/upload settings when values actually change
changed_edits: Dict[str, Any] = {}
for key, value in edits.items():
if settings.get(key) != value:
changed_edits[key] = value
if edits and not changed_edits:
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
if changed_edits:
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
settings.update(changed_edits)
print_settings(settings)
@@ -686,7 +795,7 @@ Examples:
sys.exit(1)
# 3b. Settings upload (resets device)
if edits:
if changed_edits:
try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr)
upload_settings(args.port, settings)

View File

@@ -23,12 +23,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError
from mpremote.transport import TransportError, TransportExecError
class DeviceConnection:
"""Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device):
"""Connect to a device."""
self.device = device
@@ -53,7 +56,39 @@ class DeviceConnection:
except Exception:
pass
self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem."""
self.connect()
@@ -70,7 +105,7 @@ class DeviceConnection:
try:
with open(local_path, 'rb') as f:
data = f.read()
self.transport.fs_writefile(remote_path, data)
self._fs_writefile_with_wdt(remote_path, data)
finally:
self.disconnect()
@@ -133,7 +168,7 @@ class DeviceConnection:
print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f:
data = f.read()
self.transport.fs_writefile(remote_file, data)
self._fs_writefile_with_wdt(remote_file, data)
files_copied += 1
return files_copied, dirs_created
@@ -235,6 +270,37 @@ class DeviceConnection:
raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path):
"""
Copy a file to/from device.