Changes
This commit is contained in:
119
ble_discover.py
Normal file
119
ble_discover.py
Normal file
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ble_discover.py — BLE characteristic discovery + wake-up probe.
|
||||
Connects to the RS-95-EM, subscribes to every NOTIFY characteristic,
|
||||
then tries writing common "start measurement" byte sequences to every
|
||||
WRITE characteristic, watching for any data after each write.
|
||||
|
||||
Run this while Meterbox is NOT connected, then compare with traffic
|
||||
observed when Meterbox IS connected.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from bleak import BleakClient
|
||||
|
||||
BLE_MAC = "B0:D2:78:5C:20:07"
|
||||
WATCH_SEC = 2.0 # seconds to listen after each write attempt
|
||||
|
||||
# Common "start measurement" payloads seen on BLE meters
|
||||
PROBE_PAYLOADS = [
|
||||
# Confirmed from Meterbox logcat (MeterBleClass.java onWriteSuccess)
|
||||
b'\xd5\xfc\x11\x0d',
|
||||
# Short single-byte
|
||||
b'\x01', b'\x02', b'\x00', b'\xff', b'\xa0', b'\xa5',
|
||||
# Common 2-byte wake sequences
|
||||
b'\xaa\x00', b'\xaa\x01', b'\xaa\x55', b'\xaa\xbb',
|
||||
b'\xa5\x5a', b'\xab\xbc', b'\xd5\xf0',
|
||||
# Common longer init sequences seen on BLE meters
|
||||
b'\xaa\x55\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
|
||||
b'\xaa\xbb\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
|
||||
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
|
||||
b'\xa5\x5a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
|
||||
]
|
||||
|
||||
|
||||
async def run():
|
||||
print(f"Connecting to {BLE_MAC} …")
|
||||
async with BleakClient(BLE_MAC, timeout=15.0) as client:
|
||||
print("Connected.\n")
|
||||
|
||||
# ── 1. Enumerate all characteristics ─────────────────────────────────
|
||||
notify_chars = []
|
||||
write_chars = []
|
||||
|
||||
for svc in client.services:
|
||||
print(f"SVC {svc.uuid} — {svc.description}")
|
||||
for ch in svc.characteristics:
|
||||
props = ', '.join(ch.properties)
|
||||
print(f" CHAR {ch.uuid} [{props}]")
|
||||
if 'notify' in ch.properties or 'indicate' in ch.properties:
|
||||
notify_chars.append(ch)
|
||||
if 'write' in ch.properties or 'write-without-response' in ch.properties:
|
||||
write_chars.append(ch)
|
||||
print()
|
||||
|
||||
# ── 2. Subscribe to every notify characteristic ───────────────────────
|
||||
traffic = {} # uuid → list of bytearray
|
||||
|
||||
def make_handler(uuid):
|
||||
def handler(sender, data):
|
||||
traffic.setdefault(uuid, []).append(bytes(data))
|
||||
print(f" [NOTIFY {uuid}] {data.hex(' ')}")
|
||||
return handler
|
||||
|
||||
for ch in notify_chars:
|
||||
try:
|
||||
await client.start_notify(ch.uuid, make_handler(ch.uuid))
|
||||
print(f"Subscribed to NOTIFY: {ch.uuid}")
|
||||
except Exception as e:
|
||||
print(f"Could not subscribe to {ch.uuid}: {e}")
|
||||
print()
|
||||
|
||||
# ── 3. Listen passively first (maybe it sends on its own) ─────────────
|
||||
print(f"Listening passively for {WATCH_SEC}s …")
|
||||
await asyncio.sleep(WATCH_SEC)
|
||||
|
||||
active_passive = {k for k, v in traffic.items() if v}
|
||||
if active_passive:
|
||||
print(f"\nPassive traffic on: {active_passive}")
|
||||
else:
|
||||
print("No passive traffic — device needs a wake-up write.\n")
|
||||
|
||||
# ── 4. Try each payload on each writable characteristic ───────────────
|
||||
for ch in write_chars:
|
||||
for payload in PROBE_PAYLOADS:
|
||||
before = {k: len(v) for k, v in traffic.items()}
|
||||
|
||||
try:
|
||||
if 'write-without-response' in ch.properties:
|
||||
await client.write_gatt_char(ch.uuid, payload, response=False)
|
||||
else:
|
||||
await client.write_gatt_char(ch.uuid, payload, response=True)
|
||||
print(f"Wrote {payload.hex(' ')} → {ch.uuid} … ", end='', flush=True)
|
||||
except Exception as e:
|
||||
print(f"Write {payload.hex(' ')} → {ch.uuid} FAILED: {e}")
|
||||
continue
|
||||
|
||||
await asyncio.sleep(WATCH_SEC)
|
||||
|
||||
after = {k: len(v) for k, v in traffic.items()}
|
||||
new_traffic = {k for k, v in after.items() if v > before.get(k, 0)}
|
||||
if new_traffic:
|
||||
print(f"\n *** DATA on {new_traffic} after writing {payload.hex(' ')} to {ch.uuid} ***")
|
||||
else:
|
||||
print("no response.")
|
||||
|
||||
# ── 5. Summary ────────────────────────────────────────────────────────
|
||||
print("\n── Summary ──────────────────────────────────────────")
|
||||
for uuid, packets in traffic.items():
|
||||
if packets:
|
||||
print(f" {uuid}: {len(packets)} packets received")
|
||||
print(f" First: {packets[0].hex(' ')}")
|
||||
if len(packets) > 1:
|
||||
print(f" Last: {packets[-1].hex(' ')}")
|
||||
if not any(traffic.values()):
|
||||
print(" No data received on any characteristic.")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run())
|
||||
Reference in New Issue
Block a user