ws281x/main.py

184 lines
7.2 KiB
Python

from machine import Pin
from patterns import Patterns
from settings import Settings
import socket
import select
import json
import utime
import sys
class LEDServer:
SETTINGS_FILE = "/settings.json" # Path should be adjusted for MicroPython's filesystem
def __init__(self, num_leds=50, pin=4, led_pin=8, brigtness=255):
# Initialize NeoPixel Patterns
self.settings = Settings()
print(self.settings)
self.patterns = Patterns(pin, num_leds)
self.patterns.select(self.settings["selected_pattern"])
self.patterns.set_color1(tuple(int(self.settings["color1"][i:i+2], 16) for i in (1, 5, 3)))
self.patterns.set_color2(tuple(int(self.settings["color2"][i:i+2], 16) for i in (1, 5, 3)))
# Initialize single LED
self.led = Pin(led_pin, Pin.OUT)
# Initialize server
self.server_socket = socket.socket()
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(('0.0.0.0', 80))
self.server_socket.listen(1)
self.server_socket.settimeout(1) # Adjust timeout as needed
self.poll = select.poll()
self.poll.register(self.server_socket, select.POLLIN)
self.html = self.read_file("/index.html")
self.js = self.read_file("/main.js").encode('utf-8')
self.css = self.read_file("/main.css").encode('utf-8')
self.patterns_json = json.dumps(list(self.patterns.patterns.keys()))
def read_file(self, file_path):
try:
with open(file_path, 'r') as file:
return file.read()
except OSError as e:
print(f"Error reading file {file_path}: {e}")
return ""
def handle_post(self, path, post_data, client_socket):
paths = {
"/num_leds" : self.num_leds,
"/pattern": self.pattern,
"/delay": self.delay,
"/brightness": self.brightness,
"/color": self.color,
"/color2": self.color2
}
if path in paths:
paths[path](client_socket, post_data)
else:
client_socket.send(b'HTTP/1.0 404 Not Found\r\nContent-Type: text/plain\r\n\r\n')
def num_leds(self, client_socket, post_data):
try:
num_leds = int(post_data)
self.patterns.update_num_leds(4,num_leds)
self.settings["num_leds"] = num_leds
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
except ValueError:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def pattern(self, client_socket, post_data):
if self.patterns.select(post_data):
self.settings["selected_pattern"] = post_data
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
else:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def delay(self, client_socket, post_data):
try:
delay = int(post_data)
self.patterns.set_delay(delay)
self.settings["delay"] = delay
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
except ValueError:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def brightness(self, client_socket, post_data):
try:
brightness = int(post_data)
self.patterns.set_brightness(brightness)
self.settings["brightness"] = brightness
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
except ValueError:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def color(self, client_socket, post_data):
try:
self.patterns.set_color1(tuple(int(post_data[i:i+2], 16) for i in (1, 5, 3))) # Convert hex to RGB
self.settings["color1"] = post_data
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
except:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def color2(self, client_socket, post_data):
try:
self.patterns.set_color2(tuple(int(post_data[i:i+2], 16) for i in (1, 5, 3))) # Convert hex to RGB
self.settings["color2"] = post_data
self.settings.save()
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
except:
client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
def handle_get(self, path, client_socket):
if path == "/":
client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
client_socket.send(self.html.format(
num_leds=self.settings["num_leds"],
delay=self.settings["delay"],
brightness=self.settings["brightness"],
color=self.settings["color1"],
color2=self.settings["color2"],
ssid=self.settings["wifi"]["ssid"]).encode())
elif path == "/main.js":
client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: application/javascript\r\n\r\n')
client_socket.send(self.js)
elif path == "/main.css":
client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: text/css\r\n\r\n')
client_socket.send(self.css)
elif path == "/patterns":
client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
client_socket.send(self.patterns_json.encode())
else:
client_socket.send(b'HTTP/1.0 404 Not Found\r\nContent-Type: text/plain\r\n\r\n')
def start(self):
count = 0
average_time = 0
previous_time = utime.ticks_us()
try:
while True:
count += 1
events = self.poll.poll(1)
for file in events:
if file[0] == self.server_socket:
client_socket, addr = self.server_socket.accept()
request = client_socket.recv(1024).decode()
method, path, _ = request.split('\r\n')[0].split()
print(f"Method: {method}, Path: {path}")
if method == "POST":
post_data = request.split('\r\n\r\n')[1] if '\r\n\r\n' in request else ''
self.handle_post(path, post_data, client_socket)
elif method == "GET":
self.handle_get(path, client_socket)
client_socket.close()
if count > 50:
self.led.off()
#print(utime.ticks_us() - previous_time)
if count > 100:
self.led.on()
count = 0
# previous_time = utime.ticks_us()
self.patterns.tick()
except Exception as e:
print("Error:", e)
sys.print_exception(e)
finally:
self.server_socket.close()
# Example of creating and starting the server
if __name__ == "__main__":
server = LEDServer()
server.start()