Add browser Python editor with Pyodide, user auth, and workspace API
- FastAPI serves static UI, file CRUD under code/ and read-only lib/ - Pyodide worker runs Python and Jedi completions in the browser - SQLite accounts: login/register, session cookies, superuser user management - Optional EDITOR_API_KEY, AUTH_* env vars, .env.example - Pipenv, pytest, Selenium smoke test, README Made-with: Cursor
This commit is contained in:
0
src/editor_app/services/__init__.py
Normal file
0
src/editor_app/services/__init__.py
Normal file
128
src/editor_app/services/accounts.py
Normal file
128
src/editor_app/services/accounts.py
Normal file
@@ -0,0 +1,128 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime as dt
|
||||
import os
|
||||
import secrets
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import bcrypt
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from editor_app.db.models import AuthSession, User
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
SESSION_COOKIE_NAME = "editor_session"
|
||||
SESSION_DAYS_DEFAULT = 14
|
||||
|
||||
|
||||
def auth_enabled() -> bool:
|
||||
return os.environ.get("AUTH_ENABLED", "false").strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
def register_open() -> bool:
|
||||
return os.environ.get("AUTH_REGISTER_OPEN", "true").strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
def session_ttl_days() -> int:
|
||||
try:
|
||||
return max(1, min(365, int(os.environ.get("AUTH_SESSION_DAYS", str(SESSION_DAYS_DEFAULT)))))
|
||||
except ValueError:
|
||||
return SESSION_DAYS_DEFAULT
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("ascii")
|
||||
|
||||
|
||||
def verify_password(plain: str, hashed: str) -> bool:
|
||||
try:
|
||||
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("ascii"))
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def get_user_by_username(db: Session, username: str) -> User | None:
|
||||
return db.scalars(select(User).where(User.username == username)).one_or_none()
|
||||
|
||||
|
||||
def get_user_by_id(db: Session, user_id: int) -> User | None:
|
||||
return db.scalars(select(User).where(User.id == user_id)).one_or_none()
|
||||
|
||||
|
||||
def count_users(db: Session) -> int:
|
||||
return int(db.scalar(select(func.count()).select_from(User)) or 0)
|
||||
|
||||
|
||||
def create_user(db: Session, username: str, password: str, *, is_superuser: bool = False) -> User:
|
||||
user = User(
|
||||
username=username,
|
||||
password_hash=hash_password(password),
|
||||
is_superuser=is_superuser,
|
||||
)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
def register_user(db: Session, username: str, password: str) -> User:
|
||||
if get_user_by_username(db, username):
|
||||
raise ValueError("Username already taken")
|
||||
first = count_users(db) == 0
|
||||
return create_user(db, username, password, is_superuser=first)
|
||||
|
||||
|
||||
def authenticate(db: Session, username: str, password: str) -> User | None:
|
||||
user = get_user_by_username(db, username.strip())
|
||||
if not user or not verify_password(password, user.password_hash):
|
||||
return None
|
||||
return user
|
||||
|
||||
|
||||
def _utc_naive() -> dt.datetime:
|
||||
return dt.datetime.now(dt.UTC).replace(tzinfo=None)
|
||||
|
||||
|
||||
def create_session(db: Session, user: User) -> AuthSession:
|
||||
token = secrets.token_urlsafe(48)
|
||||
expires = _utc_naive() + dt.timedelta(days=session_ttl_days())
|
||||
row = AuthSession(user_id=user.id, token=token, expires_at=expires)
|
||||
db.add(row)
|
||||
db.commit()
|
||||
db.refresh(row)
|
||||
return row
|
||||
|
||||
|
||||
def get_session_user(db: Session, token: str | None) -> User | None:
|
||||
if not token:
|
||||
return None
|
||||
now = _utc_naive()
|
||||
row = db.scalars(select(AuthSession).where(AuthSession.token == token)).one_or_none()
|
||||
if not row or row.expires_at < now:
|
||||
return None
|
||||
return get_user_by_id(db, row.user_id)
|
||||
|
||||
|
||||
def delete_session(db: Session, token: str | None) -> None:
|
||||
if not token:
|
||||
return
|
||||
row = db.scalars(select(AuthSession).where(AuthSession.token == token)).one_or_none()
|
||||
if row:
|
||||
db.delete(row)
|
||||
db.commit()
|
||||
|
||||
|
||||
def list_users(db: Session) -> list[User]:
|
||||
return list(db.scalars(select(User).order_by(User.username)).all())
|
||||
|
||||
|
||||
def delete_user(db: Session, user_id: int) -> bool:
|
||||
user = get_user_by_id(db, user_id)
|
||||
if not user:
|
||||
return False
|
||||
db.delete(user)
|
||||
db.commit()
|
||||
return True
|
||||
202
src/editor_app/services/filesystem.py
Normal file
202
src/editor_app/services/filesystem.py
Normal file
@@ -0,0 +1,202 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
from editor_app import config
|
||||
from editor_app.models import FileInfo
|
||||
|
||||
LIB_DIR_NAME = "lib"
|
||||
WRITABLE_ROOTS = {"code"}
|
||||
|
||||
|
||||
def normalize_relative_path(relative_path: str) -> str:
|
||||
cleaned = (relative_path or "").strip().lstrip("/")
|
||||
if not cleaned:
|
||||
return ""
|
||||
|
||||
parts = [segment for segment in cleaned.split("/") if segment]
|
||||
if len(parts) >= 2 and parts[0] == "code":
|
||||
while len(parts) >= 2 and parts[0] == parts[1] == "code":
|
||||
parts.pop(1)
|
||||
return "/".join(parts)
|
||||
|
||||
|
||||
def resolve_workspace_path(relative_path: str) -> Path:
|
||||
relative_path = normalize_relative_path(relative_path)
|
||||
target_path = (config.WORKSPACE_ROOT / relative_path).resolve()
|
||||
try:
|
||||
target_path.relative_to(config.WORKSPACE_ROOT.resolve())
|
||||
except ValueError as exc:
|
||||
raise HTTPException(status_code=400, detail="Path escapes workspace") from exc
|
||||
return target_path
|
||||
|
||||
|
||||
def _is_path_in_lib(target_path: Path) -> bool:
|
||||
workspace = config.WORKSPACE_ROOT.resolve()
|
||||
lib_root = (workspace / LIB_DIR_NAME).resolve()
|
||||
try:
|
||||
target_path.resolve().relative_to(lib_root)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def _ensure_not_lib_path(target_path: Path) -> None:
|
||||
if _is_path_in_lib(target_path):
|
||||
raise HTTPException(status_code=403, detail="lib is read-only")
|
||||
|
||||
|
||||
def _is_writable_path(target_path: Path) -> bool:
|
||||
workspace = config.WORKSPACE_ROOT.resolve()
|
||||
resolved = target_path.resolve()
|
||||
try:
|
||||
relative = resolved.relative_to(workspace)
|
||||
except ValueError:
|
||||
return False
|
||||
if not relative.parts:
|
||||
return False
|
||||
return relative.parts[0] in WRITABLE_ROOTS
|
||||
|
||||
|
||||
def _ensure_writable_path(target_path: Path) -> None:
|
||||
if not _is_writable_path(target_path):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Only code/ is writable (lib is read-only)",
|
||||
)
|
||||
|
||||
|
||||
def list_files(path: str = "") -> list[FileInfo]:
|
||||
path = normalize_relative_path(path)
|
||||
target_path = config.WORKSPACE_ROOT / path if path else config.WORKSPACE_ROOT
|
||||
if not target_path.exists() or not target_path.is_dir():
|
||||
raise HTTPException(status_code=404, detail="Directory not found")
|
||||
|
||||
files = []
|
||||
for item in sorted(target_path.iterdir()):
|
||||
if item.name.startswith("."):
|
||||
continue
|
||||
files.append(
|
||||
FileInfo(
|
||||
name=item.name,
|
||||
is_directory=item.is_dir(),
|
||||
size=item.stat().st_size if item.is_file() else None,
|
||||
)
|
||||
)
|
||||
return files
|
||||
|
||||
|
||||
def read_text_file(file_path: str) -> tuple[str, str]:
|
||||
target_path = resolve_workspace_path(file_path)
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
if target_path.is_dir():
|
||||
raise HTTPException(status_code=400, detail="Path is a directory")
|
||||
try:
|
||||
content = target_path.read_text(encoding="utf-8")
|
||||
except UnicodeDecodeError as exc:
|
||||
raise HTTPException(status_code=400, detail="File is not a text file") from exc
|
||||
return content, target_path.name
|
||||
|
||||
|
||||
def save_text_file(file_path: str, content: str) -> str:
|
||||
target_path = resolve_workspace_path(file_path)
|
||||
_ensure_not_lib_path(target_path)
|
||||
_ensure_writable_path(target_path)
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
target_path.write_text(content, encoding="utf-8")
|
||||
return target_path.name
|
||||
|
||||
|
||||
def delete_file(file_path: str) -> None:
|
||||
target_path = resolve_workspace_path(file_path)
|
||||
_ensure_not_lib_path(target_path)
|
||||
_ensure_writable_path(target_path)
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
if target_path.is_dir():
|
||||
raise HTTPException(status_code=400, detail="Cannot delete directories")
|
||||
target_path.unlink()
|
||||
|
||||
|
||||
def move_path(source_path: str, destination_folder: str) -> tuple[str, str]:
|
||||
source = resolve_workspace_path(source_path)
|
||||
_ensure_not_lib_path(source)
|
||||
_ensure_writable_path(source)
|
||||
if not source.exists():
|
||||
raise HTTPException(status_code=404, detail="Source path not found")
|
||||
|
||||
destination_dir = (
|
||||
resolve_workspace_path(destination_folder)
|
||||
if destination_folder
|
||||
else config.WORKSPACE_ROOT
|
||||
)
|
||||
_ensure_not_lib_path(destination_dir)
|
||||
_ensure_writable_path(destination_dir)
|
||||
if not destination_dir.exists() or not destination_dir.is_dir():
|
||||
raise HTTPException(status_code=404, detail="Destination folder not found")
|
||||
|
||||
destination = destination_dir / source.name
|
||||
source_resolved = source.resolve()
|
||||
destination_resolved = destination.resolve()
|
||||
if destination_resolved == source_resolved:
|
||||
raise HTTPException(status_code=400, detail="Path is already in that folder")
|
||||
if source.is_dir():
|
||||
source_prefix = str(source_resolved) + "/"
|
||||
if str(destination_dir.resolve()).startswith(source_prefix):
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Cannot move a folder into itself or its child"
|
||||
)
|
||||
if destination.exists():
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail="A path with that name already exists in destination",
|
||||
)
|
||||
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
source.rename(destination)
|
||||
moved_type = "folder" if destination.is_dir() else "file"
|
||||
return str(destination.relative_to(config.WORKSPACE_ROOT)), moved_type
|
||||
|
||||
|
||||
def create_folder(folder_path: str) -> str:
|
||||
target_path = resolve_workspace_path(folder_path)
|
||||
_ensure_not_lib_path(target_path)
|
||||
_ensure_writable_path(target_path)
|
||||
if target_path.exists():
|
||||
raise HTTPException(status_code=400, detail="Folder already exists")
|
||||
target_path.mkdir(parents=True, exist_ok=False)
|
||||
return target_path.name
|
||||
|
||||
|
||||
def delete_folder(folder_path: str) -> None:
|
||||
target_path = resolve_workspace_path(folder_path)
|
||||
_ensure_not_lib_path(target_path)
|
||||
_ensure_writable_path(target_path)
|
||||
if not target_path.exists():
|
||||
raise HTTPException(status_code=404, detail="Folder not found")
|
||||
if not target_path.is_dir():
|
||||
raise HTTPException(status_code=400, detail="Path is not a directory")
|
||||
shutil.rmtree(target_path)
|
||||
|
||||
|
||||
def collect_python_sources() -> dict[str, str]:
|
||||
"""Return all UTF-8 .py files under the workspace for browser-side Pyodide sync."""
|
||||
result: dict[str, str] = {}
|
||||
workspace = config.WORKSPACE_ROOT.resolve()
|
||||
if not workspace.exists():
|
||||
return result
|
||||
for path in workspace.rglob("*.py"):
|
||||
try:
|
||||
rel = path.relative_to(workspace)
|
||||
except ValueError:
|
||||
continue
|
||||
if any(part.startswith(".") for part in rel.parts):
|
||||
continue
|
||||
try:
|
||||
key = str(rel).replace("\\", "/")
|
||||
result[key] = path.read_text(encoding="utf-8")
|
||||
except (UnicodeDecodeError, OSError):
|
||||
continue
|
||||
return result
|
||||
Reference in New Issue
Block a user