6 Commits

Author SHA1 Message Date
55a97ac51c feat(patterns): merge pattern styles and add mode support
Consolidate legacy pattern ids into meteor, particles, sparkle, chase,
and colour_cycle with n6/mode style selection; add pattern_modes helper,
self-contained tests/all.py, and preset mode alias on wire.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 21:14:54 +12:00
794f1a2841 feat(patterns): add northern wave, candle glow, starfall, ice sparkle
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 15:11:32 +12:00
8f8bc894a9 feat(patterns): add icicles blizzard and rime winter effects
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 15:09:59 +12:00
2a768376d0 chore(release): beta-1.03
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-10 16:13:59 +12:00
170a0e05ab feat(patterns): align manual and auto behaviour
Unify manual/auto timing semantics for key patterns, add preset background support, and improve runtime observability while keeping the driver responsive under beat-triggered selects.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-09 20:07:58 +12:00
4879fcfe90 fix(patterns): use preset background fallback across animations
Align pattern background rendering to use preset.background_or(...) and update pulse/radiate single-step behaviour to preserve visible frames and step progression.
2026-05-09 14:28:05 +12:00
52 changed files with 1732 additions and 1242 deletions

26
dev.py
View File

@@ -67,27 +67,9 @@ for cmd in sys.argv[1:]:
print("Error: Port required for 'db' command") print("Error: Port required for 'db' command")
case "test": case "test":
if port: if port:
if "all" in sys.argv[1:]: # Single self-contained suite (tests/all.py); requires ``src`` on device first.
test_files = sorted( subprocess.call(
str(path) [*mpremote_base(), "connect", port, "run", "tests/all.py"]
for path in Path("test").rglob("*.py") )
if path.is_file()
)
failed = []
for test_file in test_files:
print(f"Running {test_file}")
code = subprocess.call(
[*mpremote_base(), "connect", port, "run", test_file]
)
if code != 0:
failed.append((test_file, code))
if failed:
print("Some tests failed:")
for test_file, code in failed:
print(f" {test_file} (exit {code})")
else:
subprocess.call(
[*mpremote_base(), "connect", port, "run", "test/all.py"]
)
else: else:
print("Error: Port required for 'test' command") print("Error: Port required for 'test' command")

118
docs/patterns.md Normal file
View File

@@ -0,0 +1,118 @@
# Patterns and presets on the LED driver
This document describes **how patterns are wired**, how **presets** map to patterns, and what each **shipped pattern** expects. For the JSON wire format (`v`: `"1"`, `presets`, `select`, short keys `p` / `c` / `b`, etc.), see [API.md](API.md).
## End-to-end control
1. The controller sends a **v1 JSON** object (ESP-NOW, serial bridge, or one line per message over TCP WebSocket in Wi-Fi mode).
2. `controller_messages.process_data()` parses it and applies fields in a fixed order (see `src/controller_messages.py`):
- `device_config` — name, LED count, colour order, startup mode; may reload `presets.json` and re-select the previous preset.
- `b`**global** output brightness (0255), stored in settings and in `presets.b`.
- `presets` — merge definitions into the in-memory preset table (`Presets.edit()` per id).
- `clear_presets` — optional wipe of all presets.
- `select` — pick the active preset (and optional step) for **this** device (matched by `settings["name"]`).
- `default` — update saved default preset when `targets` includes this device.
- `manifest` — pattern OTA: fetch pattern `.py` files and `reload_patterns()`.
- `save` — persist presets and/or settings when combined with the relevant fields.
3. The main loop calls `presets.tick()` so the active pattern **generator** advances one frame per iteration.
## Presets
- **Class:** `src/preset.py``Preset` holds the pattern configuration.
- **Short keys** (what the driver uses internally after `apply_presets` normalisation):
| Key | Meaning | Default |
|-----|---------|--------|
| `p` | Pattern id (string), must match a registered pattern | `"off"` |
| `c` | Colours as RGB tuples (after colour-order conversion) | `[(255,255,255)]` |
| `d` | Delay (ms); meaning is pattern-specific | `100` |
| `b` | Preset brightness 0255 (combined with global `presets.b`) | `127` |
| `a` | Auto: continuous animation; `false` = manual / beat-stepped where supported | `True` |
| `bg` | Background colour (hex string or RGB tuple on device) | `(0,0,0)` |
| `n1``n6` | Pattern-specific integers | `0` |
Long aliases from the controller (`pattern`, `colors`, `delay`, `brightness`, `auto`, `background`) are converted in `Preset.edit()`.
- **Persistence:** `presets.json` on flash; **`MAX_PRESETS` = 32** (exceptions for auto-created `"on"` / `"off"`).
- **Activation:** `Presets.select(preset_name, step=None)` loads the preset, looks up **`preset.p`** in the pattern registry, and sets `generator = patterns[preset.p](preset)`, then runs one `tick()` so the first frame appears.
## Brightness
- **Global:** `presets.b` from message `{"v":"1","b":…}` scales every output channel.
- **Per preset:** `preset.b`; combined in `Presets.apply_brightness(colour, preset.b)` as
`effective = round(preset_channel * presets.b / 255)` with preset level applied first conceptually (`apply_brightness` takes the presets `b` as the override for that colour).
## Pattern registry
Built in `Presets.reload_patterns()` (`src/presets.py`):
1. **Built-ins:** `"off"` and `"on"` — methods on the `Presets` instance (not separate files).
2. **Dynamic modules:** Every `patterns/*.py` on flash (except `__init__.py`), imported as `patterns.<basename>`. The loader takes the **first class** in the module that defines **`run`**, instantiates it with `Presets(self)` (the driver / NeoPixel wrapper), and registers:
```text
patterns[basename] = PatternClass(driver).run
```
So the **`p` field must equal the file basename without `.py`** (e.g. file `radiate.py` ⇒ pattern `"radiate"`).
### Adding or updating patterns on device
- **OTA:** v1 message with `"manifest"` (URL or inline JSON listing `files` with `name`, `url` or `code`) — see `apply_patterns_ota()` in `controller_messages.py`.
- **HTTP:** `POST /patterns/upload` on the device (`src/main.py`) with a safe `.py` filename; optional reload of the registry.
After new files land in `patterns/`, call `presets.reload_patterns()` (done automatically by OTA and upload when configured).
## Auto vs manual (`a`)
- **`a: true` (auto):** The main loop keeps calling `tick()`; the generator runs continuously (subject to internal `yield` timing / `utime`).
- **`a: false` (manual):** Intended for patterns that advance **once per explicit `select`** (or per beat routing from the controller). The driver does **not** call `select()` again when editing a manual preset-only push — manual steps are driven by incoming `select` messages.
Special case in `Presets.select()`: for **manual chase**, if the same preset is re-selected mid-generator, pending frames may be flushed so step indices stay aligned with beats.
## Built-in patterns
### `off`
- **Registration:** built-in method `Presets.off`.
- **Behaviour:** fills the strip with black (after generator setup, `tick` completes immediately).
- **Parameters:** ignores preset colours for the strip; optional `preset` argument unused for pixels.
### `on`
- **Registration:** built-in method `Presets.on`.
- **Behaviour:** solid fill with `preset.c[0]` (or white if no colours), via `apply_brightness(..., preset.b)`.
- **Parameters:** `c`, `b`; `d` / `n*` not used.
## Dynamic pattern: `radiate`
- **File:** `src/patterns/radiate.py`
- **Class:** `Radiate` — `run(self, preset)` is a **generator** (must `yield` each frame).
- **Pattern key:** `p` = `"radiate"`
Concept: repeating **nodes** along the strip every **`n1`** LEDs; from each node a lit region expands outward then contracts (timed by **`n2`** / **`n3`**). In **auto**, a new pulse train starts every **`d`** ms and the active colour index advances. In **manual**, a **single** out-and-back cycle runs, then the generator ends (next colour on the next `select`).
| Field | Role |
|-------|------|
| `n1` | Node spacing in LEDs (`>= 1`; half-spacing used for symmetry) |
| `n2` | Outbound travel time (ms), `>= 1` |
| `n3` | Return travel time (ms), `>= 1` |
| `d` | Auto only: interval (ms) between re-triggers; `>= 1` |
| `c` | Colour list; cycles per retrigger / per manual cycle |
| `bg` | Off state / gap colour (via `preset.background_or`) |
| `b` | Preset brightness |
| `a` | `true` = repeating pulses on a timer; `false` = one shot per select |
Debug: if `presets.debug` is true (from settings), periodic logs print timing and lit LED counts.
## Other pattern names (`blink`, `rainbow`, `pulse`, …)
Those pattern **ids** are valid on the **wire** and in **led-controller** `db/pattern.json`, but they are **not** all present in this repositorys `src/patterns/` tree. On a real device they normally appear as **additional** `patterns/*.py` files delivered by OTA or upload. For the intended **`n1``n6`** semantics on the wire, use [API.md](API.md) **Pattern-Specific Parameters**; the implementation must match that contract in each modules `run(preset)` generator.
## Quick reference: files
| File | Role |
|------|------|
| `src/preset.py` | Preset field model and aliases |
| `src/presets.py` | Registry, `select`, `tick`, `off` / `on`, dynamic load |
| `src/controller_messages.py` | Parse v1 JSON, apply presets/select/brightness/OTA |
| `src/patterns/*.py` | One pattern module per dynamic id (basename = `p`) |

View File

@@ -1,8 +1,11 @@
import asyncio import asyncio
import gc
import utime import utime
from hello import broadcast_hello_udp from hello import broadcast_hello_udp
from mem_stats import print_mem
from wifi_sta import try_reconnect
_UDP_HELLO_ATTEMPT = 0
async def presets_loop(presets, wdt): async def presets_loop(presets, wdt):
@@ -13,20 +16,33 @@ async def presets_loop(presets, wdt):
if bool(getattr(presets, "debug", False)): if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000: if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect() print_mem("runtime")
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run. # tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0) await asyncio.sleep(0)
async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state): async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
"""Broadcast hello at startup-fast cadence, then slower cadence.""" """UDP hello on cadence; if STA drops, one reconnect campaign per iteration."""
global _UDP_HELLO_ATTEMPT
await asyncio.sleep(1) await asyncio.sleep(1)
started_ms = utime.ticks_ms() started_ms = utime.ticks_ms()
while True: while True:
if runtime_state.hello: try:
print("UDP hello: broadcasting...") wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if not wifi_ok:
ssid = settings.get("ssid") or ""
if ssid:
try_reconnect(sta_if, ssid, settings.get("password") or "", wdt)
try:
wifi_ok = sta_if.isconnected()
except Exception:
wifi_ok = False
if wifi_ok and runtime_state.hello:
_UDP_HELLO_ATTEMPT += 1
print("UDP hello broadcast attempt", _UDP_HELLO_ATTEMPT)
try: try:
broadcast_hello_udp( broadcast_hello_udp(
sta_if, sta_if,
@@ -38,5 +54,5 @@ async def udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state):
except Exception as ex: except Exception as ex:
print("UDP hello broadcast failed:", ex) print("UDP hello broadcast failed:", ex)
elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms) elapsed_ms = utime.ticks_diff(utime.ticks_ms(), started_ms)
interval_s = 5 if elapsed_ms < 60000 else 60 interval_s = 10 if elapsed_ms < 120000 else 30
await asyncio.sleep(interval_s) await asyncio.sleep(interval_s)

View File

@@ -12,8 +12,37 @@ except ImportError:
import os import os
def _log_rx(payload) -> None:
"""Serial log when led-controller sends a message into ``process_data``."""
try:
if isinstance(payload, (bytes, bytearray)):
n = len(payload)
if n == 0:
print("rx 0 B")
return
cap = 160
chunk = payload if n <= cap else payload[:cap]
try:
txt = bytes(chunk).decode("utf-8")
except Exception:
txt = str(chunk)
if n > cap:
txt = txt + "..."
print("rx", n, "B", txt)
else:
s = str(payload)
cap = 200
if len(s) <= cap:
print("rx", len(s), "C", s)
else:
print("rx", len(s), "C", s[:cap] + "...")
except Exception:
print("rx (logging failed)")
def process_data(payload, settings, presets, controller_ip=None): def process_data(payload, settings, presets, controller_ip=None):
"""Read one controller message; binary v1 envelope or JSON v1, then apply fields.""" """Read one controller message; binary v1 envelope or JSON v1, then apply fields."""
_log_rx(payload)
data = None data = None
if isinstance(payload, (bytes, bytearray)): if isinstance(payload, (bytes, bytearray)):
data = parse_binary_envelope(payload) data = parse_binary_envelope(payload)
@@ -27,9 +56,10 @@ def process_data(payload, settings, presets, controller_ip=None):
data = json.loads(payload) data = json.loads(payload)
except (ValueError, TypeError): except (ValueError, TypeError):
return return
print(payload)
if data.get("v", "") != "1": if data.get("v", "") != "1":
return return
if "device_config" in data:
apply_device_config(data, settings, presets)
if "b" in data: if "b" in data:
apply_brightness(data, settings, presets) apply_brightness(data, settings, presets)
if "presets" in data: if "presets" in data:
@@ -48,6 +78,88 @@ def process_data(payload, settings, presets, controller_ip=None):
presets.save() presets.save()
if "save" in data and "b" in data: if "save" in data and "b" in data:
settings.save() settings.save()
if "save" in data and "device_config" in data:
settings.save()
_VALID_DEVICE_COLOR_ORDERS = frozenset({"rgb", "rbg", "grb", "gbr", "brg", "bgr"})
_STARTUP_MODES = frozenset({"default", "last", "off"})
_MAX_DEVICE_LEDS = 2048
def apply_startup_pattern(settings, presets):
"""Apply power-on behaviour from ``startup_mode`` (default / last / off)."""
mode = str(settings.get("startup_mode", "default")).lower().strip()
if mode not in _STARTUP_MODES:
mode = "default"
if mode == "off":
if presets.select("off"):
return
presets.fill((0, 0, 0))
return
if mode == "last":
lp = settings.get("last_preset") or ""
if isinstance(lp, str) and lp.strip() and lp.strip() in presets.presets:
if presets.select(lp.strip()):
return
dp = settings.get("default", "")
if dp and dp in presets.presets:
if not presets.select(dp):
print("Startup preset failed (invalid pattern?):", dp)
def apply_device_config(data, settings, presets):
"""Apply fields from v1 ``device_config``; reload presets when strip length or colour order changes."""
dc = data.get("device_config")
if not isinstance(dc, dict):
return
strip_changed = False
meta_changed = False
if "name" in dc:
n = dc["name"]
if isinstance(n, str) and n.strip():
settings["name"] = n.strip()
meta_changed = True
if "num_leds" in dc:
try:
n = int(dc["num_leds"])
if 1 <= n <= _MAX_DEVICE_LEDS:
settings["num_leds"] = n
presets.update_num_leds(settings["led_pin"], n)
strip_changed = True
except (TypeError, ValueError):
pass
if "color_order" in dc:
co = str(dc["color_order"]).lower().strip()
if co in _VALID_DEVICE_COLOR_ORDERS:
settings["color_order"] = co
settings.color_order = settings.get_color_order(co)
strip_changed = True
if "startup_mode" in dc:
sm = str(dc["startup_mode"]).lower().strip()
if sm in _STARTUP_MODES:
settings["startup_mode"] = sm
meta_changed = True
if not strip_changed and not meta_changed:
return
if strip_changed:
prev = presets.selected
try:
presets.load(settings)
except Exception as e:
print("device_config: presets.load failed:", e)
if prev and prev in presets.presets:
presets.select(prev)
elif settings.get("default") and settings["default"] in presets.presets:
presets.select(settings["default"])
def record_last_preset(settings, preset_name):
"""Persist the last selected preset id (single entry in flash)."""
if not isinstance(preset_name, str) or not preset_name:
return
settings["last_preset"] = preset_name.strip()
settings.save()
def apply_brightness(data, settings, presets): def apply_brightness(data, settings, presets):
@@ -71,8 +183,14 @@ def apply_presets(data, settings, presets):
) )
except (TypeError, ValueError, KeyError): except (TypeError, ValueError, KeyError):
continue continue
if "bg" in preset_data:
try:
bg_color = convert_and_reorder_colors([preset_data["bg"]], settings)
if bg_color:
preset_data["bg"] = bg_color[0]
except (TypeError, ValueError, KeyError):
pass
presets.edit(id, preset_data) presets.edit(id, preset_data)
print(f"Edited preset {id}: {preset_data.get('name', '')}")
def apply_select(data, settings, presets): def apply_select(data, settings, presets):
@@ -83,7 +201,8 @@ def apply_select(data, settings, presets):
return return
preset_name = select_list[0] preset_name = select_list[0]
step = select_list[1] if len(select_list) > 1 else None step = select_list[1] if len(select_list) > 1 else None
presets.select(preset_name, step=step) if presets.select(preset_name, step=step):
record_last_preset(settings, preset_name)
def apply_clear_presets(data, presets): def apply_clear_presets(data, presets):
@@ -99,7 +218,6 @@ def apply_clear_presets(data, presets):
if not should_clear: if not should_clear:
return return
presets.delete_all() presets.delete_all()
print("Cleared all presets.")
def apply_default(data, settings, presets): def apply_default(data, settings, presets):
@@ -244,8 +362,5 @@ def apply_patterns_ota(data, presets, controller_ip=None):
updated += 1 updated += 1
if updated > 0: if updated > 0:
presets.reload_patterns() presets.reload_patterns()
print("patterns_ota: updated", updated, "pattern file(s)")
else:
print("patterns_ota: no valid files downloaded")
except Exception as e: except Exception as e:
print("patterns_ota failed:", e) print("patterns_ota failed:", e)

View File

@@ -92,7 +92,6 @@ def broadcast_hello_udp(
""" """
ip, mask, _gw, _dns = sta.ifconfig() ip, mask, _gw, _dns = sta.ifconfig()
msg = pack_hello_line(sta, device_name) msg = pack_hello_line(sta, device_name)
print("hello:", msg)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try: try:
@@ -121,11 +120,9 @@ def broadcast_hello_udp(
for dest_ip, dest_port in targets: for dest_ip, dest_port in targets:
if wdt is not None: if wdt is not None:
wdt.feed() wdt.feed()
label = "%s:%s" % (dest_ip, dest_port)
target = (dest_ip, dest_port) target = (dest_ip, dest_port)
try: try:
sock.sendto(msg, target) sock.sendto(msg, target)
print("sent hello ->", target)
except OSError as e: except OSError as e:
print("sendto failed:", e) print("sendto failed:", e)
continue continue
@@ -134,20 +131,12 @@ def broadcast_hello_udp(
if wdt is not None: if wdt is not None:
wdt.feed() wdt.feed()
try: try:
data, addr = sock.recvfrom(2048) _data, addr = sock.recvfrom(2048)
print("reply from", addr, ":", data)
remote_ip = addr[0] remote_ip = addr[0]
if data != msg:
print("(warning: reply payload differs from hello; still using source IP.)")
discovered = remote_ip discovered = remote_ip
print("Discovered controller at", remote_ip)
break break
except OSError as e: except OSError:
print("recv (no reply):", e, "via", label) pass
if dest_ip == "255.255.255.255":
print(
"(hint: many APs drop Wi-Fi client broadcast; try wired server or AP without client isolation.)"
)
sock.close() sock.close()
return discovered return discovered
@@ -171,18 +160,12 @@ def discover_controller_udp(device_name="", wdt=None):
print("hello: STA has no IP address.") print("hello: STA has no IP address.")
raise SystemExit(1) raise SystemExit(1)
print("STA IP:", ip, "mask:", mask)
discovered = broadcast_hello_udp( discovered = broadcast_hello_udp(
sta, sta,
device_name, device_name,
wait_reply=True, wait_reply=True,
wdt=wdt, wdt=wdt,
) )
if discovered:
print("discover done; controller =", repr(discovered))
else:
print("discover done; controller not found")
return discovered return discovered

View File

@@ -23,7 +23,6 @@ def register_routes(app, settings, presets, runtime_state):
@app.route("/ws") @app.route("/ws")
@with_websocket @with_websocket
async def ws_handler(request, ws): async def ws_handler(request, ws):
print("WS client connected")
runtime_state.ws_connected() runtime_state.ws_connected()
controller_ip = None controller_ip = None
try: try:
@@ -34,15 +33,11 @@ def register_routes(app, settings, presets, runtime_state):
controller_ip = client_addr controller_ip = client_addr
except Exception: except Exception:
controller_ip = None controller_ip = None
print("WS controller_ip:", controller_ip)
try: try:
while True: while True:
data = await ws.receive() data = await ws.receive()
if not data: if not data:
print("WS client disconnected (closed)")
break break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data)
process_data(data, settings, presets, controller_ip=controller_ip) process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e: except WebSocketError as e:
print("WS client disconnected:", e) print("WS client disconnected:", e)
@@ -50,12 +45,6 @@ def register_routes(app, settings, presets, runtime_state):
print("WS client dropped (OSError):", e) print("WS client dropped (OSError):", e)
finally: finally:
runtime_state.ws_disconnected() runtime_state.ws_disconnected()
print(
"WS client disconnected: hello=",
runtime_state.hello,
"ws_client_count=",
runtime_state.ws_client_count,
)
@app.post("/patterns/upload") @app.post("/patterns/upload")
async def upload_pattern(request): async def upload_pattern(request):
@@ -63,19 +52,15 @@ def register_routes(app, settings, presets, runtime_state):
raw_name = request.args.get("name") raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1") reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off") reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip(): if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, {"Content-Type": "application/json"} return json.dumps({"error": "name is required"}), 400, {"Content-Type": "application/json"}
body = request.body body = request.body
if not isinstance(body, (bytes, bytearray)) or not body: if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"} return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
print("patterns/upload body_bytes:", len(body))
try: try:
code = body.decode("utf-8") code = body.decode("utf-8")
except UnicodeError: except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, {"Content-Type": "application/json"} return json.dumps({"error": "body must be utf-8 text"}), 400, {"Content-Type": "application/json"}
if not code.strip(): if not code.strip():
return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"} return json.dumps({"error": "code is required"}), 400, {"Content-Type": "application/json"}
@@ -93,16 +78,13 @@ def register_routes(app, settings, presets, runtime_state):
path = "patterns/" + name path = "patterns/" + name
try: try:
print("patterns/upload writing:", path)
with open(path, "w") as f: with open(path, "w") as f:
f.write(code) f.write(code)
if reload_patterns: if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns() presets.reload_patterns()
except OSError as e: except OSError as e:
print("patterns/upload failed:", e) print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"} return json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps( return json.dumps(
{ {

View File

@@ -1,6 +1,6 @@
import print_timestamp # noqa: F401 — prefixes every print with [ticks_ms]
from settings import Settings from settings import Settings
import machine import machine
import network
import utime import utime
import asyncio import asyncio
import json import json
@@ -8,57 +8,50 @@ import gc
from microdot import Microdot from microdot import Microdot
from microdot.websocket import WebSocketError, with_websocket from microdot.websocket import WebSocketError, with_websocket
from presets import Presets from presets import Presets
from controller_messages import process_data from controller_messages import apply_startup_pattern, process_data
from hello import broadcast_hello_udp from runtime_state import RuntimeState
from background_tasks import presets_loop, udp_hello_loop_after_http_ready
from mem_stats import print_mem
from wifi_sta import boot_sta
try: try:
import uos as os import uos as os
except ImportError: except ImportError:
import os import os
wdt = machine.WDT(timeout=10000)
wdt.feed()
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings)
wdt = machine.WDT(timeout=10000)
wdt.feed()
gc.collect() gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()}) sta_if = boot_sta(settings, wdt)
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False)) presets.debug = bool(settings.get("debug", False))
gc.collect() gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "") apply_startup_pattern(settings, presets)
if default_preset and default_preset in presets.presets:
if presets.select(default_preset):
print(f"Selected startup preset: {default_preset}")
else:
print("Startup preset failed (invalid pattern?):", default_preset)
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up.
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
sta_if = network.WLAN(network.STA_IF)
if sta_if.active():
sta_if.active(False)
utime.sleep_ms(100)
gc.collect()
sta_if.active(True)
sta_if.config(pm=network.WLAN.PM_NONE)
sta_if.connect(settings["ssid"], settings["password"])
while not sta_if.isconnected():
print("Connecting")
utime.sleep(1)
wdt.feed()
print(sta_if.ifconfig()) def _print_network_ips(controller_ip=None):
"""Always log STA address and led-controller (WS client) address when known."""
try:
led_ip = sta_if.ifconfig()[0]
except Exception:
led_ip = "?"
ctrl = controller_ip if controller_ip else "(not connected)"
print("led-driver IP:", led_ip, " led-controller IP:", ctrl)
_print_network_ips()
print_mem("startup")
runtime_state = RuntimeState()
app = Microdot() app = Microdot()
@@ -76,7 +69,7 @@ def _safe_pattern_filename(name):
@app.route("/ws") @app.route("/ws")
@with_websocket @with_websocket
async def ws_handler(request, ws): async def ws_handler(request, ws):
print("WS client connected") runtime_state.ws_connected()
controller_ip = None controller_ip = None
try: try:
client_addr = getattr(request, "client_addr", None) client_addr = getattr(request, "client_addr", None)
@@ -86,20 +79,20 @@ async def ws_handler(request, ws):
controller_ip = client_addr controller_ip = client_addr
except Exception: except Exception:
controller_ip = None controller_ip = None
print("WS controller_ip:", controller_ip) _print_network_ips(controller_ip)
print_mem("ws connect")
try: try:
while True: while True:
data = await ws.receive() data = await ws.receive()
if not data: if not data:
print("WS client disconnected (closed)")
break break
print("WS recv bytes:", len(data) if isinstance(data, (bytes, bytearray)) else len(str(data)))
print(data)
process_data(data, settings, presets, controller_ip=controller_ip) process_data(data, settings, presets, controller_ip=controller_ip)
except WebSocketError as e: except WebSocketError as e:
print("WS client disconnected:", e) print("WS client disconnected:", e)
except OSError as e: except OSError as e:
print("WS client dropped (OSError):", e) print("WS client dropped (OSError):", e)
finally:
runtime_state.ws_disconnected()
@app.post("/patterns/upload") @app.post("/patterns/upload")
@@ -108,7 +101,6 @@ async def upload_pattern(request):
raw_name = request.args.get("name") raw_name = request.args.get("name")
reload_raw = request.args.get("reload", "1") reload_raw = request.args.get("reload", "1")
reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off") reload_patterns = str(reload_raw).strip().lower() not in ("0", "false", "no", "off")
print("patterns/upload request:", {"name": raw_name, "reload": reload_patterns})
if not isinstance(raw_name, str) or not raw_name.strip(): if not isinstance(raw_name, str) or not raw_name.strip():
return json.dumps({"error": "name is required"}), 400, { return json.dumps({"error": "name is required"}), 400, {
@@ -116,15 +108,12 @@ async def upload_pattern(request):
} }
body = request.body body = request.body
if not isinstance(body, (bytes, bytearray)) or not body: if not isinstance(body, (bytes, bytearray)) or not body:
print("patterns/upload rejected: empty body")
return json.dumps({"error": "code is required"}), 400, { return json.dumps({"error": "code is required"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
print("patterns/upload body_bytes:", len(body))
try: try:
code = body.decode("utf-8") code = body.decode("utf-8")
except UnicodeError: except UnicodeError:
print("patterns/upload rejected: body not utf-8")
return json.dumps({"error": "body must be utf-8 text"}), 400, { return json.dumps({"error": "body must be utf-8 text"}), 400, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
@@ -148,18 +137,15 @@ async def upload_pattern(request):
path = "patterns/" + name path = "patterns/" + name
try: try:
print("patterns/upload writing:", path)
with open(path, "w") as f: with open(path, "w") as f:
f.write(code) f.write(code)
if reload_patterns: if reload_patterns:
print("patterns/upload reloading patterns")
presets.reload_patterns() presets.reload_patterns()
except OSError as e: except OSError as e:
print("patterns/upload failed:", e) print("patterns/upload failed:", e)
return json.dumps({"error": str(e)}), 500, { return json.dumps({"error": str(e)}), 500, {
"Content-Type": "application/json" "Content-Type": "application/json"
} }
print("patterns/upload success:", {"name": name, "reloaded": reload_patterns})
return json.dumps({ return json.dumps({
"message": "pattern uploaded", "message": "pattern uploaded",
@@ -168,40 +154,11 @@ async def upload_pattern(request):
}), 201, {"Content-Type": "application/json"} }), 201, {"Content-Type": "application/json"}
async def presets_loop():
last_mem_log = utime.ticks_ms()
while True:
presets.tick()
wdt.feed()
if bool(getattr(presets, "debug", False)):
now = utime.ticks_ms()
if utime.ticks_diff(now, last_mem_log) >= 5000:
gc.collect()
print("mem runtime:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
last_mem_log = now
# tick() does not await; yield so UDP hello and HTTP/WebSocket can run.
await asyncio.sleep(0)
async def _udp_hello_after_http_ready():
"""Hello must run after the HTTP server binds, or discovery clients time out on /ws."""
await asyncio.sleep(1)
print("UDP hello: broadcasting…")
try:
broadcast_hello_udp(
sta_if,
settings.get("name", ""),
wait_reply=False,
wdt=wdt,
dual_destinations=True,
)
except Exception as ex:
print("UDP hello broadcast failed:", ex)
async def main(port=80): async def main(port=80):
asyncio.create_task(presets_loop()) asyncio.create_task(presets_loop(presets, wdt))
asyncio.create_task(_udp_hello_after_http_ready()) asyncio.create_task(
udp_hello_loop_after_http_ready(sta_if, settings, wdt, runtime_state)
)
await app.start_server(host="0.0.0.0", port=port) await app.start_server(host="0.0.0.0", port=port)

34
src/mem_stats.py Normal file
View File

@@ -0,0 +1,34 @@
"""GC / heap snapshot helpers for debug logging."""
import gc
def snapshot():
"""Return a dict of memory stats after ``gc.collect()``."""
gc.collect()
out = {
"free": gc.mem_free(),
"alloc": gc.mem_alloc(),
}
try:
import esp32
blocks = esp32.idf_heap_info(esp32.HEAP_DATA)
if blocks:
block = blocks[0]
if isinstance(block, dict):
if "total_free_bytes" in block:
out["idf_free"] = block["total_free_bytes"]
largest = block.get("largest_free_block")
if largest is None:
largest = block.get("largest_free_block_in_bytes")
if largest is not None:
out["idf_largest"] = largest
except Exception:
pass
return out
def print_mem(label):
"""Print one timestamped memory line (via ``print_timestamp`` when installed)."""
print("mem %s:" % label, snapshot())

View File

@@ -1,12 +1,16 @@
import math
import utime import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"northern_wave": 1}
class Aurora: class Aurora:
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
def run(self, preset): def _run_bands(self, preset, colors):
colors = preset.c if preset.c else [(40, 200, 140), (80, 120, 255), (160, 80, 220)]
bands = max(1, int(preset.n1) if int(preset.n1) > 0 else 3) bands = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
shimmer = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 40)) shimmer = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 40))
phase = self.driver.step % 256 phase = self.driver.step % 256
@@ -16,11 +20,17 @@ class Aurora:
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d: if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
idx = ((i * bands) // max(1, self.driver.num_leds) + (phase // 32)) % len(colors) idx = (
(i * bands) // max(1, self.driver.num_leds) + (phase // 32)
) % len(colors)
c = self.driver.apply_brightness(colors[idx], preset.b) c = self.driver.apply_brightness(colors[idx], preset.b)
w = (255 - abs(128 - ((i * 8 + phase) & 255)) * 2) w = 255 - abs(128 - ((i * 8 + phase) & 255)) * 2
w = max(0, min(255, w + shimmer)) w = max(0, min(255, w + shimmer))
self.driver.n[i] = ((c[0]*w)//255, (c[1]*w)//255, (c[2]*w)//255) self.driver.n[i] = (
(c[0] * w) // 255,
(c[1] * w) // 255,
(c[2] * w) // 255,
)
self.driver.n.write() self.driver.n.write()
phase = (phase + 1) & 255 phase = (phase + 1) & 255
self.driver.step = phase self.driver.step = phase
@@ -29,3 +39,57 @@ class Aurora:
yield yield
return return
yield yield
def _run_northern(self, preset, colors):
period = max(4, int(preset.n1) if int(preset.n1) > 0 else 20)
contrast = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 200))
drift = max(1, int(preset.n3) if int(preset.n3) > 0 else 2)
phase = 0
last = utime.ticks_ms()
ncols = len(colors)
if ncols < 2:
colors = list(colors) + [(120, 180, 255)]
ncols = len(colors)
twopi = 6.2831853
def lerp3(a, b, f):
return (
a[0] + ((b[0] - a[0]) * f) // 255,
a[1] + ((b[1] - a[1]) * f) // 255,
a[2] + ((b[2] - a[2]) * f) // 255,
)
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
t = (i * twopi / period) + (phase * twopi / 256.0)
w = (math.sin(t) + 1.0) * 0.5
u = w * (ncols - 1) * 256.0
fi = int(u) >> 8
frac = int(u) & 255
if fi >= ncols - 1:
fi = ncols - 2
frac = 255
peak = lerp3(colors[fi], colors[fi + 1], frac)
peak = self.driver.apply_brightness(peak, preset.b)
mixf = min(255, int(w * contrast * 2) >> 1)
self.driver.n[i] = lerp3(bg, peak, mixf)
self.driver.n.write()
phase = (phase + drift) % 256
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield
def run(self, preset):
"""Aurora bands (n6=0) or sine northern wave (n6=1, legacy northern_wave)."""
colors = preset.c if preset.c else [(40, 200, 140), (80, 120, 255), (160, 80, 220)]
if style_mode(preset, 0, _LEGACY) == 1:
colors = preset.c if preset.c else [(20, 55, 120), (60, 140, 220), (180, 220, 255)]
yield from self._run_northern(preset, colors)
return
yield from self._run_bands(preset, colors)

View File

@@ -16,7 +16,7 @@ class BarGraph:
target = (self.driver.num_leds * level) // 100 target = (self.driver.num_leds * level) // 100
lit = self.driver.apply_brightness(colors[0], preset.b) lit = self.driver.apply_brightness(colors[0], preset.b)
unlit = self.driver.apply_brightness( unlit = self.driver.apply_brightness(
colors[-1], preset.background_or(colors),
preset.b, preset.b,
) )
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):

View File

@@ -9,6 +9,7 @@ class Blink:
"""Blink pattern: toggles LEDs on/off using preset delay, cycling through colors.""" """Blink pattern: toggles LEDs on/off using preset delay, cycling through colors."""
# Use provided colors, or default to white if none # Use provided colors, or default to white if none
colors = preset.c if preset.c else [(255, 255, 255)] colors = preset.c if preset.c else [(255, 255, 255)]
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
color_index = 0 color_index = 0
state = True # True = on, False = off state = True # True = on, False = off
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
@@ -25,8 +26,8 @@ class Blink:
# Advance to next color for the next "on" phase # Advance to next color for the next "on" phase
color_index += 1 color_index += 1
else: else:
# "Off" phase should actually be off. # Inactive phase uses the preset background color.
self.driver.fill((0, 0, 0)) self.driver.fill(bg_color)
state = not state state = not state
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)
# Yield once per tick so other logic can run # Yield once per tick so other logic can run

65
src/patterns/blizzard.py Normal file
View File

@@ -0,0 +1,65 @@
import random
import utime
class Blizzard:
"""Dense falling flakes with sideways drift (compare `snowfall` for gentler flakes)."""
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (200, 230, 255), (180, 210, 255)]
# Higher n1 → more spawns (0255 threshold vs random)
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 90)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
# n3: 128 = no bias; <128 drift one way, >128 the other (scaled to small steps)
wraw = int(preset.n3)
if wraw <= 0:
wind = 0
else:
wind = max(-4, min(4, (wraw - 128) // 20))
flakes = []
last = utime.ticks_ms()
while True:
d_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d_ms:
nled = self.driver.num_leds
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(nled):
self.driver.n[i] = bg
if random.randint(0, 255) < density:
flakes.append(
[
nled - 1,
random.randint(0, len(colors) - 1),
0 if wind == 0 else random.randint(-1, 1),
]
)
nf = []
for pos, ci, wj in flakes:
p = pos
lateral = wind + (wj if wj else 0)
p -= speed
p += lateral
if p < -2 or p >= nled + 2:
continue
pi = max(0, min(nled - 1, int(p)))
self.driver.n[pi] = self.driver.apply_brightness(colors[ci], preset.b)
nf.append([p, ci, wj])
flakes = nf
self.driver.n.write()
last = utime.ticks_add(last, d_ms)
if not preset.a:
yield
return
yield

View File

@@ -1,40 +0,0 @@
import utime
class BreathingDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 140), (0, 120, 255)]
phase_offset = max(0, min(255, int(preset.n1)))
ease = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
p1 = phase
p2 = (phase + phase_offset) & 255
t1 = 255 - abs(128 - p1) * 2
t2 = 255 - abs(128 - p2) * 2
if ease > 1:
t1 = (t1 * t1) // 255
t2 = (t2 * t2) // 255
c1 = self.driver.apply_brightness(colors[0], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
half = self.driver.num_leds // 2
for i in range(self.driver.num_leds):
if i < half:
self.driver.n[i] = ((c1[0]*t1)//255, (c1[1]*t1)//255, (c1[2]*t1)//255)
else:
self.driver.n[i] = ((c2[0]*t2)//255, (c2[1]*t2)//255, (c2[2]*t2)//255)
self.driver.n.write()
phase = (phase + 2) & 255
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -0,0 +1,56 @@
import random
import utime
class CandleGlow:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 140, 40), (255, 200, 120), (255, 90, 20)]
n_candles = max(1, min(self.driver.num_leds, int(preset.n1) if int(preset.n1) > 0 else 4))
width = max(1, int(preset.n2) if int(preset.n2) > 0 else 3)
flicker = max(1, min(255, int(preset.n3) if int(preset.n3) > 0 else 90))
n_led = self.driver.num_leds
centers = tuple(random.randint(0, max(0, n_led - 1)) for _ in range(n_candles))
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(n_led):
self.driver.n[i] = bg
base_lo = 180 - flicker // 2
if base_lo < 40:
base_lo = 40
for ci, c in enumerate(centers):
warmth = colors[ci % len(colors)]
pulse = base_lo + random.randint(0, flicker)
if pulse > 255:
pulse = 255
for off in range(-width, width + 1):
idx = c + off
if 0 <= idx < n_led:
dist = abs(off)
fall = ((width - dist + 1) * 256) // (width + 1)
fac = (fall * pulse) // 256
px = (
(warmth[0] * fac) // 255,
(warmth[1] * fac) // 255,
(warmth[2] * fac) // 255,
)
lit = self.driver.apply_brightness(px, preset.b)
o = self.driver.n[idx]
self.driver.n[idx] = (
max(o[0], lit[0]),
max(o[1], lit[1]),
max(o[2], lit[2]),
)
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -1,13 +1,49 @@
import utime import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"marquee": 1}
class Chase: class Chase:
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
def _run_marquee(self, preset, colors):
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % (on_len + off_len)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
m = (i + phase) % (on_len + off_len)
self.driver.n[i] = c if m < on_len else bg_color
self.driver.n.write()
phase = (phase + step) % (on_len + off_len)
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield
def run(self, preset): def run(self, preset):
"""Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating. """Chase (n6=0) or marquee dashes (n6=1, legacy marquee).
Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)"""
Chase: n1/n2 segment lengths, n3/n4 step on even/odd beats.
Marquee: n1 on length, n2 off length, n3 scroll step.
"""
if style_mode(preset, 0, _LEGACY) == 1:
colors = preset.c if preset.c else [(255, 255, 255)]
yield from self._run_marquee(preset, colors)
return
colors = preset.c colors = preset.c
if len(colors) < 1: if len(colors) < 1:
# Need at least 1 color # Need at least 1 color
@@ -26,7 +62,7 @@ class Chase:
color0 = self.driver.apply_brightness(color0, preset.b) color0 = self.driver.apply_brightness(color0, preset.b)
color1 = self.driver.apply_brightness(color1, preset.b) color1 = self.driver.apply_brightness(color1, preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b) bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
n1 = max(1, int(preset.n1)) # LEDs of color 0 n1 = max(1, int(preset.n1)) # LEDs of color 0
n2 = max(1, int(preset.n2)) # LEDs of color 1 n2 = max(1, int(preset.n2)) # LEDs of color 1
@@ -36,7 +72,7 @@ class Chase:
segment_length = n1 + n2 segment_length = n1 + n2
# Calculate position from step_count # Calculate position from step_count
step_count = self.driver.step step_count = int(self.driver.step) % 2
# Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc. # Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc.
if step_count % 2 == 0: if step_count % 2 == 0:
# Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3 # Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3
@@ -70,9 +106,10 @@ class Chase:
self.driver.n[i] = color1 self.driver.n[i] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count)
# Increment step for next beat # Increment step for next beat
self.driver.step = step_count + 1 self.driver.step = (step_count + 1) % 2
# Allow tick() to advance the generator once # Allow tick() to advance the generator once
yield yield
@@ -115,9 +152,10 @@ class Chase:
self.driver.n[i] = color1 self.driver.n[i] = color1
self.driver.n.write() self.driver.n.write()
print("[chase] step", step_count)
# Increment step # Increment step
step_count += 1 step_count = (step_count + 1) % 2
self.driver.step = step_count self.driver.step = step_count
last_update = utime.ticks_add(last_update, transition_duration) last_update = utime.ticks_add(last_update, transition_duration)
transition_duration = max(10, int(preset.d)) transition_duration = max(10, int(preset.d))

View File

@@ -31,10 +31,10 @@ class Circle:
base0 = base1 = (255, 255, 255) base0 = base1 = (255, 255, 255)
elif len(colors) == 1: elif len(colors) == 1:
base0 = colors[0] base0 = colors[0]
base1 = colors[-1] base1 = preset.background_or(colors)
else: else:
base0 = colors[0] base0 = colors[0]
base1 = colors[-1] base1 = preset.background_or(colors)
color0 = self.driver.apply_brightness(base0, preset.b) color0 = self.driver.apply_brightness(base0, preset.b)
color1 = self.driver.apply_brightness(base1, preset.b) color1 = self.driver.apply_brightness(base1, preset.b)

View File

@@ -15,7 +15,7 @@ class ClockSweep:
d = max(1, int(preset.d)) d = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d: if utime.ticks_diff(now, last) >= d:
bg = self.driver.apply_brightness(colors[-1], preset.b) bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
fg = self.driver.apply_brightness(colors[0], preset.b) fg = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
self.driver.n[i] = bg self.driver.n[i] = bg

View File

@@ -1,11 +1,24 @@
import utime import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"rainbow": 1, "gradient_scroll": 0}
class ColourCycle: class ColourCycle:
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
def _render(self, colors, phase, brightness): def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
if pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def _render_gradient(self, colors, phase, brightness):
num_leds = self.driver.num_leds num_leds = self.driver.num_leds
color_count = len(colors) color_count = len(colors)
if num_leds <= 0 or color_count <= 0: if num_leds <= 0 or color_count <= 0:
@@ -15,14 +28,11 @@ class ColourCycle:
return return
full_span = color_count * 256 full_span = color_count * 256
# Match rainbow behaviour: phase is 0..255 and maps to one full-strip shift.
phase_shift = (phase * full_span) // 256 phase_shift = (phase * full_span) // 256
for i in range(num_leds): for i in range(num_leds):
# Position around the colour loop, shifted by phase.
pos = ((i * full_span) // num_leds + phase_shift) % full_span pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256 idx = pos // 256
frac = pos & 255 frac = pos & 255
c1 = colors[idx] c1 = colors[idx]
c2 = colors[(idx + 1) % color_count] c2 = colors[(idx + 1) % color_count]
blended = ( blended = (
@@ -33,23 +43,55 @@ class ColourCycle:
self.driver.n[i] = self.driver.apply_brightness(blended, brightness) self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write() self.driver.n.write()
def run(self, preset): def _render_rainbow(self, phase, brightness):
colors = preset.c if preset.c else [(255, 255, 255)] num_leds = self.driver.num_leds
phase = self.driver.step % 256 for i in range(num_leds):
step_amount = max(1, int(preset.n1)) rc_index = (i * 256 // max(1, num_leds)) + phase
self.driver.n[i] = self.driver.apply_brightness(
self._wheel(rc_index & 255), brightness
)
self.driver.n.write()
def run(self, preset):
"""Scroll gradient (n6=0) or fixed spectrum wheel (n6=1, legacy rainbow).
n1: step rate
n6: 0 gradient scroll, 1 rainbow wheel
"""
mode = style_mode(preset, 0, _LEGACY)
step_amount = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
phase = self.driver.step % 256
if mode == 1:
if not preset.a:
self._render_rainbow(phase, preset.b)
self.driver.step = (phase + step_amount) % 256
yield
return
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
self._render_rainbow(phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
yield
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
if not preset.a: if not preset.a:
self._render(colors, phase, preset.b) self._render_gradient(colors, phase, preset.b)
self.driver.step = (phase + step_amount) % 256 self.driver.step = (phase + step_amount) % 256
yield yield
return return
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
while True: while True:
current_time = utime.ticks_ms()
delay_ms = max(1, int(preset.d)) delay_ms = max(1, int(preset.d))
if utime.ticks_diff(current_time, last_update) >= delay_ms: now = utime.ticks_ms()
self._render(colors, phase, preset.b) if utime.ticks_diff(now, last_update) >= delay_ms:
self._render_gradient(colors, phase, preset.b)
phase = (phase + step_amount) % 256 phase = (phase + step_amount) % 256
self.driver.step = phase self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)

View File

@@ -1,44 +0,0 @@
import utime
class CometDual:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
tail = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
gap = max(0, int(preset.n3))
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
c1 = self.driver.apply_brightness(colors[0 % len(colors)], preset.b)
c2 = self.driver.apply_brightness(colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b)
for t in range(tail):
i1 = p1 - t
if 0 <= i1 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i1] = ((c1[0]*s)//255, (c1[1]*s)//255, (c1[2]*s)//255)
i2 = p2 + t
if 0 <= i2 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i2] = ((c2[0]*s)//255, (c2[1]*s)//255, (c2[2]*s)//255)
self.driver.n.write()
p1 += speed
p2 -= speed
if p1 - tail > self.driver.num_leds and p2 + tail < 0:
p1 = 0
p2 = self.driver.num_leds - 1 - gap
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -1,35 +0,0 @@
import random
import utime
class Fireflies:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 210, 80), (120, 255, 120)]
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 8)
bugs = [[random.randint(0, max(0, self.driver.num_leds - 1)), random.randint(0, 255)] for _ in range(count)]
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for b in bugs:
idx, ph = b
tri = 255 - abs(128 - ph) * 2
c = self.driver.apply_brightness(colors[idx % len(colors)], preset.b)
self.driver.n[idx] = ((c[0]*tri)//255, (c[1]*tri)//255, (c[2]*tri)//255)
b[1] = (ph + speed) & 255
if random.randint(0, 31) == 0:
b[0] = random.randint(0, max(0, self.driver.num_leds - 1))
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -1,57 +0,0 @@
import utime
class GradientScroll:
def __init__(self, driver):
self.driver = driver
def _render(self, colors, phase, brightness):
num_leds = self.driver.num_leds
color_count = len(colors)
if num_leds <= 0 or color_count <= 0:
return
if color_count == 1:
self.driver.fill(self.driver.apply_brightness(colors[0], brightness))
return
full_span = color_count * 256
phase_shift = (phase * full_span) // 256
for i in range(num_leds):
pos = ((i * full_span) // num_leds + phase_shift) % full_span
idx = pos // 256
frac = pos & 255
c1 = colors[idx]
c2 = colors[(idx + 1) % color_count]
blended = (
c1[0] + ((c2[0] - c1[0]) * frac) // 256,
c1[1] + ((c2[1] - c1[1]) * frac) // 256,
c1[2] + ((c2[2] - c1[2]) * frac) // 256,
)
self.driver.n[i] = self.driver.apply_brightness(blended, brightness)
self.driver.n.write()
def run(self, preset):
"""Scrolling blended gradient.
n1: phase step amount (default 1)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
phase = self.driver.step % 256
step_amount = max(1, int(preset.n1) if int(preset.n1) > 0 else 1)
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
self._render(colors, phase, preset.b)
phase = (phase + step_amount) % 256
self.driver.step = phase
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -1,36 +0,0 @@
import utime
class Heartbeat:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 0, 40)]
phase = 0
phase_start = utime.ticks_ms()
did_manual_pulse = False
while True:
p1 = max(20, int(preset.n1) if int(preset.n1) > 0 else 120)
p2 = max(20, int(preset.n2) if int(preset.n2) > 0 else 80)
pause = max(20, int(preset.n3) if int(preset.n3) > 0 else 500)
beat_gap = max(20, int(preset.d))
colors = preset.c if preset.c else [(255, 0, 40)]
lit_color = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
phase_durations = (p1, beat_gap, p2, pause)
phase_colors = (lit_color, bg_color, lit_color, bg_color)
now = utime.ticks_ms()
while utime.ticks_diff(now, phase_start) >= phase_durations[phase]:
phase_start = utime.ticks_add(phase_start, phase_durations[phase])
phase = (phase + 1) % 4
self.driver.fill(phase_colors[phase])
yield
if not preset.a:
if did_manual_pulse or phase == 0:
self.driver.fill(bg_color)
yield
return
did_manual_pulse = True

62
src/patterns/icicles.py Normal file
View File

@@ -0,0 +1,62 @@
import utime
class Icicles:
"""Icicles hanging from anchor points; tips brighten toward max length then shrink."""
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(240, 248, 255), (160, 210, 255), (255, 255, 255)]
spacing = max(1, int(preset.n1) if int(preset.n1) > 0 else 12)
nled = self.driver.num_leds
max_len = max(
2,
min(
int(preset.n2) if int(preset.n2) > 0 else min(14, max(3, nled // 4)),
max(2, nled),
),
)
span = max_len * 2
phase_step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = 0
last = utime.ticks_ms()
while True:
d_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d_ms:
bg_rgb = preset.background_or(colors)
bg = self.driver.apply_brightness(bg_rgb, preset.b)
for i in range(nled):
self.driver.n[i] = bg
aidx = 0
for anchor in range(0, nled, spacing):
tri_i = (phase + aidx * 5) % span
ic_len = tri_i if tri_i <= max_len else span - tri_i
tip_c = colors[aidx % len(colors)]
tip = self.driver.apply_brightness(tip_c, preset.b)
for k in range(ic_len):
idx = anchor + k
if idx >= nled:
break
br = ((k + 1) * 255) // max(1, ic_len)
self.driver.n[idx] = (
(tip[0] * br + bg[0] * (255 - br)) // 255,
(tip[1] * br + bg[1] * (255 - br)) // 255,
(tip[2] * br + bg[2] * (255 - br)) // 255,
)
aidx += 1
self.driver.n.write()
phase = (phase + phase_step) % span
last = utime.ticks_add(last, d_ms)
if not preset.a:
yield
return
yield

View File

@@ -1,31 +0,0 @@
import utime
class Marquee:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255)]
on_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 3)
off_len = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
step = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % (on_len + off_len)
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
m = (i + phase) % (on_len + off_len)
self.driver.n[i] = c if m < on_len else bg_color
self.driver.n.write()
phase = (phase + step) % (on_len + off_len)
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

156
src/patterns/meteor.py Normal file
View File

@@ -0,0 +1,156 @@
import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"comet_dual": 1, "scanner": 2}
class Meteor:
def __init__(self, driver):
self.driver = driver
def _fade(self, color, fade_amount):
return (
(color[0] * fade_amount) // 255,
(color[1] * fade_amount) // 255,
(color[2] * fade_amount) // 255,
)
def _run_meteor(self, preset, colors, color_index, head, direction, last_update):
tail_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 8)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
fade_amount = int(preset.n3) if int(preset.n3) > 0 else 192
fade_amount = max(1, min(255, fade_amount))
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) < delay_ms:
return color_index, head, direction, last_update, False
for i in range(self.driver.num_leds):
self.driver.n[i] = self._fade(self.driver.n[i], fade_amount)
base = colors[color_index % len(colors)]
lit = self.driver.apply_brightness(base, preset.b)
if 0 <= head < self.driver.num_leds:
self.driver.n[head] = lit
self.driver.n.write()
head += direction * speed
if head >= self.driver.num_leds + tail_len:
head = self.driver.num_leds - 1
direction = -1
color_index += 1
elif head < -tail_len:
head = 0
direction = 1
color_index += 1
return color_index, head, direction, utime.ticks_add(last_update, delay_ms), True
def _run_comet_dual(self, preset, colors, p1, p2, last):
tail = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
gap = max(0, int(preset.n3))
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return p1, p2, last, False
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
c1 = self.driver.apply_brightness(colors[0 % len(colors)], preset.b)
c2 = self.driver.apply_brightness(
colors[1 % len(colors)] if len(colors) > 1 else colors[0], preset.b
)
for t in range(tail):
i1 = p1 - t
if 0 <= i1 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i1] = ((c1[0] * s) // 255, (c1[1] * s) // 255, (c1[2] * s) // 255)
i2 = p2 + t
if 0 <= i2 < self.driver.num_leds:
s = (255 * (tail - t)) // max(1, tail)
self.driver.n[i2] = ((c2[0] * s) // 255, (c2[1] * s) // 255, (c2[2] * s) // 255)
self.driver.n.write()
p1 += speed
p2 -= speed
if p1 - tail > self.driver.num_leds and p2 + tail < 0:
p1 = 0
p2 = self.driver.num_leds - 1 - gap
return p1, p2, utime.ticks_add(last, d), True
def _run_scanner(self, preset, colors, color_index, center, direction, pause_frames, last_update):
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
end_pause = max(0, int(preset.n2))
delay_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) < delay_ms:
return color_index, center, direction, pause_frames, last_update, False
base = self.driver.apply_brightness(colors[color_index % len(colors)], preset.b)
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
dist = i - center
if dist < 0:
dist = -dist
if dist > width:
self.driver.n[i] = bg_color
else:
scale = ((width - dist) * 255) // max(1, width)
self.driver.n[i] = (
(base[0] * scale) // 255,
(base[1] * scale) // 255,
(base[2] * scale) // 255,
)
self.driver.n.write()
if pause_frames > 0:
pause_frames -= 1
else:
center += direction
if center >= self.driver.num_leds - 1:
center = self.driver.num_leds - 1
direction = -1
pause_frames = end_pause
color_index += 1
elif center <= 0:
center = 0
direction = 1
pause_frames = end_pause
color_index += 1
return color_index, center, direction, pause_frames, utime.ticks_add(last_update, delay_ms), True
def run(self, preset):
"""Moving lights: n6 style 0 meteor, 1 dual comet, 2 scanner (legacy ids still work)."""
mode = style_mode(preset, 0, _LEGACY)
colors = preset.c if preset.c else [(255, 255, 255)]
if mode == 1:
gap = max(0, int(preset.n3))
p1, p2 = 0, self.driver.num_leds - 1 - gap
last = utime.ticks_ms()
while True:
p1, p2, last, stepped = self._run_comet_dual(preset, colors, p1, p2, last)
if stepped and not preset.a:
yield
return
yield
if mode == 2:
color_index, center, direction, pause_frames = 0, 0, 1, 0
last_update = utime.ticks_ms()
while True:
color_index, center, direction, pause_frames, last_update, stepped = (
self._run_scanner(
preset, colors, color_index, center, direction, pause_frames, last_update
)
)
if stepped and not preset.a:
yield
return
yield
color_index, head, direction = 0, 0, 1
last_update = utime.ticks_ms()
while True:
color_index, head, direction, last_update, stepped = self._run_meteor(
preset, colors, color_index, head, direction, last_update
)
if stepped and not preset.a:
yield
return
yield

View File

@@ -1,62 +0,0 @@
import utime
class MeteorRain:
def __init__(self, driver):
self.driver = driver
def _fade(self, color, fade_amount):
return (
(color[0] * fade_amount) // 255,
(color[1] * fade_amount) // 255,
(color[2] * fade_amount) // 255,
)
def run(self, preset):
"""Single meteor with a fading tail.
n1: tail length (default 8)
n2: speed in LEDs per frame (default 1)
n3: fade amount per frame, 1..255 (default 192)
"""
colors = preset.c if preset.c else [(255, 255, 255)]
color_index = 0
head = 0
direction = 1
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
tail_len = max(1, int(preset.n1) if int(preset.n1) > 0 else 8)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
fade_amount = int(preset.n3) if int(preset.n3) > 0 else 192
fade_amount = max(1, min(255, fade_amount))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
for i in range(self.driver.num_leds):
self.driver.n[i] = self._fade(self.driver.n[i], fade_amount)
base = colors[color_index % len(colors)]
lit = self.driver.apply_brightness(base, preset.b)
if 0 <= head < self.driver.num_leds:
self.driver.n[head] = lit
self.driver.n.write()
head += direction * speed
if head >= self.driver.num_leds + tail_len:
head = self.driver.num_leds - 1
direction = -1
color_index += 1
elif head < -tail_len:
head = 0
direction = 1
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -15,7 +15,7 @@ class Orbit:
d = max(1, int(preset.d)) d = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d: if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b) bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color self.driver.n[i] = bg_color
for k in range(orbits): for k in range(orbits):

108
src/patterns/particles.py Normal file
View File

@@ -0,0 +1,108 @@
import random
import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"starfall": 1}
class Particles:
def __init__(self, driver):
self.driver = driver
def _run_snowfall(self, preset, colors, flakes, last):
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return flakes, last, False
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
if random.randint(0, 255) < density:
flakes.append([self.driver.num_leds - 1, random.randint(0, len(colors) - 1)])
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
nf = []
for pos, ci in flakes:
if 0 <= pos < self.driver.num_leds:
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b)
pos -= speed
if pos >= -1:
nf.append([pos, ci])
self.driver.n.write()
return nf, utime.ticks_add(last, d), True
def _run_starfall(self, preset, colors, stars, last):
rate = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 14))
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 2)
tail = max(2, int(preset.n3) if int(preset.n3) > 0 else 10)
max_stars = 4
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return stars, last, False
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg
if len(stars) < max_stars and random.randint(0, 255) < rate:
top = self.driver.num_leds - 1 + random.randint(
0, min(8, self.driver.num_leds // 2)
)
stars.append({"h": float(top), "ci": random.randint(0, len(colors) - 1)})
ns = []
for s in stars:
h = s["h"]
ci = s["ci"]
ih = int(h)
for t in range(tail):
idx = ih + t
if 0 <= idx < self.driver.num_leds:
fade = 255 - (t * 255 // max(1, tail - 1))
base = colors[ci]
lit = (
(base[0] * fade) // 255,
(base[1] * fade) // 255,
(base[2] * fade) // 255,
)
lit = self.driver.apply_brightness(lit, preset.b)
o = self.driver.n[idx]
self.driver.n[idx] = (
max(o[0], lit[0]),
max(o[1], lit[1]),
max(o[2], lit[2]),
)
h -= speed
if h >= -tail:
s["h"] = h
ns.append(s)
stars = ns
self.driver.n.write()
return stars, utime.ticks_add(last, d), True
def run(self, preset):
"""Falling particles: n6 0 snowfall flakes, 1 starfall streaks."""
mode = style_mode(preset, 0, _LEGACY)
colors = preset.c if preset.c else [(255, 255, 255), (180, 220, 255)]
last = utime.ticks_ms()
if mode == 1:
colors = preset.c if preset.c else [
(255, 255, 255),
(200, 230, 255),
(255, 248, 220),
]
stars = []
while True:
stars, last, stepped = self._run_starfall(preset, colors, stars, last)
if stepped and not preset.a:
yield
return
yield
flakes = []
while True:
flakes, last, stepped = self._run_snowfall(preset, colors, flakes, last)
if stepped and not preset.a:
yield
return
yield

View File

@@ -0,0 +1,18 @@
"""Resolve pattern style from n6 or legacy preset pattern id (p)."""
def style_mode(preset, default=0, legacy=None):
legacy = legacy or {}
p = getattr(preset, "p", "") or ""
if p in legacy:
return legacy[p]
mode = getattr(preset, "mode", None)
if mode is None and isinstance(preset, dict):
mode = preset.get("mode")
if mode is not None:
try:
return int(mode)
except (TypeError, ValueError):
pass
n6 = int(getattr(preset, "n6", 0) or 0)
return n6 if n6 > 0 else default

View File

@@ -6,19 +6,25 @@ class Pulse:
self.driver = driver self.driver = driver
def run(self, preset): def run(self, preset):
self.driver.off()
# Get colors from preset # Get colors from preset
colors = preset.c colors = preset.c
if not colors: if not colors:
colors = [(255, 255, 255)] colors = [(255, 255, 255)]
bg_base = preset.background_or(colors)
self.driver.fill(self.driver.apply_brightness(bg_base, preset.b))
color_index = 0 color_index = self.driver.step % max(1, len(colors))
if not preset.a:
# Manual / beat trigger: each select restarts this generator and resets
# cycle_start below. Advancing step here makes each beat the next colour
# without requiring a full wall-clock cycle between beats.
nclr = max(1, len(colors))
self.driver.step = (color_index + 1) % nclr
cycle_start = utime.ticks_ms() cycle_start = utime.ticks_ms()
# State machine based pulse using a single generator loop # State machine based pulse using a single generator loop
while True: while True:
bg_color = self.driver.apply_brightness(colors[-1], preset.b) bg_color = self.driver.apply_brightness(bg_base, preset.b)
# Read current timing parameters from preset # Read current timing parameters from preset
attack_ms = max(0, int(preset.n1)) # Attack time in ms attack_ms = max(0, int(preset.n1)) # Attack time in ms
hold_ms = max(0, int(preset.n2)) # Hold time in ms hold_ms = max(0, int(preset.n2)) # Hold time in ms
@@ -52,12 +58,13 @@ class Pulse:
# Delay phase: LEDs off between pulses # Delay phase: LEDs off between pulses
self.driver.fill(bg_color) self.driver.fill(bg_color)
else: else:
# End of cycle, move to next color and restart timing # End of cycle: auto advances colour and loops; manual already
color_index += 1 # advanced step at run start for the next beat.
cycle_start = now
if not preset.a: if not preset.a:
break break
# Skip drawing this tick, start next cycle color_index = (color_index + 1) % max(1, len(colors))
self.driver.step = color_index
cycle_start = now
yield yield
continue continue

View File

@@ -1,11 +1,13 @@
import utime import utime
_RADIATE_DBG_INTERVAL_MS = 1000 # When ``driver.debug`` is True (``settings["debug"]``), log at most this often (ms).
_RADIATE_DBG_INTERVAL_MS = 2500
class Radiate: class Radiate:
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
self._color_step = 0
def run(self, preset): def run(self, preset):
"""Radiate from nodes every n1 LEDs, retriggering every delay (d). """Radiate from nodes every n1 LEDs, retriggering every delay (d).
@@ -16,15 +18,14 @@ class Radiate:
- d: retrigger interval in ms - d: retrigger interval in ms
""" """
colors = preset.c if preset.c else [(255, 255, 255)] colors = preset.c if preset.c else [(255, 255, 255)]
base_on = colors[0] base_off = preset.background_or(colors)
base_off = colors[-1]
spacing = max(1, int(preset.n1)) spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2)) outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3)) return_ms = max(1, int(preset.n3))
max_dist = spacing // 2 max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b) lit_color = self.driver.apply_brightness(colors[self._color_step % max(1, len(colors))], preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b) off_color = self.driver.apply_brightness(base_off, preset.b)
now = utime.ticks_ms() now = utime.ticks_ms()
@@ -34,17 +35,73 @@ class Radiate:
dbg_banner = False dbg_banner = False
if not preset.a: if not preset.a:
# Single-step render uses only the first instant pulse. # Manual mode: one-shot pulse using the same ms-based timing as auto.
active_pulses = [utime.ticks_ms()] cycle_start = utime.ticks_ms()
last_dbg = cycle_start
while True:
dbg = bool(getattr(self.driver, "debug", False))
spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3))
max_dist = spacing // 2
on_color = colors[self._color_step % max(1, len(colors))]
lit_color = self.driver.apply_brightness(on_color, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b)
pulse_lifetime = outward_ms + return_ms
now = utime.ticks_ms()
age = utime.ticks_diff(now, cycle_start)
if age < 1:
age = 1
if age <= outward_ms:
front = (age * max_dist + outward_ms - 1) // outward_ms
elif age <= outward_ms + return_ms:
back_age = age - outward_ms
remaining = return_ms - back_age
front = (remaining * max_dist + return_ms - 1) // return_ms
else:
front = 0
lit_count = 0
for i in range(self.driver.num_leds):
offset = (i + (spacing // 2)) % spacing
dist = min(offset, spacing - offset)
lit = dist <= front
self.driver.n[i] = lit_color if lit else off_color
if lit:
lit_count += 1
self.driver.n.write()
if dbg:
if not dbg_banner:
dbg_banner = True
print(
"[radiate] debug on n1=%s n2=%s n3=%s d=%s auto=%s num_leds=%d"
% (preset.n1, preset.n2, preset.n3, preset.d, preset.a, self.driver.num_leds)
)
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
print(
"[radiate] manual frame age=%d/%d front=%d lit=%d"
% (age, pulse_lifetime, front, lit_count)
)
last_dbg = now
yield
if age >= pulse_lifetime:
self._color_step += 1
return
while True: while True:
now = utime.ticks_ms() now = utime.ticks_ms()
dbg = bool(getattr(self.driver, "debug", False))
delay_ms = max(1, int(preset.d)) delay_ms = max(1, int(preset.d))
spacing = max(1, int(preset.n1)) spacing = max(1, int(preset.n1))
outward_ms = max(1, int(preset.n2)) outward_ms = max(1, int(preset.n2))
return_ms = max(1, int(preset.n3)) return_ms = max(1, int(preset.n3))
pulse_lifetime = outward_ms + return_ms
max_dist = spacing // 2 max_dist = spacing // 2
lit_color = self.driver.apply_brightness(base_on, preset.b) on_color = colors[self._color_step % max(1, len(colors))]
lit_color = self.driver.apply_brightness(on_color, preset.b)
off_color = self.driver.apply_brightness(base_off, preset.b) off_color = self.driver.apply_brightness(base_off, preset.b)
if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms: if preset.a and utime.ticks_diff(now, last_trigger) >= delay_ms:
@@ -52,33 +109,26 @@ class Radiate:
# prevents overlap from keeping color[0] continuously visible. # prevents overlap from keeping color[0] continuously visible.
active_pulses = [now] active_pulses = [now]
last_trigger = utime.ticks_add(last_trigger, delay_ms) last_trigger = utime.ticks_add(last_trigger, delay_ms)
if bool(getattr(self.driver, "debug", False)): self._color_step += 1
print(
"[radiate] trigger spacing=%d out=%d in=%d delay=%d"
% (spacing, outward_ms, return_ms, delay_ms)
)
# Drop pulses once their out-and-back lifetime ends. # Drop pulses once their out-and-back lifetime ends.
pulse_lifetime = outward_ms + return_ms
kept = [] kept = []
for start in active_pulses: for start in active_pulses:
age = utime.ticks_diff(now, start) age = utime.ticks_diff(now, start)
if age < pulse_lifetime: if age < pulse_lifetime:
kept.append(start) kept.append(start)
active_pulses = kept active_pulses = kept
debug_front = -1
lit_count = 0
lit_count = 0
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
# Nearest node distance for a repeating node grid every `spacing` LEDs. # Nearest node distance for a repeating node grid every `spacing` LEDs.
offset = i % spacing offset = (i + (spacing // 2)) % spacing
dist = min(offset, spacing - offset) dist = min(offset, spacing - offset)
lit = False lit = False
for start in active_pulses: for start in active_pulses:
age = utime.ticks_diff(now, start) age = utime.ticks_diff(now, start)
# Do not render on the exact trigger tick; this avoids # Auto: skip the exact trigger tick (age==0) so nodes are not stuck on.
# node LEDs appearing "stuck on" between cycles.
if age <= 0: if age <= 0:
continue continue
if age <= outward_ms: if age <= outward_ms:
@@ -94,8 +144,6 @@ class Radiate:
if dist <= front: if dist <= front:
lit = True lit = True
if front > debug_front:
debug_front = front
break break
self.driver.n[i] = lit_color if lit else off_color self.driver.n[i] = lit_color if lit else off_color
@@ -104,33 +152,21 @@ class Radiate:
self.driver.n.write() self.driver.n.write()
if bool(getattr(self.driver, "debug", False)): if dbg:
if not dbg_banner: if not dbg_banner:
dbg_banner = True dbg_banner = True
print( print(
"[radiate] debug on: spacing=%s out=%s in=%s d=%s num=%d" "[radiate] debug on n1=%s n2=%s n3=%s d=%s auto=%s num_leds=%d"
% ( % (preset.n1, preset.n2, preset.n3, preset.d, preset.a, self.driver.num_leds)
preset.n1,
preset.n2,
preset.n3,
preset.d,
self.driver.num_leds,
)
) )
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS: if utime.ticks_diff(now, last_dbg) >= _RADIATE_DBG_INTERVAL_MS:
pulse_age = -1
if active_pulses:
pulse_age = utime.ticks_diff(now, active_pulses[0])
print( print(
"[radiate] age=%d front=%d max=%d active=%d lit=%d" "[radiate] pulses=%d first_age=%d lit=%d lifetime=%d"
% (pulse_age, debug_front, max_dist, len(active_pulses), lit_count) % (len(active_pulses), pulse_age, lit_count, pulse_lifetime)
) )
if lit_count == 0:
print("[radiate] fully off")
last_dbg = now last_dbg = now
if not preset.a:
yield
return
yield yield

View File

@@ -16,7 +16,7 @@ class RainDrops:
d = max(1, int(preset.d)) d = max(1, int(preset.d))
now = utime.ticks_ms() now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d: if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b) bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds): for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color self.driver.n[i] = bg_color
if random.randint(0, 255) < rate: if random.randint(0, 255) < rate:

View File

@@ -1,51 +0,0 @@
import utime
class Rainbow:
def __init__(self, driver):
self.driver = driver
def _wheel(self, pos):
if pos < 85:
return (pos * 3, 255 - pos * 3, 0)
elif pos < 170:
pos -= 85
return (255 - pos * 3, 0, pos * 3)
else:
pos -= 170
return (0, pos * 3, 255 - pos * 3)
def run(self, preset):
step = self.driver.step % 256
step_amount = max(1, int(preset.n1)) # n1 controls step increment
# If auto is False, run a single step and then stop
if not preset.a:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b)
self.driver.n.write()
# Increment step by n1 for next manual call
self.driver.step = (step + step_amount) % 256
# Allow tick() to advance the generator once
yield
return
last_update = utime.ticks_ms()
while True:
current_time = utime.ticks_ms()
sleep_ms = max(1, int(preset.d)) # Get delay from preset
if utime.ticks_diff(current_time, last_update) >= sleep_ms:
for i in range(self.driver.num_leds):
rc_index = (i * 256 // self.driver.num_leds) + step
self.driver.n[i] = self.driver.apply_brightness(
self._wheel(rc_index & 255),
preset.b,
)
self.driver.n.write()
step = (step + step_amount) % 256
self.driver.step = step
last_update = utime.ticks_add(last_update, sleep_ms)
# Yield once per tick so other logic can run
yield

72
src/patterns/rime.py Normal file
View File

@@ -0,0 +1,72 @@
import random
import utime
class Rime:
"""Slow frost build-up on a chilly background — gentle random brightening then decay."""
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(220, 235, 255), (255, 255, 255), (185, 220, 255)]
num = self.driver.num_leds
if num <= 0:
while True:
yield
return
# n1: spawn tendency (like twinkle upper range)
chill = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 36))
# n2: decay per refresh (subtract from glow buffer)
melt = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 12))
# n3: how many LEDs can flash brighter per refresh (cap)
spark_cap = max(1, min(num, int(preset.n3) if int(preset.n3) > 0 else 3))
glow = [0] * num
last = utime.ticks_ms()
while True:
d_ms = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d_ms:
base_bg = preset.background_or(colors)
bg = self.driver.apply_brightness(base_bg, preset.b)
for i in range(num):
if glow[i] > melt:
glow[i] -= melt
else:
glow[i] = 0
spawned = 0
tries = spark_cap + num // 8
for _ in range(tries):
if spawned >= spark_cap:
break
if random.randint(0, 255) >= chill:
continue
j = random.randint(0, num - 1)
glow[j] = min(255, glow[j] + random.randint(80, 200))
spawned += 1
palette = colors
for i in range(num):
g = glow[i]
fg = palette[i % len(palette)]
hi = self.driver.apply_brightness(fg, preset.b)
mix = max(0, min(255, g))
self.driver.n[i] = (
(hi[0] * mix + bg[0] * (255 - mix)) // 255,
(hi[1] * mix + bg[1] * (255 - mix)) // 255,
(hi[2] * mix + bg[2] * (255 - mix)) // 255,
)
self.driver.n.write()
last = utime.ticks_add(last, d_ms)
if not preset.a:
yield
return
yield

View File

@@ -1,67 +0,0 @@
import utime
class Scanner:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Classic scanner eye with soft falloff.
n1: eye width (default 4)
n2: end pause in frames (default 0)
"""
colors = preset.c if preset.c else [(255, 0, 0)]
color_index = 0
center = 0
direction = 1
pause_frames = 0
last_update = utime.ticks_ms()
while True:
delay_ms = max(1, int(preset.d))
width = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
end_pause = max(0, int(preset.n2))
now = utime.ticks_ms()
if utime.ticks_diff(now, last_update) >= delay_ms:
base = colors[color_index % len(colors)]
base = self.driver.apply_brightness(base, preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
dist = i - center
if dist < 0:
dist = -dist
if dist > width:
self.driver.n[i] = bg_color
else:
scale = ((width - dist) * 255) // max(1, width)
self.driver.n[i] = (
(base[0] * scale) // 255,
(base[1] * scale) // 255,
(base[2] * scale) // 255,
)
self.driver.n.write()
if pause_frames > 0:
pause_frames -= 1
else:
center += direction
if center >= self.driver.num_leds - 1:
center = self.driver.num_leds - 1
direction = -1
pause_frames = end_pause
color_index += 1
elif center <= 0:
center = 0
direction = 1
pause_frames = end_pause
color_index += 1
last_update = utime.ticks_add(last_update, delay_ms)
if not preset.a:
yield
return
yield

View File

@@ -1,45 +0,0 @@
import utime
class SegmentChase:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
"""Independent moving segments (distinct from classic two-color chase).
n1: segment size (LEDs per segment)
n2: step size (phase increment each frame)
n3: per-segment phase offset
n4: gap spacing inside segment (0 = solid segment)
"""
colors = preset.c if preset.c else [(255, 0, 0), (0, 0, 255)]
seg = max(1, int(preset.n1) if int(preset.n1) > 0 else 4)
phase_step = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
seg_offset = max(0, int(preset.n3))
gap = max(0, int(preset.n4))
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
for i in range(self.driver.num_leds):
seg_idx = i // seg
in_seg = i % seg
local_phase = (phase + seg_idx * seg_offset) % seg
lit_idx = (in_seg + local_phase) % seg
if gap > 0 and lit_idx >= max(1, seg - gap):
self.driver.n[i] = bg_color
else:
color_idx = seg_idx % len(colors)
self.driver.n[i] = self.driver.apply_brightness(colors[color_idx], preset.b)
self.driver.n.write()
phase = (phase + phase_step) % seg
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -1,37 +0,0 @@
import random
import utime
class Snowfall:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(255, 255, 255), (180, 220, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 20)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 1)
flakes = []
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
bg_color = self.driver.apply_brightness(colors[-1], preset.b)
if random.randint(0, 255) < density:
flakes.append([self.driver.num_leds - 1, random.randint(0, len(colors)-1)])
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
nf = []
for pos, ci in flakes:
if 0 <= pos < self.driver.num_leds:
self.driver.n[pos] = self.driver.apply_brightness(colors[ci], preset.b)
pos -= speed
if pos >= -1:
nf.append([pos, ci])
flakes = nf
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

147
src/patterns/sparkle.py Normal file
View File

@@ -0,0 +1,147 @@
import random
import utime
from patterns.pattern_modes import style_mode
_LEGACY = {"sparkle_trail": 0, "ice_sparkle": 1, "fireflies": 2}
class Sparkle:
def __init__(self, driver):
self.driver = driver
def _run_trail(self, preset, colors, last):
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 24)
decay = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 210))
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return last, False
for i in range(self.driver.num_leds):
r, g, b = self.driver.n[i]
self.driver.n[i] = ((r * decay) // 255, (g * decay) // 255, (b * decay) // 255)
sparks = max(1, self.driver.num_leds * density // 255)
for _ in range(sparks):
idx = random.randint(0, max(0, self.driver.num_leds - 1))
c = self.driver.apply_brightness(
colors[random.randint(0, len(colors) - 1)], preset.b
)
self.driver.n[idx] = c
self.driver.n.write()
return utime.ticks_add(last, d), True
def _run_ice(self, preset, colors, sparks, last):
rate = max(1, min(255, int(preset.n1) if int(preset.n1) > 0 else 55))
decay = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 140))
halo = max(0, min(3, int(preset.n3)))
cap = 28
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return sparks, last, False
bg = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg
ns = []
for s in sparks:
lv = s["lv"] - decay
if lv > 0:
s["lv"] = lv
ns.append(s)
sparks = ns
if len(sparks) < cap and random.randint(0, 255) < rate:
sparks.append(
{
"p": random.randint(0, max(0, self.driver.num_leds - 1)),
"lv": 255,
"ci": random.randint(0, len(colors) - 1),
}
)
for s in sparks:
p = s["p"]
lv = s["lv"]
ci = s["ci"]
base = colors[ci]
for off in range(-halo, halo + 1):
idx = p + off
if 0 <= idx < self.driver.num_leds:
dist = abs(off)
fac = lv if dist == 0 else (lv * (halo - dist + 1)) // (halo + 1)
lit = self.driver.apply_brightness(
(
(base[0] * fac) // 255,
(base[1] * fac) // 255,
(base[2] * fac) // 255,
),
preset.b,
)
o = self.driver.n[idx]
self.driver.n[idx] = (
min(255, o[0] + lit[0]),
min(255, o[1] + lit[1]),
min(255, o[2] + lit[2]),
)
self.driver.n.write()
return sparks, utime.ticks_add(last, d), True
def _run_fireflies(self, preset, colors, bugs, last):
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
speed = max(1, int(preset.n2) if int(preset.n2) > 0 else 8)
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) < d:
return bugs, last, False
bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
for i in range(self.driver.num_leds):
self.driver.n[i] = bg_color
for b in bugs:
idx, ph = b
tri = 255 - abs(128 - ph) * 2
c = self.driver.apply_brightness(colors[idx % len(colors)], preset.b)
self.driver.n[idx] = ((c[0] * tri) // 255, (c[1] * tri) // 255, (c[2] * tri) // 255)
b[1] = (ph + speed) & 255
if random.randint(0, 31) == 0:
b[0] = random.randint(0, max(0, self.driver.num_leds - 1))
self.driver.n.write()
return bugs, utime.ticks_add(last, d), True
def run(self, preset):
"""Sparkles: n6 0 trail decay, 1 ice burst+halo, 2 fireflies."""
mode = style_mode(preset, 0, _LEGACY)
colors = preset.c if preset.c else [(120, 120, 255)]
last = utime.ticks_ms()
if mode == 2:
colors = preset.c if preset.c else [(255, 210, 80), (120, 255, 120)]
count = max(1, int(preset.n1) if int(preset.n1) > 0 else 6)
bugs = [
[random.randint(0, max(0, self.driver.num_leds - 1)), random.randint(0, 255)]
for _ in range(count)
]
while True:
bugs, last, stepped = self._run_fireflies(preset, colors, bugs, last)
if stepped and not preset.a:
yield
return
yield
if mode == 1:
colors = preset.c if preset.c else [
(240, 248, 255),
(200, 235, 255),
(255, 255, 255),
]
sparks = []
while True:
sparks, last, stepped = self._run_ice(preset, colors, sparks, last)
if stepped and not preset.a:
yield
return
yield
while True:
last, stepped = self._run_trail(preset, colors, last)
if stepped and not preset.a:
yield
return
yield

View File

@@ -1,31 +0,0 @@
import random
import utime
class SparkleTrail:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(120, 120, 255)]
density = max(1, int(preset.n1) if int(preset.n1) > 0 else 24)
decay = max(1, min(255, int(preset.n2) if int(preset.n2) > 0 else 210))
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
for i in range(self.driver.num_leds):
r,g,b = self.driver.n[i]
self.driver.n[i] = ((r*decay)//255, (g*decay)//255, (b*decay)//255)
sparks = max(1, self.driver.num_leds * density // 255)
for _ in range(sparks):
idx = random.randint(0, max(0, self.driver.num_leds - 1))
c = self.driver.apply_brightness(colors[random.randint(0, len(colors)-1)], preset.b)
self.driver.n[idx] = c
self.driver.n.write()
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -16,7 +16,7 @@ class StrobeBurst:
cooldown = max(1, int(preset.n3) if int(preset.n3) > 0 else 400) cooldown = max(1, int(preset.n3) if int(preset.n3) > 0 else 400)
on_ms = max(1, int(preset.d) // 2) on_ms = max(1, int(preset.d) // 2)
c = self.driver.apply_brightness(colors[0], preset.b) c = self.driver.apply_brightness(colors[0], preset.b)
bg_color = self.driver.apply_brightness(colors[-1], preset.b) bg_color = self.driver.apply_brightness(preset.background_or(colors), preset.b)
now = utime.ticks_ms() now = utime.ticks_ms()
if state == "flash_on": if state == "flash_on":

View File

@@ -2,9 +2,6 @@ import random
import utime import utime
# Default cool palette (icy blues, violet, mint) when preset has no colours. # Default cool palette (icy blues, violet, mint) when preset has no colours.
# When `driver.debug` is True, print stats every N twinkle ticks (serial can be slow).
_TWINKLE_DBG_INTERVAL = 40
_DEFAULT_COOL = ( _DEFAULT_COOL = (
(120, 200, 255), (120, 200, 255),
(80, 140, 255), (80, 140, 255),
@@ -39,7 +36,7 @@ class Twinkle:
"""Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs.""" """Twinkle: n1 activity, n2 density; n3/n4 min/max length of adjacent on/off runs."""
palette = self._palette(preset) palette = self._palette(preset)
num = self.driver.num_leds num = self.driver.num_leds
bg_color = self.driver.apply_brightness(palette[-1], preset.b) bg_color = self.driver.apply_brightness(preset.background_or(palette), preset.b)
if num <= 0: if num <= 0:
while True: while True:
yield yield
@@ -93,32 +90,6 @@ class Twinkle:
on = [random.randint(0, 255) < dens for _ in range(num)] on = [random.randint(0, 255) < dens for _ in range(num)]
colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)] colour_i = [random.randint(0, len(palette) - 1) for _ in range(num)]
last_update = utime.ticks_ms() last_update = utime.ticks_ms()
dbg_tick = 0
dbg_banner = False
def on_run_min_max(bits):
"""Smallest and largest contiguous run of True in bits (0,0 if all off)."""
best_min = num + 1
best_max = 0
cur = 0
for j in range(num):
if bits[j]:
cur += 1
else:
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
cur = 0
if cur:
if cur < best_min:
best_min = cur
if cur > best_max:
best_max = cur
if best_min == num + 1:
return 0, 0
return best_min, best_max
if not preset.a: if not preset.a:
for i in range(num): for i in range(num):
@@ -137,15 +108,12 @@ class Twinkle:
if utime.ticks_diff(now, last_update) >= delay_ms: if utime.ticks_diff(now, last_update) >= delay_ms:
rate = activity_rate() rate = activity_rate()
dens = density255() dens = density255()
dbg = bool(getattr(self.driver, "debug", False))
dbg_tick += 1
# Snapshot for decisions; apply all darks then all lights so # Snapshot for decisions; apply all darks then all lights so
# overlaps in the same tick favour lit runs (lights win). # overlaps in the same tick favour lit runs (lights win).
prev_on = on[:] prev_on = on[:]
prev_ci = colour_i[:] prev_ci = colour_i[:]
next_on = list(prev_on) next_on = list(prev_on)
next_ci = list(prev_ci) next_ci = list(prev_ci)
dbg_ops = {"L": 0, "D": 0}
light_i = [] light_i = []
dark_i = [] dark_i = []
@@ -160,7 +128,6 @@ class Twinkle:
dark_i.append(i) dark_i.append(i)
def light_adjacent(start): def light_adjacent(start):
dbg_ops["L"] += 1
k = random_cluster_len() k = random_cluster_len()
b = cluster_base_index(start, k) b = cluster_base_index(start, k)
for dj in range(k): for dj in range(k):
@@ -169,7 +136,6 @@ class Twinkle:
next_ci[idx] = random.randint(0, len(palette) - 1) next_ci[idx] = random.randint(0, len(palette) - 1)
def dark_adjacent(start): def dark_adjacent(start):
dbg_ops["D"] += 1
k = random_cluster_len() k = random_cluster_len()
b = cluster_base_index(start, k) b = cluster_base_index(start, k)
for dj in range(k): for dj in range(k):
@@ -191,38 +157,4 @@ class Twinkle:
on = next_on on = next_on
colour_i = next_ci colour_i = next_ci
last_update = utime.ticks_add(last_update, delay_ms) last_update = utime.ticks_add(last_update, delay_ms)
if dbg:
lo, hi = cluster_len_bounds()
if not dbg_banner:
dbg_banner = True
print(
"[twinkle] debug on: n1=%s n2=%s n3=%s n4=%s d=%s -> lo=%d hi=%d num=%d"
% (
preset.n1,
preset.n2,
preset.n3,
preset.n4,
preset.d,
lo,
hi,
num,
)
)
rmin, rmax = on_run_min_max(on)
bad = lo > 0 and rmin > 0 and rmin < lo and num >= lo
if bad or (dbg_tick % _TWINKLE_DBG_INTERVAL == 0):
print(
"[twinkle] tick=%d rate=%d dens=%d L=%d D=%d on_runs min=%d max=%d%s"
% (
dbg_tick,
rate,
dens,
dbg_ops["L"],
dbg_ops["D"],
rmin,
rmax,
" **run<lo**" if bad else "",
)
)
yield yield

View File

@@ -1,32 +0,0 @@
import utime
class Wave:
def __init__(self, driver):
self.driver = driver
def run(self, preset):
colors = preset.c if preset.c else [(0, 180, 255)]
wavelength = max(2, int(preset.n1) if int(preset.n1) > 0 else 12)
amp = max(0, min(255, int(preset.n2) if int(preset.n2) > 0 else 180))
drift = max(1, int(preset.n3) if int(preset.n3) > 0 else 1)
phase = self.driver.step % 256
last = utime.ticks_ms()
while True:
d = max(1, int(preset.d))
now = utime.ticks_ms()
if utime.ticks_diff(now, last) >= d:
base = self.driver.apply_brightness(colors[0], preset.b)
for i in range(self.driver.num_leds):
x = (i * 256 // wavelength + phase) & 255
tri = 255 - abs(128 - x) * 2
s = (tri * amp) // 255
self.driver.n[i] = ((base[0]*s)//255, (base[1]*s)//255, (base[2]*s)//255)
self.driver.n.write()
phase = (phase + drift) % 256
self.driver.step = phase
last = utime.ticks_add(last, d)
if not preset.a:
yield
return
yield

View File

@@ -12,6 +12,7 @@ class Preset:
self.n4 = 0 self.n4 = 0
self.n5 = 0 self.n5 = 0
self.n6 = 0 self.n6 = 0
self.bg = (0, 0, 0)
# Override defaults with provided data # Override defaults with provided data
self.edit(data) self.edit(data)
@@ -25,9 +26,11 @@ class Preset:
"delay": "d", "delay": "d",
"brightness": "b", "brightness": "b",
"auto": "a", "auto": "a",
"background": "bg",
"mode": "n6",
} }
int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"} int_fields = {"d", "b", "n1", "n2", "n3", "n4", "n5", "n6"}
allowed_fields = {"p", "c", "d", "b", "a", "n1", "n2", "n3", "n4", "n5", "n6"} allowed_fields = {"p", "c", "d", "b", "a", "bg", "n1", "n2", "n3", "n4", "n5", "n6"}
for key, value in data.items(): for key, value in data.items():
key = aliases.get(key, key) key = aliases.get(key, key)
if key not in allowed_fields: if key not in allowed_fields:
@@ -56,6 +59,21 @@ class Preset:
elif key == "c": elif key == "c":
if isinstance(value, (list, tuple)): if isinstance(value, (list, tuple)):
self.c = value self.c = value
elif key == "bg":
if isinstance(value, str) and value.startswith("#") and len(value) == 7:
try:
self.bg = (
int(value[1:3], 16),
int(value[3:5], 16),
int(value[5:7], 16),
)
except (TypeError, ValueError):
continue
elif isinstance(value, (list, tuple)) and len(value) == 3:
try:
self.bg = tuple(max(0, min(255, int(x))) for x in value)
except (TypeError, ValueError):
continue
else: else:
setattr(self, key, value) setattr(self, key, value)
return True return True
@@ -100,6 +118,15 @@ class Preset:
def auto(self, value): def auto(self, value):
self.a = value self.a = value
def background_or(self, colors=None, default=(0, 0, 0)):
bg = getattr(self, "bg", None)
if isinstance(bg, (list, tuple)) and len(bg) == 3:
try:
return tuple(max(0, min(255, int(x))) for x in bg)
except (TypeError, ValueError):
return default
return default
def to_dict(self): def to_dict(self):
return { return {
"p": self.p, "p": self.p,
@@ -107,6 +134,7 @@ class Preset:
"b": self.b, "b": self.b,
"c": self.c, "c": self.c,
"a": self.a, "a": self.a,
"bg": self.bg,
"n1": self.n1, "n1": self.n1,
"n2": self.n2, "n2": self.n2,
"n3": self.n3, "n3": self.n3,

View File

@@ -70,8 +70,29 @@ class Presets:
except Exception as e: except Exception as e:
print("Pattern init failed:", module_name, e) print("Pattern init failed:", module_name, e)
self._apply_pattern_aliases(loaded)
return loaded return loaded
def _apply_pattern_aliases(self, loaded):
"""Legacy pattern ids -> merged implementations (same generator)."""
aliases = (
("rainbow", "colour_cycle"),
("gradient_scroll", "colour_cycle"),
("meteor_rain", "meteor"),
("comet_dual", "meteor"),
("scanner", "meteor"),
("snowfall", "particles"),
("starfall", "particles"),
("sparkle_trail", "sparkle"),
("ice_sparkle", "sparkle"),
("fireflies", "sparkle"),
("marquee", "chase"),
("northern_wave", "aurora"),
)
for old, new in aliases:
if new in loaded and old not in loaded:
loaded[old] = loaded[new]
def save(self): def save(self):
"""Save the presets to a file.""" """Save the presets to a file."""
with open("presets.json", "w") as f: with open("presets.json", "w") as f:
@@ -106,17 +127,30 @@ class Presets:
preset_data[color_key], order preset_data[color_key], order
) )
self.presets[name] = Preset(preset_data) self.presets[name] = Preset(preset_data)
if self.presets:
print("Loaded presets:")
#for name in sorted(self.presets.keys()):
# print(f" {name}: {self.presets[name].to_dict()}")
return True return True
def edit(self, name, data): def edit(self, name, data):
"""Create or update a preset with the given name.""" """Create or update a preset with the given name."""
if name in self.presets: if name in self.presets:
# Update existing preset # Update existing preset
was_auto = self.presets[name].a
self.presets[name].edit(data) self.presets[name].edit(data)
# Editing the live preset: auto still re-selects (one tick) so the strip
# restarts without a separate select message (controller often sends both).
# Manual must NOT call select() here — presets-only pushes (e.g. zone sequence
# arming the first step) would otherwise run select's first tick and consume a
# beat/step. Manual advances only on explicit select from the controller.
if self.selected == name:
preset = self.presets[name]
if preset.a:
self.step = 0
self.generator = None
self.fill((0, 0, 0))
self.select(name)
elif was_auto:
self.step = 0
self.generator = None
self.fill((0, 0, 0))
else: else:
if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"): if len(self.presets) >= MAX_PRESETS and name not in ("on", "off"):
print("Preset limit reached:", MAX_PRESETS) print("Preset limit reached:", MAX_PRESETS)
@@ -159,6 +193,16 @@ class Presets:
if preset_name in self.presets: if preset_name in self.presets:
preset = self.presets[preset_name] preset = self.presets[preset_name]
if preset.p in self.patterns: if preset.p in self.patterns:
# Manual single-shot patterns: if this select arrives before the main loop has
# tick()'d the previous frame, completing it first keeps step in sync with beats.
if (
preset_name == self.selected
and not preset.a
and preset.p == "chase"
and self.generator is not None
):
while self.generator is not None:
self.tick()
# Set step value if explicitly provided # Set step value if explicitly provided
if step is not None: if step is not None:
self.step = step self.step = step
@@ -166,6 +210,7 @@ class Presets:
self.step = 0 self.step = 0
self.generator = self.patterns[preset.p](preset) self.generator = self.patterns[preset.p](preset)
self.selected = preset_name # Store the preset name, not the object self.selected = preset_name # Store the preset name, not the object
self.tick()
return True return True
print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p) print("select failed: pattern not found for preset", preset_name, "pattern=", preset.p)
return False return False

17
src/print_timestamp.py Normal file
View File

@@ -0,0 +1,17 @@
"""Install a builtins.print wrapper that prefixes each line with uptime (ms).
Import this module before other led-driver imports that print (e.g. first in main).
"""
import builtins
import utime
_original_print = builtins.print
def _timestamped_print(*args, **kwargs):
ts = utime.ticks_ms()
return _original_print("[%d]" % ts, *args, **kwargs)
builtins.print = _timestamped_print

View File

@@ -27,6 +27,9 @@ class Settings(dict):
self["debug"] = False self["debug"] = False
self["default"] = "on" self["default"] = "on"
self["last_preset"] = ""
# Power-on: "default" | "last" | "off"
self["startup_mode"] = "default"
self["brightness"] = 32 self["brightness"] = 32
self["transport_type"] = "espnow" self["transport_type"] = "espnow"
self["wifi_channel"] = 1 self["wifi_channel"] = 1
@@ -39,7 +42,6 @@ class Settings(dict):
j = json.dumps(self) j = json.dumps(self)
with open(self.SETTINGS_FILE, 'w') as file: with open(self.SETTINGS_FILE, 'w') as file:
file.write(j) file.write(j)
print("Settings saved successfully.")
except Exception as e: except Exception as e:
print(f"Error saving settings: {e}") print(f"Error saving settings: {e}")
@@ -48,7 +50,17 @@ class Settings(dict):
with open(self.SETTINGS_FILE, 'r') as file: with open(self.SETTINGS_FILE, 'r') as file:
loaded_settings = json.load(file) loaded_settings = json.load(file)
self.update(loaded_settings) self.update(loaded_settings)
print("Settings loaded successfully.") old_recent = self.pop("recent_presets", None)
if isinstance(old_recent, list) and old_recent and not self.get("last_preset"):
for x in reversed(old_recent):
if isinstance(x, str) and x.strip():
self["last_preset"] = x.strip()
break
if x is not None:
s = str(x).strip()
if s:
self["last_preset"] = s
break
except Exception as e: except Exception as e:
print(f"Error loading settings") print(f"Error loading settings")
self.set_defaults() self.set_defaults()

View File

@@ -5,33 +5,26 @@ import utime
from presets import Presets from presets import Presets
from settings import Settings from settings import Settings
from controller_messages import apply_startup_pattern
def initialize_runtime(): def initialize_runtime():
machine.freq(160000000) machine.freq(160000000)
settings = Settings() settings = Settings()
print(settings)
wdt = machine.WDT(timeout=10000) wdt = machine.WDT(timeout=10000)
wdt.feed() wdt.feed()
gc.collect() gc.collect()
print("mem before presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
presets = Presets(settings["led_pin"], settings["num_leds"]) presets = Presets(settings["led_pin"], settings["num_leds"])
presets.load(settings) presets.load(settings)
presets.b = settings.get("brightness", 255) presets.b = settings.get("brightness", 255)
presets.debug = bool(settings.get("debug", False)) presets.debug = bool(settings.get("debug", False))
gc.collect() gc.collect()
print("mem after presets:", {"free": gc.mem_free(), "alloc": gc.mem_alloc()})
default_preset = settings.get("default", "") apply_startup_pattern(settings, presets)
if default_preset and default_preset in presets.presets:
if presets.select(default_preset):
print("Selected startup preset:", default_preset)
else:
print("Startup preset failed (invalid pattern?):", default_preset)
# On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated. # On ESP32-C3, soft reboots can leave Wi-Fi driver state allocated.
# Reset both interfaces and collect before bringing STA up. # Reset both interfaces and collect before bringing STA up.
@@ -49,5 +42,10 @@ def initialize_runtime():
utime.sleep(1) utime.sleep(1)
wdt.feed() wdt.feed()
print(sta_if.ifconfig()) try:
led_ip = sta_if.ifconfig()[0]
except Exception:
led_ip = "?"
print("led-driver IP:", led_ip, " led-controller IP:", "(not connected)")
return settings, presets, wdt, sta_if return settings, presets, wdt, sta_if

129
src/wifi_sta.py Normal file
View File

@@ -0,0 +1,129 @@
"""STA connect helpers aligned with tests/test_wifi.py (status polling, fatal codes)."""
import gc
import machine
import utime
import network
_CONNECT_TIMEOUT_S = 45
_RETRY_DELAY_S = 2
def _wifi_status_label(code):
names = {
getattr(network, "STAT_IDLE", 0): "idle",
getattr(network, "STAT_CONNECTING", 1): "connecting",
getattr(network, "STAT_WRONG_PASSWORD", -3): "wrong_password",
getattr(network, "STAT_NO_AP_FOUND", -2): "no_ap_found",
getattr(network, "STAT_CONNECT_FAIL", -1): "connect_fail",
getattr(network, "STAT_GOT_IP", 3): "got_ip",
}
return names.get(code, str(code))
# Only abort the wait loop immediately on wrong password. NO_AP_FOUND / CONNECT_FAIL are often
# transient while the radio is still scanning (ESP32-C3 may report them before the AP appears).
_ABORT_WAIT_IMMEDIATE = (
getattr(network, "STAT_WRONG_PASSWORD", -3),
)
def _one_association_campaign(sta_if, ssid, password, wdt):
"""disconnect → connect → wait until connected, wrong password, or timeout. Returns True if connected."""
try:
sta_if.disconnect()
except Exception:
pass
utime.sleep_ms(200)
try:
sta_if.connect(ssid, password)
except Exception as ex:
print("wifi_sta: connect raised:", ex)
return False
start = utime.time()
last_status = None
while not sta_if.isconnected():
status = sta_if.status()
if status != last_status:
print("wifi_sta: status", status, _wifi_status_label(status))
last_status = status
if status in _ABORT_WAIT_IMMEDIATE:
return False
if utime.time() - start >= _CONNECT_TIMEOUT_S:
print("wifi_sta: association timeout")
return False
utime.sleep(1)
if wdt is not None:
wdt.feed()
return True
def boot_sta(settings, wdt):
"""Tear down and bring up STA. Call before large heap users (NeoPixel, patterns).
On ESP32-C3, soft reboots can leave the Wi-Fi driver allocated; init while the
heap is still free. If re-init fails after a soft reboot, hard-reset once.
"""
sta_if = network.WLAN(network.STA_IF)
try:
if sta_if.active():
try:
sta_if.disconnect()
except Exception:
pass
sta_if.active(False)
except Exception:
pass
utime.sleep_ms(100)
gc.collect()
try:
sta_if.active(True)
except OSError as e:
err = str(e)
if "Out of Memory" in err or "WiFi" in err:
if machine.reset_cause() == machine.SOFT_RESET:
print("wifi_sta: init failed after soft reboot, hard reset:", err)
machine.reset()
raise
sta_if.config(pm=network.WLAN.PM_NONE)
ssid = settings.get("ssid") or ""
if ssid:
connect_until_up(sta_if, ssid, settings.get("password") or "", wdt)
return sta_if
def connect_until_up(sta_if, ssid, password, wdt):
"""Boot: repeat campaigns until STA has a route (same strategy as tests/test_wifi.py)."""
if not ssid:
print("wifi_sta: no ssid in settings")
return False
attempt = 0
while True:
attempt += 1
print("wifi_sta: boot attempt", attempt, "ssid=", repr(ssid))
if _one_association_campaign(sta_if, ssid, password, wdt):
try:
print("wifi_sta: connected", sta_if.ifconfig()[0])
except Exception:
print("wifi_sta: connected")
return True
print("wifi_sta: retry in", _RETRY_DELAY_S, "s")
for _ in range(_RETRY_DELAY_S):
utime.sleep(1)
if wdt is not None:
wdt.feed()
def try_reconnect(sta_if, ssid, password, wdt):
"""Runtime: single association campaign after link loss; non-looping."""
if not ssid:
return False
print("wifi_sta: reconnect")
ok = _one_association_campaign(sta_if, ssid, password, wdt)
if ok:
try:
print("wifi_sta: connected", sta_if.ifconfig()[0])
except Exception:
print("wifi_sta: connected")
return ok

View File

@@ -1,14 +1,50 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Self-contained led-driver test runner for MicroPython/mpremote.""" """Self-contained led-driver test runner for MicroPython/mpremote.
Run on device (from led-driver repo root)::
mpremote connect <port> run tests/all.py
Or via dev helper::
python dev.py <port> test
"""
import json import json
import os import sys
import utime import utime
from machine import WDT from machine import WDT
from settings import Settings
from presets import Presets, run_tick def _bootstrap_import_path():
from utils import convert_and_reorder_colors """Find ``settings`` / ``presets`` whether this file lives in ``tests/`` or ``:/``."""
try:
import uos as os
except ImportError:
import os
candidates = []
try:
here = __file__.rsplit("/", 1)[0]
if here:
candidates.append(here)
parent = here.rsplit("/", 1)[0]
if parent:
candidates.append(parent)
except NameError:
pass
candidates.extend([".", "..", "/"])
for p in candidates:
if p and p not in sys.path:
sys.path.insert(0, p)
_bootstrap_import_path()
from settings import Settings # noqa: E402
from presets import Presets, run_tick # noqa: E402
from preset import Preset # noqa: E402
from utils import convert_and_reorder_colors # noqa: E402
class _TestContext: class _TestContext:
@@ -27,6 +63,20 @@ class _TestContext:
utime.sleep_ms(sleep_ms) utime.sleep_ms(sleep_ms)
def _pattern_loaded(ctx, pattern_id):
return pattern_id in ctx.presets.patterns
def _smoke_preset(ctx, name, data, ms=80):
pattern_id = data.get("p") or data.get("pattern")
if not _pattern_loaded(ctx, pattern_id):
raise AssertionError("pattern not loaded: %s" % pattern_id)
ctx.presets.edit(name, data)
if not ctx.presets.select(name):
raise AssertionError("select failed: %s" % name)
ctx.tick_for_ms(ms)
def _process_message(ctx, payload): def _process_message(ctx, payload):
"""Small test helper that mirrors the main message handling logic.""" """Small test helper that mirrors the main message handling logic."""
try: try:
@@ -93,8 +143,7 @@ def _process_message(ctx, payload):
should_apply_default = this_device_name_norm in normalized_targets should_apply_default = this_device_name_norm in normalized_targets
if ( if (
should_apply_default should_apply_default
and and isinstance(default_name, str)
isinstance(default_name, str)
and default_name and default_name
and default_name in ctx.presets.presets and default_name in ctx.presets.presets
): ):
@@ -145,6 +194,40 @@ def test_preset_edit_sanitization():
assert not hasattr(p, "unknown_field") assert not hasattr(p, "unknown_field")
def test_preset_mode_alias_maps_to_n6():
ctx = _TestContext()
ctx.presets.edit(
"rainbow_mode",
{"pattern": "colour_cycle", "mode": 1, "d": 50, "n1": 2, "a": True},
)
p = ctx.presets.presets["rainbow_mode"]
assert p.p == "colour_cycle"
assert p.n6 == 1
def test_style_mode_and_legacy_aliases():
from patterns.pattern_modes import style_mode
p = Preset({"p": "colour_cycle", "mode": 0, "d": 50, "c": [(255, 0, 0)]})
assert style_mode(p, 0, {"rainbow": 1}) == 0
legacy = Preset({"p": "rainbow", "d": 50, "c": [(255, 0, 0)]})
assert style_mode(legacy, 0, {"rainbow": 1}) == 1
ctx = _TestContext()
legacy_ids = (
"rainbow",
"meteor_rain",
"snowfall",
"sparkle_trail",
"marquee",
"northern_wave",
)
for lid in legacy_ids:
if not _pattern_loaded(ctx, lid):
raise AssertionError("legacy alias not registered: %s" % lid)
def test_colour_conversion_and_transition(): def test_colour_conversion_and_transition():
ctx = _TestContext() ctx = _TestContext()
msg = { msg = {
@@ -162,7 +245,6 @@ def test_colour_conversion_and_transition():
result = _process_message(ctx, msg) result = _process_message(ctx, msg)
assert result == "ok" assert result == "ok"
assert ctx.presets.selected == "fade" assert ctx.presets.selected == "fade"
# Smoke-run the generator to ensure math runs without type errors.
ctx.tick_for_ms(250) ctx.tick_for_ms(250)
@@ -172,19 +254,54 @@ def test_pattern_smoke():
"t_on": {"p": "on", "c": [(16, 8, 4)]}, "t_on": {"p": "on", "c": [(16, 8, 4)]},
"t_off": {"p": "off"}, "t_off": {"p": "off"},
"t_blink": {"p": "blink", "c": [(255, 0, 0)], "d": 20}, "t_blink": {"p": "blink", "c": [(255, 0, 0)], "d": 20},
"t_rainbow": {"p": "rainbow", "d": 5, "n1": 2}, "t_colour_cycle": {"p": "colour_cycle", "n6": 0, "d": 5, "n1": 2, "c": [(255, 0, 0), (0, 255, 0)]},
"t_pulse": {"p": "pulse", "c": [(255, 0, 0)], "n1": 20, "n2": 10, "n3": 20, "d": 10},
"t_transition": {"p": "transition", "c": [(255, 0, 0), (0, 0, 255)], "d": 30},
"t_chase": {"p": "chase", "c": [(255, 0, 0), (0, 0, 255)], "n1": 3, "n2": 2, "n3": 1, "n4": 1, "d": 20}, "t_chase": {"p": "chase", "c": [(255, 0, 0), (0, 0, 255)], "n1": 3, "n2": 2, "n3": 1, "n4": 1, "d": 20},
"t_circle": {"p": "circle", "c": [(255, 255, 0), (0, 0, 8)], "n1": 5, "n2": 10, "n3": 5, "n4": 2},
} }
for name, data in cases.items(): for name, data in cases.items():
ctx.presets.edit(name, data) _smoke_preset(ctx, name, data, ms=100)
assert ctx.presets.select(name), "select failed: %s" % name
ctx.tick_for_ms(120)
def test_merged_pattern_modes():
"""Smoke each style (``n6`` / ``mode``) for merged multi-mode patterns."""
ctx = _TestContext()
colors = [(200, 220, 255), (255, 180, 80)]
cases = (
("mc_grad", "colour_cycle", {"p": "colour_cycle", "n6": 0, "n1": 2, "d": 8, "c": colors}),
("mc_wheel", "colour_cycle", {"p": "colour_cycle", "mode": 1, "n1": 2, "d": 8}),
("chase_std", "chase", {"p": "chase", "n6": 0, "n1": 2, "n2": 2, "n3": 1, "n4": 1, "d": 15, "c": colors}),
("chase_marq", "chase", {"p": "chase", "n6": 1, "n1": 3, "n2": 2, "n3": 1, "d": 15, "c": colors}),
("meteor_0", "meteor", {"p": "meteor", "n6": 0, "n1": 4, "n2": 2, "n3": 8, "d": 10, "c": colors}),
("meteor_1", "meteor", {"p": "meteor", "n6": 1, "n1": 3, "n2": 2, "n3": 4, "d": 10, "c": colors}),
("part_0", "particles", {"p": "particles", "n6": 0, "n1": 4, "n2": 1, "d": 10, "c": colors}),
("part_1", "particles", {"p": "particles", "mode": 1, "n1": 3, "n2": 1, "n3": 4, "d": 10, "c": colors}),
("spark_0", "sparkle", {"p": "sparkle", "n6": 0, "n1": 4, "n2": 6, "d": 10, "c": colors}),
("spark_1", "sparkle", {"p": "sparkle", "n6": 1, "n1": 3, "n2": 4, "n3": 2, "d": 10, "c": colors}),
("aurora_0", "aurora", {"p": "aurora", "n6": 0, "n1": 3, "n2": 2, "n3": 0, "d": 12, "c": colors}),
("aurora_1", "aurora", {"p": "aurora", "mode": 1, "n1": 8, "n2": 2, "n3": 1, "d": 12, "c": colors}),
)
for name, pattern_id, data in cases:
if not _pattern_loaded(ctx, pattern_id):
continue
_smoke_preset(ctx, name, data, ms=60)
legacy_smoke = (
("leg_rainbow", "rainbow", {"p": "rainbow", "d": 8, "n1": 2}),
("leg_ice", "ice_sparkle", {"p": "ice_sparkle", "n1": 3, "n2": 2, "n3": 2, "d": 10, "c": colors}),
("leg_wave", "northern_wave", {"p": "northern_wave", "n1": 6, "n2": 2, "n3": 1, "d": 12, "c": colors}),
("leg_star", "starfall", {"p": "starfall", "n1": 3, "n2": 1, "n3": 3, "d": 10, "c": colors}),
)
for name, pattern_id, data in legacy_smoke:
if not _pattern_loaded(ctx, pattern_id):
continue
_smoke_preset(ctx, name, data, ms=60)
def test_patterns_do_not_use_blocking_sleep(): def test_patterns_do_not_use_blocking_sleep():
try:
import uos as os
except ImportError:
import os
pattern_dir = "patterns" pattern_dir = "patterns"
offenders = [] offenders = []
try: try:
@@ -192,8 +309,9 @@ def test_patterns_do_not_use_blocking_sleep():
except OSError: except OSError:
raise AssertionError("patterns directory is missing") raise AssertionError("patterns directory is missing")
skip = frozenset(("__init__.py", "main.py", "pattern_modes.py"))
for filename in files: for filename in files:
if not filename.endswith(".py") or filename in ("__init__.py", "main.py"): if not filename.endswith(".py") or filename in skip:
continue continue
path = pattern_dir + "/" + filename path = pattern_dir + "/" + filename
try: try:
@@ -223,6 +341,7 @@ def test_default_requires_existing_preset():
_process_message(ctx, {"v": "1", "default": "exists"}) _process_message(ctx, {"v": "1", "default": "exists"})
assert ctx.settings.get("default") == "exists" assert ctx.settings.get("default") == "exists"
def test_default_targets_gate_by_device_name(): def test_default_targets_gate_by_device_name():
ctx = _TestContext() ctx = _TestContext()
ctx.settings["name"] = "a" ctx.settings["name"] = "a"
@@ -243,6 +362,11 @@ def test_default_targets_gate_by_device_name():
def test_save_and_load_roundtrip(): def test_save_and_load_roundtrip():
try:
import uos as os
except ImportError:
import os
ctx = _TestContext() ctx = _TestContext()
ctx.presets.edit( ctx.presets.edit(
"persist", "persist",
@@ -270,8 +394,11 @@ def run_all():
tests = [ tests = [
test_invalid_messages_do_not_crash, test_invalid_messages_do_not_crash,
test_preset_edit_sanitization, test_preset_edit_sanitization,
test_preset_mode_alias_maps_to_n6,
test_style_mode_and_legacy_aliases,
test_colour_conversion_and_transition, test_colour_conversion_and_transition,
test_pattern_smoke, test_pattern_smoke,
test_merged_pattern_modes,
test_patterns_do_not_use_blocking_sleep, test_patterns_do_not_use_blocking_sleep,
test_default_requires_existing_preset, test_default_requires_existing_preset,
test_default_targets_gate_by_device_name, test_default_targets_gate_by_device_name,

View File

@@ -1,190 +0,0 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, duration_ms):
"""Run pattern for specified duration."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
print("=" * 50)
print("Testing Auto and Manual Modes")
print("=" * 50)
# Test 1: Rainbow in AUTO mode (continuous)
print("\nTest 1: Rainbow pattern in AUTO mode (should run continuously)")
p.edit("rainbow_auto", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": True,
})
p.select("rainbow_auto")
print("Running rainbow_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Pattern ran continuously")
# Test 2: Rainbow in MANUAL mode (one step per tick)
print("\nTest 2: Rainbow pattern in MANUAL mode (one step per tick)")
p.edit("rainbow_manual", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": False,
})
p.select("rainbow_manual")
print("Calling tick() 5 times (should advance 5 steps)...")
for i in range(5):
run_tick(p)
utime.sleep_ms(100) # Small delay to see changes
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
# Check if generator stopped after one cycle
if p.generator is None:
print("✓ Manual mode: Generator stopped after one step (as expected)")
else:
print("⚠ Manual mode: Generator still active (may need multiple ticks)")
# Test 3: Pulse in AUTO mode (continuous cycles)
print("\nTest 3: Pulse pattern in AUTO mode (should pulse continuously)")
p.edit("pulse_auto", {
"p": "pulse",
"b": 128,
"d": 100,
"n1": 500, # Attack
"n2": 200, # Hold
"n3": 500, # Decay
"c": [(255, 0, 0)],
"a": True,
})
p.select("pulse_auto")
print("Running pulse_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Pulse ran continuously")
# Test 4: Pulse in MANUAL mode (one cycle then stop)
print("\nTest 4: Pulse pattern in MANUAL mode (one cycle then stop)")
p.edit("pulse_manual", {
"p": "pulse",
"b": 128,
"d": 100,
"n1": 300, # Attack
"n2": 200, # Hold
"n3": 300, # Decay
"c": [(0, 255, 0)],
"a": False,
})
p.select("pulse_manual")
print("Running pulse_manual until generator stops...")
tick_count = 0
max_ticks = 200 # Safety limit
while p.generator is not None and tick_count < max_ticks:
run_tick(p)
tick_count += 1
utime.sleep_ms(10)
if p.generator is None:
print(f"✓ Manual mode: Pulse completed one cycle after {tick_count} ticks")
else:
print(f"⚠ Manual mode: Pulse still running after {tick_count} ticks")
# Test 5: Transition in AUTO mode (continuous transitions)
print("\nTest 5: Transition pattern in AUTO mode (continuous transitions)")
p.edit("transition_auto", {
"p": "transition",
"b": 128,
"d": 500,
"c": [(255, 0, 0), (0, 255, 0), (0, 0, 255)],
"a": True,
})
p.select("transition_auto")
print("Running transition_auto for 3 seconds...")
run_for(p, wdt, 3000)
print("✓ Auto mode: Transition ran continuously")
# Test 6: Transition in MANUAL mode (one transition then stop)
print("\nTest 6: Transition pattern in MANUAL mode (one transition then stop)")
p.edit("transition_manual", {
"p": "transition",
"b": 128,
"d": 500,
"c": [(255, 0, 0), (0, 255, 0)],
"a": False,
})
p.select("transition_manual")
print("Running transition_manual until generator stops...")
tick_count = 0
max_ticks = 200
while p.generator is not None and tick_count < max_ticks:
run_tick(p)
tick_count += 1
utime.sleep_ms(10)
if p.generator is None:
print(f"✓ Manual mode: Transition completed after {tick_count} ticks")
else:
print(f"⚠ Manual mode: Transition still running after {tick_count} ticks")
# Test 7: Switching between auto and manual modes
print("\nTest 7: Switching between auto and manual modes")
p.edit("switch_test", {
"p": "rainbow",
"b": 128,
"d": 50,
"n1": 2,
"a": True,
})
p.select("switch_test")
print("Running in auto mode for 1 second...")
run_for(p, wdt, 1000)
# Switch to manual mode by editing the preset
print("Switching to manual mode...")
p.edit("switch_test", {"a": False})
p.select("switch_test") # Re-select to apply changes
print("Calling tick() 3 times in manual mode...")
for i in range(3):
run_tick(p)
utime.sleep_ms(100)
print(f" Tick {i+1}: generator={'active' if p.generator is not None else 'stopped'}")
# Switch back to auto mode
print("Switching back to auto mode...")
p.edit("switch_test", {"a": True})
p.select("switch_test")
print("Running in auto mode for 1 second...")
run_for(p, wdt, 1000)
print("✓ Successfully switched between auto and manual modes")
# Cleanup
print("\nCleaning up...")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_tick(p)
utime.sleep_ms(100)
print("\n" + "=" * 50)
print("All tests completed!")
print("=" * 50)
if __name__ == "__main__":
main()

View File

@@ -1,151 +0,0 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets, run_tick
def run_for(p, wdt, ms):
"""Helper: run current pattern for given ms using tick()."""
start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), start) < ms:
wdt.feed()
run_tick(p)
utime.sleep_ms(10)
def main():
s = Settings()
pin = s.get("led_pin", 10)
num = s.get("num_leds", 30)
p = Presets(pin=pin, num_leds=num)
wdt = WDT(timeout=10000)
# Test 1: Basic rainbow with auto=True (continuous)
print("Test 1: Basic rainbow (auto=True, n1=1)")
p.edit("rainbow1", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 1,
"a": True,
})
p.select("rainbow1")
run_for(p, wdt, 3000)
# Test 2: Fast rainbow
print("Test 2: Fast rainbow (low delay, n1=1)")
p.edit("rainbow2", {
"p": "rainbow",
"d": 50,
"n1": 1,
"a": True,
})
p.select("rainbow2")
run_for(p, wdt, 2000)
# Test 3: Slow rainbow
print("Test 3: Slow rainbow (high delay, n1=1)")
p.edit("rainbow3", {
"p": "rainbow",
"d": 500,
"n1": 1,
"a": True,
})
p.select("rainbow3")
run_for(p, wdt, 3000)
# Test 4: Low brightness rainbow
print("Test 4: Low brightness rainbow (n1=1)")
p.edit("rainbow4", {
"p": "rainbow",
"b": 64,
"d": 100,
"n1": 1,
"a": True,
})
p.select("rainbow4")
run_for(p, wdt, 2000)
# Test 5: Single-step rainbow (auto=False)
print("Test 5: Single-step rainbow (auto=False, n1=1)")
p.edit("rainbow5", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 1,
"a": False,
})
p.step = 0
for i in range(10):
p.select("rainbow5")
# One tick advances the generator one frame when auto=False
run_tick(p)
utime.sleep_ms(100)
wdt.feed()
# Test 6: Verify step updates correctly
print("Test 6: Verify step updates (auto=False, n1=1)")
p.edit("rainbow6", {
"p": "rainbow",
"n1": 1,
"a": False,
})
initial_step = p.step
p.select("rainbow6")
run_tick(p)
final_step = p.step
print(f"Step updated from {initial_step} to {final_step} (expected increment: 1)")
# Test 7: Fast step increment (n1=5)
print("Test 7: Fast rainbow (n1=5, auto=True)")
p.edit("rainbow7", {
"p": "rainbow",
"b": 255,
"d": 100,
"n1": 5,
"a": True,
})
p.select("rainbow7")
run_for(p, wdt, 2000)
# Test 8: Very fast step increment (n1=10)
print("Test 8: Very fast rainbow (n1=10, auto=True)")
p.edit("rainbow8", {
"p": "rainbow",
"n1": 10,
"a": True,
})
p.select("rainbow8")
run_for(p, wdt, 2000)
# Test 9: Verify n1 controls step increment (auto=False)
print("Test 9: Verify n1 step increment (auto=False, n1=5)")
p.edit("rainbow9", {
"p": "rainbow",
"n1": 5,
"a": False,
})
p.step = 0
initial_step = p.step
p.select("rainbow9")
run_tick(p)
final_step = p.step
expected_step = (initial_step + 5) % 256
print(f"Step updated from {initial_step} to {final_step} (expected: {expected_step})")
if final_step == expected_step:
print("✓ n1 step increment working correctly")
else:
print(f"✗ Step increment mismatch! Expected {expected_step}, got {final_step}")
# Cleanup
print("Test complete, turning off")
p.edit("cleanup_off", {"p": "off"})
p.select("cleanup_off")
run_for(p, wdt, 100)
if __name__ == "__main__":
main()

52
tests/patterns/twinkle.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
import utime
from machine import WDT
from settings import Settings
from presets import Presets
def main():
print("[test] twinkle: start")
s = Settings()
p = Presets(pin=s.get("led_pin", 10), num_leds=s.get("num_leds", 30))
p.debug = True
wdt = WDT(timeout=10000)
print("[test] twinkle: auto phase begin")
p.edit("test_pattern", {"p": "twinkle", "b": 64, "a": True, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern"):
raise Exception("twinkle select failed in auto phase")
auto_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), auto_start) < 2500:
wdt.feed()
p.run_step()
utime.sleep_ms(20)
remaining_ms = utime.ticks_diff(p.next_tick_ms, utime.ticks_ms())
if p.next_tick_ms == 0 or remaining_ms <= 0:
raise Exception("twinkle delay scheduling invalid")
print("[test] twinkle: auto phase end")
print("[test] twinkle: manual phase begin")
p.edit("test_pattern", {"p": "twinkle", "b": 64, "a": False, "d": 3000, "c": [(255, 0, 0), (0, 0, 255)]})
if not p.select("test_pattern", step=0):
raise Exception("twinkle select failed in manual phase")
for _ in range(6):
current_step = int(p.step)
if not p.select("test_pattern", step=current_step):
raise Exception("twinkle external select failed")
p.run_step()
wdt.feed()
if int(p.step) == current_step:
raise Exception("twinkle external step did not advance")
if p.generator is not None:
raise Exception("twinkle manual mode rescheduled generator")
hold_start = utime.ticks_ms()
while utime.ticks_diff(utime.ticks_ms(), hold_start) < 700:
wdt.feed()
utime.sleep_ms(20)
print("[test] twinkle: manual phase end")
print("[test] twinkle: pass")
if __name__ == "__main__":
main()