Source code for frame_msg.rx_metering_data

import asyncio
import logging
import struct
from typing import Optional

from frame_msg import FrameMsg

logging.basicConfig()
_log = logging.getLogger("RxMeteringData")

[docs] class RxMeteringData: def __init__( self, msg_code: int = 0x12, ): """ Initialize receive handler for processing metering data. Args: msg_code: Message type identifier for metering data """ self.msg_code = msg_code self.queue: Optional[asyncio.Queue] = None
[docs] def handle_data(self, data: bytes) -> None: """ Process incoming data packets. Args: data: Bytes containing metering data with flag byte prefix and 6 unsigned bytes (spot r,g,b, matrix r,g,b) """ if not self.queue: _log.warning("Received data but queue not initialized - call start() first") return # Parse six unsigned bytes from the data starting at offset 2 unpacked = struct.unpack("<BBBBBB", data[1:7]) # Create the result dictionary result = { 'spot_r': unpacked[0], 'spot_g': unpacked[1], 'spot_b': unpacked[2], 'matrix_r': unpacked[3], 'matrix_g': unpacked[4], 'matrix_b': unpacked[5], } # Queue the data asyncio.create_task(self.queue.put(result))
[docs] async def attach(self, frame: FrameMsg) -> asyncio.Queue: """ Attach the receive handler to the Frame data response and return a queue that will receive metering data. Returns: asyncio.Queue that will receive metering data objects """ self.queue = asyncio.Queue() # subscribe for notifications frame.register_data_response_handler(self, [self.msg_code], self.handle_data) return self.queue
[docs] def detach(self, frame: FrameMsg) -> None: """Detach the receive handler from the Frame data response and clean up resources""" frame.unregister_data_response_handler(self) self.queue = None