Add initial web editor app, CLI scripts, and test scaffolding.

This introduces the FastAPI editor implementation and related project setup so the app can be run and validated locally.

Made-with: Cursor
This commit is contained in:
2026-04-11 02:14:26 +12:00
parent fb5f55cda7
commit f9bf119af6
33 changed files with 4846 additions and 0 deletions

373
tests/test_api.py Normal file
View File

@@ -0,0 +1,373 @@
import importlib
import time
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(tmp_path):
import editor_app.config as config
import editor_app.main as main
import editor_app.services.python_runner as runner
config.WORKSPACE_ROOT = tmp_path
importlib.reload(main)
# Reset runner state to avoid cross-test contamination.
with runner.python_runner.lock:
runner.python_runner.process = None
runner.python_runner.output_lines = []
runner.python_runner.return_code = None
runner.python_runner.running = False
return TestClient(main.app)
def test_root_serves_html(client):
response = client.get("/")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "Connection Machine" in response.text
def test_editor_route_serves_editor_html(client):
response = client.get("/editor")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "Connection Machine Editor" in response.text
def test_list_files_hides_dotfiles_and_reports_sizes(client, tmp_path):
(tmp_path / ".hidden.txt").write_text("secret", encoding="utf-8")
(tmp_path / "visible.txt").write_text("hello", encoding="utf-8")
(tmp_path / "folder").mkdir()
response = client.get("/api/files")
assert response.status_code == 200
files = response.json()["files"]
names = {item["name"] for item in files}
assert ".hidden.txt" not in names
assert "visible.txt" in names
assert "folder" in names
def test_list_files_missing_directory_returns_404(client):
response = client.get("/api/files", params={"path": "does-not-exist"})
assert response.status_code == 404
def test_save_and_read_file_roundtrip(client, tmp_path):
response = client.post("/api/file/code/docs/readme.txt", json={"content": "doc body"})
assert response.status_code == 200
assert (tmp_path / "code" / "docs" / "readme.txt").read_text(encoding="utf-8") == "doc body"
read_response = client.get("/api/file/code/docs/readme.txt")
assert read_response.status_code == 200
assert read_response.json()["content"] == "doc body"
def test_save_file_collapses_duplicate_scoped_prefix(client, tmp_path):
response = client.post("/api/file/code/code/main.py", json={"content": "print('ok')"})
assert response.status_code == 200
assert (tmp_path / "code" / "main.py").read_text(encoding="utf-8") == "print('ok')"
assert not (tmp_path / "code" / "code" / "main.py").exists()
def test_lib_folder_is_read_only_for_mutations(client, tmp_path):
lib_dir = tmp_path / "lib"
lib_dir.mkdir()
(lib_dir / "helper.py").write_text("x = 1\n", encoding="utf-8")
code_dir = tmp_path / "code"
code_dir.mkdir()
(code_dir / "main.py").write_text("print('ok')\n", encoding="utf-8")
save_blocked = client.post("/api/file/lib/new.txt", json={"content": "nope"})
assert save_blocked.status_code == 403
delete_blocked = client.delete("/api/file/lib/helper.py")
assert delete_blocked.status_code == 403
move_blocked = client.post(
"/api/file-move",
json={"source_path": "code/main.py", "destination_folder": "lib"},
)
assert move_blocked.status_code == 403
def test_only_code_and_prompts_are_writable(client, tmp_path):
blocked_file = client.post("/api/file/notes.txt", json={"content": "nope"})
assert blocked_file.status_code == 403
blocked_folder = client.post("/api/folder/new/archive", json={"path": "ignored"})
assert blocked_folder.status_code == 403
allowed_prompt = client.post("/api/file/prompts/a.txt", json={"content": "ok"})
assert allowed_prompt.status_code == 200
def test_read_file_errors_for_directory_and_missing(client, tmp_path):
(tmp_path / "docs").mkdir()
dir_response = client.get("/api/file/docs")
assert dir_response.status_code == 400
missing_response = client.get("/api/file/missing.txt")
assert missing_response.status_code == 404
def test_read_file_non_utf8_returns_400(client, tmp_path):
(tmp_path / "bin.dat").write_bytes(b"\xff\xfe\x00")
response = client.get("/api/file/bin.dat")
assert response.status_code == 400
def test_delete_file_success_and_errors(client, tmp_path):
target = tmp_path / "code" / "delete-me.txt"
target.parent.mkdir()
target.write_text("x", encoding="utf-8")
ok = client.delete("/api/file/code/delete-me.txt")
assert ok.status_code == 200
assert not target.exists()
missing = client.delete("/api/file/code/delete-me.txt")
assert missing.status_code == 404
(tmp_path / "code" / "dir").mkdir(parents=True)
directory = client.delete("/api/file/code/dir")
assert directory.status_code == 400
def test_move_file_to_another_folder(client, tmp_path):
source = tmp_path / "code" / "docs" / "note.txt"
source.parent.mkdir(parents=True)
source.write_text("hello", encoding="utf-8")
(tmp_path / "code" / "archive").mkdir(parents=True)
response = client.post(
"/api/file-move",
json={"source_path": "code/docs/note.txt", "destination_folder": "code/archive"},
)
assert response.status_code == 200
assert response.json()["new_path"] == "code/archive/note.txt"
assert not source.exists()
assert (tmp_path / "code" / "archive" / "note.txt").exists()
def test_move_file_errors(client, tmp_path):
(tmp_path / "code" / "docs").mkdir(parents=True)
(tmp_path / "code" / "docs" / "note.txt").write_text("x", encoding="utf-8")
(tmp_path / "code" / "archive").mkdir(parents=True)
(tmp_path / "code" / "archive" / "note.txt").write_text("x", encoding="utf-8")
conflict = client.post(
"/api/file-move",
json={"source_path": "code/docs/note.txt", "destination_folder": "code/archive"},
)
assert conflict.status_code == 409
missing = client.post(
"/api/file-move",
json={"source_path": "code/missing.txt", "destination_folder": "code/archive"},
)
assert missing.status_code == 404
def test_move_folder_to_another_folder(client, tmp_path):
(tmp_path / "code" / "docs").mkdir(parents=True)
(tmp_path / "code" / "docs" / "note.txt").write_text("x", encoding="utf-8")
(tmp_path / "code" / "archive").mkdir(parents=True)
response = client.post(
"/api/file-move",
json={"source_path": "code/docs", "destination_folder": "code/archive"},
)
assert response.status_code == 200
assert response.json()["new_path"] == "code/archive/docs"
assert response.json()["moved_type"] == "folder"
assert (tmp_path / "code" / "archive" / "docs" / "note.txt").exists()
assert not (tmp_path / "code" / "docs").exists()
def test_move_folder_errors(client, tmp_path):
(tmp_path / "code" / "docs").mkdir(parents=True)
(tmp_path / "code" / "docs" / "nested").mkdir()
(tmp_path / "code" / "archive").mkdir(parents=True)
(tmp_path / "code" / "archive" / "docs").mkdir()
into_child = client.post(
"/api/file-move",
json={"source_path": "code/docs", "destination_folder": "code/docs/nested"},
)
assert into_child.status_code == 400
name_conflict = client.post(
"/api/file-move",
json={"source_path": "code/docs", "destination_folder": "code/archive"},
)
assert name_conflict.status_code == 409
def test_path_escape_is_blocked(client):
response = client.post("/api/file/%2E%2E/evil.txt", json={"content": "nope"})
assert response.status_code == 400
def test_folder_create_and_delete(client, tmp_path):
create = client.post("/api/folder/new/code/new-folder", json={"path": "ignored"})
assert create.status_code == 200
assert (tmp_path / "code" / "new-folder").is_dir()
exists = client.post("/api/folder/new/code/new-folder", json={"path": "ignored"})
assert exists.status_code == 400
delete = client.delete("/api/folder/code/new-folder")
assert delete.status_code == 200
assert not (tmp_path / "code" / "new-folder").exists()
def test_create_folder_collapses_duplicate_scoped_prefix(client, tmp_path):
create = client.post("/api/folder/new/prompts/prompts/drafts", json={"path": "ignored"})
assert create.status_code == 200
assert (tmp_path / "prompts" / "drafts").is_dir()
assert not (tmp_path / "prompts" / "prompts").exists()
def test_folder_delete_errors(client, tmp_path):
missing = client.delete("/api/folder/code/missing")
assert missing.status_code == 404
(tmp_path / "code").mkdir()
(tmp_path / "code" / "file.txt").write_text("x", encoding="utf-8")
not_dir = client.delete("/api/folder/code/file.txt")
assert not_dir.status_code == 400
def test_python_run_output_and_stop(client, tmp_path):
script = tmp_path / "demo.py"
script.write_text(
"import time\nprint('hello')\ntime.sleep(0.3)\nprint('bye')\n",
encoding="utf-8",
)
run = client.post("/api/python/run", json={"file_path": "demo.py"})
assert run.status_code == 200
# Eventually process is running and output starts arriving.
output = {"running": True, "lines": []}
for _ in range(30):
output = client.get("/api/python/output", params={"offset": 0}).json()
if output["lines"]:
break
time.sleep(0.05)
assert output["lines"]
assert output["lines"][0].startswith("$ ")
stop = client.post("/api/python/stop")
assert stop.status_code == 200
final = client.get("/api/python/output", params={"offset": 0}).json()
assert final["running"] is False
def test_python_run_accepts_args_and_logs_command(client, tmp_path):
script = tmp_path / "demo_args.py"
script.write_text("import sys\nprint('args=' + '|'.join(sys.argv[1:]))\n", encoding="utf-8")
run = client.post("/api/python/run", json={"file_path": "demo_args.py", "args": ["child-a"]})
assert run.status_code == 200
output = {"running": True, "lines": []}
for _ in range(30):
output = client.get("/api/python/output", params={"offset": 0}).json()
joined = "".join(output.get("lines", []))
if "args=child-a" in joined:
break
time.sleep(0.05)
joined = "".join(output.get("lines", []))
assert "demo_args.py child-a" in joined
assert "args=child-a" in joined
client.post("/api/python/stop")
def test_python_run_rejects_missing_and_non_python(client, tmp_path):
missing = client.post("/api/python/run", json={"file_path": "missing.py"})
assert missing.status_code == 404
(tmp_path / "note.txt").write_text("hello", encoding="utf-8")
wrong_ext = client.post("/api/python/run", json={"file_path": "note.txt"})
assert wrong_ext.status_code == 400
def test_python_run_rejects_when_already_running(client, tmp_path):
script = tmp_path / "long.py"
script.write_text("import time\ntime.sleep(1.0)\n", encoding="utf-8")
first = client.post("/api/python/run", json={"file_path": "long.py"})
assert first.status_code == 200
second = client.post("/api/python/run", json={"file_path": "long.py"})
assert second.status_code == 409
client.post("/api/python/stop")
def test_python_stop_when_nothing_running(client):
response = client.post("/api/python/stop")
assert response.status_code == 200
assert response.json()["message"] == "No running process"
def test_python_completions_returns_results(client):
response = client.post(
"/api/python/completions",
json={
"file_path": "scratch.py",
"content": "import os\nos.pa",
"line": 2,
"column": 5,
"max_results": 10,
},
)
assert response.status_code == 200
names = {item["name"] for item in response.json()["completions"]}
assert "path" in names
def test_python_completions_bad_position_returns_400(client):
response = client.post(
"/api/python/completions",
json={
"file_path": "scratch.py",
"content": "x = 1",
"line": 99,
"column": 1,
"max_results": 10,
},
)
assert response.status_code == 400
def test_python_scripts_list_and_startup_selection(client, tmp_path):
(tmp_path / "code").mkdir()
(tmp_path / "code" / "job.py").write_text("print('ok')\n", encoding="utf-8")
scripts = client.get("/api/python/scripts")
assert scripts.status_code == 200
assert "code/job.py" in scripts.json()["scripts"]
set_startup = client.post(
"/api/python/startup-script",
json={"file_path": "code/job.py"},
)
assert set_startup.status_code == 200
startup = client.get("/api/python/startup-script")
assert startup.status_code == 200
assert startup.json()["file_path"] == "code/job.py"

91
tests/test_browser.py Normal file
View File

@@ -0,0 +1,91 @@
import importlib
import socket
import subprocess
import sys
import time
from pathlib import Path
import pytest
def _is_port_open(port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(0.3)
return sock.connect_ex(("127.0.0.1", port)) == 0
@pytest.mark.integration
def test_browser_create_file_not_forced_into_new(tmp_path):
playwright = pytest.importorskip("playwright.sync_api")
sync_playwright = playwright.sync_playwright
editor_dir = Path(__file__).resolve().parents[1]
port = 8123
env = dict(**__import__("os").environ, WORKSPACE_ROOT=str(tmp_path))
server = subprocess.Popen(
[
sys.executable,
"-m",
"uvicorn",
"app:app",
"--app-dir",
"src",
"--host",
"127.0.0.1",
"--port",
str(port),
],
cwd=str(editor_dir),
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
try:
for _ in range(50):
if _is_port_open(port):
break
time.sleep(0.1)
else:
pytest.fail("Server did not start in time")
with sync_playwright() as p:
try:
browser = p.chromium.launch()
except Exception as exc: # pragma: no cover
pytest.skip(f"Playwright browser not installed: {exc}")
page = browser.new_page()
page.goto(f"http://127.0.0.1:{port}/editor", wait_until="networkidle")
page.click("#new-file-btn")
page.fill("#new-filename", "browser-test.txt")
page.click("#create-file-btn")
page.wait_for_timeout(500)
browser.close()
assert (tmp_path / "browser-test.txt").exists()
assert not (tmp_path / "new" / "browser-test.txt").exists()
finally:
server.terminate()
try:
server.wait(timeout=3)
except subprocess.TimeoutExpired:
server.kill()
def test_new_file_uses_api_file_route(tmp_path):
import editor_app.config as config
import editor_app.main as main
from fastapi.testclient import TestClient
config.WORKSPACE_ROOT = tmp_path
importlib.reload(main)
client = TestClient(main.app)
response = client.post(
"/api/file/code/routing-check.txt",
json={"content": "ok"},
)
assert response.status_code == 200
assert (tmp_path / "code" / "routing-check.txt").exists()
assert not (tmp_path / "new" / "routing-check.txt").exists()

View File

@@ -0,0 +1,36 @@
import importlib
import os
import sys
from pathlib import Path
from fastapi.testclient import TestClient
def test_python_completion_returns_basic_suggestions(tmp_path):
editor_dir = Path(__file__).resolve().parents[1]
src_dir = editor_dir / "src"
if str(src_dir) not in sys.path:
sys.path.insert(0, str(src_dir))
os.environ["WORKSPACE_ROOT"] = str(tmp_path)
import app as editor_app
editor_app = importlib.reload(editor_app)
editor_app.WORKSPACE_ROOT = tmp_path
client = TestClient(editor_app.app)
response = client.post(
"/api/python/completions",
json={
"file_path": "example.py",
"content": "import os\nos.pa",
"line": 2,
"column": 5,
"max_results": 20,
},
)
assert response.status_code == 200
body = response.json()
names = [item["name"] for item in body["completions"]]
assert "path" in names

237
tests/test_internal.py Normal file
View File

@@ -0,0 +1,237 @@
import builtins
import importlib
import subprocess
from pathlib import Path
import pytest
from fastapi import HTTPException
def test_load_env_file_sets_missing_keys_only(tmp_path, monkeypatch):
import editor_app.config as config
env_file = tmp_path / ".env"
env_file.write_text(
"# comment\nFOO=bar\nBAZ='quoted'\nEXISTING=from_file\n", encoding="utf-8"
)
monkeypatch.setenv("EXISTING", "kept")
monkeypatch.delenv("FOO", raising=False)
monkeypatch.delenv("BAZ", raising=False)
config.load_env_file(env_file)
assert __import__("os").environ["FOO"] == "bar"
assert __import__("os").environ["BAZ"] == "quoted"
assert __import__("os").environ["EXISTING"] == "kept"
def test_load_env_file_ignores_missing_file(tmp_path):
import editor_app.config as config
config.load_env_file(tmp_path / "missing.env")
class _ProcNoStdout:
stdout = None
class _ProcWithLines:
def __init__(self):
self.stdout = iter(["line1\n", "line2\n"])
class _ProcWait:
def __init__(self, code=0):
self.code = code
def wait(self, timeout=None):
return self.code
class _ProcStop:
def __init__(self):
self.returncode = 0
self.terminated = False
self.killed = False
self.wait_calls = 0
def terminate(self):
self.terminated = True
def wait(self, timeout=None):
self.wait_calls += 1
if self.wait_calls == 1:
raise subprocess.TimeoutExpired("cmd", timeout)
return self.returncode
def kill(self):
self.killed = True
def test_python_runner_stream_helpers_and_wait():
import editor_app.services.python_runner as runner
with runner.python_runner.lock:
runner.python_runner.output_lines = []
runner.stream_process_output(_ProcNoStdout())
runner.stream_process_output(_ProcWithLines())
with runner.python_runner.lock:
assert runner.python_runner.output_lines[-2:] == ["line1\n", "line2\n"]
proc = _ProcWait(code=7)
runner.wait_for_process(proc)
with runner.python_runner.lock:
assert runner.python_runner.return_code == 7
assert runner.python_runner.running is False
assert runner.python_runner.process is None
def test_python_runner_run_failure_raises_http(monkeypatch, tmp_path):
import editor_app.config as config
import editor_app.services.python_runner as runner
config.WORKSPACE_ROOT = tmp_path
def _boom(*args, **kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(runner.subprocess, "Popen", _boom)
with pytest.raises(HTTPException) as exc:
runner.run_python_file(tmp_path / "x.py", "x.py")
assert exc.value.status_code == 500
def test_python_runner_stop_kill_path():
import editor_app.services.python_runner as runner
proc = _ProcStop()
with runner.python_runner.lock:
runner.python_runner.process = proc
runner.python_runner.output_lines = []
runner.python_runner.running = True
message = runner.stop_python_process()
assert message == "Python process stopped"
assert proc.terminated is True
assert proc.killed is True
def test_completions_import_error_path(tmp_path, monkeypatch):
import editor_app.config as config
import editor_app.main as main
from fastapi.testclient import TestClient
config.WORKSPACE_ROOT = tmp_path
importlib.reload(main)
client = TestClient(main.app)
real_import = builtins.__import__
def fake_import(name, *args, **kwargs):
if name == "jedi":
raise ImportError("forced")
return real_import(name, *args, **kwargs)
monkeypatch.setattr(builtins, "__import__", fake_import)
response = client.post(
"/api/python/completions",
json={
"file_path": "scratch.py",
"content": "x = 1",
"line": 1,
"column": 1,
"max_results": 10,
},
)
assert response.status_code == 500
def test_websocket_output_status_message(tmp_path):
import editor_app.config as config
import editor_app.main as main
from fastapi.testclient import TestClient
config.WORKSPACE_ROOT = tmp_path
importlib.reload(main)
with TestClient(main.app) as client:
with client.websocket_connect("/api/python/ws/output") as ws:
payload = ws.receive_json()
assert "lines" in payload
assert "running" in payload
assert payload["running"] is False
def test_create_app_startup_creates_lib(tmp_path):
import editor_app.config as config
import editor_app.main as main
from fastapi.testclient import TestClient
config.WORKSPACE_ROOT = tmp_path
importlib.reload(main)
assert not (tmp_path / "lib").exists()
with TestClient(main.app):
pass
assert (tmp_path / "lib").is_dir()
def test_python_runner_helpers_and_startup_paths(tmp_path, monkeypatch):
import editor_app.config as config
import editor_app.services.python_runner as runner
config.WORKSPACE_ROOT = tmp_path
runner.STARTUP_SCRIPT_FILE = tmp_path / ".connectionmachine_startup_script"
(tmp_path / "lib").mkdir(exist_ok=True)
# list_python_scripts should skip hidden paths.
(tmp_path / "code").mkdir(exist_ok=True)
(tmp_path / "code" / "a.py").write_text("print('x')\n", encoding="utf-8")
(tmp_path / ".hidden.py").write_text("print('x')\n", encoding="utf-8")
scripts = runner.list_python_scripts()
assert "code/a.py" in scripts
assert ".hidden.py" not in scripts
# startup script getters/setters.
assert runner.get_startup_script() is None
runner.STARTUP_SCRIPT_FILE.write_text("", encoding="utf-8")
assert runner.get_startup_script() is None
missing = tmp_path / "code" / "missing.py"
with pytest.raises(HTTPException) as missing_exc:
runner.set_startup_script(str(missing.relative_to(tmp_path)))
assert missing_exc.value.status_code == 404
(tmp_path / "code" / "note.txt").write_text("x", encoding="utf-8")
with pytest.raises(HTTPException) as nonpy_exc:
runner.set_startup_script("code/note.txt")
assert nonpy_exc.value.status_code == 400
selected = runner.set_startup_script("code/a.py")
assert selected == "code/a.py"
assert runner.get_startup_script() == "code/a.py"
# run_startup_script_if_configured no-op branches.
runner.STARTUP_SCRIPT_FILE.write_text("code/missing.py", encoding="utf-8")
runner.run_startup_script_if_configured()
runner.STARTUP_SCRIPT_FILE.write_text("code/note.txt", encoding="utf-8")
runner.run_startup_script_if_configured()
# run_startup_script_if_configured should call run_python_file on valid startup script.
called = {"count": 0}
def fake_run(target_path, requested_path):
called["count"] += 1
assert requested_path == "code/a.py"
assert str(target_path).endswith("code/a.py")
monkeypatch.setattr(runner, "run_python_file", fake_run)
runner.STARTUP_SCRIPT_FILE.write_text("code/a.py", encoding="utf-8")
runner.run_startup_script_if_configured()
assert called["count"] == 1