keant-ce-test-python-bt-helper/main.py

213 lines
5.8 KiB
Python

import asyncio
from bleak import BleakClient, BleakScanner, BleakGATTCharacteristic
from time import sleep
from uuid import UUID
import struct
# Disconnect signal shared with main(). It starts with a single permit:
# main() acquires it twice, so the second acquire blocks until main()'s
# disconnect callback calls release().
# NOTE(review): this is constructed at import time, before asyncio.run()
# creates the event loop; on Python versions older than 3.10 asyncio
# primitives bind to a loop at construction -- confirm the target version.
disconnect_cond = asyncio.Semaphore(1)
def parse_bike_trainer_data(data_bytes):
    """Decode one bike-trainer notification payload into a field dictionary.

    The payload begins with a little-endian 16-bit flags word. The bytes
    after it contain only the fields announced by the flags, in the fixed
    order given by ``layout`` below.

    Args:
        data_bytes: Raw notification payload (any sliceable bytes-like object).

    Returns:
        dict mapping each present field name to its raw integer value, in
        on-wire order. No unit scaling is applied here.

    Raises:
        struct.error: If the payload is shorter than the flags word, or the
            bytes after it do not exactly match the size implied by the flags.
    """
    (flags,) = struct.unpack("<H", data_bytes[:2])

    # (flag mask, bit state meaning "present", struct codes, field names).
    # Bit 0 is the odd one out: instant speed is present when it is CLEAR.
    # NOTE(review): the widths used for resistance level and energy-per-hour
    # may differ from the published FTMS Indoor Bike Data layout; they are
    # kept exactly as this helper has always decoded them -- confirm against
    # the device firmware.
    layout = (
        (1 << 0, False, "H", ("instant_speed",)),
        (1 << 1, True, "H", ("avg_speed",)),
        (1 << 2, True, "H", ("instant_cadence",)),
        (1 << 3, True, "H", ("avg_cadence",)),
        (1 << 4, True, "3s", ("total_distance",)),
        (1 << 5, True, "B", ("resistance_level",)),
        (1 << 6, True, "h", ("instant_power",)),
        (1 << 7, True, "h", ("avg_power",)),
        # The three energy values always travel together under one flag.
        (1 << 8, True, "HBB", ("total_energy", "energy_per_hour", "energy_per_min")),
        (1 << 9, True, "B", ("heart_rate",)),
        (1 << 10, True, "B", ("metabolic_equivalent",)),
        (1 << 11, True, "H", ("elapsed_time",)),
        (1 << 12, True, "H", ("remaining_time",)),
    )

    fmt = "<"  # little-endian, no padding
    names = []
    for mask, present_when_set, codes, group in layout:
        if bool(flags & mask) is present_when_set:
            fmt += codes
            names.extend(group)

    raw_values = struct.unpack(fmt, data_bytes[2:])

    decoded = {}
    for name, raw in zip(names, raw_values):
        if name == "total_distance":
            # struct has no 24-bit integer code, so the field is read as
            # three raw bytes and converted here.
            raw = int.from_bytes(raw, "little")
        decoded[name] = raw
    return decoded
def format_bike_trainer_data(parsed_data):
    """Render parsed bike-trainer fields as one human-readable line.

    Each entry becomes ``"<Title Cased Name>: <value> <unit>"`` and the
    entries are joined with ``", "`` in the dictionary's own order. Speeds,
    cadences and the metabolic equivalent are divided down from their raw
    sub-units before printing; every other value is printed as received.

    Args:
        parsed_data: Mapping of field name to raw numeric value, as produced
            by ``parse_bike_trainer_data``.

    Returns:
        The formatted string; an empty string for an empty mapping. Fields
        with no known unit are still printed, followed by an empty unit.
    """
    # Unit shown next to each value AFTER scaling.
    field_units = {
        "instant_speed": "km/h",
        "avg_speed": "km/h",
        "instant_cadence": "revolutions/min",
        "avg_cadence": "revolutions/min",
        "total_distance": "m",
        "resistance_level": "units",
        "instant_power": "watts",
        "avg_power": "watts",
        "total_energy": "kcal",
        "energy_per_hour": "kcal/h",
        "energy_per_min": "kcal/min",
        "heart_rate": "bpm",
        # The value is divided by 10 below, so the displayed unit is whole
        # METs (it used to be mislabelled "1/10 METs" after conversion).
        "metabolic_equivalent": "METs",
        "elapsed_time": "s",
        "remaining_time": "s",
    }
    # Divisor that turns the raw sub-unit into the displayed unit.
    divisors = {
        "instant_speed": 100,  # raw is 1/100 km/h
        "avg_speed": 100,
        "instant_cadence": 2,  # raw is 1/2 revolutions/min
        "avg_cadence": 2,
        "metabolic_equivalent": 10,  # raw is 1/10 METs
    }

    formatted_output = []
    for field, value in parsed_data.items():
        # Only divide when a divisor exists so unscaled integers stay
        # integers in the output (e.g. "72 bpm", not "72.0 bpm").
        if field in divisors:
            value = value / divisors[field]
        label = field.replace("_", " ").title()
        unit = field_units.get(field, "")
        formatted_output.append(f"{label}: {value} {unit}")
    return ", ".join(formatted_output)
async def main():
    """Find a KeAnt device, connect to it and print its trainer notifications.

    BLE discovery is repeated until a device whose name contains "keant"
    (case-insensitive) is seen; if one scan returns several matches, the
    last one is used. The function then connects, prints the device's GATT
    descriptors and characteristics, subscribes to characteristic
    ``00002ad2-0000-1000-8000-00805f9b34fb`` and prints every notification
    raw, parsed and formatted. It returns once the peripheral disconnects.

    Exceptions raised while connecting or subscribing are printed, not
    propagated; the client is always asked to disconnect on the way out.
    """
    print("searching for keant...")
    keant = None
    while not keant:
        devs = await BleakScanner.discover()
        print(devs)
        for dev in devs:
            # A device may report no name at all (None); treat that as an
            # empty name rather than crashing on None.lower().
            if 'keant' in (dev.name or '').lower():
                print("KeAnt found:", dev)
                keant = dev
    address = keant.address

    def on_disconnect(_):
        print("Client disconnected.")
        # Hands back a permit so the second acquire() below can complete.
        disconnect_cond.release()

    client = BleakClient(address, disconnected_callback=on_disconnect)
    try:
        print("connecting...")
        await client.connect()
        print("connected.")
        print(client.services.descriptors)
        for num in client.services.characteristics:
            desc = client.services.characteristics[num]
            print(desc)
        u = '00002ad2-0000-1000-8000-00805f9b34fb'
        char = client.services.get_characteristic(u)
        print(char)
        if char is None:
            # Fail with a clear message instead of passing None to
            # start_notify(); the handler below prints it.
            raise RuntimeError(f"characteristic {u} not found on device")

        def callback(sender: BleakGATTCharacteristic, data: bytearray):
            print(f"{sender}: {data}")
            btd = parse_bike_trainer_data(data)
            print(btd)
            fdata = format_bike_trainer_data(btd)
            print(fdata)

        print("starting notify...")
        await client.start_notify(char, callback)
        # The module-level semaphore starts with one permit: the first
        # acquire takes it, the second blocks until on_disconnect() fires.
        await disconnect_cond.acquire()
        await disconnect_cond.acquire()
        print("disconnected, stopping...")
    except Exception as e:
        print("Exception:")
        print(e)
    finally:
        await client.disconnect()
# Script entry point: start an event loop and run main() to completion.
# NOTE: there is no __name__ guard, so this also executes when the module
# is imported.
asyncio.run(main())