Compare commits

...

6 Commits

Author SHA1 Message Date
80ff216e54 Update preset format with n7/n8 parameters
- Add n7 and n8 fields to preset definitions
- Update preset data format
2026-01-17 21:40:38 +13:00
1fb3dee942 Update tab storage to 2D grid format
- Change presets from flat array to 2D grid layout
- Add presets_flat array for backward compatibility
- Support 3-column grid layout for preset positioning
2026-01-17 21:40:37 +13:00
a4502055fb Add test utilities and scripts
- Add test directory with main.py, p2p.py, ws.py
- Add send_empty_json.py WebSocket test script
2026-01-17 21:40:11 +13:00
6e61ec8de6 Add P2P communication module
- Implement ESP-NOW async communication
- Support sending string, dict, or bytes data
- Use asend for async broadcast messaging
2026-01-17 21:40:10 +13:00
48d02f0e70 Update watch script path in Pipfile
- Fix watch script to use relative paths
2026-01-17 21:40:08 +13:00
cacaa3505e Add pattern definitions endpoint
- Add /definitions endpoint to pattern controller
- Load pattern.json with fallback paths for local dev and MicroPython
2026-01-17 21:40:07 +13:00
9 changed files with 418 additions and 59 deletions

View File

@@ -17,4 +17,4 @@ python_version = "3.12"
[scripts] [scripts]
web = "python /home/pi/led-controller/tests/web.py" web = "python /home/pi/led-controller/tests/web.py"
watch = "python -m watchfiles 'python /home/pi/led-controller/tests/web.py' /home/pi/led-controller/src /home/pi/led-controller/tests" watch = "python -m watchfiles 'python tests/web.py' src tests"

View File

@@ -1,57 +1 @@
{ {"1": {"name": "Warm White", "pattern": "on", "colors": ["#FFE5B4", "#FFDAB9", "#FFE4B5"], "brightness": 200, "delay": 100, "n1": 10, "n2": 10, "n3": 10, "n4": 10, "n5": 0, "n6": 0, "n7": 0, "n8": 0}, "2": {"name": "Rainbow", "pattern": "rainbow", "colors": ["#FF0000", "#FF7F00", "#FFFF00", "#00FF00", "#0000FF", "#4B0082", "#9400D3"], "brightness": 255, "delay": 50, "n1": 20, "n2": 15, "n3": 10, "n4": 5, "n5": 0, "n6": 0, "Step Rate": 20, "n7": 0, "n8": 0}, "3": {"name": "Pulse Red", "pattern": "pulse", "colors": ["#FF0000", "#CC0000", "#990000"], "brightness": 180, "delay": 200, "n1": 30, "n2": 20, "n3": 10, "n4": 5, "n5": 0, "n6": 0}}
"1": {
"name": "Warm White",
"pattern": "on",
"colors": [
"#FFE5B4",
"#FFDAB9",
"#FFE4B5"
],
"brightness": 200,
"delay": 100,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 0,
"n6": 0
},
"2": {
"name": "Rainbow",
"pattern": "rainbow",
"colors": [
"#FF0000",
"#FF7F00",
"#FFFF00",
"#00FF00",
"#0000FF",
"#4B0082",
"#9400D3"
],
"brightness": 255,
"delay": 50,
"n1": 20,
"n2": 15,
"n3": 10,
"n4": 5,
"n5": 0,
"n6": 0
},
"3": {
"name": "Pulse Red",
"pattern": "pulse",
"colors": [
"#FF0000",
"#CC0000",
"#990000"
],
"brightness": 180,
"delay": 200,
"n1": 30,
"n2": 20,
"n3": 10,
"n4": 5,
"n5": 0,
"n6": 0
}
}

View File

@@ -1 +1 @@
{"1": {"name": "Main", "names": ["1", "2", "3"], "presets": ["1", "2"]}, "2": {"name": "Accent", "names": ["4", "5"], "presets": ["2", "3"]}, "3": {"name": "", "names": [], "presets": []}, "4": {"name": "", "names": [], "presets": []}, "5": {"name": "", "names": [], "presets": []}, "6": {"name": "", "names": [], "presets": []}, "7": {"name": "", "names": [], "presets": []}, "8": {"name": "", "names": [], "presets": []}, "9": {"name": "", "names": [], "presets": []}, "10": {"name": "", "names": [], "presets": []}, "11": {"name": "", "names": [], "presets": []}, "12": {"name": "test2", "names": ["1"], "presets": [], "colors": ["#b93c3c", "#761e1e", "#ffffff"]}, "13": {"name": "test5", "names": ["1"], "presets": []}} {"1": {"name": "Main", "names": ["1", "2", "3"], "presets": [["1", "2", "3"]], "presets_flat": ["1", "2", "3"]}, "2": {"name": "Accent", "names": ["4", "5"], "presets": []}, "3": {"name": "", "names": [], "presets": []}, "4": {"name": "", "names": [], "presets": []}, "5": {"name": "", "names": [], "presets": []}, "6": {"name": "", "names": [], "presets": []}, "7": {"name": "", "names": [], "presets": []}, "8": {"name": "", "names": [], "presets": []}, "9": {"name": "", "names": [], "presets": []}, "10": {"name": "", "names": [], "presets": []}, "11": {"name": "", "names": [], "presets": []}, "12": {"name": "test2", "names": ["1"], "presets": [], "colors": ["#b93c3c", "#761e1e", "#ffffff"]}, "13": {"name": "test5", "names": ["1"], "presets": []}}

44
send_empty_json.py Normal file
View File

@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import socket
import struct
import base64
import hashlib
# Connect to the WebSocket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('192.168.4.1', 80))
# Send HTTP WebSocket upgrade request
key = base64.b64encode(b'test-nonce').decode('utf-8')
request = f'''GET /ws HTTP/1.1\r
Host: 192.168.4.1\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Key: {key}\r
Sec-WebSocket-Version: 13\r
\r
'''
s.send(request.encode())
# Read upgrade response
response = s.recv(4096)
print(response.decode())
# Send WebSocket TEXT frame with empty JSON '{}'
payload = b'{}'
mask = b'\x12\x34\x56\x78'
payload_masked = bytes(p ^ mask[i % 4] for i, p in enumerate(payload))
frame = struct.pack('BB', 0x81, 0x80 | len(payload))
frame += mask
frame += payload_masked
s.send(frame)
print("Sent empty JSON to WebSocket")
s.close()

View File

@@ -1,10 +1,32 @@
from microdot import Microdot from microdot import Microdot
from models.pattern import Pattern from models.pattern import Pattern
import json import json
import sys
controller = Microdot() controller = Microdot()
patterns = Pattern() patterns = Pattern()
def load_pattern_definitions():
"""Load pattern definitions from pattern.json file."""
try:
# Try different paths for local development vs MicroPython
paths = ['db/pattern.json', 'pattern.json', '/db/pattern.json']
for path in paths:
try:
with open(path, 'r') as f:
return json.load(f)
except OSError:
continue
return {}
except Exception as e:
print(f"Error loading pattern.json: {e}")
return {}
@controller.get('/definitions')
async def get_pattern_definitions(request):
"""Get pattern definitions from pattern.json."""
definitions = load_pattern_definitions()
return json.dumps(definitions), 200, {'Content-Type': 'application/json'}
@controller.get('') @controller.get('')
async def list_patterns(request): async def list_patterns(request):

39
src/p2p.py Normal file
View File

@@ -0,0 +1,39 @@
import network
import aioespnow
import asyncio
import json
from time import sleep
class P2P:
def __init__(self):
network.WLAN(network.STA_IF).active(True)
self.broadcast = bytes.fromhex("ffffffffffff")
self.e = aioespnow.AIOESPNow()
self.e.active(True)
try:
self.e.add_peer(self.broadcast)
except:
pass
async def send(self, data):
# Convert data to bytes if it's a string or dict
if isinstance(data, str):
payload = data.encode()
elif isinstance(data, dict):
payload = json.dumps(data).encode()
else:
payload = data # Assume it's already bytes
# Use asend for async sending - returns boolean indicating success
result = await self.e.asend(self.broadcast, payload)
return result
async def main():
p = P2P()
await p.send(json.dumps({"dj": {"p": "on", "colors": ["#ff0000"]}}))
if __name__ == "__main__":
asyncio.run(main())

12
test/main.py Normal file
View File

@@ -0,0 +1,12 @@
from microdot import Microdot
from src.profile import profile_app
app = Microdot()
@app.route('/')
async def index(request):
return 'Hello, world!'
app.mount(profile_app, url_prefix="/profile")
app.run(port=8080, debug=True)

105
test/p2p.py Normal file
View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
# MicroPython script to test LED bar patterns over ESP-NOW (no WebSocket)
import json
import uasyncio as asyncio
# Import P2P from src/p2p.py
# Note: When running on device, ensure src/p2p.py is in the path
try:
from p2p import P2P
except ImportError:
# Fallback: import from src directory
import sys
sys.path.insert(0, 'src')
from p2p import P2P
async def main():
p2p = P2P()
# Test cases following msg.json format:
# {"g": {"df": {...}, "group_name": {...}}, "sv": true, "st": 0}
# Note: led-bar device must have matching group in settings["groups"]
tests = [
# Example 1: Default format with df defaults and dj group (matches msg.json)
{
"g": {
"df": {
"pt": "on",
"cl": ["#ff0000"],
"br": 200,
"n1": 10,
"n2": 10,
"n3": 10,
"n4": 10,
"n5": 10,
"n6": 10,
"dl": 100
},
"dj": {
"pt": "blink",
"cl": ["#00ff00"],
"dl": 500
}
},
"sv": True,
"st": 0
},
# Example 2: Different group with df defaults
{
"g": {
"df": {
"pt": "on",
"br": 150,
"dl": 100
},
"group1": {
"pt": "rainbow",
"dl": 50
}
},
"sv": False
},
# Example 3: Multiple groups
{
"g": {
"df": {
"br": 200,
"dl": 100
},
"group1": {
"pt": "on",
"cl": ["#0000ff"]
},
"group2": {
"pt": "blink",
"cl": ["#ff00ff"],
"dl": 300
}
},
"sv": True,
"st": 1
},
# Example 4: Single group without df
{
"g": {
"dj": {
"pt": "off"
}
},
"sv": False
}
]
for i, test in enumerate(tests, 1):
print(f"\n{'='*50}")
print(f"Test {i}/{len(tests)}")
print(f"Sending: {json.dumps(test, indent=2)}")
await p2p.send(json.dumps(test))
await asyncio.sleep_ms(2000)
print(f"\n{'='*50}")
print("All tests completed")
if __name__ == "__main__":
asyncio.run(main())

193
test/ws.py Normal file
View File

@@ -0,0 +1,193 @@
import asyncio
import websockets
import json
import sys
async def test_websocket():
uri = "ws://192.168.4.1:8080/ws"
tests_passed = 0
tests_total = 0
async def run_test(name, test_func):
nonlocal tests_passed, tests_total
tests_total += 1
try:
result = await test_func()
if result is not False:
print(f"{name}")
tests_passed += 1
return True
else:
print(f"{name} (failed)")
return False
except Exception as e:
print(f"{name} (error: {e})")
return False
try:
print(f"Connecting to WebSocket server at {uri}...")
async with websockets.connect(uri) as websocket:
print(f"✓ Connected to WebSocket server\n")
# Test 1: Empty JSON
print("Test 1: Empty JSON")
await run_test("Send empty JSON", lambda: websocket.send(json.dumps({})))
await asyncio.sleep(0.3)
# Test 2: Pattern on with single color
print("\nTest 2: Pattern 'on'")
await run_test("Send on pattern", lambda: websocket.send(json.dumps({
"settings": {"pattern": "on", "colors": ["#00ff00"], "brightness": 200}
})))
await asyncio.sleep(0.3)
# Test 3: Pattern blink
print("\nTest 3: Pattern 'blink'")
await run_test("Send blink pattern", lambda: websocket.send(json.dumps({
"settings": {"pattern": "blink", "colors": ["#ff0000"], "delay": 500}
})))
await asyncio.sleep(0.3)
# Test 4: Pattern rainbow
print("\nTest 4: Pattern 'rainbow'")
await run_test("Send rainbow pattern", lambda: websocket.send(json.dumps({
"settings": {"pattern": "rainbow", "delay": 100}
})))
await asyncio.sleep(0.3)
# Test 5: Pattern off
print("\nTest 5: Pattern 'off'")
await run_test("Send off pattern", lambda: websocket.send(json.dumps({
"settings": {"pattern": "off"}
})))
await asyncio.sleep(0.3)
# Test 6: Multiple colors
print("\nTest 6: Multiple colors")
await run_test("Send multiple colors", lambda: websocket.send(json.dumps({
"settings": {
"pattern": "color_transition",
"colors": ["#ff0000", "#00ff00", "#0000ff"],
"delay": 100
}
})))
await asyncio.sleep(0.3)
# Test 7: RGB tuple colors (if supported)
print("\nTest 7: RGB tuple colors")
await run_test("Send RGB tuple colors", lambda: websocket.send(json.dumps({
"settings": {
"pattern": "on",
"colors": [[255, 0, 128], [128, 255, 0]],
"brightness": 150
}
})))
await asyncio.sleep(0.3)
# Test 8: Pattern with all parameters
print("\nTest 8: Pattern with all parameters")
await run_test("Send pattern with all params", lambda: websocket.send(json.dumps({
"settings": {
"pattern": "flicker",
"colors": ["#ff8800"],
"brightness": 127,
"delay": 80,
"n1": 10,
"n2": 5,
"n3": 1,
"n4": 1
}
})))
await asyncio.sleep(0.3)
# Test 9: Short-key format (df/dj)
print("\nTest 9: Short-key format (df/dj)")
await run_test("Send df/dj format", lambda: websocket.send(json.dumps({
"df": {"pt": "on", "cl": ["#ff0000"], "br": 200},
"dj": {"pa": "blink", "cl": ["#00ff00"], "dl": 500},
"settings": {"pattern": "blink", "colors": ["#00ff00"], "delay": 500, "brightness": 200}
})))
await asyncio.sleep(0.3)
# Test 10: Rapid message sending
print("\nTest 10: Rapid message sending")
patterns = ["on", "off", "on", "blink"]
for i, pattern in enumerate(patterns):
p = pattern # Capture in closure
await run_test(f"Rapid send {i+1}/{len(patterns)}", lambda p=p: websocket.send(json.dumps({
"settings": {"pattern": p, "colors": ["#ffffff"]}
})))
await asyncio.sleep(0.1)
# Test 11: Large message
print("\nTest 11: Large message")
large_colors = [f"#{i%256:02x}{i*2%256:02x}{i*3%256:02x}" for i in range(50)]
await run_test("Send large message", lambda: websocket.send(json.dumps({
"settings": {
"pattern": "color_transition",
"colors": large_colors,
"delay": 50
}
})))
await asyncio.sleep(0.3)
# Test 12: Invalid JSON (should be handled gracefully)
print("\nTest 12: Invalid JSON handling")
try:
await websocket.send("not valid json")
print("⚠ Invalid JSON sent (server should handle gracefully)")
tests_total += 1
except Exception as e:
print(f"✗ Invalid JSON failed to send: {e}")
tests_total += 1
# Test 13: Malformed structure (missing settings)
print("\nTest 13: Malformed structure")
await run_test("Send message without settings", lambda: websocket.send(json.dumps({
"pattern": "on",
"colors": ["#ff0000"]
})))
await asyncio.sleep(0.3)
# Test 14: Just settings key, no pattern
print("\nTest 14: Settings without pattern")
await run_test("Send settings without pattern", lambda: websocket.send(json.dumps({
"settings": {"colors": ["#0000ff"], "brightness": 100}
})))
await asyncio.sleep(0.3)
# Test 15: Empty settings
print("\nTest 15: Empty settings")
await run_test("Send empty settings", lambda: websocket.send(json.dumps({
"settings": {}
})))
await asyncio.sleep(0.3)
print(f"\n{'='*50}")
print(f"Tests completed: {tests_passed}/{tests_total} passed")
if tests_passed == tests_total:
print("✓ All tests passed!")
else:
print(f"{tests_total - tests_passed} test(s) failed")
print(f"{'='*50}")
except websockets.exceptions.ConnectionClosedOK:
print("✓ WebSocket connection closed gracefully.")
except websockets.exceptions.ConnectionClosedError as e:
print(f"✗ WebSocket connection closed with error: {e}")
sys.exit(1)
except ConnectionRefusedError:
print(f"✗ Connection refused. Is the server running at {uri}?")
print("Make sure:")
print(" 1. The device is connected to WiFi")
print(" 2. The server is running on the device")
print(" 3. You can reach 192.168.4.1")
sys.exit(1)
except Exception as e:
print(f"✗ An unexpected error occurred: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
asyncio.run(test_websocket())