142 lines
3.9 KiB
Python
142 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
tone_server.py — Sine-wave tone HTTP server for iMX8 audio test rig.
|
|
Target: iMX8 Debian Bookworm.
|
|
|
|
pip3 install flask numpy pyaudio (pyaudio already installed)
|
|
|
|
Endpoints
|
|
---------
|
|
POST /play { "freq": 1000, "vol": 0.8, "duration": 1.0 }
|
|
POST /stop
|
|
GET /status
|
|
"""
|
|
|
|
import math
|
|
import subprocess
|
|
import threading
|
|
|
|
import numpy as np
|
|
import pyaudio
|
|
from flask import Flask, jsonify, request
|
|
|
|
# ── ALSA routing (SGTL5000 on iMX8) ──────────────────────────────────────────
|
|
def _alsa_init():
|
|
for cmd in [
|
|
['amixer', '-c', '0', 'sset', 'Headphone Mux', 'DAC'],
|
|
['amixer', '-c', '0', 'sset', 'DAP Mux', 'I2S'],
|
|
['amixer', '-c', '0', 'sset', 'Headphone', 'unmute'],
|
|
['amixer', '-c', '0', 'sset', 'Lineout', 'unmute'],
|
|
]:
|
|
try:
|
|
subprocess.run(cmd, capture_output=True, timeout=2)
|
|
except Exception:
|
|
pass
|
|
|
|
# ── Audio ─────────────────────────────────────────────────────────────────────
|
|
SAMPLE_RATE = 44100
|
|
BLOCK_FRAMES = 512
|
|
|
|
_pa = pyaudio.PyAudio()
|
|
_lock = threading.Lock()
|
|
_state = {
|
|
'stream': None,
|
|
'timer': None,
|
|
'freq': 1000.0,
|
|
'vol': 0.0,
|
|
'phase': 0,
|
|
}
|
|
|
|
|
|
def _callback(in_data, frame_count, time_info, status):
|
|
freq = _state['freq']
|
|
vol = _state['vol']
|
|
phase = _state['phase']
|
|
t = (np.arange(frame_count, dtype=np.float64) + phase) / SAMPLE_RATE
|
|
buf = (vol * np.sin(2.0 * math.pi * freq * t) * 32767).astype(np.int16)
|
|
_state['phase'] = (phase + frame_count) % (SAMPLE_RATE * 100)
|
|
return (buf.tobytes(), pyaudio.paContinue)
|
|
|
|
|
|
def _do_stop():
|
|
"""Stop active stream and cancel auto-stop timer. Call with _lock held."""
|
|
t = _state['timer']
|
|
s = _state['stream']
|
|
_state['timer'] = None
|
|
_state['stream'] = None
|
|
if t:
|
|
t.cancel()
|
|
if s:
|
|
try:
|
|
s.stop_stream()
|
|
s.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _timer_stop():
|
|
with _lock:
|
|
_do_stop()
|
|
|
|
|
|
# ── Flask app ─────────────────────────────────────────────────────────────────
|
|
app = Flask(__name__)
|
|
|
|
|
|
@app.route('/play', methods=['POST'])
|
|
def play():
|
|
body = request.get_json(force=True) or {}
|
|
freq = float(body.get('freq', 1000.0))
|
|
vol = max(0.0, min(1.0, float(body.get('vol', 0.8))))
|
|
duration = max(0.05, float(body.get('duration', 1.0)))
|
|
|
|
with _lock:
|
|
_do_stop()
|
|
_state['freq'] = freq
|
|
_state['vol'] = vol
|
|
_state['phase'] = 0
|
|
|
|
s = _pa.open(
|
|
rate=SAMPLE_RATE,
|
|
channels=1,
|
|
format=pyaudio.paInt16,
|
|
output=True,
|
|
frames_per_buffer=BLOCK_FRAMES,
|
|
stream_callback=_callback,
|
|
)
|
|
s.start_stream()
|
|
_state['stream'] = s
|
|
|
|
t = threading.Timer(duration, _timer_stop)
|
|
_state['timer'] = t
|
|
t.start()
|
|
|
|
print(f"[play] {freq:.0f} Hz vol={vol:.2f} dur={duration:.2f}s")
|
|
return jsonify({'status': 'ok', 'freq': freq, 'vol': vol, 'duration': duration})
|
|
|
|
|
|
@app.route('/stop', methods=['POST'])
|
|
def stop():
|
|
with _lock:
|
|
_do_stop()
|
|
print("[stop]")
|
|
return jsonify({'status': 'ok'})
|
|
|
|
|
|
@app.route('/status', methods=['GET'])
|
|
def status():
|
|
with _lock:
|
|
playing = _state['stream'] is not None
|
|
return jsonify({'playing': playing, 'freq': _state['freq'], 'vol': _state['vol']})
|
|
|
|
|
|
if __name__ == '__main__':
|
|
_alsa_init()
|
|
print("Tone server starting on 0.0.0.0:5000")
|
|
try:
|
|
app.run(host='0.0.0.0', port=5000, threaded=True)
|
|
finally:
|
|
with _lock:
|
|
_do_stop()
|
|
_pa.terminate()
|