Files
AXIO-TESTS/tone_server.py
2026-06-16 17:36:21 +02:00

142 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""
tone_server.py — Sine-wave tone HTTP server for iMX8 audio test rig.
Target: iMX8 Debian Bookworm.
pip3 install flask numpy pyaudio (pyaudio already installed)
Endpoints
---------
POST /play { "freq": 1000, "vol": 0.8, "duration": 1.0 }
POST /stop
GET /status
"""
import math
import subprocess
import threading
import numpy as np
import pyaudio
from flask import Flask, jsonify, request
# ── ALSA routing (SGTL5000 on iMX8) ──────────────────────────────────────────
def _alsa_init():
for cmd in [
['amixer', '-c', '0', 'sset', 'Headphone Mux', 'DAC'],
['amixer', '-c', '0', 'sset', 'DAP Mux', 'I2S'],
['amixer', '-c', '0', 'sset', 'Headphone', 'unmute'],
['amixer', '-c', '0', 'sset', 'Lineout', 'unmute'],
]:
try:
subprocess.run(cmd, capture_output=True, timeout=2)
except Exception:
pass
# ── Audio ─────────────────────────────────────────────────────────────────────
SAMPLE_RATE = 44100
BLOCK_FRAMES = 512
_pa = pyaudio.PyAudio()
_lock = threading.Lock()
_state = {
'stream': None,
'timer': None,
'freq': 1000.0,
'vol': 0.0,
'phase': 0,
}
def _callback(in_data, frame_count, time_info, status):
freq = _state['freq']
vol = _state['vol']
phase = _state['phase']
t = (np.arange(frame_count, dtype=np.float64) + phase) / SAMPLE_RATE
buf = (vol * np.sin(2.0 * math.pi * freq * t) * 32767).astype(np.int16)
_state['phase'] = (phase + frame_count) % (SAMPLE_RATE * 100)
return (buf.tobytes(), pyaudio.paContinue)
def _do_stop():
"""Stop active stream and cancel auto-stop timer. Call with _lock held."""
t = _state['timer']
s = _state['stream']
_state['timer'] = None
_state['stream'] = None
if t:
t.cancel()
if s:
try:
s.stop_stream()
s.close()
except Exception:
pass
def _timer_stop():
with _lock:
_do_stop()
# ── Flask app ─────────────────────────────────────────────────────────────────
app = Flask(__name__)
@app.route('/play', methods=['POST'])
def play():
body = request.get_json(force=True) or {}
freq = float(body.get('freq', 1000.0))
vol = max(0.0, min(1.0, float(body.get('vol', 0.8))))
duration = max(0.05, float(body.get('duration', 1.0)))
with _lock:
_do_stop()
_state['freq'] = freq
_state['vol'] = vol
_state['phase'] = 0
s = _pa.open(
rate=SAMPLE_RATE,
channels=1,
format=pyaudio.paInt16,
output=True,
frames_per_buffer=BLOCK_FRAMES,
stream_callback=_callback,
)
s.start_stream()
_state['stream'] = s
t = threading.Timer(duration, _timer_stop)
_state['timer'] = t
t.start()
print(f"[play] {freq:.0f} Hz vol={vol:.2f} dur={duration:.2f}s")
return jsonify({'status': 'ok', 'freq': freq, 'vol': vol, 'duration': duration})
@app.route('/stop', methods=['POST'])
def stop():
with _lock:
_do_stop()
print("[stop]")
return jsonify({'status': 'ok'})
@app.route('/status', methods=['GET'])
def status():
with _lock:
playing = _state['stream'] is not None
return jsonify({'playing': playing, 'freq': _state['freq'], 'vol': _state['vol']})
if __name__ == '__main__':
_alsa_init()
print("Tone server starting on 0.0.0.0:5000")
try:
app.run(host='0.0.0.0', port=5000, threaded=True)
finally:
with _lock:
_do_stop()
_pa.terminate()