ha-linak-desk/custom_components/linak_desk/api/device.py

223 lines
7.3 KiB
Python
Raw Normal View History

2023-03-17 18:26:04 +01:00
"""Linak BLE device"""
import asyncio
import logging
import struct
from bleak import BleakClient
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from bleak_retry_connector import establish_connection
from .lock import ble_lock, GLOBAL_BLE_LOCK
_LOGGER = logging.getLogger(__name__)
UUID_DEVICE_NAME: str = "00002a00-0000-1000-8000-00805f9b34fb"
UUID_HEIGHT: str = "99fa0021-338a-1024-8a49-009c0215f78a"
UUID_COMMAND: str = "99fa0002-338a-1024-8a49-009c0215f78a"
UUID_REFERENCE_INPUT: str = "99fa0031-338a-1024-8a49-009c0215f78a"
UUID_ADV_SERVICE: str = "99fa0001-338a-1024-8a49-009c0215f78a"
COMMAND_STOP = bytearray(struct.pack("<H", 255))
COMMAND_WAKEUP = bytearray(struct.pack("<H", 254))
# Listing device data
# Service found: 00001800-0000-1000-8000-00805f9b34fb
# Characteristing found: 00002a00-0000-1000-8000-00805f9b34fb
# Characteristing found: 00002a01-0000-1000-8000-00805f9b34fb
# Characteristing found: 00002a04-0000-1000-8000-00805f9b34fb
# Characteristing found: 00002aa6-0000-1000-8000-00805f9b34fb
# Service found: 00001801-0000-1000-8000-00805f9b34fb
# Characteristing found: 00002a05-0000-1000-8000-00805f9b34fb
# Service found: 99fa0001-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0002-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0003-338a-1024-8a49-009c0215f78a
# Service found: 99fa0010-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0011-338a-1024-8a49-009c0215f78a
# Service found: 99fa0020-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0021-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0029-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa002a-338a-1024-8a49-009c0215f78a
# Service found: 99fa0030-338a-1024-8a49-009c0215f78a
# Characteristing found: 99fa0031-338a-1024-8a49-009c0215f78a
def parse_height_speed(value: bytes) -> tuple[int, int]:
"""Parses and converts height and speed data"""
raw_height, raw_speed = struct.unpack("<Hh", value)
height = (raw_height / 10) + 620
speed = raw_speed / 100
return (height, speed)
class Device:
"""Bluetooth stuff"""
_ble_device: BLEDevice
_connection: BleakClient
_connection_lock = asyncio.Lock()
_is_connected: bool
_height: float
_speed: float
_name: str
def __init__(self, device: BLEDevice) -> None:
self._ble_device = device
self._is_connected = False
self._mac = device.address
self._height = 620
self._speed = 0
def disconnected_callback(self, client):
"""Callback for when the device is disconnected"""
_LOGGER.warning("Disconnected from %s", self._mac)
self._is_connected = False
async def _read_model(self):
"""Does something"""
manufacturer_data = await self._connection.read_gatt_char(UUID_DEVICE_NAME)
name = manufacturer_data.decode("utf-8")
_LOGGER.debug("Read manufacturer data: %s", name)
self._name = name
@ble_lock
async def connect(self, retry_attempts=4) -> None:
"""Connect to the device"""
if self._is_connected or self._connection_lock.locked():
return
async with self._connection_lock:
try:
_LOGGER.debug("Connecting to %s", self._mac)
self._connection = await establish_connection(
client_class=BleakClient,
device=self._ble_device,
name=self._mac,
disconnected_callback=self.disconnected_callback,
max_attempts=retry_attempts,
use_services_cache=True,
)
_LOGGER.debug("Listing device data")
for service in self._connection.services:
_LOGGER.debug("Service found: %s", service.uuid)
for characteristic in service.characteristics:
_LOGGER.debug(
" Characteristing found: %s", characteristic.uuid
)
await self._read_model()
self._is_connected = True
_LOGGER.debug("Connected to %s", self._mac)
except BleakError as error:
_LOGGER.error("Failed to connect to %s: %s", self._mac, error)
self._is_connected = False
@ble_lock
async def disconnect(self) -> None:
"""Disconnect the device"""
await self._write(UUID_COMMAND, COMMAND_STOP)
await self._connection.disconnect()
async def fetch_state(self) -> None:
"""Update the state of the device"""
if not self._is_connected:
await self.connect(retry_attempts=1)
async with GLOBAL_BLE_LOCK:
uuids = [UUID_HEIGHT]
try:
bytes_array: list[bytes] = await asyncio.gather(
*[self._read(uuid) for uuid in uuids], return_exceptions=True
)
for i, some_bytes in enumerate(bytes_array):
uuid = uuids[i]
# _LOGGER.debug("Received %s", some_bytes)
if isinstance(some_bytes, BleakError):
error: BleakError = BleakError(some_bytes)
_LOGGER.error(
"Error reading UUID %s: (%s) %s",
uuid,
type(some_bytes),
error,
)
elif uuid == UUID_HEIGHT:
self._height, self._speed = parse_height_speed(some_bytes)
except BleakError as error:
if self._is_connected:
raise error
async def move_to_height(self, height: float) -> None:
"""Move to a specific height"""
if height == self._height:
_LOGGER.debug("Already at target height")
return
_LOGGER.debug("Moving to %f", height)
await self._write(UUID_COMMAND, COMMAND_WAKEUP)
await self._write(UUID_COMMAND, COMMAND_STOP)
target_height = int((height - 620) * 10)
encoded_height = struct.pack("<H", target_height)
while True:
await self._write(UUID_REFERENCE_INPUT, encoded_height)
await asyncio.sleep(0.5)
_height, speed = parse_height_speed(await self._read(UUID_HEIGHT))
if speed == 0:
break
async def _read(self, uuid: str) -> bytes:
return await self._connection.read_gatt_char(uuid)
async def _write(self, uuid: str, data: bytes) -> None:
await self._connection.write_gatt_char(uuid, data)
@property
def is_connected(self) -> bool:
"""Returns whether the device is connected"""
return self._is_connected
@property
def mac(self) -> str:
"""Return the MAC address of the device"""
return self._mac
@property
def name(self) -> str:
"""Return device name"""
return self._name
@property
def height(self) -> float:
"""Return current height in mm"""
return self._height
@property
def speed(self) -> float:
"""Return current movement speed in mm/s"""
return self._speed
def update_ble_device(self, ble_device: BLEDevice) -> None:
"""Updates the cached BLEDevice for the device"""
self._ble_device = ble_device