import asyncio
import logging
import math
import struct
from typing import Optional, Tuple
from dataclasses import dataclass
from frame_msg import FrameMsg
logging.basicConfig()
_log = logging.getLogger("RxIMU")
[docs]
class SensorBuffer:
"""Buffer class to provide smoothed moving average of samples"""
def __init__(self, max_size: int):
self.max_size = max_size
self._buffer: list[Tuple[int, int, int]] = []
[docs]
def add(self, value: Tuple[int, int, int]) -> None:
self._buffer.append(value)
if len(self._buffer) > self.max_size:
self._buffer.pop(0)
@property
def average(self) -> Tuple[int, int, int]:
if not self._buffer:
return (0, 0, 0)
sum_x = sum(x for x, _, _ in self._buffer)
sum_y = sum(y for _, y, _ in self._buffer)
sum_z = sum(z for _, _, z in self._buffer)
length = len(self._buffer)
return (
sum_x // length,
sum_y // length,
sum_z // length
)
[docs]
@dataclass
class IMURawData:
compass: Tuple[int, int, int]
accel: Tuple[int, int, int]
[docs]
@dataclass
class IMUData:
compass: Tuple[int, int, int]
accel: Tuple[int, int, int]
raw: Optional[IMURawData] = None
@property
def pitch(self) -> float:
return math.atan2(self.accel[1], self.accel[2]) * 180.0 / math.pi
@property
def roll(self) -> float:
return math.atan2(self.accel[0], self.accel[2]) * 180.0 / math.pi
[docs]
class RxIMU:
def __init__(
self,
imu_flag: int = 0x0A,
smoothing_samples: int = 1,
):
"""
Initialize IMU handler for processing magnetometer and accelerometer data.
Args:
imu_flag: Message type identifier for IMU data
smoothing_samples: Number of samples to use for moving average
"""
self.imu_flag = imu_flag
self._smoothing_samples = smoothing_samples
self.queue: Optional[asyncio.Queue] = None
self._compass_buffer = SensorBuffer(smoothing_samples)
self._accel_buffer = SensorBuffer(smoothing_samples)
[docs]
def handle_data(self, data: bytes) -> None:
"""
Process incoming IMU data packets.
Args:
data: Bytes containing IMU data with flag byte prefix
"""
if not self.queue:
_log.warning("Received data but queue not initialized - call start() first")
return
# Parse six signed 16-bit integers from the data starting at offset 2
values = struct.unpack('<6h', data[2:14])
# Extract compass and accelerometer values
raw_compass = (values[0], values[1], values[2])
raw_accel = (values[3], values[4], values[5])
# Add to buffers
self._compass_buffer.add(raw_compass)
self._accel_buffer.add(raw_accel)
# Create IMU data with smoothed and raw values
imu_data = IMUData(
compass=self._compass_buffer.average,
accel=self._accel_buffer.average,
raw=IMURawData(
compass=raw_compass,
accel=raw_accel
)
)
# Queue the data
asyncio.create_task(self.queue.put(imu_data))
[docs]
async def attach(self, frame: FrameMsg) -> asyncio.Queue:
"""
Attach the IMU handler to the Frame data response and return a queue that will receive IMU data.
Returns:
asyncio.Queue that will receive IMUData objects
"""
self.queue = asyncio.Queue()
# subscribe for notifications
frame.register_data_response_handler(self, [self.imu_flag], self.handle_data)
return self.queue
[docs]
def detach(self, frame: FrameMsg) -> None:
"""Detach the IMU handler from the Frame data response and clean up resources"""
frame.unregister_data_response_handler(self)
self.queue = None