Compare commits

2 Commits

30 changed files with 2722 additions and 504 deletions

51
docs/pattern-contract.md Normal file
View File

@@ -0,0 +1,51 @@
# Pattern Contract (Important)
Pattern classes are loaded dynamically by `Presets._load_dynamic_patterns()`.
Patterns must follow this contract exactly.
## Required class shape
- File name is the pattern id (for example `blink.py` -> pattern name `blink`).
- Module exports a class with:
- `__init__(self, driver)` where `driver` is the `Presets` instance.
- `run(self, preset)` that returns a generator.
`Presets` binds patterns like this:
- `pattern_class(self).run`
- then calls `self.patterns[preset.p](preset)` and stores that generator.
- every frame, `Presets.tick()` does `next(self.generator)`.
## `run()` generator rules
- `run()` must `yield` frequently (normally once per tick loop).
- Do not block inside `run()`:
- no `sleep()` / `sleep_ms()` / long loops without `yield`.
- no network or file I/O.
- Use time checks (`utime.ticks_ms()` + `utime.ticks_diff(...)`) to schedule updates.
- Keep pattern state inside local variables in `run()` (or object fields if needed).
## Drawing and brightness
- Use `self.driver.apply_brightness(color, preset.b)` for per-preset brightness.
- Write pixels through `self.driver.n[...]` / `self.driver.n.fill(...)`.
- Flush frame with `self.driver.n.write()`.
- If a pattern needs to clear, use black `(0, 0, 0)`.
## Step semantics
- `self.driver.step` is shared pattern state managed by `Presets.select(...)` and patterns.
- Patterns that use step-based progression should update `self.driver.step` themselves.
- `select(..., step=...)` may set an explicit starting step.
## Error handling
- Let unexpected errors raise inside the generator.
- `Presets.tick()` catches exceptions, logs, and stops the active generator.
- Pattern code should not swallow broad exceptions unless there is a clear recovery path.
## Built-ins
- `off` and `on` are built-in methods on `Presets`, not loaded from this folder.
- `__init__.py` is ignored by dynamic loader.

2
lib/microdot/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from microdot.microdot import Microdot, Request, Response, abort, redirect, \
send_file # noqa: F401

8
lib/microdot/helpers.py Normal file
View File

@@ -0,0 +1,8 @@
try:
from functools import wraps
except ImportError: # pragma: no cover
# MicroPython does not currently implement functools.wraps
def wraps(wrapped):
def _(wrapper):
return wrapper
return _

1450
lib/microdot/microdot.py Normal file

File diff suppressed because it is too large Load Diff

225
lib/microdot/session.py Normal file
View File

@@ -0,0 +1,225 @@
try:
import jwt
HAS_JWT = True
except ImportError:
HAS_JWT = False
try:
import ubinascii
except ImportError:
import binascii as ubinascii
try:
import uhashlib as hashlib
except ImportError:
import hashlib
try:
import uhmac as hmac
except ImportError:
try:
import hmac
except ImportError:
hmac = None
import json
from microdot.microdot import invoke_handler
from microdot.helpers import wraps
class SessionDict(dict):
"""A session dictionary.
The session dictionary is a standard Python dictionary that has been
extended with convenience ``save()`` and ``delete()`` methods.
"""
def __init__(self, request, session_dict):
super().__init__(session_dict)
self.request = request
def save(self):
"""Update the session cookie."""
self.request.app._session.update(self.request, self)
def delete(self):
"""Delete the session cookie."""
self.request.app._session.delete(self.request)
class Session:
"""Session handling
:param app: The application instance.
:param secret_key: The secret key, as a string or bytes object.
:param cookie_options: A dictionary with cookie options to pass as
arguments to :meth:`Response.set_cookie()
<microdot.Response.set_cookie>`.
"""
secret_key = None
def __init__(self, app=None, secret_key=None, cookie_options=None):
self.secret_key = secret_key
self.cookie_options = cookie_options or {}
if app is not None:
self.initialize(app)
def initialize(self, app, secret_key=None, cookie_options=None):
if secret_key is not None:
self.secret_key = secret_key
if cookie_options is not None:
self.cookie_options = cookie_options
if 'path' not in self.cookie_options:
self.cookie_options['path'] = '/'
if 'http_only' not in self.cookie_options:
self.cookie_options['http_only'] = True
app._session = self
def get(self, request):
"""Retrieve the user session.
:param request: The client request.
The return value is a session dictionary with the data stored in the
user's session, or ``{}`` if the session data is not available or
invalid.
"""
if not self.secret_key:
raise ValueError('The session secret key is not configured')
if hasattr(request.g, '_session'):
return request.g._session
session = request.cookies.get('session')
if session is None:
request.g._session = SessionDict(request, {})
return request.g._session
request.g._session = SessionDict(request, self.decode(session))
return request.g._session
def update(self, request, session):
"""Update the user session.
:param request: The client request.
:param session: A dictionary with the update session data for the user.
Applications would normally not call this method directly, instead they
would use the :meth:`SessionDict.save` method on the session
dictionary, which calls this method. For example::
@app.route('/')
@with_session
def index(request, session):
session['foo'] = 'bar'
session.save()
return 'Hello, World!'
Calling this method adds a cookie with the updated session to the
request currently being processed.
"""
if not self.secret_key:
raise ValueError('The session secret key is not configured')
encoded_session = self.encode(session)
@request.after_request
def _update_session(request, response):
response.set_cookie('session', encoded_session,
**self.cookie_options)
return response
def delete(self, request):
"""Remove the user session.
:param request: The client request.
Applications would normally not call this method directly, instead they
would use the :meth:`SessionDict.delete` method on the session
dictionary, which calls this method. For example::
@app.route('/')
@with_session
def index(request, session):
session.delete()
return 'Hello, World!'
Calling this method adds a cookie removal header to the request
currently being processed.
"""
@request.after_request
def _delete_session(request, response):
response.delete_cookie('session', **self.cookie_options)
return response
def encode(self, payload, secret_key=None):
"""Encode session data using JWT if available, otherwise use simple HMAC."""
if HAS_JWT:
return jwt.encode(payload, secret_key or self.secret_key,
algorithm='HS256')
else:
# Simple encoding for MicroPython: base64(json) + HMAC signature
key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key)
payload_json = json.dumps(payload)
payload_b64 = ubinascii.b2a_base64(payload_json.encode()).decode().strip()
# Create HMAC signature
if hmac:
# Use hmac module if available
h = hmac.new(key, payload_json.encode(), hashlib.sha256)
else:
# Fallback: simple SHA256(key + message)
h = hashlib.sha256(key + payload_json.encode())
signature = ubinascii.b2a_base64(h.digest()).decode().strip()
return f"{payload_b64}.{signature}"
def decode(self, session, secret_key=None):
"""Decode session data using JWT if available, otherwise use simple HMAC."""
if HAS_JWT:
try:
payload = jwt.decode(session, secret_key or self.secret_key,
algorithms=['HS256'])
except jwt.exceptions.PyJWTError: # pragma: no cover
return {}
return payload
else:
try:
# Simple decoding for MicroPython
if '.' not in session:
return {}
payload_b64, signature = session.rsplit('.', 1)
payload_json = ubinascii.a2b_base64(payload_b64).decode()
# Verify HMAC signature
key = (secret_key or self.secret_key).encode() if isinstance(secret_key or self.secret_key, str) else (secret_key or self.secret_key)
if hmac:
# Use hmac module if available
h = hmac.new(key, payload_json.encode(), hashlib.sha256)
else:
# Fallback: simple SHA256(key + message)
h = hashlib.sha256(key + payload_json.encode())
expected_signature = ubinascii.b2a_base64(h.digest()).decode().strip()
if signature != expected_signature:
return {}
return json.loads(payload_json)
except Exception:
return {}
def with_session(f):
"""Decorator that passes the user session to the route handler.
The session dictionary is passed to the decorated function as an argument
after the request object. Example::
@app.route('/')
@with_session
def index(request, session):
return 'Hello, World!'
Note that the decorator does not save the session. To update the session,
call the :func:`session.save() <microdot.session.SessionDict.save>` method.
"""
@wraps(f)
async def wrapper(request, *args, **kwargs):
return await invoke_handler(
f, request, request.app._session.get(request), *args, **kwargs)
return wrapper

70
lib/microdot/utemplate.py Normal file
View File

@@ -0,0 +1,70 @@
from utemplate import recompile
_loader = None
class Template:
"""A template object.
:param template: The filename of the template to render, relative to the
configured template directory.
"""
@classmethod
def initialize(cls, template_dir='templates',
loader_class=recompile.Loader):
"""Initialize the templating subsystem.
:param template_dir: the directory where templates are stored. This
argument is optional. The default is to load
templates from a *templates* subdirectory.
:param loader_class: the ``utemplate.Loader`` class to use when loading
templates. This argument is optional. The default
is the ``recompile.Loader`` class, which
automatically recompiles templates when they
change.
"""
global _loader
_loader = loader_class(None, template_dir)
def __init__(self, template):
if _loader is None: # pragma: no cover
self.initialize()
#: The name of the template
self.name = template
self.template = _loader.load(template)
def generate(self, *args, **kwargs):
"""Return a generator that renders the template in chunks, with the
given arguments."""
return self.template(*args, **kwargs)
def render(self, *args, **kwargs):
"""Render the template with the given arguments and return it as a
string."""
return ''.join(self.generate(*args, **kwargs))
def generate_async(self, *args, **kwargs):
"""Return an asynchronous generator that renders the template in
chunks, using the given arguments."""
class sync_to_async_iter():
def __init__(self, iter):
self.iter = iter
def __aiter__(self):
return self
async def __anext__(self):
try:
return next(self.iter)
except StopIteration:
raise StopAsyncIteration
return sync_to_async_iter(self.generate(*args, **kwargs))
async def render_async(self, *args, **kwargs):
"""Render the template with the given arguments asynchronously and
return it as a string."""
response = ''
async for chunk in self.generate_async(*args, **kwargs):
response += chunk
return response

231
lib/microdot/websocket.py Normal file
View File

@@ -0,0 +1,231 @@
import binascii
import hashlib
from microdot import Request, Response
from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception
from microdot.helpers import wraps
class WebSocketError(Exception):
"""Exception raised when an error occurs in a WebSocket connection."""
pass
class WebSocket:
"""A WebSocket connection object.
An instance of this class is sent to handler functions to manage the
WebSocket connection.
"""
CONT = 0
TEXT = 1
BINARY = 2
CLOSE = 8
PING = 9
PONG = 10
#: Specify the maximum message size that can be received when calling the
#: ``receive()`` method. Messages with payloads that are larger than this
#: size will be rejected and the connection closed. Set to 0 to disable
#: the size check (be aware of potential security issues if you do this),
#: or to -1 to use the value set in
#: ``Request.max_body_length``. The default is -1.
#:
#: Example::
#:
#: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages
max_message_length = -1
def __init__(self, request):
self.request = request
self.closed = False
async def handshake(self):
response = self._handshake_response()
await self.request.sock[1].awrite(
b'HTTP/1.1 101 Switching Protocols\r\n')
await self.request.sock[1].awrite(b'Upgrade: websocket\r\n')
await self.request.sock[1].awrite(b'Connection: Upgrade\r\n')
await self.request.sock[1].awrite(
b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
async def receive(self):
"""Receive a message from the client."""
while True:
opcode, payload = await self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
if send_opcode: # pragma: no cover
await self.send(data, send_opcode)
elif data: # pragma: no branch
return data
async def send(self, data, opcode=None):
"""Send a message to the client.
:param data: the data to send, given as a string or bytes.
:param opcode: a custom frame opcode to use. If not given, the opcode
is ``TEXT`` or ``BINARY`` depending on the type of the
data.
"""
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
data)
await self.request.sock[1].awrite(frame)
async def close(self):
"""Close the websocket connection."""
if not self.closed: # pragma: no cover
self.closed = True
await self.send(b'', self.CLOSE)
def _handshake_response(self):
connection = False
upgrade = False
websocket_key = None
for header, value in self.request.headers.items():
h = header.lower()
if h == 'connection':
connection = True
if 'upgrade' not in value.lower():
return self.request.app.abort(400)
elif h == 'upgrade':
upgrade = True
if not value.lower() == 'websocket':
return self.request.app.abort(400)
elif h == 'sec-websocket-key':
websocket_key = value
if not connection or not upgrade or not websocket_key:
return self.request.app.abort(400)
d = hashlib.sha1(websocket_key.encode())
d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
return binascii.b2a_base64(d.digest())[:-1]
@classmethod
def _parse_frame_header(cls, header):
fin = header[0] & 0x80
opcode = header[0] & 0x0f
if fin == 0 or opcode == cls.CONT: # pragma: no cover
raise WebSocketError('Continuation frames not supported')
has_mask = header[1] & 0x80
length = header[1] & 0x7f
if length == 126:
length = -2
elif length == 127:
length = -8
return fin, opcode, has_mask, length
def _process_websocket_frame(self, opcode, payload):
if opcode == self.TEXT:
payload = payload.decode()
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
raise WebSocketError('Websocket connection closed')
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
return None, None
return None, payload
@classmethod
def _encode_websocket_frame(cls, opcode, payload):
frame = bytearray()
frame.append(0x80 | opcode)
if opcode == cls.TEXT:
payload = payload.encode()
if len(payload) < 126:
frame.append(len(payload))
elif len(payload) < (1 << 16):
frame.append(126)
frame.extend(len(payload).to_bytes(2, 'big'))
else:
frame.append(127)
frame.extend(len(payload).to_bytes(8, 'big'))
frame.extend(payload)
return frame
async def _read_frame(self):
header = await self.request.sock[0].read(2)
if len(header) != 2: # pragma: no cover
raise WebSocketError('Websocket connection closed')
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length == -2:
length = await self.request.sock[0].read(2)
length = int.from_bytes(length, 'big')
elif length == -8:
length = await self.request.sock[0].read(8)
length = int.from_bytes(length, 'big')
max_allowed_length = Request.max_body_length \
if self.max_message_length == -1 else self.max_message_length
if length > max_allowed_length:
raise WebSocketError('Message too large')
if has_mask: # pragma: no cover
mask = await self.request.sock[0].read(4)
payload = await self.request.sock[0].read(length)
if has_mask: # pragma: no cover
payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
return opcode, payload
async def websocket_upgrade(request):
"""Upgrade a request handler to a websocket connection.
This function can be called directly inside a route function to process a
WebSocket upgrade handshake, for example after the user's credentials are
verified. The function returns the websocket object::
@app.route('/echo')
async def echo(request):
if not authenticate_user(request):
abort(401)
ws = await websocket_upgrade(request)
while True:
message = await ws.receive()
await ws.send(message)
"""
ws = WebSocket(request)
await ws.handshake()
@request.after_request
async def after_request(request, response):
return Response.already_handled
return ws
def websocket_wrapper(f, upgrade_function):
@wraps(f)
async def wrapper(request, *args, **kwargs):
ws = await upgrade_function(request)
try:
await f(request, ws, *args, **kwargs)
except OSError as exc:
if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover
raise
except WebSocketError:
pass
except Exception as exc:
print_exception(exc)
finally: # pragma: no cover
try:
await ws.close()
except Exception:
pass
return Response.already_handled
return wrapper
def with_websocket(f):
"""Decorator to make a route a WebSocket endpoint.
This decorator is used to define a route that accepts websocket
connections. The route then receives a websocket object as a second
argument that it can use to send and receive messages::
@app.route('/echo')
@with_websocket
async def echo(request, ws):
while True:
message = await ws.receive()
await ws.send(message)
"""
return websocket_wrapper(f, websocket_upgrade)

View File

14
lib/utemplate/compiled.py Normal file
View File

@@ -0,0 +1,14 @@
class Loader:
def __init__(self, pkg, dir):
if dir == ".":
dir = ""
else:
dir = dir.replace("/", ".") + "."
if pkg and pkg != "__main__":
dir = pkg + "." + dir
self.p = dir
def load(self, name):
name = name.replace(".", "_")
return __import__(self.p + name, None, None, (name,)).render

View File

@@ -0,0 +1,21 @@
# (c) 2014-2020 Paul Sokolovsky. MIT license.
try:
from uos import stat, remove
except:
from os import stat, remove
from . import source
class Loader(source.Loader):
def load(self, name):
o_path = self.pkg_path + self.compiled_path(name)
i_path = self.pkg_path + self.dir + "/" + name
try:
o_stat = stat(o_path)
i_stat = stat(i_path)
if i_stat[8] > o_stat[8]:
# input file is newer, remove output to force recompile
remove(o_path)
finally:
return super().load(name)

188
lib/utemplate/source.py Normal file
View File

@@ -0,0 +1,188 @@
# (c) 2014-2019 Paul Sokolovsky. MIT license.
from . import compiled
class Compiler:
START_CHAR = "{"
STMNT = "%"
STMNT_END = "%}"
EXPR = "{"
EXPR_END = "}}"
def __init__(self, file_in, file_out, indent=0, seq=0, loader=None):
self.file_in = file_in
self.file_out = file_out
self.loader = loader
self.seq = seq
self._indent = indent
self.stack = []
self.in_literal = False
self.flushed_header = False
self.args = "*a, **d"
def indent(self, adjust=0):
if not self.flushed_header:
self.flushed_header = True
self.indent()
self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args))
self.stack.append("def")
self.file_out.write(" " * (len(self.stack) + self._indent + adjust))
def literal(self, s):
if not s:
return
if not self.in_literal:
self.indent()
self.file_out.write('yield """')
self.in_literal = True
self.file_out.write(s.replace('"', '\\"'))
def close_literal(self):
if self.in_literal:
self.file_out.write('"""\n')
self.in_literal = False
def render_expr(self, e):
self.indent()
self.file_out.write('yield str(' + e + ')\n')
def parse_statement(self, stmt):
tokens = stmt.split(None, 1)
if tokens[0] == "args":
if len(tokens) > 1:
self.args = tokens[1]
else:
self.args = ""
elif tokens[0] == "set":
self.indent()
self.file_out.write(stmt[3:].strip() + "\n")
elif tokens[0] == "include":
if not self.flushed_header:
# If there was no other output, we still need a header now
self.indent()
tokens = tokens[1].split(None, 1)
args = ""
if len(tokens) > 1:
args = tokens[1]
if tokens[0][0] == "{":
self.indent()
# "1" as fromlist param is uPy hack
self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2])
self.indent()
self.file_out.write("yield from _.render(%s)\n" % args)
return
with self.loader.input_open(tokens[0][1:-1]) as inc:
self.seq += 1
c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq)
inc_id = self.seq
self.seq = c.compile()
self.indent()
self.file_out.write("yield from render%d(%s)\n" % (inc_id, args))
elif len(tokens) > 1:
if tokens[0] == "elif":
assert self.stack[-1] == "if"
self.indent(-1)
self.file_out.write(stmt + ":\n")
else:
self.indent()
self.file_out.write(stmt + ":\n")
self.stack.append(tokens[0])
else:
if stmt.startswith("end"):
assert self.stack[-1] == stmt[3:]
self.stack.pop(-1)
elif stmt == "else":
assert self.stack[-1] == "if"
self.indent(-1)
self.file_out.write("else:\n")
else:
assert False
def parse_line(self, l):
while l:
start = l.find(self.START_CHAR)
if start == -1:
self.literal(l)
return
self.literal(l[:start])
self.close_literal()
sel = l[start + 1]
#print("*%s=%s=" % (sel, EXPR))
if sel == self.STMNT:
end = l.find(self.STMNT_END)
assert end > 0
stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip()
self.parse_statement(stmt)
end += len(self.STMNT_END)
l = l[end:]
if not self.in_literal and l == "\n":
break
elif sel == self.EXPR:
# print("EXPR")
end = l.find(self.EXPR_END)
assert end > 0
expr = l[start + len(self.START_CHAR + self.EXPR):end].strip()
self.render_expr(expr)
end += len(self.EXPR_END)
l = l[end:]
else:
self.literal(l[start])
l = l[start + 1:]
def header(self):
self.file_out.write("# Autogenerated file\n")
def compile(self):
self.header()
for l in self.file_in:
self.parse_line(l)
self.close_literal()
return self.seq
class Loader(compiled.Loader):
def __init__(self, pkg, dir):
super().__init__(pkg, dir)
self.dir = dir
if pkg == "__main__":
# if pkg isn't really a package, don't bother to use it
# it means we're running from "filesystem directory", not
# from a package.
pkg = None
self.pkg_path = ""
if pkg:
p = __import__(pkg)
if isinstance(p.__path__, str):
# uPy
self.pkg_path = p.__path__
else:
# CPy
self.pkg_path = p.__path__[0]
self.pkg_path += "/"
def input_open(self, template):
path = self.pkg_path + self.dir + "/" + template
return open(path)
def compiled_path(self, template):
return self.dir + "/" + template.replace(".", "_") + ".py"
def load(self, name):
try:
return super().load(name)
except (OSError, ImportError):
pass
compiled_path = self.pkg_path + self.compiled_path(name)
f_in = self.input_open(name)
f_out = open(compiled_path, "w")
c = Compiler(f_in, f_out, loader=self)
c.compile()
f_in.close()
f_out.close()
return super().load(name)

218
src/controller_messages.py Normal file
View File

@@ -0,0 +1,218 @@
"""Parse controller JSON (v1) and apply brightness, presets, OTA patterns, etc."""
import json
import socket
from utils import convert_and_reorder_colors
try:
import uos as os
except ImportError:
import os
def process_data(payload, settings, presets, controller_ip=None):
"""Read one controller message; json.loads (bytes or str), then apply fields."""
try:
data = json.loads(payload)
print(payload)
if data.get("v", "") != "1":
return
except (ValueError, TypeError):
return
if "b" in data:
apply_brightness(data, settings, presets)
if "presets" in data:
apply_presets(data, settings, presets)
if "select" in data:
apply_select(data, settings, presets)
if "default" in data:
apply_default(data, settings, presets)
if "manifest" in data:
apply_patterns_ota(data, presets, controller_ip=controller_ip)
if "save" in data and ("presets" in data or "default" in data):
presets.save()
def apply_brightness(data, settings, presets):
try:
presets.b = max(0, min(255, int(data["b"])))
settings["brightness"] = presets.b
except (TypeError, ValueError):
pass
def apply_presets(data, settings, presets):
presets_map = data["presets"]
for id, preset_data in presets_map.items():
if not preset_data:
continue
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None:
try:
preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings
)
except (TypeError, ValueError, KeyError):
continue
presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
def apply_select(data, settings, presets):
select_map = data["select"]
device_name = settings["name"]
select_list = select_map.get(device_name, [])
if not select_list:
return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
presets.select(preset_name, step=step)
def apply_default(data, settings, presets):
targets = data.get("targets") or []
default_name = data["default"]
if (
settings["name"] in targets
and isinstance(default_name, str)
and default_name in presets.presets
):
settings["default"] = default_name
def _parse_http_url(url):
"""Parse http://host[:port]/path into (host, port, path)."""
if not isinstance(url, str):
raise ValueError("url must be a string")
if not url.startswith("http://"):
raise ValueError("only http:// URLs are supported")
remainder = url[7:]
slash_idx = remainder.find("/")
if slash_idx == -1:
host_port = remainder
path = "/"
else:
host_port = remainder[:slash_idx]
path = remainder[slash_idx:]
if ":" in host_port:
host, port_s = host_port.rsplit(":", 1)
port = int(port_s)
else:
host = host_port
port = 80
if not host:
raise ValueError("missing host")
return host, port, path
def _http_get_raw(url, timeout_s=10.0):
host, port, path = _parse_http_url(url)
req = (
"GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host)
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host, int(port)))
sock.send(req)
data = b""
while True:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
finally:
try:
sock.close()
except Exception:
pass
sep = b"\r\n\r\n"
if sep not in data:
raise OSError("invalid HTTP response")
head, body = data.split(sep, 1)
status_line = head.split(b"\r\n", 1)[0]
if b" 200 " not in status_line:
raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8"))
return body
def _http_get_json(url, timeout_s=10.0):
body = _http_get_raw(url, timeout_s=timeout_s)
return json.loads(body.decode("utf-8"))
def _http_get_text(url, timeout_s=10.0, controller_ip=None):
# Support relative URLs from controller messages.
if isinstance(url, str) and url.startswith("/"):
if not controller_ip:
raise OSError("controller IP unavailable for relative URL")
url = "http://%s%s" % (controller_ip, url)
try:
body = _http_get_raw(url, timeout_s=timeout_s)
return body.decode("utf-8")
except Exception:
# Fallback for mDNS/unresolvable host: retry against current controller IP.
if not controller_ip or not isinstance(url, str) or not url.startswith("http://"):
raise
_host, _port, path = _parse_http_url(url)
fallback = "http://%s:%d%s" % (controller_ip, _port, path)
body = _http_get_raw(fallback, timeout_s=timeout_s)
return body.decode("utf-8")
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def apply_patterns_ota(data, presets, controller_ip=None):
manifest_payload = data.get("manifest")
if not manifest_payload:
return
try:
if isinstance(manifest_payload, dict):
manifest = manifest_payload
elif isinstance(manifest_payload, str):
manifest = _http_get_json(manifest_payload, timeout_s=20.0)
else:
print("patterns_ota: invalid manifest payload type")
return
files = manifest.get("files", [])
if not isinstance(files, list) or not files:
print("patterns_ota: no files in manifest")
return
try:
os.mkdir("patterns")
except OSError:
pass
updated = 0
for item in files:
if not isinstance(item, dict):
continue
name = item.get("name")
url = item.get("url")
inline_code = item.get("code")
if not _safe_pattern_filename(name):
continue
if isinstance(inline_code, str):
code = inline_code
elif isinstance(url, str):
code = _http_get_text(url, timeout_s=20.0, controller_ip=controller_ip)
else:
continue
with open("patterns/" + name, "w") as f:
f.write(code)
updated += 1
if updated > 0:
presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e:
print("patterns_ota failed:", e)

View File

@@ -1,4 +1,8 @@
"""LED hello payload and UDP broadcast discovery (controller IP via echo on port 8766). """LED hello JSON line and UDP broadcast on port 8766.
Used so led-controller can register the device (name, MAC, IP) when ``wait_reply`` is
false; the controller may then connect to the device's WebSocket. With
``wait_reply`` true, blocks for an echo and returns the controller IP (legacy discovery).
Wi-Fi must already be connected; this module does not use Settings or call connect(). Wi-Fi must already be connected; this module does not use Settings or call connect().
""" """
@@ -40,7 +44,13 @@ def ipv4_broadcast(ip, netmask):
im = [int(x) for x in netmask.split(".")] im = [int(x) for x in netmask.split(".")]
if len(ia) != 4 or len(im) != 4: if len(ia) != 4 or len(im) != 4:
return None return None
return ".".join(str(ia[i] | (255 - im[i])) for i in range(4)) # STA often reports 255.255.255.255; "broadcast" would equal the host IP — useless for LAN.
if netmask == "255.255.255.255":
return None
bcast = ".".join(str(ia[i] | (255 - im[i])) for i in range(4))
if bcast == ip:
return None
return bcast
def udp_discovery_targets(ip, mask): def udp_discovery_targets(ip, mask):
@@ -52,6 +62,14 @@ def udp_discovery_targets(ip, mask):
return out return out
def _udp_discovery_targets_single(ip, mask):
"""One destination: subnet broadcast if known, else limited broadcast."""
b = ipv4_broadcast(ip, mask)
if b:
return [(b, DISCOVERY_UDP_PORT)]
return [("255.255.255.255", DISCOVERY_UDP_PORT)]
def broadcast_hello_udp( def broadcast_hello_udp(
sta, sta,
device_name="", device_name="",
@@ -59,11 +77,17 @@ def broadcast_hello_udp(
wait_reply=True, wait_reply=True,
recv_timeout_s=DEFAULT_RECV_TIMEOUT_S, recv_timeout_s=DEFAULT_RECV_TIMEOUT_S,
wdt=None, wdt=None,
dual_destinations=True,
): ):
""" """
Send pack_hello_line via directed then 255.255.255.255 on DISCOVERY_UDP_PORT. Send pack_hello_line on DISCOVERY_UDP_PORT.
STA must already be connected with a valid IPv4 (caller brings up Wi-Fi). STA must already be connected with a valid IPv4 (caller brings up Wi-Fi).
If dual_destinations (default), send subnet broadcast then 255.255.255.255 so
discovery works on awkward APs — the controller may receive two packets.
If dual_destinations is False, send only one (subnet broadcast or limited),
e.g. after TCP connect so the Pi does not run duplicate resync handlers.
If wait_reply, wait for first UDP echo. Returns controller IP string or None. If wait_reply, wait for first UDP echo. Returns controller IP string or None.
""" """
ip, mask, _gw, _dns = sta.ifconfig() ip, mask, _gw, _dns = sta.ifconfig()
@@ -89,7 +113,12 @@ def broadcast_hello_udp(
pass pass
discovered = None discovered = None
for dest_ip, dest_port in udp_discovery_targets(ip, mask): targets = (
udp_discovery_targets(ip, mask)
if dual_destinations
else _udp_discovery_targets_single(ip, mask)
)
for dest_ip, dest_port in targets:
if wdt is not None: if wdt is not None:
wdt.feed() wdt.feed()
label = "%s:%s" % (dest_ip, dest_port) label = "%s:%s" % (dest_ip, dest_port)

View File

@@ -1,68 +0,0 @@
"""Minimal HTTP/1.1 POST JSON client for driver long-poll (MicroPython)."""
import json
import socket
def _send_all(sock, data):
n = 0
while n < len(data):
m = sock.send(data[n:])
if m <= 0:
raise OSError("socket send failed")
n += m
def _read_http_json_body(sock, max_headers=8192):
buf = b""
while b"\r\n\r\n" not in buf:
chunk = sock.recv(256)
if not chunk:
break
buf += chunk
if len(buf) > max_headers:
raise OSError("response headers too large")
if b"\r\n\r\n" not in buf:
raise OSError("incomplete response headers")
head, rest = buf.split(b"\r\n\r\n", 1)
cl = None
for line in head.split(b"\r\n"):
if line.lower().startswith(b"content-length:"):
try:
cl = int(line.split(b":", 1)[1].strip())
except (ValueError, IndexError):
cl = None
if cl is None:
body = rest
else:
body = rest
while len(body) < cl:
chunk = sock.recv(min(2048, cl - len(body)))
if not chunk:
break
body += chunk
return json.loads(body.decode("utf-8"))
def http_driver_poll(host, port, payload_dict, timeout_s=40.0):
"""
POST ``/driver/v1/poll`` with JSON body; return parsed JSON (expects ``{"lines": [...]}``).
"""
path = "/driver/v1/poll"
body_bytes = json.dumps(payload_dict).encode("utf-8")
host_s = str(host)
req_head = (
"POST %s HTTP/1.1\r\nHost: %s\r\nContent-Type: application/json\r\nContent-Length: %d\r\nConnection: close\r\n\r\n"
% (path, host_s, len(body_bytes))
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host_s, int(port)))
_send_all(sock, req_head + body_bytes)
return _read_http_json_body(sock)
finally:
try:
sock.close()
except Exception:
pass

View File

@@ -1,23 +1,13 @@
from settings import Settings from settings import Settings
from machine import WDT from machine import WDT
import utime
import network import network
import utime
import asyncio
from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
from utils import convert_and_reorder_colors from controller_messages import process_data
import json from hello import broadcast_hello_udp
import time
import select
import socket
import ubinascii
from hello import discover_controller_udp
try:
import uos as os
except ImportError:
import os
BROADCAST_MAC = b"\xff\xff\xff\xff\xff\xff"
CONTROLLER_TCP_PORT = 8765
controller_ip = None
settings = Settings() settings = Settings()
print(settings) print(settings)
@@ -27,364 +17,74 @@ presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
default_preset = settings.get("default", "") default_preset = settings.get("default", "")
if default_preset and default_preset in presets.presets: if default_preset and default_preset in presets.presets:
presets.select(default_preset) if presets.select(default_preset):
print(f"Selected startup preset: {default_preset}") print(f"Selected startup preset: {default_preset}")
else:
print("Startup preset failed (invalid pattern?):", default_preset)
wdt = WDT(timeout=10000) wdt = WDT(timeout=10000)
wdt.feed() wdt.feed()
# --- Controller JSON (bytes or str): parse v1, then apply -------------------------
def process_data(payload):
"""Read one controller message; json.loads (bytes or str), then apply fields."""
try:
data = json.loads(payload)
print(payload)
if data.get("v", "") != "1":
return
except (ValueError, TypeError):
return
if "b" in data:
apply_brightness(data)
if "presets" in data:
apply_presets(data)
if "select" in data:
apply_select(data)
if "default" in data:
apply_default(data)
if "manifest" in data:
apply_patterns_ota(data)
if "save" in data and ("presets" in data or "default" in data):
presets.save()
def apply_brightness(data):
try:
presets.b = max(0, min(255, int(data["b"])))
settings["brightness"] = presets.b
except (TypeError, ValueError):
pass
def apply_presets(data):
presets_map = data["presets"]
for id, preset_data in presets_map.items():
if not preset_data:
continue
color_key = "c" if "c" in preset_data else ("colors" if "colors" in preset_data else None)
if color_key is not None:
try:
preset_data[color_key] = convert_and_reorder_colors(
preset_data[color_key], settings
)
except (TypeError, ValueError, KeyError):
continue
presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
def apply_select(data):
select_map = data["select"]
device_name = settings["name"]
select_list = select_map.get(device_name, [])
if not select_list:
return
preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None
presets.select(preset_name, step=step)
def apply_default(data):
targets = data.get("targets") or []
default_name = data["default"]
if (
settings["name"] in targets
and isinstance(default_name, str)
and default_name in presets.presets
):
settings["default"] = default_name
def _parse_http_url(url):
"""Parse http://host[:port]/path into (host, port, path)."""
if not isinstance(url, str):
raise ValueError("url must be a string")
if not url.startswith("http://"):
raise ValueError("only http:// URLs are supported")
remainder = url[7:]
slash_idx = remainder.find("/")
if slash_idx == -1:
host_port = remainder
path = "/"
else:
host_port = remainder[:slash_idx]
path = remainder[slash_idx:]
if ":" in host_port:
host, port_s = host_port.rsplit(":", 1)
port = int(port_s)
else:
host = host_port
port = 80
if not host:
raise ValueError("missing host")
return host, port, path
def _http_get_raw(url, timeout_s=10.0):
host, port, path = _parse_http_url(url)
req = (
"GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (path, host)
).encode("utf-8")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(timeout_s)
sock.connect((host, int(port)))
sock.send(req)
data = b""
while True:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
finally:
try:
sock.close()
except Exception:
pass
sep = b"\r\n\r\n"
if sep not in data:
raise OSError("invalid HTTP response")
head, body = data.split(sep, 1)
status_line = head.split(b"\r\n", 1)[0]
if b" 200 " not in status_line:
raise OSError("HTTP status not OK: %s" % status_line.decode("utf-8"))
return body
def _http_get_json(url, timeout_s=10.0):
body = _http_get_raw(url, timeout_s=timeout_s)
return json.loads(body.decode("utf-8"))
def _http_get_text(url, timeout_s=10.0):
global controller_ip
# Support relative URLs from controller messages.
if isinstance(url, str) and url.startswith("/"):
if not controller_ip:
raise OSError("controller IP unavailable for relative URL")
url = "http://%s%s" % (controller_ip, url)
try:
body = _http_get_raw(url, timeout_s=timeout_s)
return body.decode("utf-8")
except Exception:
# Fallback for mDNS/unresolvable host: retry against current controller IP.
if not controller_ip or not isinstance(url, str) or not url.startswith("http://"):
raise
_host, _port, path = _parse_http_url(url)
fallback = "http://%s:%d%s" % (controller_ip, _port, path)
body = _http_get_raw(fallback, timeout_s=timeout_s)
return body.decode("utf-8")
def _safe_pattern_filename(name):
if not isinstance(name, str):
return False
if not name.endswith(".py"):
return False
if "/" in name or "\\" in name or ".." in name:
return False
return True
def apply_patterns_ota(data):
manifest_payload = data.get("manifest")
if not manifest_payload:
return
try:
if isinstance(manifest_payload, dict):
manifest = manifest_payload
elif isinstance(manifest_payload, str):
manifest = _http_get_json(manifest_payload, timeout_s=20.0)
else:
print("patterns_ota: invalid manifest payload type")
return
files = manifest.get("files", [])
if not isinstance(files, list) or not files:
print("patterns_ota: no files in manifest")
return
try:
os.mkdir("patterns")
except OSError:
pass
updated = 0
for item in files:
if not isinstance(item, dict):
continue
name = item.get("name")
url = item.get("url")
inline_code = item.get("code")
if not _safe_pattern_filename(name):
continue
if isinstance(inline_code, str):
code = inline_code
elif isinstance(url, str):
code = _http_get_text(url, timeout_s=20.0)
else:
continue
with open("patterns/" + name, "w") as f:
f.write(code)
updated += 1
if updated > 0:
presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e:
print("patterns_ota failed:", e)
# --- TCP framing (bytes) → process_data -------------------------------------------
def tcp_append_and_drain_lines(buf, chunk):
"""Return (new_buf, list of non-empty stripped line byte strings)."""
buf += chunk
lines = []
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.strip()
if line:
lines.append(line)
return buf, lines
# --- Network + hello --------------------------------------------------------------
sta_if = network.WLAN(network.STA_IF) sta_if = network.WLAN(network.STA_IF)
sta_if.active(True) sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE) sta_if.config(pm=network.WLAN.PM_NONE)
mac = sta_if.config("mac")
hello_payload = {
"v": "1",
"device_name": settings.get("name", ""),
"mac": ubinascii.hexlify(mac).decode().lower(),
"type": "led",
}
hello_bytes = json.dumps(hello_payload).encode("utf-8")
if settings["transport_type"] == "espnow":
from espnow import ESPNow # import only in this branch (avoids load when using Wi-Fi)
sta_if.disconnect()
sta_if.config(channel=settings.get("wifi_channel", 1))
e = ESPNow()
e.active(True)
e.add_peer(BROADCAST_MAC)
e.add_peer(mac)
e.send(BROADCAST_MAC, hello_bytes)
while True:
if e.any():
_peer, msg = e.recv()
if msg:
process_data(msg)
presets.tick()
wdt.feed()
elif settings["transport_type"] == "wifi":
sta_if.connect(settings["ssid"], settings["password"]) sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected(): while not sta_if.isconnected():
time.sleep(1) utime.sleep(1)
print(f"WiFi connected {sta_if.ifconfig()[0]}") wdt.feed()
controller_ip = discover_controller_udp(
device_name=settings.get("name", ""),
wdt=wdt,
)
if not controller_ip:
raise SystemExit("No controller IP discovered for Wi-Fi transport")
def pick_controller_ip(current): print(sta_if.ifconfig())
ip = discover_controller_udp(
device_name=settings.get("name", ""),
wdt=wdt,
)
if ip and ip != current:
print("Controller IP updated to", ip)
return ip if ip else current
reconnect_ms = 1000 app = Microdot()
next_connect_at = 0
client = None
poller = None
buf = b""
@app.route("/ws")
@with_websocket
async def ws_handler(request, ws):
print("WS client connected")
try:
while True: while True:
now = utime.ticks_ms() data = await ws.receive()
if not data:
if client is None and utime.ticks_diff(now, next_connect_at) >= 0: print("WS client disconnected (closed)")
c = None
try:
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect((controller_ip, CONTROLLER_TCP_PORT))
c.setblocking(False)
p = select.poll()
p.register(c, select.POLLIN)
client = c
poller = p
buf = b""
print("TCP connected")
except Exception:
if c is not None:
try:
c.close()
except Exception:
pass
controller_ip = pick_controller_ip(controller_ip)
next_connect_at = utime.ticks_add(now, reconnect_ms)
if client is not None and poller is not None:
try:
events = poller.poll(0)
except Exception:
events = []
reconnect_needed = False
for fd, event in events:
if (event & select.POLLHUP) or (event & select.POLLERR):
reconnect_needed = True
break
if event & select.POLLIN:
try:
chunk = client.recv(512)
except OSError:
reconnect_needed = True
break break
print(data)
process_data(data, settings, presets)
except WebSocketError as e:
print("WS client disconnected:", e)
except OSError as e:
print("WS client dropped (OSError):", e)
if not chunk:
reconnect_needed = True
break
buf, lines = tcp_append_and_drain_lines(buf, chunk)
for raw_line in lines:
process_data(raw_line)
if reconnect_needed:
print("TCP disconnected, reconnecting...")
try:
poller.unregister(client)
except Exception:
pass
try:
client.close()
except Exception:
pass
client = None
poller = None
buf = b""
controller_ip = pick_controller_ip(controller_ip)
next_connect_at = utime.ticks_add(now, reconnect_ms)
async def presets_loop():
while True:
presets.tick() presets.tick()
wdt.feed() wdt.feed()
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
"""Hello must run after the HTTP server binds, or discovery clients time out on /ws."""
await asyncio.sleep(1)
print("UDP hello: broadcasting…")
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
async def main(port=80):
asyncio.create_task(presets_loop())
asyncio.create_task(_udp_hello_after_http_ready())
await app.start_server(host="0.0.0.0", port=port)
if __name__ == "__main__":
asyncio.run(main(port=80))

View File

@@ -40,7 +40,7 @@ class Presets:
return loaded return loaded
for filename in files: for filename in files:
if not filename.endswith(".py") or filename == "__init__.py": if not filename.endswith(".py") or filename in ("__init__.py", "main.py"):
continue continue
module_basename = filename[:-3] module_basename = filename[:-3]
module_name = "patterns." + module_basename module_name = "patterns." + module_basename

View File

@@ -12,19 +12,25 @@ class Settings(dict):
self.color_order = self.get_color_order(self["color_order"]) self.color_order = self.get_color_order(self["color_order"])
def set_defaults(self): def set_defaults(self):
self["led_pin"] = 10 self["led_pin"] = 10
self["num_leds"] = 119 self["num_leds"] = 119
self["color_order"] = "rgb" self["color_order"] = "rgb"
self["name"] = "a"
sta = network.WLAN(network.STA_IF)
sta.active(True)
#use led-mac for name
mac = sta.config("mac")
mac = ubinascii.hexlify(mac).decode().lower()
self["name"] = "led-" + mac
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "wifi" self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
# Wi-Fi + TCP to controller: set ssid and password. Use transport_type "espnow" # ESP-NOW transport (requires espnow firmware; uses wifi_channel).
# for ESP-NOW (requires espnow firmware).
self["ssid"] = "" self["ssid"] = ""
self["password"] = "" self["password"] = ""

View File

@@ -7,7 +7,7 @@ import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -23,7 +23,7 @@ class _TestContext:
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
self.wdt.feed() self.wdt.feed()
self.presets.tick() run_tick(self.presets)
utime.sleep_ms(sleep_ms) utime.sleep_ms(sleep_ms)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, duration_ms): def run_for(p, wdt, duration_ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, duration_ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)
@@ -52,7 +52,7 @@ def main():
p.select("rainbow_manual") p.select("rainbow_manual")
print("Calling tick() 5 times (should advance 5 steps)...") print("Calling tick() 5 times (should advance 5 steps)...")
for i in range(5): for i in range(5):
p.tick() run_tick(p)
utime.sleep_ms(100) # Small delay to see changes utime.sleep_ms(100) # Small delay to see changes
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}") print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
@@ -96,7 +96,7 @@ def main():
tick_count = 0 tick_count = 0
max_ticks = 200 # Safety limit max_ticks = 200 # Safety limit
while p.generator is not None and tick_count < max_ticks: while p.generator is not None and tick_count < max_ticks:
p.tick() run_tick(p)
tick_count += 1 tick_count += 1
utime.sleep_ms(10) utime.sleep_ms(10)
@@ -133,7 +133,7 @@ def main():
tick_count = 0 tick_count = 0
max_ticks = 200 max_ticks = 200
while p.generator is not None and tick_count < max_ticks: while p.generator is not None and tick_count < max_ticks:
p.tick() run_tick(p)
tick_count += 1 tick_count += 1
utime.sleep_ms(10) utime.sleep_ms(10)
@@ -162,7 +162,7 @@ def main():
print("Calling tick() 3 times in manual mode...") print("Calling tick() 3 times in manual mode...")
for i in range(3): for i in range(3):
p.tick() run_tick(p)
utime.sleep_ms(100) utime.sleep_ms(100)
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}") print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
@@ -178,7 +178,7 @@ def main():
print("\nCleaning up...") print("\nCleaning up...")
p.edit("cleanup_off", {"p": "off"}) p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off") p.select("cleanup_off")
p.tick() run_tick(p)
utime.sleep_ms(100) utime.sleep_ms(100)
print("\n" + "=" * 50) print("\n" + "=" * 50)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def main(): def main():
@@ -25,7 +25,7 @@ def main():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 1500: while utime.ticks_diff(utime.ticks_ms(), start) < 1500:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, ms): def run_for(p, wdt, ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms: while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)
@@ -123,7 +123,7 @@ def main():
print(" Advancing pattern with 10 beats (select + tick)...") print(" Advancing pattern with 10 beats (select + tick)...")
for i in range(10): for i in range(10):
p.select("chase_manual") # Simulate beat - restarts generator p.select("chase_manual") # Simulate beat - restarts generator
p.tick() # Advance one step run_tick(p) # Advance one step
utime.sleep_ms(500) # Pause to see the pattern utime.sleep_ms(500) # Pause to see the pattern
wdt.feed() wdt.feed()
print(f" Beat {i+1}: step={p.step}") print(f" Beat {i+1}: step={p.step}")
@@ -141,7 +141,7 @@ def main():
p.step = 0 p.step = 0
initial_step = p.step initial_step = p.step
p.select("chase_manual2") p.select("chase_manual2")
p.tick() run_tick(p)
final_step = p.step final_step = p.step
print(f" Step updated from {initial_step} to {final_step} (expected: 1)") print(f" Step updated from {initial_step} to {final_step} (expected: 1)")
if final_step == 1: if final_step == 1:

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, ms): def run_for(p, wdt, ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms: while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def main(): def main():
@@ -20,7 +20,7 @@ def main():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 200: while utime.ticks_diff(utime.ticks_ms(), start) < 200:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def main(): def main():
@@ -29,7 +29,7 @@ def main():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 800: while utime.ticks_diff(utime.ticks_ms(), start) < 800:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)
# OFF phase # OFF phase
@@ -37,7 +37,7 @@ def main():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 100: while utime.ticks_diff(utime.ticks_ms(), start) < 100:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, ms): def run_for(p, wdt, ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms: while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, ms): def run_for(p, wdt, ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms: while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)
@@ -81,7 +81,7 @@ def main():
for i in range(10): for i in range(10):
p.select("rainbow5") p.select("rainbow5")
# One tick advances the generator one frame when auto=False # One tick advances the generator one frame when auto=False
p.tick() run_tick(p)
utime.sleep_ms(100) utime.sleep_ms(100)
wdt.feed() wdt.feed()
@@ -94,7 +94,7 @@ def main():
}) })
initial_step = p.step initial_step = p.step
p.select("rainbow6") p.select("rainbow6")
p.tick() run_tick(p)
final_step = p.step final_step = p.step
print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)") print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)")
@@ -130,7 +130,7 @@ def main():
p.step = 0 p.step = 0
initial_step = p.step initial_step = p.step
p.select("rainbow9") p.select("rainbow9")
p.tick() run_tick(p)
final_step = p.step final_step = p.step
expected_step = (initial_step + 5) % 256 expected_step = (initial_step + 5) % 256
print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})") print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})")

View File

@@ -2,7 +2,7 @@
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
def run_for(p, wdt, ms): def run_for(p, wdt, ms):
@@ -10,7 +10,7 @@ def run_for(p, wdt, ms):
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms: while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed() wdt.feed()
p.tick() run_tick(p)
utime.sleep_ms(10) utime.sleep_ms(10)

View File

@@ -4,7 +4,7 @@ import json
import os import os
import utime import utime
from settings import Settings from settings import Settings
from presets import Presets from presets import Presets, run_tick
from utils import convert_and_reorder_colors from utils import convert_and_reorder_colors
@@ -54,7 +54,7 @@ def run_main_loop_iterations(espnow, patterns, settings, wdt, max_iterations=10)
while iterations < max_iterations: while iterations < max_iterations:
wdt.feed() wdt.feed()
patterns.tick() run_tick(patterns)
if espnow.any(): if espnow.any():
host, msg = espnow.recv() host, msg = espnow.recv()
@@ -363,7 +363,7 @@ def test_switch_presets():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000: while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed() wdt.feed()
patterns.tick() run_tick(patterns)
utime.sleep_ms(10) utime.sleep_ms(10)
# Switch to second preset and run for 2 seconds # Switch to second preset and run for 2 seconds
@@ -381,7 +381,7 @@ def test_switch_presets():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000: while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed() wdt.feed()
patterns.tick() run_tick(patterns)
utime.sleep_ms(10) utime.sleep_ms(10)
# Switch to third preset and run for 2 seconds # Switch to third preset and run for 2 seconds
@@ -399,7 +399,7 @@ def test_switch_presets():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000: while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed() wdt.feed()
patterns.tick() run_tick(patterns)
utime.sleep_ms(10) utime.sleep_ms(10)
# Switch back to first preset and run for 2 seconds # Switch back to first preset and run for 2 seconds
@@ -417,7 +417,7 @@ def test_switch_presets():
start = utime.ticks_ms() start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < 2000: while utime.ticks_diff(utime.ticks_ms(), start) < 2000:
wdt.feed() wdt.feed()
patterns.tick() run_tick(patterns)
utime.sleep_ms(10) utime.sleep_ms(10)
print(" ✓ Preset switching works correctly") print(" ✓ Preset switching works correctly")
@@ -577,7 +577,7 @@ def test_select_with_step():
mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2) mock_espnow.send_message(b"\xbb\xbb\xbb\xbb\xbb\xbb", msg2)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step # Ensure tick() is called after select() to advance the step
patterns.tick() run_tick(patterns)
assert patterns.selected == "step_preset", "Should select step_preset" assert patterns.selected == "step_preset", "Should select step_preset"
# Step is set to 10, then tick() advances it, so it should be 11 # Step is set to 10, then tick() advances it, so it should be 11
@@ -596,7 +596,7 @@ def test_select_with_step():
initial_step = patterns.step # Should be 11 initial_step = patterns.step # Should be 11
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step # Ensure tick() is called after select() to advance the step
patterns.tick() run_tick(patterns)
# Since it's the same preset, step should not be reset, but tick() will advance it # Since it's the same preset, step should not be reset, but tick() will advance it
# So step should be initial_step + 1 (one tick call) # So step should be initial_step + 1 (one tick call)
assert patterns.step == initial_step + 1, f"Step should advance from {initial_step} to {initial_step + 1} (not reset), got {patterns.step}" assert patterns.step == initial_step + 1, f"Step should advance from {initial_step} to {initial_step + 1} (not reset), got {patterns.step}"
@@ -614,7 +614,7 @@ def test_select_with_step():
mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg4) mock_espnow.send_message(b"\xdd\xdd\xdd\xdd\xdd\xdd", msg4)
run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2) run_main_loop_iterations(mock_espnow, patterns, settings, wdt, max_iterations=2)
# Ensure tick() is called after select() to advance the step # Ensure tick() is called after select() to advance the step
patterns.tick() run_tick(patterns)
assert patterns.selected == "other_preset", "Should select other_preset" assert patterns.selected == "other_preset", "Should select other_preset"
# Step is set to 5, then tick() advances it, so it should be 6 # Step is set to 5, then tick() advances it, so it should be 6

View File

@@ -15,7 +15,7 @@ Deploy src to the device (including utils.py with mdns_hostname), then from the
mpremote connect PORT run tests/test_mdns.py mpremote connect PORT run tests/test_mdns.py
If ImportError: copy utils.py from src/ to the device, or rely on the built-in fallback below. Copy ``utils.py`` from ``src/`` onto the device if imports fail.
Or with cwd led-driver: Or with cwd led-driver:
@@ -30,26 +30,7 @@ import utime
from machine import WDT from machine import WDT
from settings import Settings from settings import Settings
try:
from utils import mdns_hostname from utils import mdns_hostname
except ImportError:
def mdns_hostname(settings):
"""Same as utils.mdns_hostname (fallback if device utils.py is older than host repo)."""
raw = settings.get("name") or "led"
suffix = []
for c in str(raw).lower():
o = ord(c)
if (48 <= o <= 57) or (97 <= o <= 122):
suffix.append(c)
s = "".join(suffix)
if not s:
s = "device"
h = "led" + s
if len(h) > 32:
h = h[:32]
return h
CONNECT_TIMEOUT_S = 45 CONNECT_TIMEOUT_S = 45
# ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work # ESP32 MicroPython WDT timeout is capped (typically 10000 ms). Longer blocking work
@@ -213,16 +194,6 @@ def main():
"Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)." "Set SELF_LOCAL_GETADDRINFO = True to attempt (may hang)."
) )
# Optional: built-in mdns module (not present on all ESP32 builds)
_dbg(t0, "checking for optional 'mdns' module")
try:
import mdns # noqa: F401
print("Note: 'mdns' module is present; check your port's docs for Server/API.")
except ImportError:
print("No top-level 'mdns' module; relying on stack mDNS from hostname.")
_dbg(t0, "mdns import check done")
if HOLD_S != 0: if HOLD_S != 0:
forever = HOLD_S < 0 forever = HOLD_S < 0
_dbg( _dbg(

102
tests/test_wifi.py Normal file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""Wi-Fi connection smoke test for MicroPython on ESP32.
Runs on-device via mpremote and uses /settings.json credentials.
Usage:
mpremote connect /dev/ttyACM0 run tests/test_wifi.py
"""
import time
import utime
import network
from machine import WDT
from settings import Settings
CONNECT_TIMEOUT_S = 30
RETRY_DELAY_S = 2
WDT_TIMEOUT_MS = 10000
def _wifi_status_label(code):
names = {
getattr(network, "STAT_IDLE", 0): "idle",
getattr(network, "STAT_CONNECTING", 1): "connecting",
getattr(network, "STAT_WRONG_PASSWORD", -3): "wrong_password",
getattr(network, "STAT_NO_AP_FOUND", -2): "no_ap_found",
getattr(network, "STAT_CONNECT_FAIL", -1): "connect_fail",
getattr(network, "STAT_GOT_IP", 3): "got_ip",
}
return names.get(code, str(code))
def connect_wifi_with_wdt(sta, ssid, password, wdt):
attempt = 0
while not sta.isconnected():
attempt += 1
print("[wifi-test] attempt", attempt, "ssid=", repr(ssid))
try:
sta.disconnect()
except Exception:
pass
sta.connect(ssid, password)
start = utime.time()
last_status = None
while not sta.isconnected():
status = sta.status()
if status != last_status:
print("[wifi-test] status:", status, _wifi_status_label(status))
last_status = status
if status in (
getattr(network, "STAT_WRONG_PASSWORD", -3),
getattr(network, "STAT_NO_AP_FOUND", -2),
getattr(network, "STAT_CONNECT_FAIL", -1),
):
break
if utime.time() - start >= CONNECT_TIMEOUT_S:
print("[wifi-test] timeout after", CONNECT_TIMEOUT_S, "seconds")
break
time.sleep(1)
wdt.feed()
if sta.isconnected():
return True
print("[wifi-test] retry in", RETRY_DELAY_S, "seconds")
for _ in range(RETRY_DELAY_S):
time.sleep(1)
wdt.feed()
return True
def main():
settings = Settings()
ssid = settings.get("ssid") or ""
password = settings.get("password") or ""
if not ssid:
print("[wifi-test] skipped: settings.ssid is empty")
raise SystemExit(0)
wdt = WDT(timeout=WDT_TIMEOUT_MS)
wdt.feed()
sta = network.WLAN(network.STA_IF)
sta.active(True)
try:
sta.config(pm=network.WLAN.PM_NONE)
except (AttributeError, ValueError, TypeError):
pass
ok = connect_wifi_with_wdt(sta, ssid, password, wdt)
if not ok or not sta.isconnected():
print("[wifi-test] FAILED: not connected")
raise SystemExit(1)
print("[wifi-test] OK:", sta.ifconfig())
if __name__ == "__main__":
main()