Compare commits
5 Commits
713cd6e9a1
...
1edcb8b1f7
| Author | SHA1 | Date | |
|---|---|---|---|
| 1edcb8b1f7 | |||
| ccc215acbd | |||
| 580fd11aca | |||
|
|
d6331a105c | ||
| 2f3db9272b |
10
README.md
10
README.md
@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
|
|||||||
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
|
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
|
||||||
| `-s`, `--show` | Print current settings from the device |
|
| `-s`, `--show` | Print current settings from the device |
|
||||||
| `-n`, `--name` | Device **name** |
|
| `-n`, `--name` | Device **name** |
|
||||||
|
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
|
||||||
| `--id` | Numeric device id (ESP-NOW, 0–255) |
|
| `--id` | Numeric device id (ESP-NOW, 0–255) |
|
||||||
| `--pin` | LED GPIO (`led_pin`) |
|
| `--pin` | LED GPIO (`led_pin`) |
|
||||||
| `-b`, `--brightness` | Brightness 0–255 |
|
| `-b`, `--brightness` | Brightness 0–255 |
|
||||||
@@ -24,13 +25,14 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
|
|||||||
| `-r`, `--reset` | Reset the device |
|
| `-r`, `--reset` | Reset the device |
|
||||||
| `-f`, `--follow` | Follow serial output (optional timeout seconds) |
|
| `-f`, `--follow` | Follow serial output (optional timeout seconds) |
|
||||||
| `--pause` | Sleep N seconds (for chained actions) |
|
| `--pause` | Sleep N seconds (for chained actions) |
|
||||||
| `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` |
|
| `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` (skips unchanged files via `file_hashes.json` on device) |
|
||||||
| `--src`, `--lib` | Upload `src/` or a tree to `/lib` |
|
| `--src`, `--lib`, `--all` | Deploy led-driver trees to flash root, `patterns/`, and `lib/` |
|
||||||
| `-e` | Erase device code (keeps `settings.json`) |
|
| `--force-upload` | Upload every file; ignore `file_hashes.json` |
|
||||||
|
| `-e`, `--erase` | Erase everything at device root (including `settings.json` and `presets.json`) |
|
||||||
| `--rm` | Remove a path on the device |
|
| `--rm` | Remove a path on the device |
|
||||||
| `--flash` | Flash a firmware binary (uses **esptool** on the host) |
|
| `--flash` | Flash a firmware binary (uses **esptool** on the host) |
|
||||||
|
|
||||||
**Default behaviour:** the tool always downloads `settings.json`, prints the merged view, and uploads again **only** when you pass setting edits (`--name`, `--leds`, …), **`--preset`**, or the relevant upload/flash/erase actions in order.
|
**Settings I/O:** With no arguments, **`led-cli`** prints help (no device access). **`--show`** downloads `settings.json` and prints it (read-only). Setting fields (`--name`, `-b`, …) download once, merge, print, and **upload only when a value actually changed**. **`--preset`** alone only touches **`presets.json`** (no `settings.json` download). Ordered actions (`--reset`, `--upload`, `-e`, …) run without touching settings unless you also pass **`--show`** or setting / preset edits as above.
|
||||||
|
|
||||||
Run **`python cli.py -h`** for the full epilog and argument list.
|
Run **`python cli.py -h`** for the full epilog and argument list.
|
||||||
|
|
||||||
|
|||||||
233
cli.py
233
cli.py
@@ -2,8 +2,10 @@
|
|||||||
"""
|
"""
|
||||||
LED Bar Configuration CLI Tool
|
LED Bar Configuration CLI Tool
|
||||||
|
|
||||||
Command-line interface for downloading, editing, and uploading settings.json
|
Command-line interface for editing settings.json on MicroPython devices via mpremote.
|
||||||
to/from MicroPython devices via mpremote.
|
Settings are downloaded from the device only when displaying (--show) or when applying
|
||||||
|
setting changes; uploads occur only when something changed. Use --erase to clear the
|
||||||
|
device filesystem (including settings).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
@@ -11,11 +13,12 @@ import argparse
|
|||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
import shutil
|
||||||
from typing import Dict, Any, List, Optional
|
from typing import Dict, Any, List, Optional
|
||||||
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
from device import copy_file, DeviceConnection
|
from device import copy_file, DeviceConnection, firmware_default_device_name
|
||||||
|
|
||||||
|
|
||||||
def resolve_flash_binary(path: str) -> Optional[str]:
|
def resolve_flash_binary(path: str) -> Optional[str]:
|
||||||
@@ -147,7 +150,7 @@ _FLAGS_WITH_VALUE = frozenset({
|
|||||||
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
|
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
|
||||||
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
|
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
|
||||||
'--preset', '--pattern', '--default', '--transport', '--ssid',
|
'--preset', '--pattern', '--default', '--transport', '--ssid',
|
||||||
'--wifi-password', '--wifi-channel',
|
'--wifi-password', '--wifi-channel', '--src', '--lib', '--patterns', '--paterns',
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
@@ -201,8 +204,28 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
|
|||||||
i += 2
|
i += 2
|
||||||
else:
|
else:
|
||||||
i += 1
|
i += 1
|
||||||
# Use empty string as remote_dir to map to root on device
|
# Upload source tree excluding patterns/ (handled by --patterns).
|
||||||
actions.append(('upload', [local_dir, ""]))
|
actions.append(('upload_src_no_patterns', local_dir))
|
||||||
|
continue
|
||||||
|
if arg in ('--patterns', '--paterns'):
|
||||||
|
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
|
||||||
|
local_dir = os.path.join("src", "patterns")
|
||||||
|
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
|
||||||
|
local_dir = argv[i + 1]
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
i += 1
|
||||||
|
actions.append(('upload', [local_dir, "patterns"]))
|
||||||
|
continue
|
||||||
|
if arg == '--all':
|
||||||
|
actions.append(('upload_src_no_patterns', "src"))
|
||||||
|
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
|
||||||
|
actions.append(('upload', ["lib", "lib"]))
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
if arg == '--force-upload':
|
||||||
|
# Handled via argparse; skip during action scan
|
||||||
|
i += 1
|
||||||
continue
|
continue
|
||||||
if arg == '--lib':
|
if arg == '--lib':
|
||||||
# Upload local DIR (default: ./lib) to /lib on device
|
# Upload local DIR (default: ./lib) to /lib on device
|
||||||
@@ -222,6 +245,10 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
|
|||||||
actions.append(('erase_all', None))
|
actions.append(('erase_all', None))
|
||||||
i += 1
|
i += 1
|
||||||
continue
|
continue
|
||||||
|
if arg == '--erase':
|
||||||
|
actions.append(('erase_all', None))
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
if arg == '--rm':
|
if arg == '--rm':
|
||||||
if i + 1 < len(argv):
|
if i + 1 < len(argv):
|
||||||
actions.append(('rm', argv[i + 1]))
|
actions.append(('rm', argv[i + 1]))
|
||||||
@@ -258,10 +285,13 @@ def main() -> None:
|
|||||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
epilog="""
|
epilog="""
|
||||||
Examples:
|
Examples:
|
||||||
# Download and print current settings
|
# Show help (no device I/O)
|
||||||
%(prog)s
|
%(prog)s
|
||||||
|
|
||||||
# Download, edit device name and brightness, then upload
|
# Show current settings.json from device (read-only)
|
||||||
|
%(prog)s --show
|
||||||
|
|
||||||
|
# Edit device name and brightness, then upload only if values changed
|
||||||
%(prog)s -n "LED-Strip-1" -b 128
|
%(prog)s -n "LED-Strip-1" -b 128
|
||||||
|
|
||||||
# Set multiple parameters
|
# Set multiple parameters
|
||||||
@@ -288,6 +318,9 @@ Examples:
|
|||||||
|
|
||||||
# Set name, num_leds, default pattern, and upload
|
# Set name, num_leds, default pattern, and upload
|
||||||
%(prog)s --name "MyStrip" -l 60 --default rainbow
|
%(prog)s --name "MyStrip" -l 60 --default rainbow
|
||||||
|
|
||||||
|
# Reset logical device name to firmware default (STA MAC based)
|
||||||
|
%(prog)s --reset-device-name
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -302,6 +335,12 @@ Examples:
|
|||||||
help="Device name"
|
help="Device name"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--reset-device-name",
|
||||||
|
action="store_true",
|
||||||
|
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--id",
|
"--id",
|
||||||
dest="device_id",
|
dest="device_id",
|
||||||
@@ -434,7 +473,7 @@ Examples:
|
|||||||
nargs="?",
|
nargs="?",
|
||||||
const="src",
|
const="src",
|
||||||
metavar="DIR",
|
metavar="DIR",
|
||||||
help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src."
|
help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -445,6 +484,27 @@ Examples:
|
|||||||
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
|
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--all",
|
||||||
|
action="store_true",
|
||||||
|
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--force-upload",
|
||||||
|
action="store_true",
|
||||||
|
help="Upload every file; ignore file_hashes.json on the device",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--patterns", "--paterns",
|
||||||
|
dest="patterns_dir",
|
||||||
|
nargs="?",
|
||||||
|
const=os.path.join("src", "patterns"),
|
||||||
|
metavar="DIR",
|
||||||
|
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--ls",
|
"--ls",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
@@ -452,10 +512,10 @@ Examples:
|
|||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-e",
|
"-e", "--erase",
|
||||||
dest="erase_all",
|
dest="erase_all",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="Erase all code on the device (delete all files except settings.json)"
|
help="Erase the device filesystem: delete all files and directories at device root (including settings.json and presets.json)",
|
||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -471,6 +531,9 @@ Examples:
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
if len(sys.argv) <= 1:
|
||||||
|
parser.print_help()
|
||||||
|
return
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
ordered_actions = _get_ordered_actions(sys.argv)
|
ordered_actions = _get_ordered_actions(sys.argv)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
@@ -534,11 +597,52 @@ Examples:
|
|||||||
else:
|
else:
|
||||||
print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr)
|
print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr)
|
||||||
conn = DeviceConnection(port)
|
conn = DeviceConnection(port)
|
||||||
files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir)
|
files_copied, dirs_created, files_skipped = conn.upload_directory(
|
||||||
print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr)
|
upload_dir, remote_dir, force=args.force_upload
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
|
||||||
|
f"{dirs_created} directories created.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error uploading directory: {e}", file=sys.stderr)
|
print(f"Error uploading directory: {e}", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
elif action_name == 'upload_src_no_patterns':
|
||||||
|
src_dir = value
|
||||||
|
if not os.path.exists(src_dir):
|
||||||
|
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
if not os.path.isdir(src_dir):
|
||||||
|
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
try:
|
||||||
|
with tempfile.TemporaryDirectory() as temp_src:
|
||||||
|
for entry in sorted(os.listdir(src_dir)):
|
||||||
|
if entry == "patterns":
|
||||||
|
continue
|
||||||
|
src_entry = os.path.join(src_dir, entry)
|
||||||
|
dst_entry = os.path.join(temp_src, entry)
|
||||||
|
if os.path.isdir(src_entry):
|
||||||
|
shutil.copytree(src_entry, dst_entry)
|
||||||
|
else:
|
||||||
|
shutil.copy2(src_entry, dst_entry)
|
||||||
|
print(
|
||||||
|
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
conn = DeviceConnection(port)
|
||||||
|
files_copied, dirs_created, files_skipped = conn.upload_directory(
|
||||||
|
temp_src, "", force=args.force_upload
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
|
||||||
|
f"{dirs_created} directories created.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
elif action_name == 'ls':
|
elif action_name == 'ls':
|
||||||
try:
|
try:
|
||||||
@@ -554,14 +658,12 @@ Examples:
|
|||||||
|
|
||||||
elif action_name == 'erase_all':
|
elif action_name == 'erase_all':
|
||||||
try:
|
try:
|
||||||
print(f"Erasing all code on device {port}...", file=sys.stderr)
|
print(f"Erasing device filesystem on {port}...", file=sys.stderr)
|
||||||
conn = DeviceConnection(port)
|
conn = DeviceConnection(port)
|
||||||
items = conn.list_files('')
|
items = conn.list_files('')
|
||||||
for name, is_dir, size in items:
|
for name, is_dir, size in items:
|
||||||
if name == "settings.json":
|
|
||||||
continue
|
|
||||||
conn.delete(name)
|
conn.delete(name)
|
||||||
print("Erase complete.", file=sys.stderr)
|
print("Erase complete (device root is empty).", file=sys.stderr)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error erasing device: {e}", file=sys.stderr)
|
print(f"Error erasing device: {e}", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@@ -594,11 +696,35 @@ Examples:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
return # follow blocks; when interrupted we're done
|
return # follow blocks; when interrupted we're done
|
||||||
|
|
||||||
|
default_name_from_device: Optional[str] = None
|
||||||
|
if args.reset_device_name:
|
||||||
|
if args.name is not None:
|
||||||
|
print(
|
||||||
|
"Error: use either --name or --reset-device-name, not both.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
try:
|
||||||
|
print(
|
||||||
|
f"Reading firmware default device name (STA MAC) on {port}...",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
default_name_from_device = firmware_default_device_name(port)
|
||||||
|
print(
|
||||||
|
f"Default name will be {default_name_from_device!r}.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# Collect all edit parameters
|
# Collect all edit parameters
|
||||||
edits: Dict[str, Any] = {}
|
edits: Dict[str, Any] = {}
|
||||||
|
|
||||||
if args.name is not None:
|
if args.name is not None:
|
||||||
edits["name"] = args.name
|
edits["name"] = args.name
|
||||||
|
elif default_name_from_device is not None:
|
||||||
|
edits["name"] = default_name_from_device
|
||||||
|
|
||||||
if args.pin is not None:
|
if args.pin is not None:
|
||||||
edits["led_pin"] = args.pin
|
edits["led_pin"] = args.pin
|
||||||
@@ -634,32 +760,42 @@ Examples:
|
|||||||
# Clamp into single-byte range; store as int in settings.json
|
# Clamp into single-byte range; store as int in settings.json
|
||||||
edits["id"] = max(0, min(255, args.device_id))
|
edits["id"] = max(0, min(255, args.device_id))
|
||||||
|
|
||||||
# 1. Download: get current settings from device
|
settings_work = args.show or bool(edits)
|
||||||
|
|
||||||
|
if settings_work:
|
||||||
try:
|
try:
|
||||||
print(f"Downloading settings from {args.port}...", file=sys.stderr)
|
print(f"Downloading settings from {args.port}...", file=sys.stderr)
|
||||||
settings = download_settings(args.port)
|
settings = download_settings(args.port)
|
||||||
print("Settings downloaded successfully.", file=sys.stderr)
|
print("Settings downloaded successfully.", file=sys.stderr)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
||||||
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr)
|
print(
|
||||||
|
f"Error: Connection timeout. Check device connection on {args.port}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
print(f"Error downloading settings: {e}", file=sys.stderr)
|
print(f"Error downloading settings: {e}", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# If --show only, print and exit (unless presets or settings are being written)
|
|
||||||
if args.show:
|
if args.show:
|
||||||
print_settings(settings)
|
print_settings(settings)
|
||||||
if not edits and args.preset is None:
|
if not edits and args.preset is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
# 2. Edit: apply edits to downloaded settings
|
changed_edits: Dict[str, Any] = {}
|
||||||
if edits:
|
for key, value in edits.items():
|
||||||
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr)
|
if settings.get(key) != value:
|
||||||
settings.update(edits)
|
changed_edits[key] = value
|
||||||
|
|
||||||
|
if edits and not changed_edits:
|
||||||
|
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
|
||||||
|
|
||||||
|
if changed_edits:
|
||||||
|
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
|
||||||
|
settings.update(changed_edits)
|
||||||
|
|
||||||
print_settings(settings)
|
print_settings(settings)
|
||||||
|
|
||||||
# 3a. Presets file (led-driver presets.json)
|
|
||||||
if args.preset is not None:
|
if args.preset is not None:
|
||||||
pattern = args.pattern if args.pattern is not None else "on"
|
pattern = args.pattern if args.pattern is not None else "on"
|
||||||
try:
|
try:
|
||||||
@@ -680,23 +816,62 @@ Examples:
|
|||||||
print("Device will reset.", file=sys.stderr)
|
print("Device will reset.", file=sys.stderr)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
||||||
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr)
|
print(
|
||||||
|
f"Error: Connection timeout. Check device connection on {args.port}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
print(f"Error uploading presets: {e}", file=sys.stderr)
|
print(f"Error uploading presets: {e}", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# 3b. Settings upload (resets device)
|
if changed_edits:
|
||||||
if edits:
|
|
||||||
try:
|
try:
|
||||||
print(f"\nUploading settings to {args.port}...", file=sys.stderr)
|
print(f"\nUploading settings to {args.port}...", file=sys.stderr)
|
||||||
upload_settings(args.port, settings)
|
upload_settings(args.port, settings)
|
||||||
print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
|
print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
||||||
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr)
|
print(
|
||||||
|
f"Error: Connection timeout. Check device connection on {args.port}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
print(f"Error uploading settings: {e}", file=sys.stderr)
|
print(f"Error uploading settings: {e}", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
return
|
||||||
|
|
||||||
|
if args.preset is not None:
|
||||||
|
pattern = args.pattern if args.pattern is not None else "on"
|
||||||
|
try:
|
||||||
|
print(f"Downloading presets from {args.port}...", file=sys.stderr)
|
||||||
|
presets_data = download_presets(args.port)
|
||||||
|
entry = presets_data.get(args.preset)
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
entry = {}
|
||||||
|
entry["p"] = pattern
|
||||||
|
presets_data[args.preset] = entry
|
||||||
|
print(
|
||||||
|
f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
upload_presets(args.port, presets_data, reset=True)
|
||||||
|
print("Presets uploaded successfully.", file=sys.stderr)
|
||||||
|
print("Device will reset.", file=sys.stderr)
|
||||||
|
except Exception as e:
|
||||||
|
if "timeout" in str(e).lower() or "connection" in str(e).lower():
|
||||||
|
print(
|
||||||
|
f"Error: Connection timeout. Check device connection on {args.port}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print(f"Error uploading presets: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
return
|
||||||
|
|
||||||
|
if ordered_actions:
|
||||||
|
return
|
||||||
|
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
45
deploy_manifest.py
Normal file
45
deploy_manifest.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
"""
|
||||||
|
Host-side helpers for file_hashes.json (same format as led-driver/src/file_hashes.py).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
MANIFEST_VERSION = 1
|
||||||
|
MANIFEST_FILENAME = "file_hashes.json"
|
||||||
|
HASH_ALGO = "sha256"
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_remote_path(remote_file: str) -> str:
|
||||||
|
"""Device-relative path with forward slashes (no leading slash)."""
|
||||||
|
return remote_file.replace("\\", "/").lstrip("/")
|
||||||
|
|
||||||
|
|
||||||
|
def sha256_hex_file(path: str) -> str:
|
||||||
|
h = hashlib.sha256()
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(65536), b""):
|
||||||
|
h.update(chunk)
|
||||||
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def parse_manifest(data: bytes) -> dict:
|
||||||
|
"""Return path -> hash map from manifest bytes."""
|
||||||
|
try:
|
||||||
|
doc = json.loads(data.decode("utf-8"))
|
||||||
|
except (UnicodeDecodeError, json.JSONDecodeError, TypeError, ValueError):
|
||||||
|
return {}
|
||||||
|
if not isinstance(doc, dict):
|
||||||
|
return {}
|
||||||
|
files = doc.get("files")
|
||||||
|
return dict(files) if isinstance(files, dict) else {}
|
||||||
|
|
||||||
|
|
||||||
|
def build_manifest_bytes(files: dict) -> bytes:
|
||||||
|
doc = {
|
||||||
|
"version": MANIFEST_VERSION,
|
||||||
|
"algorithm": HASH_ALGO,
|
||||||
|
"files": files,
|
||||||
|
}
|
||||||
|
return json.dumps(doc, separators=(",", ":")).encode("utf-8")
|
||||||
136
device.py
136
device.py
@@ -10,6 +10,14 @@ import os
|
|||||||
import time
|
import time
|
||||||
import serial
|
import serial
|
||||||
|
|
||||||
|
from deploy_manifest import (
|
||||||
|
MANIFEST_FILENAME,
|
||||||
|
build_manifest_bytes,
|
||||||
|
normalize_remote_path,
|
||||||
|
parse_manifest,
|
||||||
|
sha256_hex_file,
|
||||||
|
)
|
||||||
|
|
||||||
# Add lib directory to path - handle both normal execution and PyInstaller bundle
|
# Add lib directory to path - handle both normal execution and PyInstaller bundle
|
||||||
if getattr(sys, 'frozen', False):
|
if getattr(sys, 'frozen', False):
|
||||||
# Running as a PyInstaller bundle
|
# Running as a PyInstaller bundle
|
||||||
@@ -23,12 +31,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
|
|||||||
sys.path.insert(0, lib_path)
|
sys.path.insert(0, lib_path)
|
||||||
|
|
||||||
from mpremote.transport_serial import SerialTransport
|
from mpremote.transport_serial import SerialTransport
|
||||||
from mpremote.transport import TransportError
|
from mpremote.transport import TransportError, TransportExecError
|
||||||
|
|
||||||
|
|
||||||
class DeviceConnection:
|
class DeviceConnection:
|
||||||
"""Wrapper for device communication."""
|
"""Wrapper for device communication."""
|
||||||
|
|
||||||
|
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
|
||||||
|
WDT_FEED_INTERVAL_SEC = 5.0
|
||||||
|
|
||||||
def __init__(self, device):
|
def __init__(self, device):
|
||||||
"""Connect to a device."""
|
"""Connect to a device."""
|
||||||
self.device = device
|
self.device = device
|
||||||
@@ -54,6 +65,38 @@ class DeviceConnection:
|
|||||||
pass
|
pass
|
||||||
self.transport = None
|
self.transport = None
|
||||||
|
|
||||||
|
def _feed_wdt(self) -> None:
|
||||||
|
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
|
||||||
|
if self.transport is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.transport.exec(
|
||||||
|
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _make_wdt_upload_progress_callback(self):
|
||||||
|
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
|
||||||
|
last_feed = [time.monotonic()]
|
||||||
|
|
||||||
|
def progress(written: int, total: int) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
|
||||||
|
self._feed_wdt()
|
||||||
|
last_feed[0] = now
|
||||||
|
|
||||||
|
return progress
|
||||||
|
|
||||||
|
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
|
||||||
|
"""Write file to device with periodic WDT feeds during long transfers."""
|
||||||
|
self._feed_wdt()
|
||||||
|
self.transport.fs_writefile(
|
||||||
|
remote_path,
|
||||||
|
data,
|
||||||
|
progress_callback=self._make_wdt_upload_progress_callback(),
|
||||||
|
)
|
||||||
|
|
||||||
def copy_from_device(self, remote_path, local_path):
|
def copy_from_device(self, remote_path, local_path):
|
||||||
"""Copy a file from device to local filesystem."""
|
"""Copy a file from device to local filesystem."""
|
||||||
self.connect()
|
self.connect()
|
||||||
@@ -70,17 +113,45 @@ class DeviceConnection:
|
|||||||
try:
|
try:
|
||||||
with open(local_path, 'rb') as f:
|
with open(local_path, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.transport.fs_writefile(remote_path, data)
|
self._fs_writefile_with_wdt(remote_path, data)
|
||||||
finally:
|
finally:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
def upload_directory(self, local_dir, remote_dir=None):
|
def _manifest_remote_path(self) -> str:
|
||||||
|
return "/" + MANIFEST_FILENAME
|
||||||
|
|
||||||
|
def read_hash_manifest(self) -> dict:
|
||||||
|
"""Load path -> sha256 hex map from file_hashes.json on the device."""
|
||||||
|
remote = self._manifest_remote_path()
|
||||||
|
if not self.transport.fs_exists(remote):
|
||||||
|
return {}
|
||||||
|
try:
|
||||||
|
data = self.transport.fs_readfile(remote)
|
||||||
|
return parse_manifest(data)
|
||||||
|
except Exception:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def write_hash_manifest(self, files: dict) -> None:
|
||||||
|
"""Write merged file_hashes.json to the device root."""
|
||||||
|
self._fs_writefile_with_wdt(
|
||||||
|
self._manifest_remote_path(),
|
||||||
|
build_manifest_bytes(files),
|
||||||
|
)
|
||||||
|
|
||||||
|
def upload_directory(self, local_dir, remote_dir=None, *, force: bool = False):
|
||||||
"""
|
"""
|
||||||
Upload a directory recursively to the device.
|
Upload a directory recursively to the device.
|
||||||
|
|
||||||
|
Skips files whose sha256 matches file_hashes.json on the device unless
|
||||||
|
force is True. Updates the manifest after upload.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
local_dir: Local directory path to upload
|
local_dir: Local directory path to upload
|
||||||
remote_dir: Remote directory path (default: root, uses basename of local_dir)
|
remote_dir: Remote directory path (default: root, uses basename of local_dir)
|
||||||
|
force: Upload every file even when the manifest hash matches
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(files_copied, dirs_created, files_skipped)
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
|
||||||
@@ -94,15 +165,19 @@ class DeviceConnection:
|
|||||||
remote_dir = os.path.basename(os.path.abspath(local_dir))
|
remote_dir = os.path.basename(os.path.abspath(local_dir))
|
||||||
|
|
||||||
# Ensure remote directory exists
|
# Ensure remote directory exists
|
||||||
if not self.transport.fs_exists(remote_dir):
|
if remote_dir and not self.transport.fs_exists(remote_dir):
|
||||||
self.transport.fs_mkdir(remote_dir)
|
self.transport.fs_mkdir(remote_dir)
|
||||||
|
|
||||||
|
manifest = {} if force else self.read_hash_manifest()
|
||||||
|
|
||||||
# Walk through local directory and copy files
|
# Walk through local directory and copy files
|
||||||
files_copied = 0
|
files_copied = 0
|
||||||
|
files_skipped = 0
|
||||||
dirs_created = 0
|
dirs_created = 0
|
||||||
|
|
||||||
for root, dirs, files in os.walk(local_dir):
|
for root, dirs, files in os.walk(local_dir):
|
||||||
# Calculate relative path from local_dir
|
# Never upload Python bytecode trees (MicroPython does not use them).
|
||||||
|
dirs[:] = [d for d in dirs if d != "__pycache__"]
|
||||||
rel_path = os.path.relpath(root, local_dir)
|
rel_path = os.path.relpath(root, local_dir)
|
||||||
|
|
||||||
# Build remote path
|
# Build remote path
|
||||||
@@ -121,8 +196,12 @@ class DeviceConnection:
|
|||||||
self.transport.fs_mkdir(remote_base)
|
self.transport.fs_mkdir(remote_base)
|
||||||
dirs_created += 1
|
dirs_created += 1
|
||||||
|
|
||||||
# Copy files
|
# Copy files (skip bytecode; __pycache__ dirs are pruned above)
|
||||||
for file in files:
|
for file in files:
|
||||||
|
if file.endswith((".pyc", ".pyo")):
|
||||||
|
continue
|
||||||
|
if file == MANIFEST_FILENAME:
|
||||||
|
continue
|
||||||
local_file = os.path.join(root, file)
|
local_file = os.path.join(root, file)
|
||||||
# Handle root directory case properly
|
# Handle root directory case properly
|
||||||
if remote_base == '/':
|
if remote_base == '/':
|
||||||
@@ -130,13 +209,23 @@ class DeviceConnection:
|
|||||||
else:
|
else:
|
||||||
remote_file = '/'.join([remote_base, file])
|
remote_file = '/'.join([remote_base, file])
|
||||||
|
|
||||||
|
manifest_key = normalize_remote_path(remote_file)
|
||||||
|
local_hash = sha256_hex_file(local_file)
|
||||||
|
if not force and manifest.get(manifest_key) == local_hash:
|
||||||
|
print(f"Skipping (unchanged): {remote_file}", file=sys.stderr)
|
||||||
|
files_skipped += 1
|
||||||
|
continue
|
||||||
|
|
||||||
print(f"Uploading: {remote_file}", file=sys.stderr)
|
print(f"Uploading: {remote_file}", file=sys.stderr)
|
||||||
with open(local_file, 'rb') as f:
|
with open(local_file, 'rb') as f:
|
||||||
data = f.read()
|
data = f.read()
|
||||||
self.transport.fs_writefile(remote_file, data)
|
self._fs_writefile_with_wdt(remote_file, data)
|
||||||
|
manifest[manifest_key] = local_hash
|
||||||
files_copied += 1
|
files_copied += 1
|
||||||
|
|
||||||
return files_copied, dirs_created
|
self.write_hash_manifest(manifest)
|
||||||
|
|
||||||
|
return files_copied, dirs_created, files_skipped
|
||||||
finally:
|
finally:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
@@ -235,6 +324,37 @@ class DeviceConnection:
|
|||||||
raise TransportError(f"Failed to follow output: {e}") from e
|
raise TransportError(f"Failed to follow output: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
def firmware_default_device_name(device: str) -> str:
|
||||||
|
"""
|
||||||
|
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
|
||||||
|
matching Settings.set_defaults() in led-driver/src/settings.py.
|
||||||
|
"""
|
||||||
|
conn = DeviceConnection(device)
|
||||||
|
conn.connect()
|
||||||
|
try:
|
||||||
|
code = (
|
||||||
|
"import network, ubinascii\n"
|
||||||
|
"sta = network.WLAN(network.STA_IF)\n"
|
||||||
|
"sta.active(True)\n"
|
||||||
|
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
|
||||||
|
"print('led-' + mac)\n"
|
||||||
|
)
|
||||||
|
out = conn.transport.exec(code)
|
||||||
|
except TransportExecError as e:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Could not read STA MAC from device (is this MicroPython with network?): "
|
||||||
|
+ (e.error_output or str(e)).strip()
|
||||||
|
) from e
|
||||||
|
finally:
|
||||||
|
conn.disconnect()
|
||||||
|
text = out.decode("utf-8", errors="replace").strip()
|
||||||
|
for line in reversed(text.splitlines()):
|
||||||
|
line = line.strip()
|
||||||
|
if line.startswith("led-"):
|
||||||
|
return line
|
||||||
|
raise RuntimeError("Device did not return a led-<mac> default name")
|
||||||
|
|
||||||
|
|
||||||
def copy_file(from_device, device, remote_path, local_path):
|
def copy_file(from_device, device, remote_path, local_path):
|
||||||
"""
|
"""
|
||||||
Copy a file to/from device.
|
Copy a file to/from device.
|
||||||
|
|||||||
52
tests/test_deploy_manifest.py
Normal file
52
tests/test_deploy_manifest.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
"""Tests for deploy_manifest helpers (host Python)."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from deploy_manifest import (
|
||||||
|
MANIFEST_FILENAME,
|
||||||
|
build_manifest_bytes,
|
||||||
|
normalize_remote_path,
|
||||||
|
parse_manifest,
|
||||||
|
sha256_hex_file,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DeployManifestTests(unittest.TestCase):
|
||||||
|
def test_normalize_remote_path(self):
|
||||||
|
self.assertEqual(normalize_remote_path("/patterns/chase.py"), "patterns/chase.py")
|
||||||
|
self.assertEqual(normalize_remote_path("main.py"), "main.py")
|
||||||
|
|
||||||
|
def test_round_trip_manifest(self):
|
||||||
|
files = {"main.py": "abc", "patterns/x.py": "def"}
|
||||||
|
blob = build_manifest_bytes(files)
|
||||||
|
parsed = parse_manifest(blob)
|
||||||
|
self.assertEqual(parsed, files)
|
||||||
|
doc = json.loads(blob.decode())
|
||||||
|
self.assertEqual(doc["version"], 1)
|
||||||
|
self.assertEqual(doc["algorithm"], "sha256")
|
||||||
|
|
||||||
|
def test_parse_manifest_invalid(self):
|
||||||
|
self.assertEqual(parse_manifest(b"not json"), {})
|
||||||
|
self.assertEqual(parse_manifest(b'{"files": "nope"}'), {})
|
||||||
|
|
||||||
|
def test_sha256_hex_file(self):
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False) as f:
|
||||||
|
f.write(b"hello")
|
||||||
|
path = f.name
|
||||||
|
try:
|
||||||
|
self.assertEqual(
|
||||||
|
sha256_hex_file(path),
|
||||||
|
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824",
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
os.unlink(path)
|
||||||
|
|
||||||
|
def test_manifest_filename(self):
|
||||||
|
self.assertEqual(MANIFEST_FILENAME, "file_hashes.json")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user