Files
led-tool/device.py
2026-05-03 21:27:29 +12:00

322 lines
12 KiB
Python

"""
Device communication wrapper using integrated mpremote transport.
This module provides a simple interface for copying files to/from MicroPython devices
without requiring mpremote to be installed as a separate dependency.
"""
import sys
import os
import time
import serial
# Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False):
# Running as a PyInstaller bundle
_this_dir = sys._MEIPASS
else:
# Running as a normal Python script
_this_dir = os.path.dirname(os.path.abspath(__file__))
lib_path = os.path.join(_this_dir, 'lib')
if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError, TransportExecError
class DeviceConnection:
"""Wrapper for device communication."""
#: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
WDT_FEED_INTERVAL_SEC = 5.0
def __init__(self, device):
"""Connect to a device."""
self.device = device
self.transport = None
def connect(self):
"""Establish connection to device."""
if self.transport is None:
self.transport = SerialTransport(self.device, baudrate=115200)
self.transport.enter_raw_repl()
def disconnect(self):
"""Close connection to device."""
if self.transport:
try:
if self.transport.in_raw_repl:
self.transport.exit_raw_repl()
except Exception:
pass
try:
self.transport.close()
except Exception:
pass
self.transport = None
def _feed_wdt(self) -> None:
"""Best-effort feed of the ESP task WDT between chunked FS writes."""
if self.transport is None:
return
try:
self.transport.exec(
"try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
)
except Exception:
pass
def _make_wdt_upload_progress_callback(self):
"""Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
last_feed = [time.monotonic()]
def progress(written: int, total: int) -> None:
now = time.monotonic()
if now - last_feed[0] >= self.WDT_FEED_INTERVAL_SEC:
self._feed_wdt()
last_feed[0] = now
return progress
def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
"""Write file to device with periodic WDT feeds during long transfers."""
self._feed_wdt()
self.transport.fs_writefile(
remote_path,
data,
progress_callback=self._make_wdt_upload_progress_callback(),
)
def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem."""
self.connect()
try:
data = self.transport.fs_readfile(remote_path)
with open(local_path, 'wb') as f:
f.write(data)
finally:
self.disconnect()
def copy_to_device(self, local_path, remote_path):
"""Copy a file from local filesystem to device."""
self.connect()
try:
with open(local_path, 'rb') as f:
data = f.read()
self._fs_writefile_with_wdt(remote_path, data)
finally:
self.disconnect()
def upload_directory(self, local_dir, remote_dir=None):
"""
Upload a directory recursively to the device.
Args:
local_dir: Local directory path to upload
remote_dir: Remote directory path (default: root, uses basename of local_dir)
"""
import os
if not os.path.isdir(local_dir):
raise ValueError(f"{local_dir} is not a directory")
self.connect()
try:
# Determine remote directory name
if remote_dir is None:
remote_dir = os.path.basename(os.path.abspath(local_dir))
# Ensure remote directory exists
if not self.transport.fs_exists(remote_dir):
self.transport.fs_mkdir(remote_dir)
# Walk through local directory and copy files
files_copied = 0
dirs_created = 0
for root, dirs, files in os.walk(local_dir):
# Calculate relative path from local_dir
rel_path = os.path.relpath(root, local_dir)
# Build remote path
if rel_path == '.':
remote_base = remote_dir.rstrip('/') or '/'
else:
rel_path_clean = rel_path.replace(os.sep, '/')
if remote_dir.rstrip('/') == '':
remote_base = '/' + rel_path_clean
else:
remote_base = '/'.join([remote_dir.rstrip('/'), rel_path_clean])
# Create remote directory if needed
if rel_path != '.' and not self.transport.fs_exists(remote_base):
print(f"Creating directory: {remote_base}", file=sys.stderr)
self.transport.fs_mkdir(remote_base)
dirs_created += 1
# Copy files
for file in files:
local_file = os.path.join(root, file)
# Handle root directory case properly
if remote_base == '/':
remote_file = '/' + file
else:
remote_file = '/'.join([remote_base, file])
print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f:
data = f.read()
self._fs_writefile_with_wdt(remote_file, data)
files_copied += 1
return files_copied, dirs_created
finally:
self.disconnect()
def list_files(self, remote_path=''):
"""
List files and directories on the device.
Args:
remote_path: Path on device to list (default: root directory)
Returns:
List of (name, is_directory, size) tuples
"""
self.connect()
try:
if remote_path and not self.transport.fs_exists(remote_path):
raise ValueError(f"Path does not exist: {remote_path}")
items = self.transport.fs_listdir(remote_path)
result = []
for item in items:
is_dir = item.st_mode & 0o170000 == 0o040000
result.append((item.name, is_dir, item.st_size))
return result
finally:
self.disconnect()
def delete(self, remote_path, disconnect=True):
"""
Delete a file or directory on the device.
Args:
remote_path: Path on device to delete (file or directory)
disconnect: Whether to disconnect after deletion (default: True, set to False for recursive calls)
"""
self.connect()
try:
if not self.transport.fs_exists(remote_path):
raise ValueError(f"Path does not exist: {remote_path}")
if self.transport.fs_isdir(remote_path):
# Delete directory recursively
# First, delete all files and subdirectories
items = self.transport.fs_listdir(remote_path)
for item in items:
item_path = '/'.join([remote_path.rstrip('/'), item.name])
if item.st_mode & 0o170000 == 0o040000: # Directory
self.delete(item_path, disconnect=False) # Recursive delete, don't disconnect
else: # File
print(f"Deleting file: {item_path}", file=sys.stderr)
self.transport.fs_rmfile(item_path)
# Then delete the directory itself
print(f"Deleting directory: {remote_path}", file=sys.stderr)
self.transport.fs_rmdir(remote_path)
else:
# Delete file
print(f"Deleting file: {remote_path}", file=sys.stderr)
self.transport.fs_rmfile(remote_path)
finally:
if disconnect:
self.disconnect()
def reset(self):
"""Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D)."""
# Use direct serial connection like dev.py does
try:
with serial.Serial(self.device, baudrate=115200) as ser:
ser.write(b'\x03\x03\x04') # Ctrl+C, Ctrl+C, Ctrl+D
except Exception as e:
raise TransportError(f"Failed to reset device: {e}") from e
def follow_output(self, duration=None):
"""
Follow device output (like tail -f).
Args:
duration: Optional number of seconds to follow output for. If None,
follow indefinitely until interrupted.
"""
# Use direct serial connection like dev.py does
start_time = time.time()
try:
with serial.Serial(self.device, baudrate=115200) as ser:
while True:
if duration is not None and (time.time() - start_time) >= duration:
break
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8', errors='replace').strip()
if data:
print(data)
else:
time.sleep(0.01) # Small delay to avoid busy-waiting
except KeyboardInterrupt:
print("\nStopped following output.", file=sys.stderr)
except Exception as e:
raise TransportError(f"Failed to follow output: {e}") from e
def firmware_default_device_name(device: str) -> str:
"""
Default logical device name on led-driver firmware: led-<sta_mac_hex>,
matching Settings.set_defaults() in led-driver/src/settings.py.
"""
conn = DeviceConnection(device)
conn.connect()
try:
code = (
"import network, ubinascii\n"
"sta = network.WLAN(network.STA_IF)\n"
"sta.active(True)\n"
"mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
"print('led-' + mac)\n"
)
out = conn.transport.exec(code)
except TransportExecError as e:
raise RuntimeError(
"Could not read STA MAC from device (is this MicroPython with network?): "
+ (e.error_output or str(e)).strip()
) from e
finally:
conn.disconnect()
text = out.decode("utf-8", errors="replace").strip()
for line in reversed(text.splitlines()):
line = line.strip()
if line.startswith("led-"):
return line
raise RuntimeError("Device did not return a led-<mac> default name")
def copy_file(from_device, device, remote_path, local_path):
"""
Copy a file to/from device.
Args:
from_device: True to copy from device, False to copy to device
device: Device path (e.g., '/dev/ttyACM0')
remote_path: Path on device (e.g., 'settings.json')
local_path: Path on local filesystem
"""
conn = DeviceConnection(device)
try:
if from_device:
conn.copy_from_device(remote_path, local_path)
else:
conn.copy_to_device(local_path, remote_path)
except TransportError as e:
raise RuntimeError(f"Device communication error: {e}") from e