168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
soundmeter_log.py — BLE sound-meter logger + iMX8 tone-server client.
|
|
Target: Windows 11 / Python 3.10+.
|
|
|
|
pip install bleak requests
|
|
|
|
For each test step:
|
|
1. POST /play to tone_server.py on the iMX8
|
|
2. Collect dB readings from the RS-95-EM BLE meter for the step duration
|
|
3. Write avg / min / max to CSV
|
|
|
|
BLE packet format
|
|
-----------------
|
|
Header : bytes[0]=0xD5 bytes[1]=0xF0
|
|
dB : bytes[4:6] uint16 big-endian / 10.0
|
|
"""
|
|
|
|
import asyncio
|
|
import csv
|
|
import datetime
|
|
import struct
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from bleak import BleakClient
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
# iMX8 board running tone_server.py, and the base URL of its HTTP API.
IMX8_IP = "192.168.45.2"
IMX8_URL = f"http://{IMX8_IP}:5000"

# BLE sound meter: device address, the characteristic subscribed to for
# notifications, and the characteristic the wake-up command is written to.
BLE_MAC = "B0:D2:78:5C:20:07"
BLE_CHAR = "0000fff2-0000-1000-8000-00805f9b34fb"
BLE_WAKE_CHAR = "0000fff1-0000-1000-8000-00805f9b34fb"
BLE_WAKE_CMD = bytes([0xD5, 0xFC, 0x11, 0x0D])  # from Meterbox logcat
|
|
|
|
# ── Test sequence — edit as required ─────────────────────────────────────────
|
|
_FREQS = [100, 250, 500, 1000, 2000, 3000, 4000, 5000, 6000, 7000,
|
|
8000, 9000, 10000, 11000, 12000, 13000, 14000, 15000]
|
|
|
|
TEST_STEPS = [
|
|
{"freq": freq, "vol": 1.0, "duration": 30.0}
|
|
for freq in _FREQS
|
|
]
|
|
|
|
# ── Output ────────────────────────────────────────────────────────────────────
|
|
_ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
CSV_FILE = Path(f"audio_test_{_ts}.csv")
|
|
CSV_HDR = ['timestamp', 'freq_hz', 'vol', 'duration_s',
|
|
'avg_db', 'min_db', 'max_db', 'samples']
|
|
|
|
|
|
# ── BLE packet parser ─────────────────────────────────────────────────────────
|
|
def _parse_db(data: bytearray) -> float | None:
|
|
if len(data) < 6 or data[0] != 0xD5 or data[1] != 0xF0:
|
|
return None
|
|
return struct.unpack_from('>H', data, 4)[0] / 10.0
|
|
|
|
|
|
# ── HTTP helpers (blocking — run via asyncio.to_thread) ───────────────────────
|
|
def _http_play(freq: float, vol: float, duration: float) -> dict:
    """Ask the tone server to play a tone (blocking HTTP call).

    POSTs ``{"freq", "vol", "duration"}`` as JSON to ``{IMX8_URL}/play`` with
    a 5-second timeout.

    Args:
        freq: Value sent as the "freq" field.
        vol: Value sent as the "vol" field.
        duration: Value sent as the "duration" field.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx status (via ``raise_for_status``).
    """
    r = requests.post(
        f"{IMX8_URL}/play",
        json={"freq": freq, "vol": vol, "duration": duration},
        timeout=5,
    )
    r.raise_for_status()
    return r.json()
|
|
|
|
|
|
def _http_stop() -> dict:
    """Ask the tone server to stop playback (blocking HTTP call).

    POSTs to ``{IMX8_URL}/stop`` with a 5-second timeout.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            4xx/5xx status (via ``raise_for_status``).
    """
    r = requests.post(f"{IMX8_URL}/stop", timeout=5)
    r.raise_for_status()
    return r.json()
|
|
|
|
|
|
# ── Main test runner ──────────────────────────────────────────────────────────
|
|
_PACKET_HEADER = b'\xd5\xf0'
_PACKET_LEN = 11  # packets are always 11 bytes starting with D5 F0


def _drain_packets(buf: bytearray) -> list[float]:
    """Remove every complete packet from *buf* and return the decoded dB values.

    *buf* is modified in place: bytes before a header are discarded, complete
    11-byte packets are consumed, and an incomplete trailing packet is left
    in the buffer for the next call.
    """
    values: list[float] = []
    while len(buf) >= _PACKET_LEN:
        start = buf.find(_PACKET_HEADER)
        if start == -1:
            # No header present. Keep a trailing 0xD5: it may be the first
            # half of a header whose second byte arrives in the next chunk.
            keep = 1 if buf[-1] == _PACKET_HEADER[0] else 0
            del buf[:len(buf) - keep]
            break
        if start > 0:
            del buf[:start]  # discard garbage before header
            if len(buf) < _PACKET_LEN:
                break
        packet = bytes(buf[:_PACKET_LEN])
        del buf[:_PACKET_LEN]
        db = _parse_db(packet)
        if db is not None:
            values.append(db)
    return values


async def run_test() -> None:
    """Run every step in TEST_STEPS and log the measured levels to CSV_FILE.

    Connects to the BLE meter, sends the wake-up command, subscribes to
    notifications, then for each step: starts the tone via the tone server,
    collects readings for the step duration, and appends one CSV row with
    avg / min / max and the sample count. A step whose /play request fails is
    reported and skipped (no row is written). A step with no readings is
    written with 0.0 levels and a sample count of 0.
    """
    readings: list[float] = []
    rx_buf = bytearray()

    def on_notify(_sender, data: bytearray):
        # Accumulate raw bytes and re-frame them into 11-byte packets; a
        # notification is not assumed to line up with packet boundaries.
        rx_buf.extend(data)
        readings.extend(_drain_packets(rx_buf))

    print(f"Connecting to BLE {BLE_MAC} …")
    async with BleakClient(BLE_MAC, timeout=15.0) as client:
        print("Connected. Enumerating services…")
        for svc in client.services:
            print(f" SVC {svc.uuid}")
            for ch in svc.characteristics:
                props = ','.join(ch.properties)
                print(f" CHAR {ch.uuid} [{props}]")

        # Wake-up sequence: write D5 FC 11 0D before subscribing (from Meterbox logcat)
        await client.write_gatt_char(BLE_WAKE_CHAR, BLE_WAKE_CMD, response=True)
        print(f"Wake-up sent: {BLE_WAKE_CMD.hex(' ')}")

        await client.start_notify(BLE_CHAR, on_notify)
        print(f"Subscribed to {BLE_CHAR}\n")

        try:
            with CSV_FILE.open('w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(CSV_HDR)

                for i, step in enumerate(TEST_STEPS, 1):
                    freq = step['freq']
                    vol = step['vol']
                    dur = step['duration']
                    print(f"[{i}/{len(TEST_STEPS)}] {freq:>5} Hz vol={vol:.2f} dur={dur:.1f}s …", end='', flush=True)

                    # Blocking HTTP call: run it in a worker thread so BLE
                    # notifications keep being serviced by the event loop.
                    try:
                        await asyncio.to_thread(_http_play, freq, vol, dur)
                    except Exception as e:
                        # Deliberate best-effort: report and move on to the next step.
                        print(f" ERROR /play: {e}")
                        continue

                    # Drop anything received before the tone started, then
                    # let the notification callback fill the list.
                    readings.clear()
                    await asyncio.sleep(dur)

                    snap = list(readings)
                    if snap:
                        avg_db = sum(snap) / len(snap)
                        min_db = min(snap)
                        max_db = max(snap)
                        print(f" avg={avg_db:5.1f} dB min={min_db:5.1f} max={max_db:5.1f} n={len(snap)}")
                    else:
                        avg_db = min_db = max_db = 0.0
                        print(" WARNING: no BLE readings received")

                    ts = datetime.datetime.now().isoformat(timespec='seconds')
                    writer.writerow([
                        ts, freq, vol, dur,
                        f"{avg_db:.1f}", f"{min_db:.1f}", f"{max_db:.1f}", len(snap),
                    ])
                    # Flush per row so completed steps survive an aborted run.
                    f.flush()

                    await asyncio.sleep(0.5)  # brief gap between steps
        finally:
            # Ensure tone is off, even when the loop above raised. Still
            # best-effort, but a failure is reported instead of swallowed.
            try:
                await asyncio.to_thread(_http_stop)
            except Exception as e:
                print(f"WARNING: could not stop tone: {e}")

        await client.stop_notify(BLE_CHAR)

    print(f"\nResults saved → {CSV_FILE.resolve()}")
|
|
|
|
|
|
# Script entry point: run the whole test sequence on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(run_test())
|