#!/usr/bin/env python3
"""
tone_server.py — Sine-wave tone HTTP server for iMX8 audio test rig.

Target: iMX8 Debian Bookworm.
    pip3 install flask numpy pyaudio   (pyaudio already installed)

Endpoints
---------
POST /play    { "freq": 1000, "vol": 0.8, "duration": 1.0 }
POST /stop
GET  /status
"""

import math
import subprocess
import threading

import numpy as np
import pyaudio
from flask import Flask, jsonify, request


# ── ALSA routing (SGTL5000 on iMX8) ──────────────────────────────────────────

def _alsa_init():
    """Run the amixer commands that route and unmute the outputs.

    Best effort: a failing command is reported on stdout and the remaining
    commands are still attempted, so the server starts even when a mixer
    control (or amixer itself) is missing.
    """
    for cmd in [
        ['amixer', '-c', '0', 'sset', 'Headphone Mux', 'DAC'],
        ['amixer', '-c', '0', 'sset', 'DAP Mux', 'I2S'],
        ['amixer', '-c', '0', 'sset', 'Headphone', 'unmute'],
        ['amixer', '-c', '0', 'sset', 'Lineout', 'unmute'],
    ]:
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=2)
        except (OSError, subprocess.SubprocessError) as exc:
            # amixer not installed, not executable, or timed out.
            print(f"[alsa] {' '.join(cmd)} failed: {exc}")
            continue
        if result.returncode != 0:
            detail = result.stderr.decode(errors='replace').strip()
            print(f"[alsa] {' '.join(cmd)} exited {result.returncode}: {detail}")


# ── Audio ─────────────────────────────────────────────────────────────────────

SAMPLE_RATE = 44100
BLOCK_FRAMES = 512
_TWO_PI = 2.0 * math.pi

_pa = pyaudio.PyAudio()
_lock = threading.Lock()

# Shared playback state. 'stream' and 'timer' are only changed with _lock
# held; 'freq', 'vol' and 'phase' are written by play() while no stream is
# open and are then read (and 'phase' advanced) by the audio callback.
_state = {
    'stream': None,
    'timer': None,
    'freq': 1000.0,
    'vol': 0.0,
    'phase': 0.0,   # oscillator phase in radians, kept in [0, 2*pi)
}


def _callback(in_data, frame_count, time_info, status):
    """PyAudio stream callback: synthesize the next block of the sine tone.

    Returns ``frame_count`` mono int16 samples as bytes together with
    ``pyaudio.paContinue``. The callback deliberately does not take _lock,
    so _do_stop() can stop the stream while holding it.
    """
    freq = _state['freq']
    vol = _state['vol']
    phase = _state['phase']

    # Phase advance per sample, in radians.
    step = _TWO_PI * freq / SAMPLE_RATE
    angles = phase + step * np.arange(frame_count, dtype=np.float64)
    buf = (vol * np.sin(angles) * 32767).astype(np.int16)

    # Wrap in radians rather than in samples: the waveform stays continuous
    # for every frequency, however long the tone plays.
    _state['phase'] = (phase + step * frame_count) % _TWO_PI
    return (buf.tobytes(), pyaudio.paContinue)


def _do_stop():
    """Stop active stream and cancel auto-stop timer. Call with _lock held."""
    timer = _state['timer']
    stream = _state['stream']
    _state['timer'] = None
    _state['stream'] = None

    if timer is not None:
        timer.cancel()
    if stream is None:
        return

    # stop and close are guarded separately so that a failing stop_stream()
    # cannot prevent the stream from being closed.
    try:
        stream.stop_stream()
    except Exception as exc:
        print(f"[stop] stop_stream failed: {exc}")
    try:
        stream.close()
    except Exception as exc:
        print(f"[stop] close failed: {exc}")


def _timer_stop(stream=None):
    """Auto-stop timer target.

    When *stream* is given, playback is stopped only if that stream is still
    the active one. This guards against a timer that fired just before a new
    /play request replaced the stream: without the check the stale timer
    would cut the new tone short. With no argument the active stream is
    stopped unconditionally.
    """
    with _lock:
        if stream is not None and _state['stream'] is not stream:
            return
        _do_stop()


# ── Flask app ─────────────────────────────────────────────────────────────────

app = Flask(__name__)


def _bad_request(message):
    """Build a 400 JSON error response."""
    return jsonify({'status': 'error', 'error': message}), 400


@app.route('/play', methods=['POST'])
def play():
    """Start a tone, replacing any tone that is currently playing.

    JSON body fields (all optional): ``freq`` in Hz (default 1000),
    ``vol`` clamped to 0.0..1.0 (default 0.8), ``duration`` in seconds
    (default 1.0, minimum 0.05). Responds 400 when the body is not a JSON
    object or a field is not a finite number.
    """
    body = request.get_json(force=True) or {}
    if not isinstance(body, dict):
        return _bad_request('request body must be a JSON object')

    try:
        freq = float(body.get('freq', 1000.0))
        vol = float(body.get('vol', 0.8))
        duration = float(body.get('duration', 1.0))
    except (TypeError, ValueError):
        return _bad_request('freq, vol and duration must be numbers')

    # NaN would slip through the min/max clamps below (a NaN volume ends up
    # as 1.0) and an infinite duration cannot be scheduled on a timer.
    if not all(math.isfinite(value) for value in (freq, vol, duration)):
        return _bad_request('freq, vol and duration must be finite')

    vol = max(0.0, min(1.0, vol))
    # threading.Timer raises OverflowError in its own thread for timeouts
    # above TIMEOUT_MAX, which would leave the tone playing forever.
    duration = min(max(0.05, duration), threading.TIMEOUT_MAX)

    with _lock:
        _do_stop()
        # Set the tone parameters before opening: the callback may run as
        # soon as the stream exists.
        _state['freq'] = freq
        _state['vol'] = vol
        _state['phase'] = 0.0

        s = _pa.open(
            rate=SAMPLE_RATE,
            channels=1,
            format=pyaudio.paInt16,
            output=True,
            frames_per_buffer=BLOCK_FRAMES,
            stream_callback=_callback,
        )
        try:
            s.start_stream()
        except Exception:
            # Do not leak the device handle when the stream cannot start.
            s.close()
            raise
        _state['stream'] = s

        # Bind the timer to this stream so it can never stop a later one.
        t = threading.Timer(duration, _timer_stop, args=(s,))
        _state['timer'] = t
        t.start()

    print(f"[play] {freq:.0f} Hz  vol={vol:.2f}  dur={duration:.2f}s")
    return jsonify({'status': 'ok', 'freq': freq, 'vol': vol, 'duration': duration})


@app.route('/stop', methods=['POST'])
def stop():
    """Stop the current tone, if any."""
    with _lock:
        _do_stop()
    print("[stop]")
    return jsonify({'status': 'ok'})


@app.route('/status', methods=['GET'])
def status():
    """Report whether a tone is playing and the last requested freq/vol."""
    # Take the whole snapshot under the lock so the three values belong to
    # the same request.
    with _lock:
        playing = _state['stream'] is not None
        freq = _state['freq']
        vol = _state['vol']
    return jsonify({'playing': playing, 'freq': freq, 'vol': vol})


if __name__ == '__main__':
    _alsa_init()
    print("Tone server starting on 0.0.0.0:5000")
    try:
        app.run(host='0.0.0.0', port=5000, threaded=True)
    finally:
        with _lock:
            _do_stop()
        _pa.terminate()