188 lines
7.3 KiB
Python
188 lines
7.3 KiB
Python
|
from machine import Pin
|
||
|
from patterns import Patterns
|
||
|
import socket
|
||
|
import select
|
||
|
import json
|
||
|
|
||
|
class LEDServer:
    """Tiny HTTP control panel for an LED strip driven by a ``Patterns`` object.

    The server listens on port 80, serves three static files (``/index.html``,
    ``/main.js``, ``/main.css``) plus the list of pattern names, and accepts
    plain-text POST bodies on ``/num_leds``, ``/pattern``, ``/delay``,
    ``/color`` and ``/color2``.  Every accepted change is written to
    ``SETTINGS_FILE`` so it can be restored on the next start.

    The ``Patterns`` object is used through these members only:
    ``update_num_leds(pin, n)``, ``set_delay(ms)``, ``set_color1(rgb)``,
    ``set_color2(rgb)``, ``selected``, ``patterns`` (a mapping keyed by
    pattern name) and ``tick()``.
    """

    SETTINGS_FILE = "/settings.json"  # Path should be adjusted for MicroPython's filesystem

    # Canned responses, shared by the GET and POST handlers.
    _RESP_OK = b'HTTP/1.0 200 OK\r\n\r\n'
    _RESP_BAD_REQUEST = b'HTTP/1.0 400 Bad request\r\n\r\n'
    _RESP_NOT_FOUND = b'HTTP/1.0 404 Not Found\r\nContent-Type: text/plain\r\n\r\n'
    _RESP_BAD_METHOD = b'HTTP/1.0 405 Method Not Allowed\r\n\r\n'

    _HEX_DIGITS = "0123456789abcdefABCDEF"

    def __init__(self, num_leds=50, pin=4, led_pin=8, brigtness=255):
        """Set up the strip, the status LED, the listening socket and the cached pages.

        Args:
            num_leds: Number of LEDs on the strip (may be overridden by saved settings).
            pin: Data pin passed to ``Patterns``; also reused when the strip is resized.
            led_pin: Pin of the single status LED that is toggled by ``start()``.
            brigtness: Accepted for backward compatibility (the misspelling is part
                of the existing interface); the value is currently not used.
        """
        # Initialize NeoPixel Patterns
        self.pin = pin  # remembered so later resizes use the same pin as the constructor
        self.num_leds = num_leds
        self.patterns = Patterns(pin, num_leds)
        self.selected_pattern = "blink"
        self.color = (16, 16, 0)
        self.color2 = (16, 16, 0)
        self.delay = 100

        # Initialize single LED
        self.led = Pin(led_pin, Pin.OUT)

        # Initialize server
        self.server_socket = socket.socket()
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('0.0.0.0', 80))
        self.server_socket.listen(1)
        self.server_socket.settimeout(1)  # Adjust timeout as needed

        self.poll = select.poll()
        self.poll.register(self.server_socket, select.POLLIN)

        # Load settings from file
        self.load_settings()

        # Read static files.  The HTML is kept as a template so the cached page
        # can be re-rendered whenever a setting changes.
        self._html_template = self.read_file("/index.html")
        self._render_html()
        self.js = self.read_file("/main.js").encode('utf-8')
        self.css = self.read_file("/main.css").encode('utf-8')
        self.patterns_json = json.dumps(list(self.patterns.patterns.keys()))

    @staticmethod
    def _rgb_to_hex(rgb):
        """Format an ``(r, g, b)`` sequence as ``#rrggbb``."""
        return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])

    @staticmethod
    def _hex_to_rgb(text):
        """Parse ``#rrggbb`` into an ``(r, g, b)`` tuple; raise ValueError otherwise."""
        text = text.strip()
        if len(text) != 7 or text[0] != '#':
            raise ValueError("expected '#rrggbb'")
        for ch in text[1:]:
            # int(..., 16) alone would also accept signs and whitespace.
            if ch not in LEDServer._HEX_DIGITS:
                raise ValueError("expected '#rrggbb'")
        return tuple(int(text[i:i + 2], 16) for i in (1, 3, 5))

    @staticmethod
    def _as_rgb(value):
        """Coerce a stored colour (list/tuple of 3 ints in 0..255) to a tuple."""
        rgb = tuple(int(channel) for channel in value)
        if len(rgb) != 3:
            raise ValueError("colour must have three channels")
        for channel in rgb:
            if channel < 0 or channel > 255:
                raise ValueError("colour channel out of range")
        return rgb

    def _render_html(self):
        """Rebuild ``self.html`` (bytes) from the template and the current settings."""
        self.html = self._html_template.format(
            num_leds=self.num_leds,
            delay=self.delay,
            color=self._rgb_to_hex(self.color),
            color2=self._rgb_to_hex(self.color2),
        ).encode()

    def read_file(self, file_path):
        """Return the text content of ``file_path``, or ``""`` if it cannot be read."""
        try:
            with open(file_path, 'r') as file:
                return file.read()
        except OSError as e:
            print(f"Error reading file {file_path}: {e}")
            return ""

    def save_settings(self):
        """Write the current settings to ``SETTINGS_FILE`` as JSON (best effort)."""
        settings = {
            "num_leds": self.num_leds,
            "selected_pattern": self.selected_pattern,
            "color": self.color,
            "color2": self.color2,
            "delay": self.delay
        }
        print(settings)
        try:
            with open(self.SETTINGS_FILE, 'w') as file:
                json.dump(settings, file)
        except OSError as e:
            print(f"Error saving settings: {e}")

    def load_settings(self):
        """Restore settings from ``SETTINGS_FILE`` and push them into ``self.patterns``.

        A missing file is silently ignored.  A corrupt or malformed file is
        reported and leaves the current (default) settings untouched: all values
        are parsed and validated before any of them is applied.
        """
        if not self.file_exists(self.SETTINGS_FILE):
            return
        try:
            with open(self.SETTINGS_FILE, 'r') as file:
                settings = json.load(file)

            # Parse everything first so a bad entry cannot leave half-applied state.
            num_leds = int(settings.get("num_leds", self.num_leds))
            delay = int(settings.get("delay", self.delay))
            color = self._as_rgb(settings.get("color", self.color))
            color2 = self._as_rgb(settings.get("color2", self.color2))
            selected = settings.get("selected_pattern", self.selected_pattern)
            if selected not in self.patterns.patterns:
                # e.g. a pattern that no longer exists; keep the default.
                selected = self.selected_pattern

            self.num_leds = num_leds
            self.delay = delay
            self.color = color
            self.color2 = color2
            self.selected_pattern = selected

            self.patterns.update_num_leds(self.pin, self.num_leds)
            self.patterns.set_delay(self.delay)
            self.patterns.set_color1(self.color)
            self.patterns.set_color2(self.color2)
            self.patterns.selected = self.selected_pattern
        except (OSError, ValueError, TypeError, AttributeError) as e:
            print(f"Error loading settings: {e}")

    def file_exists(self, file_path):
        """Return True if ``file_path`` can be opened for reading."""
        try:
            with open(file_path, 'r'):
                return True
        except OSError:
            return False

    def handle_post(self, path, post_data, client_socket):
        """Apply one setting change and answer the client.

        Args:
            path: Request path; one of ``/num_leds``, ``/pattern``, ``/delay``,
                ``/color``, ``/color2``.
            post_data: Request body as text (an integer, a pattern name, or
                ``#rrggbb`` depending on ``path``).
            client_socket: Connected socket the response is written to.

        Responds 200 after the change was applied and saved, 400 when the body
        is not valid for the path, and 404 for an unknown path.
        """
        print(post_data)
        value = post_data.strip()
        try:
            if path == "/num_leds":
                num_leds = int(value)
                if num_leds < 0:
                    raise ValueError("negative LED count")
                self.patterns.update_num_leds(self.pin, num_leds)
                self.num_leds = num_leds
            elif path == "/pattern":
                # Refuse names the pattern engine does not know, so an invalid
                # selection is never persisted.
                if value not in self.patterns.patterns:
                    raise ValueError("unknown pattern")
                self.selected_pattern = value
                self.patterns.selected = value
            elif path == "/delay":
                delay = int(value)
                if delay < 0:
                    raise ValueError("negative delay")
                self.patterns.set_delay(delay)
                self.delay = delay
            elif path == "/color":
                rgb = self._hex_to_rgb(value)  # Convert hex to RGB
                self.patterns.set_color1(rgb)
                self.color = rgb  # keep in sync so save_settings stores the new colour
            elif path == "/color2":
                rgb = self._hex_to_rgb(value)  # Convert hex to RGB
                self.patterns.set_color2(rgb)
                self.color2 = rgb
            else:
                client_socket.sendall(self._RESP_NOT_FOUND)
                return
        except ValueError:
            client_socket.sendall(self._RESP_BAD_REQUEST)
            return

        self.save_settings()
        self._render_html()  # the cached page embeds the values that just changed
        client_socket.sendall(self._RESP_OK)

    def handle_get(self, path, client_socket):
        """Serve one of the cached resources, or 404 for an unknown path."""
        if path == "/":
            content_type, body = b'text/html', self.html
        elif path == "/main.js":
            content_type, body = b'application/javascript', self.js
        elif path == "/main.css":
            content_type, body = b'text/css', self.css
        elif path == "/patterns":
            content_type, body = b'application/json', self.patterns_json.encode()
        else:
            client_socket.sendall(self._RESP_NOT_FOUND)
            return

        # sendall() instead of send(): send() may write only part of the buffer.
        client_socket.sendall(b'HTTP/1.0 200 OK\r\nContent-type: ' + content_type + b'\r\n\r\n')
        client_socket.sendall(body)

    def _serve_client(self):
        """Accept one pending connection, handle its request, always close it.

        Errors caused by a single client (empty or malformed request, broken
        connection) are reported and swallowed so they cannot stop the server.
        """
        try:
            client_socket, _addr = self.server_socket.accept()
        except OSError as e:
            print(f"Error accepting connection: {e}")
            return

        try:
            request = client_socket.recv(1024).decode()
            request_line = request.split('\r\n')[0].split()
            if len(request_line) != 3:
                client_socket.sendall(self._RESP_BAD_REQUEST)
                return
            method, path = request_line[0], request_line[1]
            print(f"Method: {method}, Path: {path}")

            if method == "POST":
                # Everything after the first blank line is the body.
                head_and_body = request.split('\r\n\r\n', 1)
                post_data = head_and_body[1] if len(head_and_body) > 1 else ''
                self.handle_post(path, post_data, client_socket)
            elif method == "GET":
                self.handle_get(path, client_socket)
            else:
                client_socket.sendall(self._RESP_BAD_METHOD)
        except (OSError, UnicodeError, ValueError) as e:
            print(f"Error handling request: {e}")
        finally:
            client_socket.close()

    def start(self):
        """Run the main loop: serve HTTP clients, blink the status LED, animate.

        Each iteration polls the listening socket for up to 1 ms, handles any
        pending connection, toggles the status LED on a 100-iteration cycle and
        advances the pattern by one ``tick()``.  When the loop ends for any
        reason the listening socket is closed and the settings are saved.
        """
        count = 0
        try:
            while True:
                count += 1
                for event in self.poll.poll(1):
                    if event[0] == self.server_socket:
                        self._serve_client()

                # Status LED: off during the second half of the cycle, back on
                # (and counter reset) once the cycle completes.
                if count > 50:
                    self.led.off()
                if count > 100:
                    self.led.on()
                    count = 0

                self.patterns.tick()

        except Exception as e:
            print("Error:", e)
        finally:
            self.server_socket.close()
            self.save_settings()
|
||
|
|
||
|
# Script entry point: build a server with the default arguments and run its
# main loop (start() only returns once the loop has stopped).
if __name__ == "__main__":
    LEDServer().start()
|