"""
Device communication wrapper using integrated mpremote transport.

This module provides a simple interface for copying files to/from MicroPython
devices without requiring mpremote to be installed as a separate dependency.
"""

import os
import stat
import sys
import time

import serial

# Add lib directory to path - handle both normal execution and PyInstaller bundle
if getattr(sys, 'frozen', False):
    # Running as a PyInstaller bundle
    _this_dir = sys._MEIPASS
else:
    # Running as a normal Python script
    _this_dir = os.path.dirname(os.path.abspath(__file__))

lib_path = os.path.join(_this_dir, 'lib')
if lib_path not in sys.path and os.path.exists(lib_path):
    sys.path.insert(0, lib_path)

from mpremote.transport_serial import SerialTransport
from mpremote.transport import TransportError, TransportExecError


def _is_dir_mode(mode: int) -> bool:
    """Return True when a stat ``st_mode`` value describes a directory."""
    # Equivalent to ``mode & 0o170000 == 0o040000``.
    return stat.S_ISDIR(mode)


def _remote_join(base: str, name: str) -> str:
    """Join a device-side directory and an entry name with '/'.

    An empty or all-slash ``base`` is treated as the filesystem root, so the
    result is ``'/' + name`` rather than ``'//name'``.
    """
    trimmed = base.rstrip('/')
    if trimmed == '':
        return '/' + name
    return '/'.join([trimmed, name])


class DeviceConnection:
    """Wrapper for device communication."""

    #: Feed interval during uploads (led-driver uses WDT(timeout=10000) ms).
    WDT_FEED_INTERVAL_SEC = 5.0

    def __init__(self, device):
        """Remember the serial device path; no connection is opened yet."""
        self.device = device
        self.transport = None

    def connect(self):
        """Establish connection to device and enter the raw REPL.

        Does nothing when a transport is already open, so nested calls (for
        example the recursive calls made by :meth:`delete`) are cheap.

        If entering the raw REPL fails, the freshly opened transport is closed
        before the error propagates so the serial port is not leaked.
        """
        if self.transport is not None:
            return

        transport = SerialTransport(self.device, baudrate=115200)
        try:
            transport.enter_raw_repl()
        except BaseException:
            # Best-effort cleanup; the original failure is what the caller
            # needs to see, so a secondary close() error is dropped.
            try:
                transport.close()
            except Exception:
                pass
            raise
        self.transport = transport

    def disconnect(self):
        """Close connection to device.

        Leaving the raw REPL and closing the port are both best-effort: errors
        are ignored so that cleanup in ``finally`` blocks never masks the
        exception that triggered it.
        """
        if self.transport:
            try:
                if self.transport.in_raw_repl:
                    self.transport.exit_raw_repl()
            except Exception:
                pass
            try:
                self.transport.close()
            except Exception:
                pass
            self.transport = None

    def _feed_wdt(self) -> None:
        """Best-effort feed of the ESP task WDT between chunked FS writes."""
        if self.transport is None:
            return
        try:
            self.transport.exec(
                "try:\n import machine\n machine.WDT(timeout=10000).feed()\nexcept Exception:\n pass\n"
            )
        except Exception:
            # Feeding is opportunistic; a failure here must not abort the upload.
            pass

    def _make_wdt_upload_progress_callback(self):
        """Progress hook for Transport.fs_writefile: feed WDT every WDT_FEED_INTERVAL_SEC."""
        last_feed = time.monotonic()

        def progress(written: int, total: int) -> None:
            nonlocal last_feed
            now = time.monotonic()
            if now - last_feed >= self.WDT_FEED_INTERVAL_SEC:
                self._feed_wdt()
                last_feed = now

        return progress

    def _fs_writefile_with_wdt(self, remote_path: str, data: bytes) -> None:
        """Write file to device with periodic WDT feeds during long transfers."""
        self._feed_wdt()
        self.transport.fs_writefile(
            remote_path,
            data,
            progress_callback=self._make_wdt_upload_progress_callback(),
        )

    def copy_from_device(self, remote_path, local_path):
        """Copy a file from device to local filesystem."""
        self.connect()
        try:
            data = self.transport.fs_readfile(remote_path)
            with open(local_path, 'wb') as f:
                f.write(data)
        finally:
            self.disconnect()

    def copy_to_device(self, local_path, remote_path):
        """Copy a file from local filesystem to device."""
        self.connect()
        try:
            with open(local_path, 'rb') as f:
                data = f.read()
            self._fs_writefile_with_wdt(remote_path, data)
        finally:
            self.disconnect()

    def upload_directory(self, local_dir, remote_dir=None):
        """
        Upload a directory recursively to the device.

        Args:
            local_dir: Local directory path to upload
            remote_dir: Remote directory path (default: root, uses basename of local_dir)

        Returns:
            Tuple ``(files_copied, dirs_created)``. ``dirs_created`` counts only
            sub-directories created below ``remote_dir``.

        Raises:
            ValueError: If ``local_dir`` is not a directory.
        """
        if not os.path.isdir(local_dir):
            raise ValueError(f"{local_dir} is not a directory")

        self.connect()
        try:
            # Determine remote directory name
            if remote_dir is None:
                remote_dir = os.path.basename(os.path.abspath(local_dir))

            # Ensure remote directory exists. The filesystem root ('' or '/')
            # always exists and cannot be created, so skip it.
            if remote_dir.rstrip('/') and not self.transport.fs_exists(remote_dir):
                self.transport.fs_mkdir(remote_dir)

            files_copied = 0
            dirs_created = 0

            # os.walk is top-down, so parent directories are created before
            # their children are visited.
            for root, _dirs, files in os.walk(local_dir):
                rel_path = os.path.relpath(root, local_dir)

                if rel_path == '.':
                    remote_base = remote_dir.rstrip('/') or '/'
                else:
                    remote_base = _remote_join(remote_dir, rel_path.replace(os.sep, '/'))
                    # Create remote directory if needed
                    if not self.transport.fs_exists(remote_base):
                        print(f"Creating directory: {remote_base}", file=sys.stderr)
                        self.transport.fs_mkdir(remote_base)
                        dirs_created += 1

                # Copy files
                for file in files:
                    local_file = os.path.join(root, file)
                    remote_file = _remote_join(remote_base, file)

                    print(f"Uploading: {remote_file}", file=sys.stderr)
                    with open(local_file, 'rb') as f:
                        data = f.read()
                    self._fs_writefile_with_wdt(remote_file, data)
                    files_copied += 1

            return files_copied, dirs_created
        finally:
            self.disconnect()

    def list_files(self, remote_path=''):
        """
        List files and directories on the device.

        Args:
            remote_path: Path on device to list (default: root directory)

        Returns:
            List of (name, is_directory, size) tuples

        Raises:
            ValueError: If a non-empty ``remote_path`` does not exist on the device.
        """
        self.connect()
        try:
            if remote_path and not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            return [
                (item.name, _is_dir_mode(item.st_mode), item.st_size)
                for item in self.transport.fs_listdir(remote_path)
            ]
        finally:
            self.disconnect()

    def delete(self, remote_path, disconnect=True):
        """
        Delete a file or directory on the device.

        Args:
            remote_path: Path on device to delete (file or directory)
            disconnect: Whether to disconnect after deletion (default: True, set to False for recursive calls)

        Raises:
            ValueError: If ``remote_path`` does not exist on the device.
        """
        self.connect()
        try:
            if not self.transport.fs_exists(remote_path):
                raise ValueError(f"Path does not exist: {remote_path}")

            if not self.transport.fs_isdir(remote_path):
                # Delete file
                print(f"Deleting file: {remote_path}", file=sys.stderr)
                self.transport.fs_rmfile(remote_path)
                return

            # Delete directory recursively: contents first, then the directory.
            for item in self.transport.fs_listdir(remote_path):
                item_path = _remote_join(remote_path, item.name)
                if _is_dir_mode(item.st_mode):
                    # Recursive delete, keep the connection open
                    self.delete(item_path, disconnect=False)
                else:
                    print(f"Deleting file: {item_path}", file=sys.stderr)
                    self.transport.fs_rmfile(item_path)

            print(f"Deleting directory: {remote_path}", file=sys.stderr)
            self.transport.fs_rmdir(remote_path)
        finally:
            if disconnect:
                self.disconnect()

    def reset(self):
        """Reset the device using serial commands (Ctrl+C, Ctrl+C, Ctrl+D).

        Raises:
            TransportError: If the serial port cannot be opened or written.
        """
        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=115200) as ser:
                ser.write(b'\x03\x03\x04')  # Ctrl+C, Ctrl+C, Ctrl+D
        except Exception as e:
            raise TransportError(f"Failed to reset device: {e}") from e

    def follow_output(self, duration=None):
        """
        Follow device output (like tail -f).

        Each complete line is decoded as UTF-8 (invalid bytes replaced),
        stripped of surrounding whitespace and printed; blank lines are skipped.

        Args:
            duration: Optional number of seconds to follow output for.
                If None, follow indefinitely until interrupted.

        Raises:
            TransportError: If the serial port cannot be opened or read.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = None if duration is None else time.monotonic() + duration
        pending = b""

        def emit(raw: bytes) -> None:
            text = raw.decode('utf-8', errors='replace').strip()
            if text:
                print(text)

        # Use direct serial connection like dev.py does
        try:
            with serial.Serial(self.device, baudrate=115200) as ser:
                while deadline is None or time.monotonic() < deadline:
                    waiting = ser.in_waiting
                    if waiting > 0:
                        # Read only what is already buffered so the call never
                        # blocks waiting for a newline; an unterminated line
                        # would otherwise stall the loop past ``duration``.
                        pending += ser.read(waiting)
                        *complete, pending = pending.split(b"\n")
                        for raw_line in complete:
                            emit(raw_line)
                    else:
                        time.sleep(0.01)  # Small delay to avoid busy-waiting
                # Do not lose a trailing line that never got its newline.
                if pending:
                    emit(pending)
        except KeyboardInterrupt:
            print("\nStopped following output.", file=sys.stderr)
        except Exception as e:
            raise TransportError(f"Failed to follow output: {e}") from e


def firmware_default_device_name(device: str) -> str:
    """
    Default logical device name on led-driver firmware: led-<sta mac hex>,
    matching Settings.set_defaults() in led-driver/src/settings.py.

    Args:
        device: Serial device path (e.g., '/dev/ttyACM0')

    Returns:
        The last output line from the device that starts with ``led-``.

    Raises:
        RuntimeError: If the on-device snippet fails or prints no ``led-`` line.
    """
    conn = DeviceConnection(device)
    conn.connect()
    try:
        code = (
            "import network, ubinascii\n"
            "sta = network.WLAN(network.STA_IF)\n"
            "sta.active(True)\n"
            "mac = ubinascii.hexlify(sta.config('mac')).decode().lower()\n"
            "print('led-' + mac)\n"
        )
        out = conn.transport.exec(code)
    except TransportExecError as e:
        raise RuntimeError(
            "Could not read STA MAC from device (is this MicroPython with network?): "
            + (e.error_output or str(e)).strip()
        ) from e
    finally:
        conn.disconnect()

    text = out.decode("utf-8", errors="replace").strip()
    # Scan from the end so any earlier boot/log chatter is ignored.
    for line in reversed(text.splitlines()):
        line = line.strip()
        if line.startswith("led-"):
            return line
    raise RuntimeError("Device did not return a led- default name")


def copy_file(from_device, device, remote_path, local_path):
    """
    Copy a file to/from device.

    Args:
        from_device: True to copy from device, False to copy to device
        device: Device path (e.g., '/dev/ttyACM0')
        remote_path: Path on device (e.g., 'settings.json')
        local_path: Path on local filesystem

    Raises:
        RuntimeError: If the transport reports a communication error.
    """
    conn = DeviceConnection(device)
    try:
        if from_device:
            conn.copy_from_device(remote_path, local_path)
        else:
            conn.copy_to_device(local_path, remote_path)
    except TransportError as e:
        raise RuntimeError(f"Device communication error: {e}") from e