# File: ws281x/src/main.py
# Last modified: 2024-12-11 22:42:56 +13:00
# Size: 251 lines, 9.7 KiB
# Language: Python

from machine import Pin, reset
from patterns import Patterns
from settings import Settings
import socket
import select
import json
import utime
import sys
import espnow
import wifi
class LEDServer:
    """HTTP and ESP-NOW control server for an LED strip driven by ``Patterns``.

    The server listens on port 80, serves the web UI files read from the
    filesystem, applies posted values to the ``Patterns`` driver, persists
    them through ``Settings`` and advances the animation from one polling loop.
    """

    SETTINGS_FILE = "/settings.json"  # Path should be adjusted for MicroPython's filesystem

    def __init__(self, num_leds=50, pin=4, led_pin=8, brigtness=255):
        """Load settings, configure the strip, open the sockets and join Wi-Fi.

        Args:
            num_leds: Unused; the strip length is read from ``settings["num_leds"]``.
            pin: Pin number handed to ``Patterns`` for the strip data line.
            led_pin: Pin number of the single status LED.
            brigtness: Unused; the brightness is read from ``settings["brightness"]``.
                The misspelled name is kept so existing keyword callers still work.
        """
        # Remembered so that resizing the strip re-uses the configured data pin.
        self.pin = pin

        # Initialize NeoPixel Patterns
        self.settings = Settings()
        print(self.settings)
        self.patterns = Patterns(pin, self.settings["num_leds"])
        self.patterns.select(self.settings["selected_pattern"])
        # Saved colours are decoded exactly like posted ones, so a colour
        # survives a restart without its green and blue channels being swapped.
        self.patterns.set_color1(self._hex_to_rgb(self.settings["color1"]))
        self.patterns.set_color2(self._hex_to_rgb(self.settings["color2"]))
        self.patterns.set_brightness(int(self.settings["brightness"]))
        self.patterns.set_delay(int(self.settings["delay"]))

        # Initialize single LED
        self.led = Pin(led_pin, Pin.OUT)

        # Initialize server
        self.server_socket = socket.socket()
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('0.0.0.0', 80))
        self.server_socket.listen(1)
        self.server_socket.settimeout(1)  # Adjust timeout as needed

        self.e = espnow.ESPNow()
        self.e.active(True)

        # One poller watches both the HTTP listener and the ESP-NOW interface.
        self.poll = select.poll()
        self.poll.register(self.server_socket, select.POLLIN)
        self.poll.register(self.e, select.POLLIN)

        # Static assets are read once at start-up and served from memory.
        self.html = self.read_file("/index.html")
        self.js = self.read_file("/main.js").encode('utf-8')
        self.css = self.read_file("/main.css").encode('utf-8')
        self.patterns_json = json.dumps(list(self.patterns.patterns.keys()))

        ssid = self.settings["wifi"]["ssid"]
        password = self.settings["wifi"]["password"]
        ip = self.settings.get("wifi", {}).get("ip", None)
        gateway = self.settings.get("wifi", {}).get("gateway", None)
        # NOTE(review): the access-point password is hard-coded here.
        wifi.ap(password="qwerty")
        config = wifi.do_connect(ssid, password, ip, gateway)
        if config is not None:
            print(config)

    @staticmethod
    def _hex_to_rgb(hex_color):
        """Convert a ``"#rrggbb"`` string to an ``(r, g, b)`` tuple of ints.

        Raises:
            ValueError: If any of the three two-character fields is not hex
                (including when the string is too short).
        """
        return tuple(int(hex_color[i:i + 2], 16) for i in (1, 3, 5))

    @staticmethod
    def _split_wifi_fields(post_data):
        """Split a newline-separated body into (ssid, password, ip, gateway).

        Missing trailing fields fall back to ``""`` for ssid/password and
        ``None`` for ip/gateway; lines beyond the fourth are ignored.
        """
        defaults = ("", "", None, None)
        fields = post_data.split('\n')[:4]
        return tuple(fields) + defaults[len(fields):]

    def read_file(self, file_path):
        """Return the text content of ``file_path``, or ``""`` if it cannot be read."""
        try:
            with open(file_path, 'r') as file:
                return file.read()
        except OSError as e:
            print(f"Error reading file {file_path}: {e}")
            return ""

    def handle_post(self, path, post_data, client_socket):
        """Dispatch a POST request to the handler registered for ``path``.

        Unknown paths are answered with ``404 Not Found``.
        """
        routes = {
            "/num_leds": self.num_leds,
            "/pattern": self.pattern,
            "/delay": self.delay,
            "/brightness": self.brightness,
            "/color": self.color,
            "/color2": self.color2,
            "/wifi_settings": self.wifi_settings,
            # Request paths always start with "/", so the key needs it to match.
            "/set_leds": self.set_leds,
        }
        handler = routes.get(path)
        if handler is None:
            client_socket.send(b'HTTP/1.0 404 Not Found\r\nContent-Type: text/plain\r\n\r\n')
            return
        handler(client_socket, post_data)

    def _apply_int_setting(self, client_socket, post_data, setter, key):
        """Parse ``post_data`` as an int, pass it to ``setter`` and persist it under ``key``.

        Replies ``200 OK`` on success and ``400 Bad request`` on a ValueError.
        """
        try:
            value = int(post_data)
            setter(value)
            self.settings[key] = value
            self.settings.save()
            client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
        except ValueError:
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')

    def _apply_color(self, client_socket, post_data, setter, key):
        """Parse ``post_data`` as ``"#rrggbb"``, pass the RGB tuple to ``setter`` and persist it.

        Replies ``200 OK`` on success and ``400 Bad request`` on a ValueError.
        """
        try:
            setter(self._hex_to_rgb(post_data))
            self.settings[key] = post_data
            self.settings.save()
            client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
        except ValueError:
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')

    def num_leds(self, client_socket, post_data):
        """Resize the strip to the posted LED count and persist it."""
        self._apply_int_setting(
            client_socket,
            post_data,
            # Use the configured data pin rather than a hard-coded one.
            lambda count: self.patterns.update_num_leds(self.pin, count),
            "num_leds",
        )

    def pattern(self, client_socket, post_data):
        """Select the posted pattern name; reply 400 if ``Patterns.select`` rejects it."""
        if self.patterns.select(post_data):
            self.settings["selected_pattern"] = post_data
            self.settings.save()
            client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
        else:
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')

    def delay(self, client_socket, post_data):
        """Set and persist the pattern delay from the posted integer."""
        self._apply_int_setting(client_socket, post_data, self.patterns.set_delay, "delay")

    def brightness(self, client_socket, post_data):
        """Set and persist the strip brightness from the posted integer."""
        self._apply_int_setting(client_socket, post_data, self.patterns.set_brightness, "brightness")

    def color(self, client_socket, post_data):
        """Set and persist the primary colour from a posted ``"#rrggbb"`` string."""
        self._apply_color(client_socket, post_data, self.patterns.set_color1, "color1")

    def color2(self, client_socket, post_data):
        """Set and persist the secondary colour from a posted ``"#rrggbb"`` string."""
        self._apply_color(client_socket, post_data, self.patterns.set_color2, "color2")

    def wifi_settings(self, client_socket, post_data):
        """Store the posted Wi-Fi credentials and try to connect with them.

        The body is newline-separated: ssid, password, ip, gateway. Replies
        ``200 OK`` when ``wifi.do_connect`` returns a configuration and
        ``500`` when it returns ``None``.
        """
        try:
            ssid, password, ip, gateway = self._split_wifi_fields(post_data)
            self.settings["wifi"]["ssid"] = ssid
            self.settings["wifi"]["password"] = password
            self.settings["wifi"]["ip"] = ip
            self.settings["wifi"]["gateway"] = gateway
            self.settings.save()
            self.settings.load()
            print(self.settings)
            print(ssid, password)
            config = wifi.do_connect(ssid, password, ip, gateway)
            if config is not None:
                print(config)
                client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
            else:
                client_socket.send(b'HTTP/1.0 500 Failed to connect to wifi\r\n\r\n')
        except ValueError:
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')

    def set_leds(self, client_socket, post_data):
        """Accept a raw LED payload of three characters per configured LED.

        The payload itself is only printed for now; the handler sets LED 0 to
        white as a placeholder. A payload of the wrong length gets a 400.
        """
        if len(post_data) == self.settings["num_leds"] * 3:
            print(post_data)
            self.patterns.set(0, (255, 255, 255))
            client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
        else:
            print("Wrong number of leds")
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')

    def handle_get(self, path, client_socket):
        """Serve the UI page, its static assets or the pattern list for a GET request.

        Unknown paths are answered with ``404 Not Found``.
        """
        if path == "/":
            print(self.settings)
            wifi_cfg = self.settings.get("wifi", {})
            # NOTE(review): the stored Wi-Fi password is rendered into the page.
            page = self.html.format(
                num_leds=self.settings["num_leds"],
                delay=self.settings["delay"],
                brightness=self.settings["brightness"],
                color=self.settings["color1"],
                color2=self.settings["color2"],
                ssid=self.settings["wifi"]["ssid"],
                password=self.settings["wifi"]["password"],
                # ip/gateway may be stored as None; render those as empty fields.
                ip=wifi_cfg.get("ip") or "",
                gateway=wifi_cfg.get("gateway") or "",
            )
            client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
            client_socket.send(page.encode('utf-8'))
        elif path == "/main.js":
            client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: application/javascript\r\n\r\n')
            client_socket.send(self.js)
        elif path == "/main.css":
            client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: text/css\r\n\r\n')
            client_socket.send(self.css)
        elif path == "/patterns":
            client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
            client_socket.send(self.patterns_json.encode())
        else:
            client_socket.send(b'HTTP/1.0 404 Not Found\r\nContent-Type: text/plain\r\n\r\n')

    def _serve_client(self, client_socket):
        """Read one HTTP request from ``client_socket`` and answer it."""
        request = client_socket.recv(1024).decode()
        request_line = request.split('\r\n')[0].split()
        if len(request_line) != 3:
            # Empty or malformed request line: reject instead of raising.
            client_socket.send(b'HTTP/1.0 400 Bad request\r\n\r\n')
            return
        method, path, _ = request_line
        print(f"Method: {method}, Path: {path}")
        if method == "POST":
            post_data = request.split('\r\n\r\n')[1] if '\r\n\r\n' in request else ''
            self.handle_post(path, post_data, client_socket)
        elif method == "GET":
            self.handle_get(path, client_socket)
        else:
            client_socket.send(b'HTTP/1.0 405 Method Not Allowed\r\n\r\n')

    def start(self):
        """Run the main loop: serve requests, read ESP-NOW messages, animate.

        Each iteration polls without blocking, toggles the status LED (off
        once the counter passes 50, on again and reset once it passes 100)
        and ticks the pattern. When the loop ends for any reason the listening
        socket is closed and the settings are saved.
        """
        count = 0
        try:
            while True:
                count += 1
                for event in self.poll.poll(0):
                    source = event[0]
                    if source == self.server_socket:
                        client_socket, _addr = self.server_socket.accept()
                        try:
                            self._serve_client(client_socket)
                        except Exception as exc:
                            # A single bad request must not stop the server.
                            print("Request error:", exc)
                        finally:
                            client_socket.close()
                    elif source == self.e:
                        host, msg = self.e.recv()
                        if msg:  # msg == None if timeout in recv()
                            print(host, msg)
                if count > 50:
                    self.led.off()
                if count > 100:
                    self.led.on()
                    count = 0
                self.patterns.tick()
        except Exception as e:
            print("Error:", e)
            sys.print_exception(e)
        finally:
            self.server_socket.close()
            self.settings.save()
def main():
    """Create an ``LEDServer`` with its default arguments and run its loop."""
    LEDServer().start()
# Start the server only when this file is executed as the entry script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()