Files
lighting-controller/test/spi_master_test.py
Pi User 5a05ee99a1 Add ESP32-C3 SPI slave with ESP-NOW, Raspberry Pi test tools, and updated project structure
- ESP32-C3 SPI slave project with ESP-NOW broadcast functionality
- Raspberry Pi SPI master test tools and CLI for JSON communication
- Merged src/ directory from full branch with lighting controller code
- Updated Pipfile with system install scripts and ESP32 monitoring
- Added comprehensive test suite for SPI communication
2025-10-01 21:08:28 +13:00

254 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
Raspberry Pi SPI Master Test for ESP32-C3 SPI Slave
GPIO Configuration:
- SCK: GPIO11
- MISO: GPIO9
- MOSI: GPIO10
- CS: GPIO8
"""
import spidev
import time
import sys
import struct
import random
import json
class SPIMasterTest:
def __init__(self, bus=0, device=0, max_speed_hz=1000000):
"""
Initialize SPI master
Args:
bus: SPI bus number (0 for Raspberry Pi)
device: SPI device number (0 for CE0)
max_speed_hz: Maximum SPI clock speed
"""
self.spi = spidev.SpiDev()
self.bus = bus
self.device = device
self.max_speed_hz = max_speed_hz
try:
self.spi.open(bus, device)
self.spi.max_speed_hz = max_speed_hz
self.spi.mode = 0 # SPI Mode 0 (CPOL=0, CPHA=0)
self.spi.bits_per_word = 8
print(f"SPI Master initialized: Bus={bus}, Device={device}, Speed={max_speed_hz}Hz, Mode=0")
except Exception as e:
print(f"Failed to initialize SPI: {e}")
sys.exit(1)
def send_data(self, data, delay=0.1):
"""
Send data to SPI slave and receive response
Args:
data: List of bytes to send
delay: Delay between transactions (seconds)
Returns:
List of bytes received from slave
"""
try:
print(f"Sending {len(data)} bytes: {[hex(b) for b in data]}")
# Send data and receive response
response = self.spi.xfer2(data)
print(f"Received {len(response)} bytes: {[hex(b) for b in response]}")
if delay > 0:
time.sleep(delay)
return response
except Exception as e:
print(f"SPI transaction failed: {e}")
return []
def test_basic_communication(self):
"""Test basic SPI communication"""
print("\n=== Basic Communication Test ===")
# Test 1: Single byte
print("\nTest 1: Single byte")
response = self.send_data([0xAA])
# Test 2: Multiple bytes
print("\nTest 2: Multiple bytes")
response = self.send_data([0x01, 0x02, 0x03, 0x04])
# Test 3: Longer message
print("\nTest 3: Longer message")
test_data = [i for i in range(16)]
response = self.send_data(test_data)
return True
def test_data_patterns(self):
"""Test various data patterns"""
print("\n=== Data Pattern Tests ===")
patterns = [
([0x00] * 8, "All zeros"),
([0xFF] * 8, "All ones"),
([0x55] * 8, "Alternating 0x55"),
([0xAA] * 8, "Alternating 0xAA"),
([0x12, 0x34, 0x56, 0x78], "Incrementing pattern"),
([0x87, 0x65, 0x43, 0x21], "Decrementing pattern"),
]
for data, description in patterns:
print(f"\n{description}: {[hex(b) for b in data]}")
response = self.send_data(data)
time.sleep(0.2)
return True
def test_random_data(self, num_tests=5):
"""Test with random data"""
print(f"\n=== Random Data Tests ({num_tests} tests) ===")
for i in range(num_tests):
# Generate random data (1-32 bytes)
length = random.randint(1, 32)
data = [random.randint(0, 255) for _ in range(length)]
print(f"\nTest {i+1}: {length} random bytes")
response = self.send_data(data)
time.sleep(0.3)
return True
def test_espnow_trigger(self):
"""Test sending data that should trigger ESP-NOW broadcast"""
print("\n=== ESP-NOW Trigger Test ===")
# Send a recognizable pattern that the ESP32-C3 should broadcast
test_messages = [
[0x48, 0x65, 0x6C, 0x6C, 0x6F], # "Hello"
[0x54, 0x65, 0x73, 0x74], # "Test"
[0x45, 0x53, 0x50, 0x33, 0x32], # "ESP32"
[0x53, 0x50, 0x49, 0x5F, 0x54, 0x65, 0x73, 0x74], # "SPI_Test"
]
for i, data in enumerate(test_messages):
# Format bytes as readable string
readable = ''.join([chr(b) if 32 <= b <= 126 else f'\\x{b:02x}' for b in data])
print(f"\nMessage {i+1}: {readable}")
response = self.send_data(data, delay=0.5)
# Check if we received the expected test pattern response
if response and len(response) >= len(data):
print(f"Response matches test pattern: {response[:len(data)] == [i for i in range(len(data))]}")
return True
def send_json(self, payload: dict, ensure_ascii=False):
"""Send a JSON object over SPI as UTF-8 bytes.
The ESP32 will rebroadcast these bytes via ESP-NOW. Keep payload small
(< 190 bytes) to stay within radio payload limits on the ESP side.
"""
try:
json_str = json.dumps(payload, separators=(",",":"), ensure_ascii=ensure_ascii)
data_bytes = list(json_str.encode("utf-8"))
if len(data_bytes) == 0:
print("JSON payload empty; nothing to send")
return []
if len(data_bytes) > 240:
print(f"Warning: JSON length {len(data_bytes)} exceeds 240 bytes; truncating for demo")
data_bytes = data_bytes[:240]
print(f"Sending JSON ({len(data_bytes)} bytes): {json_str}")
resp = self.spi.xfer2(data_bytes)
print(f"Received {len(resp)} bytes")
return resp
except Exception as e:
print(f"Failed to send JSON: {e}")
return []
def test_stress(self, duration=10):
"""Stress test - continuous communication"""
print(f"\n=== Stress Test ({duration} seconds) ===")
start_time = time.time()
count = 0
try:
while time.time() - start_time < duration:
# Send random data
length = random.randint(1, 16)
data = [random.randint(0, 255) for _ in range(length)]
response = self.send_data(data, delay=0.01)
count += 1
if count % 100 == 0:
elapsed = time.time() - start_time
rate = count / elapsed
print(f"Transactions: {count}, Rate: {rate:.1f} tx/sec")
except KeyboardInterrupt:
print("\nStress test interrupted by user")
elapsed = time.time() - start_time
rate = count / elapsed if elapsed > 0 else 0
print(f"\nStress test completed: {count} transactions in {elapsed:.1f}s ({rate:.1f} tx/sec)")
return True
def cleanup(self):
"""Clean up SPI resources"""
try:
self.spi.close()
print("SPI connection closed")
except Exception as e:
print(f"Error closing SPI: {e}")
def main():
"""Main test function"""
print("Raspberry Pi SPI Master Test for ESP32-C3")
print("GPIO Configuration:")
print(" SCK: GPIO11")
print(" MISO: GPIO9")
print(" MOSI: GPIO10")
print(" CS: GPIO8")
print("=" * 50)
# Initialize SPI master
spi_test = SPIMasterTest(bus=0, device=0, max_speed_hz=1000000)
try:
# Run tests
spi_test.test_basic_communication()
spi_test.test_data_patterns()
spi_test.test_random_data(num_tests=3)
spi_test.test_espnow_trigger()
# Ask user if they want to run stress test
response = input("\nRun stress test? (y/n): ").lower().strip()
if response == 'y':
duration = input("Enter duration in seconds (default 10): ").strip()
try:
duration = int(duration) if duration else 10
except ValueError:
duration = 10
spi_test.test_stress(duration=duration)
print("\n" + "=" * 50)
print("All tests completed successfully!")
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
except Exception as e:
print(f"\nTest failed with error: {e}")
finally:
spi_test.cleanup()
if __name__ == "__main__":
main()