Files
AXIO-TESTS/ble_discover.py
David Rice 1aa81e6758 Changes
2026-06-17 14:41:43 +02:00

120 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
ble_discover.py — BLE characteristic discovery + wake-up probe.
Connects to the RS-95-EM, subscribes to every NOTIFY characteristic,
then tries writing common "start measurement" byte sequences to every
WRITE characteristic, watching for any data after each write.
Run this while Meterbox is NOT connected, then compare with traffic
observed when Meterbox IS connected.
"""
import asyncio
from bleak import BleakClient
# Bluetooth address that run() connects to.
BLE_MAC = "B0:D2:78:5C:20:07"

# Seconds to keep listening for notifications after each write attempt
# (also used for the initial passive-listen window).
WATCH_SEC = 2.0

# Candidate "start measurement" payloads, tried in exactly this order.
PROBE_PAYLOADS = [
    # Confirmed from Meterbox logcat (MeterBleClass.java onWriteSuccess).
    b"\xd5\xfc\x11\x0d",

    # Single-byte commands.
    b"\x01",
    b"\x02",
    b"\x00",
    b"\xff",
    b"\xa0",
    b"\xa5",

    # Two-byte wake sequences.
    b"\xaa\x00",
    b"\xaa\x01",
    b"\xaa\x55",
    b"\xaa\xbb",
    b"\xa5\x5a",
    b"\xab\xbc",
    b"\xd5\xf0",

    # Longer init sequences: a short header followed by zero padding.
    b"\xaa\x55\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
    b"\xaa\xbb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
    b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
    b"\xa5\x5a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
]
async def run():
    """Connect to BLE_MAC, map its GATT table, and probe for a wake-up write.

    Steps, in order:
      1. Print every service/characteristic and collect those that can
         notify/indicate and those that accept writes.
      2. Subscribe to every notify/indicate characteristic.
      3. Listen for WATCH_SEC seconds without writing anything.
      4. Write each entry of PROBE_PAYLOADS to each writable characteristic,
         waiting WATCH_SEC seconds after every write and reporting which
         characteristics produced new notifications in that window.
      5. Print a per-characteristic summary of everything received.

    All results go to stdout; nothing is returned.  Connection errors
    propagate to the caller, while a failed subscribe or write is reported
    and skipped so the remaining probes still run.
    """
    print(f"Connecting to {BLE_MAC}")
    async with BleakClient(BLE_MAC, timeout=15.0) as client:
        print("Connected.\n")

        # ── 1. Enumerate all characteristics ─────────────────────────────
        notify_chars = []
        write_chars = []
        for svc in client.services:
            print(f"SVC {svc.uuid}  {svc.description}")
            for ch in svc.characteristics:
                props = ', '.join(ch.properties)
                print(f" CHAR {ch.uuid} [{props}]")
                if 'notify' in ch.properties or 'indicate' in ch.properties:
                    notify_chars.append(ch)
                if 'write' in ch.properties or 'write-without-response' in ch.properties:
                    write_chars.append(ch)
        print()

        # ── 2. Subscribe to every notify characteristic ──────────────────
        traffic = {}  # uuid → list of received packets (bytes)

        def make_handler(uuid):
            # Factory so each callback captures its own uuid rather than the
            # loop variable.
            def handler(sender, data):
                traffic.setdefault(uuid, []).append(bytes(data))
                print(f" [NOTIFY {uuid}] {data.hex(' ')}")
            return handler

        for ch in notify_chars:
            try:
                # Pass the characteristic object, not its UUID string: a UUID
                # that appears in more than one service cannot be resolved
                # unambiguously by UUID alone.
                await client.start_notify(ch, make_handler(ch.uuid))
                print(f"Subscribed to NOTIFY: {ch.uuid}")
            except Exception as e:
                # Best-effort: one failing subscription must not stop the scan.
                print(f"Could not subscribe to {ch.uuid}: {e}")
        print()

        # ── 3. Listen passively first (maybe it sends on its own) ────────
        print(f"Listening passively for {WATCH_SEC}s …")
        await asyncio.sleep(WATCH_SEC)
        active_passive = {k for k, v in traffic.items() if v}
        if active_passive:
            print(f"\nPassive traffic on: {active_passive}")
        else:
            print("No passive traffic — device needs a wake-up write.\n")

        # ── 4. Try each payload on each writable characteristic ──────────
        if not write_chars:
            print("No writable characteristics found — nothing to probe.")
        for ch in write_chars:
            # Use an unacknowledged write whenever the characteristic offers
            # it; otherwise fall back to a write with response.
            with_response = 'write-without-response' not in ch.properties
            for payload in PROBE_PAYLOADS:
                payload_hex = payload.hex(' ')
                # Snapshot packet counts so only traffic arriving after this
                # particular write is attributed to it.
                before = {k: len(v) for k, v in traffic.items()}
                try:
                    await client.write_gatt_char(ch, payload, response=with_response)
                except Exception as e:
                    print(f"Write {payload_hex} → {ch.uuid} FAILED: {e}")
                    continue
                # No newline yet: the verdict is appended once the listen
                # window has elapsed.
                print(f"Wrote {payload_hex} → {ch.uuid} … ", end='', flush=True)
                await asyncio.sleep(WATCH_SEC)
                after = {k: len(v) for k, v in traffic.items()}
                new_traffic = {k for k, v in after.items() if v > before.get(k, 0)}
                if new_traffic:
                    print(f"\n *** DATA on {new_traffic} after writing {payload_hex} to {ch.uuid} ***")
                else:
                    print("no response.")

        # ── 5. Summary ───────────────────────────────────────────────────
        print("\n── Summary ──────────────────────────────────────────")
        for uuid, packets in traffic.items():
            if packets:
                print(f" {uuid}: {len(packets)} packets received")
                print(f" First: {packets[0].hex(' ')}")
                if len(packets) > 1:
                    print(f" Last: {packets[-1].hex(' ')}")
        if not any(traffic.values()):
            print(" No data received on any characteristic.")
# Script entry point: start an event loop and run the discovery/probe
# coroutine to completion. Skipped when the module is imported.
if __name__ == "__main__":
    asyncio.run(run())