3 Commits

Author SHA1 Message Date
af9b63565a Remove web interface 2025-10-15 18:48:51 +13:00
e1b844241d Remove web interface 2025-10-15 18:48:15 +13:00
14b87f40ef Remove web interface 2025-10-15 18:47:23 +13:00
28 changed files with 439 additions and 3634 deletions

View File

@@ -1,2 +0,0 @@
from microdot.microdot import Microdot, Request, Response, abort, redirect, \
send_file # noqa: F401

View File

@@ -1,8 +0,0 @@
try:
from functools import wraps
except ImportError: # pragma: no cover
# MicroPython does not currently implement functools.wraps
def wraps(wrapped):
def _(wrapper):
return wrapper
return _

File diff suppressed because it is too large Load Diff

View File

@@ -1,70 +0,0 @@
from utemplate import recompile
_loader = None
class Template:
"""A template object.
:param template: The filename of the template to render, relative to the
configured template directory.
"""
@classmethod
def initialize(cls, template_dir='templates',
loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default
is the ``recompile.Loader`` class, which
automatically recompiles templates when they
change.
"""
global _loader
_loader = loader_class(None, template_dir)
def __init__(self, template):
if _loader is None: # pragma: no cover
self.initialize()
#: The name of the template
self.name = template
self.template = _loader.load(template)
def generate(self, *args, **kwargs):
"""Return a generator that renders the template in chunks, with the
given arguments."""
return self.template(*args, **kwargs)
def render(self, *args, **kwargs):
"""Render the template with the given arguments and return it as a
string."""
return ''.join(self.generate(*args, **kwargs))
def generate_async(self, *args, **kwargs):
"""Return an asynchronous generator that renders the template in
chunks, using the given arguments."""
class sync_to_async_iter():
def __init__(self, iter):
self.iter = iter
def __aiter__(self):
return self
async def __anext__(self):
try:
return next(self.iter)
except StopIteration:
raise StopAsyncIteration
return sync_to_async_iter(self.generate(*args, **kwargs))
async def render_async(self, *args, **kwargs):
"""Render the template with the given arguments asynchronously and
return it as a string."""
response = ''
async for chunk in self.generate_async(*args, **kwargs):
response += chunk
return response

View File

@@ -1,231 +0,0 @@
import binascii
import hashlib
from microdot import Request, Response
from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception
from microdot.helpers import wraps
class WebSocketError(Exception):
"""Exception raised when an error occurs in a WebSocket connection."""
pass
class WebSocket:
"""A WebSocket connection object.
An instance of this class is sent to handler functions to manage the
WebSocket connection.
"""
CONT = 0
TEXT = 1
BINARY = 2
CLOSE = 8
PING = 9
PONG = 10
#: Specify the maximum message size that can be received when calling the
#: ``receive()`` method. Messages with payloads that are larger than this
#: size will be rejected and the connection closed. Set to 0 to disable
#: the size check (be aware of potential security issues if you do this),
#: or to -1 to use the value set in
#: ``Request.max_body_length``. The default is -1.
#:
#: Example::
#:
#: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages
max_message_length = -1
def __init__(self, request):
self.request = request
self.closed = False
async def handshake(self):
response = self._handshake_response()
await self.request.sock[1].awrite(
b'HTTP/1.1 101 Switching Protocols\r\n')
await self.request.sock[1].awrite(b'Upgrade: websocket\r\n')
await self.request.sock[1].awrite(b'Connection: Upgrade\r\n')
await self.request.sock[1].awrite(
b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
async def receive(self):
"""Receive a message from the client."""
while True:
opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
if send_opcode: # pragma: no cover
await self.send(data, send_opcode)
elif data: # pragma: no branch
return data
async def send(self, data, opcode=None):
"""Send a message to the client.
:param data: the data to send, given as a string or bytes.
:param opcode: a custom frame opcode to use. If not given, the opcode
is ``TEXT`` or ``BINARY`` depending on the type of the
data.
"""
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
data)
await self.request.sock[1].awrite(frame)
async def close(self):
"""Close the websocket connection."""
if not self.closed: # pragma: no cover
self.closed = True
await self.send(b'', self.CLOSE)
def _handshake_response(self):
connection = False
upgrade = False
websocket_key = None
for header, value in self.request.headers.items():
h = header.lower()
if h == 'connection':
connection = True
if 'upgrade' not in value.lower():
return self.request.app.abort(400)
elif h == 'upgrade':
upgrade = True
if not value.lower() == 'websocket':
return self.request.app.abort(400)
elif h == 'sec-websocket-key':
websocket_key = value
if not connection or not upgrade or not websocket_key:
return self.request.app.abort(400)
d = hashlib.sha1(websocket_key.encode())
d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
return binascii.b2a_base64(d.digest())[:-1]
@classmethod
def _parse_frame_header(cls, header):
fin = header[0] & 0x80
opcode = header[0] & 0x0f
if fin == 0 or opcode == cls.CONT: # pragma: no cover
raise WebSocketError('Continuation frames not supported')
has_mask = header[1] & 0x80
length = header[1] & 0x7f
if length == 126:
length = -2
elif length == 127:
length = -8
return fin, opcode, has_mask, length
def _process_websocket_frame(self, opcode, payload):
if opcode == self.TEXT:
payload = payload.decode()
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
raise WebSocketError('Websocket connection closed')
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
return None, None
return None, payload
@classmethod
def _encode_websocket_frame(cls, opcode, payload):
frame = bytearray()
frame.append(0x80 | opcode)
if opcode == cls.TEXT:
payload = payload.encode()
if len(payload) < 126:
frame.append(len(payload))
elif len(payload) < (1 << 16):
frame.append(126)
frame.extend(len(payload).to_bytes(2, 'big'))
else:
frame.append(127)
frame.extend(len(payload).to_bytes(8, 'big'))
frame.extend(payload)
return frame
async def _read_frame(self):
header = await self.request.sock[0].read(2)
if len(header) != 2: # pragma: no cover
raise WebSocketError('Websocket connection closed')
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == -2:
length = await self.request.sock[0].read(2)
length = int.from_bytes(length, 'big')
elif length == -8:
length = await self.request.sock[0].read(8)
length = int.from_bytes(length, 'big')
max_allowed_length = Request.max_body_length \
if self.max_message_length == -1 else self.max_message_length
if length > max_allowed_length:
raise WebSocketError('Message too large')
if has_mask: # pragma: no cover
mask = await self.request.sock[0].read(4)
payload = await self.request.sock[0].read(length)
if has_mask: # pragma: no cover
payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
return opcode, payload
async def websocket_upgrade(request):
"""Upgrade a request handler to a websocket connection.
This function can be called directly inside a route function to process a
WebSocket upgrade handshake, for example after the user's credentials are
verified. The function returns the websocket object::
@app.route('/echo')
async def echo(request):
if not authenticate_user(request):
abort(401)
ws = await websocket_upgrade(request)
while True:
message = await ws.receive()
await ws.send(message)
"""
ws = WebSocket(request)
await ws.handshake()
@request.after_request
async def after_request(request, response):
return Response.already_handled
return ws
def websocket_wrapper(f, upgrade_function):
@wraps(f)
async def wrapper(request, *args, **kwargs):
ws = await upgrade_function(request)
try:
await f(request, ws, *args, **kwargs)
except OSError as exc:
if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover
raise
except WebSocketError:
pass
except Exception as exc:
print_exception(exc)
finally: # pragma: no cover
try:
await ws.close()
except Exception:
pass
return Response.already_handled
return wrapper
def with_websocket(f):
"""Decorator to make a route a WebSocket endpoint.
This decorator is used to define a route that accepts websocket
connections. The route then receives a websocket object as a second
argument that it can use to send and receive messages::
@app.route('/echo')
@with_websocket
async def echo(request, ws):
while True:
message = await ws.receive()
await ws.send(message)
"""
return websocket_wrapper(f, websocket_upgrade)

View File

@@ -1,14 +0,0 @@
class Loader:
def __init__(self, pkg, dir):
if dir == ".":
dir = ""
else:
dir = dir.replace("/", ".") + "."
if pkg and pkg != "__main__":
dir = pkg + "." + dir
self.p = dir
def load(self, name):
name = name.replace(".", "_")
return __import__(self.p + name, None, None, (name,)).render

View File

@@ -1,21 +0,0 @@
# (c) 2014-2020 Paul Sokolovsky. MIT license.
try:
from uos import stat, remove
except:
from os import stat, remove
from . import source
class Loader(source.Loader):
def load(self, name):
o_path = self.pkg_path + self.compiled_path(name)
i_path = self.pkg_path + self.dir + "/" + name
try:
o_stat = stat(o_path)
i_stat = stat(i_path)
if i_stat[8] > o_stat[8]:
# input file is newer, remove output to force recompile
remove(o_path)
finally:
return super().load(name)

View File

@@ -1,188 +0,0 @@
# (c) 2014-2019 Paul Sokolovsky. MIT license.
from . import compiled
class Compiler:
START_CHAR = "{"
STMNT = "%"
STMNT_END = "%}"
EXPR = "{"
EXPR_END = "}}"
def __init__(self, file_in, file_out, indent=0, seq=0, loader=None):
self.file_in = file_in
self.file_out = file_out
self.loader = loader
self.seq = seq
self._indent = indent
self.stack = []
self.in_literal = False
self.flushed_header = False
self.args = "*a, **d"
def indent(self, adjust=0):
if not self.flushed_header:
self.flushed_header = True
self.indent()
self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args))
self.stack.append("def")
self.file_out.write(" " * (len(self.stack) + self._indent + adjust))
def literal(self, s):
if not s:
return
if not self.in_literal:
self.indent()
self.file_out.write('yield """')
self.in_literal = True
self.file_out.write(s.replace('"', '\\"'))
def close_literal(self):
if self.in_literal:
self.file_out.write('"""\n')
self.in_literal = False
def render_expr(self, e):
self.indent()
self.file_out.write('yield str(' + e + ')\n')
def parse_statement(self, stmt):
tokens = stmt.split(None, 1)
if tokens[0] == "args":
if len(tokens) > 1:
self.args = tokens[1]
else:
self.args = ""
elif tokens[0] == "set":
self.indent()
self.file_out.write(stmt[3:].strip() + "\n")
elif tokens[0] == "include":
if not self.flushed_header:
# If there was no other output, we still need a header now
self.indent()
tokens = tokens[1].split(None, 1)
args = ""
if len(tokens) > 1:
args = tokens[1]
if tokens[0][0] == "{":
self.indent()
# "1" as fromlist param is uPy hack
self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2])
self.indent()
self.file_out.write("yield from _.render(%s)\n" % args)
return
with self.loader.input_open(tokens[0][1:-1]) as inc:
self.seq += 1
c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq)
inc_id = self.seq
self.seq = c.compile()
self.indent()
self.file_out.write("yield from render%d(%s)\n" % (inc_id, args))
elif len(tokens) > 1:
if tokens[0] == "elif":
assert self.stack[-1] == "if"
self.indent(-1)
self.file_out.write(stmt + ":\n")
else:
self.indent()
self.file_out.write(stmt + ":\n")
self.stack.append(tokens[0])
else:
if stmt.startswith("end"):
assert self.stack[-1] == stmt[3:]
self.stack.pop(-1)
elif stmt == "else":
assert self.stack[-1] == "if"
self.indent(-1)
self.file_out.write("else:\n")
else:
assert False
def parse_line(self, l):
while l:
start = l.find(self.START_CHAR)
if start == -1:
self.literal(l)
return
self.literal(l[:start])
self.close_literal()
sel = l[start + 1]
#print("*%s=%s=" % (sel, EXPR))
if sel == self.STMNT:
end = l.find(self.STMNT_END)
assert end > 0
stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip()
self.parse_statement(stmt)
end += len(self.STMNT_END)
l = l[end:]
if not self.in_literal and l == "\n":
break
elif sel == self.EXPR:
# print("EXPR")
end = l.find(self.EXPR_END)
assert end > 0
expr = l[start + len(self.START_CHAR + self.EXPR):end].strip()
self.render_expr(expr)
end += len(self.EXPR_END)
l = l[end:]
else:
self.literal(l[start])
l = l[start + 1:]
def header(self):
self.file_out.write("# Autogenerated file\n")
def compile(self):
self.header()
for l in self.file_in:
self.parse_line(l)
self.close_literal()
return self.seq
class Loader(compiled.Loader):
def __init__(self, pkg, dir):
super().__init__(pkg, dir)
self.dir = dir
if pkg == "__main__":
# if pkg isn't really a package, don't bother to use it
# it means we're running from "filesystem directory", not
# from a package.
pkg = None
self.pkg_path = ""
if pkg:
p = __import__(pkg)
if isinstance(p.__path__, str):
# uPy
self.pkg_path = p.__path__
else:
# CPy
self.pkg_path = p.__path__[0]
self.pkg_path += "/"
def input_open(self, template):
path = self.pkg_path + self.dir + "/" + template
return open(path)
def compiled_path(self, template):
return self.dir + "/" + template.replace(".", "_") + ".py"
def load(self, name):
try:
return super().load(name)
except (OSError, ImportError):
pass
compiled_path = self.pkg_path + self.compiled_path(name)
f_in = self.input_open(name)
f_out = open(compiled_path, "w")
c = Compiler(f_in, f_out, loader=self)
c.compile()
f_in.close()
f_out.close()
return super().load(name)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

View File

@@ -1,9 +0,0 @@
import settings
import wifi
from settings import Settings
s = Settings()
name = s.get('name', 'led')
password = s.get("ap_password", "")
wifi.ap(name, password)

View File

@@ -1,54 +1,79 @@
import asyncio
import aioespnow
from settings import Settings from settings import Settings
from web import web from web import web
from patterns import Patterns from patterns import Patterns
import gc import gc
import utime
import machine
import time
import wifi
import json import json
from p2p import p2p import espnow
import network
import asyncio
import json
import machine
async def main(): def main():
settings = Settings() settings = Settings()
patterns = Patterns(settings["led_pin"], settings["num_leds"], selected=settings["pattern"])
if settings["color_order"] == "rbg": color_order = (1, 5, 3)
else: color_order = (1, 3, 5)
patterns.set_color1(tuple(int(settings["color1"][i:i+2], 16) for i in color_order))
patterns.set_color2(tuple(int(settings["color2"][i:i+2], 16) for i in color_order))
patterns.set_brightness(int(settings["brightness"]))
patterns.set_delay(int(settings["delay"]))
async def tick():
while True:
patterns.tick()
await asyncio.sleep_ms(0)
async def system():
while True:
gc.collect()
for i in range(60):
wdt.feed()
await asyncio.sleep(1)
w = web(settings, patterns)
print(settings) print(settings)
# start the server in a bacakground task
print("Starting") if settings.get("color_order", "rgb") == "rbg":
server = asyncio.create_task(w.start_server(host="0.0.0.0", port=80)) color_order = (1, 5, 3)
else:
color_order = (1, 3, 5)
patterns = Patterns(settings["led_pin"], settings["num_leds"], brightness=255)
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
e = espnow.ESPNow()
e.config(rxbuf=1024)
e.active(True)
# Increase buffer size for 8-bar payloads (default 526 bytes might be too small) # Set to 1KB to handle larger multi-bar payloads
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
asyncio.create_task(tick()) #print mac in hex
asyncio.create_task(p2p(settings, patterns)) print("Mac address", sta_if.config("mac").hex())
asyncio.create_task(system()) print("Patterns", patterns.colors)
print("Patterns", patterns.selected)
patterns.select(patterns.selected)
while True:
# advance pattern based on its own returned schedule
# due = patterns.tick(due)
wdt.feed()
patterns.tick()
# Drain all pending packets and only process the latest
last_msg = None
while True:
host, msg = e.recv(0)
if not msg:
break
last_msg = msg
# cleanup before ending the application if last_msg:
await server try:
data = json.loads(last_msg)
print(data)
asyncio.run(main()) # Always update parameters from message
patterns.brightness = data.get("brightness", patterns.brightness)
patterns.delay = data.get("delay", patterns.delay)
patterns.colors = data.get("colors", patterns.colors)
patterns.selected = data.get("pattern", patterns.selected)
patterns.n1 = data.get("n1", patterns.n1)
patterns.n2 = data.get("n2", patterns.n2)
patterns.n3 = data.get("n3", patterns.n3)
patterns.n4 = data.get("n4", patterns.n4)
patterns.step = data.get("step", patterns.step)
patterns.auto = data.get("auto", patterns.auto)
patterns.select(patterns.selected)
print("Selected pattern", patterns.selected)
except Exception as ex:
print(f"Failed to load espnow data {last_msg}: {ex}")
continue
finally:
gc.collect()
main()

View File

@@ -1,20 +0,0 @@
import asyncio
import aioespnow
import json
async def p2p(settings, patterns):
e = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support
e.active(True)
async for mac, msg in e:
try:
data = json.loads(msg)
except:
print(f"Failed to load espnow data {msg}")
continue
print(data)
if "names" not in data or settings.get("name") in data.get("names", []):
if "step" in settings and isinstance(settings["step"], int):
patterns.set_pattern_step(settings["step"])
else:
settings.set_settings(data.get("settings", {}), patterns, data.get("save", False))
print("should not print")

View File

@@ -2,272 +2,395 @@ from machine import Pin
from neopixel import NeoPixel from neopixel import NeoPixel
import utime import utime
import random import random
import _thread
import asyncio
from patterns_base import Patterns as PatternsBase
# Short-key parameter mapping for convenience setters class Patterns:
param_mapping = {
"pt": "selected",
"pa": "selected",
"cl": "colors",
"br": "brightness",
"dl": "delay",
"nl": "num_leds",
"co": "color_order",
"lp": "led_pin",
"n1": "n1",
"n2": "n2",
"n3": "n3",
"n4": "n4",
"n5": "n5",
"n6": "n6",
"auto": "auto",
}
class Patterns(PatternsBase):
def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100): def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100):
super().__init__(pin, num_leds, color1, color2, brightness, selected, delay) self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
self.auto = True self.num_leds = num_leds
self.step = 0 self.pattern_step = 0
self.last_update = utime.ticks_ms()
self.delay = delay
self.brightness = brightness
self.patterns = { self.patterns = {
"off": self.off, "off": self.off,
"on" : self.on, "on" : self.on,
"blink": self.blink, "color_wipe": self.color_wipe_step,
"rainbow": self.rainbow, "rainbow_cycle": self.rainbow_cycle_step,
"pulse": self.pulse, "theater_chase": self.theater_chase_step,
"transition": self.transition, "blink": self.blink_step,
"color_transition": self.color_transition_step, # Added new pattern
"flicker": self.flicker_step,
"scanner": self.scanner_step, # New: Single direction scanner
"bidirectional_scanner": self.bidirectional_scanner_step, # New: Bidirectional scanner
"external": None
} }
self.selected = selected
# Ensure colors list always starts with at least two for robust transition handling
self.colors = [color1, color2] if color1 != color2 else [color1, (255, 255, 255)] # Fallback if initial colors are same
if not self.colors: # Ensure at least one color exists
self.colors = [(0, 0, 0)]
self.transition_duration = delay * 50 # Default transition duration
self.hold_duration = delay * 10 # Default hold duration at each color
self.transition_step = 0 # Current step in the transition
self.current_color_idx = 0 # Index of the color currently being held/transitioned from
self.current_color = self.colors[self.current_color_idx] # The actual blended color
self.hold_start_time = utime.ticks_ms() # Time when the current color hold started
# New attributes for scanner patterns
self.scanner_direction = 1 # 1 for forward, -1 for backward
self.scanner_tail_length = 3 # Number of trailing pixels
def sync(self):
self.pattern_step=0
self.last_update = utime.ticks_ms() - self.delay
if self.selected == "color_transition":
self.transition_step = 0
self.current_color_idx = 0
self.current_color = self.colors[self.current_color_idx]
self.hold_start_time = utime.ticks_ms() # Reset hold time
# Reset scanner specific variables
self.scanner_direction = 1
self.tick()
def set_pattern_step(self, step):
self.pattern_step = step
def tick(self):
if self.patterns[self.selected]:
self.patterns[self.selected]()
def update_num_leds(self, pin, num_leds):
self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
self.num_leds = num_leds
self.pattern_step = 0
def set_delay(self, delay):
self.delay = delay
# Update transition duration and hold duration when delay changes
self.transition_duration = self.delay * 50
self.hold_duration = self.delay * 10
def blink(self): def set_brightness(self, brightness):
self.stopped = False self.brightness = brightness
self.running = True
state = True # True = on, False = off
last_update = utime.ticks_ms()
while self.running: def set_color1(self, color):
current_time = utime.ticks_ms() if len(self.colors) > 0:
if utime.ticks_diff(current_time, last_update) >= self.delay: self.colors[0] = color
if state: if self.selected == "color_transition":
self.fill(self.apply_brightness(self.colors[0])) # If the first color is changed, potentially reset transition
# to start from this new color if we were about to transition from it
if self.current_color_idx == 0:
self.transition_step = 0
self.current_color = self.colors[0]
self.hold_start_time = utime.ticks_ms()
else:
self.colors.append(color)
def set_color2(self, color):
if len(self.colors) > 1:
self.colors[1] = color
elif len(self.colors) == 1:
self.colors.append(color)
else: # List is empty
self.colors.append((0,0,0)) # Dummy color
self.colors.append(color)
def set_colors(self, colors):
if colors and len(colors) >= 2:
self.colors = colors
if self.selected == "color_transition":
self.sync() # Reset transition if new color list is provided
elif colors and len(colors) == 1:
self.colors = [colors[0], (255,255,255)] # Add a default second color
if self.selected == "color_transition":
print("Warning: 'color_transition' requires at least two colors. Adding a default second color.")
self.sync()
else:
print("Error: set_colors requires a list of at least one color.")
self.colors = [(0,0,0), (255,255,255)] # Fallback
if self.selected == "color_transition":
self.sync()
def set_color(self, num, color):
# Changed: More robust index check
if 0 <= num < len(self.colors):
self.colors[num] = color
# If the changed color is part of the current or next transition,
# restart the transition for smoother updates
if self.selected == "color_transition":
current_from_idx = self.current_color_idx
current_to_idx = (self.current_color_idx + 1) % len(self.colors)
if num == current_from_idx or num == current_to_idx:
# If we change a color involved in the current transition,
# it's best to restart the transition state for smoothness.
self.transition_step = 0
self.current_color_idx = current_from_idx # Stay at the current starting color
self.current_color = self.colors[self.current_color_idx]
self.hold_start_time = utime.ticks_ms() # Reset hold
return True
elif num == len(self.colors): # Allow setting a new color at the end
self.colors.append(color)
return True
return False
def add_color(self, color):
self.colors.append(color)
if self.selected == "color_transition" and len(self.colors) == 2:
# If we just added the second color needed for transition
self.sync()
def del_color(self, num):
# Changed: More robust index check and using del for lists
if 0 <= num < len(self.colors):
del self.colors[num]
# If the color being deleted was part of the current transition,
# re-evaluate the current_color_idx
if self.selected == "color_transition":
if len(self.colors) < 2: # Need at least two colors for transition
print("Warning: Not enough colors for 'color_transition'. Switching to 'on'.")
self.select("on") # Or some other default
else:
# Adjust index if it's out of bounds after deletion or was the one transitioning from
self.current_color_idx %= len(self.colors)
self.transition_step = 0
self.current_color = self.colors[self.current_color_idx]
self.hold_start_time = utime.ticks_ms()
return True
return False
def apply_brightness(self, color, brightness_override=None):
effective_brightness = brightness_override if brightness_override is not None else self.brightness
return tuple(int(c * effective_brightness / 255) for c in color)
def select(self, pattern):
if pattern in self.patterns:
self.selected = pattern
self.sync() # Reset pattern state when selecting a new pattern
if pattern == "color_transition":
if len(self.colors) < 2:
print("Warning: 'color_transition' requires at least two colors. Switching to 'on'.")
self.selected = "on" # Fallback if not enough colors
self.sync() # Re-sync for the new pattern
else: else:
self.fill((0, 0, 0)) self.transition_step = 0
state = not state self.current_color_idx = 0 # Start from the first color in the list
last_update = current_time self.current_color = self.colors[self.current_color_idx]
self.running = False self.hold_start_time = utime.ticks_ms() # Reset hold timer
self.stopped = True self.transition_duration = self.delay * 50 # Initialize transition duration
self.hold_duration = self.delay * 10 # Initialize hold duration
return True
return False
def set(self, i, color):
self.n[i] = color
def rainbow(self): def write(self):
self.stopped = False self.n.write()
self.running = True
step = self.step % 256
step_amount = max(1, int(self.n1)) # n1 controls step increment
# If auto is False, run once and update step def fill(self, color=None):
if not self.auto: fill_color = color if color is not None else self.colors[0]
for i in range(self.num_leds): for i in range(self.num_leds):
rc_index = (i * 256 // self.num_leds) + step self.n[i] = fill_color
self.n[i] = self.apply_brightness(self.wheel(rc_index & 255)) self.n.write()
self.n.write()
# Increment step by n1 for next call
self.step = (step + step_amount) % 256
self.running = False
self.stopped = True
return
# Auto is True: run continuously def off(self):
sleep_ms = max(1, int(self.delay / 5)) self.fill((0, 0, 0))
last_update = utime.ticks_ms()
while self.running: def on(self):
current_time = utime.ticks_ms() self.fill(self.apply_brightness(self.colors[0]))
if utime.ticks_diff(current_time, last_update) >= sleep_ms:
def color_wipe_step(self):
color = self.apply_brightness(self.colors[0])
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, self.last_update) >= self.delay:
if self.pattern_step < self.num_leds:
for i in range(self.num_leds): for i in range(self.num_leds):
rc_index = (i * 256 // self.num_leds) + step self.n[i] = (0, 0, 0)
self.n[i] = self.apply_brightness(self.wheel(rc_index & 255)) self.n[self.pattern_step] = self.apply_brightness(color)
self.n.write() self.n.write()
step = (step + step_amount) % 256 self.pattern_step += 1
self.step = step else:
last_update = current_time self.pattern_step = 0
self.running = False self.last_update = current_time
self.stopped = True
def rainbow_cycle_step(self):
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, self.last_update) >= self.delay/5:
def wheel(pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def pulse(self): for i in range(self.num_leds):
self.stopped = False rc_index = (i * 256 // self.num_leds) + self.pattern_step
self.running = True self.n[i] = self.apply_brightness(wheel(rc_index & 255))
self.off() self.n.write()
self.pattern_step = (self.pattern_step + 1) % 256
self.last_update = current_time
# Get timing parameters with defaults if not set def theater_chase_step(self):
attack_ms = getattr(self, 'n1', 200) # Attack time in ms current_time = utime.ticks_ms()
hold_ms = getattr(self, 'n2', 200) # Hold time in ms if utime.ticks_diff(current_time, self.last_update) >= self.delay:
decay_ms = getattr(self, 'n3', 200) # Decay time in ms for i in range(self.num_leds):
if (i + self.pattern_step) % 3 == 0:
self.n[i] = self.apply_brightness(self.colors[0])
else:
self.n[i] = (0, 0, 0)
self.n.write()
self.pattern_step = (self.pattern_step + 1) % 3
self.last_update = current_time
# Ensure we have at least one color def blink_step(self):
if not self.colors: current_time = utime.ticks_ms()
self.colors = [(255, 255, 255)] if utime.ticks_diff(current_time, self.last_update) >= self.delay:
if self.pattern_step % 2 == 0:
self.fill(self.apply_brightness(self.colors[0]))
else:
self.fill((0, 0, 0))
self.pattern_step = (self.pattern_step + 1) % 2
self.last_update = current_time
color_index = 0 def color_transition_step(self):
# Calculate minimum update interval based on LED count current_time = utime.ticks_ms()
# NeoPixel timing: ~30µs per LED + reset time = ~6ms for 200 LEDs
# Use 10ms minimum to ensure writes complete + overhead
min_write_time_ms = (self.num_leds * 30) // 1000 + 1 # Convert µs to ms, add 1ms overhead
update_interval = max(10, min_write_time_ms + 4) # At least 10ms, add margin for safety
while self.running: # Check for hold duration first
cycle_start = utime.ticks_ms() if utime.ticks_diff(current_time, self.hold_start_time) < self.hold_duration:
# Still in hold phase, just display the current solid color
# Get the current color from the cycle self.fill(self.apply_brightness(self.current_color))
base_color = self.colors[color_index % len(self.colors)] self.last_update = current_time # Keep updating last_update to avoid skipping frames
# Attack phase: fade from 0 to full brightness
if attack_ms > 0:
attack_start = utime.ticks_ms()
last_update = attack_start
while self.running and utime.ticks_diff(utime.ticks_ms(), attack_start) < attack_ms:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= update_interval:
elapsed = utime.ticks_diff(now, attack_start)
brightness_factor = min(1.0, elapsed / attack_ms)
color = tuple(int(c * brightness_factor) for c in base_color)
self.fill(self.apply_brightness(color))
last_update = now
# Hold phase: maintain full brightness
if hold_ms > 0 and self.running:
self.fill(self.apply_brightness(base_color))
hold_start = utime.ticks_ms()
while self.running and utime.ticks_diff(utime.ticks_ms(), hold_start) < hold_ms:
pass
# Decay phase: fade from full brightness to 0
if decay_ms > 0:
decay_start = utime.ticks_ms()
last_update = decay_start
while self.running and utime.ticks_diff(utime.ticks_ms(), decay_start) < decay_ms:
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= update_interval:
elapsed = utime.ticks_diff(now, decay_start)
brightness_factor = max(0.0, 1.0 - (elapsed / decay_ms))
color = tuple(int(c * brightness_factor) for c in base_color)
self.fill(self.apply_brightness(color))
last_update = now
# Move to next color in the cycle
color_index += 1
# If auto flag is False, run only once and exit
if not self.auto:
break
# Ensure the cycle takes exactly delay milliseconds before restarting
if self.running:
self.off()
wait_until = utime.ticks_add(cycle_start, self.delay)
while self.running and utime.ticks_diff(wait_until, utime.ticks_ms()) > 0:
pass
self.running = False
self.stopped = True
def transition(self):
"""Transition between colors, taking delay ms between each color"""
self.stopped = False
self.running = True
if not self.colors:
# No colors, turn off
self.off()
self.running = False
self.stopped = True
return return
if len(self.colors) == 1: # If hold duration is over, proceed with transition
# Only one color, just stay that color if utime.ticks_diff(current_time, self.last_update) >= self.delay:
last_update = utime.ticks_ms() num_colors = len(self.colors)
while self.running: if num_colors < 2:
current_time = utime.ticks_ms() # Should not happen if select handles it, but as a safeguard
if utime.ticks_diff(current_time, last_update) >= 100: self.select("on")
self.fill(self.apply_brightness(self.colors[0]))
last_update = current_time
self.running = False
self.stopped = True
return
# If auto is False, only transition between color1 and color2
if not self.auto:
if len(self.colors) < 2:
# Need at least 2 colors for transition
self.running = False
self.stopped = True
return return
transition_duration = max(10, self.delay) # At least 10ms from_color = self.colors[self.current_color_idx]
update_interval = max(10, transition_duration // 50) # Update every ~2% of transition to_color_idx = (self.current_color_idx + 1) % num_colors
to_color = self.colors[to_color_idx]
# Transition from color1 to color2 # Calculate interpolation factor (0.0 to 1.0)
color1 = self.colors[0] # transition_step goes from 0 to transition_duration - 1
color2 = self.colors[1] if self.transition_duration > 0:
interp_factor = self.transition_step / self.transition_duration
else:
interp_factor = 1.0 # Immediately transition if duration is zero
transition_start = utime.ticks_ms() # Interpolate each color component
last_update = transition_start r = int(from_color[0] + (to_color[0] - from_color[0]) * interp_factor)
g = int(from_color[1] + (to_color[1] - from_color[1]) * interp_factor)
b = int(from_color[2] + (to_color[2] - from_color[2]) * interp_factor)
while self.running and utime.ticks_diff(utime.ticks_ms(), transition_start) < transition_duration: self.current_color = (r, g, b)
now = utime.ticks_ms() self.fill(self.apply_brightness(self.current_color))
if utime.ticks_diff(now, last_update) >= update_interval:
# Calculate interpolation factor (0.0 to 1.0)
elapsed = utime.ticks_diff(now, transition_start)
factor = min(1.0, elapsed / transition_duration)
# Interpolate between color1 and color2 self.transition_step += self.delay # Advance the transition step by the delay
interpolated = tuple(
int(color1[i] + (color2[i] - color1[i]) * factor)
for i in range(3)
)
# Apply brightness and fill if self.transition_step >= self.transition_duration:
self.fill(self.apply_brightness(interpolated)) # Transition complete, move to the next color and reset for hold phase
last_update = now self.current_color_idx = to_color_idx
self.current_color = self.colors[self.current_color_idx] # Ensure current_color is the exact target color
self.transition_step = 0 # Reset transition progress
self.hold_start_time = current_time # Start hold phase for the new color
self.running = False self.last_update = current_time
self.stopped = True
return
# Auto is True: cycle through all colors continuously def flicker_step(self):
color_index = 0 current_time = utime.ticks_ms()
transition_duration = max(10, self.delay) # At least 10ms if utime.ticks_diff(current_time, self.last_update) >= self.delay/5:
update_interval = max(10, transition_duration // 50) # Update every ~2% of transition base_color = self.colors[0]
# Increase the range for flicker_brightness_offset
# Changed from self.brightness // 4 to self.brightness // 2 (or even self.brightness for max intensity)
flicker_brightness_offset = random.randint(-int(self.brightness // 1.5), int(self.brightness // 1.5))
flicker_brightness = max(0, min(255, self.brightness + flicker_brightness_offset))
while self.running: flicker_color = self.apply_brightness(base_color, brightness_override=flicker_brightness)
# Get current and next color self.fill(flicker_color)
current_color = self.colors[color_index % len(self.colors)] self.last_update = current_time
next_color = self.colors[(color_index + 1) % len(self.colors)]
# Transition from current to next color def scanner_step(self):
transition_start = utime.ticks_ms() """
last_update = transition_start Mimics a 'Knight Rider' style scanner, moving in one direction.
"""
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, self.last_update) >= self.delay:
self.fill((0, 0, 0)) # Clear all LEDs
while self.running and utime.ticks_diff(utime.ticks_ms(), transition_start) < transition_duration: # Calculate the head and tail position
now = utime.ticks_ms() head_pos = self.pattern_step
if utime.ticks_diff(now, last_update) >= update_interval: color = self.apply_brightness(self.colors[0])
# Calculate interpolation factor (0.0 to 1.0)
elapsed = utime.ticks_diff(now, transition_start)
factor = min(1.0, elapsed / transition_duration)
# Interpolate between colors # Draw the head
interpolated = tuple( if 0 <= head_pos < self.num_leds:
int(current_color[i] + (next_color[i] - current_color[i]) * factor) self.n[head_pos] = color
for i in range(3)
)
# Apply brightness and fill # Draw the trailing pixels with decreasing brightness
self.fill(self.apply_brightness(interpolated)) for i in range(1, self.scanner_tail_length + 1):
last_update = now tail_pos = head_pos - i
if 0 <= tail_pos < self.num_leds:
# Calculate fading color for tail
# Example: linear fade from full brightness to off
fade_factor = 1.0 - (i / (self.scanner_tail_length + 1))
faded_color = tuple(int(c * fade_factor) for c in color)
self.n[tail_pos] = faded_color
# Move to next color self.n.write()
color_index = (color_index + 1) % len(self.colors)
self.running = False self.pattern_step += 1
self.stopped = True if self.pattern_step >= self.num_leds + self.scanner_tail_length:
self.pattern_step = 0 # Reset to start
self.last_update = current_time
def bidirectional_scanner_step(self):
"""
Mimics a 'Knight Rider' style scanner, moving back and forth.
"""
current_time = utime.ticks_ms()
if utime.ticks_diff(current_time, self.last_update) >= self.delay/100:
self.fill((0, 0, 0)) # Clear all LEDs
color = self.apply_brightness(self.colors[0])
# Calculate the head position based on direction
head_pos = self.pattern_step
# Draw the head
if 0 <= head_pos < self.num_leds:
self.n[head_pos] = color
# Draw the trailing pixels with decreasing brightness
for i in range(1, self.scanner_tail_length + 1):
tail_pos = head_pos - (i * self.scanner_direction)
if 0 <= tail_pos < self.num_leds:
fade_factor = 1.0 - (i / (self.scanner_tail_length + 1))
faded_color = tuple(int(c * fade_factor) for c in color)
self.n[tail_pos] = faded_color
self.n.write()
self.pattern_step += self.scanner_direction
# Change direction if boundaries are reached
if self.scanner_direction == 1 and self.pattern_step >= self.num_leds:
self.scanner_direction = -1
self.pattern_step = self.num_leds - 1 # Start moving back from the last LED
elif self.scanner_direction == -1 and self.pattern_step < 0:
self.scanner_direction = 1
self.pattern_step = 0 # Start moving forward from the first LED
self.last_update = current_time

View File

@@ -1,142 +0,0 @@
from machine import Pin
from neopixel import NeoPixel
import utime
import random
import _thread
import asyncio
# Short-key parameter mapping for convenience setters
param_mapping = {
"pt": "selected",
"pa": "selected",
"cl": "colors",
"br": "brightness",
"dl": "delay",
"nl": "num_leds",
"co": "color_order",
"lp": "led_pin",
"n1": "n1",
"n2": "n2",
"n3": "n3",
"n4": "n4",
"n5": "n5",
"n6": "n6",
"auto": "auto",
}
class Patterns:
def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100):
self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
self.num_leds = num_leds
self.pattern_step = 0
self.last_update = utime.ticks_ms()
self.delay = delay
self.brightness = brightness
self.auto = False
self.patterns = {}
self.selected = selected
# Ensure colors list always starts with at least two for robust transition handling
self.colors = [color1, color2] if color1 != color2 else [color1, (255, 255, 255)] # Fallback if initial colors are same
if not self.colors: # Ensure at least one color exists
self.colors = [(0, 0, 0)]
self.transition_duration = delay * 50 # Default transition duration
self.hold_duration = delay * 10 # Default hold duration at each color
self.transition_step = 0 # Current step in the transition
self.current_color_idx = 0 # Index of the color currently being held/transitioned from
self.current_color = self.colors[self.current_color_idx] # The actual blended color
self.hold_start_time = utime.ticks_ms() # Time when the current color hold started
# New attributes for scanner patterns
self.scanner_direction = 1 # 1 for forward, -1 for backward
self.scanner_tail_length = 3 # Number of trailing pixels
self.running = False
self.stopped = True
def select(self, pattern):
if pattern in self.patterns:
self.selected = pattern
return True
return False
async def run(self):
print(f"Stopping pattern")
await self.stop()
self.running = True
print(f"Starting pattern {self.selected}")
if self.selected in self.patterns:
_thread.start_new_thread(self.patterns[self.selected], ())
else:
print(f"Pattern {self.selected} not found")
async def stop(self):
self.running = False
start = utime.ticks_ms()
while not self.stopped and utime.ticks_diff(utime.ticks_ms(), start) < 1000:
await asyncio.sleep_ms(0)
self.stopped = True
def set_param(self, key, value):
if key in param_mapping:
setattr(self, param_mapping[key], value)
return True
print(f"Invalid parameter: {key}")
return False
def update_num_leds(self, pin, num_leds):
self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds)
self.num_leds = num_leds
self.pattern_step = 0
def set_color(self, num, color):
# Changed: More robust index check
if 0 <= num < len(self.colors):
self.colors[num] = color
# If the changed color is part of the current or next transition,
# restart the transition for smoother updates
return True
elif num == len(self.colors): # Allow setting a new color at the end
self.colors.append(color)
return True
return False
def del_color(self, num):
# Changed: More robust index check and using del for lists
if 0 <= num < len(self.colors):
del self.colors[num]
return True
return False
def apply_brightness(self, color, brightness_override=None):
effective_brightness = brightness_override if brightness_override is not None else self.brightness
return tuple(int(c * effective_brightness / 255) for c in color)
def fill(self, color=None):
fill_color = color if color is not None else self.colors[0]
for i in range(self.num_leds):
self.n[i] = fill_color
self.n.write()
def off(self):
self.fill((0, 0, 0))
def on(self):
self.fill(self.apply_brightness(self.colors[0]))
def wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return (0, pos * 3, 255 - pos * 3)

View File

@@ -1,109 +0,0 @@
body {
font-family: Arial, sans-serif;
max-width: 600px;
margin: 0 auto;
padding: 20px;
line-height: 1.6;
}
h1 {
text-align: center;
}
form {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 5px;
}
input[type="text"],
input[type="submit"],
input[type="range"],
input[type="color"] {
width: 100%;
margin-bottom: 10px;
box-sizing: border-box;
}
input[type="range"] {
-webkit-appearance: none;
appearance: none;
height: 25px;
background: #d3d3d3;
outline: none;
opacity: 0.7;
transition: opacity 0.2s;
}
input[type="range"]:hover {
opacity: 1;
}
input[type="range"]::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 25px;
height: 25px;
background: #4caf50;
cursor: pointer;
border-radius: 50%;
}
input[type="range"]::-moz-range-thumb {
width: 25px;
height: 25px;
background: #4caf50;
cursor: pointer;
border-radius: 50%;
}
#pattern_buttons {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin-bottom: 20px;
}
#pattern_buttons button {
flex: 1 0 calc(33.333% - 10px);
padding: 10px;
background-color: #4caf50;
color: white;
border: none;
cursor: pointer;
transition: background-color 0.3s;
}
#pattern_buttons button:hover {
background-color: #45a049;
}
@media (max-width: 480px) {
#pattern_buttons button {
flex: 1 0 calc(50% - 10px);
}
}
#connection-status {
width: 15px;
height: 15px;
border-radius: 50%;
display: inline-block; /* Or block, depending on where you put it */
margin-left: 10px; /* Adjust spacing as needed */
vertical-align: middle; /* Align with nearby text */
background-color: grey; /* Default: Unknown */
}
#connection-status.connecting {
background-color: yellow;
}
#connection-status.open {
background-color: green;
}
#connection-status.closing,
#connection-status.closed {
background-color: red;
}
#color_order_form label,
#color_order_form input[type="radio"] {
/* Ensures they behave as inline elements */
display: inline-block;
/* Adds some space between them for readability */
margin-right: 10px;
vertical-align: middle; /* Aligns them nicely if heights vary */
}

View File

@@ -1,244 +0,0 @@
let delayTimeout;
let brightnessTimeout;
let colorTimeout;
let color2Timeout;
let ws; // Variable to hold the WebSocket connection
let connectionStatusElement; // Variable to hold the connection status element
// Function to update the connection status indicator
function updateConnectionStatus(status) {
if (!connectionStatusElement) {
connectionStatusElement = document.getElementById("connection-status");
}
if (connectionStatusElement) {
connectionStatusElement.className = ""; // Clear existing classes
connectionStatusElement.classList.add(status);
// Optionally, you could also update text content based on status
// connectionStatusElement.textContent = status.charAt(0).toUpperCase() + status.slice(1);
}
}
// Function to establish WebSocket connection
function connectWebSocket() {
// Determine the WebSocket URL based on the current location
const wsUrl = `ws://${window.location.host}/ws`;
ws = new WebSocket(wsUrl);
updateConnectionStatus("connecting"); // Indicate connecting state
ws.onopen = function (event) {
console.log("WebSocket connection opened:", event);
updateConnectionStatus("open"); // Indicate open state
// Optionally, you could send an initial message here
};
ws.onmessage = function (event) {
console.log("WebSocket message received:", event.data);
};
ws.onerror = function (event) {
console.error("WebSocket error:", event);
updateConnectionStatus("closed"); // Indicate error state (treat as closed)
};
ws.onclose = function (event) {
if (event.wasClean) {
console.log(
`WebSocket connection closed cleanly, code=${event.code}, reason=${event.reason}`,
);
updateConnectionStatus("closed"); // Indicate closed state
} else {
console.error("WebSocket connection died");
updateConnectionStatus("closed"); // Indicate closed state
}
// Attempt to reconnect after a delay
setTimeout(connectWebSocket, 1000);
};
}
// Function to send data over WebSocket
function sendWebSocketData(data) {
if (ws && ws.readyState === WebSocket.OPEN) {
console.log("Sending data over WebSocket:", data);
ws.send(JSON.stringify(data));
} else {
console.error("WebSocket is not connected. Cannot send data:", data);
// You might want to queue messages or handle this in a different way
}
}
// Keep the post and get functions for now, they might still be useful
async function post(path, data) {
console.log(`POST to ${path}`, data);
try {
const response = await fetch(path, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(data),
});
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
} catch (error) {
console.error("Error during POST request:", error);
}
}
async function get(path) {
try {
const response = await fetch(path);
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
return await response.json();
} catch (error) {
console.error("Error during GET request:", error);
}
}
async function updateColor(event) {
event.preventDefault();
clearTimeout(colorTimeout);
colorTimeout = setTimeout(function () {
const color = document.getElementById("color").value;
sendWebSocketData({ color1: color });
}, 500);
}
async function updateColor2(event) {
event.preventDefault();
clearTimeout(color2Timeout);
color2Timeout = setTimeout(function () {
const color = document.getElementById("color2").value;
sendWebSocketData({ color2: color });
}, 500);
}
async function updatePattern(pattern) {
sendWebSocketData({ pattern: pattern });
}
async function updateBrightness(event) {
event.preventDefault();
clearTimeout(brightnessTimeout);
brightnessTimeout = setTimeout(function () {
const brightness = document.getElementById("brightness").value;
sendWebSocketData({ brightness: brightness });
}, 500);
}
async function updateDelay(event) {
event.preventDefault();
clearTimeout(delayTimeout);
delayTimeout = setTimeout(function () {
const delay = document.getElementById("delay").value;
sendWebSocketData({ delay: delay });
}, 500);
}
async function updateNumLeds(event) {
event.preventDefault();
const numLeds = document.getElementById("num_leds").value;
sendWebSocketData({ num_leds: parseInt(numLeds) });
}
async function updateName(event) {
event.preventDefault();
const name = document.getElementById("name").value;
sendWebSocketData({ name: name });
}
async function updateID(event) {
event.preventDefault();
const id = document.getElementById("id").value;
sendWebSocketData({ id: parseInt(id) });
}
async function updateLedPin(event) {
event.preventDefault();
const ledpin = document.getElementById("led_pin").value;
sendWebSocketData({ led_pin: parseInt(ledpin) });
}
function handleRadioChange(event) {
event.preventDefault();
console.log("Selected color order:", event.target.value);
// Add your specific logic here
if (event.target.value === "rgb") {
console.log("RGB order selected!");
} else if (event.target.value === "rbg") {
console.log("RBG order selected!");
}
sendWebSocketData({ color_order: event.target.value });
}
function createPatternButtons(patterns) {
const container = document.getElementById("pattern_buttons");
container.innerHTML = ""; // Clear previous buttons
patterns.forEach((pattern) => {
const button = document.createElement("button");
button.type = "button";
button.textContent = pattern;
button.value = pattern;
button.addEventListener("click", async function (event) {
event.preventDefault();
await updatePattern(pattern);
});
container.appendChild(button);
});
}
document.addEventListener("DOMContentLoaded", async function () {
// Get the connection status element once the DOM is ready
connectionStatusElement = document.getElementById("connection-status");
// Establish WebSocket connection on page load
connectWebSocket();
document.getElementById("color").addEventListener("input", updateColor);
document.getElementById("color2").addEventListener("input", updateColor2);
document.getElementById("delay").addEventListener("input", updateDelay);
document
.getElementById("brightness")
.addEventListener("input", updateBrightness);
document
.getElementById("num_leds_form")
.addEventListener("submit", updateNumLeds);
document.getElementById("name_form").addEventListener("submit", updateName);
document.getElementById("id_form").addEventListener("submit", updateID);
document
.getElementById("led_pin_form")
.addEventListener("submit", updateLedPin);
document.getElementById("delay").addEventListener("touchend", updateDelay);
document
.getElementById("brightness")
.addEventListener("touchend", updateBrightness);
document.getElementById("rgb").addEventListener("change", handleRadioChange);
document.getElementById("rbg").addEventListener("change", handleRadioChange);
document.querySelectorAll(".pattern_button").forEach((button) => {
console.log(button.value);
button.addEventListener("click", async (event) => {
event.preventDefault();
await updatePattern(button.value);
});
});
});
// Function to toggle the display of the settings menu
function selectSettings() {
const settingsMenu = document.getElementById("settings_menu");
controls = document.getElementById("controls");
settingsMenu.style.display = "block";
controls.style.display = "none";
}
function selectControls() {
const settingsMenu = document.getElementById("settings_menu");
controls = document.getElementById("controls");
settingsMenu.style.display = "none";
controls.style.display = "block";
}

View File

@@ -1,124 +0,0 @@
{% args settings, patterns, mac %}
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{{settings['name']}}</title>
<script src="static/main.js"></script>
<link rel="stylesheet" href="static/main.css" />
</head>
<body>
<h1>{{settings['name']}}</h1>
<button onclick="selectControls()">Controls</button>
<button onclick="selectSettings()">Settings</button>
<!-- Main LED Controls -->
<div id="controls">
<div id="pattern_buttons">
{% for p in patterns %}
<button class="pattern_button" value="{{p}}">{{p}}</button>
{% endfor %}
<!-- Pattern buttons will be inserted here -->
</div>
<form id="delay_form" method="post" action="/delay">
<label for="delay">Delay:</label>
<input
type="range"
id="delay"
name="delay"
min="1"
max="1000"
value="{{settings['delay']}}"
step="10"
/>
</form>
<form id="brightness_form" method="post" action="/brightness">
<label for="brightness">Brightness:</label>
<input
type="range"
id="brightness"
name="brightness"
min="0"
max="100"
value="{{settings['brightness']}}"
step="1"
/>
</form>
<form id="color_form" method="post" action="/color">
<input
type="color"
id="color"
name="color"
value="{{settings['color1']}}"
/>
</form>
<form id="color2_form" method="post" action="/color2">
<input
type="color"
id="color2"
name="color2"
value="{{settings['color2']}}"
/>
</form>
</div>
<!-- Settings Menu for num_leds, Wi-Fi SSID, and Password -->
<div id="settings_menu" style="display: none">
<h2>Settings</h2>
<form id="name_form" method="post" action="/name">
<label for="name">Name:</label>
<input
type="text"
id="name"
name="num_leds"
value="{{settings['name']}}"
/>
<input type="submit" value="Update Name" />
</form>
<form id="id_form" method="post" action="/id">
<label for="id">ID:</label>
<input
type="text"
id="id"
name="id"
value="{{settings['id']}}"
/>
<input type="submit" value="Update ID" />
</form>
<!-- Separate form for submitting num_leds -->
<form id="num_leds_form" method="post" action="/num_leds">
<label for="num_leds">Number of LEDs:</label>
<input
type="text"
id="num_leds"
name="num_leds"
value="{{settings['num_leds']}}"
/>
<input type="submit" value="Update Number of LEDs" />
</form>
<form id="led_pin_form" method="post" action="/led_pin">
<label for="num_leds">Led pin:</label>
<input
type="text"
id="led_pin"
name="led_pin"
value="{{settings['led_pin']}}"
/>
<input type="submit" value="Update Led Pin" />
</form>
<form id="color_order_form">
<label for="rgb">RGB:</label>
<input type="radio" id="rgb" name="color_order" value="rgb" {{'checked' if settings["color_order"]=="rgb" else ''}} />
<label for="rbg">RBG</label>
<input type="radio" id="rbg" name="color_order" value="rbg" {{'checked' if settings["color_order"]=="rbg" else ''}}/>
</form>
<p>Mac address: {{mac}}</p>
</div>
<div id="connection-status"></div>
</body>
</html>

View File

@@ -1,43 +0,0 @@
from microdot import Microdot, send_file, Response
from microdot.utemplate import Template
from microdot.websocket import with_websocket
import machine
import wifi
import json
def web(settings, patterns):
app = Microdot()
Response.default_content_type = 'text/html'
@app.route('/')
async def index_hnadler(request):
mac = wifi.get_mac().hex()
return Template('index.html').render(settings=settings, patterns=patterns.patterns.keys())
@app.route("/static/<path:path>")
def static_handler(request, path):
if '..' in path:
# Directory traversal is not allowed
return 'Not found', 404
return send_file('static/' + path)
@app.post("/settings")
def settings_handler(request):
# Keep the POST handler for compatibility or alternative usage if needed
# For WebSocket updates, the /ws handler is now primary
return settings.set_settings(request.body.decode('utf-8'), patterns)
@app.route("/ws")
@with_websocket
async def ws(request, ws):
while True:
data = await ws.receive()
if data:
# Process the received data
_, status_code = await settings.set_settings(json.loads(data), patterns, True)
#await ws.send(status_code)
else:
break
return app

View File

@@ -1,39 +0,0 @@
import network
from time import sleep
def connect(ssid, password, ip, gateway):
try:
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
if ssid == "" or password == "":
print("Missing ssid or password")
return None
if ip != "" and gateway != "":
sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1'))
print('connecting to network...')
sta_if.active(True)
sta_if.connect(ssid, password)
sleep(0.1)
if sta_if.isconnected():
return sta_if.ifconfig()
return None
return sta_if.ifconfig()
except Exception as e:
print(f"Failed to connect to wifi {e}")
return None
def ap(ssid, password):
ap_if = network.WLAN(network.AP_IF)
ap_mac = ap_if.config('mac')
print(ssid)
ap_if.active(True)
ap_if.config(essid=ssid, password=password)
ap_if.active(False)
ap_if.active(True)
print(ap_if.ifconfig())
def get_mac():
ap_if = network.WLAN(network.AP_IF)
return ap_if.config('mac')

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from patterns import Patterns
def run():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Baseline params
p.set_param("br", 64)
p.set_param("dl", 500)
p.set_param("cl", [(255, 0, 0), (0, 0, 255)])
p.set_param("n1", 200)
p.set_param("n2", 200)
p.set_param("n3", 1)
p.set_param("n4", 1)
for name, fn in p.patterns.items():
if fn is None:
continue
print(name)
p.set_param("pt", name)
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
p.tick()
wdt.feed()
utime.sleep_ms(10)
p.set_param("pt", "off")
p.tick()
utime.sleep_ms(200)
if __name__ == "__main__":
run()

View File

@@ -1,32 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
import utime
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
p.set_param("br", 64)
p.set_param("dl", 200)
p.set_param("cl", [(255, 0, 0), (0, 0, 255)])
p.select("blink")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,26 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
p.select("off")
task = asyncio.create_task(p.run())
wdt.feed()
await asyncio.sleep_ms(200)
p.stopped = True
await task
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,34 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
p.set_param("br", 64)
p.set_param("dl", 120)
p.set_param("cl", [(255, 0, 0), (0, 0, 255)])
p.select("on")
task = asyncio.create_task(p.run())
await asyncio.sleep_ms(800)
p.stopped = True
await task
p.stopped = False
p.select("off")
task = asyncio.create_task(p.run())
await asyncio.sleep_ms(100)
p.stopped = True
await task
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,160 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
import utime
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic pulse with attack, hold, and decay
print("Test 1: Basic pulse pattern")
p.set_param("br", 255)
p.set_param("dl", 1000) # 1 second delay between pulses
p.set_param("auto", True) # Run continuously
p.set_param("cl", [(255, 255, 255), (255, 255, 255)])
p.set_param("n1", 200) # Attack: 200ms
p.set_param("n2", 200) # Hold: 200ms
p.set_param("n3", 200) # Decay: 200ms
p.select("pulse")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 3 seconds to see multiple pulse cycles
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 2: Fast pulse with shorter delay
print("Test 2: Fast pulse pattern")
p.stopped = False
p.set_param("dl", 500) # 500ms delay between pulses
p.set_param("auto", True) # Run continuously
p.set_param("n1", 100) # Attack: 100ms
p.set_param("n2", 100) # Hold: 100ms
p.set_param("n3", 100) # Decay: 100ms
p.select("pulse")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 2 seconds
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 3: Colored pulse
print("Test 3: Colored pulse pattern")
p.stopped = False
p.set_param("dl", 800)
p.set_param("auto", True) # Run continuously
p.set_param("cl", [(255, 0, 0), (0, 0, 255)]) # Red pulse
p.set_param("n1", 150)
p.set_param("n2", 150)
p.set_param("n3", 150)
p.select("pulse")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 2 seconds
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 4: Verify delay restart timing
print("Test 4: Testing delay restart timing")
p.stopped = False
p.set_param("dl", 500) # 500ms delay
p.set_param("auto", True) # Run continuously
p.set_param("n1", 100) # Total attack+hold+decay = 300ms, should wait 200ms more
p.set_param("n2", 100)
p.set_param("n3", 100)
p.select("pulse")
task = asyncio.create_task(p.run())
# Monitor pulse cycles
cycle_count = 0
last_cycle_time = utime.ticks_ms()
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
# Check if we're near the start of a new cycle (LEDs off)
# This is a simplified check - in practice you'd monitor LED state
p.stopped = True
await task
# Test 5: Single-shot pulse (auto=False)
print("Test 5: Single-shot pulse (auto=False)")
p.stopped = False
p.set_param("dl", 500) # Delay between pulses
p.set_param("auto", False) # Run only once
p.set_param("cl", [(0, 255, 0), (0, 255, 0)]) # Green pulse
p.set_param("n1", 150) # Attack: 150ms
p.set_param("n2", 150) # Hold: 150ms
p.set_param("n3", 150) # Decay: 150ms
p.select("pulse")
task = asyncio.create_task(p.run())
# The pulse should complete once and then stop
# Total time should be ~450ms (attack + hold + decay)
# Wait a bit longer to verify it doesn't repeat
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 1000:
wdt.feed()
await asyncio.sleep_ms(10)
# Task should have completed on its own (not stopped manually)
# Verify it's stopped
if not p.stopped:
print("Warning: Pulse should have stopped automatically with auto=False")
p.stopped = True
await task
# Test 6: Pulse cycles through colors
print("Test 6: Pulse cycles through colors")
p.stopped = False
p.set_param("dl", 300) # cycle interval
p.set_param("auto", True) # Run continuously
p.set_param("cl", [
(255, 0, 0), # red
(0, 255, 0), # green
(0, 0, 255), # blue
(255, 255, 0), # yellow
])
p.set_param("n1", 50)
p.set_param("n2", 0)
p.set_param("n3", 50)
p.select("pulse")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run long enough to observe multiple color cycles
while utime.ticks_diff(utime.ticks_ms(), start) < 10000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Cleanup
print("Test complete, turning off")
p.stopped = False
p.select("off")
task = asyncio.create_task(p.run())
await asyncio.sleep_ms(100)
p.stopped = True
await task
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,167 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
import utime
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic rainbow with auto=True (continuous)
print("Test 1: Basic rainbow (auto=True, n1=1)")
p.set_param("br", 255)
p.set_param("dl", 100) # Delay affects animation speed
p.set_param("n1", 1) # Step increment of 1
p.set_param("auto", True) # Run continuously
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 3 seconds to see rainbow animation
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 2: Fast rainbow
print("Test 2: Fast rainbow (low delay, n1=1)")
p.stopped = False
p.set_param("dl", 50) # Faster animation
p.set_param("n1", 1)
p.set_param("auto", True)
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 3: Slow rainbow
print("Test 3: Slow rainbow (high delay, n1=1)")
p.stopped = False
p.set_param("dl", 500) # Slower animation
p.set_param("n1", 1)
p.set_param("auto", True)
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 4: Low brightness rainbow
print("Test 4: Low brightness rainbow (n1=1)")
p.stopped = False
p.set_param("br", 64) # Low brightness
p.set_param("dl", 100)
p.set_param("n1", 1)
p.set_param("auto", True)
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 5: Single-step rainbow (auto=False)
print("Test 5: Single-step rainbow (auto=False, n1=1)")
p.stopped = False
p.set_param("br", 255)
p.set_param("dl", 100)
p.set_param("n1", 1)
p.set_param("auto", False) # Run once per call
p.set_param("step", 0) # Reset step
p.select("rainbow")
# Call rainbow multiple times to see step progression
for i in range(10):
task = asyncio.create_task(p.run())
await task
await asyncio.sleep_ms(100) # Small delay between steps
wdt.feed()
# Test 6: Verify step updates correctly
print("Test 6: Verify step updates (auto=False, n1=1)")
p.stopped = False
p.set_param("n1", 1)
initial_step = p.step
p.select("rainbow")
task = asyncio.create_task(p.run())
await task
final_step = p.step
print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)")
# Test 7: Fast step increment (n1=5)
print("Test 7: Fast rainbow (n1=5, auto=True)")
p.stopped = False
p.set_param("br", 255)
p.set_param("dl", 100)
p.set_param("n1", 5) # Step increment of 5 (5x faster)
p.set_param("auto", True)
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 8: Very fast step increment (n1=10)
print("Test 8: Very fast rainbow (n1=10, auto=True)")
p.stopped = False
p.set_param("n1", 10) # Step increment of 10 (10x faster)
p.set_param("auto", True)
p.select("rainbow")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
await p.stop()
await task
# Test 9: Verify n1 controls step increment (auto=False)
print("Test 9: Verify n1 step increment (auto=False, n1=5)")
p.stopped = False
p.set_param("n1", 5) # Step increment of 5
p.set_param("auto", False)
p.set_param("step", 0) # Reset step
initial_step = p.step
p.select("rainbow")
task = asyncio.create_task(p.run())
await task
final_step = p.step
expected_step = (initial_step + 5) % 256
print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})")
if final_step == expected_step:
print("✓ n1 step increment working correctly")
else:
print(f"✗ Step increment mismatch! Expected {expected_step}, got {final_step}")
# Cleanup
print("Test complete, turning off")
p.stopped = False
p.select("off")
task = asyncio.create_task(p.run())
await asyncio.sleep_ms(100)
await p.stop()
await task
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,165 +0,0 @@
#!/usr/bin/env python3
import uasyncio as asyncio
import utime
from machine import WDT
from settings import Settings
from patterns import Patterns
async def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Patterns(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic transition with 2 colors (auto=True, cycles continuously)
print("Test 1: Basic transition (2 colors, 1000ms delay, auto=True)")
p.set_param("br", 255)
p.set_param("dl", 1000) # 1 second transition time
p.set_param("auto", True) # Cycle continuously
p.set_param("cl", [(255, 0, 0), (0, 255, 0)]) # Red to Green
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 5 seconds to see multiple transitions
while utime.ticks_diff(utime.ticks_ms(), start) < 5000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 2: Fast transition (auto=True, cycles continuously)
print("Test 2: Fast transition (500ms delay, auto=True)")
p.stopped = False
p.set_param("dl", 500) # 500ms transition time
p.set_param("auto", True) # Cycle continuously
p.set_param("cl", [(0, 0, 255), (255, 255, 0)]) # Blue to Yellow
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 3 seconds
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 3: Multiple colors transition (auto=True, cycles continuously)
print("Test 3: Multiple colors transition (3 colors, auto=True)")
p.stopped = False
p.set_param("dl", 800)
p.set_param("auto", True) # Cycle continuously
p.set_param("cl", [
(255, 0, 0), # Red
(0, 255, 0), # Green
(0, 0, 255), # Blue
])
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 8 seconds to see full cycles
while utime.ticks_diff(utime.ticks_ms(), start) < 8000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 4: Single color (should just stay that color)
print("Test 4: Single color (should stay that color)")
p.stopped = False
p.set_param("dl", 1000)
p.set_param("cl", [(255, 128, 0)]) # Orange
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 3 seconds
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 5: Many colors transition (auto=True, cycles continuously)
print("Test 5: Many colors transition (5 colors, auto=True)")
p.stopped = False
p.set_param("dl", 600)
p.set_param("auto", True) # Cycle continuously
p.set_param("cl", [
(255, 0, 0), # Red
(255, 128, 0), # Orange
(255, 255, 0), # Yellow
(0, 255, 0), # Green
(0, 0, 255), # Blue
])
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 10 seconds to see multiple cycles
while utime.ticks_diff(utime.ticks_ms(), start) < 10000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 6: Low brightness transition (auto=True, cycles continuously)
print("Test 6: Low brightness transition (auto=True)")
p.stopped = False
p.set_param("br", 64) # Low brightness
p.set_param("dl", 1000)
p.set_param("auto", True) # Cycle continuously
p.set_param("cl", [(255, 0, 0), (0, 255, 0)])
p.select("transition")
task = asyncio.create_task(p.run())
start = utime.ticks_ms()
# Run for 3 seconds
while utime.ticks_diff(utime.ticks_ms(), start) < 3000:
wdt.feed()
await asyncio.sleep_ms(10)
p.stopped = True
await task
# Test 7: Single-shot transition (auto=False, only color1 to color2)
print("Test 7: Single-shot transition (auto=False, color1 to color2 only)")
p.stopped = False
p.set_param("br", 255)
p.set_param("dl", 1000) # 1 second transition
p.set_param("auto", False) # Run only once
p.set_param("cl", [
(255, 0, 0), # Red (color1)
(0, 255, 0), # Green (color2)
(0, 0, 255), # Blue (should be ignored)
(255, 255, 0), # Yellow (should be ignored)
])
p.select("transition")
task = asyncio.create_task(p.run())
# The transition should complete once (color1 to color2) and then stop
# Total time should be ~1000ms
# Wait a bit longer to verify it doesn't continue
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed()
await asyncio.sleep_ms(10)
# Task should have completed on its own (not stopped manually)
# Verify it's stopped
if not p.stopped:
print("Warning: Transition should have stopped automatically with auto=False")
p.stopped = True
await task
# Cleanup
print("Test complete, turning off")
p.stopped = False
p.select("off")
task = asyncio.create_task(p.run())
await asyncio.sleep_ms(100)
p.stopped = True
await task
if __name__ == "__main__":
asyncio.run(main())