12 Commits

Author SHA1 Message Date
f5de99386a feat(cli): add --serial-usb for bridge native USB CDC
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-06 21:09:55 +12:00
2961ad2a29 test(editor): web serial readuntil buffer regression
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
35c0df8f88 docs: update readme for serial baud and deploy paths
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
5c97fa0d0b feat(editor): add bridge ap ip and password fields
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
179ac9c540 feat(cli): add --serial-baudrate for bridge uart uplink
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:14 +12:00
bd4d2060ae fix(led-tool): harden Web Serial raw REPL connect
Improve boot wait, readUntil buffer handling, and settings editor
host/Web Serial flows after device reset.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 00:23:08 +12:00
f74e21f206 feat(led-tool): browser settings editor with Web Serial
Add static editor, host_ports filtering, Flask /editor and REST APIs
for ports and led-cli read/write; document standalone and embedded use.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-18 14:54:15 +12:00
1edcb8b1f7 feat(cli): skip unchanged files using device file_hashes.json
Read and update file_hashes.json on deploy; add --force-upload and tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 19:14:51 +12:00
ccc215acbd fix(cli): lazy settings fetch and full device erase
Download settings only for --show or when applying edits; skip upload
when unchanged; erase entire device root including settings.json.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 21:14:54 +12:00
580fd11aca fix(cli): skip redundant settings uploads when unchanged
Only upload settings when edited values differ from current device settings to avoid unnecessary resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:48:54 +12:00
pi
d6331a105c feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
2f3db9272b feat(cli): extend upload flags for src, patterns, lib, and --all
Support --patterns/--paterns, --all, --erase, and src upload excluding patterns/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:13 +12:00
11 changed files with 2017 additions and 71 deletions

View File

@@ -11,6 +11,7 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) | | `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
| `-s`, `--show` | Print current settings from the device | | `-s`, `--show` | Print current settings from the device |
| `-n`, `--name` | Device **name** | | `-n`, `--name` | Device **name** |
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
| `--id` | Numeric device id (ESP-NOW, 0255) | | `--id` | Numeric device id (ESP-NOW, 0255) |
| `--pin` | LED GPIO (`led_pin`) | | `--pin` | LED GPIO (`led_pin`) |
| `-b`, `--brightness` | Brightness 0255 | | `-b`, `--brightness` | Brightness 0255 |
@@ -19,21 +20,42 @@ Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is
| `-o`, `--order` | LED colour order (`rgb`, `grb`, …) | | `-o`, `--order` | LED colour order (`rgb`, `grb`, …) |
| `--preset` / `--pattern` | Create or replace a named preset in **led-driver** `presets.json` | | `--preset` / `--pattern` | Create or replace a named preset in **led-driver** `presets.json` |
| `--default` | Startup preset name | | `--default` | Startup preset name |
| `--transport` | `espnow` or `wifi` (`transport_type` on device) | | `--transport` | `espnow` or `wifi` (`transport_type` on led-driver) |
| `--serial-baudrate` | Bridge UART1 baud for Pi serial link (e.g. `921600`; also sets GPIO UART pins) |
| `--ssid`, `--wifi-password`, `--wifi-channel` | Wi-Fi / channel fields for the driver | | `--ssid`, `--wifi-password`, `--wifi-channel` | Wi-Fi / channel fields for the driver |
| `-r`, `--reset` | Reset the device | | `-r`, `--reset` | Reset the device |
| `-f`, `--follow` | Follow serial output (optional timeout seconds) | | `-f`, `--follow` | Follow serial output (optional timeout seconds) |
| `--pause` | Sleep N seconds (for chained actions) | | `--pause` | Sleep N seconds (for chained actions) |
| `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` | | `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` (skips unchanged files via `file_hashes.json` on device) |
| `--src`, `--lib` | Upload `src/` or a tree to `/lib` | | `--src`, `--lib`, `--all` | Deploy `src/` to device root and `lib/` to `/lib` (led-driver, espnow-sender, …) |
| `-e` | Erase device code (keeps `settings.json`) | | `--force-upload` | Upload every file; ignore `file_hashes.json` |
| `-e`, `--erase` | Erase everything at device root (including `settings.json` and `presets.json`) |
| `--rm` | Remove a path on the device | | `--rm` | Remove a path on the device |
| `--flash` | Flash a firmware binary (uses **esptool** on the host) | | `--flash` | Flash a firmware binary (uses **esptool** on the host) |
**Default behaviour:** the tool always downloads `settings.json`, prints the merged view, and uploads again **only** when you pass setting edits (`--name`, `--leds`, …), **`--preset`**, or the relevant upload/flash/erase actions in order. **Settings I/O:** With no arguments, **`led-cli`** prints help (no device access). **`--show`** downloads `settings.json` and prints it (read-only). Setting fields (`--name`, `-b`, …) download once, merge, print, and **upload only when a value actually changed**. **`--preset`** alone only touches **`presets.json`** (no `settings.json` download). Ordered actions (`--reset`, `--upload`, `-e`, …) run without touching settings unless you also pass **`--show`** or setting / preset edits as above.
Run **`python cli.py -h`** for the full epilog and argument list. Run **`python cli.py -h`** for the full epilog and argument list.
## Web UI (`static/` + `web.py`)
Edit **`settings.json`** in the browser:
- **Web Serial** — USB on the machine running the browser (Chrome/Edge). Use **Connect USB**, then **Download** / **Upload**.
- **Host serial** — USB on the Pi/PC running **`led-cli`** (port list + **`led-cli`** merge/upload).
**Standalone (Flask):**
```bash
cd led-tool
python web.py
# open http://<host>:5000/editor
```
**Embedded in led-controller:** open **Settings → LED Tool** in the main UI (Edit mode), or visit **`/led-tool/editor`**.
Legacy Flask form UI remains at **`/`** on port 5000; prefer **`/editor`** for Web Serial support.
## License ## License
See **LICENSE** in this directory. See **LICENSE** in this directory.

269
cli.py
View File

@@ -2,8 +2,10 @@
""" """
LED Bar Configuration CLI Tool LED Bar Configuration CLI Tool
Command-line interface for downloading, editing, and uploading settings.json Command-line interface for editing settings.json on MicroPython devices via mpremote.
to/from MicroPython devices via mpremote. Settings are downloaded from the device only when displaying (--show) or when applying
setting changes; uploads occur only when something changed. Use --erase to clear the
device filesystem (including settings).
""" """
import json import json
@@ -11,11 +13,12 @@ import argparse
import subprocess import subprocess
import sys import sys
import time import time
import shutil
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import tempfile import tempfile
import os import os
from device import copy_file, DeviceConnection from device import copy_file, DeviceConnection, firmware_default_device_name
def resolve_flash_binary(path: str) -> Optional[str]: def resolve_flash_binary(path: str) -> Optional[str]:
@@ -147,7 +150,8 @@ _FLAGS_WITH_VALUE = frozenset({
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
'--preset', '--pattern', '--default', '--transport', '--ssid', '--preset', '--pattern', '--default', '--transport', '--ssid',
'--wifi-password', '--wifi-channel', '--wifi-password', '--wifi-channel', '--serial-baudrate', '--serial-usb',
'--src', '--lib', '--patterns', '--paterns',
}) })
@@ -201,8 +205,28 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
i += 2 i += 2
else: else:
i += 1 i += 1
# Use empty string as remote_dir to map to root on device # Upload source tree excluding patterns/ (handled by --patterns).
actions.append(('upload', [local_dir, ""])) actions.append(('upload_src_no_patterns', local_dir))
continue
if arg in ('--patterns', '--paterns'):
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
local_dir = os.path.join("src", "patterns")
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "patterns"]))
continue
if arg == '--all':
actions.append(('upload_src_no_patterns', "src"))
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
actions.append(('upload', ["lib", "lib"]))
i += 1
continue
if arg == '--force-upload':
# Handled via argparse; skip during action scan
i += 1
continue continue
if arg == '--lib': if arg == '--lib':
# Upload local DIR (default: ./lib) to /lib on device # Upload local DIR (default: ./lib) to /lib on device
@@ -222,6 +246,10 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
actions.append(('erase_all', None)) actions.append(('erase_all', None))
i += 1 i += 1
continue continue
if arg == '--erase':
actions.append(('erase_all', None))
i += 1
continue
if arg == '--rm': if arg == '--rm':
if i + 1 < len(argv): if i + 1 < len(argv):
actions.append(('rm', argv[i + 1])) actions.append(('rm', argv[i + 1]))
@@ -258,10 +286,13 @@ def main() -> None:
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
Examples: Examples:
# Download and print current settings # Show help (no device I/O)
%(prog)s %(prog)s
# Download, edit device name and brightness, then upload # Show current settings.json from device (read-only)
%(prog)s --show
# Edit device name and brightness, then upload only if values changed
%(prog)s -n "LED-Strip-1" -b 128 %(prog)s -n "LED-Strip-1" -b 128
# Set multiple parameters # Set multiple parameters
@@ -288,6 +319,15 @@ Examples:
# Set name, num_leds, default pattern, and upload # Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow %(prog)s --name "MyStrip" -l 60 --default rainbow
# Reset logical device name to firmware default (STA MAC based)
%(prog)s --reset-device-name
# ESP-NOW bridge: Pi on GPIO UART1 (USB-serial adapter)
%(prog)s -p /dev/ttyUSB0 --serial-baudrate 921600
# ESP-NOW bridge: Pi on ESP32-S3 native USB CDC
%(prog)s -p /dev/ttyACM0 --serial-usb
""" """
) )
@@ -302,6 +342,12 @@ Examples:
help="Device name" help="Device name"
) )
parser.add_argument(
"--reset-device-name",
action="store_true",
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
)
parser.add_argument( parser.add_argument(
"--id", "--id",
dest="device_id", dest="device_id",
@@ -374,6 +420,19 @@ Examples:
help="led-driver transport_type", help="led-driver transport_type",
) )
parser.add_argument(
"--serial-baudrate",
type=int,
metavar="BAUD",
help="bridge: UART1 baud for Pi serial link (default 921600)",
)
parser.add_argument(
"--serial-usb",
action="store_true",
help="bridge: use native USB CDC for Pi serial link (ESP32-S3)",
)
parser.add_argument( parser.add_argument(
"--ssid", "--ssid",
help="led-driver ssid (Wi-Fi network in wifi mode)", help="led-driver ssid (Wi-Fi network in wifi mode)",
@@ -434,7 +493,7 @@ Examples:
nargs="?", nargs="?",
const="src", const="src",
metavar="DIR", metavar="DIR",
help="Upload DIR recursively to device root (:/, no leading directory). If DIR is omitted, uses local ./src." help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
) )
parser.add_argument( parser.add_argument(
@@ -445,6 +504,27 @@ Examples:
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib." help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
) )
parser.add_argument(
"--all",
action="store_true",
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
)
parser.add_argument(
"--force-upload",
action="store_true",
help="Upload every file; ignore file_hashes.json on the device",
)
parser.add_argument(
"--patterns", "--paterns",
dest="patterns_dir",
nargs="?",
const=os.path.join("src", "patterns"),
metavar="DIR",
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
)
parser.add_argument( parser.add_argument(
"--ls", "--ls",
action="store_true", action="store_true",
@@ -452,10 +532,10 @@ Examples:
) )
parser.add_argument( parser.add_argument(
"-e", "-e", "--erase",
dest="erase_all", dest="erase_all",
action="store_true", action="store_true",
help="Erase all code on the device (delete all files except settings.json)" help="Erase the device filesystem: delete all files and directories at device root (including settings.json and presets.json)",
) )
parser.add_argument( parser.add_argument(
@@ -471,6 +551,9 @@ Examples:
) )
try: try:
if len(sys.argv) <= 1:
parser.print_help()
return
args = parser.parse_args() args = parser.parse_args()
ordered_actions = _get_ordered_actions(sys.argv) ordered_actions = _get_ordered_actions(sys.argv)
except ValueError as e: except ValueError as e:
@@ -534,11 +617,52 @@ Examples:
else: else:
print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr) print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir) files_copied, dirs_created, files_skipped = conn.upload_directory(
print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr) upload_dir, remote_dir, force=args.force_upload
)
print(
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
f"{dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e: except Exception as e:
print(f"Error uploading directory: {e}", file=sys.stderr) print(f"Error uploading directory: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
elif action_name == 'upload_src_no_patterns':
src_dir = value
if not os.path.exists(src_dir):
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(src_dir):
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
sys.exit(1)
try:
with tempfile.TemporaryDirectory() as temp_src:
for entry in sorted(os.listdir(src_dir)):
if entry == "patterns":
continue
src_entry = os.path.join(src_dir, entry)
dst_entry = os.path.join(temp_src, entry)
if os.path.isdir(src_entry):
shutil.copytree(src_entry, dst_entry)
else:
shutil.copy2(src_entry, dst_entry)
print(
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
file=sys.stderr,
)
conn = DeviceConnection(port)
files_copied, dirs_created, files_skipped = conn.upload_directory(
temp_src, "", force=args.force_upload
)
print(
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
f"{dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e:
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'ls': elif action_name == 'ls':
try: try:
@@ -554,14 +678,12 @@ Examples:
elif action_name == 'erase_all': elif action_name == 'erase_all':
try: try:
print(f"Erasing all code on device {port}...", file=sys.stderr) print(f"Erasing device filesystem on {port}...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
items = conn.list_files('') items = conn.list_files('')
for name, is_dir, size in items: for name, is_dir, size in items:
if name == "settings.json":
continue
conn.delete(name) conn.delete(name)
print("Erase complete.", file=sys.stderr) print("Erase complete (device root is empty).", file=sys.stderr)
except Exception as e: except Exception as e:
print(f"Error erasing device: {e}", file=sys.stderr) print(f"Error erasing device: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
@@ -594,11 +716,35 @@ Examples:
sys.exit(1) sys.exit(1)
return # follow blocks; when interrupted we're done return # follow blocks; when interrupted we're done
default_name_from_device: Optional[str] = None
if args.reset_device_name:
if args.name is not None:
print(
"Error: use either --name or --reset-device-name, not both.",
file=sys.stderr,
)
sys.exit(1)
try:
print(
f"Reading firmware default device name (STA MAC) on {port}...",
file=sys.stderr,
)
default_name_from_device = firmware_default_device_name(port)
print(
f"Default name will be {default_name_from_device!r}.",
file=sys.stderr,
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Collect all edit parameters # Collect all edit parameters
edits: Dict[str, Any] = {} edits: Dict[str, Any] = {}
if args.name is not None: if args.name is not None:
edits["name"] = args.name edits["name"] = args.name
elif default_name_from_device is not None:
edits["name"] = default_name_from_device
if args.pin is not None: if args.pin is not None:
edits["led_pin"] = args.pin edits["led_pin"] = args.pin
@@ -621,6 +767,22 @@ Examples:
if args.transport is not None: if args.transport is not None:
edits["transport_type"] = args.transport edits["transport_type"] = args.transport
if args.serial_baudrate is not None:
edits["uplink_transport"] = "serial"
edits["serial_usb"] = False
edits["serial_uart_id"] = 1
edits["serial_tx_pin"] = 2
edits["serial_rx_pin"] = 3
baud = int(args.serial_baudrate)
if baud < 9600 or baud > 3000000:
print("Error: --serial-baudrate must be 96003000000", file=sys.stderr)
sys.exit(1)
edits["serial_baudrate"] = baud
if args.serial_usb:
edits["uplink_transport"] = "serial"
edits["serial_usb"] = True
if args.ssid is not None: if args.ssid is not None:
edits["ssid"] = args.ssid edits["ssid"] = args.ssid
@@ -634,32 +796,42 @@ Examples:
# Clamp into single-byte range; store as int in settings.json # Clamp into single-byte range; store as int in settings.json
edits["id"] = max(0, min(255, args.device_id)) edits["id"] = max(0, min(255, args.device_id))
# 1. Download: get current settings from device settings_work = args.show or bool(edits)
if settings_work:
try: try:
print(f"Downloading settings from {args.port}...", file=sys.stderr) print(f"Downloading settings from {args.port}...", file=sys.stderr)
settings = download_settings(args.port) settings = download_settings(args.port)
print("Settings downloaded successfully.", file=sys.stderr) print("Settings downloaded successfully.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error downloading settings: {e}", file=sys.stderr) print(f"Error downloading settings: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
# If --show only, print and exit (unless presets or settings are being written)
if args.show: if args.show:
print_settings(settings) print_settings(settings)
if not edits and args.preset is None: if not edits and args.preset is None:
return return
# 2. Edit: apply edits to downloaded settings changed_edits: Dict[str, Any] = {}
if edits: for key, value in edits.items():
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) if settings.get(key) != value:
settings.update(edits) changed_edits[key] = value
if edits and not changed_edits:
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
if changed_edits:
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
settings.update(changed_edits)
print_settings(settings) print_settings(settings)
# 3a. Presets file (led-driver presets.json)
if args.preset is not None: if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on" pattern = args.pattern if args.pattern is not None else "on"
try: try:
@@ -680,23 +852,62 @@ Examples:
print("Device will reset.", file=sys.stderr) print("Device will reset.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error uploading presets: {e}", file=sys.stderr) print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
# 3b. Settings upload (resets device) if changed_edits:
if edits:
try: try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr) print(f"\nUploading settings to {args.port}...", file=sys.stderr)
upload_settings(args.port, settings) upload_settings(args.port, settings)
print("Settings uploaded successfully. Device will reset.", file=sys.stderr) print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error uploading settings: {e}", file=sys.stderr) print(f"Error uploading settings: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
return
if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on"
try:
print(f"Downloading presets from {args.port}...", file=sys.stderr)
presets_data = download_presets(args.port)
entry = presets_data.get(args.preset)
if not isinstance(entry, dict):
entry = {}
entry["p"] = pattern
presets_data[args.preset] = entry
print(
f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...",
file=sys.stderr,
)
upload_presets(args.port, presets_data, reset=True)
print("Presets uploaded successfully.", file=sys.stderr)
print("Device will reset.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else:
print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1)
return
if ordered_actions:
return
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":

45
deploy_manifest.py Normal file
View File

@@ -0,0 +1,45 @@
"""
Host-side helpers for file_hashes.json (same format as led-driver/src/file_hashes.py).
"""
import hashlib
import json
import os
MANIFEST_VERSION = 1
MANIFEST_FILENAME = "file_hashes.json"
HASH_ALGO = "sha256"
def normalize_remote_path(remote_file: str) -> str:
"""Device-relative path with forward slashes (no leading slash)."""
return remote_file.replace("\\", "/").lstrip("/")
def sha256_hex_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def parse_manifest(data: bytes) -> dict:
"""Return path -> hash map from manifest bytes."""
try:
doc = json.loads(data.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError, TypeError, ValueError):
return {}
if not isinstance(doc, dict):
return {}
files = doc.get("files")
return dict(files) if isinstance(files, dict) else {}
def build_manifest_bytes(files: dict) -> bytes:
doc = {
"version": MANIFEST_VERSION,
"algorithm": HASH_ALGO,
"files": files,
}
return json.dumps(doc, separators=(",", ":")).encode("utf-8")

136
device.py
View File

@@ -10,6 +10,14 @@ import os
import time import time
import serial import serial
from deploy_manifest import (
MANIFEST_FILENAME,
build_manifest_bytes,
normalize_remote_path,
parse_manifest,
sha256_hex_file,
)
# Add lib directory to path - handle both normal execution and PyInstaller bundle # Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False): if getattr(sys, 'frozen', False):
# Running as a PyInstaller bundle # Running as a PyInstaller bundle
@@ -23,12 +31,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path) sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError from mpremote.transport import TransportError, TransportExecError
class DeviceConnection: class DeviceConnection:
"""Wrapper for device communication.""" """Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device): def __init__(self, device):
"""Connect to a device.""" """Connect to a device."""
self.device = device self.device = device
@@ -54,6 +65,38 @@ class DeviceConnection:
pass pass
self.transport = None self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path): def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem.""" """Copy a file from device to local filesystem."""
self.connect() self.connect()
@@ -70,17 +113,45 @@ class DeviceConnection:
try: try:
with open(local_path, 'rb') as f: with open(local_path, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_path, data) self._fs_writefile_with_wdt(remote_path, data)
finally: finally:
self.disconnect() self.disconnect()
def upload_directory(self, local_dir, remote_dir=None): def _manifest_remote_path(self) -> str:
return "/" + MANIFEST_FILENAME
def read_hash_manifest(self) -> dict:
"""Load path -> sha256 hex map from file_hashes.json on the device."""
remote = self._manifest_remote_path()
if not self.transport.fs_exists(remote):
return {}
try:
data = self.transport.fs_readfile(remote)
return parse_manifest(data)
except Exception:
return {}
def write_hash_manifest(self, files: dict) -> None:
"""Write merged file_hashes.json to the device root."""
self._fs_writefile_with_wdt(
self._manifest_remote_path(),
build_manifest_bytes(files),
)
def upload_directory(self, local_dir, remote_dir=None, *, force: bool = False):
""" """
Upload a directory recursively to the device. Upload a directory recursively to the device.
Skips files whose sha256 matches file_hashes.json on the device unless
force is True. Updates the manifest after upload.
Args: Args:
local_dir: Local directory path to upload local_dir: Local directory path to upload
remote_dir: Remote directory path (default: root, uses basename of local_dir) remote_dir: Remote directory path (default: root, uses basename of local_dir)
force: Upload every file even when the manifest hash matches
Returns:
(files_copied, dirs_created, files_skipped)
""" """
import os import os
@@ -94,15 +165,19 @@ class DeviceConnection:
remote_dir = os.path.basename(os.path.abspath(local_dir)) remote_dir = os.path.basename(os.path.abspath(local_dir))
# Ensure remote directory exists # Ensure remote directory exists
if not self.transport.fs_exists(remote_dir): if remote_dir and not self.transport.fs_exists(remote_dir):
self.transport.fs_mkdir(remote_dir) self.transport.fs_mkdir(remote_dir)
manifest = {} if force else self.read_hash_manifest()
# Walk through local directory and copy files # Walk through local directory and copy files
files_copied = 0 files_copied = 0
files_skipped = 0
dirs_created = 0 dirs_created = 0
for root, dirs, files in os.walk(local_dir): for root, dirs, files in os.walk(local_dir):
# Calculate relative path from local_dir # Never upload Python bytecode trees (MicroPython does not use them).
dirs[:] = [d for d in dirs if d != "__pycache__"]
rel_path = os.path.relpath(root, local_dir) rel_path = os.path.relpath(root, local_dir)
# Build remote path # Build remote path
@@ -121,8 +196,12 @@ class DeviceConnection:
self.transport.fs_mkdir(remote_base) self.transport.fs_mkdir(remote_base)
dirs_created += 1 dirs_created += 1
# Copy files # Copy files (skip bytecode; __pycache__ dirs are pruned above)
for file in files: for file in files:
if file.endswith((".pyc", ".pyo")):
continue
if file == MANIFEST_FILENAME:
continue
local_file = os.path.join(root, file) local_file = os.path.join(root, file)
# Handle root directory case properly # Handle root directory case properly
if remote_base == '/': if remote_base == '/':
@@ -130,13 +209,23 @@ class DeviceConnection:
else: else:
remote_file = '/'.join([remote_base, file]) remote_file = '/'.join([remote_base, file])
manifest_key = normalize_remote_path(remote_file)
local_hash = sha256_hex_file(local_file)
if not force and manifest.get(manifest_key) == local_hash:
print(f"Skipping (unchanged): {remote_file}", file=sys.stderr)
files_skipped += 1
continue
print(f"Uploading: {remote_file}", file=sys.stderr) print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f: with open(local_file, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_file, data) self._fs_writefile_with_wdt(remote_file, data)
manifest[manifest_key] = local_hash
files_copied += 1 files_copied += 1
return files_copied, dirs_created self.write_hash_manifest(manifest)
return files_copied, dirs_created, files_skipped
finally: finally:
self.disconnect() self.disconnect()
@@ -235,6 +324,37 @@ class DeviceConnection:
raise TransportError(f"Failed to follow output: {e}") from e raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path): def copy_file(from_device, device, remote_path, local_path):
""" """
Copy a file to/from device. Copy a file to/from device.

25
host_ports.py Normal file
View File

@@ -0,0 +1,25 @@
"""Host serial port list filtering for led-tool /ports API."""
import re
# Exclude /dev/ttyS2 and higher (keep ttyS0, ttyS1; keep ttyACM*, ttyUSB*, etc.)
_TTYS_HIGH = re.compile(r"^/dev/ttyS([2-9]|[1-9]\d+)$")
def include_host_serial_device(device: str) -> bool:
return not _TTYS_HIGH.match(device or "")
def _is_na(value: str) -> bool:
return (value or "").strip().lower() in ("n/a", "na")
def include_host_serial_port(port: dict) -> bool:
if not include_host_serial_device(port.get("device", "")):
return False
if _is_na(port.get("description")) or _is_na(port.get("hwid")):
return False
return True
def filter_port_dicts(ports: list) -> list:
return [p for p in ports if include_host_serial_port(p)]

218
static/settings_editor.html Normal file
View File

@@ -0,0 +1,218 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>LED device settings</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style>
:root {
color-scheme: dark;
--text: #e5e7eb;
--muted: #9ca3af;
--border: #374151;
--accent: #3b82f6;
--danger: #f87171;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: system-ui, sans-serif;
background: #0b1020;
color: var(--text);
padding: 1rem;
}
h1 { font-size: 1.15rem; margin: 0 0 0.35rem; }
.muted { color: var(--muted); font-size: 0.9rem; margin-bottom: 1rem; }
.card {
border: 1px solid var(--border);
border-radius: 8px;
padding: 1rem;
margin-bottom: 1rem;
background: #111827;
}
label { display: block; font-size: 0.85rem; margin-bottom: 0.25rem; color: var(--muted); }
input, select, textarea {
width: 100%;
padding: 0.5rem;
border-radius: 4px;
border: 1px solid var(--border);
background: #1f2937;
color: var(--text);
margin-bottom: 0.65rem;
}
.row { display: flex; gap: 0.5rem; flex-wrap: wrap; align-items: flex-end; }
.row > * { flex: 1; min-width: 10rem; }
.btn {
padding: 0.45rem 0.85rem;
border: none;
border-radius: 4px;
cursor: pointer;
background: #4b5563;
color: #fff;
font-size: 0.9rem;
}
.btn-primary { background: var(--accent); }
.btn:disabled { opacity: 0.5; cursor: not-allowed; }
.grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(11rem, 1fr));
gap: 0 0.5rem;
}
#status { font-size: 0.85rem; margin-top: 0.5rem; }
#status.error { color: var(--danger); }
.flash {
display: none;
padding: 0.5rem 0.75rem;
border-radius: 4px;
margin-bottom: 0.75rem;
background: #14532d;
color: #86efac;
}
.flash.error { background: #5e1b1b; color: #fca5a5; }
.flash.show { display: block; }
[hidden] { display: none !important; }
</style>
</head>
<body data-api-base="/led-tool">
<h1>Device settings (USB)</h1>
<p class="muted">Edit <code>settings.json</code> via <strong>Web Serial</strong> (USB on this computer) or <strong>host serial</strong> (Pi running <code>led-cli</code>). Use <strong>Connect</strong> for your transport, then <strong>Download</strong> or <strong>Upload</strong>.</p>
<div id="flash" class="flash"></div>
<div class="card" id="webserial-wrap" hidden>
<h2 style="font-size:1rem;margin:0 0 0.5rem;">Web Serial</h2>
<div class="row">
<button type="button" class="btn btn-primary" id="webserial-connect">Connect USB</button>
<button type="button" class="btn" id="webserial-disconnect" hidden>Disconnect</button>
</div>
</div>
<div class="card" id="host-serial-wrap">
<h2 style="font-size:1rem;margin:0 0 0.5rem;">Host serial</h2>
<label for="host-port">Port</label>
<select id="host-port"><option value="">Select port</option></select>
<div class="row">
<button type="button" class="btn btn-primary" id="host-connect">Connect</button>
<button type="button" class="btn" id="host-disconnect" hidden>Disconnect</button>
<button type="button" class="btn" id="btn-refresh-ports">Refresh ports</button>
</div>
</div>
<div class="card">
<div class="row" style="margin-bottom:0.75rem;">
<button type="button" class="btn btn-primary" id="btn-download">Download</button>
<button type="button" class="btn btn-primary" id="btn-upload">Upload &amp; reset</button>
<button type="button" class="btn" id="btn-from-json">JSON → form</button>
<button type="button" class="btn" id="btn-to-json">Form → JSON</button>
</div>
<p id="status">Ready</p>
<div class="grid">
<div>
<label for="name">Name</label>
<input id="name" data-setting="name" type="text" />
</div>
<div>
<label for="num_leds">Num LEDs</label>
<input id="num_leds" data-setting="num_leds" type="number" min="1" />
</div>
<div>
<label for="led_pin">LED pin</label>
<input id="led_pin" data-setting="led_pin" type="number" min="0" />
</div>
<div>
<label for="brightness">Brightness</label>
<input id="brightness" data-setting="brightness" type="number" min="0" max="255" />
</div>
<div>
<label for="color_order">Colour order</label>
<select id="color_order" data-setting="color_order">
<option value=""></option>
<option value="rgb">rgb</option>
<option value="rbg">rbg</option>
<option value="grb">grb</option>
<option value="gbr">gbr</option>
<option value="brg">brg</option>
<option value="bgr">bgr</option>
</select>
</div>
<div>
<label for="transport_type">Transport</label>
<select id="transport_type" data-setting="transport_type">
<option value=""></option>
<option value="espnow">espnow</option>
<option value="wifi">wifi</option>
</select>
</div>
<div>
<label for="ssid">WiFi SSID</label>
<input id="ssid" data-setting="ssid" type="text" />
</div>
<div>
<label for="password">WiFi password</label>
<input id="password" data-setting="password" type="password" />
</div>
<div>
<label for="wifi_channel">WiFi channel</label>
<input id="wifi_channel" data-setting="wifi_channel" type="number" min="1" max="11" />
</div>
<div>
<label for="ap_ip">Bridge AP IP</label>
<input id="ap_ip" data-setting="ap_ip" type="text" placeholder="192.168.4.1" />
</div>
<div>
<label for="ap_password">Bridge AP password</label>
<input id="ap_password" data-setting="ap_password" type="password" placeholder="min 8 chars, or empty for open" />
</div>
<div>
<label for="default">Default preset</label>
<input id="default" data-setting="default" type="text" />
</div>
<div>
<label for="id">Device id (ESP-NOW)</label>
<input id="id" data-setting="id" type="number" min="0" max="255" />
</div>
<div>
<label for="debug">Debug</label>
<select id="debug" data-setting="debug">
<option value=""></option>
<option value="False">False</option>
<option value="True">True</option>
</select>
</div>
</div>
</div>
<div class="card">
<label for="raw_json">Raw settings.json</label>
<textarea id="raw_json" rows="14" spellcheck="false">{}</textarea>
</div>
<script>
(function () {
const api =
document.body.getAttribute('data-api-base') ??
(location.pathname.includes('/led-tool') ? '/led-tool' : '');
const prefix = api + '/static';
const v = '20260520';
function loadScript(url) {
return new Promise((resolve, reject) => {
const s = document.createElement('script');
s.src = url + (url.includes('?') ? '&' : '?') + 'v=' + v;
s.onload = () => resolve();
s.onerror = () => reject(new Error('Failed to load ' + url));
document.body.appendChild(s);
});
}
loadScript(prefix + '/web_serial.js')
.then(() => loadScript(prefix + '/settings_editor.js'))
.catch((e) => {
const el = document.getElementById('flash');
if (el) {
el.textContent = e.message;
el.className = 'flash error show';
}
});
})();
</script>
</body>
</html>

421
static/settings_editor.js Normal file
View File

@@ -0,0 +1,421 @@
(function () {
const LOG = '[led-tool/editor]';
const log = (...args) => console.log(LOG, ...args);
const apiBase =
(document.body && document.body.dataset.apiBase) ||
(window.location.pathname.includes('/led-tool') ? '/led-tool' : '');
const statusEl = document.getElementById('status');
const messageEl = document.getElementById('flash');
const portSelect = document.getElementById('host-port');
const rawJson = document.getElementById('raw_json');
const hostWrap = document.getElementById('host-serial-wrap');
const webWrap = document.getElementById('webserial-wrap');
const connectBtn = document.getElementById('webserial-connect');
const disconnectBtn = document.getElementById('webserial-disconnect');
const hostConnectBtn = document.getElementById('host-connect');
const hostDisconnectBtn = document.getElementById('host-disconnect');
const refreshPortsBtn = document.getElementById('btn-refresh-ports');
const DEFAULT_HOST_PORT = '/dev/ttyACM0';
let webClient = null;
let webPort = null;
let hostConnected = false;
let hostConnectedPort = '';
let transferBusy = false;
let webConnectBusy = false;
const webSerialOk = window.LedToolWebSerial && window.LedToolWebSerial.supported();
let webSerialGrantedCount = 0;
function setStatus(text, isError) {
if (!statusEl) return;
statusEl.textContent = text;
statusEl.classList.toggle('error', Boolean(isError));
}
function flash(text, isError) {
if (!messageEl) return;
messageEl.textContent = text;
messageEl.className = isError ? 'flash error show' : 'flash show';
setTimeout(() => messageEl.classList.remove('show'), 5000);
}
function errorMessage(e) {
if (!e) return 'Unknown error';
if (typeof e === 'string') return e;
return e.message || String(e);
}
function onWebSerialTransferError(e) {
if (webClient) webClient.inRawRepl = false;
return errorMessage(e);
}
function formToObject() {
const out = {};
document.querySelectorAll('[data-setting]').forEach((el) => {
const key = el.getAttribute('data-setting');
if (!key) return;
const raw = (el.value || '').trim();
if (raw === '') return;
if (el.type === 'number') {
const n = parseInt(raw, 10);
if (!Number.isNaN(n)) out[key] = n;
} else if (el.tagName === 'SELECT' && el.id === 'debug') {
out[key] = raw === 'True';
} else {
out[key] = raw;
}
});
return out;
}
function objectToForm(settings) {
if (!settings || typeof settings !== 'object') return;
document.querySelectorAll('[data-setting]').forEach((el) => {
const key = el.getAttribute('data-setting');
if (!key || !Object.prototype.hasOwnProperty.call(settings, key)) return;
const v = settings[key];
if (el.id === 'debug') {
el.value = v === true || v === 'True' ? 'True' : 'False';
} else {
el.value = v === undefined || v === null ? '' : String(v);
}
});
if (rawJson) rawJson.value = JSON.stringify(settings, null, 2);
}
function usingWebSerial() {
return webClient && webClient.connected;
}
function usingHostSerial() {
return hostConnected && Boolean(hostConnectedPort);
}
function updateUi() {
const webOpen = usingWebSerial();
const hostOpen = usingHostSerial();
if (webWrap) webWrap.hidden = !webSerialOk || hostOpen;
if (connectBtn) connectBtn.hidden = webOpen || hostOpen || !webSerialOk;
if (disconnectBtn) disconnectBtn.hidden = !webOpen;
if (hostWrap) hostWrap.hidden = webOpen;
const hostFieldsLocked = webOpen || hostOpen;
if (portSelect) portSelect.disabled = hostFieldsLocked;
if (refreshPortsBtn) refreshPortsBtn.disabled = hostFieldsLocked;
if (hostConnectBtn) hostConnectBtn.hidden = hostOpen || webOpen;
if (hostDisconnectBtn) hostDisconnectBtn.hidden = !hostOpen || webOpen;
if (webOpen) {
setStatus('USB open (device not reset until Download/Upload).');
} else if (hostOpen) {
setStatus(`Host serial: ${hostConnectedPort} (use Download or Upload).`);
} else if (webSerialOk) {
if (webSerialGrantedCount === 1) {
setStatus('Connect USB (one remembered port) or host serial, then Download or Upload.');
} else {
setStatus('Connect host serial or Web Serial USB, then Download or Upload.');
}
} else {
setStatus(`Connect host serial (default ${DEFAULT_HOST_PORT}).`);
}
}
async function loadHostPorts() {
if (!portSelect) return;
try {
const res = await fetch(`${apiBase}/ports`);
const data = await res.json();
const prev = portSelect.value;
portSelect.innerHTML = '<option value="">Select port</option>';
for (const p of data.ports || []) {
const opt = document.createElement('option');
opt.value = p.device;
opt.textContent = `${p.device} - ${p.description || 'device'}`;
portSelect.appendChild(opt);
}
const acm0 = DEFAULT_HOST_PORT;
const hasAcm0 = [...portSelect.options].some((o) => o.value === acm0);
if (hasAcm0) {
portSelect.value = acm0;
} else if (prev && [...portSelect.options].some((o) => o.value === prev)) {
portSelect.value = prev;
}
} catch (e) {
flash(`Could not list host ports: ${e.message}`, true);
}
}
function selectedHostPort() {
return portSelect && portSelect.value ? portSelect.value.trim() : '';
}
function hostPort() {
if (hostConnected && hostConnectedPort) return hostConnectedPort;
return selectedHostPort();
}
function connectHostSerial() {
if (usingWebSerial()) {
disconnectWebSerial();
}
const port = selectedHostPort();
if (!port) {
flash('Select a host serial port.', true);
return;
}
hostConnectedPort = port;
hostConnected = true;
log('connectHostSerial', port);
updateUi();
flash(`Host serial ready: ${port}`, false);
}
function disconnectHostSerial() {
hostConnected = false;
hostConnectedPort = '';
updateUi();
}
async function downloadHost() {
const port = hostConnectedPort || hostPort();
if (!port) {
flash('Connect host serial first.', true);
return;
}
log('downloadHost', port);
setStatus('Downloading from device (host serial)...');
const res = await fetch(`${apiBase}/settings?port=${encodeURIComponent(port)}`);
const data = await res.json();
log('downloadHost response', { ok: res.ok, status: res.status, data });
if (!res.ok) {
throw new Error(data.error || 'Download failed');
}
if (!data.settings) {
throw new Error('No settings in response (check CLI output)');
}
objectToForm(data.settings);
flash('Settings downloaded.', false);
setStatus(`Downloaded from ${port}`);
}
async function uploadHost() {
const port = hostConnectedPort || hostPort();
if (!port) {
flash('Connect host serial first.', true);
return;
}
const payload = { port, ...formToObject() };
log('uploadHost', payload);
setStatus('Uploading via host serial (led-cli)...');
const res = await fetch(`${apiBase}/settings`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
const data = await res.json();
if (!res.ok) throw new Error(data.error || 'Upload failed');
if (!data.ok) throw new Error(data.stderr || 'led-cli failed');
flash('Settings uploaded; device resetting.', false);
setStatus('Upload complete.');
}
async function connectWebSerial() {
if (webConnectBusy) return;
if (!window.LedToolWebSerial || !window.LedToolWebSerial.supported()) {
flash('Web Serial not supported (use Chrome or Edge over HTTPS or localhost).', true);
return;
}
if (usingWebSerial()) {
flash('USB already connected.', false);
return;
}
if (usingHostSerial()) {
disconnectHostSerial();
}
webConnectBusy = true;
try {
log('connectWebSerial: requestPort');
setStatus('Opening USB port…');
webPort = await window.LedToolWebSerial.requestPort();
log('connectWebSerial: port selected', webPort);
webClient = new window.LedToolWebSerial.MicroPythonRawRepl();
await webClient.connect(webPort);
updateUi();
flash('USB port open. Wait for boot, then Download.', false);
} finally {
webConnectBusy = false;
}
}
async function disconnectWebSerial() {
if (webClient) {
try {
await webClient.disconnect();
} catch (_) {
/* ignore */
}
}
webClient = null;
webPort = null;
updateUi();
}
async function downloadWebSerial() {
log('downloadWebSerial');
setStatus('Reading settings from device (Web Serial)…');
const settings = await webClient.readSettingsJson();
log('downloadWebSerial result', settings);
objectToForm(settings);
if (rawJson) rawJson.value = JSON.stringify(settings, null, 2);
flash('Settings read over Web Serial.', false);
setStatus('Downloaded via Web Serial.');
}
async function uploadWebSerial() {
log('uploadWebSerial');
let settings = {};
if (rawJson && rawJson.value.trim()) {
try {
settings = JSON.parse(rawJson.value);
} catch (e) {
flash('Invalid JSON in raw panel.', true);
return;
}
} else {
settings = await webClient.readSettingsJson();
Object.assign(settings, formToObject());
}
await webClient.writeSettingsJson(settings);
flash('Uploaded via Web Serial; device resetting.', false);
setStatus('Upload complete.');
await disconnectWebSerial();
}
document.getElementById('btn-download')?.addEventListener('click', async () => {
if (transferBusy) return;
transferBusy = true;
try {
if (usingWebSerial()) {
await downloadWebSerial();
} else if (usingHostSerial()) {
await downloadHost();
} else {
flash(
webSerialOk
? 'Connect host serial or Web Serial USB first.'
: 'Click Connect on host serial first.',
true,
);
}
} catch (e) {
console.error(LOG, 'download failed', e);
const msg = usingWebSerial() ? onWebSerialTransferError(e) : errorMessage(e);
if (rawJson) rawJson.value = msg;
flash(msg, true);
setStatus(msg, true);
} finally {
transferBusy = false;
}
});
document.getElementById('btn-upload')?.addEventListener('click', async () => {
if (transferBusy) return;
transferBusy = true;
try {
if (usingWebSerial()) {
await uploadWebSerial();
} else if (usingHostSerial()) {
await uploadHost();
} else {
flash(
webSerialOk
? 'Connect host serial or Web Serial USB first.'
: 'Click Connect on host serial first.',
true,
);
}
} catch (e) {
console.error(LOG, 'upload failed', e);
const msg = usingWebSerial() ? onWebSerialTransferError(e) : errorMessage(e);
flash(msg, true);
setStatus(msg, true);
} finally {
transferBusy = false;
}
});
document.getElementById('btn-from-json')?.addEventListener('click', () => {
try {
const parsed = JSON.parse(rawJson.value || '{}');
objectToForm(parsed);
flash('Form updated from JSON.', false);
} catch (e) {
flash('Invalid JSON.', true);
}
});
document.getElementById('btn-to-json')?.addEventListener('click', () => {
const merged = formToObject();
if (rawJson && rawJson.value.trim()) {
try {
Object.assign(merged, JSON.parse(rawJson.value));
} catch (_) {
/* use form only */
}
}
rawJson.value = JSON.stringify(merged, null, 2);
flash('JSON updated from form.', false);
});
connectBtn?.addEventListener('click', () => {
connectWebSerial().catch((e) => {
disconnectWebSerial();
const msg =
e.name === 'NotFoundError'
? 'No USB device selected.'
: e.name === 'SecurityError'
? 'Web Serial blocked (open LED Tool in a top-level tab, or use host serial on the Pi).'
: e.message || String(e);
flash(msg, true);
setStatus(msg, true);
});
});
disconnectBtn?.addEventListener('click', () => {
disconnectWebSerial();
flash('Web Serial disconnected.', false);
});
refreshPortsBtn?.addEventListener('click', () => loadHostPorts());
hostConnectBtn?.addEventListener('click', () => connectHostSerial());
hostDisconnectBtn?.addEventListener('click', () => {
disconnectHostSerial();
flash('Host serial disconnected.', false);
});
window.addEventListener('beforeunload', () => {
if (webClient) {
webClient.disconnect();
}
});
async function refreshWebSerialGrantHint() {
if (!webSerialOk || !window.LedToolWebSerial.getGrantedPorts) return;
try {
const ports = await window.LedToolWebSerial.getGrantedPorts();
webSerialGrantedCount = ports.length;
updateUi();
} catch (_) {
/* ignore */
}
}
log('settings editor ready', { apiBase, webSerialOk });
updateUi();
loadHostPorts();
refreshWebSerialGrantHint();
})();

653
static/web_serial.js Normal file
View File

@@ -0,0 +1,653 @@
/**
* MicroPython raw REPL over Web Serial (mpremote-compatible).
*/
(function (global) {
const RAW_REPL_BANNER = 'raw REPL; CTRL-B to exit\r\n';
const SOFT_REBOOT_MARKERS = ['soft reboot\r\n', 'MPY: soft reboot\r\n'];
const LOG = '[led-tool/serial]';
function log(...args) {
console.log(LOG, ...args);
}
function logDebug(...args) {
console.debug(LOG, ...args);
}
function describeBytes(data) {
if (!data || !data.length) return '(empty)';
const u8 = data instanceof Uint8Array ? data : new Uint8Array(data);
if (u8.length <= 64) {
return Array.from(u8)
.map((b) => (b >= 0x20 && b < 0x7f ? String.fromCharCode(b) : `\\x${b.toString(16).padStart(2, '0')}`))
.join('');
}
return `${u8.length} bytes: ${decodeText(u8.slice(0, 80))}`;
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/** Common ESP32 USB-UART bridges (usbVendorId only per Web Serial API). */
const ESP32_USB_FILTERS = [
{ usbVendorId: 0x303a }, /* Espressif */
{ usbVendorId: 0x10c4 }, /* Silicon Labs CP210x */
{ usbVendorId: 0x1a86 }, /* WCH CH340 */
{ usbVendorId: 0x0403 }, /* FTDI */
];
async function deassertDtrRts(port) {
if (!port || !port.setSignals) return;
try {
await port.setSignals({ dataTerminalReady: false, requestToSend: false });
} catch (_) {
/* ignore — some adapters/browsers reject setSignals */
}
}
function bytesIndexOf(buf, suffix) {
if (!suffix.length) return 0;
if (buf.length < suffix.length) return -1;
for (let i = 0; i <= buf.length - suffix.length; i += 1) {
let ok = true;
for (let j = 0; j < suffix.length; j += 1) {
if (buf[i + j] !== suffix[j]) {
ok = false;
break;
}
}
if (ok) return i;
}
return -1;
}
function bufferEndsWithFriendlyRepl(buf) {
if (bufferLooksLikeEspRomBoot(buf) && !bufferHasMicroPython(buf)) return false;
const tail = decodeText(Uint8Array.from(buf.slice(-80)));
return /(?:\r\n|\n)>>> ?$/.test(tail);
}
function bufferLooksLikeEspRomBoot(buf) {
if (bufferIndicatesRuntime(buf)) return false;
const text = decodeText(Uint8Array.from(buf.slice(-512)));
return /SPI_FAST_FLASH_BOOT|rst:0x/.test(text) && !/entry 0x403/.test(text);
}
/** MicroPython is up (banner, REPL, raw REPL, or post-ROM heap lines from boot). */
function bufferIndicatesRuntime(buf) {
const text = decodeText(Uint8Array.from(buf));
if (text.includes('MicroPython')) return true;
if (text.includes('raw REPL; CTRL-B to exit')) return true;
if (/(?:\r\n|\n)>>> ?/.test(text)) return true;
if (text.includes('entry 0x') && (/['"]?free['"]?\s*:/.test(text) || /,\s*'free'/.test(text))) {
return true;
}
return false;
}
function bufferHasMicroPython(buf) {
return bufferIndicatesRuntime(buf);
}
function decodeText(u8) {
return new TextDecoder('utf-8', { fatal: false }).decode(u8);
}
function extractJsonObject(text) {
const t = (text || '').trim();
const start = t.indexOf('{');
const end = t.lastIndexOf('}');
if (start === -1 || end === -1 || end < start) {
throw new Error(`No JSON object in device output: ${t.slice(0, 400)}`);
}
return JSON.parse(t.slice(start, end + 1));
}
class MicroPythonRawRepl {
constructor() {
this.port = null;
this.reader = null;
this.writer = null;
this.portOpen = false;
this.inRawRepl = false;
this._rxBuf = [];
this.useRawPaste = true;
this._replLock = null;
}
get connected() {
return Boolean(this.portOpen && this.writer);
}
_appendRx(chunk) {
if (!chunk || !chunk.length) return;
logDebug('rx chunk', chunk.length, describeBytes(chunk));
for (let i = 0; i < chunk.length; i += 1) this._rxBuf.push(chunk[i]);
}
async _readStreamChunk(timeoutMs) {
if (!this.reader) return null;
const result = await Promise.race([
this.reader.read(),
sleep(timeoutMs).then(() => ({ timedOut: true })),
]);
if (result.timedOut) return null;
if (result.done) return null;
return result.value || null;
}
/** Match suffix anywhere in the buffer; keep bytes after the match for the next read. */
async readUntil(suffixBytes, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const idx = bytesIndexOf(this._rxBuf, suffixBytes);
if (idx >= 0) {
const end = idx + suffixBytes.length;
const matched = Uint8Array.from(this._rxBuf.slice(0, end));
this._rxBuf.splice(0, end);
logDebug('readUntil matched', JSON.stringify(decodeText(suffixBytes)), 'len', matched.length);
return matched;
}
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) {
this._appendRx(chunk);
} else {
await sleep(5);
}
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
const err = `Timed out waiting for ${JSON.stringify(decodeText(suffixBytes))} (tail: ${JSON.stringify(tail)})`;
log('readUntil timeout', err);
throw new Error(err);
}
async readUntilAny(suffixList, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
for (const suffixBytes of suffixList) {
const idx = bytesIndexOf(this._rxBuf, suffixBytes);
if (idx >= 0) {
const end = idx + suffixBytes.length;
const matched = Uint8Array.from(this._rxBuf.slice(0, end));
this._rxBuf.splice(0, end);
logDebug('readUntilAny matched', JSON.stringify(decodeText(suffixBytes)));
return matched;
}
}
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(5);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(
`Timed out waiting for one of ${suffixList.map((s) => JSON.stringify(decodeText(s))).join(', ')} (tail: ${JSON.stringify(tail)})`,
);
}
async readExact(n, timeoutMs = 10000) {
const deadline = Date.now() + timeoutMs;
while (this._rxBuf.length < n && Date.now() < deadline) {
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(5);
}
if (this._rxBuf.length < n) {
throw new Error(`Expected ${n} bytes, got ${this._rxBuf.length}`);
}
return Uint8Array.from(this._rxBuf.splice(0, n));
}
async connect(serialPort) {
log('connect: opening port');
await this.disconnect();
this.port = serialPort;
this._rxBuf = [];
await this.port.open({
baudRate: 115200,
dataBits: 8,
stopBits: 1,
parity: 'none',
flowControl: 'none',
bufferSize: 65536,
});
await deassertDtrRts(this.port);
await sleep(50);
await deassertDtrRts(this.port);
this.reader = this.port.readable.getReader();
this.writer = this.port.writable.getWriter();
this.portOpen = true;
await sleep(100);
await this.collectIncoming(8000);
if (bufferIndicatesRuntime(this._rxBuf)) {
log('connect: MicroPython output seen during open');
}
log('connect: port open (raw REPL not entered yet)');
}
/** Read serial into _rxBuf without discarding (for post-open boot). */
async collectIncoming(maxMs) {
const deadline = Date.now() + maxMs;
while (Date.now() < deadline) {
const chunk = await this._readStreamChunk(Math.min(80, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else if (
bufferHasMicroPython(this._rxBuf) ||
bufferEndsWithFriendlyRepl(this._rxBuf) ||
bytesIndexOf(this._rxBuf, new TextEncoder().encode(RAW_REPL_BANNER)) >= 0
) {
break;
} else {
await sleep(15);
}
}
}
async ensureRawRepl() {
if (this.inRawRepl) return;
if (this._replLock) {
await this._replLock;
return;
}
log('ensureRawRepl: entering raw REPL');
// USB open already resets the ESP32; skip soft-reset (unlike host led-cli/mpremote).
this._replLock = this.enterRawRepl(false).finally(() => {
this._replLock = null;
});
await this._replLock;
}
async disconnect() {
log('disconnect');
this._replLock = null;
const wasRaw = this.inRawRepl;
this.inRawRepl = false;
this.portOpen = false;
this.useRawPaste = true;
if (this.writer && wasRaw) {
try {
await this.writer.write(new Uint8Array([0x0d, 0x02]));
} catch (_) {
/* ignore */
}
}
if (this.reader) {
try {
await this.reader.cancel();
this.reader.releaseLock();
} catch (_) {
/* ignore */
}
this.reader = null;
}
if (this.writer) {
try {
await this.writer.close();
this.writer.releaseLock();
} catch (_) {
/* ignore */
}
this.writer = null;
}
if (this.port) {
try {
await deassertDtrRts(this.port);
} catch (_) {
/* ignore */
}
try {
await this.port.close();
} catch (_) {
/* ignore */
}
this.port = null;
}
this._rxBuf = [];
}
async writeBytes(data) {
if (!this.writer) throw new Error('Serial not connected');
logDebug('tx', describeBytes(data));
await this.writer.write(data);
}
async drainInput(maxMs = 400, clearBuffer = true) {
const deadline = Date.now() + maxMs;
while (Date.now() < deadline) {
const chunk = await this._readStreamChunk(40);
if (!chunk || !chunk.length) break;
if (!clearBuffer && chunk.length) this._appendRx(chunk);
}
if (clearBuffer) this._rxBuf.length = 0;
}
async sendCtrlA() {
await this.writeBytes(new Uint8Array([0x0d, 0x01]));
}
/** USB open often resets the ESP32; wait through ROM boot until MicroPython is up. */
async waitForMicroPythonBoot(timeoutMs = 60000) {
log('enterRawRepl: wait for MicroPython after boot', 'buf', this._rxBuf.length);
if (bufferHasMicroPython(this._rxBuf) || bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: MicroPython already in buffer');
return;
}
const deadline = Date.now() + timeoutMs;
let nudgeSent = false;
let lastLog = Date.now();
while (Date.now() < deadline) {
if (bufferHasMicroPython(this._rxBuf) || bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: MicroPython running');
return;
}
const text = decodeText(Uint8Array.from(this._rxBuf));
if (!nudgeSent && text.includes('entry 0x')) {
nudgeSent = true;
log('enterRawRepl: past ROM loader, nudge REPL');
await this.writeBytes(new Uint8Array([0x0d]));
await sleep(200);
}
if (Date.now() - lastLog > 5000) {
lastLog = Date.now();
log('enterRawRepl: still waiting…', 'buf', this._rxBuf.length);
}
const rem = Math.max(1, deadline - Date.now());
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(bufferLooksLikeEspRomBoot(this._rxBuf) ? 20 : 10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(`Timed out waiting for MicroPython boot (tail: ${JSON.stringify(tail)})`);
}
/** Wait until raw REPL banner and `>` prompt (not banner alone — device may still be booting). */
async waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const rem = Math.max(1, deadline - Date.now());
const promptAt = bytesIndexOf(this._rxBuf, bannerPromptBytes);
if (promptAt >= 0) {
this._rxBuf.splice(0, promptAt + bannerPromptBytes.length);
log('enterRawRepl: raw REPL prompt ready');
return;
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: friendly REPL, ctrl-A');
await this.sendCtrlA();
await sleep(30);
continue;
}
if (bufferLooksLikeEspRomBoot(this._rxBuf) && !bufferHasMicroPython(this._rxBuf)) {
logDebug('enterRawRepl: ROM boot in progress…');
}
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(
`Timed out waiting for raw REPL after soft reboot (tail: ${JSON.stringify(tail)})`,
);
}
/** Match mpremote SerialTransport.enter_raw_repl(soft_reset). */
async enterRawRepl(softReset = true) {
log('enterRawRepl', { softReset });
const bannerBytes = new TextEncoder().encode(RAW_REPL_BANNER);
const bannerPromptBytes = new TextEncoder().encode(RAW_REPL_BANNER + '>');
const softRebootBytes = SOFT_REBOOT_MARKERS.map((m) => new TextEncoder().encode(m));
if (
!bufferHasMicroPython(this._rxBuf) &&
!bufferEndsWithFriendlyRepl(this._rxBuf) &&
bytesIndexOf(this._rxBuf, bannerBytes) < 0
) {
await this.waitForMicroPythonBoot(60000);
} else {
log('enterRawRepl: already have REPL output in buffer');
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
await this.writeBytes(new Uint8Array([0x0d, 0x03]));
await sleep(50);
}
let ctrlARetries = 0;
const maxCtrlARetries = 3;
while (ctrlARetries <= maxCtrlARetries) {
try {
await this.sendCtrlA();
await this.waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, 20000);
break;
} catch (e) {
if (ctrlARetries >= maxCtrlARetries) throw e;
ctrlARetries += 1;
log('enterRawRepl: retry ctrl-A', ctrlARetries, e.message);
this._rxBuf.length = 0;
}
}
if (softReset) {
log('enterRawRepl: soft reset');
await this.writeBytes(new Uint8Array([0x04]));
try {
await this.readUntilAny(softRebootBytes, 20000);
log('enterRawRepl: saw soft reboot');
this._rxBuf.length = 0;
} catch (e) {
log('enterRawRepl: no soft reboot banner', e.message);
}
ctrlARetries = 0;
while (ctrlARetries <= maxCtrlARetries) {
try {
await this.waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, 60000);
break;
} catch (e) {
if (ctrlARetries >= maxCtrlARetries) throw e;
ctrlARetries += 1;
log('enterRawRepl: post-reboot retry ctrl-A', ctrlARetries, e.message);
this._rxBuf.length = 0;
await this.sendCtrlA();
}
}
}
this.inRawRepl = true;
log('enterRawRepl: ready');
}
async rawPasteWrite(commandBytes) {
await this.writeBytes(new Uint8Array([0x05, 0x41, 0x01]));
const hdr = await this.readExact(2, 8000);
const windowSize = hdr[0] | (hdr[1] << 8);
let windowRemain = windowSize;
let i = 0;
while (i < commandBytes.length) {
while (windowRemain === 0) {
const b = await this.readExact(1, 15000);
if (b[0] === 0x01) {
windowRemain += windowSize;
} else if (b[0] === 0x04) {
await this.writeBytes(new Uint8Array([0x04]));
return;
} else {
throw new Error(`Unexpected byte during raw paste: 0x${b[0].toString(16)}`);
}
}
const n = Math.min(windowRemain, commandBytes.length - i);
await this.writeBytes(commandBytes.subarray(i, i + n));
windowRemain -= n;
i += n;
}
await this.writeBytes(new Uint8Array([0x04]));
await this.readUntil(new Uint8Array([0x04]), 30000);
}
async awaitRawReplExecPrompt(timeoutMs = 60000) {
const bannerPromptBytes = new TextEncoder().encode(RAW_REPL_BANNER + '>');
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const promptAt = bytesIndexOf(this._rxBuf, bannerPromptBytes);
if (promptAt >= 0) {
this._rxBuf.splice(0, promptAt + bannerPromptBytes.length);
return;
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('exec: friendly REPL before prompt, ctrl-A');
this._rxBuf.length = 0;
await this.sendCtrlA();
} else if (bufferLooksLikeEspRomBoot(this._rxBuf)) {
logDebug('exec: still in ROM boot, waiting…');
}
const rem = Math.max(1, deadline - Date.now());
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(`Timed out waiting for raw REPL prompt (tail: ${JSON.stringify(tail)})`);
}
async execRawNoFollow(commandBytes) {
log('execRawNoFollow', commandBytes.length, 'bytes');
await this.awaitRawReplExecPrompt(60000);
if (this.useRawPaste) {
await this.writeBytes(new Uint8Array([0x05, 0x41, 0x01]));
let data;
try {
data = await this.readExact(2, 3000);
} catch (_) {
this.useRawPaste = false;
data = null;
}
if (data) {
if (data[0] === 0x52 && data[1] === 0x01) {
log('exec: using raw paste mode');
await this.rawPasteWrite(commandBytes);
const ok = await this.readExact(2, 15000);
if (ok[0] !== 0x4f || ok[1] !== 0x4b) {
throw new Error(`Device did not return OK after paste (got ${decodeText(ok)})`);
}
log('exec: paste OK');
return;
}
if (data[0] === 0x52 && data[1] === 0x00) {
log('exec: raw paste not supported (R\\x00)');
this.useRawPaste = false;
} else {
log('exec: raw paste probe unexpected', describeBytes(data));
this.useRawPaste = false;
try {
await this.readUntil(new TextEncoder().encode(RAW_REPL_BANNER + '>'), 5000);
} catch (_) {
/* ignore */
}
}
}
}
for (let i = 0; i < commandBytes.length; i += 256) {
await this.writeBytes(commandBytes.subarray(i, Math.min(i + 256, commandBytes.length)));
await sleep(10);
}
log('exec: standard raw REPL write');
await this.writeBytes(new Uint8Array([0x04]));
const ok = await this.readExact(2, 15000);
if (ok[0] !== 0x4f || ok[1] !== 0x4b) {
throw new Error(`Device did not return OK (got ${decodeText(ok)})`);
}
log('exec: got OK');
}
async follow(timeoutMs = 30000) {
const out1 = await this.readUntil(new Uint8Array([0x04]), timeoutMs);
const stdout = decodeText(out1.slice(0, -1));
const out2 = await this.readUntil(new Uint8Array([0x04]), timeoutMs);
const stderr = decodeText(out2.slice(0, -1));
log('follow stdout', stdout.slice(0, 500));
if (stderr) log('follow stderr', stderr.slice(0, 500));
return { stdout, stderr };
}
async exec(command, timeoutMs = 30000) {
const preview =
typeof command === 'string' ? command.split('\n').slice(0, 3).join('\n') : '<bytes>';
log('exec', preview);
await this.ensureRawRepl();
const commandBytes =
typeof command === 'string' ? new TextEncoder().encode(command) : command;
await this.execRawNoFollow(commandBytes);
return this.follow(timeoutMs);
}
async readSettingsJson() {
log('readSettingsJson');
const code = [
'import json',
'try:',
" f=open('settings.json','r')",
' d=json.load(f)',
' f.close()',
'except Exception:',
' d={}',
'print(json.dumps(d))',
].join('\n');
const { stdout, stderr } = await this.exec(code);
const text = (stdout || '').trim() || (stderr || '').trim();
if (!text) {
log('readSettingsJson: empty response');
return {};
}
const parsed = extractJsonObject(text);
log('readSettingsJson: ok', Object.keys(parsed));
return parsed;
}
async writeSettingsJson(settings) {
log('writeSettingsJson', settings);
const jsonText = JSON.stringify(settings);
const escaped = JSON.stringify(jsonText);
const code = [
'import json',
`s=json.loads(${escaped})`,
"f=open('settings.json','w')",
'f.write(s)',
'f.close()',
'import machine',
'machine.reset()',
].join('\n');
await this.exec(code, 60000);
}
}
function serialSupported() {
return typeof navigator !== 'undefined' && 'serial' in navigator;
}
async function getGrantedPorts() {
if (!serialSupported()) return [];
return navigator.serial.getPorts();
}
async function requestPort() {
if (!serialSupported()) {
throw new Error('Web Serial not supported');
}
const granted = await navigator.serial.getPorts();
if (granted.length === 1) {
log('requestPort: reusing single granted port');
return granted[0];
}
log('requestPort: showing picker', { grantedCount: granted.length });
return navigator.serial.requestPort({ filters: ESP32_USB_FILTERS });
}
global.LedToolWebSerial = {
supported: serialSupported,
getGrantedPorts,
requestPort,
MicroPythonRawRepl,
_test: { bytesIndexOf, decodeText },
};
})(typeof window !== 'undefined' ? window : globalThis);

View File

@@ -0,0 +1,69 @@
/**
* Node regression tests for Web Serial readUntil buffer handling.
* Run: node led-tool/static/web_serial_readuntil_test.mjs
*/
function bytesIndexOf(buf, suffix) {
if (!suffix.length) return 0;
if (buf.length < suffix.length) return -1;
for (let i = 0; i <= buf.length - suffix.length; i += 1) {
let ok = true;
for (let j = 0; j < suffix.length; j += 1) {
if (buf[i + j] !== suffix[j]) {
ok = false;
break;
}
}
if (ok) return i;
}
return -1;
}
function readUntilConsume(rxBuf, suffixBytes) {
const idx = bytesIndexOf(rxBuf, suffixBytes);
if (idx < 0) return null;
const end = idx + suffixBytes.length;
const matched = rxBuf.slice(0, end);
rxBuf.splice(0, end);
return matched;
}
function enc(s) {
return [...new TextEncoder().encode(s)];
}
function assert(cond, msg) {
if (!cond) throw new Error(msg);
}
const RAW = 'raw REPL; CTRL-B to exit\r\n';
const SOFT = 'soft reboot\r\n';
// Old bug: one chunk with soft reboot + banner; suffix-at-end + clear-all loses banner.
{
const rx = enc(`MPY: ${SOFT}${RAW}boot.py line\r\n`);
const soft = enc(SOFT);
const m1 = readUntilConsume(rx, soft);
assert(m1 !== null, 'soft reboot should match');
const banner = enc(RAW);
const m2 = readUntilConsume(rx, banner);
assert(m2 !== null, 'banner must remain in buffer after soft reboot match');
assert(rx.length > 0, 'boot.py tail should remain after banner');
}
// Banner anywhere in buffer, not only at end.
{
const rx = enc(`noise${RAW}>>> `);
const banner = enc(RAW);
const m = readUntilConsume(rx, banner);
assert(m !== null, 'banner should match mid-buffer');
}
// MPY: soft reboot marker
{
const rx = enc(`MPY: ${SOFT}`);
const soft = enc('MPY: soft reboot\r\n');
assert(readUntilConsume(rx, soft) !== null, 'MPY soft reboot marker');
}
console.log('web_serial_readuntil_test: ok');

View File

@@ -0,0 +1,52 @@
"""Tests for deploy_manifest helpers (host Python)."""
import json
import os
import tempfile
import unittest
from deploy_manifest import (
MANIFEST_FILENAME,
build_manifest_bytes,
normalize_remote_path,
parse_manifest,
sha256_hex_file,
)
class DeployManifestTests(unittest.TestCase):
def test_normalize_remote_path(self):
self.assertEqual(normalize_remote_path("/patterns/chase.py"), "patterns/chase.py")
self.assertEqual(normalize_remote_path("main.py"), "main.py")
def test_round_trip_manifest(self):
files = {"main.py": "abc", "patterns/x.py": "def"}
blob = build_manifest_bytes(files)
parsed = parse_manifest(blob)
self.assertEqual(parsed, files)
doc = json.loads(blob.decode())
self.assertEqual(doc["version"], 1)
self.assertEqual(doc["algorithm"], "sha256")
def test_parse_manifest_invalid(self):
self.assertEqual(parse_manifest(b"not json"), {})
self.assertEqual(parse_manifest(b'{"files": "nope"}'), {})
def test_sha256_hex_file(self):
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(b"hello")
path = f.name
try:
self.assertEqual(
sha256_hex_file(path),
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824",
)
finally:
os.unlink(path)
def test_manifest_filename(self):
self.assertEqual(MANIFEST_FILENAME, "file_hashes.json")
if __name__ == "__main__":
unittest.main()

114
web.py
View File

@@ -19,6 +19,7 @@ from flask import (
redirect, redirect,
url_for, url_for,
flash, flash,
send_from_directory,
) )
@@ -696,10 +697,119 @@ def handle_action():
) )
def _static_dir() -> str:
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
@app.route("/ports")
def api_list_ports():
from host_ports import filter_port_dicts
from serial.tools import list_ports
ports = filter_port_dicts(
[
{"device": i.device, "description": i.description, "hwid": i.hwid}
for i in list_ports.comports()
]
)
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
return json.dumps({"ports": ports, "led_cli_exists": os.path.exists(cli)})
@app.route("/settings", methods=["GET"])
def api_read_settings():
import subprocess
import sys
port = (request.args.get("port") or "").strip()
if not port:
return json.dumps({"error": "port is required"}), 400
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
result = subprocess.run(
[sys.executable, cli, "--port", port, "--show"],
capture_output=True,
text=True,
timeout=180,
cwd=os.path.dirname(cli),
)
settings = None
try:
settings = json.loads((result.stdout or "").strip())
except json.JSONDecodeError:
pass
return json.dumps(
{
"ok": result.returncode == 0,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"settings": settings,
}
)
@app.route("/settings", methods=["POST"])
def api_write_settings():
import subprocess
import sys
data = request.get_json(silent=True) or {}
port = str(data.get("port") or "").strip()
if not port:
return json.dumps({"error": "port is required"}), 400
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
cmd = [sys.executable, cli, "--port", port]
flag_map = (
("name", "--name"),
("led_pin", "--pin"),
("num_leds", "--leds"),
("brightness", "--brightness"),
("transport", "--transport"),
("ssid", "--ssid"),
("password", "--wifi-password"),
("wifi_channel", "--wifi-channel"),
("default", "--default"),
)
for key, flag in flag_map:
val = data.get(key)
if val is None:
continue
s = str(val).strip()
if s:
cmd.extend([flag, s])
cmd.append("--follow")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180,
cwd=os.path.dirname(cli),
)
return json.dumps(
{
"ok": result.returncode == 0,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
}
)
@app.route("/editor")
def settings_editor():
"""Static settings editor (Web Serial + host serial). Prefer this over the legacy form."""
return send_from_directory(_static_dir(), "settings_editor.html")
@app.route("/static/<path:filename>")
def settings_static(filename):
return send_from_directory(_static_dir(), filename)
def main() -> None: def main() -> None:
# Bind to all interfaces so you can reach it from your LAN: # Bind to all interfaces so you can reach it from your LAN:
# python web_app.py # python web.py
# Then open: http://<pi-ip>:5000/ # Then open: http://<pi-ip>:5000/editor
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)