Move to src folder
This commit is contained in:
parent
acba80b958
commit
12be71ae47
27
boot.py
27
boot.py
|
@ -1,27 +0,0 @@
|
|||
import network
|
||||
from machine import Pin
|
||||
from config import *
|
||||
|
||||
def do_connect():
|
||||
led = Pin(8, Pin.OUT)
|
||||
sta_if = network.WLAN(network.STA_IF)
|
||||
sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1'))
|
||||
if not sta_if.isconnected():
|
||||
print('connecting to network...')
|
||||
sta_if.active(True)
|
||||
sta_if.connect(ssid, password)
|
||||
led.on()
|
||||
while not sta_if.isconnected():
|
||||
pass
|
||||
print('network config:', sta_if.ifconfig())
|
||||
|
||||
do_connect()
|
||||
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
ap.active(True)
|
||||
ap.config(essid="led", password="qwerty1234")
|
||||
print(ap.ifconfig())
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
# import network
|
||||
# from machine import Pin
|
||||
# from config import *
|
||||
# from time import sleep
|
||||
# import ubinascii
|
||||
|
||||
# def do_connect():
|
||||
# led = Pin(8, Pin.OUT)
|
||||
# sta_if = network.WLAN(network.STA_IF)
|
||||
# sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1'))
|
||||
# if not sta_if.isconnected():
|
||||
# print('connecting to network...')
|
||||
# sta_if.active(True)
|
||||
# sta_if.connect(ssid, password)
|
||||
# led.on()
|
||||
# for i in range(10):
|
||||
# if sta_if.isconnected():
|
||||
# print('network config:', sta_if.ifconfig())
|
||||
# led.off()
|
||||
# break
|
||||
# sleep(1)
|
||||
|
||||
# do_connect()
|
||||
|
||||
# ap = network.WLAN(network.AP_IF)
|
||||
# ap.active(True)
|
||||
# ap_mac = ap.config('mac')
|
||||
# ssid = f"led-{ubinascii.hexlify(ap_mac).decode()}"
|
||||
# print(ssid)
|
||||
# ap.config(essid=ssid, password="qwerty1234")
|
||||
# print(ap.ifconfig())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import wifi
|
|
@ -0,0 +1,27 @@
|
|||
import network
|
||||
import espnow
|
||||
import select
|
||||
|
||||
# A WLAN interface must be active to send()/recv()
|
||||
ap = network.WLAN(network.AP_IF)
|
||||
ap.active(True)
|
||||
ap.disconnect() # Because ESP8266 auto-connects to last Access Point
|
||||
|
||||
ap_mac = ap.config('mac')
|
||||
print(ap_mac)
|
||||
print(ubinascii.hexlify(ap_mac))
|
||||
|
||||
e = espnow.ESPNow()
|
||||
e.active(True)
|
||||
|
||||
poll = select.poll()
|
||||
poll.register(e, select.POLLIN)
|
||||
poll.poll(0)
|
||||
|
||||
while True:
|
||||
for event in poll.poll(0) :
|
||||
if event[0] == e:
|
||||
host, msg = e.recv()
|
||||
if msg: # msg == None if timeout in recv()
|
||||
print(host, msg)
|
||||
|
|
@ -8,6 +8,7 @@ body {
|
|||
h1 {
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
form {
|
||||
margin-bottom: 20px;
|
||||
}
|
|
@ -6,6 +6,7 @@ import select
|
|||
import json
|
||||
import utime
|
||||
import sys
|
||||
import espnow
|
||||
|
||||
class LEDServer:
|
||||
SETTINGS_FILE = "/settings.json" # Path should be adjusted for MicroPython's filesystem
|
||||
|
@ -15,10 +16,13 @@ class LEDServer:
|
|||
self.settings = Settings()
|
||||
|
||||
print(self.settings)
|
||||
self.patterns = Patterns(pin, num_leds)
|
||||
self.patterns = Patterns(pin, self.settings["num_leds"])
|
||||
self.patterns.select(self.settings["selected_pattern"])
|
||||
self.patterns.set_color1(tuple(int(self.settings["color1"][i:i+2], 16) for i in (1, 5, 3)))
|
||||
self.patterns.set_color2(tuple(int(self.settings["color2"][i:i+2], 16) for i in (1, 5, 3)))
|
||||
self.patterns.set_brightness(int(self.settings["brightness"]))
|
||||
self.patterns.set_delay(int(self.settings["delay"]))
|
||||
|
||||
# Initialize single LED
|
||||
self.led = Pin(led_pin, Pin.OUT)
|
||||
|
||||
|
@ -29,8 +33,14 @@ class LEDServer:
|
|||
self.server_socket.listen(1)
|
||||
self.server_socket.settimeout(1) # Adjust timeout as needed
|
||||
|
||||
self.e = espnow.ESPNow()
|
||||
self.e.active(True)
|
||||
|
||||
self.poll = select.poll()
|
||||
self.poll.register(self.server_socket, select.POLLIN)
|
||||
self.poll.register(self.e, select.POLLIN)
|
||||
|
||||
|
||||
|
||||
self.html = self.read_file("/index.html")
|
||||
self.js = self.read_file("/main.js").encode('utf-8')
|
||||
|
@ -39,6 +49,8 @@ class LEDServer:
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def read_file(self, file_path):
|
||||
try:
|
||||
with open(file_path, 'r') as file:
|
||||
|
@ -103,7 +115,7 @@ class LEDServer:
|
|||
|
||||
def color(self, client_socket, post_data):
|
||||
try:
|
||||
self.patterns.set_color1(tuple(int(post_data[i:i+2], 16) for i in (1, 5, 3))) # Convert hex to RGB
|
||||
self.patterns.set_color1(tuple(int(post_data[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB
|
||||
self.settings["color1"] = post_data
|
||||
self.settings.save()
|
||||
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
|
||||
|
@ -112,7 +124,7 @@ class LEDServer:
|
|||
|
||||
def color2(self, client_socket, post_data):
|
||||
try:
|
||||
self.patterns.set_color2(tuple(int(post_data[i:i+2], 16) for i in (1, 5, 3))) # Convert hex to RGB
|
||||
self.patterns.set_color2(tuple(int(post_data[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB
|
||||
self.settings["color2"] = post_data
|
||||
self.settings.save()
|
||||
client_socket.send(b'HTTP/1.0 200 OK\r\n\r\n')
|
||||
|
@ -121,6 +133,7 @@ class LEDServer:
|
|||
|
||||
def handle_get(self, path, client_socket):
|
||||
if path == "/":
|
||||
print(self.settings)
|
||||
client_socket.send(b'HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
|
||||
client_socket.send(self.html.format(
|
||||
num_leds=self.settings["num_leds"],
|
||||
|
@ -148,7 +161,7 @@ class LEDServer:
|
|||
try:
|
||||
while True:
|
||||
count += 1
|
||||
events = self.poll.poll(1)
|
||||
events = self.poll.poll(0)
|
||||
for file in events:
|
||||
if file[0] == self.server_socket:
|
||||
client_socket, addr = self.server_socket.accept()
|
||||
|
@ -161,6 +174,10 @@ class LEDServer:
|
|||
elif method == "GET":
|
||||
self.handle_get(path, client_socket)
|
||||
client_socket.close()
|
||||
elif file[0] == self.e:
|
||||
host, msg = self.e.recv()
|
||||
if msg: # msg == None if timeout in recv()
|
||||
print(host, msg)
|
||||
|
||||
if count > 50:
|
||||
self.led.off()
|
||||
|
@ -176,6 +193,7 @@ class LEDServer:
|
|||
sys.print_exception(e)
|
||||
finally:
|
||||
self.server_socket.close()
|
||||
self.settings.save()
|
||||
|
||||
# Example of creating and starting the server
|
||||
if __name__ == "__main__":
|
|
@ -34,7 +34,7 @@ class Settings(dict):
|
|||
self.update(loaded_settings)
|
||||
print("Settings loaded successfully.")
|
||||
except Exception as e:
|
||||
print(f"Error loading settings: {e}")
|
||||
print(f"Error loading settings")
|
||||
self.set_defaults()
|
||||
|
||||
# Example usage
|
|
@ -0,0 +1,62 @@
|
|||
# Rui Santos & Sara Santos - Random Nerd Tutorials
|
||||
# Complete project details at https://RandomNerdTutorials.com/raspberry-pi-pico-w-asynchronous-web-server-micropython/
|
||||
|
||||
# Import necessary modules
|
||||
import network
|
||||
import asyncio
|
||||
import socket
|
||||
import time
|
||||
import random
|
||||
from machine import Pin
|
||||
|
||||
# Asynchronous functio to handle client's requests
|
||||
async def handle_client(reader, writer):
|
||||
global state
|
||||
|
||||
print("Client connected")
|
||||
request_line = await reader.readline()
|
||||
print('Request:', request_line)
|
||||
|
||||
# Skip HTTP request headers
|
||||
while await reader.readline() != b"\r\n":
|
||||
pass
|
||||
|
||||
request = str(request_line, 'utf-8').split()[1]
|
||||
print('Request:', request)
|
||||
|
||||
|
||||
|
||||
# Generate HTML response
|
||||
response = "Hello"
|
||||
|
||||
# Send the HTTP response and close the connection
|
||||
writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
|
||||
writer.write(response)
|
||||
await writer.drain()
|
||||
await writer.wait_closed()
|
||||
print('Client Disconnected')
|
||||
|
||||
async def main():
|
||||
# Start the server and run the event loop
|
||||
print('Setting up server')
|
||||
server = asyncio.start_server(handle_client, "0.0.0.0", 80)
|
||||
asyncio.create_task(server)
|
||||
|
||||
while True:
|
||||
# Add other tasks that you might need to do in the loop
|
||||
await asyncio.sleep(5)
|
||||
print('This message will be printed every 5 seconds')
|
||||
|
||||
|
||||
# Create an Event Loop
|
||||
loop = asyncio.get_event_loop()
|
||||
# Create a task to run the main function
|
||||
loop.create_task(main())
|
||||
|
||||
try:
|
||||
# Run the event loop indefinitely
|
||||
loop.run_forever()
|
||||
except Exception as e:
|
||||
print('Error occured: ', e)
|
||||
except KeyboardInterrupt:
|
||||
print('Program Interrupted by the user')
|
|
@ -0,0 +1,37 @@
|
|||
import network
|
||||
from machine import Pin
|
||||
from config import *
|
||||
from time import sleep
|
||||
import ubinascii
|
||||
|
||||
def do_connect():
|
||||
led = Pin(8, Pin.OUT)
|
||||
sta_if = network.WLAN(network.STA_IF)
|
||||
sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1'))
|
||||
if not sta_if.isconnected():
|
||||
print('connecting to network...')
|
||||
sta_if.active(True)
|
||||
sta_if.connect(ssid, password)
|
||||
led.on()
|
||||
for i in range(10):
|
||||
if sta_if.isconnected():
|
||||
print('network config:', sta_if.ifconfig())
|
||||
led.off()
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
do_connect()
|
||||
|
||||
def ap(password):
|
||||
ap_if = network.WLAN(network.AP_IF)
|
||||
ap_if.active(True)
|
||||
ap_mac = ap_if.config('mac')
|
||||
ssid = f"led-{ubinascii.hexlify(ap_mac).decode()}"
|
||||
print(ssid)
|
||||
ap_if.config(essid=ssid, password="qwerty1234")
|
||||
print(ap_if.ifconfig())
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue