"""Linak BLE device""" import asyncio import logging import struct from bleak import BleakClient from bleak.backends.device import BLEDevice from bleak.exc import BleakError from bleak_retry_connector import establish_connection from .lock import ble_lock, GLOBAL_BLE_LOCK _LOGGER = logging.getLogger(__name__) UUID_DEVICE_NAME: str = "00002a00-0000-1000-8000-00805f9b34fb" UUID_HEIGHT: str = "99fa0021-338a-1024-8a49-009c0215f78a" UUID_COMMAND: str = "99fa0002-338a-1024-8a49-009c0215f78a" UUID_REFERENCE_INPUT: str = "99fa0031-338a-1024-8a49-009c0215f78a" UUID_ADV_SERVICE: str = "99fa0001-338a-1024-8a49-009c0215f78a" COMMAND_STOP = bytearray(struct.pack(" tuple[int, int]: """Parses and converts height and speed data""" raw_height, raw_speed = struct.unpack(" None: self._ble_device = device self._is_connected = False self._mac = device.address self._height = 620 self._speed = 0 def disconnected_callback(self, client): """Callback for when the device is disconnected""" _LOGGER.warning("Disconnected from %s", self._mac) self._is_connected = False async def _read_model(self): """Does something""" manufacturer_data = await self._connection.read_gatt_char(UUID_DEVICE_NAME) name = manufacturer_data.decode("utf-8") _LOGGER.debug("Read manufacturer data: %s", name) self._name = name @ble_lock async def connect(self, retry_attempts=4) -> None: """Connect to the device""" if self._is_connected or self._connection_lock.locked(): return async with self._connection_lock: try: _LOGGER.debug("Connecting to %s", self._mac) self._connection = await establish_connection( client_class=BleakClient, device=self._ble_device, name=self._mac, disconnected_callback=self.disconnected_callback, max_attempts=retry_attempts, use_services_cache=True, ) _LOGGER.debug("Listing device data") for service in self._connection.services: _LOGGER.debug("Service found: %s", service.uuid) for characteristic in service.characteristics: _LOGGER.debug( " Characteristing found: %s", characteristic.uuid ) await self._read_model() self._is_connected = True _LOGGER.debug("Connected to %s", self._mac) except BleakError as error: _LOGGER.error("Failed to connect to %s: %s", self._mac, error) self._is_connected = False @ble_lock async def disconnect(self) -> None: """Disconnect the device""" await self._write(UUID_COMMAND, COMMAND_STOP) await self._connection.disconnect() async def fetch_state(self) -> None: """Update the state of the device""" if not self._is_connected: await self.connect(retry_attempts=1) async with GLOBAL_BLE_LOCK: uuids = [UUID_HEIGHT] try: bytes_array: list[bytes] = await asyncio.gather( *[self._read(uuid) for uuid in uuids], return_exceptions=True ) for i, some_bytes in enumerate(bytes_array): uuid = uuids[i] # _LOGGER.debug("Received %s", some_bytes) if isinstance(some_bytes, BleakError): error: BleakError = BleakError(some_bytes) _LOGGER.error( "Error reading UUID %s: (%s) %s", uuid, type(some_bytes), error, ) elif uuid == UUID_HEIGHT: self._height, self._speed = parse_height_speed(some_bytes) except BleakError as error: if self._is_connected: raise error async def move_to_height(self, height: float) -> None: """Move to a specific height""" if height == self._height: _LOGGER.debug("Already at target height") return _LOGGER.debug("Moving to %f", height) await self._write(UUID_COMMAND, COMMAND_WAKEUP) await self._write(UUID_COMMAND, COMMAND_STOP) target_height = int((height - 620) * 10) encoded_height = struct.pack(" bytes: return await self._connection.read_gatt_char(uuid) async def _write(self, uuid: str, data: bytes) -> None: await self._connection.write_gatt_char(uuid, data) @property def is_connected(self) -> bool: """Returns whether the device is connected""" return self._is_connected @property def mac(self) -> str: """Return the MAC address of the device""" return self._mac @property def name(self) -> str: """Return device name""" return self._name @property def height(self) -> float: """Return current height in mm""" return self._height @property def speed(self) -> float: """Return current movement speed in mm/s""" return self._speed def update_ble_device(self, ble_device: BLEDevice) -> None: """Updates the cached BLEDevice for the device""" self._ble_device = ble_device