import utime import random from patterns_base import PatternBase # Import PatternBase import _thread from machine import WDT class Patterns(PatternBase): # Inherit from PatternBase def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100): super().__init__(pin, num_leds, color1, color2, brightness, selected, delay) # Call parent constructor # Pattern-specific initializations self.on_width = 1 # Default on width self.off_width = 2 # Default off width (so total segment is 3, matching original behavior) self.n1 = 0 # Default start of fill range self.n2 = self.num_leds - 1 # Default end of fill range self.n3 = 1 # Default step factor self.n4 = 0 self.oneshot = False # New: One-shot flag for patterns like fill_range self.patterns = { # Shortened pattern names for optimized JSON payloads "o": self.off, "on": self.on, "bl": self.blink, } self.step = 0 self.run = True self.running = False self.wdt = WDT(timeout=10000) def select(self, pattern): self.selected = pattern self.run = False if pattern not in self.patterns: return False while self.running: utime.sleep_ms(1) self.running = True _thread.start_new_thread(self.patterns[pattern], ()) def on(self): """Turn on all LEDs with current color""" self.fill(self.apply_brightness(self.colors[0])) def off(self): """Turn off all LEDs""" self.fill((0, 0, 0)) def blink(self): self.run = True start = utime.ticks_ms() while self.run: self.wdt.feed() diff = utime.ticks_diff(utime.ticks_ms(), start) if diff >= self.delay: self.fill((0, 0, 0)) start = utime.ticks_ms() elif diff >= self.delay/2: self.fill(self.apply_brightness(self.colors[0])) self.run = False self.running = False # def flicker(self): # current_time = utime.ticks_ms() # base_color = self.colors[0] # # Use fixed minimum brightness of 10, flicker between 10 and full brightness # # Use n3 as step rate multiplier to control how fast patterns step # min_brightness = 10 # step_rate = max(1, int(self.n3)) # flicker_brightness_offset = random.randint(-int(self.brightness // 1.5), int(self.brightness // 1.5)) # flicker_brightness = max(min_brightness, min(255, self.brightness + flicker_brightness_offset)) # flicker_color = self.apply_brightness(base_color, brightness_override=flicker_brightness) # self.fill(flicker_color) # self.last_update = current_time # return max(1, int(self.delay // (5 * step_rate))) # def fill_range(self): # """ # Fills a range of LEDs from n1 to n2 with a solid color. # If self.oneshot is True, it fills once and then turns off the LEDs. # """ # current_time = utime.ticks_ms() # if self.oneshot and self.pattern_step >= 1: # self.fill((0, 0, 0)) # Turn off LEDs if one-shot already happened # else: # color = self.apply_brightness(self.colors[0]) # for i in range(self.n1, self.n2 + 1): # self.n[i] = color # self.n.write() # self.last_update = current_time # return self.delay # self.last_update = current_time # return self.delay # def n_chase(self): # """ # A theater chase pattern using n1 for on-width and n2 for off-width. # """ # current_time = utime.ticks_ms() # step_rate = max(1, int(self.n3)) # segment_length = self.n1 + self.n2 # if segment_length == 0: # Avoid division by zero # self.fill((0,0,0)) # self.n.write() # self.last_update = current_time # return self.delay # # Use controller's step for synchronization, but scale it for chasing # chase_step = (self.step * step_rate) % self.num_leds # for i in range(self.num_leds): # # Calculate position relative to the chase head # pos_from_head = (i - chase_step) % self.num_leds # if pos_from_head < self.n1: # self.n[i] = self.apply_brightness(self.colors[0]) # else: # self.n[i] = (0, 0, 0) # self.n.write() # # Don't update internal step - use controller's step for sync # self.last_update = current_time # return self.delay # def alternating(self): # # Use n1 as ON width and n2 as OFF width # segment_on = max(0, int(self.n1)) # segment_off = max(0, int(self.n2)) # total_segment_length = segment_on + segment_off # if total_segment_length <= 0: # self.fill((0, 0, 0)) # self.n.write() # return self.delay # current_phase = self.step % 2 # active_color = self.apply_brightness(self.colors[0]) # for i in range(self.num_leds): # pos_in_segment = i % total_segment_length # if current_phase == 0: # # ON then OFF # if pos_in_segment < segment_on: # self.n[i] = active_color # else: # self.n[i] = (0, 0, 0) # else: # # OFF then ON # if pos_in_segment < segment_on: # self.n[i] = (0, 0, 0) # else: # self.n[i] = active_color # self.n.write() # # Don't update step - use the step value sent from controller for synchronization # return max(1, int(self.delay // 2)) # def pulse(self): # # Envelope: attack=n1 ms, hold=delay ms, decay=n2 ms # attack_ms = max(0, int(self.n1)) # hold_ms = max(0, int(self.delay)) # decay_ms = max(0, int(self.n2)) # base = self.colors[0] if len(self.colors) > 0 else (255, 255, 255) # full_brightness = max(0, min(255, int(self.brightness))) # # Attack phase (0 -> full) # if attack_ms > 0: # start = utime.ticks_ms() # while utime.ticks_diff(utime.ticks_ms(), start) < attack_ms: # elapsed = utime.ticks_diff(utime.ticks_ms(), start) # frac = elapsed / attack_ms if attack_ms > 0 else 1.0 # b = int(full_brightness * frac) # self.fill(self.apply_brightness(base, brightness_override=b)) # else: # self.fill(self.apply_brightness(base, brightness_override=full_brightness)) # # Hold phase # if hold_ms > 0: # start = utime.ticks_ms() # while utime.ticks_diff(utime.ticks_ms(), start) < hold_ms: # pass # # Decay phase (full -> 0) # if decay_ms > 0: # start = utime.ticks_ms() # while utime.ticks_diff(utime.ticks_ms(), start) < decay_ms: # elapsed = utime.ticks_diff(utime.ticks_ms(), start) # frac = 1.0 - (elapsed / decay_ms if decay_ms > 0 else 1.0) # if frac < 0: # frac = 0 # b = int(full_brightness * frac) # self.fill(self.apply_brightness(base, brightness_override=b)) # # Ensure off at the end and stop auto-run # self.fill((0, 0, 0)) # self.run = False # return self.delay # def rainbow(self): # # Wheel function to map 0-255 to RGB # def wheel(pos): # if pos < 85: # return (pos * 3, 255 - pos * 3, 0) # elif pos < 170: # pos -= 85 # return (255 - pos * 3, 0, pos * 3) # else: # pos -= 170 # return (0, pos * 3, 255 - pos * 3) # step_rate = max(1, int(self.n3)) # # Use controller's step for synchronization, scaled for rainbow cycling # rainbow_step = (self.step * step_rate) % 256 # for i in range(self.num_leds): # rc_index = (i * 256 // max(1, self.num_leds)) + rainbow_step # self.n[i] = self.apply_brightness(wheel(rc_index & 255)) # self.n.write() # # Don't update internal step - use controller's step for sync # return max(1, int(self.delay // 5)) # def specto(self): # # Light up LEDs from 0 up to n1 (exclusive) and turn the rest off # count = int(self.n1) # if count < 0: # count = 0 # if count > self.num_leds: # count = self.num_leds # color = self.apply_brightness(self.colors[0] if len(self.colors) > 0 else (255, 255, 255)) # for i in range(self.num_leds): # self.n[i] = color if i < count else (0, 0, 0) # self.n.write() # return self.delay # def radiate(self): # # Radiate outward from origins spaced every n1 LEDs, stepping each ring by self.delay # sep = max(1, int(self.n1) if self.n1 else 1) # color = self.apply_brightness(self.colors[0] if len(self.colors) > 0 else (255, 255, 255)) # # Start with strip off # self.fill((0, 0, 0)) # origins = list(range(0, self.num_leds, sep)) # radius = 0 # lit_total = 0 # while True: # drew_any = False # for o in origins: # left = o - radius # right = o + radius # if 0 <= left < self.num_leds: # if self.n[left] == (0, 0, 0): # lit_total += 1 # self.n[left] = color # drew_any = True # if 0 <= right < self.num_leds: # if self.n[right] == (0, 0, 0): # lit_total += 1 # self.n[right] = color # drew_any = True # self.n.write() # # If we didn't draw anything new, we've reached beyond edges # if not drew_any: # break # # If all LEDs are now lit, immediately proceed to dark sweep # if lit_total >= self.num_leds: # break # # wait self.delay ms before next ring # start = utime.ticks_us() # while utime.ticks_diff(utime.ticks_us(), start) < self.delay: # pass # radius += 1 # # Radiate back out (darkness outward): turn off from center to edges # last_radius = max(0, radius - 1) # for r in range(0, last_radius + 1): # for o in origins: # left = o - r # right = o + r # if 0 <= left < self.num_leds: # self.n[left] = (0, 0, 0) # if 0 <= right < self.num_leds: # self.n[right] = (0, 0, 0) # self.n.write() # start = utime.ticks_us() # while utime.ticks_diff(utime.ticks_us(), start) < self.delay: # pass # # ensure all LEDs are off at completion # self.fill((0, 0, 0)) # # mark complete so scheduler won't auto-run again until re-selected # self.run = False # return self.delay # def segmented_movement(self): # """ # Segmented movement pattern that alternates forward and backward. # Parameters: # n1: Number of LEDs per segment # n2: Spacing between segments (currently unused) # n3: Forward movement steps per beat # n4: Backward movement steps per beat # Movement: Alternates between moving forward n3 steps and backward n4 steps each beat. # """ # try: # # Get parameters # segment_length = max(1, int(self.n1)) if hasattr(self, 'n1') else 3 # segment_spacing = max(0, int(self.n2)) if hasattr(self, 'n2') else 2 # forward_step = max(0, int(self.n3)) if hasattr(self, 'n3') else 1 # backward_step = max(0, int(self.n4)) if hasattr(self, 'n4') else 0 # # Initialize position tracking if not exists # if not hasattr(self, '_sm_position'): # self._sm_position = 0 # self._sm_last_step = -1 # # Check if this is a new beat (step changed) # if self.step != self._sm_last_step: # # Alternate between forward and backward movement # if self.step % 2 == 0: # # Even steps: move forward (if n3 > 0) # if forward_step > 0: # self._sm_position += forward_step # direction = "FWD" # elif backward_step > 0: # # If no forward, still move backward # self._sm_position -= backward_step # direction = "BWD" # else: # direction = "NONE" # else: # # Odd steps: move backward (if n4 > 0) # if backward_step > 0: # self._sm_position -= backward_step # direction = "BWD" # elif forward_step > 0: # # If no backward, still move forward # self._sm_position += forward_step # direction = "FWD" # else: # direction = "NONE" # # Wrap position around strip length # strip_length = self.num_leds + segment_length # self._sm_position = self._sm_position % strip_length # # Update last step # self._sm_last_step = self.step # # DEBUG: Print every beat # if self.step % 5 == 0: # print(f"SM: step={self.step}, dir={direction}, n3={forward_step}, n4={backward_step}, pos={self._sm_position}") # # Clear all LEDs # self.fill((0, 0, 0)) # # Get color # color = self.apply_brightness(self.colors[0]) # # Calculate segment width (segment + spacing) # segment_width = segment_length + segment_spacing # # Draw multiple segments across the strip # if segment_width > 0: # base_position = int(self._sm_position) % segment_width # # Draw segments starting from base_position # current_pos = base_position # while current_pos < self.num_leds: # # Draw segment from current_pos to current_pos + segment_length # segment_end = min(current_pos + segment_length, self.num_leds) # for i in range(max(0, current_pos), segment_end): # self.n[i] = color # # Move to next segment position # current_pos += segment_width # # Handle wrap-around: draw segments that start before 0 # wrap_position = base_position - segment_width # while wrap_position > -segment_length: # if wrap_position < 0: # # Partial segment at start # segment_end = min(wrap_position + segment_length, self.num_leds) # for i in range(0, segment_end): # self.n[i] = color # wrap_position -= segment_width # self.n.write() # return self.delay # except Exception as e: # # DEBUG: Print error # print(f"SM Error: {e}") # # If anything goes wrong, turn off LEDs and return # self.fill((0, 0, 0)) # self.n.write() # return self.delay # if __name__ == "__main__": # import time # from machine import WDT # wdt = WDT(timeout=2000) # Enable watchdog with a 2 second timeout # p = Patterns(pin=4, num_leds=60, color1=(255,0,0), color2=(0,0,255), brightness=127, selected="off", delay=100) # print(p.colors, p.brightness) # tests = [ # ("off", {"duration_ms": 500}), # ("on", {"duration_ms": 500}), # ("color_wipe", {"delay": 200, "duration_ms": 1000}), # ("rainbow_cycle", {"delay": 100, "duration_ms": 2500}), # ("theater_chase", {"on_width": 3, "off_width": 3, "delay": 1000, "duration_ms": 2500}), # ("blink", {"delay": 500, "duration_ms": 2000}), # ("color_transition", {"delay": 150, "colors": [(255,0,0),(0,255,0),(0,0,255)], "duration_ms": 5000}), # ("flicker", {"delay": 100, "duration_ms": 2000}), # ("scanner", {"delay": 150, "duration_ms": 2500}), # ("bidirectional_scanner", {"delay": 50, "duration_ms": 2500}), # ("fill_range", {"n1": 10, "n2": 20, "delay": 500, "duration_ms": 2000}), # ("n_chase", {"n1": 5, "n2": 5, "delay": 2000, "duration_ms": 2500}), # ("alternating", {"n1": 5, "n2": 5, "delay": 500, "duration_ms": 2500}), # ("pulse", {"delay": 100, "duration_ms": 700}), # ] # print("\n--- Running pattern self-test ---") # for name, cfg in tests: # print(f"\nPattern: {name}") # # apply simple config helpers # if "delay" in cfg: # p.set_delay(cfg["delay"]) # if "on_width" in cfg: # p.set_on_width(cfg["on_width"]) # if "off_width" in cfg: # p.set_off_width(cfg["off_width"]) # if "n1" in cfg and "n2" in cfg: # p.set_fill_range(cfg["n1"], cfg["n2"]) # if "colors" in cfg: # p.set_colors(cfg["colors"]) # p.select(name) # # run per configured duration using absolute-scheduled tick(next_due_ms) # start = utime.ticks_ms() # duration_ms = cfg["duration_ms"] # delay = cfg.get("delay", 0) # next_due = utime.ticks_ms() - 1 # force immediate first call # while utime.ticks_diff(utime.ticks_ms(), start) < duration_ms: # delay = p.tick(delay) # wdt.feed() # print("\n--- Test routine finished ---")