"""
Device communication wrapper using integrated mpremote transport.

This module provides a simple interface for copying files to/from MicroPython
devices without requiring mpremote to be installed as a separate dependency.
"""

import sys
import os
import time
import serial

# Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False):
    # Running as a PyInstaller bundle
    _this_dir = sys._MEIPASS
else:
    # Running as a normal Python script
    _this_dir = os.path.dirname(os.path.abspath(__file__))

lib_path = os.path.join(_this_dir, 'lib')
if lib_path not in sys.path and os.path.exists(lib_path):
    sys.path.insert(0, lib_path)

# These imports must come after the sys.path tweak above so that the bundled
# copy of mpremote in ./lib can be found.
from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError

# Baud rate used for every serial connection unless the caller overrides it.
DEFAULT_BAUDRATE = 115200

# File-type bits of st_mode (same values as stat.S_IFMT / stat.S_IFDIR).
_S_IFMT = 0o170000
_S_IFDIR = 0o040000


def _is_dir(entry):
    """Return True if a directory-listing entry's st_mode marks a directory."""
    return (entry.st_mode & _S_IFMT) == _S_IFDIR


def _remote_join(base, name):
    """Join a remote directory and a child name with exactly one '/'.

    Trailing slashes on ``base`` are dropped first, so both '' and '/' as a
    base yield '/<name>'.
    """
    return base.rstrip('/') + '/' + name


class DeviceConnection:
    """Wrapper for device communication."""

    def __init__(self, device, baudrate=DEFAULT_BAUDRATE):
        """
        Remember the connection parameters; no connection is opened here.

        Args:
            device: Device path (e.g., '/dev/ttyACM0')
            baudrate: Serial baud rate used for all connections (default: 115200)
        """
        self.device = device
        self.baudrate = baudrate
        self.transport = None

    def connect(self):
        """
        Establish connection to device and enter the raw REPL.

        Does nothing if a transport is already open. If entering the raw REPL
        fails, the freshly opened transport is closed again and the error is
        re-raised, so a failed connect never leaves the serial port open or a
        half-initialised transport stored on the instance.
        """
        if self.transport is not None:
            return

        transport = SerialTransport(self.device, baudrate=self.baudrate)
        try:
            transport.enter_raw_repl()
        except BaseException:
            # Best-effort cleanup; the original error is what the caller needs.
            try:
                transport.close()
            except Exception:
                pass
            raise
        # Only publish the transport once it is fully usable.
        self.transport = transport

    def disconnect(self):
        """
        Close connection to device.

        Cleanup is deliberately best-effort: errors while leaving the raw REPL
        or closing the port are ignored so that disconnect() can safely be
        called from ``finally`` blocks without masking the original error.
        """
        if self.transport:
            try:
                if self.transport.in_raw_repl:
                    self.transport.exit_raw_repl()
            except Exception:
                pass
            try:
                self.transport.close()
            except Exception:
                pass
            self.transport = None

    def copy_from_device(self, remote_path, local_path):
        """
        Copy a file from device to local filesystem.

        Args:
            remote_path: Path of the file on the device
            local_path: Destination path on the local filesystem
        """
        self.connect()
        try:
            data = self.transport.fs_readfile(remote_path)
            with open(local_path, 'wb') as f:
                f.write(data)
        finally:
            self.disconnect()

    def copy_to_device(self, local_path, remote_path):
        """
        Copy a file from local filesystem to device.

        The local file is read before the connection is opened, so a missing
        or unreadable local file fails without touching the device.

        Args:
            local_path: Source path on the local filesystem
            remote_path: Destination path on the device
        """
        with open(local_path, 'rb') as f:
            data = f.read()

        self.connect()
        try:
            self.transport.fs_writefile(remote_path, data)
        finally:
            self.disconnect()

    def upload_directory(self, local_dir, remote_dir=None):
        """
        Upload a directory recursively to the device.

        Args:
            local_dir: Local directory path to upload
            remote_dir: Remote directory path (default: root, uses basename of local_dir)

        Returns:
            Tuple (files_copied, dirs_created). dirs_created counts only the
            sub-directories created while walking, not the top-level remote_dir.

        Raises:
            ValueError: If local_dir is not a directory
        """
        if not os.path.isdir(local_dir):
            raise ValueError(f"{local_dir} is not a directory")

        self.connect()
        try:
            # Determine remote directory name
            if remote_dir is None:
                remote_dir = os.path.basename(os.path.abspath(local_dir))

            remote_root = remote_dir.rstrip('/')

            # Ensure remote directory exists. An empty/'/' target means the
            # device root, which needs no existence check or mkdir.
            if remote_root and not self.transport.fs_exists(remote_dir):
                self.transport.fs_mkdir(remote_dir)

            # Walk through local directory and copy files
            files_copied = 0
            dirs_created = 0

            for root, dirs, files in os.walk(local_dir):
                # Calculate relative path from local_dir
                rel_path = os.path.relpath(root, local_dir)
                is_top = rel_path == '.'

                # Build remote path (always '/'-separated, whatever the host OS)
                if is_top:
                    remote_base = remote_root or '/'
                else:
                    remote_base = _remote_join(remote_dir, rel_path.replace(os.sep, '/'))

                # Create remote directory if needed
                if not is_top and not self.transport.fs_exists(remote_base):
                    print(f"Creating directory: {remote_base}", file=sys.stderr)
                    self.transport.fs_mkdir(remote_base)
                    dirs_created += 1

                # Copy files
                for file in files:
                    local_file = os.path.join(root, file)
                    remote_file = _remote_join(remote_base, file)

                    print(f"Uploading: {remote_file}", file=sys.stderr)
                    with open(local_file, 'rb') as f:
                        data = f.read()
                    self.transport.fs_writefile(remote_file, data)
                    files_copied += 1

            return files_copied, dirs_created
        finally:
            self.disconnect()

    def list_files(self, remote_path=''):
        """
        List files and directories on the device.

        Args:
            remote_path: Path on device to list (default: root directory)

        Returns:
            List of (name, is_directory, size) tuples

        Raises:
            ValueError: If a non-empty remote_path does not exist on the device
        """
        self.connect()
        try:
            if remote_path and not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            return [
                (item.name, _is_dir(item), item.st_size)
                for item in self.transport.fs_listdir(remote_path)
            ]
        finally:
            self.disconnect()

    def delete(self, remote_path, disconnect=True):
        """
        Delete a file or directory on the device.

        Directories are removed recursively (contents first, then the
        directory itself).

        Args:
            remote_path: Path on device to delete (file or directory)
            disconnect: Whether to disconnect after deletion (default: True;
                pass False to keep the connection open for further calls)

        Raises:
            ValueError: If remote_path does not exist on the device
        """
        self.connect()
        try:
            if not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            if self.transport.fs_isdir(remote_path):
                self._remove_tree(remote_path)
            else:
                # Delete file
                print(f"Deleting file: {remote_path}", file=sys.stderr)
                self.transport.fs_rmfile(remote_path)
        finally:
            if disconnect:
                self.disconnect()

    def _remove_tree(self, remote_dir):
        """Recursively delete remote_dir; requires an already-open transport.

        The entry type comes from the directory listing itself, so no extra
        exists/isdir queries are issued per entry.
        """
        # First, delete all files and subdirectories
        for item in self.transport.fs_listdir(remote_dir):
            item_path = _remote_join(remote_dir, item.name)
            if _is_dir(item):
                self._remove_tree(item_path)
            else:
                print(f"Deleting file: {item_path}", file=sys.stderr)
                self.transport.fs_rmfile(item_path)

        # Then delete the directory itself
        print(f"Deleting directory: {remote_dir}", file=sys.stderr)
        self.transport.fs_rmdir(remote_dir)

    def reset(self):
        """
        Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D).

        Raises:
            TransportError: If the serial port cannot be opened or written
        """
        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=self.baudrate) as ser:
                ser.write(b'\x03\x03\x04')  # Ctrl+C, Ctrl+C, Ctrl+D
        except Exception as e:
            raise TransportError(f"Failed to reset device: {e}") from e

    def follow_output(self):
        """
        Follow device output continuously (like tail -f).

        Each received line is decoded as UTF-8 (invalid bytes replaced),
        stripped of surrounding whitespace and printed to stdout; blank lines
        are skipped. Runs until interrupted with Ctrl+C.

        Raises:
            TransportError: If the serial port cannot be opened or read
        """
        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=self.baudrate) as ser:
                while True:
                    if ser.in_waiting > 0:
                        data = ser.readline().decode('utf-8', errors='replace').strip()
                        if data:
                            print(data)
                    else:
                        time.sleep(0.01)  # Small delay to avoid busy-waiting
        except KeyboardInterrupt:
            print("\nStopped following output.", file=sys.stderr)
        except Exception as e:
            raise TransportError(f"Failed to follow output: {e}") from e


def copy_file(from_device, device, remote_path, local_path):
    """
    Copy a file to/from device.

    Args:
        from_device: True to copy from device, False to copy to device
        device: Device path (e.g., '/dev/ttyACM0')
        remote_path: Path on device (e.g., 'settings.json')
        local_path: Path on local filesystem

    Raises:
        RuntimeError: If the transport reports a device communication error
    """
    conn = DeviceConnection(device)
    try:
        if from_device:
            conn.copy_from_device(remote_path, local_path)
        else:
            conn.copy_to_device(local_path, remote_path)
    except TransportError as e:
        raise RuntimeError(f"Device communication error: {e}") from e