- device.py: device communication/handling - cli.py: CLI interface updates - web.py: web interface - build.py, build.sh, install.sh: build and install scripts - Pipfile: Python dependencies - lib/mpremote: mpremote library - test_*.py: import and LED tests - Updated .gitignore and README Co-authored-by: Cursor <cursoragent@cursor.com>
247 lines
9.1 KiB
Python
247 lines
9.1 KiB
Python
"""
Device communication wrapper using integrated mpremote transport.

This module provides a simple interface for copying files to/from MicroPython devices
without requiring mpremote to be installed as a separate dependency.
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import serial
|
|
|
|
# Add the bundled lib directory to sys.path so the vendored mpremote package
# can be imported - handle both normal execution and frozen bundles.
if getattr(sys, 'frozen', False):
    # Running from a frozen bundle. PyInstaller exposes its extraction
    # directory as sys._MEIPASS; other freezers set sys.frozen without it,
    # so fall back to the directory of the executable instead of raising
    # AttributeError at import time.
    _this_dir = getattr(
        sys, '_MEIPASS', os.path.dirname(os.path.abspath(sys.executable))
    )
else:
    # Running as a normal Python script
    _this_dir = os.path.dirname(os.path.abspath(__file__))

lib_path = os.path.join(_this_dir, 'lib')
# Only prepend when the directory really exists and is not already listed.
if lib_path not in sys.path and os.path.exists(lib_path):
    sys.path.insert(0, lib_path)
|
|
|
|
from mpremote.transport_serial import SerialTransport
|
|
from mpremote.transport import TransportError
|
|
|
|
|
|
class DeviceConnection:
    """Wrapper for device communication.

    The file-system methods (copy, upload, list, delete) open the serial
    transport on demand via ``connect()`` and close it again when they
    finish. ``reset()`` and ``follow_output()`` bypass the transport and talk
    to the serial port directly.
    """

    # Serial speed used for the transport and for direct serial access.
    BAUDRATE = 115200

    # st_mode file-type mask and the value that marks a directory.
    _S_IFMT = 0o170000
    _S_IFDIR = 0o040000

    def __init__(self, device):
        """Remember the device path; no connection is opened yet.

        Args:
            device: Serial device path (e.g., '/dev/ttyACM0')
        """
        self.device = device
        self.transport = None

    @classmethod
    def _is_dir(cls, st_mode):
        """Return True when the st_mode file-type bits mark a directory."""
        return (st_mode & cls._S_IFMT) == cls._S_IFDIR

    @staticmethod
    def _join_remote(base, name):
        """Join a remote directory and an entry name with a single '/'.

        An empty base or '/' yields an absolute path directly under the root.
        """
        return f"{base.rstrip('/')}/{name}"

    def connect(self):
        """Establish connection to device.

        Does nothing when a transport is already open. The transport is only
        stored on the instance once the raw REPL has been entered, so a
        failed attempt never leaves a half-initialised connection behind.

        Raises:
            Whatever the transport raises while opening the port or entering
            the raw REPL.
        """
        if self.transport is not None:
            return

        transport = SerialTransport(self.device, baudrate=self.BAUDRATE)
        try:
            transport.enter_raw_repl()
        except BaseException:
            # Callers invoke connect() before their try/finally, so nobody
            # else would close the port if entering the raw REPL fails.
            try:
                transport.close()
            except Exception:
                pass
            raise
        self.transport = transport

    def disconnect(self):
        """Close connection to device.

        Teardown is best-effort: errors while leaving the raw REPL or closing
        the port are ignored so that cleanup never masks the original error.
        """
        # Detach first so the instance is always left disconnected, even if
        # teardown is interrupted.
        transport, self.transport = self.transport, None
        if transport is None:
            return

        try:
            if transport.in_raw_repl:
                transport.exit_raw_repl()
        except Exception:
            pass
        finally:
            try:
                transport.close()
            except Exception:
                pass

    def copy_from_device(self, remote_path, local_path):
        """Copy a file from device to local filesystem.

        Args:
            remote_path: Path of the file on the device
            local_path: Destination path on the local filesystem
        """
        self.connect()
        try:
            data = self.transport.fs_readfile(remote_path)
            with open(local_path, 'wb') as f:
                f.write(data)
        finally:
            self.disconnect()

    def copy_to_device(self, local_path, remote_path):
        """Copy a file from local filesystem to device.

        Args:
            local_path: Source path on the local filesystem
            remote_path: Destination path on the device
        """
        self.connect()
        try:
            with open(local_path, 'rb') as f:
                data = f.read()
            self.transport.fs_writefile(remote_path, data)
        finally:
            self.disconnect()

    def upload_directory(self, local_dir, remote_dir=None):
        """
        Upload a directory recursively to the device.

        Args:
            local_dir: Local directory path to upload
            remote_dir: Remote directory path (default: the basename of
                local_dir). An empty string or '/' uploads into the root.

        Returns:
            Tuple (files_copied, dirs_created). dirs_created counts only the
            sub-directories created during the walk, not remote_dir itself.

        Raises:
            ValueError: If local_dir is not a directory.
        """
        if not os.path.isdir(local_dir):
            raise ValueError(f"{local_dir} is not a directory")

        self.connect()
        try:
            # Determine remote directory name
            if remote_dir is None:
                remote_dir = os.path.basename(os.path.abspath(local_dir))
            remote_root = remote_dir.rstrip('/')

            # Ensure remote directory exists. The root ('' or '/') is never
            # checked or created; list_files likewise skips the existence
            # check for the empty path.
            if remote_root and not self.transport.fs_exists(remote_root):
                self.transport.fs_mkdir(remote_root)

            files_copied = 0
            dirs_created = 0

            # os.walk is top-down, so a parent is created before its children.
            for root, _dirs, files in os.walk(local_dir):
                rel_path = os.path.relpath(root, local_dir)

                if rel_path == '.':
                    remote_base = remote_root or '/'
                else:
                    remote_base = self._join_remote(
                        remote_root, rel_path.replace(os.sep, '/')
                    )
                    # Create remote directory if needed
                    if not self.transport.fs_exists(remote_base):
                        print(f"Creating directory: {remote_base}", file=sys.stderr)
                        self.transport.fs_mkdir(remote_base)
                        dirs_created += 1

                # Copy files
                for file_name in files:
                    local_file = os.path.join(root, file_name)
                    remote_file = self._join_remote(remote_base, file_name)

                    print(f"Uploading: {remote_file}", file=sys.stderr)
                    with open(local_file, 'rb') as f:
                        data = f.read()
                    self.transport.fs_writefile(remote_file, data)
                    files_copied += 1

            return files_copied, dirs_created
        finally:
            self.disconnect()

    def list_files(self, remote_path=''):
        """
        List files and directories on the device.

        Args:
            remote_path: Path on device to list (default: root directory)

        Returns:
            List of (name, is_directory, size) tuples

        Raises:
            ValueError: If a non-empty remote_path does not exist.
        """
        self.connect()
        try:
            if remote_path and not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            return [
                (item.name, self._is_dir(item.st_mode), item.st_size)
                for item in self.transport.fs_listdir(remote_path)
            ]
        finally:
            self.disconnect()

    def delete(self, remote_path, disconnect=True):
        """
        Delete a file or directory on the device.

        Directories are emptied recursively before being removed.

        Args:
            remote_path: Path on device to delete (file or directory)
            disconnect: Whether to disconnect after deletion (default: True, set to False for recursive calls)

        Raises:
            ValueError: If remote_path does not exist.
        """
        self.connect()
        try:
            if not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            if not self.transport.fs_isdir(remote_path):
                print(f"Deleting file: {remote_path}", file=sys.stderr)
                self.transport.fs_rmfile(remote_path)
                return

            # Empty the directory first, then remove the directory itself.
            for item in self.transport.fs_listdir(remote_path):
                item_path = self._join_remote(remote_path, item.name)
                if self._is_dir(item.st_mode):
                    # Keep the connection open for the nested call.
                    self.delete(item_path, disconnect=False)
                else:
                    print(f"Deleting file: {item_path}", file=sys.stderr)
                    self.transport.fs_rmfile(item_path)

            print(f"Deleting directory: {remote_path}", file=sys.stderr)
            self.transport.fs_rmdir(remote_path)
        finally:
            if disconnect:
                self.disconnect()

    def reset(self):
        """Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D).

        Raises:
            TransportError: If the serial port cannot be opened or written.
        """
        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=self.BAUDRATE) as ser:
                ser.write(b'\x03\x03\x04')  # Ctrl+C, Ctrl+C, Ctrl+D
                # Drain the output buffer before the port is closed so the
                # control bytes are not dropped.
                ser.flush()
        except Exception as e:
            raise TransportError(f"Failed to reset device: {e}") from e

    def follow_output(self):
        """Follow device output continuously (like tail -f).

        Prints each non-empty line received, stripped of surrounding
        whitespace, until interrupted with Ctrl+C.

        Raises:
            TransportError: If the serial port cannot be opened or read.
        """
        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=self.BAUDRATE) as ser:
                while True:
                    if ser.in_waiting > 0:
                        data = ser.readline().decode('utf-8', errors='replace').strip()
                        if data:
                            # Flush so lines show up immediately even when
                            # stdout is piped.
                            print(data, flush=True)
                    else:
                        time.sleep(0.01)  # Small delay to avoid busy-waiting
        except KeyboardInterrupt:
            print("\nStopped following output.", file=sys.stderr)
        except Exception as e:
            raise TransportError(f"Failed to follow output: {e}") from e
|
|
|
|
|
|
def copy_file(from_device, device, remote_path, local_path):
    """
    Transfer a single file between the device and the local filesystem.

    Args:
        from_device: True to copy from device, False to copy to device
        device: Device path (e.g., '/dev/ttyACM0')
        remote_path: Path on device (e.g., 'settings.json')
        local_path: Path on local filesystem

    Raises:
        RuntimeError: If the transfer fails with a TransportError.
    """
    connection = DeviceConnection(device)
    try:
        if not from_device:
            connection.copy_to_device(local_path, remote_path)
        else:
            connection.copy_from_device(remote_path, local_path)
    except TransportError as exc:
        # Give callers a plain RuntimeError; the cause stays chained.
        raise RuntimeError(f"Device communication error: {exc}") from exc
|