diff --git a/esp32/msg.json b/esp32/msg.json new file mode 100644 index 0000000..f2322b6 --- /dev/null +++ b/esp32/msg.json @@ -0,0 +1,21 @@ +{ + "ch": 6, + + "peers": { + "12:3456789012":{ + "select": [["name1", "preset1"]] + + , + "ff:ff:ff:ff:ff:ff": { + "presets": { + "preset1": { + "pattern": "on", + "colors": ["#FF0000", "#00FF00", "#0000FF"], + "delay": 100, + "brightness": 127, + "auto": true + } + } + } + } +} diff --git a/led-driver b/led-driver index fea4e69..a64457a 160000 --- a/led-driver +++ b/led-driver @@ -1 +1 @@ -Subproject commit fea4e69140b7142563159daced00972c5275acb5 +Subproject commit a64457a0d5ee3f6c3281ab42a33d5316a5336bbb diff --git a/patterns/__init__.py b/patterns/__init__.py new file mode 100644 index 0000000..83b9dac --- /dev/null +++ b/patterns/__init__.py @@ -0,0 +1,6 @@ +from .blink import Blink +from .rainbow import Rainbow +from .pulse import Pulse +from .transition import Transition +from .chase import Chase +from .circle import Circle diff --git a/patterns/blink.py b/patterns/blink.py new file mode 100644 index 0000000..8a63fe5 --- /dev/null +++ b/patterns/blink.py @@ -0,0 +1,33 @@ +import utime + + +class Blink: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Blink pattern: toggles LEDs on/off using preset delay, cycling through colors.""" + # Use provided colors, or default to white if none + colors = preset.c if preset.c else [(255, 255, 255)] + color_index = 0 + state = True # True = on, False = off + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + # Re-read delay each loop so live updates to preset.d take effect + delay_ms = max(1, int(preset.d)) + if utime.ticks_diff(current_time, last_update) >= delay_ms: + if state: + base_color = colors[color_index % len(colors)] + color = self.driver.apply_brightness(base_color, preset.b) + self.driver.fill(color) + # Advance to next color for the next "on" phase + color_index += 1 + else: + # "Off" phase: turn all LEDs off + self.driver.fill((0, 0, 0)) + state = not state + last_update = current_time + # Yield once per tick so other logic can run + yield diff --git a/patterns/chase.py b/patterns/chase.py new file mode 100644 index 0000000..837ac21 --- /dev/null +++ b/patterns/chase.py @@ -0,0 +1,124 @@ +import utime + + +class Chase: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Chase pattern: n1 LEDs of color0, n2 LEDs of color1, repeating. + Moves by n3 on even steps, n4 on odd steps (n3/n4 can be positive or negative)""" + colors = preset.c + if len(colors) < 1: + # Need at least 1 color + return + + # Access colors, delay, and n values from preset + if not colors: + return + # If only one color provided, use it for both colors + if len(colors) < 2: + color0 = colors[0] + color1 = colors[0] + else: + color0 = colors[0] + color1 = colors[1] + + color0 = self.driver.apply_brightness(color0, preset.b) + color1 = self.driver.apply_brightness(color1, preset.b) + + n1 = max(1, int(preset.n1)) # LEDs of color 0 + n2 = max(1, int(preset.n2)) # LEDs of color 1 + n3 = int(preset.n3) # Step movement on even steps (can be negative) + n4 = int(preset.n4) # Step movement on odd steps (can be negative) + + segment_length = n1 + n2 + + # Calculate position from step_count + step_count = self.driver.step + # Position alternates: step 0 adds n3, step 1 adds n4, step 2 adds n3, etc. + if step_count % 2 == 0: + # Even steps: (step_count//2) pairs of (n3+n4) plus one extra n3 + position = (step_count // 2) * (n3 + n4) + n3 + else: + # Odd steps: ((step_count+1)//2) pairs of (n3+n4) + position = ((step_count + 1) // 2) * (n3 + n4) + + # Wrap position to keep it reasonable + max_pos = self.driver.num_leds + segment_length + position = position % max_pos + if position < 0: + position += max_pos + + # If auto is False, run a single step and then stop + if not preset.a: + # Clear all LEDs + self.driver.n.fill((0, 0, 0)) + + # Draw repeating pattern starting at position + for i in range(self.driver.num_leds): + # Calculate position in the repeating segment + relative_pos = (i - position) % segment_length + if relative_pos < 0: + relative_pos = (relative_pos + segment_length) % segment_length + + # Determine which color based on position in segment + if relative_pos < n1: + self.driver.n[i] = color0 + else: + self.driver.n[i] = color1 + + self.driver.n.write() + + # Increment step for next beat + self.driver.step = step_count + 1 + + # Allow tick() to advance the generator once + yield + return + + # Auto mode: continuous loop + # Use transition_duration for timing and force the first update to happen immediately + transition_duration = max(10, int(preset.d)) + last_update = utime.ticks_ms() - transition_duration + + while True: + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, last_update) >= transition_duration: + # Calculate current position from step_count + if step_count % 2 == 0: + position = (step_count // 2) * (n3 + n4) + n3 + else: + position = ((step_count + 1) // 2) * (n3 + n4) + + # Wrap position + max_pos = self.driver.num_leds + segment_length + position = position % max_pos + if position < 0: + position += max_pos + + # Clear all LEDs + self.driver.n.fill((0, 0, 0)) + + # Draw repeating pattern starting at position + for i in range(self.driver.num_leds): + # Calculate position in the repeating segment + relative_pos = (i - position) % segment_length + if relative_pos < 0: + relative_pos = (relative_pos + segment_length) % segment_length + + # Determine which color based on position in segment + if relative_pos < n1: + self.driver.n[i] = color0 + else: + self.driver.n[i] = color1 + + self.driver.n.write() + + # Increment step + step_count += 1 + self.driver.step = step_count + last_update = current_time + + # Yield once per tick so other logic can run + yield diff --git a/patterns/circle.py b/patterns/circle.py new file mode 100644 index 0000000..f063724 --- /dev/null +++ b/patterns/circle.py @@ -0,0 +1,96 @@ +import utime + + +class Circle: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Circle loading pattern - grows to n2, then tail moves forward at n3 until min length n4""" + head = 0 + tail = 0 + + # Calculate timing from preset + head_rate = max(1, int(preset.n1)) # n1 = head moves per second + tail_rate = max(1, int(preset.n3)) # n3 = tail moves per second + max_length = max(1, int(preset.n2)) # n2 = max length + min_length = max(0, int(preset.n4)) # n4 = min length + + head_delay = 1000 // head_rate # ms between head movements + tail_delay = 1000 // tail_rate # ms between tail movements + + last_head_move = utime.ticks_ms() + last_tail_move = utime.ticks_ms() + + phase = "growing" # "growing", "shrinking", or "off" + + # Support up to two colors (like chase). If only one color is provided, + # use black for the second; if none, default to white. + colors = preset.c + if not colors: + base0 = base1 = (255, 255, 255) + elif len(colors) == 1: + base0 = colors[0] + base1 = (0, 0, 0) + else: + base0 = colors[0] + base1 = colors[1] + + color0 = self.driver.apply_brightness(base0, preset.b) + color1 = self.driver.apply_brightness(base1, preset.b) + + while True: + current_time = utime.ticks_ms() + + # Background: use second color during the "off" phase, otherwise clear to black + if phase == "off": + self.driver.n.fill(color1) + else: + self.driver.n.fill((0, 0, 0)) + + # Calculate segment length + segment_length = (head - tail) % self.driver.num_leds + if segment_length == 0 and head != tail: + segment_length = self.driver.num_leds + + # Draw segment from tail to head as a solid color (no per-LED alternation) + current_color = color0 + for i in range(segment_length + 1): + led_pos = (tail + i) % self.driver.num_leds + self.driver.n[led_pos] = current_color + + # Move head continuously at n1 LEDs per second + if utime.ticks_diff(current_time, last_head_move) >= head_delay: + head = (head + 1) % self.driver.num_leds + last_head_move = current_time + + # Tail behavior based on phase + if phase == "growing": + # Growing phase: tail stays at 0 until max length reached + if segment_length >= max_length: + phase = "shrinking" + elif phase == "shrinking": + # Shrinking phase: move tail forward at n3 LEDs per second + if utime.ticks_diff(current_time, last_tail_move) >= tail_delay: + tail = (tail + 1) % self.driver.num_leds + last_tail_move = current_time + + # Check if we've reached min length + current_length = (head - tail) % self.driver.num_leds + if current_length == 0 and head != tail: + current_length = self.driver.num_leds + + # For min_length = 0, we need at least 1 LED (the head) + if min_length == 0 and current_length <= 1: + phase = "off" # All LEDs off for 1 step + elif min_length > 0 and current_length <= min_length: + phase = "growing" # Cycle repeats + else: # phase == "off" + # Off phase: second color fills the ring for 1 step, then restart + tail = head # Reset tail to head position to start fresh + phase = "growing" + + self.driver.n.write() + + # Yield once per tick so other logic can run + yield diff --git a/patterns/pulse.py b/patterns/pulse.py new file mode 100644 index 0000000..1faf020 --- /dev/null +++ b/patterns/pulse.py @@ -0,0 +1,64 @@ +import utime + + +class Pulse: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + self.driver.off() + + # Get colors from preset + colors = preset.c + if not colors: + colors = [(255, 255, 255)] + + color_index = 0 + cycle_start = utime.ticks_ms() + + # State machine based pulse using a single generator loop + while True: + # Read current timing parameters from preset + attack_ms = max(0, int(preset.n1)) # Attack time in ms + hold_ms = max(0, int(preset.n2)) # Hold time in ms + decay_ms = max(0, int(preset.n3)) # Decay time in ms + delay_ms = max(0, int(preset.d)) + + total_ms = attack_ms + hold_ms + decay_ms + delay_ms + if total_ms <= 0: + total_ms = 1 + + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, cycle_start) + + base_color = colors[color_index % len(colors)] + + if elapsed < attack_ms and attack_ms > 0: + # Attack: fade 0 -> 1 + factor = elapsed / attack_ms + color = tuple(int(c * factor) for c in base_color) + self.driver.fill(self.driver.apply_brightness(color, preset.b)) + elif elapsed < attack_ms + hold_ms: + # Hold: full brightness + self.driver.fill(self.driver.apply_brightness(base_color, preset.b)) + elif elapsed < attack_ms + hold_ms + decay_ms and decay_ms > 0: + # Decay: fade 1 -> 0 + dec_elapsed = elapsed - attack_ms - hold_ms + factor = max(0.0, 1.0 - (dec_elapsed / decay_ms)) + color = tuple(int(c * factor) for c in base_color) + self.driver.fill(self.driver.apply_brightness(color, preset.b)) + elif elapsed < total_ms: + # Delay phase: LEDs off between pulses + self.driver.fill((0, 0, 0)) + else: + # End of cycle, move to next color and restart timing + color_index += 1 + cycle_start = now + if not preset.a: + break + # Skip drawing this tick, start next cycle + yield + continue + + # Yield once per tick + yield diff --git a/patterns/rainbow.py b/patterns/rainbow.py new file mode 100644 index 0000000..64c54e9 --- /dev/null +++ b/patterns/rainbow.py @@ -0,0 +1,51 @@ +import utime + + +class Rainbow: + def __init__(self, driver): + self.driver = driver + + def _wheel(self, pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + def run(self, preset): + step = self.driver.step % 256 + step_amount = max(1, int(preset.n1)) # n1 controls step increment + + # If auto is False, run a single step and then stop + if not preset.a: + for i in range(self.driver.num_leds): + rc_index = (i * 256 // self.driver.num_leds) + step + self.driver.n[i] = self.driver.apply_brightness(self._wheel(rc_index & 255), preset.b) + self.driver.n.write() + # Increment step by n1 for next manual call + self.driver.step = (step + step_amount) % 256 + # Allow tick() to advance the generator once + yield + return + + last_update = utime.ticks_ms() + + while True: + current_time = utime.ticks_ms() + sleep_ms = max(1, int(preset.d)) # Get delay from preset + if utime.ticks_diff(current_time, last_update) >= sleep_ms: + for i in range(self.driver.num_leds): + rc_index = (i * 256 // self.driver.num_leds) + step + self.driver.n[i] = self.driver.apply_brightness( + self._wheel(rc_index & 255), + preset.b, + ) + self.driver.n.write() + step = (step + step_amount) % 256 + self.driver.step = step + last_update = current_time + # Yield once per tick so other logic can run + yield diff --git a/patterns/transition.py b/patterns/transition.py new file mode 100644 index 0000000..b29545a --- /dev/null +++ b/patterns/transition.py @@ -0,0 +1,57 @@ +import utime + + +class Transition: + def __init__(self, driver): + self.driver = driver + + def run(self, preset): + """Transition between colors, blending over `delay` ms.""" + colors = preset.c + if not colors: + self.driver.off() + yield + return + + # Only one color: just keep it on + if len(colors) == 1: + while True: + self.driver.fill(self.driver.apply_brightness(colors[0], preset.b)) + yield + return + + color_index = 0 + start_time = utime.ticks_ms() + + while True: + if not colors: + break + + # Get current and next color based on live list + c1 = colors[color_index % len(colors)] + c2 = colors[(color_index + 1) % len(colors)] + + duration = max(10, int(preset.d)) # At least 10ms + now = utime.ticks_ms() + elapsed = utime.ticks_diff(now, start_time) + + if elapsed >= duration: + # End of this transition step + if not preset.a: + # One-shot: transition from first to second color only + self.driver.fill(self.driver.apply_brightness(c2, preset.b)) + break + # Auto: move to next pair + color_index = (color_index + 1) % len(colors) + start_time = now + yield + continue + + # Interpolate between c1 and c2 + factor = elapsed / duration + interpolated = tuple( + int(c1[i] + (c2[i] - c1[i]) * factor) for i in range(3) + ) + self.driver.fill(self.driver.apply_brightness(interpolated, preset.b)) + + yield diff --git a/src/models/http_driver.py b/src/models/http_driver.py new file mode 100644 index 0000000..423154e --- /dev/null +++ b/src/models/http_driver.py @@ -0,0 +1,125 @@ +"""Wi-Fi LED drivers over HTTP long-poll (same port as the web UI). + +Drivers POST /driver/v1/poll; the controller responds with queued JSON lines. +Presence: last poll within DRIVER_HTTP_SEEN_S counts as connected. +""" + +import asyncio +import time + +from models.wifi_peer import normalize_wifi_peer_ip + +# Must exceed max ``wait_s`` (60) on /driver/v1/poll so sessions are not pruned mid-wait. +DRIVER_HTTP_SEEN_S = 90.0 +_QUEUE_MAX = 64 + +_queues: dict[str, asyncio.Queue] = {} +_last_poll: dict[str, float] = {} +_connected_flag: set[str] = set() +_status_broadcast = None + + +def set_wifi_driver_status_broadcaster(coro) -> None: + global _status_broadcast + _status_broadcast = coro + + +def _schedule_status(ip: str, connected: bool) -> None: + fn = _status_broadcast + if not fn: + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + try: + loop.create_task(fn(ip, connected)) + except Exception: + pass + + +def _get_queue(ip: str) -> asyncio.Queue: + q = _queues.get(ip) + if q is None: + q = asyncio.Queue(maxsize=_QUEUE_MAX) + _queues[ip] = q + return q + + +def prune_stale_http_sessions() -> None: + """Drop timed-out sessions, clear queues, broadcast disconnect.""" + now = time.monotonic() + for ip in list(_last_poll.keys()): + if now - _last_poll[ip] <= DRIVER_HTTP_SEEN_S: + continue + _last_poll.pop(ip, None) + _queues.pop(ip, None) + if ip in _connected_flag: + _connected_flag.discard(ip) + _schedule_status(ip, False) + print(f"[HTTP driver] session timed out: {ip}") + + +def touch_http_session(ip: str) -> None: + ip = normalize_wifi_peer_ip(ip) + if not ip: + return + prune_stale_http_sessions() + now = time.monotonic() + _last_poll[ip] = now + if ip not in _connected_flag: + _connected_flag.add(ip) + _schedule_status(ip, True) + + +def wifi_driver_connected(ip: str) -> bool: + prune_stale_http_sessions() + key = normalize_wifi_peer_ip(ip) + return bool(key and key in _connected_flag) + + +def list_connected_driver_ips(): + prune_stale_http_sessions() + return list(_connected_flag) + + +async def enqueue_json_line(ip: str, json_str: str) -> bool: + ip = normalize_wifi_peer_ip(ip) + if not ip: + return False + line = json_str[:-1] if json_str.endswith("\n") else json_str + q = _get_queue(ip) + while True: + try: + q.put_nowait(line) + return True + except asyncio.QueueFull: + try: + q.get_nowait() + except asyncio.QueueEmpty: + pass + + +async def send_json_line_to_ip(ip: str, json_str: str) -> bool: + """Queue one JSON line for the driver to receive on the next long-poll.""" + return await enqueue_json_line(ip, json_str) + + +async def collect_lines_after_touch(ip: str, wait_s: float) -> list[str]: + """Wait up to wait_s for first line, then drain the rest (non-blocking).""" + ip = normalize_wifi_peer_ip(ip) + if not ip: + return [] + q = _get_queue(ip) + lines: list[str] = [] + try: + first = await asyncio.wait_for(q.get(), timeout=wait_s) + lines.append(first) + while True: + try: + lines.append(q.get_nowait()) + except asyncio.QueueEmpty: + break + except asyncio.TimeoutError: + pass + return lines diff --git a/src/models/wifi_peer.py b/src/models/wifi_peer.py new file mode 100644 index 0000000..ccdff94 --- /dev/null +++ b/src/models/wifi_peer.py @@ -0,0 +1,8 @@ +"""Normalise Wi-Fi client addresses (strip IPv4-mapped IPv6 prefix).""" + + +def normalize_wifi_peer_ip(ip: str) -> str: + s = str(ip).strip() + if s.lower().startswith("::ffff:"): + s = s[7:] + return s diff --git a/tests/test_pattern_ota_send.py b/tests/test_pattern_ota_send.py new file mode 100644 index 0000000..b16baa3 --- /dev/null +++ b/tests/test_pattern_ota_send.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +""" +Manual test helper for pattern OTA send flow. + +Examples: + python tests/test_pattern_ota_send.py --base-url http://led.local --pattern blink + python tests/test_pattern_ota_send.py --base-url http://127.0.0.1:8080 --pattern blink --device-id 102030405060 +""" + +import argparse +import json +import sys +from urllib import request, error + + +def _http_json(method, url, payload=None): + data = None + headers = {"Accept": "application/json"} + if payload is not None: + data = json.dumps(payload).encode("utf-8") + headers["Content-Type"] = "application/json" + req = request.Request(url, data=data, method=method, headers=headers) + try: + with request.urlopen(req, timeout=15) as resp: + body = resp.read().decode("utf-8") + return resp.status, json.loads(body) if body else {} + except error.HTTPError as e: + body = e.read().decode("utf-8") + try: + parsed = json.loads(body) if body else {} + except Exception: + parsed = {"raw": body} + return e.code, parsed + + +def main(): + parser = argparse.ArgumentParser(description="Test /patterns//send OTA flow.") + parser.add_argument( + "--base-url", + default="http://127.0.0.1", + help="Controller base URL (default: http://127.0.0.1)", + ) + parser.add_argument( + "--pattern", + required=True, + help="Pattern name (without .py), e.g. blink", + ) + parser.add_argument( + "--device-id", + default="", + help="Optional device id (MAC). If omitted, sends to all Wi-Fi devices.", + ) + args = parser.parse_args() + + base = args.base_url.rstrip("/") + pattern = args.pattern.strip() + if not pattern: + print("Pattern name is required.") + return 2 + + # Quick visibility before send. + status, patterns = _http_json("GET", f"{base}/patterns") + print(f"GET /patterns -> {status}") + if status != 200: + print(patterns) + return 1 + if pattern not in patterns: + print(f"Pattern {pattern!r} not found in /patterns list.") + return 1 + + status, devices = _http_json("GET", f"{base}/devices") + print(f"GET /devices -> {status}") + if status != 200: + print(devices) + return 1 + wifi_ids = [ + did + for did, d in (devices or {}).items() + if isinstance(d, dict) and str(d.get("transport", "")).lower() == "wifi" + ] + print(f"Wi-Fi devices in registry: {len(wifi_ids)}") + if wifi_ids: + print(" - " + "\n - ".join(wifi_ids)) + + payload = {"device_id": args.device_id} if args.device_id else {} + status, result = _http_json( + "POST", f"{base}/patterns/{pattern}/send", payload=payload + ) + print(f"POST /patterns/{pattern}/send -> {status}") + print(json.dumps(result, indent=2)) + + if status != 200: + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/tests/udp_server.py b/tests/udp_server.py new file mode 100644 index 0000000..24234d3 --- /dev/null +++ b/tests/udp_server.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +"""UDP echo server for testing the led-driver UDP client (MicroPython ESP32). + +Listens on UDP, prints each datagram (peer + payload), sends the same bytes back. + +Run on the Pi (or any host on the LAN): + + python3 tests/udp_server.py + python3 tests/udp_server.py -p 8766 --bind 0.0.0.0 + +Pair with **`led-driver/tests/udp_client.py`**: the device broadcasts a hello; this server +echoes so the client learns the controller's **unicast IP** from the reply (firmware uses that +for HTTP to the web server only; it is not stored in settings). Some Wi‑Fi APs block broadcast between clients — +prefer a wired listener. +""" + +from __future__ import annotations + +import argparse +import json +import socket +import sys + + +DEFAULT_PORT = 8766 + + +def main() -> int: + parser = argparse.ArgumentParser(description="UDP echo server for led-driver tests") + parser.add_argument( + "--bind", + default="0.0.0.0", + metavar="ADDR", + help="Address to bind (default: all interfaces)", + ) + parser.add_argument( + "-p", + "--port", + type=int, + default=DEFAULT_PORT, + help=f"UDP port (default: {DEFAULT_PORT})", + ) + args = parser.parse_args() + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except (AttributeError, OSError): + pass + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + except (AttributeError, OSError): + pass + try: + sock.bind((args.bind, args.port)) + except OSError as e: + print(f"bind {args.bind!r}:{args.port} failed: {e}", file=sys.stderr) + return 1 + + print(f"UDP echo listening on {args.bind}:{args.port} (Ctrl+C to stop)") + while True: + try: + data, addr = sock.recvfrom(2048) + except KeyboardInterrupt: + print("\nStopping.") + return 0 + client_ip, client_port = addr[0], addr[1] + text = data.decode("utf-8", errors="replace") + print(f"client_ip={client_ip} client_udp_port={client_port} ({len(data)} bytes)") + print(f" payload: {text!r}") + line = data.split(b"\n", 1)[0].strip() + if line: + try: + obj = json.loads(line.decode("utf-8")) + if isinstance(obj, dict) and obj.get("type") == "led": + print( + " hello: device_name=%r mac=%r v=%r" + % (obj.get("device_name"), obj.get("mac"), obj.get("v")) + ) + except (UnicodeError, ValueError, TypeError): + pass + try: + sock.sendto(data, addr) + except OSError as e: + print(f" sendto failed: {e}", file=sys.stderr) + + +if __name__ == "__main__": + raise SystemExit(main())