Add ESP32-C3 SPI slave with ESP-NOW, Raspberry Pi test tools, and updated project structure

- ESP32-C3 SPI slave project with ESP-NOW broadcast functionality
- Raspberry Pi SPI master test tools and CLI for JSON communication
- Merged src/ directory from full branch with lighting controller code
- Updated Pipfile with system install scripts and ESP32 monitoring
- Added comprehensive test suite for SPI communication
This commit is contained in:
Pi User
2025-10-01 21:08:28 +13:00
parent aa9b5bb324
commit 5a05ee99a1
1356 changed files with 190644 additions and 87 deletions

23
test/.gitignore vendored Normal file
View File

@@ -0,0 +1,23 @@
# Python cache files
__pycache__/
*.py[cod]
*$py.class
# Virtual environments
venv/
env/
.env
# IDE files
.vscode/
.idea/
*.swp
*.swo
# OS files
.DS_Store
Thumbs.db
# Test output files
*.log
test_output/

172
test/README.md Normal file
View File

@@ -0,0 +1,172 @@
# SPI Master Test for ESP32-C3
This directory contains test scripts to verify SPI communication between a Raspberry Pi (SPI master) and the ESP32-C3 (SPI slave).
## Quick Start
1. **Setup:** `./setup_spi.sh`
2. **Send JSON:** `pipenv run send-json --beat --brightness 128 --pattern wave`
## Hardware Connections
### Raspberry Pi GPIO Configuration
- **SCK**: GPIO11 (Physical pin 23)
- **MISO**: GPIO9 (Physical pin 21)
- **MOSI**: GPIO10 (Physical pin 19)
- **CS**: GPIO8 (Physical pin 24)
### ESP32-C3 GPIO Configuration
- **SCK**: GPIO20
- **MISO**: GPIO9
- **MOSI**: GPIO10
- **CS**: GPIO7
## Setup
1. **Enable SPI on Raspberry Pi:**
```bash
./setup_spi.sh
```
2. **Flash ESP32-C3 with SPI slave firmware:**
```bash
cd ../esp32
. $HOME/esp/esp-idf/export.sh
idf.py -p /dev/ttyUSB0 flash monitor
```
3. **Connect the hardware:**
- Connect Raspberry Pi GPIO11 to ESP32-C3 GPIO20 (SCK)
- Connect Raspberry Pi GPIO9 to ESP32-C3 GPIO9 (MISO)
- Connect Raspberry Pi GPIO10 to ESP32-C3 GPIO10 (MOSI)
- Connect Raspberry Pi GPIO8 to ESP32-C3 GPIO7 (CS)
- Connect GND between both devices
## Running Tests
### Send JSON
To send your control JSON so the ESP32-C3 can `json.loads` it on the receiving side:
```bash
# Using pipenv script
pipenv run send-json --beat --brightness 128 --pattern wave
# Or directly
pipenv run python test/send_json.py --data '{"d":{"t":"b","br":128},"bar":{"pt":"off"}}'
```
Or programmatically:
```python
from test.spi_master_test import SPIMasterTest
spi = SPIMasterTest()
payload = {
"d": {"t": "b", "br": 128, "dl": 20},
"bar": {"pt": "off"}
}
spi.send_json(payload)
spi.cleanup()
```
Keep payloads under ~190 bytes to fit in the ESP-NOW payload.
### Individual Test Functions
The test script includes several test functions:
1. **Basic Communication Test**
- Single byte transmission
- Multiple byte transmission
- Longer message transmission
2. **Data Pattern Tests**
- All zeros
- All ones
- Alternating patterns
- Incrementing/decrementing patterns
3. **Random Data Tests**
- Random data of varying lengths
- Multiple random tests
4. **ESP-NOW Trigger Test**
- Sends recognizable patterns
- Should trigger ESP-NOW broadcast on ESP32-C3
- Verifies response patterns
5. **Stress Test**
- Continuous communication
- Performance measurement
- Transaction rate calculation
## Expected Behavior
### ESP32-C3 Response
The ESP32-C3 SPI slave should:
1. Receive data on MOSI (GPIO10)
2. Log received data to serial console
3. Broadcast received data via ESP-NOW
4. Send back a test pattern (0x00, 0x01, 0x02, ...)
### Serial Monitor Output
When running the ESP32-C3, you should see output like:
```
I (1234) SPI_SLAVE: Starting SPI Slave with ESP-NOW example
I (1235) SPI_SLAVE: ESP-NOW initialized successfully
I (1236) SPI_SLAVE: SPI Slave initialized successfully
I (1237) SPI_SLAVE: MOSI: GPIO10, MISO: GPIO9, SCLK: GPIO20, CS: GPIO7
I (1238) SPI_SLAVE: Received 5 bytes:
0x48 0x65 0x6c 0x6c 0x6f
I (1239) SPI_SLAVE: Broadcasting 5 bytes via ESP-NOW
I (1240) SPI_SLAVE: ESP-NOW send status: SUCCESS
```
## Troubleshooting
### Common Issues
1. **SPI device not found:**
```bash
ls -la /dev/spi*
# Should show /dev/spidev0.0 and /dev/spidev0.1
```
2. **Permission denied:**
```bash
sudo usermod -a -G spi $USER
# Log out and back in
```
3. **No response from ESP32-C3:**
- Check wiring connections
- Verify ESP32-C3 is running SPI slave firmware
- Check serial monitor for ESP32-C3 output
4. **ESP-NOW not working:**
- Ensure ESP32-C3 has WiFi/ESP-NOW initialized
- Check for other ESP32-C3 devices to receive broadcasts
- Monitor serial output for ESP-NOW status
### Debug Mode
Enable debug output by modifying the test script:
```python
# Add debug prints
print(f"SPI Mode: {spi_test.spi.mode}")
print(f"SPI Speed: {spi_test.spi.max_speed_hz}")
print(f"SPI Bits per word: {spi_test.spi.bits_per_word}")
```
## Performance Notes
- **SPI Speed**: Default 1MHz, can be increased for faster communication
- **Transaction Rate**: Typically 100-1000 transactions/second depending on data size
- **ESP-NOW Broadcast**: Adds ~1-2ms delay per transaction
- **Buffer Size**: ESP32-C3 supports up to 256 bytes per transaction
## Files
- `send_json.py` - **Main script** - Send JSON over SPI to ESP32-C3
- `quick_test.py` - Quick basic functionality test
- `spi_master_test.py` - Comprehensive test suite
- `setup_spi.sh` - Setup script for Raspberry Pi
- `requirements.txt` - Python dependencies
- `README.md` - This documentation

64
test/quick_test.py Executable file
View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""
Quick SPI test for basic functionality verification
"""
import spidev
import time
import sys
import json
def quick_spi_test():
"""Quick test to verify SPI communication"""
print("Quick SPI Test")
print("=" * 30)
try:
# Initialize SPI
spi = spidev.SpiDev()
spi.open(0, 0) # Bus 0, Device 0
spi.max_speed_hz = 1000000 # 1MHz
spi.mode = 0 # SPI Mode 0
spi.bits_per_word = 8
print("SPI initialized successfully")
print(f"Mode: {spi.mode}, Speed: {spi.max_speed_hz}Hz")
# Test 1: Simple byte
print("\nTest 1: Single byte (0xAA)")
response = spi.xfer2([0xAA])
print(f"Sent: [0xAA], Received: {[hex(b) for b in response]}")
# Test 2: Multiple bytes
print("\nTest 2: Multiple bytes")
test_data = [0x01, 0x02, 0x03, 0x04]
response = spi.xfer2(test_data)
print(f"Sent: {[hex(b) for b in test_data]}")
print(f"Received: {[hex(b) for b in response]}")
# Test 3: Hello message
print("\nTest 3: Hello message")
hello = [0x48, 0x65, 0x6C, 0x6C, 0x6F] # "Hello"
response = spi.xfer2(hello)
print(f"Sent: {[hex(b) for b in hello]} ({''.join([chr(b) for b in hello])})")
print(f"Received: {[hex(b) for b in response]}")
# Test 4: JSON message
print("\nTest 4: JSON message")
payload = {"d":{"t":"b","br":128},"bar":{"pt":"off"}}
json_bytes = list(json.dumps(payload, separators=(",",":"))).encode("utf-8")
response = spi.xfer2(list(json.dumps(payload, separators=(",",":"), ensure_ascii=False).encode("utf-8")))
print(f"Sent JSON: {json.dumps(payload)}")
print(f"Received: {[hex(b) for b in response]}")
spi.close()
print("\nTest completed successfully!")
return True
except Exception as e:
print(f"Test failed: {e}")
return False
if __name__ == "__main__":
success = quick_spi_test()
sys.exit(0 if success else 1)

13
test/requirements.txt Normal file
View File

@@ -0,0 +1,13 @@
# Python dependencies for SPI master test
# Note: spidev is typically installed via system package manager on Raspberry Pi
# For development and testing
pytest>=7.0.0
pytest-cov>=4.0.0
# For data visualization (optional)
matplotlib>=3.5.0
numpy>=1.21.0
# For logging and debugging
colorlog>=6.7.0

114
test/send_json.py Executable file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
CLI to send JSON over SPI to the ESP32-C3 SPI slave.
Examples:
./send_json.py --data '{"d":{"t":"b","br":128},"bar":{"pt":"off"}}'
./send_json.py --file payload.json
./send_json.py --beat --brightness 180 --delay 30 --pattern wave
"""
import argparse
import json
import os
import sys
# Ensure we can import the local test helper
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from spi_master_test import SPIMasterTest # noqa: E402
def build_payload_from_args(args: argparse.Namespace) -> dict:
"""Build a control payload matching the receiver format from args."""
if args.data:
try:
return json.loads(args.data)
except Exception as exc:
print(f"Invalid JSON in --data: {exc}")
sys.exit(2)
if args.file:
try:
with open(args.file, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as exc:
print(f"Failed to read JSON file {args.file}: {exc}")
sys.exit(2)
# Build from individual params (beat or update)
msg_type = "b" if args.beat else "u"
defaults = {"t": msg_type}
if args.brightness is not None:
defaults["br"] = int(args.brightness)
if args.delay is not None:
defaults["dl"] = int(args.delay)
if args.n1 is not None:
defaults["n1"] = int(args.n1)
if args.n2 is not None:
defaults["n2"] = int(args.n2)
if args.n3 is not None:
defaults["n3"] = int(args.n3)
if args.step is not None:
defaults["s"] = int(args.step)
bar = {}
if args.pattern:
bar["pt"] = args.pattern
if args.name_value:
# Allow sending a specific bar (device) override by name
# Usage: --name-value mybar '{"br":200,"pt":"off"}'
try:
name, value = args.name_value
bar = json.loads(value)
return {"d": defaults, name: bar}
except Exception as exc:
print(f"Invalid --name-value: {exc}")
sys.exit(2)
# Default field name could be populated on receiver side via settings.get("name")
# We only send defaults and a generic "bar" override here for convenience.
return {"d": defaults, "bar": bar}
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Send JSON over SPI to ESP32-C3")
src = p.add_mutually_exclusive_group()
src.add_argument("--data", help="Raw JSON string to send")
src.add_argument("--file", help="Path to JSON file to send")
p.add_argument("--beat", action="store_true", help="Send as beat message (t=b)")
p.add_argument("--brightness", type=int, help="Brightness (br)")
p.add_argument("--delay", type=int, help="Delay (dl)")
p.add_argument("--pattern", help="Pattern name (pt)")
p.add_argument("--n1", type=int, help="n1 parameter")
p.add_argument("--n2", type=int, help="n2 parameter")
p.add_argument("--n3", type=int, help="n3 parameter")
p.add_argument("--step", type=int, help="step (s)")
p.add_argument("--name-value", nargs=2, metavar=("NAME", "JSON"), help="Send override under NAME key with JSON object value")
# SPI params
p.add_argument("--bus", type=int, default=0, help="SPI bus (default 0)")
p.add_argument("--device", type=int, default=0, help="SPI device/CE (default 0)")
p.add_argument("--speed", type=int, default=1_000_000, help="SPI speed Hz (default 1MHz)")
return p.parse_args()
def main() -> int:
args = parse_args()
payload = build_payload_from_args(args)
spi = SPIMasterTest(bus=args.bus, device=args.device, max_speed_hz=args.speed)
try:
spi.send_json(payload)
finally:
spi.cleanup()
return 0
if __name__ == "__main__":
raise SystemExit(main())

33
test/setup_spi.sh Executable file
View File

@@ -0,0 +1,33 @@
#!/bin/bash
# Setup script for SPI testing on Raspberry Pi
echo "Setting up SPI for testing..."
# Enable SPI interface
echo "Enabling SPI interface..."
sudo raspi-config nonint do_spi 0
# Install required packages
echo "Installing required packages..."
sudo apt update
sudo apt install -y python3-spidev python3-pip
# Install Python dependencies using pipenv
echo "Installing Python dependencies with pipenv..."
cd ..
pipenv install
# Check SPI devices
echo "Checking SPI devices..."
ls -la /dev/spi*
# Show GPIO configuration
echo "GPIO Configuration:"
echo " SCK: GPIO11 (Physical pin 23)"
echo " MISO: GPIO9 (Physical pin 21)"
echo " MOSI: GPIO10 (Physical pin 19)"
echo " CS: GPIO8 (Physical pin 24)"
echo "Setup complete!"
echo "Run send-json with:"
echo " pipenv run send-json --beat --brightness 128"

253
test/spi_master_test.py Normal file
View File

@@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Raspberry Pi SPI Master Test for ESP32-C3 SPI Slave
GPIO Configuration:
- SCK: GPIO11
- MISO: GPIO9
- MOSI: GPIO10
- CS: GPIO8
"""
import spidev
import time
import sys
import struct
import random
import json
class SPIMasterTest:
def __init__(self, bus=0, device=0, max_speed_hz=1000000):
"""
Initialize SPI master
Args:
bus: SPI bus number (0 for Raspberry Pi)
device: SPI device number (0 for CE0)
max_speed_hz: Maximum SPI clock speed
"""
self.spi = spidev.SpiDev()
self.bus = bus
self.device = device
self.max_speed_hz = max_speed_hz
try:
self.spi.open(bus, device)
self.spi.max_speed_hz = max_speed_hz
self.spi.mode = 0 # SPI Mode 0 (CPOL=0, CPHA=0)
self.spi.bits_per_word = 8
print(f"SPI Master initialized: Bus={bus}, Device={device}, Speed={max_speed_hz}Hz, Mode=0")
except Exception as e:
print(f"Failed to initialize SPI: {e}")
sys.exit(1)
def send_data(self, data, delay=0.1):
"""
Send data to SPI slave and receive response
Args:
data: List of bytes to send
delay: Delay between transactions (seconds)
Returns:
List of bytes received from slave
"""
try:
print(f"Sending {len(data)} bytes: {[hex(b) for b in data]}")
# Send data and receive response
response = self.spi.xfer2(data)
print(f"Received {len(response)} bytes: {[hex(b) for b in response]}")
if delay > 0:
time.sleep(delay)
return response
except Exception as e:
print(f"SPI transaction failed: {e}")
return []
def test_basic_communication(self):
"""Test basic SPI communication"""
print("\n=== Basic Communication Test ===")
# Test 1: Single byte
print("\nTest 1: Single byte")
response = self.send_data([0xAA])
# Test 2: Multiple bytes
print("\nTest 2: Multiple bytes")
response = self.send_data([0x01, 0x02, 0x03, 0x04])
# Test 3: Longer message
print("\nTest 3: Longer message")
test_data = [i for i in range(16)]
response = self.send_data(test_data)
return True
def test_data_patterns(self):
"""Test various data patterns"""
print("\n=== Data Pattern Tests ===")
patterns = [
([0x00] * 8, "All zeros"),
([0xFF] * 8, "All ones"),
([0x55] * 8, "Alternating 0x55"),
([0xAA] * 8, "Alternating 0xAA"),
([0x12, 0x34, 0x56, 0x78], "Incrementing pattern"),
([0x87, 0x65, 0x43, 0x21], "Decrementing pattern"),
]
for data, description in patterns:
print(f"\n{description}: {[hex(b) for b in data]}")
response = self.send_data(data)
time.sleep(0.2)
return True
def test_random_data(self, num_tests=5):
"""Test with random data"""
print(f"\n=== Random Data Tests ({num_tests} tests) ===")
for i in range(num_tests):
# Generate random data (1-32 bytes)
length = random.randint(1, 32)
data = [random.randint(0, 255) for _ in range(length)]
print(f"\nTest {i+1}: {length} random bytes")
response = self.send_data(data)
time.sleep(0.3)
return True
def test_espnow_trigger(self):
"""Test sending data that should trigger ESP-NOW broadcast"""
print("\n=== ESP-NOW Trigger Test ===")
# Send a recognizable pattern that the ESP32-C3 should broadcast
test_messages = [
[0x48, 0x65, 0x6C, 0x6C, 0x6F], # "Hello"
[0x54, 0x65, 0x73, 0x74], # "Test"
[0x45, 0x53, 0x50, 0x33, 0x32], # "ESP32"
[0x53, 0x50, 0x49, 0x5F, 0x54, 0x65, 0x73, 0x74], # "SPI_Test"
]
for i, data in enumerate(test_messages):
# Format bytes as readable string
readable = ''.join([chr(b) if 32 <= b <= 126 else f'\\x{b:02x}' for b in data])
print(f"\nMessage {i+1}: {readable}")
response = self.send_data(data, delay=0.5)
# Check if we received the expected test pattern response
if response and len(response) >= len(data):
print(f"Response matches test pattern: {response[:len(data)] == [i for i in range(len(data))]}")
return True
def send_json(self, payload: dict, ensure_ascii=False):
"""Send a JSON object over SPI as UTF-8 bytes.
The ESP32 will rebroadcast these bytes via ESP-NOW. Keep payload small
(< 190 bytes) to stay within radio payload limits on the ESP side.
"""
try:
json_str = json.dumps(payload, separators=(",",":"), ensure_ascii=ensure_ascii)
data_bytes = list(json_str.encode("utf-8"))
if len(data_bytes) == 0:
print("JSON payload empty; nothing to send")
return []
if len(data_bytes) > 240:
print(f"Warning: JSON length {len(data_bytes)} exceeds 240 bytes; truncating for demo")
data_bytes = data_bytes[:240]
print(f"Sending JSON ({len(data_bytes)} bytes): {json_str}")
resp = self.spi.xfer2(data_bytes)
print(f"Received {len(resp)} bytes")
return resp
except Exception as e:
print(f"Failed to send JSON: {e}")
return []
def test_stress(self, duration=10):
"""Stress test - continuous communication"""
print(f"\n=== Stress Test ({duration} seconds) ===")
start_time = time.time()
count = 0
try:
while time.time() - start_time < duration:
# Send random data
length = random.randint(1, 16)
data = [random.randint(0, 255) for _ in range(length)]
response = self.send_data(data, delay=0.01)
count += 1
if count % 100 == 0:
elapsed = time.time() - start_time
rate = count / elapsed
print(f"Transactions: {count}, Rate: {rate:.1f} tx/sec")
except KeyboardInterrupt:
print("\nStress test interrupted by user")
elapsed = time.time() - start_time
rate = count / elapsed if elapsed > 0 else 0
print(f"\nStress test completed: {count} transactions in {elapsed:.1f}s ({rate:.1f} tx/sec)")
return True
def cleanup(self):
"""Clean up SPI resources"""
try:
self.spi.close()
print("SPI connection closed")
except Exception as e:
print(f"Error closing SPI: {e}")
def main():
"""Main test function"""
print("Raspberry Pi SPI Master Test for ESP32-C3")
print("GPIO Configuration:")
print(" SCK: GPIO11")
print(" MISO: GPIO9")
print(" MOSI: GPIO10")
print(" CS: GPIO8")
print("=" * 50)
# Initialize SPI master
spi_test = SPIMasterTest(bus=0, device=0, max_speed_hz=1000000)
try:
# Run tests
spi_test.test_basic_communication()
spi_test.test_data_patterns()
spi_test.test_random_data(num_tests=3)
spi_test.test_espnow_trigger()
# Ask user if they want to run stress test
response = input("\nRun stress test? (y/n): ").lower().strip()
if response == 'y':
duration = input("Enter duration in seconds (default 10): ").strip()
try:
duration = int(duration) if duration else 10
except ValueError:
duration = 10
spi_test.test_stress(duration=duration)
print("\n" + "=" * 50)
print("All tests completed successfully!")
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
except Exception as e:
print(f"\nTest failed with error: {e}")
finally:
spi_test.cleanup()
if __name__ == "__main__":
main()