#!/usr/bin/env python3
"""
soundmeter_log.py — BLE sound-meter logger + iMX8 tone-server client.

Target: Windows 11 / Python 3.10+.

    pip install bleak requests

For each test step:
  1. POST /play to tone_server.py on the iMX8
  2. Collect dB readings from the RS-95-EM BLE meter for the step duration
  3. Write avg / min / max to CSV

BLE packet format
-----------------
Header : bytes[0]=0xD5 bytes[1]=0xF0
dB     : bytes[4:6] uint16 big-endian / 10.0
"""

import asyncio
import csv
import datetime
import struct
from pathlib import Path

import requests
from bleak import BleakClient

# ── Configuration ─────────────────────────────────────────────────────────────
IMX8_IP = "192.168.45.2"
IMX8_URL = f"http://{IMX8_IP}:5000"

BLE_MAC = "B0:D2:78:5C:20:07"
BLE_CHAR = "0000fff2-0000-1000-8000-00805f9b34fb"
BLE_WAKE_CHAR = "0000fff1-0000-1000-8000-00805f9b34fb"
BLE_WAKE_CMD = bytes([0xD5, 0xFC, 0x11, 0x0D])  # from Meterbox logcat

# ── Test sequence — edit as required ─────────────────────────────────────────
_FREQS = [100, 250, 500, 1000, 2000, 3000, 4000, 5000, 6000, 7000,
          8000, 9000, 10000, 11000, 12000, 13000, 14000, 15000]

TEST_STEPS = [
    {"freq": freq, "vol": 1.0, "duration": 30.0}
    for freq in _FREQS
]

# ── Output ────────────────────────────────────────────────────────────────────
_ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
CSV_FILE = Path(f"audio_test_{_ts}.csv")
CSV_HDR = ['timestamp', 'freq_hz', 'vol', 'duration_s',
           'avg_db', 'min_db', 'max_db', 'samples']

# ── BLE packet parser ─────────────────────────────────────────────────────────
_HEADER = b'\xd5\xf0'   # two-byte marker that starts every reading packet
_PACKET_LEN = 11        # reading packets are reassembled as 11-byte frames


def _parse_db(data: bytes | bytearray) -> float | None:
    """Decode the dB value from one meter packet.

    Returns the big-endian uint16 at bytes[4:6] divided by 10.0, or None
    when the packet is shorter than 6 bytes or does not start with D5 F0.
    """
    if len(data) < 6 or data[0] != 0xD5 or data[1] != 0xF0:
        return None
    return struct.unpack_from('>H', data, 4)[0] / 10.0


def _extract_readings(buf: bytearray) -> list[float]:
    """Consume every complete packet in *buf* and return the decoded dB values.

    *buf* is modified in place: bytes before a header are discarded, complete
    11-byte packets are removed, and an incomplete trailing packet is left
    for the next call.
    """
    values: list[float] = []
    while len(buf) >= _PACKET_LEN:
        idx = buf.find(_HEADER)
        if idx == -1:
            # No header present.  Keep a trailing 0xD5: it may be the first
            # byte of a header whose 0xF0 arrives in the next notification.
            keep_from = len(buf) - 1 if buf[-1] == _HEADER[0] else len(buf)
            del buf[:keep_from]
            break
        if idx > 0:
            del buf[:idx]  # discard garbage before header
            if len(buf) < _PACKET_LEN:
                break
        packet = bytes(buf[:_PACKET_LEN])
        del buf[:_PACKET_LEN]
        db = _parse_db(packet)
        if db is not None:
            values.append(db)
    return values


# ── HTTP helpers (blocking — run via asyncio.to_thread) ───────────────────────
def _http_play(freq: float, vol: float, duration: float) -> dict:
    """POST /play with the tone parameters and return the decoded JSON reply.

    Raises whatever `requests` raises on connection failure, timeout (5 s)
    or a non-2xx status.
    """
    r = requests.post(
        f"{IMX8_URL}/play",
        json={"freq": freq, "vol": vol, "duration": duration},
        timeout=5,
    )
    r.raise_for_status()
    return r.json()


def _http_stop() -> dict:
    """POST /stop and return the decoded JSON reply (5 s timeout)."""
    r = requests.post(f"{IMX8_URL}/stop", timeout=5)
    r.raise_for_status()
    return r.json()


# ── Main test runner ──────────────────────────────────────────────────────────
async def run_test():
    """Run every entry of TEST_STEPS and write one CSV row per completed step.

    Connects to the meter at BLE_MAC, sends the wake-up command, subscribes
    to BLE_CHAR, then for each step starts the tone via /play, collects
    readings for the step duration and records avg / min / max / sample
    count.  A step whose /play request fails is reported and skipped.
    /stop is always attempted once the sequence ends, including when it
    ends because of an exception.
    """
    readings: list[float] = []
    _buf = bytearray()

    def on_notify(_sender, data: bytearray):
        # Notifications are appended to a buffer and reassembled into
        # 11-byte packets starting with D5 F0.
        _buf.extend(data)
        readings.extend(_extract_readings(_buf))

    print(f"Connecting to BLE {BLE_MAC} …")
    async with BleakClient(BLE_MAC, timeout=15.0) as client:
        print("Connected. Enumerating services…")
        for svc in client.services:
            print(f" SVC {svc.uuid}")
            for ch in svc.characteristics:
                props = ','.join(ch.properties)
                print(f" CHAR {ch.uuid} [{props}]")

        # Wake-up sequence: write D5 FC 11 0D before subscribing (from Meterbox logcat)
        await client.write_gatt_char(BLE_WAKE_CHAR, BLE_WAKE_CMD, response=True)
        print(f"Wake-up sent: {BLE_WAKE_CMD.hex(' ')}")

        await client.start_notify(BLE_CHAR, on_notify)
        print(f"Subscribed to {BLE_CHAR}\n")

        try:
            with CSV_FILE.open('w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(CSV_HDR)

                for i, step in enumerate(TEST_STEPS, 1):
                    freq = step['freq']
                    vol = step['vol']
                    dur = step['duration']

                    print(f"[{i}/{len(TEST_STEPS)}] {freq:>5} Hz vol={vol:.2f} dur={dur:.1f}s …",
                          end='', flush=True)

                    try:
                        await asyncio.to_thread(_http_play, freq, vol, dur)
                    except Exception as e:
                        print(f" ERROR /play: {e}")
                        continue

                    # Drop anything captured before this step's tone started.
                    readings.clear()
                    await asyncio.sleep(dur)
                    snap = list(readings)

                    if snap:
                        avg_db = sum(snap) / len(snap)
                        min_db = min(snap)
                        max_db = max(snap)
                        print(f" avg={avg_db:5.1f} dB min={min_db:5.1f} max={max_db:5.1f} n={len(snap)}")
                    else:
                        # 0.0 is a placeholder; the 'samples' column (0)
                        # marks the row as having no data.
                        avg_db = min_db = max_db = 0.0
                        print(" WARNING: no BLE readings received")

                    ts = datetime.datetime.now().isoformat(timespec='seconds')
                    writer.writerow([
                        ts, freq, vol, dur,
                        f"{avg_db:.1f}", f"{min_db:.1f}", f"{max_db:.1f}",
                        len(snap),
                    ])
                    f.flush()  # keep completed rows on disk if a later step fails

                    await asyncio.sleep(0.5)  # brief gap between steps
        finally:
            # Ensure tone is off, even when the sequence was interrupted.
            # Best-effort: a failed /stop is reported, never raised.
            try:
                await asyncio.to_thread(_http_stop)
            except Exception as e:
                print(f"WARNING: /stop failed: {e}")

        await client.stop_notify(BLE_CHAR)

    print(f"\nResults saved → {CSV_FILE.resolve()}")


if __name__ == '__main__':
    asyncio.run(run_test())