3 Commits

Author SHA1 Message Date
Pi User
d57fce77fb update: misc changes 2025-10-01 23:31:00 +13:00
Pi User
fbeb365932 pipenv: add sound-run; sound.py: --input-device flag 2025-10-01 23:31:00 +13:00
Pi User
ed35d6b838 pipenv: add send-net; networking: use SPI; add networking test 2025-10-01 23:31:00 +13:00
5 changed files with 199 additions and 81 deletions

View File

@@ -33,3 +33,5 @@ build-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py
flash-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p $ESPPORT -b ${ESPSPEED:-460800} flash'" flash-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p $ESPPORT -b ${ESPSPEED:-460800} flash'"
watch-esp32 = "watchfiles 'bash -c \"source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p ${ESPPORT:-/dev/ttyACM0} -b ${ESPSPEED:-460800} flash monitor\"' esp32/main" watch-esp32 = "watchfiles 'bash -c \"source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p ${ESPPORT:-/dev/ttyACM0} -b ${ESPSPEED:-460800} flash monitor\"' esp32/main"
send-json = "python test/send_json.py" send-json = "python test/send_json.py"
send-net = "python test/test_networking.py"
sound-run = "python src/sound.py"

View File

@@ -14,12 +14,12 @@ import threading
import time import time
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
from color_utils import adjust_brightness from color_utils import adjust_brightness
from networking import WebSocketClient as SPIClient # SPI transport client
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration # Configuration
LED_SERVER_URI = "ws://192.168.4.1:80/ws"
CONTROL_SERVER_PORT = 8765 CONTROL_SERVER_PORT = 8765
SOUND_CONTROL_HOST = "127.0.0.1" SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433 SOUND_CONTROL_PORT = 65433
@@ -40,57 +40,23 @@ PATTERN_NAMES = {
class LEDController: class LEDController:
"""Handles communication with LED bars via WebSocket.""" """Handles communication with LED bars via SPI (through ESP32 relay)."""
def __init__(self, led_server_uri): def __init__(self, spi_bus: int = 0, spi_device: int = 0, spi_speed_hz: int = 1_000_000):
self.led_server_uri = led_server_uri self.client = SPIClient(bus=spi_bus, device=spi_device, speed_hz=spi_speed_hz)
self.websocket = None
self.is_connected = False @property
self.reconnect_task = None def is_connected(self) -> bool:
return getattr(self.client, "is_connected", False)
async def connect(self): async def connect(self):
"""Connect to LED server.""" await self.client.connect()
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to LED server at {self.led_server_uri}...")
self.websocket = await websockets.connect(self.led_server_uri)
self.is_connected = True
logging.info("Connected to LED server")
except Exception as e:
logging.error(f"Failed to connect to LED server: {e}")
self.is_connected = False
self.websocket = None
async def send_data(self, data): async def send_data(self, data):
"""Send data to LED server.""" await self.client.send_data(data)
if not self.is_connected or not self.websocket:
logging.warning("Not connected to LED server. Attempting to reconnect...")
await self.connect()
if not self.is_connected:
logging.error("Failed to reconnect to LED server. Cannot send data.")
return
try:
await self.websocket.send(json.dumps(data))
logging.debug(f"Sent to LED server: {data}")
except Exception as e:
logging.error(f"Failed to send data to LED server: {e}")
self.is_connected = False
self.websocket = None
# Attempt to reconnect
await self.connect()
async def close(self): async def close(self):
"""Close LED server connection.""" await self.client.close()
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from LED server")
class SoundController: class SoundController:
@@ -119,7 +85,8 @@ class LightingController:
"""Main lighting control logic.""" """Main lighting control logic."""
def __init__(self): def __init__(self):
self.led_controller = LEDController(LED_SERVER_URI) # SPI defaults: bus 0, CE0, 1MHz; adjust here if needed
self.led_controller = LEDController(spi_bus=0, spi_device=0, spi_speed_hz=1_000_000)
self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT) self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT)
# Lighting state # Lighting state

View File

@@ -1,53 +1,79 @@
import asyncio
import websockets
import json import json
import os
import time
try:
import spidev
except Exception as e:
spidev = None
class WebSocketClient: class WebSocketClient:
def __init__(self, uri): def __init__(self, uri=None, *, bus=None, device=None, speed_hz=None):
self.uri = uri # SPI configuration (defaults can be overridden by args or env)
self.websocket = None self.bus = 0 if bus is None else int(bus)
self.device = 0 if device is None else int(device)
self.speed_hz = (
int(os.getenv("SPI_SPEED_HZ", "1000000")) if speed_hz is None else int(speed_hz)
)
self.spi = None
self.is_connected = False self.is_connected = False
async def connect(self): async def connect(self):
"""Establishes the WebSocket connection.""" """Initializes the SPI connection."""
if self.is_connected and self.websocket: if self.is_connected and self.spi:
print("Already connected.") return
if spidev is None:
print("spidev not available; cannot open SPI")
self.is_connected = False
self.spi = None
return return
try: try:
print(f"Connecting to {self.uri}...") spi = spidev.SpiDev()
self.websocket = await websockets.connect(self.uri) spi.open(self.bus, self.device)
spi.max_speed_hz = self.speed_hz
spi.mode = 0
spi.bits_per_word = 8
self.spi = spi
self.is_connected = True self.is_connected = True
print("WebSocket connected.") print(f"SPI connected: bus={self.bus} device={self.device} speed={self.speed_hz}Hz mode=0")
except (ConnectionError, websockets.exceptions.ConnectionClosedOK) as e: except Exception as e:
print(f"Error connecting: {e}") print(f"Error opening SPI: {e}")
self.is_connected = False self.is_connected = False
self.websocket = None self.spi = None
async def send_data(self, data): async def send_data(self, data):
print(data) """Sends a JSON object over SPI as UTF-8 bytes."""
"""Sends data over the open WebSocket connection.""" if not self.is_connected or not self.spi:
if not self.is_connected or not self.websocket:
print("WebSocket not connected. Attempting to reconnect...")
await self.connect() await self.connect()
if not self.is_connected: if not self.is_connected:
print("Failed to reconnect. Cannot send data.") print("SPI not connected; cannot send")
return return
try: try:
await self.websocket.send(json.dumps(data)) json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
print(f"Sent: {data}") payload = list(json_str.encode("utf-8"))
except (ConnectionError, websockets.exceptions.ConnectionClosed) as e: if not payload:
print(f"Error sending data: {e}") return
# Keep payload comfortably below ESP-NOW max; trim if necessary
if len(payload) > 240:
payload = payload[:240]
self.spi.xfer2(payload)
except Exception as e:
print(f"SPI send failed: {e}")
# Attempt simple reopen on next call
self.is_connected = False self.is_connected = False
self.websocket = None # Reset connection on error self.spi = None
await self.connect() # Attempt to reconnect
async def close(self): async def close(self):
"""Closes the WebSocket connection.""" """Closes the SPI connection."""
if self.websocket and self.is_connected: try:
await self.websocket.close() if self.spi:
self.spi.close()
except Exception:
pass
self.is_connected = False self.is_connected = False
self.websocket = None self.spi = None
print("WebSocket closed.")

View File

@@ -5,6 +5,7 @@ import aubio
import numpy as np import numpy as np
from time import sleep from time import sleep
import json import json
import argparse
import socket import socket
import time import time
import logging # Added logging import import logging # Added logging import
@@ -24,7 +25,7 @@ SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433 SOUND_CONTROL_PORT = 65433
class SoundBeatDetector: class SoundBeatDetector:
def __init__(self, tcp_host: str, tcp_port: int): def __init__(self, tcp_host: str, tcp_port: int, *, input_device: int | None = None):
self.tcp_host = tcp_host self.tcp_host = tcp_host
self.tcp_port = tcp_port self.tcp_port = tcp_port
self.tcp_socket = None self.tcp_socket = None
@@ -34,7 +35,7 @@ class SoundBeatDetector:
self.bufferSize = 512 self.bufferSize = 512
self.windowSizeMultiple = 2 self.windowSizeMultiple = 2
self.audioInputDeviceIndex = 7 self.audioInputDeviceIndex = 7 if input_device is None else int(input_device)
self.audioInputChannels = 1 self.audioInputChannels = 1
self.pa = pyaudio.PyAudio() self.pa = pyaudio.PyAudio()
@@ -196,11 +197,15 @@ class SoundBeatDetector:
# Removed async def run(self) # Removed async def run(self)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Sound beat detector")
parser.add_argument("--input-device", type=int, help="Audio input device index to use")
args = parser.parse_args()
# TCP Server Configuration (should match midi.py) # TCP Server Configuration (should match midi.py)
MIDI_TCP_HOST = "127.0.0.1" MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432 MIDI_TCP_PORT = 65432
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT) sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT, input_device=args.input_device)
logging.info("Starting SoundBeatDetector...") logging.info("Starting SoundBeatDetector...")
try: try:
sound_detector.start_stream() sound_detector.start_stream()

118
test/test_networking.py Normal file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Networking SPI test: builds a legacy led-bar payload and sends it via src/networking.py SPI client.
Usage examples:
python test/test_networking.py --type b --pattern on --colors ff0000,00ff00,0000ff
python test/test_networking.py --type u --brightness 128 --delay 50
python test/test_networking.py --data '{"d":{"t":"b","pt":"off"}}'
"""
import argparse
import asyncio
import json
import os
import re
import sys
# Import SPI networking client
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from src.networking import WebSocketClient # SPI client with same API
HEX6_RE = re.compile(r"^[0-9a-fA-F]{6}$")
def parse_hex6_to_rgb(value: str):
v = value.strip()
if v.startswith("0x") or v.startswith("0X"):
v = v[2:]
if v.startswith("#"):
v = v[1:]
if not HEX6_RE.match(v):
raise ValueError(f"Invalid hex color: {value}")
return [int(v[0:2], 16), int(v[2:4], 16), int(v[4:6], 16)]
def build_payload(args: argparse.Namespace) -> dict:
if args.data:
return json.loads(args.data)
if args.file:
with open(args.file, "r", encoding="utf-8") as f:
return json.load(f)
d = {"t": args.type}
if args.pattern:
d["pt"] = args.pattern
if args.brightness is not None:
d["br"] = int(args.brightness)
if args.delay is not None:
d["dl"] = int(args.delay)
if args.n1 is not None:
d["n1"] = int(args.n1)
if args.n2 is not None:
d["n2"] = int(args.n2)
if args.n3 is not None:
d["n3"] = int(args.n3)
if args.step is not None:
d["s"] = int(args.step)
if args.colors:
items = [c.strip() for c in args.colors.split(',') if c.strip()]
d["cl"] = [parse_hex6_to_rgb(c) for c in items]
payload = {"d": d}
if args.name:
# For convenience, mirror defaults as per-device override
payload[args.name] = {k: v for k, v in d.items() if k != "t"}
return payload
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Send SPI networking test payload (legacy led-bar format)")
src = p.add_mutually_exclusive_group()
src.add_argument("--data", help="Raw JSON payload to send")
src.add_argument("--file", help="Path to JSON file to send")
p.add_argument("--type", choices=["b", "u"], default="b", help="Message type (beat/update)")
p.add_argument("--pattern", help="Pattern name (pt)")
p.add_argument("--brightness", type=int, help="Brightness (br)")
p.add_argument("--delay", type=int, help="Delay (dl)")
p.add_argument("--n1", type=int, help="n1")
p.add_argument("--n2", type=int, help="n2")
p.add_argument("--n3", type=int, help="n3")
p.add_argument("--step", type=int, help="step (s)")
p.add_argument("--colors", help="Comma-separated hex colors for cl (e.g. ff0000,00ff00,0000ff)")
p.add_argument("--name", help="Per-device override key (device name)")
# SPI config overrides
p.add_argument("--bus", type=int, default=0, help="SPI bus (default 0)")
p.add_argument("--device", type=int, default=0, help="SPI device/CE (default 0)")
p.add_argument("--speed", type=int, default=1_000_000, help="SPI speed Hz (default 1MHz)")
return p.parse_args()
async def main_async() -> int:
args = parse_args()
payload = build_payload(args)
client = WebSocketClient(uri=None, bus=args.bus, device=args.device, speed_hz=args.speed)
await client.connect()
await client.send_data(payload)
await client.close()
return 0
def main() -> int:
return asyncio.run(main_async())
if __name__ == "__main__":
raise SystemExit(main())