223 lines
7.4 KiB
223 lines
7.4 KiB
"""Linak BLE device"""
import asyncio
import logging
import struct
from bleak import BleakClient
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from bleak_retry_connector import establish_connection
from .lock import ble_lock, GLOBAL_BLE_LOCK
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


def _pack_command(code: int) -> bytearray:
    """Encode a command code as a little-endian unsigned 16-bit payload."""
    return bytearray(struct.pack("<H", code))


# GATT UUIDs referenced by this module.
# Device-name characteristic: read and decoded as UTF-8 by Device._read_model.
UUID_DEVICE_NAME: str = "00002a00-0000-1000-8000-00805f9b34fb"
# Height characteristic: its payload is decoded by parse_height_speed().
UUID_HEIGHT: str = "99fa0021-338a-1024-8a49-009c0215f78a"
# Command characteristic: COMMAND_STOP is written to it.
UUID_COMMAND: str = "99fa0002-338a-1024-8a49-009c0215f78a"
# Reference-input characteristic: Device.move_to_height writes the encoded
# target height to it.
UUID_REFERENCE_INPUT: str = "99fa0031-338a-1024-8a49-009c0215f78a"
# Not referenced in the visible part of this file; presumably the advertised
# service UUID (going by its name) -- TODO confirm.
UUID_ADV_SERVICE: str = "99fa0001-338a-1024-8a49-009c0215f78a"

# Command payloads written to UUID_COMMAND.
COMMAND_STOP = _pack_command(255)
# Not referenced in the visible part of this file.
COMMAND_WAKEUP = _pack_command(254)
# Listing device data
# Service found: 00001800-0000-1000-8000-00805f9b34fb
#   Characteristic found: 00002a00-0000-1000-8000-00805f9b34fb
#   Characteristic found: 00002a01-0000-1000-8000-00805f9b34fb
#   Characteristic found: 00002a04-0000-1000-8000-00805f9b34fb
#   Characteristic found: 00002aa6-0000-1000-8000-00805f9b34fb
# Service found: 00001801-0000-1000-8000-00805f9b34fb
#   Characteristic found: 00002a05-0000-1000-8000-00805f9b34fb
# Service found: 99fa0001-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0002-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0003-338a-1024-8a49-009c0215f78a
# Service found: 99fa0010-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0011-338a-1024-8a49-009c0215f78a
# Service found: 99fa0020-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0021-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0029-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa002a-338a-1024-8a49-009c0215f78a
# Service found: 99fa0030-338a-1024-8a49-009c0215f78a
#   Characteristic found: 99fa0031-338a-1024-8a49-009c0215f78a
def parse_height_speed(value: bytes) -> tuple[float, float]:
    """Parse a raw height/speed payload and convert it to scaled values.

    The payload is unpacked as a little-endian unsigned 16-bit height
    followed by a little-endian signed 16-bit speed (4 bytes in total).

    Args:
        value: The raw 4-byte payload.

    Returns:
        A ``(height, speed)`` tuple of floats, where ``height`` is
        ``raw_height / 10 + 620`` and ``speed`` is ``raw_speed / 100``.
        The Device accessors in this file describe these as mm and mm/s.

    Raises:
        struct.error: If ``value`` is not exactly 4 bytes long.
    """
    raw_height, raw_speed = struct.unpack("<Hh", value)
    # The raw height is in tenths, relative to a 620 base offset.
    height = (raw_height / 10) + 620
    # The raw speed is in hundredths; the sign is preserved.
    speed = raw_speed / 100
    return (height, speed)
class Device:
    """Connection to, and cached state of, a single Linak BLE device.

    The instance owns one ``BleakClient`` connection (created by
    ``connect``) and caches the most recently read height and speed as well
    as the device name read during ``connect``.
    """

    _ble_device: BLEDevice
    _connection: BleakClient  # only set once connect() has succeeded
    _connection_lock: asyncio.Lock
    _is_connected: bool
    _mac: str
    _height: float
    _speed: float
    _name: str  # only set once connect() has read the device name

    def __init__(self, device: BLEDevice) -> None:
        """Create a disconnected device wrapper.

        Args:
            device: The BLE device to talk to; its ``address`` is cached and
                returned by ``mac()``.
        """
        self._ble_device = device
        # One lock per instance: connect() treats a held lock as "a connect
        # attempt is already running", so sharing a single lock between
        # instances would make one device's attempt suppress another's.
        self._connection_lock = asyncio.Lock()
        self._is_connected = False
        self._mac = device.address
        self._height = 620
        self._speed = 0

    def disconnected_callback(self, client: BleakClient) -> None:
        """Callback for when the device is disconnected.

        Args:
            client: The client that disconnected (unused).
        """
        _LOGGER.info("Disconnected from %s", self._mac)
        self._is_connected = False

    async def _read_model(self) -> None:
        """Read the device-name characteristic and cache it as the name."""
        raw_name = await self._connection.read_gatt_char(UUID_DEVICE_NAME)
        name = raw_name.decode("utf-8")
        _LOGGER.debug("Read manufacturer data: %s", name)
        self._name = name

    async def connect(self, retry_attempts: int = 4) -> None:
        """Connect to the device and read its name.

        Does nothing when already connected, or when another connect attempt
        on this instance currently holds the connection lock. A ``BleakError``
        raised while connecting is logged and swallowed; use
        ``is_connected()`` to check the outcome.

        Args:
            retry_attempts: Maximum number of connection attempts.
        """
        if self._is_connected or self._connection_lock.locked():
            return
        async with self._connection_lock:
            try:
                _LOGGER.debug("Connecting to %s", self._mac)
                # NOTE(review): the argument list of this call was truncated
                # in the file and has been reconstructed from the names this
                # module provides -- confirm against the upstream code.
                self._connection = await establish_connection(
                    BleakClient,
                    self._ble_device,
                    self._mac,
                    max_attempts=retry_attempts,
                    disconnected_callback=self.disconnected_callback,
                )
                _LOGGER.debug("Listing device data")
                for service in self._connection.services:
                    _LOGGER.debug("Service found: %s", service.uuid)
                    for characteristic in service.characteristics:
                        _LOGGER.debug(
                            " Characteristing found: %s", characteristic.uuid
                        )
                await self._read_model()
                self._is_connected = True
                _LOGGER.debug("Connected to %s", self._mac)
            except BleakError as error:
                _LOGGER.error("Failed to connect to %s: %s", self._mac, error)
                self._is_connected = False

    async def disconnect(self) -> None:
        """Send the stop command, then disconnect the device.

        The disconnect is attempted even when sending the stop command
        fails; the failure is still propagated to the caller.
        """
        try:
            await self._write(UUID_COMMAND, COMMAND_STOP)
        finally:
            await self._connection.disconnect()

    async def fetch_state(self) -> None:
        """Update the cached height and speed from the device.

        Tries a single connection attempt first when not connected and
        returns without reading if the device is still not connected.

        Raises:
            BleakError: If one escapes the read loop while the device is
                still marked as connected.
            Exception: Any non-Bleak exception raised by a read.
        """
        if not self._is_connected:
            await self.connect(retry_attempts=1)
        if not self._is_connected:
            # connect() has already logged the failure (or another attempt is
            # in flight); there is no usable connection to read from.
            _LOGGER.debug("Not connected to %s, skipping state fetch", self._mac)
            return
        async with GLOBAL_BLE_LOCK:
            try:
                uuids = [UUID_HEIGHT]
                results = await asyncio.gather(
                    *[self._read(uuid) for uuid in uuids], return_exceptions=True
                )
                for uuid, result in zip(uuids, results):
                    if isinstance(result, BleakError):
                        # NOTE(review): the arguments of this log call were
                        # truncated in the file and have been reconstructed.
                        _LOGGER.error(
                            "Error reading UUID %s: (%s) %s",
                            uuid,
                            type(result).__name__,
                            result,
                        )
                    elif isinstance(result, BaseException):
                        # gather() returned a non-Bleak failure as a value;
                        # surface it instead of parsing it as a payload.
                        raise result
                    elif uuid == UUID_HEIGHT:
                        self._height, self._speed = parse_height_speed(result)
            except BleakError:
                # Errors caused by a disconnect in the meantime are ignored.
                if self._is_connected:
                    raise

    async def move_to_height(self, height: float) -> None:
        """Move to a specific height.

        Repeatedly writes the target to the reference-input characteristic
        until the device reports a speed of zero.

        Args:
            height: Target height, on the same scale as ``height()``.

        Raises:
            struct.error: If the target does not fit an unsigned 16-bit
                value once encoded (for example, a height below 620).
        """
        if height == self._height:
            _LOGGER.debug("Already at target height")
            return
        _LOGGER.debug("Moving to %f", height)
        await self._write(UUID_COMMAND, COMMAND_STOP)
        # Round rather than truncate so float error (720.3 -> 1002.999...)
        # cannot shift the target by a tenth.
        target_height = round((height - 620) * 10)
        encoded_height = struct.pack("<H", target_height)
        while True:
            await self._write(UUID_REFERENCE_INPUT, encoded_height)
            await asyncio.sleep(0.2)
            _height, speed = parse_height_speed(await self._read(UUID_HEIGHT))
            if speed == 0:
                break

    async def _read(self, uuid: str) -> bytes:
        """Read the characteristic ``uuid`` from the current connection."""
        return await self._connection.read_gatt_char(uuid)

    async def _write(self, uuid: str, data: bytes) -> None:
        """Write ``data`` to the characteristic ``uuid``."""
        await self._connection.write_gatt_char(uuid, data)

    def is_connected(self) -> bool:
        """Returns whether the device is connected"""
        return self._is_connected

    def mac(self) -> str:
        """Return the MAC address of the device"""
        return self._mac

    def name(self) -> str:
        """Return device name"""
        return self._name

    def height(self) -> float:
        """Return current height in mm"""
        return self._height

    def speed(self) -> float:
        """Return current movement speed in mm/s"""
        return self._speed

    def update_ble_device(self, ble_device: BLEDevice) -> None:
        """Updates the cached BLEDevice for the device"""
        self._ble_device = ble_device