Admin user editing, knight-rider demos, self-contained user seeds

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-05-10 02:23:53 +12:00
parent b02a182bf1
commit f7892dd31b
16 changed files with 864 additions and 75 deletions

View File

@@ -73,7 +73,7 @@ Notes:
- `data/` is mounted to `/app/data` for the SQLite auth DB. - `data/` is mounted to `/app/data` for the SQLite auth DB.
- In container mode, `WORKSPACE_ROOT` and `AUTH_DATABASE_PATH` are set by `docker-compose.yml`. - In container mode, `WORKSPACE_ROOT` and `AUTH_DATABASE_PATH` are set by `docker-compose.yml`.
**User accounts** — Set `AUTH_ENABLED=true` in `.env` to require sign-in for workspace APIs. Users live in a SQLite file (`AUTH_DATABASE_PATH`, default `./data/editor.db`). Use `/register` with an invite link (unless you opt into open signup) or `BOOTSTRAP_ADMIN_USERNAME` / `BOOTSTRAP_ADMIN_PASSWORD` for the first superuser. Superusers can **GET/POST/DELETE `/api/users`** to list, create, or remove accounts. **User accounts** — Set `AUTH_ENABLED=true` in `.env` to require sign-in for workspace APIs. Users live in a SQLite file (`AUTH_DATABASE_PATH`, default `./data/editor.db`). Use `/register` with an invite link (unless you opt into open signup) or `BOOTSTRAP_ADMIN_USERNAME` / `BOOTSTRAP_ADMIN_PASSWORD` for the first superuser. Superusers can **GET `/api/users`**, **PATCH `/api/users/{id}`** (username, password reset, admin flag — renames workspace folder when the username changes), or **DELETE `/api/users/{id}`** to manage accounts. New accounts are added only through **invite links** (**`POST /api/users/invites`**) plus self-service registration (`/register?invite=…`).
Email invite signup: Email invite signup:
@@ -128,10 +128,10 @@ Tutorial files:
- `LED_TUTORIAL.md` - step-by-step NeoPixel tutorial - `LED_TUTORIAL.md` - step-by-step NeoPixel tutorial
- `workspace/code/led_tutorial.py` - runnable guided LED example - `workspace/code/led_tutorial.py` - runnable guided LED example
- `workspace/code/led_patterns.py` - reusable pattern helpers (`rainbow_frame`, `chase_frame`, `twinkle_frame`) - `workspace/code/led_patterns.py` - shared pattern helpers (used by automated tests); each `pattern_*_demo.py` duplicates what it needs and uses only Python stdlib + `machine` / `neopixel` / `time`
- `workspace/code/pattern_rainbow_demo.py` - rainbow animation demo - `workspace/code/pattern_rainbow_demo.py` - rainbow animation (self-contained)
- `workspace/code/pattern_chase_demo.py` - chase animation demo - `workspace/code/pattern_chase_demo.py` - Knight Riderstyle bouncing scanner (self-contained)
- `workspace/code/pattern_twinkle_demo.py` - twinkle animation demo - `workspace/code/pattern_twinkle_demo.py` - twinkle animation (self-contained)
- `workspace/code/panel16_utils.py` - helpers for 16x16 serpentine mapping - `workspace/code/panel16_utils.py` - helpers for 16x16 serpentine mapping
- `workspace/code/panel16_rainbow_wave.py` - 16x16 rainbow wave - `workspace/code/panel16_rainbow_wave.py` - 16x16 rainbow wave
- `workspace/code/panel16_bounce.py` - 16x16 bouncing pixel with trail - `workspace/code/panel16_bounce.py` - 16x16 bouncing pixel with trail

View File

@@ -54,4 +54,5 @@ class InviteToken(Base):
consumed_by_user_id: Mapped[int | None] = mapped_column( consumed_by_user_id: Mapped[int | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
) )
grants_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[dt.datetime] = mapped_column(DateTime, default=_utc_naive) created_at: Mapped[dt.datetime] = mapped_column(DateTime, default=_utc_naive)

View File

@@ -3,6 +3,7 @@ from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from editor_app.config import STATIC_DIR, WORKSPACE_ROOT from editor_app.config import STATIC_DIR, WORKSPACE_ROOT
@@ -21,6 +22,15 @@ async def lifespan(_app: FastAPI):
(WORKSPACE_ROOT / "lib").mkdir(parents=True, exist_ok=True) (WORKSPACE_ROOT / "lib").mkdir(parents=True, exist_ok=True)
engine = get_engine() engine = get_engine()
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
with engine.begin() as conn:
cols = conn.execute(text("PRAGMA table_info(invite_tokens)")).fetchall()
column_names = {row[1] for row in cols}
if column_names and "grants_superuser" not in column_names:
conn.execute(
text(
"ALTER TABLE invite_tokens ADD COLUMN grants_superuser BOOLEAN NOT NULL DEFAULT 0"
)
)
factory = sessionmaker(autocommit=False, autoflush=False, bind=engine) factory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = factory() db = factory()
try: try:

View File

@@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
from editor_app.db.session import get_db from editor_app.db.session import get_db
from editor_app.db.models import User from editor_app.db.models import User
from editor_app.deps import require_superuser from editor_app.deps import require_superuser
from editor_app.schemas.users import InviteCreateRequest, InviteCreateResponse, UserCreateAdmin, UserPublic from editor_app.schemas.users import InviteCreateRequest, InviteCreateResponse, UserPublic, UserUpdateAdmin
from editor_app.services import accounts from editor_app.services import accounts
router = APIRouter(prefix="/api/users", tags=["users"]) router = APIRouter(prefix="/api/users", tags=["users"])
@@ -20,21 +20,26 @@ async def list_users(
return [UserPublic.model_validate(u) for u in accounts.list_users(db)] return [UserPublic.model_validate(u) for u in accounts.list_users(db)]
@router.post("", response_model=UserPublic) @router.patch("/{user_id}", response_model=UserPublic)
async def create_user_admin( async def patch_user_admin(
body: UserCreateAdmin, user_id: int,
admin: User = Depends(require_superuser), body: UserUpdateAdmin,
_admin: User = Depends(require_superuser),
db: Session = Depends(get_db), db: Session = Depends(get_db),
) -> UserPublic: ) -> UserPublic:
if accounts.get_user_by_username(db, body.username): try:
raise HTTPException(status_code=409, detail="Username already taken") updated = accounts.update_user(
user = accounts.create_user(
db, db,
body.username, user_id,
body.password, username=body.username,
password=body.password,
is_superuser=body.is_superuser, is_superuser=body.is_superuser,
) )
return UserPublic.model_validate(user) except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
if updated is None:
raise HTTPException(status_code=404, detail="User not found")
return UserPublic.model_validate(updated)
@router.delete("/{user_id}") @router.delete("/{user_id}")

View File

@@ -1,4 +1,4 @@
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator, model_validator
class RegisterRequest(BaseModel): class RegisterRequest(BaseModel):
@@ -6,6 +6,14 @@ class RegisterRequest(BaseModel):
password: str = Field(min_length=8, max_length=128) password: str = Field(min_length=8, max_length=128)
invite_token: str | None = Field(default=None, min_length=8, max_length=256) invite_token: str | None = Field(default=None, min_length=8, max_length=256)
@field_validator("invite_token", mode="before")
@classmethod
def normalize_invite_token(cls, v: object) -> object:
if isinstance(v, str):
s = v.strip()
return s if s else None
return v
@field_validator("username") @field_validator("username")
@classmethod @classmethod
def username_chars(cls, v: str) -> str: def username_chars(cls, v: str) -> str:
@@ -28,19 +36,34 @@ class UserPublic(BaseModel):
model_config = {"from_attributes": True} model_config = {"from_attributes": True}
class UserCreateAdmin(BaseModel): class UserUpdateAdmin(BaseModel):
username: str = Field(min_length=3, max_length=64) username: str | None = Field(default=None, min_length=3, max_length=64)
password: str = Field(min_length=8, max_length=128) password: str | None = Field(default=None, min_length=8, max_length=128)
is_superuser: bool = False is_superuser: bool | None = None
@field_validator("username", "password", mode="before")
@classmethod
def empty_str_to_none(cls, v: object) -> object:
if isinstance(v, str) and not v.strip():
return None
return v
@field_validator("username") @field_validator("username")
@classmethod @classmethod
def username_chars(cls, v: str) -> str: def username_chars(cls, v: str | None) -> str | None:
if v is None:
return None
s = v.strip() s = v.strip()
if not s.replace("_", "").isalnum(): if not s.replace("_", "").isalnum():
raise ValueError("Username may only contain letters, numbers, and underscores") raise ValueError("Username may only contain letters, numbers, and underscores")
return s return s
@model_validator(mode="after")
def none_empty_patch(self):
if self.username is None and self.password is None and self.is_superuser is None:
raise ValueError("Provide username, password, and/or superuser changes")
return self
class AuthStatusResponse(BaseModel): class AuthStatusResponse(BaseModel):
auth_enabled: bool auth_enabled: bool

View File

@@ -88,7 +88,7 @@ def register_user_with_invite(db: Session, username: str, password: str, invite_
raise ValueError("Invite is invalid or expired") raise ValueError("Invite is invalid or expired")
if get_user_by_username(db, username): if get_user_by_username(db, username):
raise ValueError("Username already taken") raise ValueError("Username already taken")
user = create_user(db, username, password, is_superuser=False) user = create_user(db, username, password, is_superuser=bool(invite.grants_superuser))
invite.used_at = _utc_naive() invite.used_at = _utc_naive()
invite.consumed_by_user_id = user.id invite.consumed_by_user_id = user.id
db.add(invite) db.add(invite)
@@ -149,11 +149,59 @@ def delete_user(db: Session, user_id: int) -> bool:
return True return True
def update_user(
db: Session,
user_id: int,
*,
username: str | None = None,
password: str | None = None,
is_superuser: bool | None = None,
) -> User | None:
user = get_user_by_id(db, user_id)
if user is None:
return None
old_username = user.username
if username is not None:
normalized = username.strip()
if normalized != old_username:
dup = db.scalars(select(User).where(User.username == normalized, User.id != user_id)).first()
if dup is not None:
raise ValueError("Username already taken")
user_workspace.rename_user_workspace_leaf(user_id, old_username, normalized)
user.username = normalized
if password is not None:
user.password_hash = hash_password(password)
if is_superuser is not None:
if user.is_superuser and not is_superuser:
remaining = db.scalar(
select(func.count())
.select_from(User)
.where(User.is_superuser.is_(True), User.id != user_id)
)
if not remaining:
raise ValueError("Cannot demote the last administrator")
user.is_superuser = is_superuser
db.add(user)
db.commit()
db.refresh(user)
return user
def invite_required() -> bool: def invite_required() -> bool:
return os.environ.get("AUTH_INVITE_ONLY", "true").strip().lower() in ("1", "true", "yes", "on") return os.environ.get("AUTH_INVITE_ONLY", "true").strip().lower() in ("1", "true", "yes", "on")
def create_invite(db: Session, email: str, invited_by_user_id: int | None = None, expires_days: int = 7) -> InviteToken: def create_invite(
db: Session,
email: str,
invited_by_user_id: int | None = None,
expires_days: int = 7,
) -> InviteToken:
token = secrets.token_urlsafe(36) token = secrets.token_urlsafe(36)
expires = _utc_naive() + dt.timedelta(days=max(1, min(30, int(expires_days)))) expires = _utc_naive() + dt.timedelta(days=max(1, min(30, int(expires_days))))
row = InviteToken( row = InviteToken(

View File

@@ -1,12 +1,20 @@
from __future__ import annotations from __future__ import annotations
import re import re
import shutil
from pathlib import Path from pathlib import Path
from editor_app import config from editor_app import config
DEFAULT_MAIN_PY = 'print("Hello, World!")\n' DEFAULT_MAIN_PY = 'print("Hello, World!")\n'
# Self-contained demos copied from shipped `workspace/code/` (stdlib + machine/neopixel/time only).
_CANONICAL_DEMO_FILENAMES = (
"pattern_rainbow_demo.py",
"pattern_twinkle_demo.py",
"pattern_chase_demo.py",
)
def safe_workspace_leaf(username: str, user_id: int) -> str: def safe_workspace_leaf(username: str, user_id: int) -> str:
base = re.sub(r"[^a-zA-Z0-9._-]+", "-", username.strip()).strip("-").lower() or "user" base = re.sub(r"[^a-zA-Z0-9._-]+", "-", username.strip()).strip("-").lower() or "user"
@@ -18,10 +26,41 @@ def user_workspace_root(user_id: int, username: str, workspace_root: Path | None
return root / "users" / safe_workspace_leaf(username, user_id) return root / "users" / safe_workspace_leaf(username, user_id)
def _seed_canonical_demos_into_code(code_dir: Path) -> None:
src_root = config.PROJECT_ROOT.resolve() / "workspace" / "code"
for filename in _CANONICAL_DEMO_FILENAMES:
dst = code_dir / filename
if dst.exists():
continue
src = src_root / filename
if src.is_file():
dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")
def ensure_default_code_main(user_root: Path) -> None: def ensure_default_code_main(user_root: Path) -> None:
"""Ensure code/ exists and add a starter main.py when missing.""" """Ensure code/ has main.py and self-contained NeoPixel demos (copied from repo workspace/code/)."""
code_dir = user_root / "code" code_dir = user_root / "code"
code_dir.mkdir(parents=True, exist_ok=True) code_dir.mkdir(parents=True, exist_ok=True)
main_py = code_dir / "main.py" main_py = code_dir / "main.py"
if not main_py.exists(): if not main_py.exists():
main_py.write_text(DEFAULT_MAIN_PY, encoding="utf-8") main_py.write_text(DEFAULT_MAIN_PY, encoding="utf-8")
_seed_canonical_demos_into_code(code_dir)
def rename_user_workspace_leaf(
user_id: int, old_username: str, new_username: str, workspace_root: Path | None = None
) -> None:
"""Rename per-user workspace directory when login name changes."""
root = (workspace_root or config.WORKSPACE_ROOT).resolve()
users_dir = root / "users"
src = users_dir / safe_workspace_leaf(old_username, user_id)
dst = users_dir / safe_workspace_leaf(new_username, user_id)
if src.resolve() == dst.resolve():
return
dst.parent.mkdir(parents=True, exist_ok=True)
if dst.exists():
raise ValueError("Workspace folder for new username already exists; pick another username.")
if src.exists():
shutil.move(str(src), str(dst))
else:
ensure_default_code_main(dst)

View File

@@ -105,6 +105,79 @@
border-radius: 6px; border-radius: 6px;
padding: 0.25rem 0.5rem; padding: 0.25rem 0.5rem;
} }
.user-actions {
display: flex;
flex-wrap: wrap;
gap: 0.35rem;
align-items: center;
justify-content: flex-end;
}
.btn-danger {
background: transparent;
border-color: #f87171;
color: #fecaca;
}
.btn-danger:hover:not(:disabled) {
background: rgba(248, 113, 113, 0.15);
}
.btn:disabled {
opacity: 0.45;
cursor: not-allowed;
}
#admin-users-feedback:not(:empty) {
padding: 0.5rem 0.65rem;
border-radius: 8px;
font-size: 0.85rem;
margin: 0.5rem 0;
}
#admin-users-feedback.ok {
background: rgba(34, 197, 94, 0.12);
border: 1px solid rgba(74, 222, 128, 0.35);
color: #bbf7d0;
}
#admin-users-feedback.err {
background: rgba(248, 113, 113, 0.1);
border: 1px solid rgba(248, 113, 113, 0.35);
color: #fecaca;
}
.user-edit-form {
margin-top: 1rem;
padding-top: 1rem;
border-top: 1px solid rgba(148, 163, 184, 0.25);
}
.user-edit-form label {
display: block;
margin-bottom: 0.65rem;
}
.user-edit-form input[type='text'],
.user-edit-form input[type='password'] {
width: 100%;
max-width: 320px;
padding: 0.45rem 0.55rem;
border-radius: 8px;
border: 1px solid #64748b;
background: #0f172a;
color: #e2e8f0;
margin-top: 0.25rem;
box-sizing: border-box;
}
.user-edit-form .edit-actions {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
margin-top: 0.65rem;
}
.user-edit-form .super-line {
display: flex;
align-items: center;
gap: 0.5rem;
font-size: 0.85rem;
color: #cbd5e1;
margin-bottom: 0.65rem;
}
.user-edit-form .super-line input {
accent-color: #3b82f6;
}
</style> </style>
</head> </head>
<body> <body>
@@ -123,19 +196,48 @@
<input id="api-key" type="password" autocomplete="off" placeholder="Leave blank if not used" /> <input id="api-key" type="password" autocomplete="off" placeholder="Leave blank if not used" />
<p class="note">The key is kept in <code>sessionStorage</code>. You can also use <code>?api_key=…</code> on the editor URL.</p> <p class="note">The key is kept in <code>sessionStorage</code>. You can also use <code>?api_key=…</code> on the editor URL.</p>
</div> </div>
<section id="invite-panel" class="invite-panel hidden">
<strong>Admin invites</strong>
<p class="note" style="margin-top:0.4rem">Create an email invite link for signup.</p>
<div class="invite-row">
<input id="invite-email" type="email" placeholder="new.user@example.com" autocomplete="email" />
<button type="button" class="btn btn-primary" id="invite-create-btn">Create invite</button>
<button type="button" class="btn btn-ghost" id="invite-link-btn">Create link only</button>
</div>
<div id="invite-result" class="invite-result"></div>
</section>
<section id="users-panel" class="users-panel hidden"> <section id="users-panel" class="users-panel hidden">
<strong>User management</strong> <strong>User management</strong>
<p class="note" style="margin-top:0.35rem">
New accounts sign up via an <strong>invite link</strong> below. Remove accounts here or open their workspace.
</p>
<div id="admin-users-feedback" role="status" aria-live="polite"></div>
<p style="margin: 1rem 0 0.35rem 0; font-size: 0.9rem; color: #94a3b8">Accounts</p>
<div id="users-list" class="users-list"></div> <div id="users-list" class="users-list"></div>
<div id="user-edit-form" class="user-edit-form hidden">
<strong id="user-edit-heading">Edit account</strong>
<p class="note" style="margin: 0.35rem 0 0.5rem">Change login name or admin role; set a password only when you mean to reset it.</p>
<input type="hidden" id="edit-user-id" autocomplete="off" />
<label for="edit-user-username">
Username
<input type="text" id="edit-user-username" name="username" autocomplete="username" minlength="3" maxlength="64" />
</label>
<label for="edit-user-password">
New password
<input type="password" id="edit-user-password" name="password" autocomplete="new-password" minlength="8" maxlength="128" placeholder="Leave blank to keep current" />
</label>
<label class="super-line">
<input type="checkbox" id="edit-user-super" />
Superuser (can manage accounts and invites)
</label>
<div class="edit-actions">
<button type="button" class="btn btn-primary" id="edit-user-save">Save changes</button>
<button type="button" class="btn btn-ghost" id="edit-user-cancel">Cancel</button>
</div>
</div>
</section>
<section id="invite-panel" class="invite-panel hidden">
<strong>Add users via invite link</strong>
<p class="note" style="margin-top:0.4rem">
Each link lets <strong>one</strong> person create their own account at <code>/register?invite=…</code> with a password they choose. After it is used, create a new link for the next person.
</p>
<div class="invite-row">
<input id="invite-email" type="email" placeholder="new.user@example.com" autocomplete="email" />
<button type="button" class="btn btn-primary" id="invite-create-btn">Email invite</button>
<button type="button" class="btn btn-ghost" id="invite-link-btn">Invite link only</button>
</div>
<div id="invite-result" class="invite-result"></div>
<button type="button" class="btn btn-ghost hidden" id="invite-copy-btn" aria-label="Copy invite link">Copy invite link</button>
</section> </section>
<div class="nav"> <div class="nav">
<a class="btn btn-primary" href="/editor" id="open-editor">Open Editor</a> <a class="btn btn-primary" href="/editor" id="open-editor">Open Editor</a>
@@ -146,6 +248,29 @@
const storageKey = 'python-editor.api_key'; const storageKey = 'python-editor.api_key';
const input = document.getElementById('api-key'); const input = document.getElementById('api-key');
const openLink = document.getElementById('open-editor'); const openLink = document.getElementById('open-editor');
let currentAdminUserId = null;
function formatApiDetail(body) {
if (!body || body.detail === undefined || body.detail === null) return '';
const d = body.detail;
if (typeof d === 'string') return d;
if (Array.isArray(d))
return d
.map((item) =>
typeof item === 'object' && item && item.msg ? String(item.msg) : JSON.stringify(item)
)
.join(' ');
return String(d);
}
function setAdminUsersFeedback(kind, msg) {
const el = document.getElementById('admin-users-feedback');
if (!el) return;
el.textContent = msg || '';
el.classList.remove('ok', 'err');
if (kind === 'ok') el.classList.add('ok');
if (kind === 'err') el.classList.add('err');
}
async function refreshAuthNav() { async function refreshAuthNav() {
const st = await fetch('/api/auth/status'); const st = await fetch('/api/auth/status');
@@ -158,6 +283,7 @@
const invitePanel = document.getElementById('invite-panel'); const invitePanel = document.getElementById('invite-panel');
const usersPanel = document.getElementById('users-panel'); const usersPanel = document.getElementById('users-panel');
if (!status.auth_enabled) { if (!status.auth_enabled) {
currentAdminUserId = null;
loginEl.classList.add('hidden'); loginEl.classList.add('hidden');
regEl.classList.add('hidden'); regEl.classList.add('hidden');
outEl.classList.add('hidden'); outEl.classList.add('hidden');
@@ -181,15 +307,18 @@
if (optionalKey) optionalKey.classList.add('hidden'); if (optionalKey) optionalKey.classList.add('hidden');
if (invitePanel) { if (invitePanel) {
if (j.user && j.user.is_superuser) { if (j.user && j.user.is_superuser) {
currentAdminUserId = j.user.id;
invitePanel.classList.remove('hidden'); invitePanel.classList.remove('hidden');
if (usersPanel) usersPanel.classList.remove('hidden'); if (usersPanel) usersPanel.classList.remove('hidden');
await refreshUsersList(); await refreshUsersList(j.user.id);
} else { } else {
currentAdminUserId = null;
invitePanel.classList.add('hidden'); invitePanel.classList.add('hidden');
if (usersPanel) usersPanel.classList.add('hidden'); if (usersPanel) usersPanel.classList.add('hidden');
} }
} }
} else { } else {
currentAdminUserId = null;
outEl.classList.add('hidden'); outEl.classList.add('hidden');
greet.classList.add('hidden'); greet.classList.add('hidden');
if (invitePanel) invitePanel.classList.add('hidden'); if (invitePanel) invitePanel.classList.add('hidden');
@@ -197,7 +326,7 @@
} }
} }
async function refreshUsersList() { async function refreshUsersList(viewerId) {
const usersList = document.getElementById('users-list'); const usersList = document.getElementById('users-list');
if (!usersList) return; if (!usersList) return;
usersList.textContent = 'Loading users...'; usersList.textContent = 'Loading users...';
@@ -214,11 +343,54 @@
row.className = 'user-row'; row.className = 'user-row';
const name = document.createElement('span'); const name = document.createElement('span');
name.textContent = `${user.username}${user.is_superuser ? ' (admin)' : ''}`; name.textContent = `${user.username}${user.is_superuser ? ' (admin)' : ''}`;
const actions = document.createElement('div');
actions.className = 'user-actions';
const link = document.createElement('a'); const link = document.createElement('a');
link.href = `/editor?workspace_user_id=${encodeURIComponent(String(user.id))}`; link.href = `/editor?workspace_user_id=${encodeURIComponent(String(user.id))}`;
link.textContent = 'Open workspace'; link.textContent = 'Open workspace';
const editBtn = document.createElement('button');
editBtn.type = 'button';
editBtn.className = 'btn btn-ghost';
editBtn.textContent = 'Edit';
editBtn.title = `Edit ${user.username}`;
editBtn.addEventListener('click', () => openUserEdit(user.id, user.username, user.is_superuser));
const delBtn = document.createElement('button');
delBtn.type = 'button';
delBtn.className = 'btn btn-ghost btn-danger';
delBtn.textContent = 'Remove';
const isSelf = Number(user.id) === Number(viewerId);
delBtn.disabled = isSelf;
delBtn.title = isSelf ? 'You cannot delete your own account here' : 'Permanently remove this user';
delBtn.addEventListener('click', async () => {
const ok = confirm(
`Remove user “${user.username}”? Their workspace folder stays on disk until you delete it manually.`
);
if (!ok) return;
delBtn.disabled = true;
setAdminUsersFeedback('', '');
try {
const dres = await fetch(`/api/users/${encodeURIComponent(String(user.id))}`, {
method: 'DELETE',
credentials: 'include'
});
const body = await dres.json().catch(() => ({}));
if (!dres.ok) {
setAdminUsersFeedback('err', formatApiDetail(body) || dres.statusText || 'Could not remove user');
delBtn.disabled = false;
return;
}
setAdminUsersFeedback('ok', `Removed ${user.username}.`);
await refreshUsersList(viewerId);
} catch (err) {
setAdminUsersFeedback('err', String(err.message || err));
delBtn.disabled = false;
}
});
actions.appendChild(editBtn);
actions.appendChild(link);
actions.appendChild(delBtn);
row.appendChild(name); row.appendChild(name);
row.appendChild(link); row.appendChild(actions);
usersList.appendChild(row); usersList.appendChild(row);
} }
} catch (_err) { } catch (_err) {
@@ -226,6 +398,65 @@
} }
} }
function closeUserEdit() {
const sheet = document.getElementById('user-edit-form');
if (sheet) sheet.classList.add('hidden');
}
function openUserEdit(userId, username, isSuperuser) {
document.getElementById('edit-user-id').value = String(userId);
document.getElementById('edit-user-username').value = username;
document.getElementById('edit-user-password').value = '';
document.getElementById('edit-user-super').checked = Boolean(isSuperuser);
document.getElementById('user-edit-heading').textContent = `Edit @${username}`;
document.getElementById('user-edit-form').classList.remove('hidden');
}
document.getElementById('edit-user-cancel').addEventListener('click', () => closeUserEdit());
document.getElementById('edit-user-save').addEventListener('click', async () => {
const id = document.getElementById('edit-user-id').value;
const u = document.getElementById('edit-user-username').value.trim();
const pw = document.getElementById('edit-user-password').value;
const superU = document.getElementById('edit-user-super').checked;
const saveBtn = document.getElementById('edit-user-save');
setAdminUsersFeedback('', '');
if (!u || u.length < 3) {
setAdminUsersFeedback('err', 'Username must be at least 3 characters.');
return;
}
const payload = { username: u, is_superuser: superU };
const tpw = pw.trim();
if (tpw.length > 0) {
if (tpw.length < 8) {
setAdminUsersFeedback('err', 'New password must be at least 8 characters (or leave blank).');
return;
}
payload.password = tpw;
}
saveBtn.disabled = true;
try {
const res = await fetch(`/api/users/${encodeURIComponent(String(id))}`, {
method: 'PATCH',
credentials: 'include',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
const body = await res.json().catch(() => ({}));
if (!res.ok) {
setAdminUsersFeedback('err', formatApiDetail(body) || res.statusText || 'Update failed');
saveBtn.disabled = false;
return;
}
setAdminUsersFeedback('ok', `Updated @${body.username}.`);
closeUserEdit();
await refreshUsersList(currentAdminUserId);
} catch (err) {
setAdminUsersFeedback('err', String(err.message || err));
}
saveBtn.disabled = false;
});
document.getElementById('btn-logout').addEventListener('click', async () => { document.getElementById('btn-logout').addEventListener('click', async () => {
await fetch('/api/auth/logout', { method: 'POST', credentials: 'include' }); await fetch('/api/auth/logout', { method: 'POST', credentials: 'include' });
window.location.reload(); window.location.reload();
@@ -253,11 +484,56 @@
} catch (_e) {} } catch (_e) {}
}); });
function showInviteOutcome(inviteUrl, headline) {
const result = document.getElementById('invite-result');
const copyBtn = document.getElementById('invite-copy-btn');
result.textContent = '';
const line = document.createElement('p');
line.style.margin = '0 0 0.35rem 0';
line.textContent = headline;
const a = document.createElement('a');
a.href = inviteUrl;
a.textContent = inviteUrl;
a.style.color = '#93c5fd';
a.style.wordBreak = 'break-all';
result.appendChild(line);
result.appendChild(a);
copyBtn.classList.remove('hidden');
copyBtn.dataset.inviteUrl = inviteUrl;
}
function clearInviteOutcome() {
const result = document.getElementById('invite-result');
const copyBtn = document.getElementById('invite-copy-btn');
result.textContent = '';
copyBtn.classList.add('hidden');
copyBtn.textContent = 'Copy invite link';
delete copyBtn.dataset.inviteUrl;
}
document.getElementById('invite-copy-btn').addEventListener('click', async () => {
const copyBtn = document.getElementById('invite-copy-btn');
const url = copyBtn.dataset.inviteUrl;
if (!url) return;
try {
await navigator.clipboard.writeText(url);
copyBtn.textContent = 'Copied!';
setTimeout(() => {
copyBtn.textContent = 'Copy invite link';
}, 2000);
} catch (_e) {
copyBtn.textContent = 'Copy failed — select the link above';
setTimeout(() => {
copyBtn.textContent = 'Copy invite link';
}, 2500);
}
});
document.getElementById('invite-create-btn').addEventListener('click', async () => { document.getElementById('invite-create-btn').addEventListener('click', async () => {
const emailInput = document.getElementById('invite-email'); const emailInput = document.getElementById('invite-email');
const result = document.getElementById('invite-result'); const result = document.getElementById('invite-result');
const email = (emailInput.value || '').trim(); const email = (emailInput.value || '').trim();
result.textContent = ''; clearInviteOutcome();
if (!email) { if (!email) {
result.textContent = 'Enter an email address first.'; result.textContent = 'Enter an email address first.';
return; return;
@@ -271,18 +547,20 @@
}); });
const body = await res.json().catch(() => ({})); const body = await res.json().catch(() => ({}));
if (!res.ok) { if (!res.ok) {
result.textContent = body.detail || res.statusText || 'Failed to create invite'; result.textContent = formatApiDetail(body) || res.statusText || 'Failed to create invite';
return; return;
} }
const delivered = body.delivered ? 'Email sent.' : 'Email not sent (SMTP not configured), copy link manually.'; const headline = body.delivered
result.innerHTML = `${delivered}<br><a href="${body.invite_url}" style="color:#93c5fd">${body.invite_url}</a>`; ? 'Email sent. They can also use this link to register:'
: 'Email not sent (SMTP not configured). Share this registration link:';
showInviteOutcome(body.invite_url, headline);
} catch (err) { } catch (err) {
result.textContent = String(err.message || err); result.textContent = String(err.message || err);
} }
}); });
document.getElementById('invite-link-btn').addEventListener('click', async () => { document.getElementById('invite-link-btn').addEventListener('click', async () => {
const result = document.getElementById('invite-result'); const result = document.getElementById('invite-result');
result.textContent = ''; clearInviteOutcome();
try { try {
const res = await fetch('/api/users/invites', { const res = await fetch('/api/users/invites', {
method: 'POST', method: 'POST',
@@ -292,10 +570,10 @@
}); });
const body = await res.json().catch(() => ({})); const body = await res.json().catch(() => ({}));
if (!res.ok) { if (!res.ok) {
result.textContent = body.detail || res.statusText || 'Failed to create invite link'; result.textContent = formatApiDetail(body) || res.statusText || 'Failed to create invite link';
return; return;
} }
result.innerHTML = `Invite link created.<br><a href="${body.invite_url}" style="color:#93c5fd">${body.invite_url}</a>`; showInviteOutcome(body.invite_url, 'Share this link so they can create an account:');
} catch (err) { } catch (err) {
result.textContent = String(err.message || err); result.textContent = String(err.message || err);
} }

View File

@@ -54,6 +54,7 @@
<body> <body>
<main class="card"> <main class="card">
<h1>Create account</h1> <h1>Create account</h1>
<p id="invite-banner" class="hint hidden"></p>
<p class="hint">Username: letters, numbers, underscore (364). Password: at least 8 characters.</p> <p class="hint">Username: letters, numbers, underscore (364). Password: at least 8 characters.</p>
<div id="err" class="err"></div> <div id="err" class="err"></div>
<form id="form"> <form id="form">
@@ -67,9 +68,31 @@
<p><a href="/login">Sign in</a> · <a href="/">Home</a></p> <p><a href="/login">Sign in</a> · <a href="/">Home</a></p>
</main> </main>
<script> <script>
const inviteToken = new URLSearchParams(window.location.search).get("invite") || ""; const inviteTokenRaw = new URLSearchParams(window.location.search).get("invite") || "";
const inviteToken = inviteTokenRaw.trim();
const inviteInput = document.getElementById("invite-token"); const inviteInput = document.getElementById("invite-token");
if (inviteInput) inviteInput.value = inviteToken; if (inviteInput) inviteInput.value = inviteToken;
if (inviteToken) {
const banner = document.getElementById("invite-banner");
if (banner) {
banner.classList.remove("hidden");
banner.textContent =
"You're using an invite link. Choose a username and password — the link works for one signup only.";
}
}
function formatApiDetail(body) {
if (!body || body.detail === undefined || body.detail === null) return "";
const d = body.detail;
if (typeof d === "string") return d;
if (Array.isArray(d))
return d
.map((item) =>
typeof item === "object" && item && item.msg ? String(item.msg) : JSON.stringify(item)
)
.join(" ");
return String(d);
}
(async function checkStatus() { (async function checkStatus() {
try { try {
@@ -98,7 +121,7 @@
const body = { const body = {
username: document.getElementById("username").value.trim(), username: document.getElementById("username").value.trim(),
password: document.getElementById("password").value, password: document.getElementById("password").value,
invite_token: inviteToken || null, invite_token: inviteToken ? inviteToken : null,
}; };
const res = await fetch("/api/auth/register", { const res = await fetch("/api/auth/register", {
method: "POST", method: "POST",
@@ -107,7 +130,7 @@
}); });
if (!res.ok) { if (!res.ok) {
const j = await res.json().catch(() => ({})); const j = await res.json().catch(() => ({}));
err.textContent = typeof j.detail === "string" ? j.detail : JSON.stringify(j.detail) || res.statusText; err.textContent = formatApiDetail(j) || res.statusText;
return; return;
} }
window.location.href = "/login?next=/editor"; window.location.href = "/login?next=/editor";

View File

@@ -92,15 +92,27 @@ def test_new_user_workspace_has_default_main_py(tmp_path, monkeypatch):
assert reg.status_code == 200 assert reg.status_code == 200
assert reg.json()["username"] == "alice" assert reg.json()["username"] == "alice"
uid = reg.json()["id"] uid = reg.json()["id"]
on_disk = tmp_path / "users" / f"alice-{uid}" / "code" / "main.py" code_root = tmp_path / "users" / f"alice-{uid}" / "code"
on_disk = code_root / "main.py"
assert on_disk.is_file() assert on_disk.is_file()
assert on_disk.read_text(encoding="utf-8") == 'print("Hello, World!")\n' assert on_disk.read_text(encoding="utf-8") == 'print("Hello, World!")\n'
canonical = ("pattern_rainbow_demo.py", "pattern_twinkle_demo.py", "pattern_chase_demo.py")
for fname in canonical:
cp = code_root / fname
assert cp.is_file(), f"missing bundled copy {fname} (workspace/code must ship with app)"
text = cp.read_text(encoding="utf-8")
assert len(text.strip()) > 20
assert "from led_patterns" not in text
assert "import led_patterns" not in text
assert client.post("/api/auth/login", json={"username": "alice", "password": "password99"}).status_code == 200 assert client.post("/api/auth/login", json={"username": "alice", "password": "password99"}).status_code == 200
fetched = client.get("/api/file/code/main.py") fetched = client.get("/api/file/code/main.py")
assert fetched.status_code == 200 assert fetched.status_code == 200
assert fetched.json()["filename"] == "main.py" assert fetched.json()["filename"] == "main.py"
assert 'Hello, World!' in fetched.json()["content"] assert 'Hello, World!' in fetched.json()["content"]
chase = client.get("/api/file/code/pattern_chase_demo.py")
assert chase.status_code == 200
assert "knight_rider_scanner_frame" in chase.json()["content"]
def test_second_user_not_superuser(tmp_path, monkeypatch): def test_second_user_not_superuser(tmp_path, monkeypatch):
@@ -179,7 +191,54 @@ def test_invite_required_blocks_public_register(tmp_path, monkeypatch):
assert blocked.status_code == 403 assert blocked.status_code == 403
def test_superuser_lists_and_creates_users(tmp_path, monkeypatch): def test_superuser_can_patch_user_account(tmp_path, monkeypatch):
with TestClient(
_reload_app(tmp_path, monkeypatch, AUTH_ENABLED="true", AUTH_REGISTER_OPEN="true")
) as client:
client.post("/api/auth/register", json={"username": "admin", "password": "password99"})
client.post("/api/auth/login", json={"username": "admin", "password": "password99"})
invite = client.post("/api/users/invites", json={"email": None, "expires_days": 7})
token = invite.json()["invite_url"].split("invite=", 1)[1]
client.post("/api/auth/logout")
sub = client.post(
"/api/auth/register",
json={"username": "subacc", "password": "original99", "invite_token": token},
).json()
uid = sub["id"]
client.post("/api/auth/login", json={"username": "admin", "password": "password99"})
pat = client.patch(
f"/api/users/{uid}",
json={"username": "renamed", "is_superuser": True, "password": "renewedpw8"},
)
assert pat.status_code == 200
assert pat.json()["username"] == "renamed"
assert pat.json()["is_superuser"] is True
client.post("/api/auth/logout")
assert client.post(
"/api/auth/login",
json={"username": "renamed", "password": "renewedpw8"},
).status_code == 200
def test_superuser_cannot_demote_last_admin(tmp_path, monkeypatch):
with TestClient(
_reload_app(tmp_path, monkeypatch, AUTH_ENABLED="true", AUTH_REGISTER_OPEN="true")
) as client:
client.post("/api/auth/register", json={"username": "solo_admin", "password": "password99"})
client.post("/api/auth/login", json={"username": "solo_admin", "password": "password99"})
uid = client.get("/api/auth/me").json()["user"]["id"]
demote = client.patch(
f"/api/users/{uid}",
json={"username": "solo_admin", "is_superuser": False},
)
assert demote.status_code == 400
detail = demote.json().get("detail") or ""
assert "last" in detail.lower() or "administrator" in detail.lower()
def test_superuser_lists_users_after_invite_signup(tmp_path, monkeypatch):
with TestClient( with TestClient(
_reload_app(tmp_path, monkeypatch, AUTH_ENABLED="true", AUTH_REGISTER_OPEN="true") _reload_app(tmp_path, monkeypatch, AUTH_ENABLED="true", AUTH_REGISTER_OPEN="true")
) as client: ) as client:
@@ -190,14 +249,20 @@ def test_superuser_lists_and_creates_users(tmp_path, monkeypatch):
assert listed.status_code == 200 assert listed.status_code == 200
assert len(listed.json()) == 1 assert len(listed.json()) == 1
created = client.post( invite = client.post("/api/users/invites", json={"email": None, "expires_days": 7})
"/api/users", assert invite.status_code == 200
json={"username": "sub", "password": "password99", "is_superuser": False}, token = invite.json()["invite_url"].split("invite=", 1)[1]
)
assert created.status_code == 200
assert created.json()["username"] == "sub"
assert created.json()["is_superuser"] is False
client.post("/api/auth/logout")
reg = client.post(
"/api/auth/register",
json={"username": "sub", "password": "password99", "invite_token": token},
)
assert reg.status_code == 200
assert reg.json()["username"] == "sub"
assert reg.json()["is_superuser"] is False
client.post("/api/auth/login", json={"username": "admin", "password": "password99"})
names = {u["username"] for u in client.get("/api/users").json()} names = {u["username"] for u in client.get("/api/users").json()}
assert names == {"admin", "sub"} assert names == {"admin", "sub"}

View File

@@ -30,6 +30,33 @@ def test_chase_frame_has_head_and_tail():
assert sum(1 for c in frame if c != (0, 0, 0)) == 2 assert sum(1 for c in frame if c != (0, 0, 0)) == 2
def test_bounce_head_pingpongs_off_ends():
patterns = _load_patterns_module()
bounce = patterns._bounce_head_index
assert bounce(5, 0) == 0 and bounce(5, 4) == 4
assert bounce(5, 5) == 3 and bounce(5, 8) == 1 and bounce(5, 16) == bounce(5, 0)
def test_scanner_bounce_has_head_and_fading_tail():
patterns = _load_patterns_module()
frame = patterns.scanner_bounce_frame(
8, 20, head_color=(80, 10, 10), tail_color=(20, 50, 90), tail_len=4
)
assert len(frame) == 8
bright = max(range(8), key=lambda i: sum(frame[i]))
assert sum(frame[bright]) >= 80
def test_knight_rider_tail_falls_off():
patterns = _load_patterns_module()
fr = patterns.knight_rider_scanner_frame(16, 55, tail_len=8, falloff_gamma=3.0)
assert len(fr) == 16
reds_sorted = sorted(fr[i][0] for i in range(16) if sum(fr[i]) > 0)
assert len(reds_sorted) >= 2
assert reds_sorted[-1] >= 200
assert reds_sorted[0] < reds_sorted[-1] * 0.5
def test_twinkle_frame_is_deterministic_for_same_inputs(): def test_twinkle_frame_is_deterministic_for_same_inputs():
patterns = _load_patterns_module() patterns = _load_patterns_module()
a = patterns.twinkle_frame(20, frame=9, seed=777, sparkles=4) a = patterns.twinkle_frame(20, frame=9, seed=777, sparkles=4)

View File

@@ -48,6 +48,82 @@ def chase_frame(
return out return out
def _bounce_head_index(led_count: int, frame: int) -> int:
"""Map frame to a triangular index sweep 0..N-1..0 (Ping-Pong position)."""
if led_count <= 1:
return 0
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return 0
t = frame % cycle
return t if t <= span else 2 * span - t
def _bounce_phase_tail_direction(led_count: int, frame: int) -> int:
"""Extend tail opposite motion: -1 fades toward lower indices, +1 toward higher."""
if led_count <= 1:
return -1
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return -1
t = frame % cycle
if t <= span:
return -1
return 1
def knight_rider_scanner_frame(
led_count: int,
frame: int,
head_color: Color = (220, 0, 28),
tail_len: int = 8,
falloff_gamma: float = 2.6,
) -> list[Color]:
"""KITT-style bouncing scanner: saturated head with exponential tail fading to off."""
if led_count <= 0:
return []
out: list[Color] = [(0, 0, 0) for _ in range(led_count)]
tl = max(1, tail_len)
head = _bounce_head_index(led_count, frame)
direc = _bounce_phase_tail_direction(led_count, frame)
gamma = max(1.05, falloff_gamma)
for rk in reversed(range(tl)):
idx = head + direc * rk
if idx < 0 or idx >= led_count:
continue
w = max(0.0, float(tl - rk) / float(tl))
strength = w**gamma
out[idx] = tuple(_clamp(int(head_color[ch] * strength)) for ch in range(3))
return out
def scanner_bounce_frame(
led_count: int,
frame: int,
head_color: Color = (0, 220, 255),
tail_color: Color = (0, 40, 90),
tail_len: int = 5,
) -> list[Color]:
"""Ping-pong scanner: head reverses at both ends with a directional fading tail."""
if led_count <= 0:
return []
out: list[Color] = [(0, 0, 0) for _ in range(led_count)]
tl = max(1, tail_len)
for rk in reversed(range(tl)):
past = frame - rk
if past < 0:
continue
idx = _bounce_head_index(led_count, past)
strength = max(0.0, float(tl - rk) / float(tl))
if rk == 0:
out[idx] = tuple(_clamp(int(c)) for c in head_color)
else:
out[idx] = tuple(_clamp(int(tail_color[i] * strength)) for i in range(3))
return out
def twinkle_frame( def twinkle_frame(
led_count: int, led_count: int,
frame: int, frame: int,

View File

@@ -1,16 +1,79 @@
"""Chase pattern demo using led_patterns helpers.""" """Knight Riderstyle bouncing scanner — self-contained (stdlib + simulated hardware only)."""
import time
from machine import Pin from machine import Pin
import neopixel import neopixel
import time
from led_patterns import chase_frame # --- helpers
np = neopixel.NeoPixel(Pin(4), 24) def _clamp(channel: int) -> int:
return max(0, min(255, int(channel)))
for frame in range(120):
frame_colors = chase_frame(len(np), frame, color=(0, 220, 255), tail=(0, 40, 55)) def _bounce_head_index(led_count: int, frame: int) -> int:
if led_count <= 1:
return 0
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return 0
t = frame % cycle
return t if t <= span else 2 * span - t
def _bounce_phase_tail_direction(led_count: int, frame: int) -> int:
if led_count <= 1:
return -1
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return -1
t = frame % cycle
if t <= span:
return -1
return 1
def knight_rider_scanner_frame(
led_count: int,
frame: int,
head_color=(220, 0, 28),
tail_len: int = 8,
falloff_gamma: float = 2.6,
):
if led_count <= 0:
return []
out = [(0, 0, 0) for _ in range(led_count)]
tl = max(1, tail_len)
head = _bounce_head_index(led_count, frame)
direc = _bounce_phase_tail_direction(led_count, frame)
gamma = max(1.05, falloff_gamma)
for rk in reversed(range(tl)):
idx = head + direc * rk
if idx < 0 or idx >= led_count:
continue
w = max(0.0, float(tl - rk) / float(tl))
strength = w**gamma
out[idx] = tuple(_clamp(int(head_color[ch] * strength)) for ch in range(3))
return out
# --- demo
NUM_LEDS = 16
np = neopixel.NeoPixel(Pin(4), NUM_LEDS)
for frame in range(200):
frame_colors = knight_rider_scanner_frame(
len(np),
frame,
head_color=(220, 0, 36),
tail_len=10,
falloff_gamma=2.85,
)
for i, color in enumerate(frame_colors): for i, color in enumerate(frame_colors):
np[i] = color np[i] = color
np.write() np.write()

View File

@@ -1,13 +1,40 @@
"""Rainbow pattern demo using led_patterns helpers.""" """Rainbow NeoPixel sweep — self-contained (stdlib + simulated hardware only)."""
import time
from machine import Pin from machine import Pin
import neopixel import neopixel
import time
from led_patterns import rainbow_frame # --- helpers (same logic as bundled led_patterns.py, inlined here)
np = neopixel.NeoPixel(Pin(4), 256) def _clamp(channel: int) -> int:
return max(0, min(255, int(channel)))
def wheel(pos: int):
"""Return rainbow RGB at position 0255."""
pos = 255 - (pos & 255)
if pos < 85:
return (_clamp(255 - pos * 3), 0, _clamp(pos * 3))
if pos < 170:
pos -= 85
return (0, _clamp(pos * 3), _clamp(255 - pos * 3))
pos -= 170
return (_clamp(pos * 3), _clamp(255 - pos * 3), 0)
def rainbow_frame(led_count: int, frame: int, step: int = 4):
if led_count <= 0:
return []
return [wheel((i * 256 // led_count + frame * step) & 255) for i in range(led_count)]
# --- demo
NUM_LEDS = 16
np = neopixel.NeoPixel(Pin(4), NUM_LEDS)
for frame in range(120): for frame in range(120):
frame_colors = rainbow_frame(len(np), frame, step=5) frame_colors = rainbow_frame(len(np), frame, step=5)

View File

@@ -1,13 +1,41 @@
"""Twinkle pattern demo using led_patterns helpers.""" """Twinkle NeoPixel demo — self-contained (stdlib + simulated hardware only)."""
import random
import time
from machine import Pin from machine import Pin
import neopixel import neopixel
import time
from led_patterns import twinkle_frame # --- helpers
np = neopixel.NeoPixel(Pin(4), 36) def _clamp(channel: int) -> int:
return max(0, min(255, int(channel)))
def twinkle_frame(
led_count: int,
frame: int,
base=(0, 0, 8),
sparkle=(255, 255, 180),
sparkles: int = 3,
seed: int = 1337,
):
if led_count <= 0:
return []
out = [tuple(_clamp(v) for v in base) for _ in range(led_count)]
rng = random.Random(seed + frame)
for _ in range(min(max(0, sparkles), led_count)):
idx = rng.randrange(led_count)
out[idx] = tuple(_clamp(v) for v in sparkle)
return out
# --- demo
NUM_LEDS = 16
np = neopixel.NeoPixel(Pin(4), NUM_LEDS)
for frame in range(120): for frame in range(120):
frame_colors = twinkle_frame( frame_colors = twinkle_frame(
@@ -15,7 +43,7 @@ for frame in range(120):
frame, frame,
base=(0, 0, 6), base=(0, 0, 6),
sparkle=(255, 210, 130), sparkle=(255, 210, 130),
sparkles=5, sparkles=3,
) )
for i, color in enumerate(frame_colors): for i, color in enumerate(frame_colors):
np[i] = color np[i] = color

View File

@@ -50,6 +50,82 @@ def chase_frame(
return out return out
def _bounce_head_index(led_count: int, frame: int) -> int:
"""Map frame to a triangular index sweep 0..N-1..0 (Ping-Pong position)."""
if led_count <= 1:
return 0
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return 0
t = frame % cycle
return t if t <= span else 2 * span - t
def _bounce_phase_tail_direction(led_count: int, frame: int) -> int:
"""Extend tail opposite motion: -1 fades toward lower indices, +1 toward higher."""
if led_count <= 1:
return -1
span = led_count - 1
cycle = span * 2
if cycle <= 0:
return -1
t = frame % cycle
if t <= span:
return -1
return 1
def knight_rider_scanner_frame(
led_count: int,
frame: int,
head_color: Color = (220, 0, 28),
tail_len: int = 8,
falloff_gamma: float = 2.6,
) -> list[Color]:
"""KITT-style bouncing scanner: saturated head with exponential tail fading to off."""
if led_count <= 0:
return []
out: list[Color] = [(0, 0, 0) for _ in range(led_count)]
tl = max(1, tail_len)
head = _bounce_head_index(led_count, frame)
direc = _bounce_phase_tail_direction(led_count, frame)
gamma = max(1.05, falloff_gamma)
for rk in reversed(range(tl)):
idx = head + direc * rk
if idx < 0 or idx >= led_count:
continue
w = max(0.0, float(tl - rk) / float(tl))
strength = w**gamma
out[idx] = tuple(_clamp(int(head_color[ch] * strength)) for ch in range(3))
return out
def scanner_bounce_frame(
led_count: int,
frame: int,
head_color: Color = (0, 220, 255),
tail_color: Color = (0, 40, 90),
tail_len: int = 5,
) -> list[Color]:
"""Ping-pong scanner: head reverses at both ends with a directional fading tail."""
if led_count <= 0:
return []
out: list[Color] = [(0, 0, 0) for _ in range(led_count)]
tl = max(1, tail_len)
for rk in reversed(range(tl)):
past = frame - rk
if past < 0:
continue
idx = _bounce_head_index(led_count, past)
strength = max(0.0, float(tl - rk) / float(tl))
if rk == 0:
out[idx] = tuple(_clamp(int(c)) for c in head_color)
else:
out[idx] = tuple(_clamp(int(tail_color[i] * strength)) for i in range(3))
return out
def twinkle_frame( def twinkle_frame(
led_count: int, led_count: int,
frame: int, frame: int,