feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
|
|||||||
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
|
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
|
||||||
| `-s`, `--show` | Print current settings from the device |
|
| `-s`, `--show` | Print current settings from the device |
|
||||||
| `-n`, `--name` | Device **name** |
|
| `-n`, `--name` | Device **name** |
|
||||||
|
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
|
||||||
| `--id` | Numeric device id (ESP-NOW, 0–255) |
|
| `--id` | Numeric device id (ESP-NOW, 0–255) |
|
||||||
| `--pin` | LED GPIO (`led_pin`) |
|
| `--pin` | LED GPIO (`led_pin`) |
|
||||||
| `-b`, `--brightness` | Brightness 0–255 |
|
| `-b`, `--brightness` | Brightness 0–255 |
|
||||||
|
|||||||
35
cli.py
35
cli.py
@@ -16,7 +16,7 @@ from typing import Dict, Any, List, Optional
|
|||||||
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
from device import copy_file, DeviceConnection
|
from device import copy_file, DeviceConnection, firmware_default_device_name
|
||||||
|
|
||||||
|
|
||||||
def resolve_flash_binary(path: str) -> Optional[str]:
|
def resolve_flash_binary(path: str) -> Optional[str]:
|
||||||
@@ -309,6 +309,9 @@ Examples:
|
|||||||
|
|
||||||
# Set name, num_leds, default pattern, and upload
|
# Set name, num_leds, default pattern, and upload
|
||||||
%(prog)s --name "MyStrip" -l 60 --default rainbow
|
%(prog)s --name "MyStrip" -l 60 --default rainbow
|
||||||
|
|
||||||
|
# Reset logical device name to firmware default (STA MAC based)
|
||||||
|
%(prog)s --reset-device-name
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -323,6 +326,12 @@ Examples:
|
|||||||
help="Device name"
|
help="Device name"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--reset-device-name",
|
||||||
|
action="store_true",
|
||||||
|
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--id",
|
"--id",
|
||||||
dest="device_id",
|
dest="device_id",
|
||||||
@@ -662,11 +671,35 @@ Examples:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
return # follow blocks; when interrupted we're done
|
return # follow blocks; when interrupted we're done
|
||||||
|
|
||||||
|
default_name_from_device: Optional[str] = None
|
||||||
|
if args.reset_device_name:
|
||||||
|
if args.name is not None:
|
||||||
|
print(
|
||||||
|
"Error: use either --name or --reset-device-name, not both.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
try:
|
||||||
|
print(
|
||||||
|
f"Reading firmware default device name (STA MAC) on {port}...",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
default_name_from_device = firmware_default_device_name(port)
|
||||||
|
print(
|
||||||
|
f"Default name will be {default_name_from_device!r}.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# Collect all edit parameters
|
# Collect all edit parameters
|
||||||
edits: Dict[str, Any] = {}
|
edits: Dict[str, Any] = {}
|
||||||
|
|
||||||
if args.name is not None:
|
if args.name is not None:
|
||||||
edits["name"] = args.name
|
edits["name"] = args.name
|
||||||
|
elif default_name_from_device is not None:
|
||||||
|
edits["name"] = default_name_from_device
|
||||||
|
|
||||||
if args.pin is not None:
|
if args.pin is not None:
|
||||||
edits["led_pin"] = args.pin
|
edits["led_pin"] = args.pin
|
||||||
|
|||||||
76
device.py
76
device.py
@@ -23,12 +23,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
|
|||||||
sys.path.insert(0, lib_path)
|
sys.path.insert(0, lib_path)
|
||||||
|
|
||||||
from mpremote.transport_serial import SerialTransport
|
from mpremote.transport_serial import SerialTransport
|
||||||
from mpremote.transport import TransportError
|
from mpremote.transport import TransportError, TransportExecError
|
||||||
|
|
||||||
|
|
||||||
class DeviceConnection:
|
class DeviceConnection:
|
||||||
"""Wrapper for device communication."""
|
"""Wrapper for device communication."""
|
||||||
|
|
||||||
|
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
|
||||||
|
WDT_FEED_INTERVAL_SEC = 5.0
|
||||||
|
|
||||||
def __init__(self, device):
|
def __init__(self, device):
|
||||||
"""Connect to a device."""
|
"""Connect to a device."""
|
||||||
self.device = device
|
self.device = device
|
||||||
@@ -53,7 +56,39 @@ class DeviceConnection:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
self.transport = None
|
self.transport = None
|
||||||
|
|
||||||
|
def _feed_wdt(self) -> None:
|
||||||
|
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
|
||||||
|
if self.transport is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.transport.exec(
|
||||||
|
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _make_wdt_upload_progress_callback(self):
|
||||||
|
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
|
||||||
|
last_feed = [time.monotonic()]
|
||||||
|
|
||||||
|
def progress(written: int, total: int) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
|
||||||
|
self._feed_wdt()
|
||||||
|
last_feed[0] = now
|
||||||
|
|
||||||
|
return progress
|
||||||
|
|
||||||
|
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
|
||||||
|
"""Write file to device with periodic WDT feeds during long transfers."""
|
||||||
|
self._feed_wdt()
|
||||||
|
self.transport.fs_writefile(
|
||||||
|
remote_path,
|
||||||
|
data,
|
||||||
|
progress_callback=self._make_wdt_upload_progress_callback(),
|
||||||
|
)
|
||||||
|
|
||||||
def copy_from_device(self, remote_path, local_path):
|
def copy_from_device(self, remote_path, local_path):
|
||||||
"""Copy a file from device to local filesystem."""
|
"""Copy a file from device to local filesystem."""
|
||||||
self.connect()
|
self.connect()
|
||||||
@@ -70,7 +105,7 @@ class DeviceConnection:
|
|||||||
try:
|
try:
|
||||||
with open(local_path, 'rb') as f:
|
with open(local_path, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.transport.fs_writefile(remote_path, data)
|
self._fs_writefile_with_wdt(remote_path, data)
|
||||||
finally:
|
finally:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
@@ -133,7 +168,7 @@ class DeviceConnection:
|
|||||||
print(f"Uploading: {remote_file}", file=sys.stderr)
|
print(f"Uploading: {remote_file}", file=sys.stderr)
|
||||||
with open(local_file, 'rb') as f:
|
with open(local_file, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.transport.fs_writefile(remote_file, data)
|
self._fs_writefile_with_wdt(remote_file, data)
|
||||||
files_copied += 1
|
files_copied += 1
|
||||||
|
|
||||||
return files_copied, dirs_created
|
return files_copied, dirs_created
|
||||||
@@ -235,6 +270,37 @@ class DeviceConnection:
|
|||||||
raise TransportError(f"Failed to follow output: {e}") from e
|
raise TransportError(f"Failed to follow output: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
def firmware_default_device_name(device: str) -> str:
|
||||||
|
"""
|
||||||
|
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
|
||||||
|
matching Settings.set_defaults() in led-driver/src/settings.py.
|
||||||
|
"""
|
||||||
|
conn = DeviceConnection(device)
|
||||||
|
conn.connect()
|
||||||
|
try:
|
||||||
|
code = (
|
||||||
|
"import network, ubinascii\n"
|
||||||
|
"sta = network.WLAN(network.STA_IF)\n"
|
||||||
|
"sta.active(True)\n"
|
||||||
|
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
|
||||||
|
"print('led-' + mac)\n"
|
||||||
|
)
|
||||||
|
out = conn.transport.exec(code)
|
||||||
|
except TransportExecError as e:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Could not read STA MAC from device (is this MicroPython with network?): "
|
||||||
|
+ (e.error_output or str(e)).strip()
|
||||||
|
) from e
|
||||||
|
finally:
|
||||||
|
conn.disconnect()
|
||||||
|
text = out.decode("utf-8", errors="replace").strip()
|
||||||
|
for line in reversed(text.splitlines()):
|
||||||
|
line = line.strip()
|
||||||
|
if line.startswith("led-"):
|
||||||
|
return line
|
||||||
|
raise RuntimeError("Device did not return a led-<mac> default name")
|
||||||
|
|
||||||
|
|
||||||
def copy_file(from_device, device, remote_path, local_path):
|
def copy_file(from_device, device, remote_path, local_path):
|
||||||
"""
|
"""
|
||||||
Copy a file to/from device.
|
Copy a file to/from device.
|
||||||
|
|||||||
Reference in New Issue
Block a user