from __future__ import annotations import os import re from pathlib import Path from fastapi import Cookie, Depends, Header, HTTPException, Query from sqlalchemy.orm import Session from editor_app.db.session import get_db from editor_app.db.models import User from editor_app import config from editor_app.services import accounts def api_key_valid(authorization: str | None) -> bool: expected = (os.environ.get("EDITOR_API_KEY") or "").strip() if not expected: return False return (authorization or "").strip() == f"Bearer {expected}" async def require_api_access( db: Session = Depends(get_db), authorization: str | None = Header(None), editor_session: str | None = Cookie(None), ) -> None: """API key, or (when auth off) open access, or (when auth on) valid session — see README.""" if api_key_valid(authorization): return key_configured = bool((os.environ.get("EDITOR_API_KEY") or "").strip()) if key_configured: if accounts.auth_enabled(): user = accounts.get_session_user(db, editor_session) if user: return raise HTTPException(status_code=401, detail="Invalid or missing API key") if not accounts.auth_enabled(): return user = accounts.get_session_user(db, editor_session) if user is None: raise HTTPException(status_code=401, detail="Not authenticated") async def get_current_user_optional( db: Session = Depends(get_db), authorization: str | None = Header(None), editor_session: str | None = Cookie(None), ) -> User | None: if api_key_valid(authorization): return None if not accounts.auth_enabled(): return None return accounts.get_session_user(db, editor_session) async def require_superuser( user: User | None = Depends(get_current_user_optional), ) -> User: if not accounts.auth_enabled(): raise HTTPException(status_code=400, detail="User management requires AUTH_ENABLED=true") if user is None: raise HTTPException(status_code=401, detail="Not authenticated") if not user.is_superuser: raise HTTPException(status_code=403, detail="Superuser required") return user def _safe_workspace_leaf(user: User) -> str: base = re.sub(r"[^a-zA-Z0-9._-]+", "-", user.username).strip("-").lower() or "user" return f"{base}-{user.id}" def _seed_user_workspace(user_root: Path) -> None: (user_root / "code").mkdir(parents=True, exist_ok=True) async def get_workspace_root( user: User | None = Depends(get_current_user_optional), workspace_user_id: int | None = Query(default=None), db: Session = Depends(get_db), ) -> Path: root = config.WORKSPACE_ROOT.resolve() if not accounts.auth_enabled() or user is None: return root target_user = user if workspace_user_id is not None: if not user.is_superuser: raise HTTPException(status_code=403, detail="Superuser required for workspace override") lookup = accounts.get_user_by_id(db, int(workspace_user_id)) if lookup is None: raise HTTPException(status_code=404, detail="Workspace user not found") target_user = lookup user_root = root / "users" / _safe_workspace_leaf(target_user) _seed_user_workspace(user_root) return user_root