19 Commits

Author SHA1 Message Date
f5de99386a feat(cli): add --serial-usb for bridge native USB CDC
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-06 21:09:55 +12:00
2961ad2a29 test(editor): web serial readuntil buffer regression
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
35c0df8f88 docs: update readme for serial baud and deploy paths
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
5c97fa0d0b feat(editor): add bridge ap ip and password fields
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:20 +12:00
179ac9c540 feat(cli): add --serial-baudrate for bridge uart uplink
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-25 22:00:14 +12:00
bd4d2060ae fix(led-tool): harden Web Serial raw REPL connect
Improve boot wait, readUntil buffer handling, and settings editor
host/Web Serial flows after device reset.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 00:23:08 +12:00
f74e21f206 feat(led-tool): browser settings editor with Web Serial
Add static editor, host_ports filtering, Flask /editor and REST APIs
for ports and led-cli read/write; document standalone and embedded use.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-18 14:54:15 +12:00
1edcb8b1f7 feat(cli): skip unchanged files using device file_hashes.json
Read and update file_hashes.json on deploy; add --force-upload and tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 19:14:51 +12:00
ccc215acbd fix(cli): lazy settings fetch and full device erase
Download settings only for --show or when applying edits; skip upload
when unchanged; erase entire device root including settings.json.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 21:14:54 +12:00
580fd11aca fix(cli): skip redundant settings uploads when unchanged
Only upload settings when edited values differ from current device settings to avoid unnecessary resets.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-04 22:48:54 +12:00
pi
d6331a105c feat(cli): add --reset-device-name and WDT feed during uploads
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 21:27:29 +12:00
2f3db9272b feat(cli): extend upload flags for src, patterns, lib, and --all
Support --patterns/--paterns, --all, --erase, and src upload excluding patterns/.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-03 14:54:13 +12:00
713cd6e9a1 feat(cli): add ls flag and default lib upload
Made-with: Cursor
2026-04-15 00:46:19 +12:00
9e72c62481 fix(led-tool): build cli without spec file dependency 2026-04-14 23:13:21 +12:00
pi
eee9327e15 docs: fix readme layout and cli flags
Made-with: Cursor
2026-04-12 00:13:49 +12:00
pi
5f7acf38f0 fix(cli): remove --server-ip flag and settings edit path
Made-with: Cursor
2026-04-11 15:20:22 +12:00
pi
e86312437c feat(cli): create presets on device; flags for led-driver transport
Made-with: Cursor
2026-04-05 21:12:58 +12:00
3844aa9d6a Add numeric device ID option
Allow setting ESPNow device id from CLI and persist to settings.

Made-with: Cursor
2026-03-14 02:41:07 +13:00
d6ed6ad9f5 Enhance CLI actions and default handling
Add timed follow, src/lib upload helpers, and store default startup pattern under default key.

Made-with: Cursor
2026-03-11 22:51:36 +13:00
13 changed files with 2274 additions and 80 deletions

View File

@@ -16,5 +16,5 @@ python_version = "3"
[scripts] [scripts]
web = "python web.py" web = "python web.py"
cli = "python cli.py" cli = "python cli.py"
build = "pyinstaller --clean led-cli.spec" build = "pyinstaller --clean --onefile --name led-cli --paths lib cli.py"
install = "pipenv install" install = "pipenv install"

View File

@@ -1,8 +1,61 @@
# led-tool # led-tool
- `-s, --show`: Display current settings from device
- `--no-download`: Don't download settings first (use empty settings)
**Default behavior:** Downloads settings and prints them. If any edit flags are provided, settings are modified and uploaded automatically (unless `--no-upload` is specified).
## Device Connection CLI helpers for MicroPython LED devices: **`settings.json`** download/upload via **mpremote**, resets, follow/tail, recursive uploads, optional firmware flash, and **led-driver**-oriented flags (presets, transport, Wi-Fi fields).
The tools use an integrated mpremote transport to communicate with MicroPython devices. Make sure your device is connected and accessible via the specified serial port.## LicenseSee LICENSE file for details. Connection is always via **`-p` / `--port`** (default `/dev/ttyACM0`). There is **no** separate “server IP” flag; the Pi or PC address is configured on the device in **`settings.json`** when using Wi-Fi mode.
## Common flags
| Flag | Purpose |
|------|--------|
| `-p`, `--port` | Serial device (default `/dev/ttyACM0`) |
| `-s`, `--show` | Print current settings from the device |
| `-n`, `--name` | Device **name** |
| `--reset-device-name` | Set **name** to firmware default (`led-` + STA MAC hex, same as `Settings.set_defaults` on led-driver) |
| `--id` | Numeric device id (ESP-NOW, 0255) |
| `--pin` | LED GPIO (`led_pin`) |
| `-b`, `--brightness` | Brightness 0255 |
| `-l`, `--leds` | LED count (`num_leds`) |
| `-d`, `--debug` | Debug 0 or 1 |
| `-o`, `--order` | LED colour order (`rgb`, `grb`, …) |
| `--preset` / `--pattern` | Create or replace a named preset in **led-driver** `presets.json` |
| `--default` | Startup preset name |
| `--transport` | `espnow` or `wifi` (`transport_type` on led-driver) |
| `--serial-baudrate` | Bridge UART1 baud for Pi serial link (e.g. `921600`; also sets GPIO UART pins) |
| `--ssid`, `--wifi-password`, `--wifi-channel` | Wi-Fi / channel fields for the driver |
| `-r`, `--reset` | Reset the device |
| `-f`, `--follow` | Follow serial output (optional timeout seconds) |
| `--pause` | Sleep N seconds (for chained actions) |
| `-u`, `--upload` | Recursive upload: `-u SRC [DEST]` (skips unchanged files via `file_hashes.json` on device) |
| `--src`, `--lib`, `--all` | Deploy `src/` to device root and `lib/` to `/lib` (led-driver, espnow-sender, …) |
| `--force-upload` | Upload every file; ignore `file_hashes.json` |
| `-e`, `--erase` | Erase everything at device root (including `settings.json` and `presets.json`) |
| `--rm` | Remove a path on the device |
| `--flash` | Flash a firmware binary (uses **esptool** on the host) |
**Settings I/O:** With no arguments, **`led-cli`** prints help (no device access). **`--show`** downloads `settings.json` and prints it (read-only). Setting fields (`--name`, `-b`, …) download once, merge, print, and **upload only when a value actually changed**. **`--preset`** alone only touches **`presets.json`** (no `settings.json` download). Ordered actions (`--reset`, `--upload`, `-e`, …) run without touching settings unless you also pass **`--show`** or setting / preset edits as above.
Run **`python cli.py -h`** for the full epilog and argument list.
## Web UI (`static/` + `web.py`)
Edit **`settings.json`** in the browser:
- **Web Serial** — USB on the machine running the browser (Chrome/Edge). Use **Connect USB**, then **Download** / **Upload**.
- **Host serial** — USB on the Pi/PC running **`led-cli`** (port list + **`led-cli`** merge/upload).
**Standalone (Flask):**
```bash
cd led-tool
python web.py
# open http://<host>:5000/editor
```
**Embedded in led-controller:** open **Settings → LED Tool** in the main UI (Edit mode), or visit **`/led-tool/editor`**.
Legacy Flask form UI remains at **`/`** on port 5000; prefer **`/editor`** for Web Serial support.
## License
See **LICENSE** in this directory.

537
cli.py
View File

@@ -2,8 +2,10 @@
""" """
LED Bar Configuration CLI Tool LED Bar Configuration CLI Tool
Command-line interface for downloading, editing, and uploading settings.json Command-line interface for editing settings.json on MicroPython devices via mpremote.
to/from MicroPython devices via mpremote. Settings are downloaded from the device only when displaying (--show) or when applying
setting changes; uploads occur only when something changed. Use --erase to clear the
device filesystem (including settings).
""" """
import json import json
@@ -11,11 +13,12 @@ import argparse
import subprocess import subprocess
import sys import sys
import time import time
import shutil
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
import tempfile import tempfile
import os import os
from device import copy_file, DeviceConnection from device import copy_file, DeviceConnection, firmware_default_device_name
def resolve_flash_binary(path: str) -> Optional[str]: def resolve_flash_binary(path: str) -> Optional[str]:
@@ -84,6 +87,59 @@ def upload_settings(device: str, settings: dict) -> None:
pass pass
PRESETS_REMOTE = "presets.json"
def download_presets(device: str) -> dict:
"""Download presets.json from the device; missing file -> {}."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
temp_path = temp_file.name
temp_file.close()
try:
copy_file(from_device=True, device=device, remote_path=PRESETS_REMOTE, local_path=temp_path)
with open(temp_path, "r", encoding="utf-8") as f:
return json.load(f)
except (OSError, RuntimeError) as e:
msg = str(e).lower()
if "errno 2" in msg or "enoent" in msg or "not found" in msg:
return {}
raise
finally:
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except OSError:
pass
def upload_presets(device: str, presets: dict, *, reset: bool = True) -> None:
"""Upload presets.json; optional reset (skip when caller will reset via settings upload)."""
temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
temp_path = temp_file.name
try:
json.dump(presets, temp_file, indent=2)
temp_file.close()
copy_file(from_device=False, device=device, remote_path=PRESETS_REMOTE, local_path=temp_path)
if reset:
try:
conn = DeviceConnection(device)
conn.connect()
conn.reset()
conn.disconnect()
except Exception:
pass
finally:
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except OSError:
pass
def print_settings(settings: Dict[str, Any]) -> None: def print_settings(settings: Dict[str, Any]) -> None:
"""Pretty print settings dictionary.""" """Pretty print settings dictionary."""
print(json.dumps(settings, indent=2)) print(json.dumps(settings, indent=2))
@@ -93,14 +149,16 @@ def print_settings(settings: Dict[str, Any]) -> None:
_FLAGS_WITH_VALUE = frozenset({ _FLAGS_WITH_VALUE = frozenset({
'-p', '--port', '-n', '--name', '--pin', '-b', '--brightness', '-p', '--port', '-n', '--name', '--pin', '-b', '--brightness',
'-l', '--leds', '-d', '-debug', '--debug', '-o', '--order', '-l', '--leds', '-d', '-debug', '--debug', '-o', '--order',
'--preset', '--default', '--preset', '--pattern', '--default', '--transport', '--ssid',
'--wifi-password', '--wifi-channel', '--serial-baudrate', '--serial-usb',
'--src', '--lib', '--patterns', '--paterns',
}) })
def _get_ordered_actions(argv: List[str]) -> List[tuple]: def _get_ordered_actions(argv: List[str]) -> List[tuple]:
""" """
Scan argv and return list of (action_name, value) in order of appearance. Scan argv and return list of (action_name, value) in order of appearance.
Actions: flash, pause, reset, upload, erase_all, rm, follow. Actions: flash, pause, reset, upload, ls, erase_all, rm, follow.
""" """
actions = [] actions = []
i = 1 i = 1
@@ -139,10 +197,59 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
if vals: if vals:
actions.append(('upload', vals)) actions.append(('upload', vals))
continue continue
if arg == '--src':
# Upload local DIR (default: ./src) to device root (:/
local_dir = "src"
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
# Upload source tree excluding patterns/ (handled by --patterns).
actions.append(('upload_src_no_patterns', local_dir))
continue
if arg in ('--patterns', '--paterns'):
# Upload local patterns DIR (default: ./src/patterns) to /patterns.
local_dir = os.path.join("src", "patterns")
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "patterns"]))
continue
if arg == '--all':
actions.append(('upload_src_no_patterns', "src"))
actions.append(('upload', [os.path.join("src", "patterns"), "patterns"]))
actions.append(('upload', ["lib", "lib"]))
i += 1
continue
if arg == '--force-upload':
# Handled via argparse; skip during action scan
i += 1
continue
if arg == '--lib':
# Upload local DIR (default: ./lib) to /lib on device
local_dir = "lib"
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
local_dir = argv[i + 1]
i += 2
else:
i += 1
actions.append(('upload', [local_dir, "lib"]))
continue
if arg == '--ls':
actions.append(('ls', None))
i += 1
continue
if arg == '-e': if arg == '-e':
actions.append(('erase_all', None)) actions.append(('erase_all', None))
i += 1 i += 1
continue continue
if arg == '--erase':
actions.append(('erase_all', None))
i += 1
continue
if arg == '--rm': if arg == '--rm':
if i + 1 < len(argv): if i + 1 < len(argv):
actions.append(('rm', argv[i + 1])) actions.append(('rm', argv[i + 1]))
@@ -151,8 +258,19 @@ def _get_ordered_actions(argv: List[str]) -> List[tuple]:
raise ValueError("--rm requires an argument") raise ValueError("--rm requires an argument")
continue continue
if arg in ('-f', '--follow'): if arg in ('-f', '--follow'):
actions.append(('follow', None)) # Optional duration in seconds: --follow [SECONDS]
i += 1 follow_secs = None
if i + 1 < len(argv) and not argv[i + 1].startswith('-'):
try:
follow_secs = float(argv[i + 1])
i += 2
except ValueError:
# Not a number, treat as flag-only
i += 1
actions.append(('follow', follow_secs))
else:
actions.append(('follow', None))
i += 1
continue continue
# Skip non-action flags and their values # Skip non-action flags and their values
if arg in _FLAGS_WITH_VALUE and i + 1 < len(argv): if arg in _FLAGS_WITH_VALUE and i + 1 < len(argv):
@@ -168,10 +286,13 @@ def main() -> None:
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
Examples: Examples:
# Download and print current settings # Show help (no device I/O)
%(prog)s %(prog)s
# Download, edit device name and brightness, then upload # Show current settings.json from device (read-only)
%(prog)s --show
# Edit device name and brightness, then upload only if values changed
%(prog)s -n "LED-Strip-1" -b 128 %(prog)s -n "LED-Strip-1" -b 128
# Set multiple parameters # Set multiple parameters
@@ -198,6 +319,15 @@ Examples:
# Set name, num_leds, default pattern, and upload # Set name, num_leds, default pattern, and upload
%(prog)s --name "MyStrip" -l 60 --default rainbow %(prog)s --name "MyStrip" -l 60 --default rainbow
# Reset logical device name to firmware default (STA MAC based)
%(prog)s --reset-device-name
# ESP-NOW bridge: Pi on GPIO UART1 (USB-serial adapter)
%(prog)s -p /dev/ttyUSB0 --serial-baudrate 921600
# ESP-NOW bridge: Pi on ESP32-S3 native USB CDC
%(prog)s -p /dev/ttyACM0 --serial-usb
""" """
) )
@@ -212,6 +342,20 @@ Examples:
help="Device name" help="Device name"
) )
parser.add_argument(
"--reset-device-name",
action="store_true",
help="Set name to firmware default (led-<STA MAC hex>, same as fresh settings.json on led-driver)",
)
parser.add_argument(
"--id",
dest="device_id",
type=int,
metavar="0-255",
help="Numeric device ID used for ESPNow (0-255)"
)
parser.add_argument( parser.add_argument(
"--pin", "--pin",
type=int, type=int,
@@ -252,13 +396,61 @@ Examples:
parser.add_argument( parser.add_argument(
"--preset", "--preset",
help="Pattern preset name" metavar="NAME",
help="Create or replace preset NAME in presets.json (led-driver; use --pattern)",
)
parser.add_argument(
"--pattern",
choices=[
"off", "on", "blink", "rainbow", "pulse", "transition", "chase", "circle",
],
help="Pattern type for --preset (default: on)",
) )
parser.add_argument( parser.add_argument(
"--default", "--default",
metavar="PATTERN", metavar="NAME",
help="Default/startup pattern (startup_preset in settings.json)" help="Default/startup preset name in settings.json (which preset runs at boot)",
)
parser.add_argument(
"--transport",
choices=["espnow", "wifi"],
help="led-driver transport_type",
)
parser.add_argument(
"--serial-baudrate",
type=int,
metavar="BAUD",
help="bridge: UART1 baud for Pi serial link (default 921600)",
)
parser.add_argument(
"--serial-usb",
action="store_true",
help="bridge: use native USB CDC for Pi serial link (ESP32-S3)",
)
parser.add_argument(
"--ssid",
help="led-driver ssid (Wi-Fi network in wifi mode)",
)
parser.add_argument(
"--wifi-password",
dest="wifi_password",
metavar="PASS",
help="led-driver password (Wi-Fi PSK in wifi mode)",
)
parser.add_argument(
"--wifi-channel",
dest="wifi_channel",
type=int,
metavar="1-11",
help="led-driver wifi_channel (ESP-NOW mode)",
) )
parser.add_argument( parser.add_argument(
@@ -276,8 +468,11 @@ Examples:
parser.add_argument( parser.add_argument(
"-f", "--follow", "-f", "--follow",
action="store_true", nargs="?",
help="Follow device output continuously (like tail -f)" const=None,
type=float,
metavar="SECONDS",
help="Follow device output continuously (like tail -f). Optionally specify SECONDS to limit follow duration."
) )
parser.add_argument( parser.add_argument(
@@ -294,10 +489,53 @@ Examples:
) )
parser.add_argument( parser.add_argument(
"-e", "--src",
nargs="?",
const="src",
metavar="DIR",
help="Upload DIR recursively to device root (:/, no leading directory), excluding patterns/. If DIR is omitted, uses local ./src."
)
parser.add_argument(
"--lib",
nargs="?",
const="lib",
metavar="DIR",
help="Upload DIR recursively to /lib on device. If DIR is omitted, uses local ./lib."
)
parser.add_argument(
"--all",
action="store_true",
help="Upload ./src (excluding patterns), ./src/patterns, and ./lib."
)
parser.add_argument(
"--force-upload",
action="store_true",
help="Upload every file; ignore file_hashes.json on the device",
)
parser.add_argument(
"--patterns", "--paterns",
dest="patterns_dir",
nargs="?",
const=os.path.join("src", "patterns"),
metavar="DIR",
help="Upload DIR recursively to /patterns on device. If DIR is omitted, uses local ./src/patterns."
)
parser.add_argument(
"--ls",
action="store_true",
help="List files on the device root (:/)"
)
parser.add_argument(
"-e", "--erase",
dest="erase_all", dest="erase_all",
action="store_true", action="store_true",
help="Erase all code on the device (delete all files except settings.json)" help="Erase the device filesystem: delete all files and directories at device root (including settings.json and presets.json)",
) )
parser.add_argument( parser.add_argument(
@@ -313,6 +551,9 @@ Examples:
) )
try: try:
if len(sys.argv) <= 1:
parser.print_help()
return
args = parser.parse_args() args = parser.parse_args()
ordered_actions = _get_ordered_actions(sys.argv) ordered_actions = _get_ordered_actions(sys.argv)
except ValueError as e: except ValueError as e:
@@ -376,22 +617,73 @@ Examples:
else: else:
print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr) print(f"Uploading {upload_dir} to device on {port}...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
files_copied, dirs_created = conn.upload_directory(upload_dir, remote_dir) files_copied, dirs_created, files_skipped = conn.upload_directory(
print(f"Upload complete: {files_copied} files, {dirs_created} directories created.", file=sys.stderr) upload_dir, remote_dir, force=args.force_upload
)
print(
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
f"{dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e: except Exception as e:
print(f"Error uploading directory: {e}", file=sys.stderr) print(f"Error uploading directory: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
elif action_name == 'upload_src_no_patterns':
src_dir = value
if not os.path.exists(src_dir):
print(f"Error: Directory does not exist: {src_dir}", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(src_dir):
print(f"Error: Not a directory: {src_dir}", file=sys.stderr)
sys.exit(1)
try:
with tempfile.TemporaryDirectory() as temp_src:
for entry in sorted(os.listdir(src_dir)):
if entry == "patterns":
continue
src_entry = os.path.join(src_dir, entry)
dst_entry = os.path.join(temp_src, entry)
if os.path.isdir(src_entry):
shutil.copytree(src_entry, dst_entry)
else:
shutil.copy2(src_entry, dst_entry)
print(
f"Uploading {src_dir} (excluding patterns/) to device root on {port}...",
file=sys.stderr,
)
conn = DeviceConnection(port)
files_copied, dirs_created, files_skipped = conn.upload_directory(
temp_src, "", force=args.force_upload
)
print(
f"Upload complete: {files_copied} uploaded, {files_skipped} skipped, "
f"{dirs_created} directories created.",
file=sys.stderr,
)
except Exception as e:
print(f"Error uploading src (excluding patterns): {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'ls':
try:
print(f"Listing files on device {port}...", file=sys.stderr)
conn = DeviceConnection(port)
items = conn.list_files('')
for name, is_dir, size in items:
marker = "d" if is_dir else "-"
print(f"{marker} {size:>8} {name}")
except Exception as e:
print(f"Error listing files: {e}", file=sys.stderr)
sys.exit(1)
elif action_name == 'erase_all': elif action_name == 'erase_all':
try: try:
print(f"Erasing all code on device {port}...", file=sys.stderr) print(f"Erasing device filesystem on {port}...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
items = conn.list_files('') items = conn.list_files('')
for name, is_dir, size in items: for name, is_dir, size in items:
if name == "settings.json":
continue
conn.delete(name) conn.delete(name)
print("Erase complete.", file=sys.stderr) print("Erase complete (device root is empty).", file=sys.stderr)
except Exception as e: except Exception as e:
print(f"Error erasing device: {e}", file=sys.stderr) print(f"Error erasing device: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
@@ -410,9 +702,12 @@ Examples:
if ran_reset: if ran_reset:
time.sleep(0.5) time.sleep(0.5)
try: try:
print(f"Following output from {port}... (Press Ctrl+C to stop)", file=sys.stderr) if value is None:
print(f"Following output from {port}... (Press Ctrl+C to stop)", file=sys.stderr)
else:
print(f"Following output from {port} for {value} seconds...", file=sys.stderr)
conn = DeviceConnection(port) conn = DeviceConnection(port)
conn.follow_output() conn.follow_output(value)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nStopped following.", file=sys.stderr) print("\nStopped following.", file=sys.stderr)
sys.exit(0) sys.exit(0)
@@ -421,15 +716,35 @@ Examples:
sys.exit(1) sys.exit(1)
return # follow blocks; when interrupted we're done return # follow blocks; when interrupted we're done
# If we ran any actions and follow wasn't last, we're done default_name_from_device: Optional[str] = None
if ordered_actions: if args.reset_device_name:
return if args.name is not None:
print(
"Error: use either --name or --reset-device-name, not both.",
file=sys.stderr,
)
sys.exit(1)
try:
print(
f"Reading firmware default device name (STA MAC) on {port}...",
file=sys.stderr,
)
default_name_from_device = firmware_default_device_name(port)
print(
f"Default name will be {default_name_from_device!r}.",
file=sys.stderr,
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# Collect all edit parameters # Collect all edit parameters
edits: Dict[str, Any] = {} edits: Dict[str, Any] = {}
if args.name is not None: if args.name is not None:
edits["name"] = args.name edits["name"] = args.name
elif default_name_from_device is not None:
edits["name"] = default_name_from_device
if args.pin is not None: if args.pin is not None:
edits["led_pin"] = args.pin edits["led_pin"] = args.pin
@@ -446,50 +761,154 @@ Examples:
if args.color_order is not None: if args.color_order is not None:
edits["color_order"] = args.color_order edits["color_order"] = args.color_order
if args.preset is not None:
edits["pattern"] = args.preset
if args.default is not None: if args.default is not None:
edits["startup_preset"] = args.default edits["default"] = args.default
# 1. Download: get current settings from device if args.transport is not None:
try: edits["transport_type"] = args.transport
print(f"Downloading settings from {args.port}...", file=sys.stderr)
settings = download_settings(args.port)
print("Settings downloaded successfully.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr)
else:
print(f"Error downloading settings: {e}", file=sys.stderr)
sys.exit(1)
# If --show only, print and exit if args.serial_baudrate is not None:
if args.show: edits["uplink_transport"] = "serial"
print_settings(settings) edits["serial_usb"] = False
if not edits: edits["serial_uart_id"] = 1
return edits["serial_tx_pin"] = 2
edits["serial_rx_pin"] = 3
baud = int(args.serial_baudrate)
if baud < 9600 or baud > 3000000:
print("Error: --serial-baudrate must be 96003000000", file=sys.stderr)
sys.exit(1)
edits["serial_baudrate"] = baud
# 2. Edit: apply edits to downloaded settings if args.serial_usb:
if edits: edits["uplink_transport"] = "serial"
print(f"Applying {len(edits)} edit(s)...", file=sys.stderr) edits["serial_usb"] = True
settings.update(edits)
print_settings(settings) if args.ssid is not None:
edits["ssid"] = args.ssid
# 3. Upload: write settings back to device when edits were made if args.wifi_password is not None:
if edits: edits["password"] = args.wifi_password
if args.wifi_channel is not None:
edits["wifi_channel"] = max(1, min(11, args.wifi_channel))
if args.device_id is not None:
# Clamp into single-byte range; store as int in settings.json
edits["id"] = max(0, min(255, args.device_id))
settings_work = args.show or bool(edits)
if settings_work:
try: try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr) print(f"Downloading settings from {args.port}...", file=sys.stderr)
upload_settings(args.port, settings) settings = download_settings(args.port)
print("Settings uploaded successfully. Device will reset.", file=sys.stderr) print("Settings downloaded successfully.", file=sys.stderr)
except Exception as e: except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower(): if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(f"Error: Connection timeout. Check device connection on {args.port}", file=sys.stderr) print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else: else:
print(f"Error uploading settings: {e}", file=sys.stderr) print(f"Error downloading settings: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)
if args.show:
print_settings(settings)
if not edits and args.preset is None:
return
changed_edits: Dict[str, Any] = {}
for key, value in edits.items():
if settings.get(key) != value:
changed_edits[key] = value
if edits and not changed_edits:
print("No settings changes detected; skipping settings upload.", file=sys.stderr)
if changed_edits:
print(f"Applying {len(changed_edits)} setting change(s)...", file=sys.stderr)
settings.update(changed_edits)
print_settings(settings)
if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on"
try:
print(f"Downloading presets from {args.port}...", file=sys.stderr)
presets_data = download_presets(args.port)
entry = presets_data.get(args.preset)
if not isinstance(entry, dict):
entry = {}
entry["p"] = pattern
presets_data[args.preset] = entry
print(
f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...",
file=sys.stderr,
)
upload_presets(args.port, presets_data, reset=not bool(edits))
print("Presets uploaded successfully.", file=sys.stderr)
if not edits:
print("Device will reset.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else:
print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1)
if changed_edits:
try:
print(f"\nUploading settings to {args.port}...", file=sys.stderr)
upload_settings(args.port, settings)
print("Settings uploaded successfully. Device will reset.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else:
print(f"Error uploading settings: {e}", file=sys.stderr)
sys.exit(1)
return
if args.preset is not None:
pattern = args.pattern if args.pattern is not None else "on"
try:
print(f"Downloading presets from {args.port}...", file=sys.stderr)
presets_data = download_presets(args.port)
entry = presets_data.get(args.preset)
if not isinstance(entry, dict):
entry = {}
entry["p"] = pattern
presets_data[args.preset] = entry
print(
f"Writing preset {args.preset!r} (pattern={pattern}) to {PRESETS_REMOTE}...",
file=sys.stderr,
)
upload_presets(args.port, presets_data, reset=True)
print("Presets uploaded successfully.", file=sys.stderr)
print("Device will reset.", file=sys.stderr)
except Exception as e:
if "timeout" in str(e).lower() or "connection" in str(e).lower():
print(
f"Error: Connection timeout. Check device connection on {args.port}",
file=sys.stderr,
)
else:
print(f"Error uploading presets: {e}", file=sys.stderr)
sys.exit(1)
return
if ordered_actions:
return
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

45
deploy_manifest.py Normal file
View File

@@ -0,0 +1,45 @@
"""
Host-side helpers for file_hashes.json (same format as led-driver/src/file_hashes.py).
"""
import hashlib
import json
import os
MANIFEST_VERSION = 1
MANIFEST_FILENAME = "file_hashes.json"
HASH_ALGO = "sha256"
def normalize_remote_path(remote_file: str) -> str:
"""Device-relative path with forward slashes (no leading slash)."""
return remote_file.replace("\\", "/").lstrip("/")
def sha256_hex_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def parse_manifest(data: bytes) -> dict:
"""Return path -> hash map from manifest bytes."""
try:
doc = json.loads(data.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError, TypeError, ValueError):
return {}
if not isinstance(doc, dict):
return {}
files = doc.get("files")
return dict(files) if isinstance(files, dict) else {}
def build_manifest_bytes(files: dict) -> bytes:
doc = {
"version": MANIFEST_VERSION,
"algorithm": HASH_ALGO,
"files": files,
}
return json.dumps(doc, separators=(",", ":")).encode("utf-8")

153
device.py
View File

@@ -10,6 +10,14 @@ import os
import time import time
import serial import serial
from deploy_manifest import (
MANIFEST_FILENAME,
build_manifest_bytes,
normalize_remote_path,
parse_manifest,
sha256_hex_file,
)
# Add lib directory to path - handle both normal execution and PyInstaller bundle # Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False): if getattr(sys, 'frozen', False):
# Running as a PyInstaller bundle # Running as a PyInstaller bundle
@@ -23,12 +31,15 @@ if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path) sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError from mpremote.transport import TransportError, TransportExecError
class DeviceConnection: class DeviceConnection:
"""Wrapper for device communication.""" """Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device): def __init__(self, device):
"""Connect to a device.""" """Connect to a device."""
self.device = device self.device = device
@@ -53,7 +64,39 @@ class DeviceConnection:
except Exception: except Exception:
pass pass
self.transport = None self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path): def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem.""" """Copy a file from device to local filesystem."""
self.connect() self.connect()
@@ -70,17 +113,45 @@ class DeviceConnection:
try: try:
with open(local_path, 'rb') as f: with open(local_path, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_path, data) self._fs_writefile_with_wdt(remote_path, data)
finally: finally:
self.disconnect() self.disconnect()
def _manifest_remote_path(self) -> str:
return "/" + MANIFEST_FILENAME
def read_hash_manifest(self) -> dict:
"""Load path -> sha256 hex map from file_hashes.json on the device."""
remote = self._manifest_remote_path()
if not self.transport.fs_exists(remote):
return {}
try:
data = self.transport.fs_readfile(remote)
return parse_manifest(data)
except Exception:
return {}
def write_hash_manifest(self, files: dict) -> None:
"""Write merged file_hashes.json to the device root."""
self._fs_writefile_with_wdt(
self._manifest_remote_path(),
build_manifest_bytes(files),
)
def upload_directory(self, local_dir, remote_dir=None): def upload_directory(self, local_dir, remote_dir=None, *, force: bool = False):
""" """
Upload a directory recursively to the device. Upload a directory recursively to the device.
Skips files whose sha256 matches file_hashes.json on the device unless
force is True. Updates the manifest after upload.
Args: Args:
local_dir: Local directory path to upload local_dir: Local directory path to upload
remote_dir: Remote directory path (default: root, uses basename of local_dir) remote_dir: Remote directory path (default: root, uses basename of local_dir)
force: Upload every file even when the manifest hash matches
Returns:
(files_copied, dirs_created, files_skipped)
""" """
import os import os
@@ -94,15 +165,19 @@ class DeviceConnection:
remote_dir = os.path.basename(os.path.abspath(local_dir)) remote_dir = os.path.basename(os.path.abspath(local_dir))
# Ensure remote directory exists # Ensure remote directory exists
if not self.transport.fs_exists(remote_dir): if remote_dir and not self.transport.fs_exists(remote_dir):
self.transport.fs_mkdir(remote_dir) self.transport.fs_mkdir(remote_dir)
manifest = {} if force else self.read_hash_manifest()
# Walk through local directory and copy files # Walk through local directory and copy files
files_copied = 0 files_copied = 0
files_skipped = 0
dirs_created = 0 dirs_created = 0
for root, dirs, files in os.walk(local_dir): for root, dirs, files in os.walk(local_dir):
# Calculate relative path from local_dir # Never upload Python bytecode trees (MicroPython does not use them).
dirs[:] = [d for d in dirs if d != "__pycache__"]
rel_path = os.path.relpath(root, local_dir) rel_path = os.path.relpath(root, local_dir)
# Build remote path # Build remote path
@@ -121,8 +196,12 @@ class DeviceConnection:
self.transport.fs_mkdir(remote_base) self.transport.fs_mkdir(remote_base)
dirs_created += 1 dirs_created += 1
# Copy files # Copy files (skip bytecode; __pycache__ dirs are pruned above)
for file in files: for file in files:
if file.endswith((".pyc", ".pyo")):
continue
if file == MANIFEST_FILENAME:
continue
local_file = os.path.join(root, file) local_file = os.path.join(root, file)
# Handle root directory case properly # Handle root directory case properly
if remote_base == '/': if remote_base == '/':
@@ -130,13 +209,23 @@ class DeviceConnection:
else: else:
remote_file = '/'.join([remote_base, file]) remote_file = '/'.join([remote_base, file])
manifest_key = normalize_remote_path(remote_file)
local_hash = sha256_hex_file(local_file)
if not force and manifest.get(manifest_key) == local_hash:
print(f"Skipping (unchanged): {remote_file}", file=sys.stderr)
files_skipped += 1
continue
print(f"Uploading: {remote_file}", file=sys.stderr) print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f: with open(local_file, 'rb') as f:
data = f.read() data = f.read()
self.transport.fs_writefile(remote_file, data) self._fs_writefile_with_wdt(remote_file, data)
manifest[manifest_key] = local_hash
files_copied += 1 files_copied += 1
return files_copied, dirs_created self.write_hash_manifest(manifest)
return files_copied, dirs_created, files_skipped
finally: finally:
self.disconnect() self.disconnect()
@@ -208,12 +297,21 @@ class DeviceConnection:
except Exception as e: except Exception as e:
raise TransportError(f"Failed to reset device: {e}") from e raise TransportError(f"Failed to reset device: {e}") from e
def follow_output(self): def follow_output(self, duration=None):
"""Follow device output continuously (like tail -f).""" """
Follow device output (like tail -f).
Args:
duration: Optional number of seconds to follow output for. If None,
follow indefinitely until interrupted.
"""
# Use direct serial connection like dev.py does # Use direct serial connection like dev.py does
start_time = time.time()
try: try:
with serial.Serial(self.device, baudrate=115200) as ser: with serial.Serial(self.device, baudrate=115200) as ser:
while True: while True:
if duration is not None and (time.time() - start_time) >= duration:
break
if ser.in_waiting > 0: if ser.in_waiting > 0:
data = ser.readline().decode('utf-8', errors='replace').strip() data = ser.readline().decode('utf-8', errors='replace').strip()
if data: if data:
@@ -226,6 +324,37 @@ class DeviceConnection:
raise TransportError(f"Failed to follow output: {e}") from e raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path): def copy_file(from_device, device, remote_path, local_path):
""" """
Copy a file to/from device. Copy a file to/from device.

25
host_ports.py Normal file
View File

@@ -0,0 +1,25 @@
"""Host serial port list filtering for led-tool /ports API."""
import re
# Exclude /dev/ttyS2 and higher (keep ttyS0, ttyS1; keep ttyACM*, ttyUSB*, etc.)
_TTYS_HIGH = re.compile(r"^/dev/ttyS([2-9]|[1-9]\d+)$")
def include_host_serial_device(device: str) -> bool:
return not _TTYS_HIGH.match(device or "")
def _is_na(value: str) -> bool:
return (value or "").strip().lower() in ("n/a", "na")
def include_host_serial_port(port: dict) -> bool:
if not include_host_serial_device(port.get("device", "")):
return False
if _is_na(port.get("description")) or _is_na(port.get("hwid")):
return False
return True
def filter_port_dicts(ports: list) -> list:
return [p for p in ports if include_host_serial_port(p)]

View File

@@ -10,7 +10,7 @@ pipenv install "$@"
if [ ! -f "dist/led-cli" ] || [ "cli.py" -nt "dist/led-cli" ] || [ "device.py" -nt "dist/led-cli" ]; then if [ ! -f "dist/led-cli" ] || [ "cli.py" -nt "dist/led-cli" ] || [ "device.py" -nt "dist/led-cli" ]; then
echo "" echo ""
echo "Building binary..." echo "Building binary..."
pipenv run pyinstaller --clean led-cli.spec pipenv run pyinstaller --clean --onefile --name led-cli --paths lib cli.py
fi fi
# Ensure ~/.local/bin exists # Ensure ~/.local/bin exists

218
static/settings_editor.html Normal file
View File

@@ -0,0 +1,218 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>LED device settings</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<style>
:root {
color-scheme: dark;
--text: #e5e7eb;
--muted: #9ca3af;
--border: #374151;
--accent: #3b82f6;
--danger: #f87171;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: system-ui, sans-serif;
background: #0b1020;
color: var(--text);
padding: 1rem;
}
h1 { font-size: 1.15rem; margin: 0 0 0.35rem; }
.muted { color: var(--muted); font-size: 0.9rem; margin-bottom: 1rem; }
.card {
border: 1px solid var(--border);
border-radius: 8px;
padding: 1rem;
margin-bottom: 1rem;
background: #111827;
}
label { display: block; font-size: 0.85rem; margin-bottom: 0.25rem; color: var(--muted); }
input, select, textarea {
width: 100%;
padding: 0.5rem;
border-radius: 4px;
border: 1px solid var(--border);
background: #1f2937;
color: var(--text);
margin-bottom: 0.65rem;
}
.row { display: flex; gap: 0.5rem; flex-wrap: wrap; align-items: flex-end; }
.row > * { flex: 1; min-width: 10rem; }
.btn {
padding: 0.45rem 0.85rem;
border: none;
border-radius: 4px;
cursor: pointer;
background: #4b5563;
color: #fff;
font-size: 0.9rem;
}
.btn-primary { background: var(--accent); }
.btn:disabled { opacity: 0.5; cursor: not-allowed; }
.grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(11rem, 1fr));
gap: 0 0.5rem;
}
#status { font-size: 0.85rem; margin-top: 0.5rem; }
#status.error { color: var(--danger); }
.flash {
display: none;
padding: 0.5rem 0.75rem;
border-radius: 4px;
margin-bottom: 0.75rem;
background: #14532d;
color: #86efac;
}
.flash.error { background: #5e1b1b; color: #fca5a5; }
.flash.show { display: block; }
[hidden] { display: none !important; }
</style>
</head>
<body data-api-base="/led-tool">
<h1>Device settings (USB)</h1>
<p class="muted">Edit <code>settings.json</code> via <strong>Web Serial</strong> (USB on this computer) or <strong>host serial</strong> (Pi running <code>led-cli</code>). Use <strong>Connect</strong> for your transport, then <strong>Download</strong> or <strong>Upload</strong>.</p>
<div id="flash" class="flash"></div>
<div class="card" id="webserial-wrap" hidden>
<h2 style="font-size:1rem;margin:0 0 0.5rem;">Web Serial</h2>
<div class="row">
<button type="button" class="btn btn-primary" id="webserial-connect">Connect USB</button>
<button type="button" class="btn" id="webserial-disconnect" hidden>Disconnect</button>
</div>
</div>
<div class="card" id="host-serial-wrap">
<h2 style="font-size:1rem;margin:0 0 0.5rem;">Host serial</h2>
<label for="host-port">Port</label>
<select id="host-port"><option value="">Select port</option></select>
<div class="row">
<button type="button" class="btn btn-primary" id="host-connect">Connect</button>
<button type="button" class="btn" id="host-disconnect" hidden>Disconnect</button>
<button type="button" class="btn" id="btn-refresh-ports">Refresh ports</button>
</div>
</div>
<div class="card">
<div class="row" style="margin-bottom:0.75rem;">
<button type="button" class="btn btn-primary" id="btn-download">Download</button>
<button type="button" class="btn btn-primary" id="btn-upload">Upload &amp; reset</button>
<button type="button" class="btn" id="btn-from-json">JSON → form</button>
<button type="button" class="btn" id="btn-to-json">Form → JSON</button>
</div>
<p id="status">Ready</p>
<div class="grid">
<div>
<label for="name">Name</label>
<input id="name" data-setting="name" type="text" />
</div>
<div>
<label for="num_leds">Num LEDs</label>
<input id="num_leds" data-setting="num_leds" type="number" min="1" />
</div>
<div>
<label for="led_pin">LED pin</label>
<input id="led_pin" data-setting="led_pin" type="number" min="0" />
</div>
<div>
<label for="brightness">Brightness</label>
<input id="brightness" data-setting="brightness" type="number" min="0" max="255" />
</div>
<div>
<label for="color_order">Colour order</label>
<select id="color_order" data-setting="color_order">
<option value=""></option>
<option value="rgb">rgb</option>
<option value="rbg">rbg</option>
<option value="grb">grb</option>
<option value="gbr">gbr</option>
<option value="brg">brg</option>
<option value="bgr">bgr</option>
</select>
</div>
<div>
<label for="transport_type">Transport</label>
<select id="transport_type" data-setting="transport_type">
<option value=""></option>
<option value="espnow">espnow</option>
<option value="wifi">wifi</option>
</select>
</div>
<div>
<label for="ssid">WiFi SSID</label>
<input id="ssid" data-setting="ssid" type="text" />
</div>
<div>
<label for="password">WiFi password</label>
<input id="password" data-setting="password" type="password" />
</div>
<div>
<label for="wifi_channel">WiFi channel</label>
<input id="wifi_channel" data-setting="wifi_channel" type="number" min="1" max="11" />
</div>
<div>
<label for="ap_ip">Bridge AP IP</label>
<input id="ap_ip" data-setting="ap_ip" type="text" placeholder="192.168.4.1" />
</div>
<div>
<label for="ap_password">Bridge AP password</label>
<input id="ap_password" data-setting="ap_password" type="password" placeholder="min 8 chars, or empty for open" />
</div>
<div>
<label for="default">Default preset</label>
<input id="default" data-setting="default" type="text" />
</div>
<div>
<label for="id">Device id (ESP-NOW)</label>
<input id="id" data-setting="id" type="number" min="0" max="255" />
</div>
<div>
<label for="debug">Debug</label>
<select id="debug" data-setting="debug">
<option value=""></option>
<option value="False">False</option>
<option value="True">True</option>
</select>
</div>
</div>
</div>
<div class="card">
<label for="raw_json">Raw settings.json</label>
<textarea id="raw_json" rows="14" spellcheck="false">{}</textarea>
</div>
<script>
(function () {
const api =
document.body.getAttribute('data-api-base') ??
(location.pathname.includes('/led-tool') ? '/led-tool' : '');
const prefix = api + '/static';
const v = '20260520';
function loadScript(url) {
return new Promise((resolve, reject) => {
const s = document.createElement('script');
s.src = url + (url.includes('?') ? '&' : '?') + 'v=' + v;
s.onload = () => resolve();
s.onerror = () => reject(new Error('Failed to load ' + url));
document.body.appendChild(s);
});
}
loadScript(prefix + '/web_serial.js')
.then(() => loadScript(prefix + '/settings_editor.js'))
.catch((e) => {
const el = document.getElementById('flash');
if (el) {
el.textContent = e.message;
el.className = 'flash error show';
}
});
})();
</script>
</body>
</html>

421
static/settings_editor.js Normal file
View File

@@ -0,0 +1,421 @@
(function () {
const LOG = '[led-tool/editor]';
const log = (...args) => console.log(LOG, ...args);
const apiBase =
(document.body && document.body.dataset.apiBase) ||
(window.location.pathname.includes('/led-tool') ? '/led-tool' : '');
const statusEl = document.getElementById('status');
const messageEl = document.getElementById('flash');
const portSelect = document.getElementById('host-port');
const rawJson = document.getElementById('raw_json');
const hostWrap = document.getElementById('host-serial-wrap');
const webWrap = document.getElementById('webserial-wrap');
const connectBtn = document.getElementById('webserial-connect');
const disconnectBtn = document.getElementById('webserial-disconnect');
const hostConnectBtn = document.getElementById('host-connect');
const hostDisconnectBtn = document.getElementById('host-disconnect');
const refreshPortsBtn = document.getElementById('btn-refresh-ports');
const DEFAULT_HOST_PORT = '/dev/ttyACM0';
let webClient = null;
let webPort = null;
let hostConnected = false;
let hostConnectedPort = '';
let transferBusy = false;
let webConnectBusy = false;
const webSerialOk = window.LedToolWebSerial && window.LedToolWebSerial.supported();
let webSerialGrantedCount = 0;
function setStatus(text, isError) {
if (!statusEl) return;
statusEl.textContent = text;
statusEl.classList.toggle('error', Boolean(isError));
}
function flash(text, isError) {
if (!messageEl) return;
messageEl.textContent = text;
messageEl.className = isError ? 'flash error show' : 'flash show';
setTimeout(() => messageEl.classList.remove('show'), 5000);
}
function errorMessage(e) {
if (!e) return 'Unknown error';
if (typeof e === 'string') return e;
return e.message || String(e);
}
function onWebSerialTransferError(e) {
if (webClient) webClient.inRawRepl = false;
return errorMessage(e);
}
function formToObject() {
const out = {};
document.querySelectorAll('[data-setting]').forEach((el) => {
const key = el.getAttribute('data-setting');
if (!key) return;
const raw = (el.value || '').trim();
if (raw === '') return;
if (el.type === 'number') {
const n = parseInt(raw, 10);
if (!Number.isNaN(n)) out[key] = n;
} else if (el.tagName === 'SELECT' && el.id === 'debug') {
out[key] = raw === 'True';
} else {
out[key] = raw;
}
});
return out;
}
function objectToForm(settings) {
if (!settings || typeof settings !== 'object') return;
document.querySelectorAll('[data-setting]').forEach((el) => {
const key = el.getAttribute('data-setting');
if (!key || !Object.prototype.hasOwnProperty.call(settings, key)) return;
const v = settings[key];
if (el.id === 'debug') {
el.value = v === true || v === 'True' ? 'True' : 'False';
} else {
el.value = v === undefined || v === null ? '' : String(v);
}
});
if (rawJson) rawJson.value = JSON.stringify(settings, null, 2);
}
function usingWebSerial() {
return webClient && webClient.connected;
}
function usingHostSerial() {
return hostConnected && Boolean(hostConnectedPort);
}
function updateUi() {
const webOpen = usingWebSerial();
const hostOpen = usingHostSerial();
if (webWrap) webWrap.hidden = !webSerialOk || hostOpen;
if (connectBtn) connectBtn.hidden = webOpen || hostOpen || !webSerialOk;
if (disconnectBtn) disconnectBtn.hidden = !webOpen;
if (hostWrap) hostWrap.hidden = webOpen;
const hostFieldsLocked = webOpen || hostOpen;
if (portSelect) portSelect.disabled = hostFieldsLocked;
if (refreshPortsBtn) refreshPortsBtn.disabled = hostFieldsLocked;
if (hostConnectBtn) hostConnectBtn.hidden = hostOpen || webOpen;
if (hostDisconnectBtn) hostDisconnectBtn.hidden = !hostOpen || webOpen;
if (webOpen) {
setStatus('USB open (device not reset until Download/Upload).');
} else if (hostOpen) {
setStatus(`Host serial: ${hostConnectedPort} (use Download or Upload).`);
} else if (webSerialOk) {
if (webSerialGrantedCount === 1) {
setStatus('Connect USB (one remembered port) or host serial, then Download or Upload.');
} else {
setStatus('Connect host serial or Web Serial USB, then Download or Upload.');
}
} else {
setStatus(`Connect host serial (default ${DEFAULT_HOST_PORT}).`);
}
}
async function loadHostPorts() {
if (!portSelect) return;
try {
const res = await fetch(`${apiBase}/ports`);
const data = await res.json();
const prev = portSelect.value;
portSelect.innerHTML = '<option value="">Select port</option>';
for (const p of data.ports || []) {
const opt = document.createElement('option');
opt.value = p.device;
opt.textContent = `${p.device} - ${p.description || 'device'}`;
portSelect.appendChild(opt);
}
const acm0 = DEFAULT_HOST_PORT;
const hasAcm0 = [...portSelect.options].some((o) => o.value === acm0);
if (hasAcm0) {
portSelect.value = acm0;
} else if (prev && [...portSelect.options].some((o) => o.value === prev)) {
portSelect.value = prev;
}
} catch (e) {
flash(`Could not list host ports: ${e.message}`, true);
}
}
function selectedHostPort() {
return portSelect && portSelect.value ? portSelect.value.trim() : '';
}
function hostPort() {
if (hostConnected && hostConnectedPort) return hostConnectedPort;
return selectedHostPort();
}
function connectHostSerial() {
if (usingWebSerial()) {
disconnectWebSerial();
}
const port = selectedHostPort();
if (!port) {
flash('Select a host serial port.', true);
return;
}
hostConnectedPort = port;
hostConnected = true;
log('connectHostSerial', port);
updateUi();
flash(`Host serial ready: ${port}`, false);
}
function disconnectHostSerial() {
hostConnected = false;
hostConnectedPort = '';
updateUi();
}
async function downloadHost() {
const port = hostConnectedPort || hostPort();
if (!port) {
flash('Connect host serial first.', true);
return;
}
log('downloadHost', port);
setStatus('Downloading from device (host serial)...');
const res = await fetch(`${apiBase}/settings?port=${encodeURIComponent(port)}`);
const data = await res.json();
log('downloadHost response', { ok: res.ok, status: res.status, data });
if (!res.ok) {
throw new Error(data.error || 'Download failed');
}
if (!data.settings) {
throw new Error('No settings in response (check CLI output)');
}
objectToForm(data.settings);
flash('Settings downloaded.', false);
setStatus(`Downloaded from ${port}`);
}
async function uploadHost() {
const port = hostConnectedPort || hostPort();
if (!port) {
flash('Connect host serial first.', true);
return;
}
const payload = { port, ...formToObject() };
log('uploadHost', payload);
setStatus('Uploading via host serial (led-cli)...');
const res = await fetch(`${apiBase}/settings`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
const data = await res.json();
if (!res.ok) throw new Error(data.error || 'Upload failed');
if (!data.ok) throw new Error(data.stderr || 'led-cli failed');
flash('Settings uploaded; device resetting.', false);
setStatus('Upload complete.');
}
async function connectWebSerial() {
if (webConnectBusy) return;
if (!window.LedToolWebSerial || !window.LedToolWebSerial.supported()) {
flash('Web Serial not supported (use Chrome or Edge over HTTPS or localhost).', true);
return;
}
if (usingWebSerial()) {
flash('USB already connected.', false);
return;
}
if (usingHostSerial()) {
disconnectHostSerial();
}
webConnectBusy = true;
try {
log('connectWebSerial: requestPort');
setStatus('Opening USB port…');
webPort = await window.LedToolWebSerial.requestPort();
log('connectWebSerial: port selected', webPort);
webClient = new window.LedToolWebSerial.MicroPythonRawRepl();
await webClient.connect(webPort);
updateUi();
flash('USB port open. Wait for boot, then Download.', false);
} finally {
webConnectBusy = false;
}
}
async function disconnectWebSerial() {
if (webClient) {
try {
await webClient.disconnect();
} catch (_) {
/* ignore */
}
}
webClient = null;
webPort = null;
updateUi();
}
async function downloadWebSerial() {
log('downloadWebSerial');
setStatus('Reading settings from device (Web Serial)…');
const settings = await webClient.readSettingsJson();
log('downloadWebSerial result', settings);
objectToForm(settings);
if (rawJson) rawJson.value = JSON.stringify(settings, null, 2);
flash('Settings read over Web Serial.', false);
setStatus('Downloaded via Web Serial.');
}
async function uploadWebSerial() {
log('uploadWebSerial');
let settings = {};
if (rawJson && rawJson.value.trim()) {
try {
settings = JSON.parse(rawJson.value);
} catch (e) {
flash('Invalid JSON in raw panel.', true);
return;
}
} else {
settings = await webClient.readSettingsJson();
Object.assign(settings, formToObject());
}
await webClient.writeSettingsJson(settings);
flash('Uploaded via Web Serial; device resetting.', false);
setStatus('Upload complete.');
await disconnectWebSerial();
}
document.getElementById('btn-download')?.addEventListener('click', async () => {
if (transferBusy) return;
transferBusy = true;
try {
if (usingWebSerial()) {
await downloadWebSerial();
} else if (usingHostSerial()) {
await downloadHost();
} else {
flash(
webSerialOk
? 'Connect host serial or Web Serial USB first.'
: 'Click Connect on host serial first.',
true,
);
}
} catch (e) {
console.error(LOG, 'download failed', e);
const msg = usingWebSerial() ? onWebSerialTransferError(e) : errorMessage(e);
if (rawJson) rawJson.value = msg;
flash(msg, true);
setStatus(msg, true);
} finally {
transferBusy = false;
}
});
document.getElementById('btn-upload')?.addEventListener('click', async () => {
if (transferBusy) return;
transferBusy = true;
try {
if (usingWebSerial()) {
await uploadWebSerial();
} else if (usingHostSerial()) {
await uploadHost();
} else {
flash(
webSerialOk
? 'Connect host serial or Web Serial USB first.'
: 'Click Connect on host serial first.',
true,
);
}
} catch (e) {
console.error(LOG, 'upload failed', e);
const msg = usingWebSerial() ? onWebSerialTransferError(e) : errorMessage(e);
flash(msg, true);
setStatus(msg, true);
} finally {
transferBusy = false;
}
});
document.getElementById('btn-from-json')?.addEventListener('click', () => {
try {
const parsed = JSON.parse(rawJson.value || '{}');
objectToForm(parsed);
flash('Form updated from JSON.', false);
} catch (e) {
flash('Invalid JSON.', true);
}
});
document.getElementById('btn-to-json')?.addEventListener('click', () => {
const merged = formToObject();
if (rawJson && rawJson.value.trim()) {
try {
Object.assign(merged, JSON.parse(rawJson.value));
} catch (_) {
/* use form only */
}
}
rawJson.value = JSON.stringify(merged, null, 2);
flash('JSON updated from form.', false);
});
connectBtn?.addEventListener('click', () => {
connectWebSerial().catch((e) => {
disconnectWebSerial();
const msg =
e.name === 'NotFoundError'
? 'No USB device selected.'
: e.name === 'SecurityError'
? 'Web Serial blocked (open LED Tool in a top-level tab, or use host serial on the Pi).'
: e.message || String(e);
flash(msg, true);
setStatus(msg, true);
});
});
disconnectBtn?.addEventListener('click', () => {
disconnectWebSerial();
flash('Web Serial disconnected.', false);
});
refreshPortsBtn?.addEventListener('click', () => loadHostPorts());
hostConnectBtn?.addEventListener('click', () => connectHostSerial());
hostDisconnectBtn?.addEventListener('click', () => {
disconnectHostSerial();
flash('Host serial disconnected.', false);
});
window.addEventListener('beforeunload', () => {
if (webClient) {
webClient.disconnect();
}
});
async function refreshWebSerialGrantHint() {
if (!webSerialOk || !window.LedToolWebSerial.getGrantedPorts) return;
try {
const ports = await window.LedToolWebSerial.getGrantedPorts();
webSerialGrantedCount = ports.length;
updateUi();
} catch (_) {
/* ignore */
}
}
log('settings editor ready', { apiBase, webSerialOk });
updateUi();
loadHostPorts();
refreshWebSerialGrantHint();
})();

653
static/web_serial.js Normal file
View File

@@ -0,0 +1,653 @@
/**
* MicroPython raw REPL over Web Serial (mpremote-compatible).
*/
(function (global) {
const RAW_REPL_BANNER = 'raw REPL; CTRL-B to exit\r\n';
const SOFT_REBOOT_MARKERS = ['soft reboot\r\n', 'MPY: soft reboot\r\n'];
const LOG = '[led-tool/serial]';
function log(...args) {
console.log(LOG, ...args);
}
function logDebug(...args) {
console.debug(LOG, ...args);
}
function describeBytes(data) {
if (!data || !data.length) return '(empty)';
const u8 = data instanceof Uint8Array ? data : new Uint8Array(data);
if (u8.length <= 64) {
return Array.from(u8)
.map((b) => (b >= 0x20 && b < 0x7f ? String.fromCharCode(b) : `\\x${b.toString(16).padStart(2, '0')}`))
.join('');
}
return `${u8.length} bytes: ${decodeText(u8.slice(0, 80))}`;
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/** Common ESP32 USB-UART bridges (usbVendorId only per Web Serial API). */
const ESP32_USB_FILTERS = [
{ usbVendorId: 0x303a }, /* Espressif */
{ usbVendorId: 0x10c4 }, /* Silicon Labs CP210x */
{ usbVendorId: 0x1a86 }, /* WCH CH340 */
{ usbVendorId: 0x0403 }, /* FTDI */
];
async function deassertDtrRts(port) {
if (!port || !port.setSignals) return;
try {
await port.setSignals({ dataTerminalReady: false, requestToSend: false });
} catch (_) {
/* ignore — some adapters/browsers reject setSignals */
}
}
function bytesIndexOf(buf, suffix) {
if (!suffix.length) return 0;
if (buf.length < suffix.length) return -1;
for (let i = 0; i <= buf.length - suffix.length; i += 1) {
let ok = true;
for (let j = 0; j < suffix.length; j += 1) {
if (buf[i + j] !== suffix[j]) {
ok = false;
break;
}
}
if (ok) return i;
}
return -1;
}
function bufferEndsWithFriendlyRepl(buf) {
if (bufferLooksLikeEspRomBoot(buf) && !bufferHasMicroPython(buf)) return false;
const tail = decodeText(Uint8Array.from(buf.slice(-80)));
return /(?:\r\n|\n)>>> ?$/.test(tail);
}
function bufferLooksLikeEspRomBoot(buf) {
if (bufferIndicatesRuntime(buf)) return false;
const text = decodeText(Uint8Array.from(buf.slice(-512)));
return /SPI_FAST_FLASH_BOOT|rst:0x/.test(text) && !/entry 0x403/.test(text);
}
/** MicroPython is up (banner, REPL, raw REPL, or post-ROM heap lines from boot). */
function bufferIndicatesRuntime(buf) {
const text = decodeText(Uint8Array.from(buf));
if (text.includes('MicroPython')) return true;
if (text.includes('raw REPL; CTRL-B to exit')) return true;
if (/(?:\r\n|\n)>>> ?/.test(text)) return true;
if (text.includes('entry 0x') && (/['"]?free['"]?\s*:/.test(text) || /,\s*'free'/.test(text))) {
return true;
}
return false;
}
function bufferHasMicroPython(buf) {
return bufferIndicatesRuntime(buf);
}
function decodeText(u8) {
return new TextDecoder('utf-8', { fatal: false }).decode(u8);
}
function extractJsonObject(text) {
const t = (text || '').trim();
const start = t.indexOf('{');
const end = t.lastIndexOf('}');
if (start === -1 || end === -1 || end < start) {
throw new Error(`No JSON object in device output: ${t.slice(0, 400)}`);
}
return JSON.parse(t.slice(start, end + 1));
}
class MicroPythonRawRepl {
constructor() {
this.port = null;
this.reader = null;
this.writer = null;
this.portOpen = false;
this.inRawRepl = false;
this._rxBuf = [];
this.useRawPaste = true;
this._replLock = null;
}
get connected() {
return Boolean(this.portOpen && this.writer);
}
_appendRx(chunk) {
if (!chunk || !chunk.length) return;
logDebug('rx chunk', chunk.length, describeBytes(chunk));
for (let i = 0; i < chunk.length; i += 1) this._rxBuf.push(chunk[i]);
}
async _readStreamChunk(timeoutMs) {
if (!this.reader) return null;
const result = await Promise.race([
this.reader.read(),
sleep(timeoutMs).then(() => ({ timedOut: true })),
]);
if (result.timedOut) return null;
if (result.done) return null;
return result.value || null;
}
/** Match suffix anywhere in the buffer; keep bytes after the match for the next read. */
async readUntil(suffixBytes, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const idx = bytesIndexOf(this._rxBuf, suffixBytes);
if (idx >= 0) {
const end = idx + suffixBytes.length;
const matched = Uint8Array.from(this._rxBuf.slice(0, end));
this._rxBuf.splice(0, end);
logDebug('readUntil matched', JSON.stringify(decodeText(suffixBytes)), 'len', matched.length);
return matched;
}
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) {
this._appendRx(chunk);
} else {
await sleep(5);
}
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
const err = `Timed out waiting for ${JSON.stringify(decodeText(suffixBytes))} (tail: ${JSON.stringify(tail)})`;
log('readUntil timeout', err);
throw new Error(err);
}
async readUntilAny(suffixList, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
for (const suffixBytes of suffixList) {
const idx = bytesIndexOf(this._rxBuf, suffixBytes);
if (idx >= 0) {
const end = idx + suffixBytes.length;
const matched = Uint8Array.from(this._rxBuf.slice(0, end));
this._rxBuf.splice(0, end);
logDebug('readUntilAny matched', JSON.stringify(decodeText(suffixBytes)));
return matched;
}
}
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(5);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(
`Timed out waiting for one of ${suffixList.map((s) => JSON.stringify(decodeText(s))).join(', ')} (tail: ${JSON.stringify(tail)})`,
);
}
async readExact(n, timeoutMs = 10000) {
const deadline = Date.now() + timeoutMs;
while (this._rxBuf.length < n && Date.now() < deadline) {
const chunk = await this._readStreamChunk(Math.min(300, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(5);
}
if (this._rxBuf.length < n) {
throw new Error(`Expected ${n} bytes, got ${this._rxBuf.length}`);
}
return Uint8Array.from(this._rxBuf.splice(0, n));
}
async connect(serialPort) {
log('connect: opening port');
await this.disconnect();
this.port = serialPort;
this._rxBuf = [];
await this.port.open({
baudRate: 115200,
dataBits: 8,
stopBits: 1,
parity: 'none',
flowControl: 'none',
bufferSize: 65536,
});
await deassertDtrRts(this.port);
await sleep(50);
await deassertDtrRts(this.port);
this.reader = this.port.readable.getReader();
this.writer = this.port.writable.getWriter();
this.portOpen = true;
await sleep(100);
await this.collectIncoming(8000);
if (bufferIndicatesRuntime(this._rxBuf)) {
log('connect: MicroPython output seen during open');
}
log('connect: port open (raw REPL not entered yet)');
}
/** Read serial into _rxBuf without discarding (for post-open boot). */
async collectIncoming(maxMs) {
const deadline = Date.now() + maxMs;
while (Date.now() < deadline) {
const chunk = await this._readStreamChunk(Math.min(80, Math.max(1, deadline - Date.now())));
if (chunk && chunk.length) this._appendRx(chunk);
else if (
bufferHasMicroPython(this._rxBuf) ||
bufferEndsWithFriendlyRepl(this._rxBuf) ||
bytesIndexOf(this._rxBuf, new TextEncoder().encode(RAW_REPL_BANNER)) >= 0
) {
break;
} else {
await sleep(15);
}
}
}
async ensureRawRepl() {
if (this.inRawRepl) return;
if (this._replLock) {
await this._replLock;
return;
}
log('ensureRawRepl: entering raw REPL');
// USB open already resets the ESP32; skip soft-reset (unlike host led-cli/mpremote).
this._replLock = this.enterRawRepl(false).finally(() => {
this._replLock = null;
});
await this._replLock;
}
async disconnect() {
log('disconnect');
this._replLock = null;
const wasRaw = this.inRawRepl;
this.inRawRepl = false;
this.portOpen = false;
this.useRawPaste = true;
if (this.writer && wasRaw) {
try {
await this.writer.write(new Uint8Array([0x0d, 0x02]));
} catch (_) {
/* ignore */
}
}
if (this.reader) {
try {
await this.reader.cancel();
this.reader.releaseLock();
} catch (_) {
/* ignore */
}
this.reader = null;
}
if (this.writer) {
try {
await this.writer.close();
this.writer.releaseLock();
} catch (_) {
/* ignore */
}
this.writer = null;
}
if (this.port) {
try {
await deassertDtrRts(this.port);
} catch (_) {
/* ignore */
}
try {
await this.port.close();
} catch (_) {
/* ignore */
}
this.port = null;
}
this._rxBuf = [];
}
async writeBytes(data) {
if (!this.writer) throw new Error('Serial not connected');
logDebug('tx', describeBytes(data));
await this.writer.write(data);
}
async drainInput(maxMs = 400, clearBuffer = true) {
const deadline = Date.now() + maxMs;
while (Date.now() < deadline) {
const chunk = await this._readStreamChunk(40);
if (!chunk || !chunk.length) break;
if (!clearBuffer && chunk.length) this._appendRx(chunk);
}
if (clearBuffer) this._rxBuf.length = 0;
}
async sendCtrlA() {
await this.writeBytes(new Uint8Array([0x0d, 0x01]));
}
/** USB open often resets the ESP32; wait through ROM boot until MicroPython is up. */
async waitForMicroPythonBoot(timeoutMs = 60000) {
log('enterRawRepl: wait for MicroPython after boot', 'buf', this._rxBuf.length);
if (bufferHasMicroPython(this._rxBuf) || bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: MicroPython already in buffer');
return;
}
const deadline = Date.now() + timeoutMs;
let nudgeSent = false;
let lastLog = Date.now();
while (Date.now() < deadline) {
if (bufferHasMicroPython(this._rxBuf) || bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: MicroPython running');
return;
}
const text = decodeText(Uint8Array.from(this._rxBuf));
if (!nudgeSent && text.includes('entry 0x')) {
nudgeSent = true;
log('enterRawRepl: past ROM loader, nudge REPL');
await this.writeBytes(new Uint8Array([0x0d]));
await sleep(200);
}
if (Date.now() - lastLog > 5000) {
lastLog = Date.now();
log('enterRawRepl: still waiting…', 'buf', this._rxBuf.length);
}
const rem = Math.max(1, deadline - Date.now());
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(bufferLooksLikeEspRomBoot(this._rxBuf) ? 20 : 10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(`Timed out waiting for MicroPython boot (tail: ${JSON.stringify(tail)})`);
}
/** Wait until raw REPL banner and `>` prompt (not banner alone — device may still be booting). */
async waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, timeoutMs = 15000) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const rem = Math.max(1, deadline - Date.now());
const promptAt = bytesIndexOf(this._rxBuf, bannerPromptBytes);
if (promptAt >= 0) {
this._rxBuf.splice(0, promptAt + bannerPromptBytes.length);
log('enterRawRepl: raw REPL prompt ready');
return;
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('enterRawRepl: friendly REPL, ctrl-A');
await this.sendCtrlA();
await sleep(30);
continue;
}
if (bufferLooksLikeEspRomBoot(this._rxBuf) && !bufferHasMicroPython(this._rxBuf)) {
logDebug('enterRawRepl: ROM boot in progress…');
}
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(
`Timed out waiting for raw REPL after soft reboot (tail: ${JSON.stringify(tail)})`,
);
}
/** Match mpremote SerialTransport.enter_raw_repl(soft_reset). */
async enterRawRepl(softReset = true) {
log('enterRawRepl', { softReset });
const bannerBytes = new TextEncoder().encode(RAW_REPL_BANNER);
const bannerPromptBytes = new TextEncoder().encode(RAW_REPL_BANNER + '>');
const softRebootBytes = SOFT_REBOOT_MARKERS.map((m) => new TextEncoder().encode(m));
if (
!bufferHasMicroPython(this._rxBuf) &&
!bufferEndsWithFriendlyRepl(this._rxBuf) &&
bytesIndexOf(this._rxBuf, bannerBytes) < 0
) {
await this.waitForMicroPythonBoot(60000);
} else {
log('enterRawRepl: already have REPL output in buffer');
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
await this.writeBytes(new Uint8Array([0x0d, 0x03]));
await sleep(50);
}
let ctrlARetries = 0;
const maxCtrlARetries = 3;
while (ctrlARetries <= maxCtrlARetries) {
try {
await this.sendCtrlA();
await this.waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, 20000);
break;
} catch (e) {
if (ctrlARetries >= maxCtrlARetries) throw e;
ctrlARetries += 1;
log('enterRawRepl: retry ctrl-A', ctrlARetries, e.message);
this._rxBuf.length = 0;
}
}
if (softReset) {
log('enterRawRepl: soft reset');
await this.writeBytes(new Uint8Array([0x04]));
try {
await this.readUntilAny(softRebootBytes, 20000);
log('enterRawRepl: saw soft reboot');
this._rxBuf.length = 0;
} catch (e) {
log('enterRawRepl: no soft reboot banner', e.message);
}
ctrlARetries = 0;
while (ctrlARetries <= maxCtrlARetries) {
try {
await this.waitRawReplAfterSoftReboot(bannerBytes, bannerPromptBytes, 60000);
break;
} catch (e) {
if (ctrlARetries >= maxCtrlARetries) throw e;
ctrlARetries += 1;
log('enterRawRepl: post-reboot retry ctrl-A', ctrlARetries, e.message);
this._rxBuf.length = 0;
await this.sendCtrlA();
}
}
}
this.inRawRepl = true;
log('enterRawRepl: ready');
}
async rawPasteWrite(commandBytes) {
await this.writeBytes(new Uint8Array([0x05, 0x41, 0x01]));
const hdr = await this.readExact(2, 8000);
const windowSize = hdr[0] | (hdr[1] << 8);
let windowRemain = windowSize;
let i = 0;
while (i < commandBytes.length) {
while (windowRemain === 0) {
const b = await this.readExact(1, 15000);
if (b[0] === 0x01) {
windowRemain += windowSize;
} else if (b[0] === 0x04) {
await this.writeBytes(new Uint8Array([0x04]));
return;
} else {
throw new Error(`Unexpected byte during raw paste: 0x${b[0].toString(16)}`);
}
}
const n = Math.min(windowRemain, commandBytes.length - i);
await this.writeBytes(commandBytes.subarray(i, i + n));
windowRemain -= n;
i += n;
}
await this.writeBytes(new Uint8Array([0x04]));
await this.readUntil(new Uint8Array([0x04]), 30000);
}
async awaitRawReplExecPrompt(timeoutMs = 60000) {
const bannerPromptBytes = new TextEncoder().encode(RAW_REPL_BANNER + '>');
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const promptAt = bytesIndexOf(this._rxBuf, bannerPromptBytes);
if (promptAt >= 0) {
this._rxBuf.splice(0, promptAt + bannerPromptBytes.length);
return;
}
if (bufferEndsWithFriendlyRepl(this._rxBuf)) {
log('exec: friendly REPL before prompt, ctrl-A');
this._rxBuf.length = 0;
await this.sendCtrlA();
} else if (bufferLooksLikeEspRomBoot(this._rxBuf)) {
logDebug('exec: still in ROM boot, waiting…');
}
const rem = Math.max(1, deadline - Date.now());
const chunk = await this._readStreamChunk(Math.min(300, rem));
if (chunk && chunk.length) this._appendRx(chunk);
else await sleep(10);
}
const tail = decodeText(Uint8Array.from(this._rxBuf.slice(-240)));
throw new Error(`Timed out waiting for raw REPL prompt (tail: ${JSON.stringify(tail)})`);
}
async execRawNoFollow(commandBytes) {
log('execRawNoFollow', commandBytes.length, 'bytes');
await this.awaitRawReplExecPrompt(60000);
if (this.useRawPaste) {
await this.writeBytes(new Uint8Array([0x05, 0x41, 0x01]));
let data;
try {
data = await this.readExact(2, 3000);
} catch (_) {
this.useRawPaste = false;
data = null;
}
if (data) {
if (data[0] === 0x52 && data[1] === 0x01) {
log('exec: using raw paste mode');
await this.rawPasteWrite(commandBytes);
const ok = await this.readExact(2, 15000);
if (ok[0] !== 0x4f || ok[1] !== 0x4b) {
throw new Error(`Device did not return OK after paste (got ${decodeText(ok)})`);
}
log('exec: paste OK');
return;
}
if (data[0] === 0x52 && data[1] === 0x00) {
log('exec: raw paste not supported (R\\x00)');
this.useRawPaste = false;
} else {
log('exec: raw paste probe unexpected', describeBytes(data));
this.useRawPaste = false;
try {
await this.readUntil(new TextEncoder().encode(RAW_REPL_BANNER + '>'), 5000);
} catch (_) {
/* ignore */
}
}
}
}
for (let i = 0; i < commandBytes.length; i += 256) {
await this.writeBytes(commandBytes.subarray(i, Math.min(i + 256, commandBytes.length)));
await sleep(10);
}
log('exec: standard raw REPL write');
await this.writeBytes(new Uint8Array([0x04]));
const ok = await this.readExact(2, 15000);
if (ok[0] !== 0x4f || ok[1] !== 0x4b) {
throw new Error(`Device did not return OK (got ${decodeText(ok)})`);
}
log('exec: got OK');
}
async follow(timeoutMs = 30000) {
const out1 = await this.readUntil(new Uint8Array([0x04]), timeoutMs);
const stdout = decodeText(out1.slice(0, -1));
const out2 = await this.readUntil(new Uint8Array([0x04]), timeoutMs);
const stderr = decodeText(out2.slice(0, -1));
log('follow stdout', stdout.slice(0, 500));
if (stderr) log('follow stderr', stderr.slice(0, 500));
return { stdout, stderr };
}
async exec(command, timeoutMs = 30000) {
const preview =
typeof command === 'string' ? command.split('\n').slice(0, 3).join('\n') : '<bytes>';
log('exec', preview);
await this.ensureRawRepl();
const commandBytes =
typeof command === 'string' ? new TextEncoder().encode(command) : command;
await this.execRawNoFollow(commandBytes);
return this.follow(timeoutMs);
}
async readSettingsJson() {
log('readSettingsJson');
const code = [
'import json',
'try:',
" f=open('settings.json','r')",
' d=json.load(f)',
' f.close()',
'except Exception:',
' d={}',
'print(json.dumps(d))',
].join('\n');
const { stdout, stderr } = await this.exec(code);
const text = (stdout || '').trim() || (stderr || '').trim();
if (!text) {
log('readSettingsJson: empty response');
return {};
}
const parsed = extractJsonObject(text);
log('readSettingsJson: ok', Object.keys(parsed));
return parsed;
}
async writeSettingsJson(settings) {
log('writeSettingsJson', settings);
const jsonText = JSON.stringify(settings);
const escaped = JSON.stringify(jsonText);
const code = [
'import json',
`s=json.loads(${escaped})`,
"f=open('settings.json','w')",
'f.write(s)',
'f.close()',
'import machine',
'machine.reset()',
].join('\n');
await this.exec(code, 60000);
}
}
function serialSupported() {
return typeof navigator !== 'undefined' && 'serial' in navigator;
}
async function getGrantedPorts() {
if (!serialSupported()) return [];
return navigator.serial.getPorts();
}
async function requestPort() {
if (!serialSupported()) {
throw new Error('Web Serial not supported');
}
const granted = await navigator.serial.getPorts();
if (granted.length === 1) {
log('requestPort: reusing single granted port');
return granted[0];
}
log('requestPort: showing picker', { grantedCount: granted.length });
return navigator.serial.requestPort({ filters: ESP32_USB_FILTERS });
}
global.LedToolWebSerial = {
supported: serialSupported,
getGrantedPorts,
requestPort,
MicroPythonRawRepl,
_test: { bytesIndexOf, decodeText },
};
})(typeof window !== 'undefined' ? window : globalThis);

View File

@@ -0,0 +1,69 @@
/**
* Node regression tests for Web Serial readUntil buffer handling.
* Run: node led-tool/static/web_serial_readuntil_test.mjs
*/
function bytesIndexOf(buf, suffix) {
if (!suffix.length) return 0;
if (buf.length < suffix.length) return -1;
for (let i = 0; i <= buf.length - suffix.length; i += 1) {
let ok = true;
for (let j = 0; j < suffix.length; j += 1) {
if (buf[i + j] !== suffix[j]) {
ok = false;
break;
}
}
if (ok) return i;
}
return -1;
}
function readUntilConsume(rxBuf, suffixBytes) {
const idx = bytesIndexOf(rxBuf, suffixBytes);
if (idx < 0) return null;
const end = idx + suffixBytes.length;
const matched = rxBuf.slice(0, end);
rxBuf.splice(0, end);
return matched;
}
function enc(s) {
return [...new TextEncoder().encode(s)];
}
function assert(cond, msg) {
if (!cond) throw new Error(msg);
}
const RAW = 'raw REPL; CTRL-B to exit\r\n';
const SOFT = 'soft reboot\r\n';
// Old bug: one chunk with soft reboot + banner; suffix-at-end + clear-all loses banner.
{
const rx = enc(`MPY: ${SOFT}${RAW}boot.py line\r\n`);
const soft = enc(SOFT);
const m1 = readUntilConsume(rx, soft);
assert(m1 !== null, 'soft reboot should match');
const banner = enc(RAW);
const m2 = readUntilConsume(rx, banner);
assert(m2 !== null, 'banner must remain in buffer after soft reboot match');
assert(rx.length > 0, 'boot.py tail should remain after banner');
}
// Banner anywhere in buffer, not only at end.
{
const rx = enc(`noise${RAW}>>> `);
const banner = enc(RAW);
const m = readUntilConsume(rx, banner);
assert(m !== null, 'banner should match mid-buffer');
}
// MPY: soft reboot marker
{
const rx = enc(`MPY: ${SOFT}`);
const soft = enc('MPY: soft reboot\r\n');
assert(readUntilConsume(rx, soft) !== null, 'MPY soft reboot marker');
}
console.log('web_serial_readuntil_test: ok');

View File

@@ -0,0 +1,52 @@
"""Tests for deploy_manifest helpers (host Python)."""
import json
import os
import tempfile
import unittest
from deploy_manifest import (
MANIFEST_FILENAME,
build_manifest_bytes,
normalize_remote_path,
parse_manifest,
sha256_hex_file,
)
class DeployManifestTests(unittest.TestCase):
def test_normalize_remote_path(self):
self.assertEqual(normalize_remote_path("/patterns/chase.py"), "patterns/chase.py")
self.assertEqual(normalize_remote_path("main.py"), "main.py")
def test_round_trip_manifest(self):
files = {"main.py": "abc", "patterns/x.py": "def"}
blob = build_manifest_bytes(files)
parsed = parse_manifest(blob)
self.assertEqual(parsed, files)
doc = json.loads(blob.decode())
self.assertEqual(doc["version"], 1)
self.assertEqual(doc["algorithm"], "sha256")
def test_parse_manifest_invalid(self):
self.assertEqual(parse_manifest(b"not json"), {})
self.assertEqual(parse_manifest(b'{"files": "nope"}'), {})
def test_sha256_hex_file(self):
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(b"hello")
path = f.name
try:
self.assertEqual(
sha256_hex_file(path),
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824",
)
finally:
os.unlink(path)
def test_manifest_filename(self):
self.assertEqual(MANIFEST_FILENAME, "file_hashes.json")
if __name__ == "__main__":
unittest.main()

114
web.py
View File

@@ -19,6 +19,7 @@ from flask import (
redirect, redirect,
url_for, url_for,
flash, flash,
send_from_directory,
) )
@@ -696,10 +697,119 @@ def handle_action():
) )
def _static_dir() -> str:
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
@app.route("/ports")
def api_list_ports():
from host_ports import filter_port_dicts
from serial.tools import list_ports
ports = filter_port_dicts(
[
{"device": i.device, "description": i.description, "hwid": i.hwid}
for i in list_ports.comports()
]
)
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
return json.dumps({"ports": ports, "led_cli_exists": os.path.exists(cli)})
@app.route("/settings", methods=["GET"])
def api_read_settings():
import subprocess
import sys
port = (request.args.get("port") or "").strip()
if not port:
return json.dumps({"error": "port is required"}), 400
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
result = subprocess.run(
[sys.executable, cli, "--port", port, "--show"],
capture_output=True,
text=True,
timeout=180,
cwd=os.path.dirname(cli),
)
settings = None
try:
settings = json.loads((result.stdout or "").strip())
except json.JSONDecodeError:
pass
return json.dumps(
{
"ok": result.returncode == 0,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"settings": settings,
}
)
@app.route("/settings", methods=["POST"])
def api_write_settings():
import subprocess
import sys
data = request.get_json(silent=True) or {}
port = str(data.get("port") or "").strip()
if not port:
return json.dumps({"error": "port is required"}), 400
cli = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cli.py")
cmd = [sys.executable, cli, "--port", port]
flag_map = (
("name", "--name"),
("led_pin", "--pin"),
("num_leds", "--leds"),
("brightness", "--brightness"),
("transport", "--transport"),
("ssid", "--ssid"),
("password", "--wifi-password"),
("wifi_channel", "--wifi-channel"),
("default", "--default"),
)
for key, flag in flag_map:
val = data.get(key)
if val is None:
continue
s = str(val).strip()
if s:
cmd.extend([flag, s])
cmd.append("--follow")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=180,
cwd=os.path.dirname(cli),
)
return json.dumps(
{
"ok": result.returncode == 0,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
}
)
@app.route("/editor")
def settings_editor():
"""Static settings editor (Web Serial + host serial). Prefer this over the legacy form."""
return send_from_directory(_static_dir(), "settings_editor.html")
@app.route("/static/<path:filename>")
def settings_static(filename):
return send_from_directory(_static_dir(), filename)
def main() -> None: def main() -> None:
# Bind to all interfaces so you can reach it from your LAN: # Bind to all interfaces so you can reach it from your LAN:
# python web_app.py # python web.py
# Then open: http://<pi-ip>:5000/ # Then open: http://<pi-ip>:5000/editor
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)