"""Filesystem operations for the editor workspace.

Paths supplied by callers are workspace-relative strings.  Everything is
resolved against the workspace root (or, for `lib/...`, against the shared
stub directory shipped with the project) and rejected if it escapes.  Only
paths under the roots named in `WRITABLE_ROOTS` may be modified.
"""

import shutil
from pathlib import Path

from fastapi import HTTPException

from editor_app import config
from editor_app.models import FileInfo

LIB_DIR_NAME = "lib"
WRITABLE_ROOTS = {"code", "demos"}


def _workspace_root(workspace_root: Path | None = None) -> Path:
    """Return the resolved workspace root, defaulting to `config.WORKSPACE_ROOT`."""
    return (workspace_root or config.WORKSPACE_ROOT).resolve()


def _shared_lib_root() -> Path:
    """Return the resolved path of the shared stub directory.

    Shared MicroPython stubs ship in the repo `lib/` directory and are read
    directly.  Treating the bundle as the single source of truth means there is
    no on-disk `workspace/lib/` cache to keep in sync — updates to e.g.
    `machine.py` flow straight through to Pyodide and the file tree.
    """
    return (config.PROJECT_ROOT / LIB_DIR_NAME).resolve()


def normalize_relative_path(relative_path: str) -> str:
    """Normalize a client-supplied relative path.

    Surrounding whitespace, leading slashes and empty segments are removed.
    A repeated leading writable root (`code/code/x`, `demos/demos/demos/x`)
    is collapsed to a single occurrence.

    Returns:
        The cleaned path joined with `/`, or `""` for empty input.
    """
    cleaned = (relative_path or "").strip().lstrip("/")
    if not cleaned:
        return ""

    parts = [segment for segment in cleaned.split("/") if segment]
    if parts and parts[0] in WRITABLE_ROOTS:
        root_name = parts[0]
        # Drop duplicated root segments directly after the first one.
        while len(parts) >= 2 and parts[1] == root_name:
            parts.pop(1)
    return "/".join(parts)


def resolve_workspace_path(relative_path: str, workspace_root: Path | None = None) -> Path:
    """Resolve a relative path to an absolute path inside the workspace.

    Paths equal to `lib` or starting with `lib/` are resolved against the
    shared stub directory instead of the workspace.

    Raises:
        HTTPException: 400 if the resolved path escapes its root.
    """
    relative_path = normalize_relative_path(relative_path)
    root = _workspace_root(workspace_root)

    if relative_path == LIB_DIR_NAME or relative_path.startswith(f"{LIB_DIR_NAME}/"):
        lib_root = _shared_lib_root()
        suffix = relative_path[len(LIB_DIR_NAME) :].lstrip("/")
        target_path = (lib_root / suffix).resolve()
        try:
            target_path.relative_to(lib_root)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="Path escapes shared lib") from exc
        return target_path

    target_path = (root / relative_path).resolve()
    try:
        target_path.relative_to(root)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Path escapes workspace") from exc
    return target_path


def _is_path_in_lib(target_path: Path, workspace_root: Path | None = None) -> bool:
    """Return True if `target_path` resolves to the shared lib root or below it.

    `workspace_root` is accepted for signature parity with the other helpers
    but is not used: the shared lib location does not depend on the workspace.
    """
    return target_path.resolve().is_relative_to(_shared_lib_root())


def _ensure_not_lib_path(target_path: Path, workspace_root: Path | None = None) -> None:
    """Raise a 403 `HTTPException` if `target_path` is inside the shared lib."""
    if _is_path_in_lib(target_path, workspace_root):
        raise HTTPException(status_code=403, detail="lib is read-only")


def _is_writable_path(target_path: Path, workspace_root: Path | None = None) -> bool:
    """Return True if `target_path` lies under one of the writable roots.

    The workspace root itself and anything outside the workspace are not
    writable.
    """
    workspace = _workspace_root(workspace_root)
    try:
        relative = target_path.resolve().relative_to(workspace)
    except ValueError:
        return False
    if not relative.parts:
        # The workspace root itself.
        return False
    return relative.parts[0] in WRITABLE_ROOTS


def _ensure_writable_path(target_path: Path, workspace_root: Path | None = None) -> None:
    """Raise a 403 `HTTPException` unless `target_path` is writable."""
    if not _is_writable_path(target_path, workspace_root):
        raise HTTPException(
            status_code=403,
            detail="Only code/ and demos/ are writable (lib is read-only)",
        )


def list_files(path: str = "", workspace_root: Path | None = None) -> list[FileInfo]:
    """List the entries of a workspace directory, sorted by path.

    Dot-prefixed entries are skipped.  At the workspace root the `users`
    entry is hidden and a virtual `lib` directory is appended when the shared
    stub directory exists and no real `lib` entry is already listed.

    Raises:
        HTTPException: 404 if the target is missing or not a directory.
    """
    path = normalize_relative_path(path)
    root = _workspace_root(workspace_root)
    target_path = resolve_workspace_path(path, root) if path else root

    if not target_path.is_dir():
        raise HTTPException(status_code=404, detail="Directory not found")

    at_root = not path
    files: list[FileInfo] = []
    for item in sorted(target_path.iterdir()):
        if item.name.startswith("."):
            continue
        if at_root and item.name == "users":
            continue
        files.append(
            FileInfo(
                name=item.name,
                is_directory=item.is_dir(),
                size=item.stat().st_size if item.is_file() else None,
            )
        )

    if at_root:
        shared_lib = _shared_lib_root()
        if shared_lib.exists() and not any(f.name == LIB_DIR_NAME for f in files):
            files.append(FileInfo(name=LIB_DIR_NAME, is_directory=True, size=None))

    return files


def read_text_file(file_path: str, workspace_root: Path | None = None) -> tuple[str, str]:
    """Read a UTF-8 text file.

    Returns:
        A `(content, file_name)` tuple.

    Raises:
        HTTPException: 404 if the file is missing; 400 if the path is a
            directory or the content is not valid UTF-8.
    """
    target_path = resolve_workspace_path(file_path, workspace_root)
    if not target_path.exists():
        raise HTTPException(status_code=404, detail="File not found")
    if target_path.is_dir():
        raise HTTPException(status_code=400, detail="Path is a directory")

    try:
        content = target_path.read_text(encoding="utf-8")
    except UnicodeDecodeError as exc:
        raise HTTPException(status_code=400, detail="File is not a text file") from exc
    return content, target_path.name


def save_text_file(file_path: str, content: str, workspace_root: Path | None = None) -> str:
    """Write `content` as UTF-8 to a writable workspace file.

    Missing parent directories are created.

    Returns:
        The name of the written file.

    Raises:
        HTTPException: 403 if the path is not writable; 400 if the path is an
            existing directory.
    """
    target_path = resolve_workspace_path(file_path, workspace_root)
    _ensure_not_lib_path(target_path, workspace_root)
    _ensure_writable_path(target_path, workspace_root)
    if target_path.is_dir():
        # Without this guard write_text() raises an unhandled IsADirectoryError.
        raise HTTPException(status_code=400, detail="Path is a directory")

    target_path.parent.mkdir(parents=True, exist_ok=True)
    target_path.write_text(content, encoding="utf-8")
    return target_path.name


def delete_file(file_path: str, workspace_root: Path | None = None) -> None:
    """Delete a single file from a writable part of the workspace.

    Raises:
        HTTPException: 403 if the path is not writable; 404 if it does not
            exist; 400 if it is a directory.
    """
    target_path = resolve_workspace_path(file_path, workspace_root)
    _ensure_not_lib_path(target_path, workspace_root)
    _ensure_writable_path(target_path, workspace_root)

    if not target_path.exists():
        raise HTTPException(status_code=404, detail="File not found")
    if target_path.is_dir():
        raise HTTPException(status_code=400, detail="Cannot delete directories")

    target_path.unlink()


def move_path(
    source_path: str, destination_folder: str, workspace_root: Path | None = None
) -> tuple[str, str]:
    """Move a file or folder into another existing workspace folder.

    Returns:
        A `(new_relative_path, kind)` tuple where `kind` is `"folder"` or
        `"file"` and the path uses `/` separators.

    Raises:
        HTTPException: 403 if source or destination is not writable; 404 if
            the source or the destination folder is missing; 400 if the
            source is already in that folder or a folder would be moved into
            itself or one of its descendants; 409 if the destination already
            contains an entry with the same name.
    """
    root = _workspace_root(workspace_root)

    source = resolve_workspace_path(source_path, root)
    _ensure_not_lib_path(source, root)
    _ensure_writable_path(source, root)
    if not source.exists():
        raise HTTPException(status_code=404, detail="Source path not found")

    destination_dir = (
        resolve_workspace_path(destination_folder, root) if destination_folder else root
    )
    _ensure_not_lib_path(destination_dir, root)
    _ensure_writable_path(destination_dir, root)
    if not destination_dir.is_dir():
        raise HTTPException(status_code=404, detail="Destination folder not found")

    destination = destination_dir / source.name
    source_resolved = source.resolve()

    if destination.resolve() == source_resolved:
        raise HTTPException(status_code=400, detail="Path is already in that folder")

    # Path-based containment check: covers the destination being the source
    # folder itself as well as any descendant, independent of the OS separator.
    if source.is_dir() and destination_dir.resolve().is_relative_to(source_resolved):
        raise HTTPException(
            status_code=400, detail="Cannot move a folder into itself or its child"
        )

    if destination.exists():
        raise HTTPException(
            status_code=409,
            detail="A path with that name already exists in destination",
        )

    source.rename(destination)
    moved_type = "folder" if destination.is_dir() else "file"
    return destination.relative_to(root).as_posix(), moved_type


def create_folder(folder_path: str, workspace_root: Path | None = None) -> str:
    """Create a folder (and any missing parents) in a writable location.

    Returns:
        The name of the created folder.

    Raises:
        HTTPException: 403 if the path is not writable; 400 if it already
            exists.
    """
    target_path = resolve_workspace_path(folder_path, workspace_root)
    _ensure_not_lib_path(target_path, workspace_root)
    _ensure_writable_path(target_path, workspace_root)

    if target_path.exists():
        raise HTTPException(status_code=400, detail="Folder already exists")

    target_path.mkdir(parents=True, exist_ok=False)
    return target_path.name


def delete_folder(folder_path: str, workspace_root: Path | None = None) -> None:
    """Recursively delete a folder from a writable part of the workspace.

    Raises:
        HTTPException: 403 if the path is not writable; 404 if it does not
            exist; 400 if it is not a directory.
    """
    target_path = resolve_workspace_path(folder_path, workspace_root)
    _ensure_not_lib_path(target_path, workspace_root)
    _ensure_writable_path(target_path, workspace_root)

    if not target_path.exists():
        raise HTTPException(status_code=404, detail="Folder not found")
    if not target_path.is_dir():
        raise HTTPException(status_code=400, detail="Path is not a directory")

    # NOTE(review): a writable root itself (e.g. "code") passes the checks
    # above and would be removed entirely — confirm that this is intended.
    shutil.rmtree(target_path)


def _collect_python_files(base: Path, key_prefix: str, result: dict[str, str]) -> None:
    """Add every readable UTF-8 `*.py` below `base` to `result`.

    Keys are `/`-separated paths relative to `base`, placed under `key_prefix`
    when it is non-empty.  Paths with a dot-prefixed component and files that
    cannot be read or decoded are skipped.
    """
    for path in base.rglob("*.py"):
        try:
            rel = path.relative_to(base)
        except ValueError:
            continue
        if any(part.startswith(".") for part in rel.parts):
            continue
        keyed = Path(key_prefix) / rel if key_prefix else rel
        try:
            result[str(keyed).replace("\\", "/")] = path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            # Best effort: unreadable or non-UTF-8 files are simply left out.
            continue


def collect_python_sources(workspace_root: Path | None = None) -> dict[str, str]:
    """Return the UTF-8 `.py` sources of the workspace plus the shared stubs.

    Workspace files are keyed by their path relative to the workspace root.
    Shared stubs are read from the project's `lib/` directory (see
    `_shared_lib_root`) and keyed as `lib/<relative path>`; they are collected
    last, so a stub replaces a workspace file that maps to the same key.
    """
    result: dict[str, str] = {}

    workspace = _workspace_root(workspace_root)
    if workspace.exists():
        _collect_python_files(workspace, "", result)

    shared_lib = _shared_lib_root()
    if shared_lib.is_dir():
        _collect_python_files(shared_lib, LIB_DIR_NAME, result)

    return result