#!/usr/bin/env python3 """ Raspberry Pi SPI Master Test for ESP32-C3 SPI Slave GPIO Configuration: - SCK: GPIO11 - MISO: GPIO9 - MOSI: GPIO10 - CS: GPIO8 """ import spidev import time import sys import struct import random import json class SPIMasterTest: def __init__(self, bus=0, device=0, max_speed_hz=1000000): """ Initialize SPI master Args: bus: SPI bus number (0 for Raspberry Pi) device: SPI device number (0 for CE0) max_speed_hz: Maximum SPI clock speed """ self.spi = spidev.SpiDev() self.bus = bus self.device = device self.max_speed_hz = max_speed_hz try: self.spi.open(bus, device) self.spi.max_speed_hz = max_speed_hz self.spi.mode = 0 # SPI Mode 0 (CPOL=0, CPHA=0) self.spi.bits_per_word = 8 print(f"SPI Master initialized: Bus={bus}, Device={device}, Speed={max_speed_hz}Hz, Mode=0") except Exception as e: print(f"Failed to initialize SPI: {e}") sys.exit(1) def send_data(self, data, delay=0.1): """ Send data to SPI slave and receive response Args: data: List of bytes to send delay: Delay between transactions (seconds) Returns: List of bytes received from slave """ try: print(f"Sending {len(data)} bytes: {[hex(b) for b in data]}") # Send data and receive response response = self.spi.xfer2(data) print(f"Received {len(response)} bytes: {[hex(b) for b in response]}") if delay > 0: time.sleep(delay) return response except Exception as e: print(f"SPI transaction failed: {e}") return [] def test_basic_communication(self): """Test basic SPI communication""" print("\n=== Basic Communication Test ===") # Test 1: Single byte print("\nTest 1: Single byte") response = self.send_data([0xAA]) # Test 2: Multiple bytes print("\nTest 2: Multiple bytes") response = self.send_data([0x01, 0x02, 0x03, 0x04]) # Test 3: Longer message print("\nTest 3: Longer message") test_data = [i for i in range(16)] response = self.send_data(test_data) return True def test_data_patterns(self): """Test various data patterns""" print("\n=== Data Pattern Tests ===") patterns = [ ([0x00] * 8, "All zeros"), ([0xFF] * 8, "All ones"), ([0x55] * 8, "Alternating 0x55"), ([0xAA] * 8, "Alternating 0xAA"), ([0x12, 0x34, 0x56, 0x78], "Incrementing pattern"), ([0x87, 0x65, 0x43, 0x21], "Decrementing pattern"), ] for data, description in patterns: print(f"\n{description}: {[hex(b) for b in data]}") response = self.send_data(data) time.sleep(0.2) return True def test_random_data(self, num_tests=5): """Test with random data""" print(f"\n=== Random Data Tests ({num_tests} tests) ===") for i in range(num_tests): # Generate random data (1-32 bytes) length = random.randint(1, 32) data = [random.randint(0, 255) for _ in range(length)] print(f"\nTest {i+1}: {length} random bytes") response = self.send_data(data) time.sleep(0.3) return True def test_espnow_trigger(self): """Test sending data that should trigger ESP-NOW broadcast""" print("\n=== ESP-NOW Trigger Test ===") # Send a recognizable pattern that the ESP32-C3 should broadcast test_messages = [ [0x48, 0x65, 0x6C, 0x6C, 0x6F], # "Hello" [0x54, 0x65, 0x73, 0x74], # "Test" [0x45, 0x53, 0x50, 0x33, 0x32], # "ESP32" [0x53, 0x50, 0x49, 0x5F, 0x54, 0x65, 0x73, 0x74], # "SPI_Test" ] for i, data in enumerate(test_messages): # Format bytes as readable string readable = ''.join([chr(b) if 32 <= b <= 126 else f'\\x{b:02x}' for b in data]) print(f"\nMessage {i+1}: {readable}") response = self.send_data(data, delay=0.5) # Check if we received the expected test pattern response if response and len(response) >= len(data): print(f"Response matches test pattern: {response[:len(data)] == [i for i in range(len(data))]}") return True def send_json(self, payload: dict, ensure_ascii=False): """Send a JSON object over SPI as UTF-8 bytes. The ESP32 will rebroadcast these bytes via ESP-NOW. Keep payload small (< 190 bytes) to stay within radio payload limits on the ESP side. """ try: json_str = json.dumps(payload, separators=(",",":"), ensure_ascii=ensure_ascii) data_bytes = list(json_str.encode("utf-8")) if len(data_bytes) == 0: print("JSON payload empty; nothing to send") return [] if len(data_bytes) > 240: print(f"Warning: JSON length {len(data_bytes)} exceeds 240 bytes; truncating for demo") data_bytes = data_bytes[:240] print(f"Sending JSON ({len(data_bytes)} bytes): {json_str}") resp = self.spi.xfer2(data_bytes) print(f"Received {len(resp)} bytes") return resp except Exception as e: print(f"Failed to send JSON: {e}") return [] def test_stress(self, duration=10): """Stress test - continuous communication""" print(f"\n=== Stress Test ({duration} seconds) ===") start_time = time.time() count = 0 try: while time.time() - start_time < duration: # Send random data length = random.randint(1, 16) data = [random.randint(0, 255) for _ in range(length)] response = self.send_data(data, delay=0.01) count += 1 if count % 100 == 0: elapsed = time.time() - start_time rate = count / elapsed print(f"Transactions: {count}, Rate: {rate:.1f} tx/sec") except KeyboardInterrupt: print("\nStress test interrupted by user") elapsed = time.time() - start_time rate = count / elapsed if elapsed > 0 else 0 print(f"\nStress test completed: {count} transactions in {elapsed:.1f}s ({rate:.1f} tx/sec)") return True def cleanup(self): """Clean up SPI resources""" try: self.spi.close() print("SPI connection closed") except Exception as e: print(f"Error closing SPI: {e}") def main(): """Main test function""" print("Raspberry Pi SPI Master Test for ESP32-C3") print("GPIO Configuration:") print(" SCK: GPIO11") print(" MISO: GPIO9") print(" MOSI: GPIO10") print(" CS: GPIO8") print("=" * 50) # Initialize SPI master spi_test = SPIMasterTest(bus=0, device=0, max_speed_hz=1000000) try: # Run tests spi_test.test_basic_communication() spi_test.test_data_patterns() spi_test.test_random_data(num_tests=3) spi_test.test_espnow_trigger() # Ask user if they want to run stress test response = input("\nRun stress test? (y/n): ").lower().strip() if response == 'y': duration = input("Enter duration in seconds (default 10): ").strip() try: duration = int(duration) if duration else 10 except ValueError: duration = 10 spi_test.test_stress(duration=duration) print("\n" + "=" * 50) print("All tests completed successfully!") except KeyboardInterrupt: print("\n\nTest interrupted by user") except Exception as e: print(f"\nTest failed with error: {e}") finally: spi_test.cleanup() if __name__ == "__main__": main()