From d599af271bea54e3274bcd4c70c76dc6f02861c4 Mon Sep 17 00:00:00 2001 From: jimmy Date: Tue, 16 Sep 2025 21:22:47 +1200 Subject: [PATCH] patterns: alternating uses n1 (on) and n2 (off); ensure visible ON color; return delay; phase via self.step test: WS client sends nested {name:{...}}; add iterations and repeat-delay; include n per message; use n1/n2 for alternating --- patterns.py | 428 +++++++++++++++++++++++++++++++++++++++++++ src/main.py | 34 ++-- src/patterns.py | 331 ++++----------------------------- src/patterns_base.py | 145 +-------------- src/settings.py | 18 +- test/main.py | 158 ++++++++++++++++ 6 files changed, 640 insertions(+), 474 deletions(-) create mode 100644 patterns.py create mode 100644 test/main.py diff --git a/patterns.py b/patterns.py new file mode 100644 index 0000000..ad36f08 --- /dev/null +++ b/patterns.py @@ -0,0 +1,428 @@ + +import utime +import random +from patterns_base import PatternBase # Import PatternBase + +class Patterns(PatternBase): # Inherit from PatternBase + def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100): + super().__init__(pin, num_leds, color1, color2, brightness, selected, delay) # Call parent constructor + + # Pattern-specific initializations + self.on_width = 1 # Default on width + self.off_width = 2 # Default off width (so total segment is 3, matching original behavior) + self.n1 = 0 # Default start of fill range + self.n2 = self.num_leds - 1 # Default end of fill range + self.oneshot = False # New: One-shot flag for patterns like fill_range + self.patterns = { + "off": self.off, + "on" : self.on, + "color_wipe": self.color_wipe, + "rainbow_cycle": self.rainbow_cycle, + "theater_chase": self.theater_chase, + "blink": self.blink, + "color_transition": self.color_transition, # Added new pattern + "flicker": self.flicker, + "scanner": self.scanner, # New: Single direction scanner + "bidirectional_scanner": self.bidirectional_scanner, # New: Bidirectional scanner + "fill_range": self.fill_range, # New: Fill from n1 to n2 + "n_chase": self.n_chase, # New: N1 on, N2 off repeating chase + "alternating": self.alternating, # New: N1 on/off, N2 off/on alternating chase + "external": None, + "pulse": self.pulse + } + # Beat-related functionality removed + # self.selected is already initialized in PatternBase, but we need to ensure it uses our patterns dict + # self.selected = selected # Handled by PatternBase + + # Ensure colors list always starts with at least two for robust transition handling + # self.colors handled by PatternBase + + # Transition attributes handled by PatternBase + + # Scanner attributes handled by PatternBase + # self.run handled by PatternBase + + def set_on_width(self, on_width): + self.on_width = on_width + + def set_off_width(self, off_width): + self.off_width = off_width + + def set_on_off_width(self, on_width, off_width): + self.on_width = on_width + self.off_width = off_width + self.sync() + + def set_fill_range(self, n1, n2): + self.n1 = n1 + self.n2 = n2 + self.sync() + + def set_oneshot(self, oneshot_value): + self.oneshot = oneshot_value + if self.oneshot: # Reset pattern step if enabling one-shot + self.pattern_step = 0 + self.sync() + + def select(self, pattern): + if pattern in self.patterns: + super().select(pattern) # Use parent select to set self.selected and self.transition_step + self.run = True # Set run flag + if pattern == "color_transition": + if len(self.colors) < 2: + print("Warning: 'color_transition' requires at least two colors. Switching to 'on'.") + self.selected = "on" # Fallback if not enough colors + self.sync() # Re-sync for the new pattern + else: + self.transition_step = 0 + self.current_color_idx = 0 # Start from the first color in the list + self.current_color = self.colors[self.current_color_idx] + self.hold_start_time = utime.ticks_ms() # Reset hold timer + self.transition_duration = self.delay * 50 # Initialize transition duration + self.hold_duration = self.delay * 10 # Initialize hold duration + return True + return False + + def off(self): + self.fill((0, 0, 0)) + return self.delay + + def on(self): + self.fill(self.apply_brightness(self.colors[0])) + return self.delay + + def color_wipe(self): + color = self.apply_brightness(self.colors[0]) + current_time = utime.ticks_ms() + if self.pattern_step < self.num_leds: + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + self.n[self.pattern_step] = self.apply_brightness(color) + self.n.write() + self.pattern_step += 1 + else: + self.pattern_step = 0 + self.last_update = current_time + return self.delay + + def rainbow_cycle(self): + current_time = utime.ticks_ms() + def wheel(pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + for i in range(self.num_leds): + rc_index = (i * 256 // self.num_leds) + self.pattern_step + self.n[i] = self.apply_brightness(wheel(rc_index & 255)) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 256 + self.last_update = current_time + return max(1, int(self.delay // 5)) + + def theater_chase(self): + current_time = utime.ticks_ms() + segment_length = self.on_width + self.off_width + for i in range(self.num_leds): + if (i + self.pattern_step) % segment_length < self.on_width: + self.n[i] = self.apply_brightness(self.colors[0]) + else: + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % segment_length + self.last_update = current_time + return self.delay + + def blink(self): + current_time = utime.ticks_ms() + if self.pattern_step % 2 == 0: + self.fill(self.apply_brightness(self.colors[0])) + else: + self.fill((0, 0, 0)) + self.pattern_step = (self.pattern_step + 1) % 2 + self.last_update = current_time + return self.delay + + def color_transition(self): + current_time = utime.ticks_ms() + + # Check for hold duration first + if utime.ticks_diff(current_time, self.hold_start_time) < self.hold_duration: + # Still in hold phase, just display the current solid color + self.fill(self.apply_brightness(self.current_color)) + self.last_update = current_time # Keep updating last_update to avoid skipping frames + return self.delay + + # If hold duration is over, proceed with transition + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + num_colors = len(self.colors) + if num_colors < 2: + # Should not happen if select handles it, but as a safeguard + self.select("on") + return self.delay + + from_color = self.colors[self.current_color_idx] + to_color_idx = (self.current_color_idx + 1) % num_colors + to_color = self.colors[to_color_idx] + + # Calculate interpolation factor (0.0 to 1.0) + # transition_step goes from 0 to transition_duration - 1 + if self.transition_duration > 0: + interp_factor = self.transition_step / self.transition_duration + else: + interp_factor = 1.0 # Immediately transition if duration is zero + + # Interpolate each color component + r = int(from_color[0] + (to_color[0] - from_color[0]) * interp_factor) + g = int(from_color[1] + (to_color[1] - from_color[1]) * interp_factor) + b = int(from_color[2] + (to_color[2] - from_color[2]) * interp_factor) + + self.current_color = (r, g, b) + self.fill(self.apply_brightness(self.current_color)) + + self.transition_step += self.delay # Advance the transition step by the delay + + if self.transition_step >= self.transition_duration: + # Transition complete, move to the next color and reset for hold phase + self.current_color_idx = to_color_idx + self.current_color = self.colors[self.current_color_idx] # Ensure current_color is the exact target color + self.transition_step = 0 # Reset transition progress + self.hold_start_time = current_time # Start hold phase for the new color + + self.last_update = current_time + return self.delay + + def flicker(self): + current_time = utime.ticks_ms() + base_color = self.colors[0] + # Increase the range for flicker_brightness_offset + # Changed from self.brightness // 4 to self.brightness // 2 (or even self.brightness for max intensity) + flicker_brightness_offset = random.randint(-int(self.brightness // 1.5), int(self.brightness // 1.5)) + flicker_brightness = max(0, min(255, self.brightness + flicker_brightness_offset)) + + flicker_color = self.apply_brightness(base_color, brightness_override=flicker_brightness) + self.fill(flicker_color) + self.last_update = current_time + return max(1, int(self.delay // 5)) + + def scanner(self): + """ + Mimics a 'Knight Rider' style scanner, moving in one direction. + """ + current_time = utime.ticks_ms() + self.fill((0, 0, 0)) # Clear all LEDs + + # Calculate the head and tail position + head_pos = self.pattern_step + color = self.apply_brightness(self.colors[0]) + + # Draw the head + if 0 <= head_pos < self.num_leds: + self.n[head_pos] = color + + # Draw the trailing pixels with decreasing brightness + for i in range(1, self.scanner_tail_length + 1): + tail_pos = head_pos - i + if 0 <= tail_pos < self.num_leds: + # Calculate fading color for tail + # Example: linear fade from full brightness to off + fade_factor = 1.0 - (i / (self.scanner_tail_length + 1)) + faded_color = tuple(int(c * fade_factor) for c in color) + self.n[tail_pos] = faded_color + + self.n.write() + + self.pattern_step += 1 + if self.pattern_step >= self.num_leds + self.scanner_tail_length: + self.pattern_step = 0 # Reset to start + + self.last_update = current_time + return self.delay + + def bidirectional_scanner(self): + """ + Mimics a 'Knight Rider' style scanner, moving back and forth. + """ + current_time = utime.ticks_ms() + self.fill((0, 0, 0)) # Clear all LEDs + + color = self.apply_brightness(self.colors[0]) + + # Calculate the head position based on direction + head_pos = self.pattern_step + + # Draw the head + if 0 <= head_pos < self.num_leds: + self.n[head_pos] = color + + # Draw the trailing pixels with decreasing brightness + for i in range(1, self.scanner_tail_length + 1): + tail_pos = head_pos - (i * self.scanner_direction) + if 0 <= tail_pos < self.num_leds: + fade_factor = 1.0 - (i / (self.scanner_tail_length + 1)) + faded_color = tuple(int(c * fade_factor) for c in color) + self.n[tail_pos] = faded_color + + self.n.write() + + self.pattern_step += self.scanner_direction + + # Change direction if boundaries are reached + if self.scanner_direction == 1 and self.pattern_step >= self.num_leds: + self.scanner_direction = -1 + self.pattern_step = self.num_leds - 1 # Start moving back from the last LED + elif self.scanner_direction == -1 and self.pattern_step < 0: + self.scanner_direction = 1 + self.pattern_step = 0 # Start moving forward from the first LED + + self.last_update = current_time + return self.delay + + def fill_range(self): + """ + Fills a range of LEDs from n1 to n2 with a solid color. + If self.oneshot is True, it fills once and then turns off the LEDs. + """ + current_time = utime.ticks_ms() + if self.oneshot and self.pattern_step >= 1: + self.fill((0, 0, 0)) # Turn off LEDs if one-shot already happened + else: + color = self.apply_brightness(self.colors[0]) + for i in range(self.n1, self.n2 + 1): + self.n[i] = color + self.n.write() + self.last_update = current_time + return self.delay + self.last_update = current_time + return self.delay + + def n_chase(self): + """ + A theater chase pattern using n1 for on-width and n2 for off-width. + """ + current_time = utime.ticks_ms() + segment_length = self.n1 + self.n2 + if segment_length == 0: # Avoid division by zero + self.fill((0,0,0)) + self.n.write() + self.last_update = current_time + return self.delay + + for i in range(self.num_leds): + if (i + self.pattern_step) % segment_length < self.n1: + self.n[i] = self.apply_brightness(self.colors[0]) + else: + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % segment_length + self.last_update = current_time + return self.delay + + def alternating(self): + """ + An alternating pattern where n1 LEDs are ON/OFF and n2 LEDs are OFF/ON globally, without moving. + """ + current_time = utime.ticks_ms() + total_segment_length = self.n1 + self.n2 + if total_segment_length == 0: + self.fill((0,0,0)) + self.n.write() + self.last_update = current_time + return self.delay + + # current_phase will alternate between 0 and 1 + current_phase = self.pattern_step % 2 + + for i in range(self.num_leds): + # Position within a single repeating segment (n1 + n2) + pos_in_segment = i % total_segment_length + + if current_phase == 0: # State 0: n1 ON, n2 OFF + if pos_in_segment < self.n1: + self.n[i] = self.apply_brightness(self.colors[0]) # n1 is ON + else: + self.n[i] = (0, 0, 0) # n2 is OFF + else: # State 1: n1 OFF, n2 ON + if pos_in_segment < self.n1: + self.n[i] = (0, 0, 0) # n1 is OFF + else: + self.n[i] = self.apply_brightness(self.colors[0]) # n2 is ON + + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 2 # Toggle between 0 and 1 + self.last_update = current_time + return self.delay * 2 + + def pulse(self): + if self.pattern_step == 0: + self.fill(self.apply_brightness(self.colors[0])) + self.pattern_step = 1 + + self.last_update = utime.ticks_ms() + if utime.ticks_diff(utime.ticks_ms(), self.last_update) > self.delay: + self.fill((0, 0, 0)) + print(utime.ticks_diff(utime.ticks_ms(), self.last_update)) + self.run = False + return self.delay + + +if __name__ == "__main__": + import time + from machine import WDT + wdt = WDT(timeout=2000) # Enable watchdog with a 2 second timeout + p = Patterns(pin=4, num_leds=60, color1=(255,0,0), color2=(0,0,255), brightness=127, selected="off", delay=100) + + print(p.colors, p.brightness) + + tests = [ + ("off", {"duration_ms": 500}), + ("on", {"duration_ms": 500}), + ("color_wipe", {"delay": 200, "duration_ms": 1000}), + ("rainbow_cycle", {"delay": 100, "duration_ms": 2500}), + ("theater_chase", {"on_width": 3, "off_width": 3, "delay": 1000, "duration_ms": 2500}), + ("blink", {"delay": 500, "duration_ms": 2000}), + ("color_transition", {"delay": 150, "colors": [(255,0,0),(0,255,0),(0,0,255)], "duration_ms": 5000}), + ("flicker", {"delay": 100, "duration_ms": 2000}), + ("scanner", {"delay": 150, "duration_ms": 2500}), + ("bidirectional_scanner", {"delay": 50, "duration_ms": 2500}), + ("fill_range", {"n1": 10, "n2": 20, "delay": 500, "duration_ms": 2000}), + ("n_chase", {"n1": 5, "n2": 5, "delay": 2000, "duration_ms": 2500}), + ("alternating", {"n1": 5, "n2": 5, "delay": 500, "duration_ms": 2500}), + ("pulse", {"delay": 100, "duration_ms": 700}), + ] + + + print("\n--- Running pattern self-test ---") + for name, cfg in tests: + print(f"\nPattern: {name}") + # apply simple config helpers + if "delay" in cfg: + p.set_delay(cfg["delay"]) + if "on_width" in cfg: + p.set_on_width(cfg["on_width"]) + if "off_width" in cfg: + p.set_off_width(cfg["off_width"]) + if "n1" in cfg and "n2" in cfg: + p.set_fill_range(cfg["n1"], cfg["n2"]) + if "colors" in cfg: + p.set_colors(cfg["colors"]) + + p.select(name) + + # run per configured duration using absolute-scheduled tick(next_due_ms) + start = utime.ticks_ms() + duration_ms = cfg["duration_ms"] + delay = cfg.get("delay", 0) + next_due = utime.ticks_ms() - 1 # force immediate first call + while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: + delay = p.tick(delay) + wdt.feed() + + print("\n--- Test routine finished ---") + + diff --git a/src/main.py b/src/main.py index 6eafa88..aa8969d 100644 --- a/src/main.py +++ b/src/main.py @@ -18,13 +18,7 @@ def main(): settings = Settings() print(settings) - patterns = Patterns(settings["led_pin"], settings["num_leds"], selected=settings["pattern"]) - if settings["color_order"] == "rbg": color_order = (1, 5, 3) - else: color_order = (1, 3, 5) - patterns.set_color1(tuple(int(settings["color1"][i:i+2], 16) for i in color_order)) - patterns.set_color2(tuple(int(settings["color2"][i:i+2], 16) for i in color_order)) - patterns.set_brightness(int(settings["brightness"])) - patterns.set_delay(int(settings["delay"])) + patterns = Patterns(settings["led_pin"], settings["num_leds"], selected="off") sta_if = network.WLAN(network.STA_IF) sta_if.active(True) @@ -35,7 +29,7 @@ def main(): wdt = machine.WDT(timeout=10000) wdt.feed() while True: - patterns.tick() + # patterns.tick() wdt.feed() host, msg = e.recv(0) if msg: @@ -46,19 +40,19 @@ def main(): defaults = data.get("d", {}) bar = data.get(settings.get("name"), {}) - patterns.set_brightness(bar.get("brightness", defaults.get("brightness", 100))) - patterns.set_delay(bar.get("delay", defaults.get("delay", 100))) - colors = bar.get("colors", defaults.get("colors", ["#000000", "#000000"])) + patterns.brightness = bar.get("brightness", defaults.get("brightness", patterns.brightness)) + patterns.delay = bar.get("delay", defaults.get("delay", patterns.delay)) + colors = bar.get("colors", defaults.get("colors", patterns.colors)) patterns.colors = [tuple(int(color[i:i+2], 16) for i in settings.color_order) for color in colors] - patterns.select(bar.get("pattern", defaults.get("pattern", "off"))) - patterns.n1 = bar.get("n1", defaults.get("n1", 0)) - patterns.n2 = bar.get("n2", defaults.get("n2", 58)) - patterns.on_width = bar.get("on_width", defaults.get("on_width", 1)) - patterns.off_width = bar.get("off_width", defaults.get("off_width", 2)) - patterns.oneshot = bar.get("oneshot", defaults.get("oneshot", False)) - patterns.beat = bar.get("beat", defaults.get("beat", False)) - patterns.beat_mode = bar.get("beat_mode", defaults.get("beat_mode", False)) - patterns.auto = bar.get("auto", defaults.get("auto", True)) + # patterns.select(bar.get("pattern", defaults.get("pattern", "off"))) + patterns.n1 = bar.get("n1", defaults.get("n1", patterns.n1)) + patterns.n2 = bar.get("n2", defaults.get("n2", patterns.n2)) + patterns.step = bar.get("pattern_step", defaults.get("step", patterns.step)) + selected_pattern = bar.get("pattern", defaults.get("pattern", "off")) + if selected_pattern in patterns.patterns: + patterns.patterns[selected_pattern]() + else: + print(f"Pattern {selected_pattern} not found") except: print(f"Failed to load espnow data {msg}") diff --git a/src/patterns.py b/src/patterns.py index 59a1a2a..080a6eb 100644 --- a/src/patterns.py +++ b/src/patterns.py @@ -14,195 +14,13 @@ class Patterns(PatternBase): # Inherit from PatternBase self.n2 = self.num_leds - 1 # Default end of fill range self.oneshot = False # New: One-shot flag for patterns like fill_range self.patterns = { - "off": self.off, - "on" : self.on, - "color_wipe": self.color_wipe, - "rainbow_cycle": self.rainbow_cycle, - "theater_chase": self.theater_chase, - "blink": self.blink, - "color_transition": self.color_transition, # Added new pattern "flicker": self.flicker, - "scanner": self.scanner, # New: Single direction scanner - "bidirectional_scanner": self.bidirectional_scanner, # New: Bidirectional scanner - "fill_range": self.fill_range, # New: Fill from n1 to n2 - "n_chase": self.n_chase, # New: N1 on, N2 off repeating chase - "alternating": self.alternating, # New: N1 on/off, N2 off/on alternating chase - "external": None, + "fill_range": self.fill_range, + "n_chase": self.n_chase, + "alternating": self.alternating, "pulse": self.pulse - } - # Beat-related functionality removed - # self.selected is already initialized in PatternBase, but we need to ensure it uses our patterns dict - # self.selected = selected # Handled by PatternBase - - # Ensure colors list always starts with at least two for robust transition handling - # self.colors handled by PatternBase - - # Transition attributes handled by PatternBase - - # Scanner attributes handled by PatternBase - # self.run handled by PatternBase - - def sync(self): - super().sync() # Call parent sync - # Reset pattern_step for theater_chase when chase_width changes - if self.selected == "theater_chase" or self.selected == "fill_range" or self.selected == "n_chase" or self.selected == "alternating": - self.pattern_step = 0 - self.tick() - - def set_on_width(self, on_width): - self.on_width = on_width - - def set_off_width(self, off_width): - self.off_width = off_width - - def set_on_off_width(self, on_width, off_width): - self.on_width = on_width - self.off_width = off_width - self.sync() - - def set_fill_range(self, n1, n2): - self.n1 = n1 - self.n2 = n2 - self.sync() - - def set_oneshot(self, oneshot_value): - self.oneshot = oneshot_value - if self.oneshot: # Reset pattern step if enabling one-shot - self.pattern_step = 0 - self.sync() - - def select(self, pattern): - if pattern in self.patterns: - super().select(pattern) # Use parent select to set self.selected and self.transition_step - self.run = True # Set run flag - if pattern == "color_transition": - if len(self.colors) < 2: - print("Warning: 'color_transition' requires at least two colors. Switching to 'on'.") - self.selected = "on" # Fallback if not enough colors - self.sync() # Re-sync for the new pattern - else: - self.transition_step = 0 - self.current_color_idx = 0 # Start from the first color in the list - self.current_color = self.colors[self.current_color_idx] - self.hold_start_time = utime.ticks_ms() # Reset hold timer - self.transition_duration = self.delay * 50 # Initialize transition duration - self.hold_duration = self.delay * 10 # Initialize hold duration - return True - return False - - def off(self): - self.fill((0, 0, 0)) - return self.delay - - def on(self): - self.fill(self.apply_brightness(self.colors[0])) - return self.delay - - def color_wipe(self): - color = self.apply_brightness(self.colors[0]) - current_time = utime.ticks_ms() - if self.pattern_step < self.num_leds: - for i in range(self.num_leds): - self.n[i] = (0, 0, 0) - self.n[self.pattern_step] = self.apply_brightness(color) - self.n.write() - self.pattern_step += 1 - else: - self.pattern_step = 0 - self.last_update = current_time - return self.delay - - def rainbow_cycle(self): - current_time = utime.ticks_ms() - def wheel(pos): - if pos < 85: - return (pos * 3, 255 - pos * 3, 0) - elif pos < 170: - pos -= 85 - return (255 - pos * 3, 0, pos * 3) - else: - pos -= 170 - return (0, pos * 3, 255 - pos * 3) - - for i in range(self.num_leds): - rc_index = (i * 256 // self.num_leds) + self.pattern_step - self.n[i] = self.apply_brightness(wheel(rc_index & 255)) - self.n.write() - self.pattern_step = (self.pattern_step + 1) % 256 - self.last_update = current_time - return max(1, int(self.delay // 5)) - - def theater_chase(self): - current_time = utime.ticks_ms() - segment_length = self.on_width + self.off_width - for i in range(self.num_leds): - if (i + self.pattern_step) % segment_length < self.on_width: - self.n[i] = self.apply_brightness(self.colors[0]) - else: - self.n[i] = (0, 0, 0) - self.n.write() - self.pattern_step = (self.pattern_step + 1) % segment_length - self.last_update = current_time - return self.delay - - def blink(self): - current_time = utime.ticks_ms() - if self.pattern_step % 2 == 0: - self.fill(self.apply_brightness(self.colors[0])) - else: - self.fill((0, 0, 0)) - self.pattern_step = (self.pattern_step + 1) % 2 - self.last_update = current_time - return self.delay - - def color_transition(self): - current_time = utime.ticks_ms() - - # Check for hold duration first - if utime.ticks_diff(current_time, self.hold_start_time) < self.hold_duration: - # Still in hold phase, just display the current solid color - self.fill(self.apply_brightness(self.current_color)) - self.last_update = current_time # Keep updating last_update to avoid skipping frames - return self.delay - - # If hold duration is over, proceed with transition - if utime.ticks_diff(current_time, self.last_update) >= self.delay: - num_colors = len(self.colors) - if num_colors < 2: - # Should not happen if select handles it, but as a safeguard - self.select("on") - return self.delay - - from_color = self.colors[self.current_color_idx] - to_color_idx = (self.current_color_idx + 1) % num_colors - to_color = self.colors[to_color_idx] - - # Calculate interpolation factor (0.0 to 1.0) - # transition_step goes from 0 to transition_duration - 1 - if self.transition_duration > 0: - interp_factor = self.transition_step / self.transition_duration - else: - interp_factor = 1.0 # Immediately transition if duration is zero - - # Interpolate each color component - r = int(from_color[0] + (to_color[0] - from_color[0]) * interp_factor) - g = int(from_color[1] + (to_color[1] - from_color[1]) * interp_factor) - b = int(from_color[2] + (to_color[2] - from_color[2]) * interp_factor) - - self.current_color = (r, g, b) - self.fill(self.apply_brightness(self.current_color)) - - self.transition_step += self.delay # Advance the transition step by the delay - - if self.transition_step >= self.transition_duration: - # Transition complete, move to the next color and reset for hold phase - self.current_color_idx = to_color_idx - self.current_color = self.colors[self.current_color_idx] # Ensure current_color is the exact target color - self.transition_step = 0 # Reset transition progress - self.hold_start_time = current_time # Start hold phase for the new color - - self.last_update = current_time - return self.delay + } + self.step = 0 def flicker(self): current_time = utime.ticks_ms() @@ -217,79 +35,6 @@ class Patterns(PatternBase): # Inherit from PatternBase self.last_update = current_time return max(1, int(self.delay // 5)) - def scanner(self): - """ - Mimics a 'Knight Rider' style scanner, moving in one direction. - """ - current_time = utime.ticks_ms() - self.fill((0, 0, 0)) # Clear all LEDs - - # Calculate the head and tail position - head_pos = self.pattern_step - color = self.apply_brightness(self.colors[0]) - - # Draw the head - if 0 <= head_pos < self.num_leds: - self.n[head_pos] = color - - # Draw the trailing pixels with decreasing brightness - for i in range(1, self.scanner_tail_length + 1): - tail_pos = head_pos - i - if 0 <= tail_pos < self.num_leds: - # Calculate fading color for tail - # Example: linear fade from full brightness to off - fade_factor = 1.0 - (i / (self.scanner_tail_length + 1)) - faded_color = tuple(int(c * fade_factor) for c in color) - self.n[tail_pos] = faded_color - - self.n.write() - - self.pattern_step += 1 - if self.pattern_step >= self.num_leds + self.scanner_tail_length: - self.pattern_step = 0 # Reset to start - - self.last_update = current_time - return self.delay - - def bidirectional_scanner(self): - """ - Mimics a 'Knight Rider' style scanner, moving back and forth. - """ - current_time = utime.ticks_ms() - self.fill((0, 0, 0)) # Clear all LEDs - - color = self.apply_brightness(self.colors[0]) - - # Calculate the head position based on direction - head_pos = self.pattern_step - - # Draw the head - if 0 <= head_pos < self.num_leds: - self.n[head_pos] = color - - # Draw the trailing pixels with decreasing brightness - for i in range(1, self.scanner_tail_length + 1): - tail_pos = head_pos - (i * self.scanner_direction) - if 0 <= tail_pos < self.num_leds: - fade_factor = 1.0 - (i / (self.scanner_tail_length + 1)) - faded_color = tuple(int(c * fade_factor) for c in color) - self.n[tail_pos] = faded_color - - self.n.write() - - self.pattern_step += self.scanner_direction - - # Change direction if boundaries are reached - if self.scanner_direction == 1 and self.pattern_step >= self.num_leds: - self.scanner_direction = -1 - self.pattern_step = self.num_leds - 1 # Start moving back from the last LED - elif self.scanner_direction == -1 and self.pattern_step < 0: - self.scanner_direction = 1 - self.pattern_step = 0 # Start moving forward from the first LED - - self.last_update = current_time - return self.delay - def fill_range(self): """ Fills a range of LEDs from n1 to n2 with a solid color. @@ -331,50 +76,46 @@ class Patterns(PatternBase): # Inherit from PatternBase return self.delay def alternating(self): - """ - An alternating pattern where n1 LEDs are ON/OFF and n2 LEDs are OFF/ON globally, without moving. - """ - current_time = utime.ticks_ms() - total_segment_length = self.n1 + self.n2 - if total_segment_length == 0: - self.fill((0,0,0)) + # Use n1 as ON width and n2 as OFF width + segment_on = max(0, int(self.n1)) + segment_off = max(0, int(self.n2)) + total_segment_length = segment_on + segment_off + if total_segment_length <= 0: + self.fill((0, 0, 0)) self.n.write() - self.last_update = current_time return self.delay - - # current_phase will alternate between 0 and 1 - current_phase = self.pattern_step % 2 - + + current_phase = self.step % 2 + + active_color = self.apply_brightness(self.colors[0]) + for i in range(self.num_leds): - # Position within a single repeating segment (n1 + n2) pos_in_segment = i % total_segment_length - - if current_phase == 0: # State 0: n1 ON, n2 OFF - if pos_in_segment < self.n1: - self.n[i] = self.apply_brightness(self.colors[0]) # n1 is ON + if current_phase == 0: + # ON then OFF + if pos_in_segment < segment_on: + self.n[i] = active_color else: - self.n[i] = (0, 0, 0) # n2 is OFF - else: # State 1: n1 OFF, n2 ON - if pos_in_segment < self.n1: - self.n[i] = (0, 0, 0) # n1 is OFF + self.n[i] = (0, 0, 0) + else: + # OFF then ON + if pos_in_segment < segment_on: + self.n[i] = (0, 0, 0) else: - self.n[i] = self.apply_brightness(self.colors[0]) # n2 is ON - + self.n[i] = active_color + self.n.write() - self.pattern_step = (self.pattern_step + 1) % 2 # Toggle between 0 and 1 - self.last_update = current_time - return self.delay * 2 + self.step = (self.step + 1) % 2 + return self.delay + def pulse(self): - if self.pattern_step == 0: - self.fill(self.apply_brightness(self.colors[0])) - self.pattern_step = 1 - - self.last_update = utime.ticks_ms() - if utime.ticks_diff(utime.ticks_ms(), self.last_update) > self.delay: - self.fill((0, 0, 0)) - print(utime.ticks_diff(utime.ticks_ms(), self.last_update)) - self.run = False + self.fill(self.apply_brightness(self.colors[0])) + start = utime.ticks_ms() + + while utime.ticks_diff(utime.ticks_ms(), start) < self.delay: + pass + self.fill((0, 0, 0)) return self.delay diff --git a/src/patterns_base.py b/src/patterns_base.py index 5c025b9..4184d4f 100644 --- a/src/patterns_base.py +++ b/src/patterns_base.py @@ -33,138 +33,21 @@ class PatternBase: # Store last pattern-returned delay to use for subsequent gating self._last_returned_delay = None - - def sync(self): - self.pattern_step=0 - self.last_update = utime.ticks_ms() - self.delay - if self.selected == "color_transition": - self.transition_step = 0 - self.current_color_idx = 0 - self.current_color = self.colors[self.current_color_idx] - self.hold_start_time = utime.ticks_ms() # Reset hold time - # Reset scanner specific variables - self.scanner_direction = 1 - # self.tick() # Tick moved to Patterns, as patterns dict is there - - def set_pattern_step(self, step): - self.pattern_step = step - - def tick(self, delay=0): - now =utime.ticks_ms() - if self.patterns.get(self.selected) and self.run: - if delay == 0: - self.patterns[self.selected]() - print("manual tick") - return 0 - if utime.ticks_diff(now, delay) > 0: - delay = self.patterns[self.selected]() - print("auto tick") - return delay + now - else: - return delay - def update_num_leds(self, pin, num_leds): self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) self.num_leds = num_leds - self.pattern_step = 0 - - def set_delay(self, delay): - self.delay = delay - # Update transition duration and hold duration when delay changes - self.transition_duration = self.delay * 50 - self.hold_duration = self.delay * 10 - # Reset last returned delay so next tick recomputes - self._last_returned_delay = None - - - def set_brightness(self, brightness): - self.brightness = brightness - - def set_color1(self, color): - if len(self.colors) > 0: - self.colors[0] = color - if self.selected == "color_transition": - # If the first color is changed, potentially reset transition - # to start from this new color if we were about to transition from it - if self.current_color_idx == 0: - self.transition_step = 0 - self.current_color = self.colors[0] - self.hold_start_time = utime.ticks_ms() - else: - self.colors.append(color) - - - def set_color2(self, color): - if len(self.colors) > 1: - self.colors[1] = color - elif len(self.colors) == 1: - self.colors.append(color) - else: # List is empty - self.colors.append((0,0,0)) # Dummy color - self.colors.append(color) - - - def set_colors(self, colors): - if colors and len(colors) >= 2: - self.colors = colors - if self.selected == "color_transition": - self.sync() # Reset transition if new color list is provided - elif colors and len(colors) == 1: - self.colors = [colors[0], (255,255,255)] # Add a default second color - if self.selected == "color_transition": - print("Warning: 'color_transition' requires at least two colors. Adding a default second color.") - self.sync() - else: - print("Error: set_colors requires a list of at least one color.") - self.colors = [(0,0,0), (255,255,255)] # Fallback - if self.selected == "color_transition": - self.sync() def set_color(self, num, color): - # Changed: More robust index check if 0 <= num < len(self.colors): self.colors[num] = color - # If the changed color is part of the current or next transition, - # restart the transition for smoother updates - if self.selected == "color_transition": - current_from_idx = self.current_color_idx - current_to_idx = (self.current_color_idx + 1) % len(self.colors) - if num == current_from_idx or num == current_to_idx: - # If we change a color involved in the current transition, - # it's best to restart the transition state for smoothness. - self.transition_step = 0 - self.current_color_idx = current_from_idx # Stay at the current starting color - self.current_color = self.colors[self.current_color_idx] - self.hold_start_time = utime.ticks_ms() # Reset hold - return True elif num == len(self.colors): # Allow setting a new color at the end self.colors.append(color) return True return False - def add_color(self, color): - self.colors.append(color) - if self.selected == "color_transition" and len(self.colors) == 2: - # If we just added the second color needed for transition - self.sync() - - def del_color(self, num): - # Changed: More robust index check and using del for lists if 0 <= num < len(self.colors): del self.colors[num] - # If the color being deleted was part of the current transition, - # re-evaluate the current_color_idx - if self.selected == "color_transition": - if len(self.colors) < 2: # Need at least two colors for transition - print("Warning: Not enough colors for 'color_transition'. Switching to 'on'.") - self.select("on") # Or some other default - else: - # Adjust index if it's out of bounds after deletion or was the one transitioning from - self.current_color_idx %= len(self.colors) - self.transition_step = 0 - self.current_color = self.colors[self.current_color_idx] - self.hold_start_time = utime.ticks_ms() return True return False @@ -172,38 +55,12 @@ class PatternBase: effective_brightness = brightness_override if brightness_override is not None else self.brightness return tuple(int(c * effective_brightness / 255) for c in color) - def select(self, pattern): - # Removed self.run = True here. It should be handled by Patterns class. - if pattern in self.patterns: - self.selected = pattern - self.sync() # Reset pattern state when selecting a new pattern - # Reset last returned delay so gating can be recalculated for the new pattern - self._last_returned_delay = None - if pattern == "color_transition": - if len(self.colors) < 2: - print("Warning: 'color_transition' requires at least two colors. Switching to 'on'.") - self.selected = "on" # Fallback if not enough colors - self.sync() # Re-sync for the new pattern - else: - self.transition_step = 0 - self.current_color_idx = 0 # Start from the first color in the list - self.current_color = self.colors[self.current_color_idx] - self.hold_start_time = utime.ticks_ms() # Reset hold timer - self.transition_duration = self.delay * 50 # Initialize transition duration - self.hold_duration = self.delay * 10 # Initialize hold duration - return True - return False - - def set(self, i, color): - self.n[i] = color - def write(self): self.n.write() def fill(self, color=None): fill_color = color if color is not None else self.colors[0] - for i in range(self.num_leds): - self.n[i] = fill_color + self.n.fill(fill_color) self.n.write() def off(self): diff --git a/src/settings.py b/src/settings.py index e4183e2..9c4be26 100644 --- a/src/settings.py +++ b/src/settings.py @@ -9,26 +9,14 @@ class Settings(dict): def __init__(self): super().__init__() self.load() # Load settings from file during initialization - if self["color_order"] == "rbg": self.color_order = (1, 5, 3) + if self.get("color_order", "rgb") == "rbg": self.color_order = (1, 5, 3) else: self.color_order = (1, 3, 5) def set_defaults(self): - self["led_pin"] = 10 + self["led_pin"] = 4 self["num_leds"] = 100 - self["pattern"] = "on" - self["color1"] = "#080000" - self["color2"] = "#ff0000" - self["delay"] = 100 - self["brightness"] = 100 - self["on_width"] = 1 # Default on width for theater chase - self["off_width"] = 2 # Default off width for theater chase - self["n1"] = 0 # Default start of fill range - self["n2"] = 58 # Default end of fill range (assuming 59 leds for now) - self["oneshot"] = False # Default one-shot setting self["color_order"] = "rgb" - self["name"] = f"5" - self["ap_password"] = "" - self["id"] = 0 + self["name"] = f"3" def save(self): try: diff --git a/test/main.py b/test/main.py new file mode 100644 index 0000000..a019b46 --- /dev/null +++ b/test/main.py @@ -0,0 +1,158 @@ +import asyncio +import json +import argparse +import signal + +try: + import websockets # type: ignore +except Exception as e: + print("Please install websockets: pip install websockets") + raise + +WS_URI = "ws://192.168.4.1/ws" + +# Default pattern suite aligned with current firmware patterns +PATTERN_SUITE = [ + {"pattern": "flicker", "delay": 80, "iterations": 30, "repeat_delay": 80, "colors": ["#ffaa00"]}, + {"pattern": "fill_range", "n1": 10, "n2": 20, "delay": 400, "iterations": 1, "repeat_delay": 500, "colors": ["#888888"]}, + {"pattern": "n_chase", "n1": 5, "n2": 5, "delay": 250, "iterations": 40, "repeat_delay": 120, "colors": ["#00ff88"]}, + {"pattern": "alternating", "n1": 6, "n2": 6, "delay": 300, "iterations": 20, "repeat_delay": 300, "colors": ["#ff8800"]}, + {"pattern": "pulse", "delay": 200, "iterations": 6, "repeat_delay": 300, "colors": ["#ffffff"]}, +] + + +def build_message( + pattern: str, + n: int | None = None, + delay: int | None = None, + colors: list[str] | None = None, + brightness: int | None = None, + num_leds: int | None = None, + n1: int | None = None, + n2: int | None = None, + name: str = "0", + pattern_step: int | None = None, +): + settings: dict[str, object] = { + "pattern": pattern, + } + if n is not None: + settings["n"] = n + if delay is not None: + settings["delay"] = delay + if colors is not None: + settings["colors"] = colors + if brightness is not None: + settings["brightness"] = brightness + if num_leds is not None: + settings["num_leds"] = num_leds + if n1 is not None: + settings["n1"] = n1 + if n2 is not None: + settings["n2"] = n2 + if pattern_step is not None: + settings["pattern_step"] = pattern_step + # ESP-NOW-style nested payload keyed by name (e.g., "0") + return {name: settings} + + +async def send_once(uri: str, payload: dict, hold_ms: int | None = None): + async with websockets.connect(uri) as ws: + await ws.send(json.dumps(payload)) + if hold_ms and hold_ms > 0: + await asyncio.sleep(hold_ms / 1000) + + +async def run_suite(uri: str): + async with websockets.connect(uri) as ws: + for cfg in PATTERN_SUITE: + iterations = int(cfg.get("iterations", 10)) + interval_ms = int(cfg.get("interval_ms", cfg.get("delay", 100) or 100)) + repeat_ms = int(cfg.get("repeat_delay", interval_ms)) + for i in range(iterations): + msg = build_message( + cfg.get("pattern", "off"), + i, + delay=cfg.get("delay"), + colors=cfg.get("colors"), + brightness=cfg.get("brightness", 127), + num_leds=cfg.get("num_leds"), + n1=cfg.get("n1"), + n2=cfg.get("n2"), + name=cfg.get("name", "0"), + pattern_step=cfg.get("pattern_step"), + ) + print(msg) + await ws.send(json.dumps(msg)) + await asyncio.sleep(repeat_ms / 1000) + + +def _parse_args(): + p = argparse.ArgumentParser(description="WebSocket LED pattern tester") + p.add_argument("--uri", default=WS_URI, help="WebSocket URI, default ws://192.168.4.1/ws") + p.add_argument("--pattern", help="Single pattern to send (overrides suite)") + p.add_argument("--delay", type=int, help="Delay ms") + p.add_argument("--brightness", type=int, help="Brightness 0-255") + p.add_argument("--num-leds", type=int, help="Number of LEDs") + p.add_argument("--colors", nargs="*", help="Hex colors like #ff0000 #00ff00") + p.add_argument("--on-width", type=int) + p.add_argument("--off-width", type=int) + p.add_argument("--n1", type=int) + p.add_argument("--n2", type=int) + p.add_argument("--name", default="0", help="Target name key for nested payload (default: 0)") + p.add_argument("--iterations", type=int, help="How many cycles/messages to send") + p.add_argument("--interval", type=int, help="Interval between messages in ms (default: delay or 100)") + p.add_argument("--repeat-delay", dest="repeat_delay", type=int, help="Delay between repeats in ms (overrides --interval if set)") + p.add_argument("--hold", type=int, default=1500, help="Hold ms for single send") + return p.parse_args() + +def _setup_sigint(loop: asyncio.AbstractEventLoop): + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, loop.stop) + except NotImplementedError: + pass + + +async def main_async(): + args = _parse_args() + if args.pattern: + iterations = int(args.iterations or 1) + interval_ms = int(args.interval or (args.delay if args.delay is not None else 100)) + repeat_ms = int(args.repeat_delay or interval_ms) + async with websockets.connect(args.uri) as ws: + for i in range(iterations): + msg = build_message( + pattern=args.pattern, + n=i, + delay=args.delay, + colors=args.colors, + brightness=args.brightness, + num_leds=args.num_leds, + n1=args.n1, + n2=args.n2, + name=args.name, + ) + print(msg) + await ws.send(json.dumps(msg)) + await asyncio.sleep(repeat_ms / 1000) + else: + await run_suite(args.uri) + + +def main(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + _setup_sigint(loop) + try: + loop.run_until_complete(main_async()) + finally: + try: + loop.run_until_complete(asyncio.sleep(0)) + except Exception: + pass + loop.close() + + +if __name__ == "__main__": + main()