- Run app on Raspberry Pi: serial to ESP32 bridge at 912000 baud, /dev/ttyS0 - Remove ESP-NOW/MicroPython-only code from src (espnow, p2p, wifi, machine/Pin) - Transport: always send 6-byte MAC + payload; optional to/destination_mac in API and WebSocket - Settings and model DB use project paths (no root); fix sys.print_exception for CPython - Preset/settings controllers use get_current_sender(); template paths for cwd=src - Pipfile: run from src, PORT from env; scripts for port 80 (setcap) and test - ESP32 bridge: receive 6-byte addr + payload, LRU peer management (20 max), handle ESP_ERR_ESPNOW_EXIST - Add esp32/main.py, esp32/benchmark_peers.py, scripts/setup-port80.sh, scripts/test-port80.sh Made-with: Cursor
195 lines
5.7 KiB
Python
195 lines
5.7 KiB
Python
"""
|
|
Message builder for LED driver API communication.
|
|
|
|
Builds JSON messages according to the LED driver API specification
|
|
for sending presets and select commands over the transport (e.g. serial).
|
|
"""
|
|
|
|
import json
|
|
|
|
|
|
def build_message(presets=None, select=None, save=False, default=None):
    """Serialize an LED driver API message to a JSON string.

    The message always carries the protocol version field ``"v": "1"``.
    Every other field is optional and is emitted only when it has content.

    Args:
        presets: Mapping of preset names to preset objects. Omitted from
            the message when None or empty.
        select: Mapping of device names to select lists. Omitted from the
            message when None or empty.
        save: When truthy *and* presets are being sent, ``"save": true``
            is added so the led-driver can persist the presets. Ignored
            when no presets are sent.
        default: Value stored under the ``"default"`` key. Omitted only
            when it is None (falsy values such as 0 are still sent).

    Returns:
        JSON string ready to send over the transport.

    Example:
        message = build_message(
            presets={
                "red_blink": {
                    "pattern": "blink",
                    "colors": ["#FF0000"],
                    "delay": 200,
                    "brightness": 255,
                    "auto": True
                }
            },
            select={
                "device1": ["red_blink"]
            }
        )
    """
    sending_presets = bool(presets)

    # (key, value, include?) in the exact order the keys must appear in
    # the serialized message.
    optional_fields = (
        ("presets", presets, sending_presets),
        ("save", True, sending_presets and bool(save)),
        ("select", select, bool(select)),
        ("default", default, default is not None),
    )

    payload = {"v": "1"}
    payload.update(
        (key, value) for key, value, wanted in optional_fields if wanted
    )
    return json.dumps(payload)
|
|
|
|
|
|
def build_select_message(device_name, preset_name, step=None):
    """Create the select entry for one device.

    Args:
        device_name: Name of the device; used as the dictionary key.
        preset_name: Name of the preset to select.
        step: Optional step value for synchronization. Appended after the
            preset name whenever it is not None.

    Returns:
        Single-entry dictionary ``{device_name: [preset_name]}`` or
        ``{device_name: [preset_name, step]}``, suitable as the ``select``
        argument of build_message.

    Example:
        select = build_select_message("device1", "rainbow_preset", step=10)
        message = build_message(select=select)
    """
    if step is None:
        return {device_name: [preset_name]}
    return {device_name: [preset_name, step]}
|
|
|
|
|
|
_DEFAULT_COLOR = "#FFFFFF"


def _color_to_hex(color):
    """Return one color entry as a '#'-prefixed hex string, or None if unusable."""
    if isinstance(color, str):
        return color if color.startswith("#") else f"#{color}"
    if isinstance(color, (list, tuple)) and len(color) == 3:
        try:
            # Clamp to one byte so every component formats as exactly two
            # hex digits (300 would otherwise give "12c", -1 gives "-1").
            r, g, b = (max(0, min(255, int(part))) for part in color)
        except (TypeError, ValueError):
            return None
        return f"#{r:02x}{g:02x}{b:02x}"
    return None


def build_preset_dict(preset_data):
    """
    Convert preset data to the short-key payload expected by led-driver.

    Long and short field names are both accepted; the long name wins when
    both are present. Any ``name`` field is not copied into the result.

    Args:
        preset_data: Dictionary with preset fields. Recognized keys:
            ``pattern``/``p`` (default "off"), ``colors``/``c`` (default
            ["#FFFFFF"]), ``delay``/``d`` (default 100),
            ``brightness``/``b``/``br`` (default 127), ``auto``/``a``
            (default True) and ``n1``..``n6`` (default 0).

            Each color may be a hex string (a leading "#" is added when
            missing) or an ``[r, g, b]`` list/tuple, whose components are
            clamped to 0..255. A single bare string is treated as one
            color. Entries that cannot be interpreted are dropped; if no
            usable color remains, ["#FFFFFF"] is used.

    Returns:
        Dictionary with keys ``p``, ``c``, ``d``, ``b``, ``a`` and
        ``n1``..``n6``.

    Example:
        preset = build_preset_dict({
            "name": "red_blink",
            "pattern": "blink",
            "colors": ["#FF0000"],
            "delay": 200,
            "brightness": 255,
            "auto": True,
            "n1": 0,
            "n2": 0,
            "n3": 0,
            "n4": 0,
            "n5": 0,
            "n6": 0
        })
    """
    raw_colors = preset_data.get(
        "colors", preset_data.get("c", [_DEFAULT_COLOR])
    )
    if isinstance(raw_colors, str):
        # A bare string is one color, not a sequence of characters.
        raw_colors = [raw_colors]

    colors = []
    if isinstance(raw_colors, (list, tuple)):
        # Normalize every entry on its own so mixed string / RGB lists work.
        converted = (_color_to_hex(entry) for entry in raw_colors)
        colors = [hex_color for hex_color in converted if hex_color is not None]
    if not colors:
        colors = [_DEFAULT_COLOR]

    # Build payload using the short keys expected by led-driver
    preset = {
        "p": preset_data.get("pattern", preset_data.get("p", "off")),
        "c": colors,
        "d": preset_data.get("delay", preset_data.get("d", 100)),
        "b": preset_data.get(
            "brightness", preset_data.get("b", preset_data.get("br", 127))
        ),
        "a": preset_data.get("auto", preset_data.get("a", True)),
    }
    for index in range(1, 7):
        key = f"n{index}"
        preset[key] = preset_data.get(key, 0)

    return preset
|
|
|
|
|
|
def build_presets_dict(presets_data):
    """
    Convert a collection of presets to API-compliant format.

    Each value is passed through build_preset_dict; the preset names
    (keys) are kept unchanged and in their original order.

    Args:
        presets_data: Dictionary mapping preset names to preset data

    Returns:
        Dictionary mapping preset names to API-compliant preset objects

    Example:
        presets = build_presets_dict({
            "red_blink": {
                "pattern": "blink",
                "colors": ["#FF0000"],
                "delay": 200
            },
            "blue_pulse": {
                "pattern": "pulse",
                "colors": ["#0000FF"],
                "delay": 100
            }
        })
    """
    return {
        name: build_preset_dict(data)
        for name, data in presets_data.items()
    }
|
|
|
|
|
|
def build_select_dict(device_preset_mapping, step_mapping=None):
    """
    Build the select dictionary for several devices at once.

    Args:
        device_preset_mapping: Dictionary mapping device names to preset names
        step_mapping: Optional dictionary mapping device names to step
            values. A device listed here gets its step appended after the
            preset name; devices not listed get only the preset name.

    Returns:
        Dictionary mapping each device name to ``[preset_name]`` or
        ``[preset_name, step]``, ready to use as the ``select`` argument
        of build_message.

    Example:
        select = build_select_dict(
            {"device1": "rainbow_preset", "device2": "pulse_preset"},
            step_mapping={"device1": 10}
        )
        message = build_message(select=select)
    """
    # A missing or empty step mapping behaves like "no device has a step".
    steps = step_mapping or {}
    return {
        device: [preset, steps[device]] if device in steps else [preset]
        for device, preset in device_preset_mapping.items()
    }
|