#!/usr/bin/env python3
"""
ble_discover.py — BLE characteristic discovery + wake-up probe.

Connects to the RS-95-EM, subscribes to every NOTIFY characteristic, then tries
writing common "start measurement" byte sequences to every WRITE characteristic,
watching for any data after each write.

Run this while Meterbox is NOT connected, then compare with traffic observed
when Meterbox IS connected.
"""

import asyncio

from bleak import BleakClient

BLE_MAC = "B0:D2:78:5C:20:07"
WATCH_SEC = 2.0  # seconds to listen after each write attempt

# Common "start measurement" payloads seen on BLE meters
PROBE_PAYLOADS = [
    # Confirmed from Meterbox logcat (MeterBleClass.java onWriteSuccess)
    b'\xd5\xfc\x11\x0d',
    # Short single-byte
    b'\x01',
    b'\x02',
    b'\x00',
    b'\xff',
    b'\xa0',
    b'\xa5',
    # Common 2-byte wake sequences
    b'\xaa\x00',
    b'\xaa\x01',
    b'\xaa\x55',
    b'\xaa\xbb',
    b'\xa5\x5a',
    b'\xab\xbc',
    b'\xd5\xf0',
    # Common longer init sequences seen on BLE meters
    b'\xaa\x55\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
    b'\xaa\xbb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
    b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
    b'\xa5\x5a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
]


def _packet_counts(traffic):
    """Return a snapshot mapping each UUID in *traffic* to its packet count."""
    return {uuid: len(packets) for uuid, packets in traffic.items()}


def _make_handler(traffic, uuid):
    """Build a notification callback that records and prints packets for *uuid*."""

    def handler(sender, data):
        # Store an immutable copy so the recorded packet cannot change later.
        traffic.setdefault(uuid, []).append(bytes(data))
        print(f" [NOTIFY {uuid}] {data.hex(' ')}")

    return handler


def _enumerate_characteristics(client):
    """Print every service/characteristic and sort them by capability.

    Returns a ``(notify_chars, write_chars)`` pair of characteristic lists.
    A characteristic that is both notifiable and writable appears in both.
    """
    notify_chars = []
    write_chars = []
    for svc in client.services:
        print(f"SVC {svc.uuid} — {svc.description}")
        for ch in svc.characteristics:
            props = ', '.join(ch.properties)
            print(f" CHAR {ch.uuid} [{props}]")
            if 'notify' in ch.properties or 'indicate' in ch.properties:
                notify_chars.append(ch)
            if 'write' in ch.properties or 'write-without-response' in ch.properties:
                write_chars.append(ch)
    print()
    return notify_chars, write_chars


async def _subscribe_all(client, notify_chars, traffic):
    """Subscribe to each characteristic in *notify_chars*, logging failures."""
    for ch in notify_chars:
        try:
            # Pass the characteristic object rather than its UUID: a UUID
            # lookup fails when several characteristics share the same UUID.
            # Packets are still recorded under the UUID, so duplicates merge.
            await client.start_notify(ch, _make_handler(traffic, ch.uuid))
            print(f"Subscribed to NOTIFY: {ch.uuid}")
        except Exception as e:  # best-effort probe: report and keep going
            print(f"Could not subscribe to {ch.uuid}: {e}")
    print()


async def _probe_writes(client, write_chars, traffic):
    """Write every probe payload to every writable characteristic.

    After each successful write, waits ``WATCH_SEC`` seconds and reports
    which UUIDs (if any) received new packets in that window.
    """
    for ch in write_chars:
        # Use an unacknowledged write whenever the characteristic offers it.
        with_response = 'write-without-response' not in ch.properties
        for payload in PROBE_PAYLOADS:
            before = _packet_counts(traffic)
            try:
                # Characteristic object, not UUID, for the same reason as in
                # the subscription step (duplicate UUIDs).
                await client.write_gatt_char(ch, payload, response=with_response)
                print(f"Wrote {payload.hex(' ')} → {ch.uuid} … ", end='', flush=True)
            except Exception as e:  # best-effort probe: report and keep going
                print(f"Write {payload.hex(' ')} → {ch.uuid} FAILED: {e}")
                continue

            await asyncio.sleep(WATCH_SEC)

            after = _packet_counts(traffic)
            new_traffic = {k for k, v in after.items() if v > before.get(k, 0)}
            if new_traffic:
                print(f"\n *** DATA on {new_traffic} after writing {payload.hex(' ')} to {ch.uuid} ***")
            else:
                print("no response.")


def _print_summary(traffic):
    """Print the packet count plus first/last packet for each active UUID."""
    print("\n── Summary ──────────────────────────────────────────")
    for uuid, packets in traffic.items():
        if packets:
            print(f" {uuid}: {len(packets)} packets received")
            print(f" First: {packets[0].hex(' ')}")
            if len(packets) > 1:
                print(f" Last: {packets[-1].hex(' ')}")
    if not any(traffic.values()):
        print(" No data received on any characteristic.")


async def run():
    """Connect to ``BLE_MAC``, probe its characteristics and print the results.

    Steps: enumerate characteristics, subscribe to all notify/indicate ones,
    listen passively for ``WATCH_SEC`` seconds, write each entry of
    ``PROBE_PAYLOADS`` to each writable characteristic, then print a summary.
    Subscribe and write errors are printed and skipped; a failure to connect
    propagates to the caller.
    """
    print(f"Connecting to {BLE_MAC} …")
    async with BleakClient(BLE_MAC, timeout=15.0) as client:
        print("Connected.\n")

        # ── 1. Enumerate all characteristics ─────────────────────────────────
        notify_chars, write_chars = _enumerate_characteristics(client)

        # ── 2. Subscribe to every notify characteristic ───────────────────────
        traffic = {}  # uuid → list of bytes packets, in arrival order
        await _subscribe_all(client, notify_chars, traffic)

        # ── 3. Listen passively first (maybe it sends on its own) ─────────────
        print(f"Listening passively for {WATCH_SEC}s …")
        await asyncio.sleep(WATCH_SEC)
        active_passive = {k for k, v in traffic.items() if v}
        if active_passive:
            print(f"\nPassive traffic on: {active_passive}")
        else:
            print("No passive traffic — device needs a wake-up write.\n")

        # ── 4. Try each payload on each writable characteristic ───────────────
        await _probe_writes(client, write_chars, traffic)

        # ── 5. Summary ────────────────────────────────────────────────────────
        _print_summary(traffic)


if __name__ == '__main__':
    asyncio.run(run())