Files
led-tool/device.py
jimmy accf8f06a5 Add LED tool: device, CLI, web UI, build scripts, and tests
- device.py: device communication/handling
- cli.py: CLI interface updates
- web.py: web interface
- build.py, build.sh, install.sh: build and install scripts
- Pipfile: Python dependencies
- lib/mpremote: mpremote library
- test_*.py: import and LED tests
- Updated .gitignore and README

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-01 16:00:04 +13:00

247 lines
9.1 KiB
Python

"""
Device communication wrapper using integrated mpremote transport.
This module provides a simple interface for copying files to/from MicroPython devices
without requiring mpremote to be installed as a separate dependency.
"""
import sys
import os
import time
import serial
# Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False):
# Running as a PyInstaller bundle
_this_dir = sys._MEIPASS
else:
# Running as a normal Python script
_this_dir = os.path.dirname(os.path.abspath(__file__))
lib_path = os.path.join(_this_dir, 'lib')
if lib_path not in sys.path and os.path.exists(lib_path):
sys.path.insert(0, lib_path)
from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError
class DeviceConnection:
"""Wrapper for device communication."""
def __init__(self, device):
"""Connect to a device."""
self.device = device
self.transport = None
def connect(self):
"""Establish connection to device."""
if self.transport is None:
self.transport = SerialTransport(self.device, baudrate=115200)
self.transport.enter_raw_repl()
def disconnect(self):
"""Close connection to device."""
if self.transport:
try:
if self.transport.in_raw_repl:
self.transport.exit_raw_repl()
except Exception:
pass
try:
self.transport.close()
except Exception:
pass
self.transport = None
def copy_from_device(self, remote_path, local_path):
"""Copy a file from device to local filesystem."""
self.connect()
try:
data = self.transport.fs_readfile(remote_path)
with open(local_path, 'wb') as f:
f.write(data)
finally:
self.disconnect()
def copy_to_device(self, local_path, remote_path):
"""Copy a file from local filesystem to device."""
self.connect()
try:
with open(local_path, 'rb') as f:
data = f.read()
self.transport.fs_writefile(remote_path, data)
finally:
self.disconnect()
def upload_directory(self, local_dir, remote_dir=None):
"""
Upload a directory recursively to the device.
Args:
local_dir: Local directory path to upload
remote_dir: Remote directory path (default: root, uses basename of local_dir)
"""
import os
if not os.path.isdir(local_dir):
raise ValueError(f"{local_dir} is not a directory")
self.connect()
try:
# Determine remote directory name
if remote_dir is None:
remote_dir = os.path.basename(os.path.abspath(local_dir))
# Ensure remote directory exists
if not self.transport.fs_exists(remote_dir):
self.transport.fs_mkdir(remote_dir)
# Walk through local directory and copy files
files_copied = 0
dirs_created = 0
for root, dirs, files in os.walk(local_dir):
# Calculate relative path from local_dir
rel_path = os.path.relpath(root, local_dir)
# Build remote path
if rel_path == '.':
remote_base = remote_dir.rstrip('/') or '/'
else:
rel_path_clean = rel_path.replace(os.sep, '/')
if remote_dir.rstrip('/') == '':
remote_base = '/' + rel_path_clean
else:
remote_base = '/'.join([remote_dir.rstrip('/'), rel_path_clean])
# Create remote directory if needed
if rel_path != '.' and not self.transport.fs_exists(remote_base):
print(f"Creating directory: {remote_base}", file=sys.stderr)
self.transport.fs_mkdir(remote_base)
dirs_created += 1
# Copy files
for file in files:
local_file = os.path.join(root, file)
# Handle root directory case properly
if remote_base == '/':
remote_file = '/' + file
else:
remote_file = '/'.join([remote_base, file])
print(f"Uploading: {remote_file}", file=sys.stderr)
with open(local_file, 'rb') as f:
data = f.read()
self.transport.fs_writefile(remote_file, data)
files_copied += 1
return files_copied, dirs_created
finally:
self.disconnect()
def list_files(self, remote_path=''):
"""
List files and directories on the device.
Args:
remote_path: Path on device to list (default: root directory)
Returns:
List of (name, is_directory, size) tuples
"""
self.connect()
try:
if remote_path and not self.transport.fs_exists(remote_path):
raise ValueError(f"Path does not exist: {remote_path}")
items = self.transport.fs_listdir(remote_path)
result = []
for item in items:
is_dir = item.st_mode & 0o170000 == 0o040000
result.append((item.name, is_dir, item.st_size))
return result
finally:
self.disconnect()
def delete(self, remote_path, disconnect=True):
"""
Delete a file or directory on the device.
Args:
remote_path: Path on device to delete (file or directory)
disconnect: Whether to disconnect after deletion (default: True, set to False for recursive calls)
"""
self.connect()
try:
if not self.transport.fs_exists(remote_path):
raise ValueError(f"Path does not exist: {remote_path}")
if self.transport.fs_isdir(remote_path):
# Delete directory recursively
# First, delete all files and subdirectories
items = self.transport.fs_listdir(remote_path)
for item in items:
item_path = '/'.join([remote_path.rstrip('/'), item.name])
if item.st_mode & 0o170000 == 0o040000: # Directory
self.delete(item_path, disconnect=False) # Recursive delete, don't disconnect
else: # File
print(f"Deleting file: {item_path}", file=sys.stderr)
self.transport.fs_rmfile(item_path)
# Then delete the directory itself
print(f"Deleting directory: {remote_path}", file=sys.stderr)
self.transport.fs_rmdir(remote_path)
else:
# Delete file
print(f"Deleting file: {remote_path}", file=sys.stderr)
self.transport.fs_rmfile(remote_path)
finally:
if disconnect:
self.disconnect()
def reset(self):
"""Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D)."""
# Use direct serial connection like dev.py does
try:
with serial.Serial(self.device, baudrate=115200) as ser:
ser.write(b'\x03\x03\x04') # Ctrl+C, Ctrl+C, Ctrl+D
except Exception as e:
raise TransportError(f"Failed to reset device: {e}") from e
def follow_output(self):
"""Follow device output continuously (like tail -f)."""
# Use direct serial connection like dev.py does
try:
with serial.Serial(self.device, baudrate=115200) as ser:
while True:
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8', errors='replace').strip()
if data:
print(data)
else:
time.sleep(0.01) # Small delay to avoid busy-waiting
except KeyboardInterrupt:
print("\nStopped following output.", file=sys.stderr)
except Exception as e:
raise TransportError(f"Failed to follow output: {e}") from e
def copy_file(from_device, device, remote_path, local_path):
"""
Copy a file to/from device.
Args:
from_device: True to copy from device, False to copy to device
device: Device path (e.g., '/dev/ttyACM0')
remote_path: Path on device (e.g., 'settings.json')
local_path: Path on local filesystem
"""
conn = DeviceConnection(device)
try:
if from_device:
conn.copy_from_device(remote_path, local_path)
else:
conn.copy_to_device(local_path, remote_path)
except TransportError as e:
raise RuntimeError(f"Device communication error: {e}") from e