23 lines
538 B
Python
23 lines
538 B
Python
"""Locking for BLE ops"""
|
|
|
|
import asyncio
|
|
from functools import wraps
|
|
from typing import Any, TypeVar
|
|
from collections.abc import Callable, Coroutine
|
|
|
|
# Return type of the coroutine function passed to ``ble_lock``; it lets the
# decorated function advertise the same awaited result type as the original.
RT = TypeVar("RT")

# Single module-level lock shared by every coroutine decorated with
# ``ble_lock``.
# NOTE(review): the lock is constructed at import time. On Python < 3.10,
# asyncio.Lock binds to the event loop that is current at construction --
# confirm the supported Python versions if the loop is created later
# (e.g. via asyncio.run).
GLOBAL_BLE_LOCK: asyncio.Lock = asyncio.Lock()
|
|
|
|
|
|
def ble_lock(
    func: Callable[..., Coroutine[Any, Any, RT]]
) -> Callable[..., Coroutine[Any, Any, RT]]:
    """Wrap a coroutine function so it runs while holding ``GLOBAL_BLE_LOCK``.

    Args:
        func: Coroutine function to guard.

    Returns:
        A coroutine function that forwards its positional and keyword
        arguments to ``func``, awaits it while ``GLOBAL_BLE_LOCK`` is held,
        and returns its result. The lock is released when ``func`` finishes,
        whether it returns or raises.
    """

    async def locked_call(*call_args, **call_kwargs) -> RT:
        # Every decorated coroutine shares the same module-level lock, so
        # only one of them can be inside this section at a time.
        # NOTE: asyncio.Lock is not reentrant -- a decorated coroutine that
        # awaits another decorated coroutine would wait on itself.
        async with GLOBAL_BLE_LOCK:
            outcome = await func(*call_args, **call_kwargs)
        return outcome

    # Same effect as decorating the inner coroutine with @wraps(func):
    # copy func's name, docstring and other metadata onto the wrapper.
    guarded = wraps(func)(locked_call)
    return guarded
|