3 Commits

Author SHA1 Message Date
Pi User
d57fce77fb update: misc changes 2025-10-01 23:31:00 +13:00
Pi User
fbeb365932 pipenv: add sound-run; sound.py: --input-device flag 2025-10-01 23:31:00 +13:00
Pi User
ed35d6b838 pipenv: add send-net; networking: use SPI; add networking test 2025-10-01 23:31:00 +13:00
5 changed files with 199 additions and 81 deletions

View File

@@ -33,3 +33,5 @@ build-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py
flash-esp32 = "bash -c 'source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p $ESPPORT -b ${ESPSPEED:-460800} flash'"
watch-esp32 = "watchfiles 'bash -c \"source $HOME/esp/esp-idf/export.sh && cd esp32 && idf.py -p ${ESPPORT:-/dev/ttyACM0} -b ${ESPSPEED:-460800} flash monitor\"' esp32/main"
send-json = "python test/send_json.py"
send-net = "python test/test_networking.py"
sound-run = "python src/sound.py"

View File

@@ -14,12 +14,12 @@ import threading
import time
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
from color_utils import adjust_brightness
from networking import WebSocketClient as SPIClient # SPI transport client
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration
LED_SERVER_URI = "ws://192.168.4.1:80/ws"
CONTROL_SERVER_PORT = 8765
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
@@ -40,57 +40,23 @@ PATTERN_NAMES = {
class LEDController:
"""Handles communication with LED bars via WebSocket."""
"""Handles communication with LED bars via SPI (through ESP32 relay)."""
def __init__(self, led_server_uri):
self.led_server_uri = led_server_uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
def __init__(self, spi_bus: int = 0, spi_device: int = 0, spi_speed_hz: int = 1_000_000):
self.client = SPIClient(bus=spi_bus, device=spi_device, speed_hz=spi_speed_hz)
@property
def is_connected(self) -> bool:
return getattr(self.client, "is_connected", False)
async def connect(self):
"""Connect to LED server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to LED server at {self.led_server_uri}...")
self.websocket = await websockets.connect(self.led_server_uri)
self.is_connected = True
logging.info("Connected to LED server")
except Exception as e:
logging.error(f"Failed to connect to LED server: {e}")
self.is_connected = False
self.websocket = None
await self.client.connect()
async def send_data(self, data):
"""Send data to LED server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to LED server. Attempting to reconnect...")
await self.connect()
if not self.is_connected:
logging.error("Failed to reconnect to LED server. Cannot send data.")
return
try:
await self.websocket.send(json.dumps(data))
logging.debug(f"Sent to LED server: {data}")
except Exception as e:
logging.error(f"Failed to send data to LED server: {e}")
self.is_connected = False
self.websocket = None
# Attempt to reconnect
await self.connect()
await self.client.send_data(data)
async def close(self):
"""Close LED server connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from LED server")
await self.client.close()
class SoundController:
@@ -119,7 +85,8 @@ class LightingController:
"""Main lighting control logic."""
def __init__(self):
self.led_controller = LEDController(LED_SERVER_URI)
# SPI defaults: bus 0, CE0, 1MHz; adjust here if needed
self.led_controller = LEDController(spi_bus=0, spi_device=0, spi_speed_hz=1_000_000)
self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT)
# Lighting state

View File

@@ -1,53 +1,79 @@
import asyncio
import websockets
import json
import os
import time
try:
import spidev
except Exception as e:
spidev = None
class WebSocketClient:
def __init__(self, uri):
self.uri = uri
self.websocket = None
def __init__(self, uri=None, *, bus=None, device=None, speed_hz=None):
# SPI configuration (defaults can be overridden by args or env)
self.bus = 0 if bus is None else int(bus)
self.device = 0 if device is None else int(device)
self.speed_hz = (
int(os.getenv("SPI_SPEED_HZ", "1000000")) if speed_hz is None else int(speed_hz)
)
self.spi = None
self.is_connected = False
async def connect(self):
"""Establishes the WebSocket connection."""
if self.is_connected and self.websocket:
print("Already connected.")
"""Initializes the SPI connection."""
if self.is_connected and self.spi:
return
if spidev is None:
print("spidev not available; cannot open SPI")
self.is_connected = False
self.spi = None
return
try:
print(f"Connecting to {self.uri}...")
self.websocket = await websockets.connect(self.uri)
spi = spidev.SpiDev()
spi.open(self.bus, self.device)
spi.max_speed_hz = self.speed_hz
spi.mode = 0
spi.bits_per_word = 8
self.spi = spi
self.is_connected = True
print("WebSocket connected.")
except (ConnectionError, websockets.exceptions.ConnectionClosedOK) as e:
print(f"Error connecting: {e}")
print(f"SPI connected: bus={self.bus} device={self.device} speed={self.speed_hz}Hz mode=0")
except Exception as e:
print(f"Error opening SPI: {e}")
self.is_connected = False
self.websocket = None
self.spi = None
async def send_data(self, data):
print(data)
"""Sends data over the open WebSocket connection."""
if not self.is_connected or not self.websocket:
print("WebSocket not connected. Attempting to reconnect...")
"""Sends a JSON object over SPI as UTF-8 bytes."""
if not self.is_connected or not self.spi:
await self.connect()
if not self.is_connected:
print("Failed to reconnect. Cannot send data.")
print("SPI not connected; cannot send")
return
try:
await self.websocket.send(json.dumps(data))
print(f"Sent: {data}")
except (ConnectionError, websockets.exceptions.ConnectionClosed) as e:
print(f"Error sending data: {e}")
json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
payload = list(json_str.encode("utf-8"))
if not payload:
return
# Keep payload comfortably below ESP-NOW max; trim if necessary
if len(payload) > 240:
payload = payload[:240]
self.spi.xfer2(payload)
except Exception as e:
print(f"SPI send failed: {e}")
# Attempt simple reopen on next call
self.is_connected = False
self.websocket = None # Reset connection on error
await self.connect() # Attempt to reconnect
self.spi = None
async def close(self):
"""Closes the WebSocket connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
print("WebSocket closed.")
"""Closes the SPI connection."""
try:
if self.spi:
self.spi.close()
except Exception:
pass
self.is_connected = False
self.spi = None

View File

@@ -5,6 +5,7 @@ import aubio
import numpy as np
from time import sleep
import json
import argparse
import socket
import time
import logging # Added logging import
@@ -24,7 +25,7 @@ SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
class SoundBeatDetector:
def __init__(self, tcp_host: str, tcp_port: int):
def __init__(self, tcp_host: str, tcp_port: int, *, input_device: int | None = None):
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.tcp_socket = None
@@ -34,7 +35,7 @@ class SoundBeatDetector:
self.bufferSize = 512
self.windowSizeMultiple = 2
self.audioInputDeviceIndex = 7
self.audioInputDeviceIndex = 7 if input_device is None else int(input_device)
self.audioInputChannels = 1
self.pa = pyaudio.PyAudio()
@@ -196,11 +197,15 @@ class SoundBeatDetector:
# Removed async def run(self)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Sound beat detector")
parser.add_argument("--input-device", type=int, help="Audio input device index to use")
args = parser.parse_args()
# TCP Server Configuration (should match midi.py)
MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT)
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT, input_device=args.input_device)
logging.info("Starting SoundBeatDetector...")
try:
sound_detector.start_stream()

118
test/test_networking.py Normal file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Networking SPI test: builds a legacy led-bar payload and sends it via src/networking.py SPI client.
Usage examples:
python test/test_networking.py --type b --pattern on --colors ff0000,00ff00,0000ff
python test/test_networking.py --type u --brightness 128 --delay 50
python test/test_networking.py --data '{"d":{"t":"b","pt":"off"}}'
"""
import argparse
import asyncio
import json
import os
import re
import sys
# Import SPI networking client
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPT_DIR)
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from src.networking import WebSocketClient # SPI client with same API
HEX6_RE = re.compile(r"^[0-9a-fA-F]{6}$")
def parse_hex6_to_rgb(value: str):
v = value.strip()
if v.startswith("0x") or v.startswith("0X"):
v = v[2:]
if v.startswith("#"):
v = v[1:]
if not HEX6_RE.match(v):
raise ValueError(f"Invalid hex color: {value}")
return [int(v[0:2], 16), int(v[2:4], 16), int(v[4:6], 16)]
def build_payload(args: argparse.Namespace) -> dict:
if args.data:
return json.loads(args.data)
if args.file:
with open(args.file, "r", encoding="utf-8") as f:
return json.load(f)
d = {"t": args.type}
if args.pattern:
d["pt"] = args.pattern
if args.brightness is not None:
d["br"] = int(args.brightness)
if args.delay is not None:
d["dl"] = int(args.delay)
if args.n1 is not None:
d["n1"] = int(args.n1)
if args.n2 is not None:
d["n2"] = int(args.n2)
if args.n3 is not None:
d["n3"] = int(args.n3)
if args.step is not None:
d["s"] = int(args.step)
if args.colors:
items = [c.strip() for c in args.colors.split(',') if c.strip()]
d["cl"] = [parse_hex6_to_rgb(c) for c in items]
payload = {"d": d}
if args.name:
# For convenience, mirror defaults as per-device override
payload[args.name] = {k: v for k, v in d.items() if k != "t"}
return payload
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Send SPI networking test payload (legacy led-bar format)")
src = p.add_mutually_exclusive_group()
src.add_argument("--data", help="Raw JSON payload to send")
src.add_argument("--file", help="Path to JSON file to send")
p.add_argument("--type", choices=["b", "u"], default="b", help="Message type (beat/update)")
p.add_argument("--pattern", help="Pattern name (pt)")
p.add_argument("--brightness", type=int, help="Brightness (br)")
p.add_argument("--delay", type=int, help="Delay (dl)")
p.add_argument("--n1", type=int, help="n1")
p.add_argument("--n2", type=int, help="n2")
p.add_argument("--n3", type=int, help="n3")
p.add_argument("--step", type=int, help="step (s)")
p.add_argument("--colors", help="Comma-separated hex colors for cl (e.g. ff0000,00ff00,0000ff)")
p.add_argument("--name", help="Per-device override key (device name)")
# SPI config overrides
p.add_argument("--bus", type=int, default=0, help="SPI bus (default 0)")
p.add_argument("--device", type=int, default=0, help="SPI device/CE (default 0)")
p.add_argument("--speed", type=int, default=1_000_000, help="SPI speed Hz (default 1MHz)")
return p.parse_args()
async def main_async() -> int:
args = parse_args()
payload = build_payload(args)
client = WebSocketClient(uri=None, bus=args.bus, device=args.device, speed_hz=args.speed)
await client.connect()
await client.send_data(payload)
await client.close()
return 0
def main() -> int:
return asyncio.run(main_async())
if __name__ == "__main__":
raise SystemExit(main())