Files
led-controller/src/controllers/preset.py
Jimmy b2077c0199 Improve ESP-NOW messaging and tab defaults
- Use shared ESPNOW payload limit and message splitting
- Expand default tab names and add flash/build artifacts.

Made-with: Cursor
2026-03-14 02:41:08 +13:00

205 lines
7.6 KiB
Python

from microdot import Microdot
from microdot.session import with_session
from models.preset import Preset
from models.profile import Profile
from models.espnow import ESPNow
from util.espnow_message import build_message, build_preset_dict, ESPNOW_MAX_PAYLOAD_BYTES
import asyncio
import json
# Microdot application object on which this module's preset routes are registered.
controller = Microdot()
# Module-level model instances shared by every request handler in this module.
presets = Preset()
profiles = Profile()
def get_current_profile_id(session=None):
    """Resolve the profile ID that the current request is scoped to.

    The session's ``current_profile`` value is used when it is set and is
    one of the IDs returned by ``profiles.list()``; otherwise the first
    listed profile is used as a fallback.

    Args:
        session: Optional session mapping; ``None`` skips the session lookup.

    Returns:
        The selected profile ID, or ``None`` when no profiles exist.
    """
    known_ids = profiles.list()
    selected = session.get('current_profile') if session is not None else None
    if selected and selected in known_ids:
        return selected
    # Session gave nothing usable: fall back to the first profile, if any.
    return known_ids[0] if known_ids else None
@controller.get('')
@with_session
async def list_presets(request, session):
    """Return the presets owned by the current profile as a JSON object.

    Responds with ``{}`` when no profile can be resolved. Entries that are
    not dicts, or whose ``profile_id`` differs from the current profile
    (compared as strings), are left out.
    """
    json_headers = {'Content-Type': 'application/json'}
    profile_id = get_current_profile_id(session)
    if not profile_id:
        return json.dumps({}), 200, json_headers

    wanted = str(profile_id)
    visible = {}
    for preset_id, record in presets.items():
        if not isinstance(record, dict):
            continue
        if str(record.get("profile_id")) == wanted:
            visible[preset_id] = record
    return json.dumps(visible), 200, json_headers
@controller.get('/<id>')
@with_session
async def get_preset(request, id, session):
    """Get a specific preset by ID (current profile only).

    Args:
        request: The incoming Microdot request (unused).
        id: Preset ID taken from the URL.
        session: Session object used to resolve the current profile.

    Returns:
        ``(body, status, headers)``: the preset as JSON with 200, or a JSON
        error with 404 when the preset does not exist, no profile is active,
        or the preset belongs to a different profile.
    """
    # NOTE(review): upstream Microdot's with_session passes the session
    # positionally right after the request and URL arguments as keywords;
    # confirm the bundled decorator supports this (request, id, session) order.
    json_headers = {'Content-Type': 'application/json'}
    preset = presets.read(id)
    current_profile_id = get_current_profile_id(session)
    # Require an active profile explicitly: otherwise a preset without a
    # profile_id would match, because str(None) == str(None).
    in_scope = (
        preset
        and current_profile_id
        and str(preset.get("profile_id")) == str(current_profile_id)
    )
    if in_scope:
        return json.dumps(preset), 200, json_headers
    # Set the JSON content type on the error too; the body is a JSON string
    # and the success path already declares it.
    return json.dumps({"error": "Preset not found"}), 404, json_headers
@controller.post('')
@with_session
async def create_preset(request, session):
    """Create a new preset for the current profile.

    The request body, when it is a JSON object, supplies the initial preset
    fields; any other body is treated as empty. ``profile_id`` is always
    overwritten with the current profile so a client cannot create presets
    for another profile.

    Returns:
        ``(body, status, headers)``:
        - 201 with ``{preset_id: preset}`` on success,
        - 400 for invalid JSON, a failed update, or any unexpected error,
        - 404 when no profile is available.
    """
    json_headers = {'Content-Type': 'application/json'}
    try:
        try:
            data = request.json or {}
        except Exception:
            return json.dumps({"error": "Invalid JSON"}), 400, json_headers

        current_profile_id = get_current_profile_id(session)
        if not current_profile_id:
            return json.dumps({"error": "No profile available"}), 404, json_headers

        # Normalise the payload before touching storage so a new record is
        # only created once the data to write into it is ready.
        data = dict(data) if isinstance(data, dict) else {}
        data["profile_id"] = str(current_profile_id)

        preset_id = presets.create(current_profile_id)
        if presets.update(preset_id, data):
            created = presets.read(preset_id)
            return json.dumps({preset_id: created}), 201, json_headers
        # NOTE(review): the record made by presets.create() is left in place
        # when the update fails; consider whether it should be removed.
        return json.dumps({"error": "Failed to create preset"}), 400, json_headers
    except Exception as e:
        # Top-level boundary of the handler: report the failure as JSON.
        return json.dumps({"error": str(e)}), 400, json_headers
@controller.put('/<id>')
@with_session
async def update_preset(request, id, session):
    """Update an existing preset (current profile only).

    Args:
        request: The incoming Microdot request; its JSON object body holds
            the fields to update (a non-object body is treated as empty).
        id: Preset ID taken from the URL.
        session: Session object used to resolve the current profile.

    Returns:
        ``(body, status, headers)``:
        - 200 with the updated preset,
        - 400 for invalid JSON or any unexpected error,
        - 404 when the preset does not exist, no profile is active, the
          preset belongs to another profile, or the update is rejected.
    """
    # NOTE(review): upstream Microdot's with_session passes the session
    # positionally right after the request and URL arguments as keywords;
    # confirm the bundled decorator supports this (request, id, session) order.
    json_headers = {'Content-Type': 'application/json'}
    try:
        preset = presets.read(id)
        current_profile_id = get_current_profile_id(session)
        # Require an active profile explicitly: otherwise a preset without a
        # profile_id would match (str(None) == str(None)) and the update
        # below would store the literal string "None" as its profile_id.
        if (
            not preset
            or not current_profile_id
            or str(preset.get("profile_id")) != str(current_profile_id)
        ):
            return json.dumps({"error": "Preset not found"}), 404, json_headers

        try:
            data = request.json or {}
        except Exception:
            return json.dumps({"error": "Invalid JSON"}), 400, json_headers

        data = dict(data) if isinstance(data, dict) else {}
        # Pin ownership so the request body cannot move the preset to
        # another profile.
        data["profile_id"] = str(current_profile_id)

        if presets.update(id, data):
            return json.dumps(presets.read(id)), 200, json_headers
        return json.dumps({"error": "Preset not found"}), 404, json_headers
    except Exception as e:
        # Top-level boundary of the handler: report the failure as JSON.
        return json.dumps({"error": str(e)}), 400, json_headers
@controller.delete('/<id>')
@with_session
async def delete_preset(request, id, session):
    """Delete a preset (current profile only).

    Args:
        request: The incoming Microdot request (unused).
        id: Preset ID taken from the URL.
        session: Session object used to resolve the current profile.

    Returns:
        ``(body, status, headers)``: a JSON confirmation with 200, or a JSON
        error with 404 when the preset does not exist, no profile is active,
        the preset belongs to another profile, or the delete is rejected.
    """
    # NOTE(review): upstream Microdot's with_session passes the session
    # positionally right after the request and URL arguments as keywords;
    # confirm the bundled decorator supports this (request, id, session) order.
    json_headers = {'Content-Type': 'application/json'}
    preset = presets.read(id)
    current_profile_id = get_current_profile_id(session)
    # Require an active profile explicitly: otherwise a preset without a
    # profile_id would match, because str(None) == str(None).
    if (
        not preset
        or not current_profile_id
        or str(preset.get("profile_id")) != str(current_profile_id)
    ):
        return json.dumps({"error": "Preset not found"}), 404, json_headers
    if presets.delete(id):
        return json.dumps({"message": "Preset deleted successfully"}), 200, json_headers
    return json.dumps({"error": "Preset not found"}), 404, json_headers
def _espnow_payload_size(message):
    """Return the size of an ESP-NOW message in bytes.

    ``len()`` on a ``str`` counts characters, so text containing non-ASCII
    characters (e.g. in a preset name) would under-report its encoded size.
    """
    if isinstance(message, str):
        return len(message.encode('utf-8'))
    return len(message)


@controller.post('/send')
@with_session
async def send_presets(request, session):
    """Send one or more presets over ESPNow.

    Body JSON:
        {"preset_ids": ["1", "2", ...]} or {"ids": ["1", "2", ...]}
        Optional keys: "save" (truthy by default) and "default" (a preset ID;
        dropped when it is not among the presets actually sent).

    The controller:
        - looks up each preset in the Preset model, keeping only those that
          belong to the current profile
        - converts them with build_preset_dict() and adds the preset name
        - packs them into messages of at most ESPNOW_MAX_PAYLOAD_BYTES bytes
          (a single preset that is larger on its own is still sent alone)
        - sends each message through the shared ESPNow object, pausing
          briefly after each one.

    Returns:
        ``(body, status, headers)``:
        - 200 with the number of presets and messages sent,
        - 400 for an invalid body,
        - 404 when none of the requested presets match,
        - 503 when an ESP-NOW send fails.
    """
    json_headers = {'Content-Type': 'application/json'}
    try:
        data = request.json or {}
    except Exception:
        return json.dumps({"error": "Invalid JSON"}), 400, json_headers
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); reject it instead of crashing.
        return json.dumps({"error": "Request body must be a JSON object"}), 400, json_headers

    preset_ids = data.get('preset_ids') or data.get('ids')
    if not isinstance(preset_ids, list) or not preset_ids:
        return json.dumps({"error": "preset_ids must be a non-empty list"}), 400, json_headers
    save_flag = bool(data.get('save', True))
    default_id = data.get('default')

    # Build the preset map keyed by preset ID, including each preset's name.
    current_profile_id = get_current_profile_id(session)
    presets_by_id = {}
    # Without an active profile nothing is in scope; skipping the lookup also
    # avoids matching presets that have no profile_id (str(None) == str(None)).
    if current_profile_id:
        for pid in preset_ids:
            preset_data = presets.read(str(pid))
            if not preset_data:
                continue
            if str(preset_data.get("profile_id")) != str(current_profile_id):
                continue
            preset_payload = build_preset_dict(preset_data)
            preset_payload["name"] = preset_data.get("name", "")
            presets_by_id[str(pid)] = preset_payload

    if not presets_by_id:
        return json.dumps({"error": "No matching presets found"}), 404, json_headers
    if default_id is not None and str(default_id) not in presets_by_id:
        default_id = None

    # Use shared ESPNow singleton
    esp = ESPNow()
    SEND_DELAY_MS = 100

    async def send_chunk(chunk_presets):
        # Include save flag so the led-driver can persist when desired.
        msg = build_message(presets=chunk_presets, save=save_flag, default=default_id)
        await esp.send(msg)

    send_failed = json.dumps({"error": "ESP-NOW send failed"}), 503, json_headers
    messages_sent = 0
    batch = {}
    for preset_key, preset_payload in presets_by_id.items():
        candidate = dict(batch)
        candidate[preset_key] = preset_payload
        candidate_msg = build_message(presets=candidate, save=save_flag, default=default_id)
        # An empty batch always accepts the preset, so an oversized single
        # preset is still attempted on its own rather than dropped.
        if not batch or _espnow_payload_size(candidate_msg) <= ESPNOW_MAX_PAYLOAD_BYTES:
            batch = candidate
            continue
        # The preset does not fit: flush what we have and start a new batch.
        try:
            await send_chunk(batch)
        except Exception:
            return send_failed
        await asyncio.sleep_ms(SEND_DELAY_MS)
        messages_sent += 1
        batch = {preset_key: preset_payload}

    if batch:
        try:
            await send_chunk(batch)
        except Exception:
            return send_failed
        await asyncio.sleep_ms(SEND_DELAY_MS)
        messages_sent += 1

    return json.dumps({
        "message": "Presets sent via ESPNow",
        "presets_sent": len(presets_by_id),
        "messages_sent": messages_sent
    }), 200, json_headers