Source code for frame_msg.rx_tap

import asyncio
import logging
import time
from typing import Optional
from frame_msg import FrameMsg

logging.basicConfig()
_log = logging.getLogger("RxTap")

[docs] class RxTap: def __init__(self, tap_flag: int = 0x09, threshold: float = 0.3): """ Initialize a tap handler that aggregates taps within a threshold window. Args: tap_flag: The message type identifier for tap events (default: 0x09) threshold: Time window in seconds to aggregate taps (default: 0.3) """ self.tap_flag = tap_flag self.threshold = threshold self.queue: Optional[asyncio.Queue] = None self._last_tap_time = 0 self._tap_count = 0 self._threshold_task: Optional[asyncio.Task] = None async def _reset_threshold_timer(self) -> None: """Cancel existing threshold timer and start a new one""" if self._threshold_task and not self._threshold_task.done(): self._threshold_task.cancel() self._threshold_task = asyncio.create_task(self._threshold_timeout()) async def _threshold_timeout(self) -> None: """Handle threshold timer expiration by sending tap count to queue""" try: await asyncio.sleep(self.threshold) if self.queue and self._tap_count > 0: await self.queue.put(self._tap_count) self._tap_count = 0 except asyncio.CancelledError: pass
[docs] def handle_data(self, data: bytes) -> None: """ Process an incoming Tap message Args: data: A single byte with the tap_flag prefix """ if not self.queue: _log.warning("Received data but queue not initialized - call start() first") return current_time = time.time() # Debounce taps that occur too close together (40ms) if current_time - self._last_tap_time < 0.04: self._last_tap_time = current_time return self._last_tap_time = current_time self._tap_count += 1 # Reset the threshold timer asyncio.create_task(self._reset_threshold_timer())
[docs] async def attach(self, frame: FrameMsg) -> asyncio.Queue: """ Attach the tap handler to the Frame data response and return a queue that will receive tap counts. Returns: asyncio.Queue that will receive integers representing tap counts """ self.queue = asyncio.Queue() self._last_tap_time = 0 self._tap_count = 0 # subscribe for notifications frame.register_data_response_handler(self, [self.tap_flag], self.handle_data) return self.queue
[docs] def detach(self, frame: FrameMsg) -> None: """Detach the tap handler from the Frame data response and clean up resources""" frame.unregister_data_response_handler(self) if self._threshold_task and not self._threshold_task.done(): self._threshold_task.cancel() self.queue = None self._tap_count = 0