- ESP32-C3 SPI slave project with ESP-NOW broadcast functionality - Raspberry Pi SPI master test tools and CLI for JSON communication - Merged src/ directory from full branch with lighting controller code - Updated Pipfile with system install scripts and ESP32 monitoring - Added comprehensive test suite for SPI communication
254 lines
8.4 KiB
Python
254 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Raspberry Pi SPI Master Test for ESP32-C3 SPI Slave
|
|
GPIO Configuration:
|
|
- SCK: GPIO11
|
|
- MISO: GPIO9
|
|
- MOSI: GPIO10
|
|
- CS: GPIO8
|
|
"""
|
|
|
|
import spidev
|
|
import time
|
|
import sys
|
|
import struct
|
|
import random
|
|
import json
|
|
|
|
class SPIMasterTest:
    """Raspberry Pi SPI master used to exercise an ESP32-C3 SPI slave.

    Wraps a ``spidev.SpiDev`` handle and provides a handful of manual test
    routines plus a JSON sender. Every routine reports progress with
    ``print``; a failed transfer is reported and yields an empty list rather
    than raising.
    """

    # Hard cap, in bytes, on the frame that send_json() puts on the bus.
    MAX_JSON_BYTES = 240

    def __init__(self, bus=0, device=0, max_speed_hz=1000000):
        """Open and configure the SPI device.

        Args:
            bus: SPI bus number (0 for Raspberry Pi)
            device: SPI device number (0 for CE0)
            max_speed_hz: Maximum SPI clock speed

        On any failure while opening/configuring the device the error is
        printed and the process exits with status 1.
        """
        self.spi = spidev.SpiDev()
        self.bus = bus
        self.device = device
        self.max_speed_hz = max_speed_hz

        try:
            self.spi.open(bus, device)
            self.spi.max_speed_hz = max_speed_hz
            self.spi.mode = 0  # SPI Mode 0 (CPOL=0, CPHA=0)
            self.spi.bits_per_word = 8
            print(f"SPI Master initialized: Bus={bus}, Device={device}, Speed={max_speed_hz}Hz, Mode=0")
        except Exception as e:
            print(f"Failed to initialize SPI: {e}")
            sys.exit(1)

    def send_data(self, data, delay=0.1):
        """Send data to the SPI slave and return what was clocked back.

        Args:
            data: Iterable of byte values (0-255) to send; it is not modified.
            delay: Pause after the transaction, in seconds (skipped if <= 0).

        Returns:
            List of bytes received from the slave, or ``[]`` if the
            transaction failed.
        """
        try:
            # Transfer a private copy so the caller's buffer stays intact.
            # NOTE(review): py-spidev is reported to write received bytes back
            # into the list it is given - confirm against the installed
            # version.
            tx = list(data)
            print(f"Sending {len(tx)} bytes: {[hex(b) for b in tx]}")

            response = self.spi.xfer2(tx)

            print(f"Received {len(response)} bytes: {[hex(b) for b in response]}")

            if delay > 0:
                time.sleep(delay)

            return response

        except Exception as e:
            print(f"SPI transaction failed: {e}")
            return []

    def test_basic_communication(self):
        """Send a 1-byte, a 4-byte and a 16-byte frame. Always returns True."""
        print("\n=== Basic Communication Test ===")

        print("\nTest 1: Single byte")
        self.send_data([0xAA])

        print("\nTest 2: Multiple bytes")
        self.send_data([0x01, 0x02, 0x03, 0x04])

        print("\nTest 3: Longer message")
        self.send_data(list(range(16)))

        return True

    def test_data_patterns(self):
        """Send a fixed set of bit patterns. Always returns True."""
        print("\n=== Data Pattern Tests ===")

        patterns = [
            ([0x00] * 8, "All zeros"),
            ([0xFF] * 8, "All ones"),
            ([0x55] * 8, "Alternating 0x55"),
            ([0xAA] * 8, "Alternating 0xAA"),
            ([0x12, 0x34, 0x56, 0x78], "Incrementing pattern"),
            ([0x87, 0x65, 0x43, 0x21], "Decrementing pattern"),
        ]

        for data, description in patterns:
            print(f"\n{description}: {[hex(b) for b in data]}")
            self.send_data(data)
            time.sleep(0.2)

        return True

    def test_random_data(self, num_tests=5):
        """Send ``num_tests`` random frames of 1-32 bytes. Always returns True."""
        print(f"\n=== Random Data Tests ({num_tests} tests) ===")

        for i in range(num_tests):
            length = random.randint(1, 32)
            data = [random.randint(0, 255) for _ in range(length)]

            print(f"\nTest {i+1}: {length} random bytes")
            self.send_data(data)
            time.sleep(0.3)

        return True

    def test_espnow_trigger(self):
        """Send short ASCII messages that the slave should broadcast via ESP-NOW.

        For each message the reply is compared against the sequence
        0, 1, 2, ... of the same length and the outcome is printed.
        Always returns True.
        """
        print("\n=== ESP-NOW Trigger Test ===")

        test_messages = [
            [0x48, 0x65, 0x6C, 0x6C, 0x6F],  # "Hello"
            [0x54, 0x65, 0x73, 0x74],  # "Test"
            [0x45, 0x53, 0x50, 0x33, 0x32],  # "ESP32"
            [0x53, 0x50, 0x49, 0x5F, 0x54, 0x65, 0x73, 0x74],  # "SPI_Test"
        ]

        for number, data in enumerate(test_messages, start=1):
            # Printable ASCII is shown as-is, anything else as a \xNN escape.
            readable = ''.join(chr(b) if 32 <= b <= 126 else f'\\x{b:02x}' for b in data)
            print(f"\nMessage {number}: {readable}")
            response = self.send_data(data, delay=0.5)

            # Only compare when the reply covers the whole message.
            # NOTE(review): presumably the slave preloads an incrementing
            # 0..n-1 test pattern - confirm against the firmware.
            if response and len(response) >= len(data):
                expected = list(range(len(data)))
                print(f"Response matches test pattern: {response[:len(data)] == expected}")

        return True

    def send_json(self, payload: dict, ensure_ascii=False):
        """Send a JSON object over SPI as compact UTF-8 bytes.

        The ESP32 will rebroadcast these bytes via ESP-NOW. Keep payload small
        (< 190 bytes) to stay within radio payload limits on the ESP side.
        Frames longer than ``MAX_JSON_BYTES`` are truncated (with a warning);
        the cut is moved back to a character boundary so the bytes sent are
        always valid UTF-8, and the log shows exactly what was sent.
        NOTE(review): the 190-byte guidance and the 240-byte hard cap differ;
        confirm which limit the firmware actually enforces.

        Args:
            payload: JSON-serializable object.
            ensure_ascii: Forwarded to ``json.dumps``.

        Returns:
            List of bytes received from the slave, or ``[]`` on failure.
        """
        try:
            json_str = json.dumps(payload, separators=(",", ":"), ensure_ascii=ensure_ascii)
            encoded = json_str.encode("utf-8")
            if not encoded:
                print("JSON payload empty; nothing to send")
                return []

            if len(encoded) > self.MAX_JSON_BYTES:
                print(f"Warning: JSON length {len(encoded)} exceeds {self.MAX_JSON_BYTES} bytes; truncating for demo")
                # Decoding with errors="ignore" drops a trailing partial
                # multi-byte sequence left behind by the byte-level cut.
                json_str = encoded[:self.MAX_JSON_BYTES].decode("utf-8", errors="ignore")
                encoded = json_str.encode("utf-8")

            print(f"Sending JSON ({len(encoded)} bytes): {json_str}")
            resp = self.spi.xfer2(list(encoded))
            print(f"Received {len(resp)} bytes")
            return resp
        except Exception as e:
            print(f"Failed to send JSON: {e}")
            return []

    def test_stress(self, duration=10):
        """Send random 1-16 byte frames back to back for ``duration`` seconds.

        Progress is printed every 100 transactions; Ctrl-C ends the run early.
        Always returns True.
        """
        print(f"\n=== Stress Test ({duration} seconds) ===")

        # Monotonic clock: wall-clock adjustments cannot stretch or cut the run.
        start_time = time.monotonic()
        count = 0

        try:
            while time.monotonic() - start_time < duration:
                length = random.randint(1, 16)
                data = [random.randint(0, 255) for _ in range(length)]

                self.send_data(data, delay=0.01)
                count += 1

                if count % 100 == 0:
                    elapsed = time.monotonic() - start_time
                    # Failed transfers return without sleeping, so guard
                    # against a zero elapsed time.
                    rate = count / elapsed if elapsed > 0 else 0
                    print(f"Transactions: {count}, Rate: {rate:.1f} tx/sec")

        except KeyboardInterrupt:
            print("\nStress test interrupted by user")

        elapsed = time.monotonic() - start_time
        rate = count / elapsed if elapsed > 0 else 0
        print(f"\nStress test completed: {count} transactions in {elapsed:.1f}s ({rate:.1f} tx/sec)")

        return True

    def cleanup(self):
        """Close the SPI handle; a failure to close is printed, not raised."""
        try:
            self.spi.close()
            print("SPI connection closed")
        except Exception as e:
            print(f"Error closing SPI: {e}")
|
|
|
|
def main():
    """Run the interactive SPI test sequence against the ESP32-C3 slave.

    Prints the wiring banner, runs the fixed test routines, optionally runs
    the stress test for a user-supplied number of seconds, and always closes
    the SPI handle on the way out.
    """
    banner = (
        "Raspberry Pi SPI Master Test for ESP32-C3",
        "GPIO Configuration:",
        " SCK: GPIO11",
        " MISO: GPIO9",
        " MOSI: GPIO10",
        " CS: GPIO8",
    )
    for line in banner:
        print(line)
    separator = "=" * 50
    print(separator)

    # Opening the device happens outside the try block, exactly as before:
    # cleanup is only attempted once a tester object exists.
    spi_test = SPIMasterTest(bus=0, device=0, max_speed_hz=1000000)

    try:
        # Fixed part of the run.
        spi_test.test_basic_communication()
        spi_test.test_data_patterns()
        spi_test.test_random_data(num_tests=3)
        spi_test.test_espnow_trigger()

        # The stress test is opt-in.
        answer = input("\nRun stress test? (y/n): ").lower().strip()
        if answer == 'y':
            raw_seconds = input("Enter duration in seconds (default 10): ").strip()
            seconds = 10  # used for blank or non-numeric input
            if raw_seconds:
                try:
                    seconds = int(raw_seconds)
                except ValueError:
                    pass
            spi_test.test_stress(duration=seconds)

        print("\n" + separator)
        print("All tests completed successfully!")

    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
    except Exception as e:
        print(f"\nTest failed with error: {e}")
    finally:
        spi_test.cleanup()
|
|
|
|
# Run the interactive test sequence only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
|