213 lines
5.8 KiB
Python
213 lines
5.8 KiB
Python
import asyncio
|
|
from bleak import BleakClient, BleakScanner, BleakGATTCharacteristic
|
|
|
|
from time import sleep
|
|
|
|
from uuid import UUID
|
|
|
|
import struct
|
|
|
|
disconnect_cond = asyncio.Semaphore(1)
|
|
|
|
|
|
def parse_bike_trainer_data(data_bytes):
|
|
# Unpack the first two bytes for flags
|
|
flags, = struct.unpack('<H', data_bytes[:2])
|
|
|
|
# print(flags)
|
|
|
|
# Define the format string dynamically based on flags
|
|
format_string = "<" # little-endian
|
|
fields = []
|
|
|
|
# Check each bit in flags and add corresponding format character
|
|
if not flags & 0b1:
|
|
# print('spd')
|
|
format_string += "H"
|
|
fields.append("instant_speed")
|
|
if flags & 0b10:
|
|
# print('avs')
|
|
format_string += "H"
|
|
fields.append("avg_speed")
|
|
if flags & 0b100:
|
|
# print('cad')
|
|
format_string += "H"
|
|
fields.append("instant_cadence")
|
|
if flags & 0b1000:
|
|
# print('acad')
|
|
format_string += "H"
|
|
fields.append("avg_cadence")
|
|
if flags & 0b10000:
|
|
# print('tdist')
|
|
format_string += "3s"
|
|
fields.append("total_distance")
|
|
if flags & 0b100000:
|
|
# print('resl')
|
|
format_string += "B"
|
|
fields.append("resistance_level")
|
|
if flags & 0b1000000:
|
|
# print('instp')
|
|
format_string += "h"
|
|
fields.append("instant_power")
|
|
if flags & 0b10000000:
|
|
# print('avgp')
|
|
format_string += "h"
|
|
fields.append("avg_power")
|
|
if flags & 0b100000000:
|
|
format_string += "HBB" # total_energy, energy_per_hour, and energy_per_min come together
|
|
fields.extend(["total_energy", "energy_per_hour", "energy_per_min"])
|
|
if flags & 0b1000000000:
|
|
# print('hr')
|
|
format_string += "B"
|
|
fields.append("heart_rate")
|
|
if flags & 0b10000000000:
|
|
# print('metequiv')
|
|
format_string += "B"
|
|
fields.append("metabolic_equivalent")
|
|
if flags & 0b100000000000:
|
|
# print('elapsedt')
|
|
format_string += "H"
|
|
fields.append("elapsed_time")
|
|
if flags & 0b1000000000000:
|
|
# print('remt')
|
|
format_string += "H"
|
|
fields.append("remaining_time")
|
|
|
|
# Unpack the remaining bytes
|
|
values = struct.unpack(format_string, data_bytes[2:])
|
|
|
|
# Convert total_distance from bytes to integer if present
|
|
values = list(values)
|
|
if "total_distance" in fields:
|
|
idx = fields.index("total_distance")
|
|
values[idx] = int.from_bytes(values[idx], "little")
|
|
|
|
# Construct and return the dictionary
|
|
return dict(zip(fields, values))
|
|
|
|
|
|
def format_bike_trainer_data(parsed_data):
|
|
# Field units based on the struct declaration
|
|
field_units = {
|
|
"instant_speed": "km/h",
|
|
"avg_speed": "km/h",
|
|
"instant_cadence": "revolutions/min",
|
|
"avg_cadence": "revolutions/min",
|
|
"total_distance": "m",
|
|
"resistance_level": "units",
|
|
"instant_power": "watts",
|
|
"avg_power": "watts",
|
|
"total_energy": "kcal",
|
|
"energy_per_hour": "kcal/h",
|
|
"energy_per_min": "kcal/min",
|
|
"heart_rate": "bpm",
|
|
"metabolic_equivalent": "1/10 METs",
|
|
"elapsed_time": "s",
|
|
"remaining_time": "s"
|
|
}
|
|
|
|
formatted_output = []
|
|
|
|
# Iterate over each field in the parsed data and format with units
|
|
for field, value in parsed_data.items():
|
|
unit = field_units.get(field, "")
|
|
if field in ["instant_speed", "avg_speed"]:
|
|
value /= 100 # Convert to km/h from 1/100 km/h
|
|
elif field in ["instant_cadence", "avg_cadence"]:
|
|
value /= 2 # Convert to revolutions/min from 1/2 revolutions/min
|
|
elif field == "metabolic_equivalent":
|
|
value /= 10 # Convert to METs from 1/10 METs
|
|
|
|
formatted_output.append(f"{field.replace('_', ' ').title()}: {value} {unit}")
|
|
|
|
return ', '.join(formatted_output)
|
|
|
|
|
|
async def main():
|
|
# devs = await BleakScanner.discover(timeout=30.0)
|
|
# print(devs)
|
|
|
|
global disconnect_cond
|
|
|
|
print("searching for keant...")
|
|
|
|
keant = None
|
|
|
|
while not keant:
|
|
devs = await BleakScanner.discover()
|
|
print(devs)
|
|
for dev in devs:
|
|
if 'keant' in dev.name.lower():
|
|
print("KeAnt found:", dev)
|
|
keant = dev
|
|
|
|
address = keant.address
|
|
|
|
connected = False
|
|
|
|
def on_disconnect(_):
|
|
print("Client disconnected.")
|
|
disconnect_cond.release()
|
|
|
|
|
|
client = BleakClient(address, disconnected_callback=on_disconnect)
|
|
|
|
try:
|
|
print("connecting...")
|
|
await client.connect()
|
|
connected = True
|
|
print("connected.")
|
|
|
|
print(client.services.descriptors)
|
|
|
|
for num in client.services.characteristics:
|
|
desc = client.services.characteristics[num]
|
|
print(desc)
|
|
|
|
# u = UUID(int=0x2ad2)
|
|
|
|
u = '00002ad2-0000-1000-8000-00805f9b34fb'
|
|
|
|
char = client.services.get_characteristic(u) # .get_characteristic("2ad2") # ftms
|
|
|
|
print(char)
|
|
|
|
# fitness equipment service read
|
|
# char_uuid = "1826"
|
|
# char_uuid = "00002ad2"
|
|
|
|
def callback(sender: BleakGATTCharacteristic, data: bytearray):
|
|
print(f"{sender}: {data}")
|
|
|
|
btd = parse_bike_trainer_data(data)
|
|
print(btd)
|
|
|
|
fdata = format_bike_trainer_data(btd)
|
|
print(fdata)
|
|
|
|
|
|
|
|
print("starting notify...")
|
|
await client.start_notify(char, callback)
|
|
|
|
await disconnect_cond.acquire() # sem 0
|
|
await disconnect_cond.acquire() # sem 1 (lock)
|
|
|
|
print("disconnected, stopping...")
|
|
|
|
# sleep(500)
|
|
|
|
# REMOVE::
|
|
|
|
# model_number = await client.read_gatt_char(MODEL_NBR_UUID)
|
|
# print("Model Number: {0}".format("".join(map(chr, model_number))))
|
|
|
|
except Exception as e:
|
|
print("Exception:")
|
|
print(e)
|
|
finally:
|
|
await client.disconnect()
|
|
|
|
asyncio.run(main())
|
|
|