Add IMU telemetry pipeline and live sensor UI.

Wire LSM6DS3 readings into the runtime telemetry stream, expose them over web endpoints, and render live voltage/IMU data in the dashboard with websocket updates.

Made-with: Cursor
This commit is contained in:
2026-04-01 23:04:41 +13:00
parent d0d73b422d
commit 397d48a43a
10 changed files with 948 additions and 479 deletions

View File

@@ -12,4 +12,4 @@ watchfiles = "*"
[dev-packages] [dev-packages]
[requires] [requires]
python_version = "3.12" python_version = "3"

915
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

64
lib/lsm6ds3.py Normal file
View File

@@ -0,0 +1,64 @@
"""LSM6DS3 accelerometer / gyroscope over I2C (MicroPython)."""
import struct
WHO_AM_I_REG = 0x0F
WHO_AM_I_LSM6DS3 = 0x69
CTRL1_XL = 0x10
CTRL2_G = 0x11
OUT_TEMP_L = 0x20
# Default CTRL1_XL = 0x60: 104 Hz ODR, ±2 g, anti-aliasing filter BW ~100 Hz
# Default CTRL2_G = 0x40: 104 Hz ODR, 245 dps full scale
_DEFAULT_XL = b"\x60"
_DEFAULT_G = b"\x40"
# LSB scaling for default full-scale settings above
_G_PER_LSB = 0.000061
_DPS_PER_LSB = 0.00875
def is_lsm6ds3(who_am_i: int) -> bool:
return who_am_i == WHO_AM_I_LSM6DS3
class LSM6DS3:
def __init__(self, i2c, addr=0x6A):
self.i2c = i2c
self.addr = addr
def who_am_i(self) -> int:
return self.i2c.readfrom_mem(self.addr, WHO_AM_I_REG, 1)[0]
def configure(self, ctrl1_xl=_DEFAULT_XL, ctrl2_g=_DEFAULT_G) -> None:
self.i2c.writeto_mem(self.addr, CTRL1_XL, ctrl1_xl)
self.i2c.writeto_mem(self.addr, CTRL2_G, ctrl2_g)
def read_raw(self):
"""Returns (temp_raw, gx, gy, gz, ax, ay, az) as signed 16-bit values."""
data = self.i2c.readfrom_mem(self.addr, OUT_TEMP_L, 14)
return struct.unpack("<hhhhhhh", data)
def read(self):
"""
Scaled readings for default ±2 g / 245 dps configuration.
Returns:
temp_c: float, degrees C
accel_g: (ax, ay, az) in g
gyro_dps: (gx, gy, gz) in degrees per second
"""
temp, gx, gy, gz, ax, ay, az = self.read_raw()
temp_c = 25.0 + temp / 16.0
accel_g = (
ax * _G_PER_LSB,
ay * _G_PER_LSB,
az * _G_PER_LSB,
)
gyro_dps = (
gx * _DPS_PER_LSB,
gy * _DPS_PER_LSB,
gz * _DPS_PER_LSB,
)
return temp_c, accel_g, gyro_dps

View File

@@ -1,5 +1,7 @@
import asyncio import asyncio
import aioespnow import aioespnow
import math
import random
from settings import Settings from settings import Settings
from web import web from web import web
from patterns import Patterns from patterns import Patterns
@@ -10,7 +12,8 @@ import time
import wifi import wifi
import json import json
from p2p import p2p from p2p import p2p
from machine import ADC, Pin from machine import ADC, Pin, I2C
from lsm6ds3 import LSM6DS3, is_lsm6ds3
async def main(): async def main():
settings = Settings() settings = Settings()
@@ -31,12 +34,15 @@ async def main():
await asyncio.sleep_ms(0) await asyncio.sleep_ms(0)
async def system(): async def system():
adc = ADC(Pin(2), atten=ADC.ATTN_11DB) adc = ADC(Pin(0), atten=ADC.ATTN_11DB)
while True: while True:
gc.collect() gc.collect()
for i in range(60): for i in range(60):
wdt.feed() wdt.feed()
pin_uv = adc.read_uv()
voltage = pin_uv * 2 / 1_000_000 # 1:2 divider: V_source = 2 × V_pin
telemetry["voltage"] = voltage
print("%.3f V" % voltage)
await asyncio.sleep(1) await asyncio.sleep(1)
async def sw(): async def sw():
@@ -64,8 +70,108 @@ async def main():
patterns.select("off") patterns.select("off")
await asyncio.sleep_ms(200) await asyncio.sleep_ms(200)
telemetry = {
"voltage": 0.0,
"imu": {
"ok": False,
"temp": 0.0,
"ax": 0.0,
"ay": 0.0,
"az": 0.0,
"gx": 0.0,
"gy": 0.0,
"gz": 0.0,
"roll_deg": 0.0,
"pitch_deg": 0.0,
"yaw_deg": 0.0,
},
}
w = web(settings, patterns) imu_sensor = None
try:
i2c_imu = I2C(0, scl=Pin(23), sda=Pin(22), freq=400000)
for addr in (0x6A, 0x6B):
try:
cand = LSM6DS3(i2c_imu, addr)
if is_lsm6ds3(cand.who_am_i()):
cand.configure()
utime.sleep_ms(50)
imu_sensor = cand
print("LSM6DS3 at", hex(addr))
break
except OSError:
pass
except Exception as e:
print("IMU init failed:", e)
async def imu_loop():
im = telemetry["imu"]
prev_ms = utime.ticks_ms()
last_gyro_jolt_ms = 0
# Integrated rotation this sample: max(|ω|) * dt ≥ threshold (deg)
gyro_jolt_deg_per_interval = 22.0
gyro_jolt_cooldown_ms = 700
def wrap_angle_deg(y):
while y > 180.0:
y -= 360.0
while y <= -180.0:
y += 360.0
return y
def random_bright_rgb():
return (
random.randint(48, 255),
random.randint(48, 255),
random.randint(48, 255),
)
while True:
wdt.feed()
now = utime.ticks_ms()
dt_ms = utime.ticks_diff(now, prev_ms)
prev_ms = now
if dt_ms < 0:
dt_ms = 200
dt_s = max(dt_ms / 1000.0, 0.001)
try:
t, ag, gd = imu_sensor.read()
im["ok"] = True
im["temp"] = t
im["ax"], im["ay"], im["az"] = ag
im["gx"], im["gy"], im["gz"] = gd
ax, ay, az = ag
im["roll_deg"] = math.degrees(math.atan2(ay, az))
pitch = math.degrees(
math.atan2(-ax, math.sqrt(ay * ay + az * az))
)
im["pitch_deg"] = pitch
gx, gy, gz = gd
im["yaw_deg"] = wrap_angle_deg(im["yaw_deg"] + gz * dt_s)
omega_max = max(abs(gx), abs(gy), abs(gz))
spin_deg = omega_max * dt_s
if (
spin_deg >= gyro_jolt_deg_per_interval
and utime.ticks_diff(now, last_gyro_jolt_ms)
>= gyro_jolt_cooldown_ms
):
last_gyro_jolt_ms = now
patterns.set_color1(random_bright_rgb())
patterns.set_color2(random_bright_rgb())
patterns.sync()
print(
"gyro jolt %.1f deg (|ω|max=%.0f dps) -> new colors"
% (spin_deg, omega_max)
)
except OSError:
im["ok"] = False
except Exception as e:
im["ok"] = False
print("IMU read error:", e)
await asyncio.sleep_ms(200)
w = web(settings, patterns, telemetry)
print(settings) print(settings)
# start the server in a bacakground task # start the server in a bacakground task
print("Starting") print("Starting")
@@ -77,6 +183,8 @@ async def main():
asyncio.create_task(p2p(settings, patterns)) asyncio.create_task(p2p(settings, patterns))
asyncio.create_task(system()) asyncio.create_task(system())
asyncio.create_task(sw()) asyncio.create_task(sw())
if imu_sensor is not None:
asyncio.create_task(imu_loop())
# cleanup before ending the application # cleanup before ending the application
await server await server

View File

@@ -9,6 +9,45 @@ h1 {
text-align: center; text-align: center;
} }
.sensor-section {
margin: 1.25rem 0;
padding: 1rem 1.25rem;
border: 1px solid #c5c5c5;
border-radius: 10px;
background: #f4f4f4;
box-sizing: border-box;
}
.sensor-section h2 {
margin: 0 0 0.35rem 0;
font-size: 1.15rem;
text-align: left;
}
.sensor-hint {
margin: 0 0 0.75rem 0;
font-size: 0.85rem;
color: #555;
}
.sensor-section .telemetry p {
margin: 0.4rem 0;
}
.sensor-section .imu-block {
margin-top: 0.5rem;
padding-top: 0.5rem;
border-top: 1px solid #ddd;
}
.sensor-section .yaw-note {
display: block;
font-size: 0.8rem;
font-weight: normal;
color: #666;
margin-top: 0.2rem;
}
form { form {
margin-bottom: 20px; margin-bottom: 20px;
} }

View File

@@ -5,6 +5,52 @@ let color2Timeout;
let ws; // Variable to hold the WebSocket connection let ws; // Variable to hold the WebSocket connection
let connectionStatusElement; // Variable to hold the connection status element let connectionStatusElement; // Variable to hold the connection status element
function setTelemetryText(id, text) {
const el = document.getElementById(id);
if (el) el.textContent = text;
}
function applyTelemetryDisplay(data) {
if (typeof data.voltage === "number") {
setTelemetryText("voltage", data.voltage.toFixed(3));
}
const imu = data.imu;
const dash = "—";
const imuLive =
imu &&
(imu.ok === true ||
imu.ok === 1 ||
imu.ok === "1" ||
imu.ok === "true");
if (imuLive) {
const fmt = (v, d) =>
typeof v === "number" && !Number.isNaN(v) ? v.toFixed(d) : dash;
setTelemetryText("imu-temp", fmt(imu.temp, 2));
setTelemetryText("imu-ax", fmt(imu.ax, 3));
setTelemetryText("imu-ay", fmt(imu.ay, 3));
setTelemetryText("imu-az", fmt(imu.az, 3));
setTelemetryText("imu-gx", fmt(Math.abs(imu.gx), 2));
setTelemetryText("imu-gy", fmt(Math.abs(imu.gy), 2));
setTelemetryText("imu-gz", fmt(Math.abs(imu.gz), 2));
setTelemetryText("imu-roll", fmt(imu.roll_deg, 1));
setTelemetryText("imu-pitch", fmt(imu.pitch_deg, 1));
setTelemetryText("imu-yaw", fmt(imu.yaw_deg, 1));
} else {
setTelemetryText("imu-temp", dash);
[
"imu-ax",
"imu-ay",
"imu-az",
"imu-gx",
"imu-gy",
"imu-gz",
"imu-roll",
"imu-pitch",
"imu-yaw",
].forEach((id) => setTelemetryText(id, dash));
}
}
// Function to update the connection status indicator // Function to update the connection status indicator
function updateConnectionStatus(status) { function updateConnectionStatus(status) {
if (!connectionStatusElement) { if (!connectionStatusElement) {
@@ -20,8 +66,8 @@ function updateConnectionStatus(status) {
// Function to establish WebSocket connection // Function to establish WebSocket connection
function connectWebSocket() { function connectWebSocket() {
// Determine the WebSocket URL based on the current location const proto = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsUrl = `ws://${window.location.host}/ws`; const wsUrl = `${proto}//${window.location.host}/ws`;
ws = new WebSocket(wsUrl); ws = new WebSocket(wsUrl);
updateConnectionStatus("connecting"); // Indicate connecting state updateConnectionStatus("connecting"); // Indicate connecting state
@@ -33,7 +79,15 @@ function connectWebSocket() {
}; };
ws.onmessage = function (event) { ws.onmessage = function (event) {
console.log("WebSocket message received:", event.data); let msg;
try {
msg = JSON.parse(event.data);
} catch {
return;
}
if (msg && msg._t === "telemetry") {
applyTelemetryDisplay(msg);
}
}; };
ws.onerror = function (event) { ws.onerror = function (event) {
@@ -86,18 +140,6 @@ async function post(path, data) {
} }
} }
async function get(path) {
try {
const response = await fetch(path);
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
return await response.json();
} catch (error) {
console.error("Error during GET request:", error);
}
}
async function updateColor(event) { async function updateColor(event) {
event.preventDefault(); event.preventDefault();
clearTimeout(colorTimeout); clearTimeout(colorTimeout);
@@ -220,7 +262,6 @@ document.addEventListener("DOMContentLoaded", async function () {
document.getElementById("rgb").addEventListener("change", handleRadioChange); document.getElementById("rgb").addEventListener("change", handleRadioChange);
document.getElementById("rbg").addEventListener("change", handleRadioChange); document.getElementById("rbg").addEventListener("change", handleRadioChange);
document.querySelectorAll(".pattern_button").forEach((button) => { document.querySelectorAll(".pattern_button").forEach((button) => {
console.log(button.value);
button.addEventListener("click", async (event) => { button.addEventListener("click", async (event) => {
event.preventDefault(); event.preventDefault();
await updatePattern(button.value); await updatePattern(button.value);

View File

@@ -10,6 +10,43 @@
</head> </head>
<body> <body>
<h1>{{settings['name']}}</h1> <h1>{{settings['name']}}</h1>
<section class="sensor-section" id="sensor-section" aria-labelledby="sensor-heading">
<h2 id="sensor-heading">Sensors</h2>
<p class="sensor-hint">
Live readings (WebSocket). Roll/pitch use gravity (accel). Yaw is integrated from the
gyro (relative, drifts; not compass heading unless you add a magnetometer).
</p>
<div class="telemetry">
<p class="voltage"><strong>Voltage:</strong> <span id="voltage"></span> V</p>
<div id="imu-block" class="imu-block">
<p><strong>IMU temp:</strong> <span id="imu-temp"></span> °C</p>
<p>
<strong>Accel (g):</strong>
x <span id="imu-ax"></span>,
y <span id="imu-ay"></span>,
z <span id="imu-az"></span>
</p>
<p>
<strong>Gyro (abs °/s):</strong>
<span id="imu-gx"></span>,
<span id="imu-gy"></span>,
<span id="imu-gz"></span>
</p>
<p class="tilt-line">
<strong>Roll (°):</strong> <span id="imu-roll"></span>
</p>
<p class="tilt-line">
<strong>Pitch (°):</strong> <span id="imu-pitch"></span>
</p>
<p class="tilt-line">
<strong>Yaw (°):</strong> <span id="imu-yaw"></span>
<span class="yaw-note">gyro ∫z, 180…180</span>
</p>
</div>
</div>
</section>
<button onclick="selectControls()">Controls</button> <button onclick="selectControls()">Controls</button>
<button onclick="selectSettings()">Settings</button> <button onclick="selectSettings()">Settings</button>

View File

@@ -1,14 +1,43 @@
import asyncio
import utime
from microdot import Microdot, send_file, Response from microdot import Microdot, send_file, Response
from microdot.utemplate import Template from microdot.utemplate import Template
from microdot.websocket import with_websocket from microdot.websocket import WebSocketError, with_websocket
import machine import machine
import wifi import wifi
import json import json
def web(settings, patterns):
def _telemetry_snapshot(telemetry):
"""Flat scalars for JSON (MicroPython-safe); imu.ok as 0/1 for clients."""
im = telemetry["imu"]
return {
"voltage": float(telemetry["voltage"]),
"imu": {
"ok": 1 if im.get("ok") else 0,
"temp": float(im["temp"]),
"ax": float(im["ax"]),
"ay": float(im["ay"]),
"az": float(im["az"]),
"gx": float(im["gx"]),
"gy": float(im["gy"]),
"gz": float(im["gz"]),
"roll_deg": float(im.get("roll_deg", 0.0)),
"pitch_deg": float(im.get("pitch_deg", 0.0)),
"yaw_deg": float(im.get("yaw_deg", 0.0)),
},
}
def web(settings, patterns, telemetry):
app = Microdot() app = Microdot()
Response.default_content_type = 'text/html' Response.default_content_type = 'text/html'
@app.route("/api/telemetry")
def telemetry_handler(request):
return Response(_telemetry_snapshot(telemetry))
@app.route('/') @app.route('/')
async def index_hnadler(request): async def index_hnadler(request):
mac = wifi.get_mac().hex() mac = wifi.get_mac().hex()
@@ -30,14 +59,63 @@ def web(settings, patterns):
@app.route("/ws") @app.route("/ws")
@with_websocket @with_websocket
async def ws(request, ws): async def ws(request, ws):
while True: # One coroutine only: a background send task interleaves badly with
data = await ws.receive() # receive() on ESP32. Use short wait_for slices so we keep receiving
if data: # client commands and still push telemetry on an interval.
push_every_ms = 1000
recv_slice_s = 0.2
last_push = utime.ticks_add(utime.ticks_ms(), -push_every_ms)
# Process the received data def telemetry_payload():
_, status_code = settings.set_settings(json.loads(data), patterns, True) try:
#await ws.send(status_code) snap = _telemetry_snapshot(telemetry)
else: snap["_t"] = "telemetry"
return json.dumps(snap)
except Exception:
return json.dumps(
{
"_t": "telemetry",
"voltage": 0.0,
"imu": {
"ok": 0,
"temp": 0.0,
"ax": 0.0,
"ay": 0.0,
"az": 0.0,
"gx": 0.0,
"gy": 0.0,
"gz": 0.0,
"roll_deg": 0.0,
"pitch_deg": 0.0,
"yaw_deg": 0.0,
},
}
)
while True:
try:
data = await asyncio.wait_for(ws.receive(), recv_slice_s)
except asyncio.TimeoutError:
data = None
except WebSocketError:
break break
if data:
try:
msg = json.loads(data)
except (ValueError, TypeError):
msg = None
if isinstance(msg, dict) and msg.get("_t") != "telemetry":
settings.set_settings(msg, patterns, True)
now = utime.ticks_ms()
if utime.ticks_diff(now, last_push) >= push_every_ms:
if ws.closed:
break
try:
await ws.send(telemetry_payload())
except (OSError, WebSocketError):
break
last_push = now
return app return app

9
test/leds.py Normal file
View File

@@ -0,0 +1,9 @@
from machine import Pin
from neopixel import NeoPixel
led = NeoPixel(Pin(18, Pin.OUT), 10)
led.fill((255, 0, 0))
led.write()

74
test/lsm6ds3_test.py Normal file
View File

@@ -0,0 +1,74 @@
from machine import Pin, I2C
import time
from lsm6ds3 import LSM6DS3, is_lsm6ds3
# XIAO ESP32C3 default I2C pins: SDA=GPIO6 (D4), SCL=GPIO7 (D5)
i2c = I2C(0, scl=Pin(23), sda=Pin(22), freq=400000)
def scan_i2c():
devices = i2c.scan()
if not devices:
print("No I2C devices found")
else:
print("I2C devices found:", [hex(d) for d in devices])
def read_who_am_i(addr):
try:
sensor = LSM6DS3(i2c, addr)
who = sensor.who_am_i()
print("WHO_AM_I at", hex(addr), "=", hex(who))
if is_lsm6ds3(who):
print("Looks like an LSM6DS3")
else:
print("Unexpected WHO_AM_I value")
except OSError as e:
print("Failed to read WHO_AM_I from", hex(addr), "error:", e)
def basic_test():
"""Run an I2C scan and WHO_AM_I check on common LSM6DS3 addresses."""
scan_i2c()
for addr in (0x6A, 0x6B):
read_who_am_i(addr)
def configure_and_read(addr=0x6A):
"""Configure accel/gyro and continuously print readings."""
sensor = LSM6DS3(i2c, addr)
sensor.configure()
time.sleep_ms(100)
while True:
try:
temp_c, accel_g, gyro_dps = sensor.read()
ax_g, ay_g, az_g = accel_g
gx_dps, gy_dps, gz_dps = gyro_dps
print("Temp: {:.2f} C".format(temp_c))
print(
"Accel g: ax={:.3f}, ay={:.3f}, az={:.3f}".format(
ax_g, ay_g, az_g
)
)
print(
"Gyro dps: gx={:.2f}, gy={:.2f}, gz={:.2f}".format(
gx_dps, gy_dps, gz_dps
)
)
print("----")
except OSError as e:
print("I2C read error:", e)
time.sleep(0.1)
if __name__ == "__main__":
# First run a basic scan + WHO_AM_I test.
basic_test()
# Uncomment this to go straight into continuous sensor reading once
# you know the correct I2C address for your board (0x6A or 0x6B).
configure_and_read(addr=0x6B)