Source code for frame_msg.rx_auto_exp_result

import asyncio
import logging
import struct
from typing import Optional

from frame_msg import FrameMsg

logging.basicConfig()
_log = logging.getLogger("RxAutoExpResult")

[docs] class RxAutoExpResult: def __init__( self, msg_code: int = 0x11, ): """ Initialize receive handler for processing auto exposure result data. Args: msg_code: Message type identifier for auto exposure result data """ self.msg_code = msg_code self.queue: Optional[asyncio.Queue] = None
[docs] def handle_data(self, data: bytes) -> None: """ Process incoming data packets. Args: data: Bytes containing auto exposure result data with flag byte prefix and 16 floats """ if not self.queue: _log.warning("Received data but queue not initialized - call start() first") return # Parse six signed 16-bit integers from the data starting at offset 2 unpacked = struct.unpack("<ffffff ff ffff ffff", data[1:65]) # Create the result dictionary result = { 'error': unpacked[0], 'shutter': unpacked[1], 'analog_gain': unpacked[2], 'red_gain': unpacked[3], 'green_gain': unpacked[4], 'blue_gain': unpacked[5], 'brightness': { 'center_weighted_average': unpacked[6], 'scene': unpacked[7], 'matrix': { 'r': unpacked[8], 'g': unpacked[9], 'b': unpacked[10], 'average': unpacked[11] }, 'spot': { 'r': unpacked[12], 'g': unpacked[13], 'b': unpacked[14], 'average': unpacked[15] } } } # Queue the data asyncio.create_task(self.queue.put(result))
[docs] async def attach(self, frame: FrameMsg) -> asyncio.Queue: """ Attach the receive handler to the Frame data response and return a queue that will receive autoexposure result data. Returns: asyncio.Queue that will receive autoexposure result objects """ self.queue = asyncio.Queue() # subscribe for notifications frame.register_data_response_handler(self, [self.msg_code], self.handle_data) return self.queue
[docs] def detach(self, frame: FrameMsg) -> None: """Detach the receive handler from the Frame data response and clean up resources""" frame.unregister_data_response_handler(self) self.queue = None