Compare commits
6 Commits
97ffc69b12
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 80ff216e54 | |||
| 1fb3dee942 | |||
| a4502055fb | |||
| 6e61ec8de6 | |||
| 48d02f0e70 | |||
| cacaa3505e |
2
Pipfile
2
Pipfile
@@ -17,4 +17,4 @@ python_version = "3.12"
|
||||
|
||||
[scripts]
|
||||
web = "python /home/pi/led-controller/tests/web.py"
|
||||
watch = "python -m watchfiles 'python /home/pi/led-controller/tests/web.py' /home/pi/led-controller/src /home/pi/led-controller/tests"
|
||||
watch = "python -m watchfiles 'python tests/web.py' src tests"
|
||||
|
||||
@@ -1,57 +1 @@
|
||||
{
|
||||
"1": {
|
||||
"name": "Warm White",
|
||||
"pattern": "on",
|
||||
"colors": [
|
||||
"#FFE5B4",
|
||||
"#FFDAB9",
|
||||
"#FFE4B5"
|
||||
],
|
||||
"brightness": 200,
|
||||
"delay": 100,
|
||||
"n1": 10,
|
||||
"n2": 10,
|
||||
"n3": 10,
|
||||
"n4": 10,
|
||||
"n5": 0,
|
||||
"n6": 0
|
||||
},
|
||||
"2": {
|
||||
"name": "Rainbow",
|
||||
"pattern": "rainbow",
|
||||
"colors": [
|
||||
"#FF0000",
|
||||
"#FF7F00",
|
||||
"#FFFF00",
|
||||
"#00FF00",
|
||||
"#0000FF",
|
||||
"#4B0082",
|
||||
"#9400D3"
|
||||
],
|
||||
"brightness": 255,
|
||||
"delay": 50,
|
||||
"n1": 20,
|
||||
"n2": 15,
|
||||
"n3": 10,
|
||||
"n4": 5,
|
||||
"n5": 0,
|
||||
"n6": 0
|
||||
},
|
||||
"3": {
|
||||
"name": "Pulse Red",
|
||||
"pattern": "pulse",
|
||||
"colors": [
|
||||
"#FF0000",
|
||||
"#CC0000",
|
||||
"#990000"
|
||||
],
|
||||
"brightness": 180,
|
||||
"delay": 200,
|
||||
"n1": 30,
|
||||
"n2": 20,
|
||||
"n3": 10,
|
||||
"n4": 5,
|
||||
"n5": 0,
|
||||
"n6": 0
|
||||
}
|
||||
}
|
||||
{"1": {"name": "Warm White", "pattern": "on", "colors": ["#FFE5B4", "#FFDAB9", "#FFE4B5"], "brightness": 200, "delay": 100, "n1": 10, "n2": 10, "n3": 10, "n4": 10, "n5": 0, "n6": 0, "n7": 0, "n8": 0}, "2": {"name": "Rainbow", "pattern": "rainbow", "colors": ["#FF0000", "#FF7F00", "#FFFF00", "#00FF00", "#0000FF", "#4B0082", "#9400D3"], "brightness": 255, "delay": 50, "n1": 20, "n2": 15, "n3": 10, "n4": 5, "n5": 0, "n6": 0, "Step Rate": 20, "n7": 0, "n8": 0}, "3": {"name": "Pulse Red", "pattern": "pulse", "colors": ["#FF0000", "#CC0000", "#990000"], "brightness": 180, "delay": 200, "n1": 30, "n2": 20, "n3": 10, "n4": 5, "n5": 0, "n6": 0}}
|
||||
@@ -1 +1 @@
|
||||
{"1": {"name": "Main", "names": ["1", "2", "3"], "presets": ["1", "2"]}, "2": {"name": "Accent", "names": ["4", "5"], "presets": ["2", "3"]}, "3": {"name": "", "names": [], "presets": []}, "4": {"name": "", "names": [], "presets": []}, "5": {"name": "", "names": [], "presets": []}, "6": {"name": "", "names": [], "presets": []}, "7": {"name": "", "names": [], "presets": []}, "8": {"name": "", "names": [], "presets": []}, "9": {"name": "", "names": [], "presets": []}, "10": {"name": "", "names": [], "presets": []}, "11": {"name": "", "names": [], "presets": []}, "12": {"name": "test2", "names": ["1"], "presets": [], "colors": ["#b93c3c", "#761e1e", "#ffffff"]}, "13": {"name": "test5", "names": ["1"], "presets": []}}
|
||||
{"1": {"name": "Main", "names": ["1", "2", "3"], "presets": [["1", "2", "3"]], "presets_flat": ["1", "2", "3"]}, "2": {"name": "Accent", "names": ["4", "5"], "presets": []}, "3": {"name": "", "names": [], "presets": []}, "4": {"name": "", "names": [], "presets": []}, "5": {"name": "", "names": [], "presets": []}, "6": {"name": "", "names": [], "presets": []}, "7": {"name": "", "names": [], "presets": []}, "8": {"name": "", "names": [], "presets": []}, "9": {"name": "", "names": [], "presets": []}, "10": {"name": "", "names": [], "presets": []}, "11": {"name": "", "names": [], "presets": []}, "12": {"name": "test2", "names": ["1"], "presets": [], "colors": ["#b93c3c", "#761e1e", "#ffffff"]}, "13": {"name": "test5", "names": ["1"], "presets": []}}
|
||||
44
send_empty_json.py
Normal file
44
send_empty_json.py
Normal file
@@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
import socket
|
||||
import struct
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
# Connect to the WebSocket
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.connect(('192.168.4.1', 80))
|
||||
|
||||
# Send HTTP WebSocket upgrade request
|
||||
key = base64.b64encode(b'test-nonce').decode('utf-8')
|
||||
request = f'''GET /ws HTTP/1.1\r
|
||||
Host: 192.168.4.1\r
|
||||
Upgrade: websocket\r
|
||||
Connection: Upgrade\r
|
||||
Sec-WebSocket-Key: {key}\r
|
||||
Sec-WebSocket-Version: 13\r
|
||||
\r
|
||||
'''
|
||||
s.send(request.encode())
|
||||
|
||||
# Read upgrade response
|
||||
response = s.recv(4096)
|
||||
print(response.decode())
|
||||
|
||||
# Send WebSocket TEXT frame with empty JSON '{}'
|
||||
payload = b'{}'
|
||||
mask = b'\x12\x34\x56\x78'
|
||||
payload_masked = bytes(p ^ mask[i % 4] for i, p in enumerate(payload))
|
||||
|
||||
frame = struct.pack('BB', 0x81, 0x80 | len(payload))
|
||||
frame += mask
|
||||
frame += payload_masked
|
||||
|
||||
s.send(frame)
|
||||
print("Sent empty JSON to WebSocket")
|
||||
s.close()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,10 +1,32 @@
|
||||
from microdot import Microdot
|
||||
from models.pattern import Pattern
|
||||
import json
|
||||
import sys
|
||||
|
||||
controller = Microdot()
|
||||
patterns = Pattern()
|
||||
|
||||
def load_pattern_definitions():
|
||||
"""Load pattern definitions from pattern.json file."""
|
||||
try:
|
||||
# Try different paths for local development vs MicroPython
|
||||
paths = ['db/pattern.json', 'pattern.json', '/db/pattern.json']
|
||||
for path in paths:
|
||||
try:
|
||||
with open(path, 'r') as f:
|
||||
return json.load(f)
|
||||
except OSError:
|
||||
continue
|
||||
return {}
|
||||
except Exception as e:
|
||||
print(f"Error loading pattern.json: {e}")
|
||||
return {}
|
||||
|
||||
@controller.get('/definitions')
|
||||
async def get_pattern_definitions(request):
|
||||
"""Get pattern definitions from pattern.json."""
|
||||
definitions = load_pattern_definitions()
|
||||
return json.dumps(definitions), 200, {'Content-Type': 'application/json'}
|
||||
|
||||
@controller.get('')
|
||||
async def list_patterns(request):
|
||||
|
||||
39
src/p2p.py
Normal file
39
src/p2p.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import network
|
||||
import aioespnow
|
||||
import asyncio
|
||||
import json
|
||||
from time import sleep
|
||||
|
||||
|
||||
class P2P:
|
||||
def __init__(self):
|
||||
network.WLAN(network.STA_IF).active(True)
|
||||
self.broadcast = bytes.fromhex("ffffffffffff")
|
||||
self.e = aioespnow.AIOESPNow()
|
||||
self.e.active(True)
|
||||
try:
|
||||
self.e.add_peer(self.broadcast)
|
||||
except:
|
||||
pass
|
||||
|
||||
async def send(self, data):
|
||||
# Convert data to bytes if it's a string or dict
|
||||
if isinstance(data, str):
|
||||
payload = data.encode()
|
||||
elif isinstance(data, dict):
|
||||
payload = json.dumps(data).encode()
|
||||
else:
|
||||
payload = data # Assume it's already bytes
|
||||
|
||||
# Use asend for async sending - returns boolean indicating success
|
||||
result = await self.e.asend(self.broadcast, payload)
|
||||
return result
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
p = P2P()
|
||||
await p.send(json.dumps({"dj": {"p": "on", "colors": ["#ff0000"]}}))
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
12
test/main.py
Normal file
12
test/main.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from microdot import Microdot
|
||||
from src.profile import profile_app
|
||||
|
||||
app = Microdot()
|
||||
|
||||
@app.route('/')
|
||||
async def index(request):
|
||||
return 'Hello, world!'
|
||||
|
||||
app.mount(profile_app, url_prefix="/profile")
|
||||
|
||||
app.run(port=8080, debug=True)
|
||||
105
test/p2p.py
Normal file
105
test/p2p.py
Normal file
@@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
# MicroPython script to test LED bar patterns over ESP-NOW (no WebSocket)
|
||||
|
||||
import json
|
||||
import uasyncio as asyncio
|
||||
|
||||
# Import P2P from src/p2p.py
|
||||
# Note: When running on device, ensure src/p2p.py is in the path
|
||||
try:
|
||||
from p2p import P2P
|
||||
except ImportError:
|
||||
# Fallback: import from src directory
|
||||
import sys
|
||||
sys.path.insert(0, 'src')
|
||||
from p2p import P2P
|
||||
|
||||
async def main():
|
||||
p2p = P2P()
|
||||
|
||||
# Test cases following msg.json format:
|
||||
# {"g": {"df": {...}, "group_name": {...}}, "sv": true, "st": 0}
|
||||
# Note: led-bar device must have matching group in settings["groups"]
|
||||
tests = [
|
||||
# Example 1: Default format with df defaults and dj group (matches msg.json)
|
||||
{
|
||||
"g": {
|
||||
"df": {
|
||||
"pt": "on",
|
||||
"cl": ["#ff0000"],
|
||||
"br": 200,
|
||||
"n1": 10,
|
||||
"n2": 10,
|
||||
"n3": 10,
|
||||
"n4": 10,
|
||||
"n5": 10,
|
||||
"n6": 10,
|
||||
"dl": 100
|
||||
},
|
||||
"dj": {
|
||||
"pt": "blink",
|
||||
"cl": ["#00ff00"],
|
||||
"dl": 500
|
||||
}
|
||||
},
|
||||
"sv": True,
|
||||
"st": 0
|
||||
},
|
||||
# Example 2: Different group with df defaults
|
||||
{
|
||||
"g": {
|
||||
"df": {
|
||||
"pt": "on",
|
||||
"br": 150,
|
||||
"dl": 100
|
||||
},
|
||||
"group1": {
|
||||
"pt": "rainbow",
|
||||
"dl": 50
|
||||
}
|
||||
},
|
||||
"sv": False
|
||||
},
|
||||
# Example 3: Multiple groups
|
||||
{
|
||||
"g": {
|
||||
"df": {
|
||||
"br": 200,
|
||||
"dl": 100
|
||||
},
|
||||
"group1": {
|
||||
"pt": "on",
|
||||
"cl": ["#0000ff"]
|
||||
},
|
||||
"group2": {
|
||||
"pt": "blink",
|
||||
"cl": ["#ff00ff"],
|
||||
"dl": 300
|
||||
}
|
||||
},
|
||||
"sv": True,
|
||||
"st": 1
|
||||
},
|
||||
# Example 4: Single group without df
|
||||
{
|
||||
"g": {
|
||||
"dj": {
|
||||
"pt": "off"
|
||||
}
|
||||
},
|
||||
"sv": False
|
||||
}
|
||||
]
|
||||
|
||||
for i, test in enumerate(tests, 1):
|
||||
print(f"\n{'='*50}")
|
||||
print(f"Test {i}/{len(tests)}")
|
||||
print(f"Sending: {json.dumps(test, indent=2)}")
|
||||
await p2p.send(json.dumps(test))
|
||||
await asyncio.sleep_ms(2000)
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print("All tests completed")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
193
test/ws.py
Normal file
193
test/ws.py
Normal file
@@ -0,0 +1,193 @@
|
||||
import asyncio
|
||||
import websockets
|
||||
import json
|
||||
import sys
|
||||
|
||||
async def test_websocket():
|
||||
uri = "ws://192.168.4.1:8080/ws"
|
||||
tests_passed = 0
|
||||
tests_total = 0
|
||||
|
||||
async def run_test(name, test_func):
|
||||
nonlocal tests_passed, tests_total
|
||||
tests_total += 1
|
||||
try:
|
||||
result = await test_func()
|
||||
if result is not False:
|
||||
print(f"✓ {name}")
|
||||
tests_passed += 1
|
||||
return True
|
||||
else:
|
||||
print(f"✗ {name} (failed)")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"✗ {name} (error: {e})")
|
||||
return False
|
||||
|
||||
try:
|
||||
print(f"Connecting to WebSocket server at {uri}...")
|
||||
async with websockets.connect(uri) as websocket:
|
||||
print(f"✓ Connected to WebSocket server\n")
|
||||
|
||||
# Test 1: Empty JSON
|
||||
print("Test 1: Empty JSON")
|
||||
await run_test("Send empty JSON", lambda: websocket.send(json.dumps({})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 2: Pattern on with single color
|
||||
print("\nTest 2: Pattern 'on'")
|
||||
await run_test("Send on pattern", lambda: websocket.send(json.dumps({
|
||||
"settings": {"pattern": "on", "colors": ["#00ff00"], "brightness": 200}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 3: Pattern blink
|
||||
print("\nTest 3: Pattern 'blink'")
|
||||
await run_test("Send blink pattern", lambda: websocket.send(json.dumps({
|
||||
"settings": {"pattern": "blink", "colors": ["#ff0000"], "delay": 500}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 4: Pattern rainbow
|
||||
print("\nTest 4: Pattern 'rainbow'")
|
||||
await run_test("Send rainbow pattern", lambda: websocket.send(json.dumps({
|
||||
"settings": {"pattern": "rainbow", "delay": 100}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 5: Pattern off
|
||||
print("\nTest 5: Pattern 'off'")
|
||||
await run_test("Send off pattern", lambda: websocket.send(json.dumps({
|
||||
"settings": {"pattern": "off"}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 6: Multiple colors
|
||||
print("\nTest 6: Multiple colors")
|
||||
await run_test("Send multiple colors", lambda: websocket.send(json.dumps({
|
||||
"settings": {
|
||||
"pattern": "color_transition",
|
||||
"colors": ["#ff0000", "#00ff00", "#0000ff"],
|
||||
"delay": 100
|
||||
}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 7: RGB tuple colors (if supported)
|
||||
print("\nTest 7: RGB tuple colors")
|
||||
await run_test("Send RGB tuple colors", lambda: websocket.send(json.dumps({
|
||||
"settings": {
|
||||
"pattern": "on",
|
||||
"colors": [[255, 0, 128], [128, 255, 0]],
|
||||
"brightness": 150
|
||||
}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 8: Pattern with all parameters
|
||||
print("\nTest 8: Pattern with all parameters")
|
||||
await run_test("Send pattern with all params", lambda: websocket.send(json.dumps({
|
||||
"settings": {
|
||||
"pattern": "flicker",
|
||||
"colors": ["#ff8800"],
|
||||
"brightness": 127,
|
||||
"delay": 80,
|
||||
"n1": 10,
|
||||
"n2": 5,
|
||||
"n3": 1,
|
||||
"n4": 1
|
||||
}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 9: Short-key format (df/dj)
|
||||
print("\nTest 9: Short-key format (df/dj)")
|
||||
await run_test("Send df/dj format", lambda: websocket.send(json.dumps({
|
||||
"df": {"pt": "on", "cl": ["#ff0000"], "br": 200},
|
||||
"dj": {"pa": "blink", "cl": ["#00ff00"], "dl": 500},
|
||||
"settings": {"pattern": "blink", "colors": ["#00ff00"], "delay": 500, "brightness": 200}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 10: Rapid message sending
|
||||
print("\nTest 10: Rapid message sending")
|
||||
patterns = ["on", "off", "on", "blink"]
|
||||
for i, pattern in enumerate(patterns):
|
||||
p = pattern # Capture in closure
|
||||
await run_test(f"Rapid send {i+1}/{len(patterns)}", lambda p=p: websocket.send(json.dumps({
|
||||
"settings": {"pattern": p, "colors": ["#ffffff"]}
|
||||
})))
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Test 11: Large message
|
||||
print("\nTest 11: Large message")
|
||||
large_colors = [f"#{i%256:02x}{i*2%256:02x}{i*3%256:02x}" for i in range(50)]
|
||||
await run_test("Send large message", lambda: websocket.send(json.dumps({
|
||||
"settings": {
|
||||
"pattern": "color_transition",
|
||||
"colors": large_colors,
|
||||
"delay": 50
|
||||
}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 12: Invalid JSON (should be handled gracefully)
|
||||
print("\nTest 12: Invalid JSON handling")
|
||||
try:
|
||||
await websocket.send("not valid json")
|
||||
print("⚠ Invalid JSON sent (server should handle gracefully)")
|
||||
tests_total += 1
|
||||
except Exception as e:
|
||||
print(f"✗ Invalid JSON failed to send: {e}")
|
||||
tests_total += 1
|
||||
|
||||
# Test 13: Malformed structure (missing settings)
|
||||
print("\nTest 13: Malformed structure")
|
||||
await run_test("Send message without settings", lambda: websocket.send(json.dumps({
|
||||
"pattern": "on",
|
||||
"colors": ["#ff0000"]
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 14: Just settings key, no pattern
|
||||
print("\nTest 14: Settings without pattern")
|
||||
await run_test("Send settings without pattern", lambda: websocket.send(json.dumps({
|
||||
"settings": {"colors": ["#0000ff"], "brightness": 100}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# Test 15: Empty settings
|
||||
print("\nTest 15: Empty settings")
|
||||
await run_test("Send empty settings", lambda: websocket.send(json.dumps({
|
||||
"settings": {}
|
||||
})))
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print(f"Tests completed: {tests_passed}/{tests_total} passed")
|
||||
if tests_passed == tests_total:
|
||||
print("✓ All tests passed!")
|
||||
else:
|
||||
print(f"⚠ {tests_total - tests_passed} test(s) failed")
|
||||
print(f"{'='*50}")
|
||||
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
print("✓ WebSocket connection closed gracefully.")
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
print(f"✗ WebSocket connection closed with error: {e}")
|
||||
sys.exit(1)
|
||||
except ConnectionRefusedError:
|
||||
print(f"✗ Connection refused. Is the server running at {uri}?")
|
||||
print("Make sure:")
|
||||
print(" 1. The device is connected to WiFi")
|
||||
print(" 2. The server is running on the device")
|
||||
print(" 3. You can reach 192.168.4.1")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"✗ An unexpected error occurred: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(test_websocket())
|
||||
Reference in New Issue
Block a user