1000 lines
37 KiB
Python
1000 lines
37 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Arrive Audio Test Console
|
||
Framebuffer PyQt5 UI for IMX6 SoM (Digi CC-MX-L77C-Z1) + SGTL5000
|
||
Display: 800x480 RGB, Debian 12 custom kernel
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import subprocess
|
||
import threading
|
||
import math
|
||
import struct
|
||
import wave
|
||
import tempfile
|
||
import io
|
||
import time
|
||
|
||
# Optional dependency: pyusb is only needed to drive the STM32 companion LEDs.
try:
    import usb.core
    import usb.util
except ImportError:
    _HAS_USB = False
else:
    _HAS_USB = True

# ── STM32 companion USB IDs — update after running lsusb ─────────────────────
_STM32_VID = 0x2f8f   # Flowbird Axio4 Companion
_STM32_PID = 0x0003

# ── Force framebuffer QPA before any Qt import ────────────────────────────────
# Only applied when neither an X display nor an explicit platform is configured.
if not any(var in os.environ for var in ('DISPLAY', 'QT_QPA_PLATFORM')):
    os.environ['QT_QPA_PLATFORM'] = 'linuxfb'
|
||
|
||
from PyQt5.QtWidgets import (
|
||
QApplication, QMainWindow, QWidget,
|
||
QVBoxLayout, QHBoxLayout, QPushButton,
|
||
QLabel, QFrame, QSizePolicy,
|
||
)
|
||
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject
|
||
from PyQt5.QtGui import QFont, QFontDatabase, QPalette, QColor, QPixmap, QImage
|
||
|
||
# ── Arrive brand palette (official) ──────────────────────────────────────────
C_BG = "#5F016F"          # P1 — deep purple
C_PANEL = "#720180"       # P1 lightened — card / panel
C_PINK = "#FF80D4"        # P3 — medium pink, inactive text/buttons
C_PINK_HOT = "#FF33BB"    # P2 — hot pink, active / pressed state
C_PINK_LIGHT = "#FFADE4"  # P4 — light pink, accents
C_GREY = "#F9F5F4"        # Off-White — secondary labels
C_WHITE = "#FFFFFF"
C_ACCEPT = "#00C878"      # green — ticket accepted
C_REJECT = "#FF3344"      # red — ticket rejected

# ── A1000 card reader ─────────────────────────────────────────────────────────
_NFC_DEMO = '/usr/bin/nfc_nl_demo'
_VALID_UIDS = {'f6 4b f6 cb'}   # whitelisted cards → accept tone

# ── Custom font (set after QApplication is created) ──────────────────────────
_FONT_FAMILY = "Sans Serif"     # overwritten at startup if OptimismSans loads

# ── Audio constants ───────────────────────────────────────────────────────────
SAMPLE_RATE = 44100
FREQUENCY = 1000      # Hz
BLOCK_FRAMES = 512
AMPLITUDE = 0.85      # 0.0–1.0 (leave headroom)
|
||
|
||
# ── Audio backend detection ───────────────────────────────────────────────────
# Preference order: sounddevice, then pyaudio, else no streaming backend.
# sounddevice raises OSError (not ImportError) when the Python package is
# installed but the PortAudio shared library is missing, so both are caught
# here; otherwise the pyaudio fallback would never be tried.
try:
    import sounddevice as sd
    _BACKEND = 'sounddevice'
except (ImportError, OSError):
    try:
        import pyaudio
        _BACKEND = 'pyaudio'
    except ImportError:
        _BACKEND = None

# numpy is optional; the waveform code falls back to pure Python without it.
try:
    import numpy as np
    _HAS_NUMPY = True
except ImportError:
    _HAS_NUMPY = False
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Audio Engine
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class _Signals(QObject):
    """Qt signal holder owned by AudioEngine (which is not itself a QObject)."""

    # Carries a status string; MainWindow shows it in the status label.
    status = pyqtSignal(str)
|
||
|
||
|
||
class AudioEngine:
    """
    Continuous waveform generator plus one-shot chimes.

    Continuous tones (sine / sawtooth at FREQUENCY Hz) are streamed through
    sounddevice (preferred) or pyaudio.  One-shot chimes are rendered to WAV
    in memory and piped to ``aplay``.  Output level is set through ``amixer``.
    """

    def __init__(self):
        self.signals = _Signals()
        self._mode = None               # 'sine' | 'sawtooth' | None
        self._phase = 0                 # sample index of the next block
        self._lock = threading.Lock()   # guards _mode and _phase
        self._stream = None             # sounddevice / pyaudio stream
        self._pa = None                 # pyaudio instance
        self._volume = 70               # logical 0–100
        self._backend = _BACKEND

        if self._backend == 'pyaudio':
            self._pa = pyaudio.PyAudio()

        self._alsa_init()
        self._alsa_volume(self._volume)

    # ── Public API ────────────────────────────────────────────────────────────

    def play(self, mode: str):
        """
        Start (or switch to) continuous output of *mode* ('sine'/'sawtooth').

        A no-op when *mode* is already playing.  If the output stream cannot
        be opened the engine is returned to the stopped state and the backend
        exception is re-raised, so a later retry is not swallowed by the
        "already playing" check.
        """
        with self._lock:
            if self._mode == mode:
                return
            self._mode = mode
            self._phase = 0

        if self._stream is None:
            try:
                self._open_stream()
            except Exception:
                # Roll back: drop any half-opened stream and forget the mode.
                self._close_stream()
                with self._lock:
                    self._mode = None
                    self._phase = 0
                raise

    def stop(self):
        """Close the output stream and reset mode and phase."""
        self._close_stream()
        with self._lock:
            self._mode = None
            self._phase = 0

    def set_volume(self, level: int):
        """Clamp *level* to 0–100, remember it and push it to the ALSA mixer."""
        level = max(0, min(100, level))
        self._volume = level
        self._alsa_volume(level)

    def get_volume(self) -> int:
        """Return the last volume set through set_volume() (0–100)."""
        return self._volume

    def current_mode(self):
        """Return the active waveform name, or None when stopped."""
        with self._lock:
            return self._mode

    def shutdown(self):
        """Stop playback and release the pyaudio instance, if any."""
        self.stop()
        if self._pa is not None:
            self._pa.terminate()

    def play_accept(self):
        """Ascending two-tone chime: 880 Hz (120 ms) then 1320 Hz (200 ms)."""
        self._play_oneshot([(880, 120, 'sine'), (1320, 200, 'sine')])

    def play_reject(self):
        """Low sawtooth buzz: 220 Hz (400 ms)."""
        self._play_oneshot([(220, 400, 'sawtooth')])

    def _play_oneshot(self, segments):
        """Render *segments* to WAV and play them via ``aplay`` on a daemon thread."""
        wav = self._build_wav(segments)

        def _run():
            try:
                proc = subprocess.Popen(
                    ['aplay', '-q', '-'],
                    stdin=subprocess.PIPE, stderr=subprocess.DEVNULL,
                )
            except OSError:
                return  # aplay not installed / not executable: best effort
            try:
                proc.communicate(input=wav, timeout=3.0)
            except subprocess.TimeoutExpired:
                # communicate() does not kill the child on timeout; do it
                # here and reap it so no stuck aplay process is left behind.
                proc.kill()
                proc.communicate()
            except OSError:
                proc.kill()
                proc.wait()

        threading.Thread(target=_run, daemon=True).start()

    def _build_wav(self, segments):
        """Return WAV bytes for a list of (freq, duration_ms, waveform) tuples.

        Output is mono, 16-bit, SAMPLE_RATE Hz.  Any waveform other than
        'sine' is rendered as a sawtooth.
        """
        samples = []
        for freq, duration_ms, waveform in segments:
            n = int(SAMPLE_RATE * duration_ms / 1000)
            if _HAS_NUMPY:
                t = np.arange(n, dtype=np.float64) / SAMPLE_RATE
                if waveform == 'sine':
                    s = AMPLITUDE * np.sin(2.0 * math.pi * freq * t)
                else:
                    s = AMPLITUDE * (2.0 * ((freq * t) % 1.0) - 1.0)
                samples.extend((s * 32767).astype(np.int16).tolist())
            else:
                for i in range(n):
                    tt = i / SAMPLE_RATE
                    if waveform == 'sine':
                        v = AMPLITUDE * math.sin(2.0 * math.pi * freq * tt)
                    else:
                        v = AMPLITUDE * (2.0 * ((freq * tt) % 1.0) - 1.0)
                    samples.append(int(v * 32767))

        buf = io.BytesIO()
        with wave.open(buf, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(struct.pack(f'{len(samples)}h', *samples))
        return buf.getvalue()

    # ── Stream management ─────────────────────────────────────────────────────

    def _open_stream(self):
        """Open and start a callback-driven mono output stream.

        Does nothing when no streaming backend was detected.
        """
        if self._backend == 'sounddevice':
            self._stream = sd.OutputStream(
                samplerate=SAMPLE_RATE,
                channels=1,
                dtype='float32',
                blocksize=BLOCK_FRAMES,
                callback=self._sd_callback,
            )
            self._stream.start()

        elif self._backend == 'pyaudio':
            self._stream = self._pa.open(
                rate=SAMPLE_RATE,
                channels=1,
                format=pyaudio.paInt16,
                output=True,
                frames_per_buffer=BLOCK_FRAMES,
                stream_callback=self._pa_callback,
            )
            self._stream.start_stream()

    def _close_stream(self):
        """Stop and close the current stream; backend errors are ignored."""
        s = self._stream
        self._stream = None
        if s is None:
            return
        try:
            if self._backend == 'sounddevice':
                s.stop()
                s.close()
            elif self._backend == 'pyaudio':
                s.stop_stream()
                s.close()
        except Exception:
            # Best effort during teardown: a failing close must not block
            # the caller (stop()/shutdown()).
            pass

    # ── Callbacks ─────────────────────────────────────────────────────────────

    def _generate(self, frames: int):
        """Return *frames* samples of the current waveform.

        With numpy the result is a float32 array; without it, a list of
        floats.  Silence is produced when no mode is active.
        """
        with self._lock:
            mode = self._mode
            phase = self._phase

        if _HAS_NUMPY:
            t = (np.arange(frames, dtype=np.float64) + phase) / SAMPLE_RATE
            if mode == 'sine':
                samples = AMPLITUDE * np.sin(2.0 * math.pi * FREQUENCY * t)
            elif mode == 'sawtooth':
                samples = AMPLITUDE * (2.0 * ((FREQUENCY * t) % 1.0) - 1.0)
            else:
                samples = np.zeros(frames, dtype=np.float64)
            buf = samples.astype(np.float32)
        else:
            # Pure-Python fallback (slow, but works without numpy)
            buf = []
            for i in range(frames):
                tt = (i + phase) / SAMPLE_RATE
                if mode == 'sine':
                    v = AMPLITUDE * math.sin(2.0 * math.pi * FREQUENCY * tt)
                elif mode == 'sawtooth':
                    v = AMPLITUDE * (2.0 * ((FREQUENCY * tt) % 1.0) - 1.0)
                else:
                    v = 0.0
                buf.append(v)

        with self._lock:
            # Wrap every 10 s of samples to keep the phase counter bounded.
            self._phase = (phase + frames) % (SAMPLE_RATE * 10)
        return buf

    def _sd_callback(self, outdata, frames, time_info, status):
        """sounddevice callback: fill channel 0 of *outdata* with samples."""
        buf = self._generate(frames)
        if _HAS_NUMPY:
            outdata[:, 0] = buf
        else:
            for i, v in enumerate(buf):
                outdata[i, 0] = v

    def _pa_callback(self, in_data, frame_count, time_info, status):
        """pyaudio callback: return the next block as signed 16-bit bytes."""
        buf = self._generate(frame_count)
        if _HAS_NUMPY:
            raw = (buf * 32767).astype(np.int16).tobytes()
        else:
            raw = struct.pack(f'{len(buf)}h', *[int(v * 32767) for v in buf])
        return (raw, pyaudio.paContinue)

    # ── ALSA volume ───────────────────────────────────────────────────────────

    @staticmethod
    def _alsa_init():
        """Set SGTL5000 routing and unmute outputs once at startup."""
        cmds = [
            ['amixer', '-c', '0', 'sset', 'Headphone Mux', 'DAC'],
            ['amixer', '-c', '0', 'sset', 'DAP Mux', 'I2S'],
            ['amixer', '-c', '0', 'sset', 'Headphone', 'unmute'],
            ['amixer', '-c', '0', 'sset', 'Lineout', 'unmute'],
        ]
        for cmd in cmds:
            try:
                subprocess.run(cmd, capture_output=True, timeout=2)
            except (OSError, subprocess.SubprocessError):
                # amixer missing or hung: mixer setup is best effort.
                pass

    @staticmethod
    def _alsa_volume(level: int):
        """
        Apply *level* percent to common SGTL5000 / i.MX6 ALSA mixer controls.

        Every name in the list is attempted; amixer's exit status is not
        checked, so controls that do not exist on the card are skipped.
        Run `amixer -c 0 scontrols` on the device to find the right name.
        """
        controls = ['Headphone', 'Lineout', 'PCM', 'Master', 'DAC', 'Speaker', 'HP Analog']
        for ctl in controls:
            try:
                subprocess.run(
                    ['amixer', '-q', 'sset', ctl, f'{level}%'],
                    capture_output=True, timeout=2
                )
            except (OSError, subprocess.SubprocessError):
                # amixer missing or hung: volume control is best effort.
                pass
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Qt stylesheet helpers
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def _wave_btn_style(active: bool = False) -> str:
    """Return the stylesheet for a waveform button.

    An *active* button is drawn in the hot-pink "pressed" colours; an
    inactive one uses the panel background with pink text and border.
    """
    bg, fg, border = (
        (C_PINK_HOT, C_BG, C_PINK_HOT) if active else (C_PANEL, C_PINK, C_PINK)
    )
    return f"""
        QPushButton {{
            background-color: {bg};
            color: {fg};
            border: 2px solid {border};
            border-radius: 14px;
            font-size: 20px;
            font-weight: bold;
        }}
        QPushButton:pressed {{
            background-color: {C_PINK_HOT};
            color: {C_BG};
            border: 2px solid {C_PINK_HOT};
        }}
    """
|
||
|
||
def _ticket_btn_style(kind: str) -> str:
    """Return the stylesheet for a ticket button.

    *kind* 'accept' selects the accept colour; any other value selects the
    reject colour.
    """
    if kind == 'accept':
        accent = C_ACCEPT
    else:
        accent = C_REJECT
    return f"""
        QPushButton {{
            background-color: {C_PANEL};
            color: {accent};
            border: 2px solid {accent};
            border-radius: 14px;
            font-size: 20px;
            font-weight: bold;
        }}
        QPushButton:pressed {{
            background-color: {accent};
            color: {C_WHITE};
            border: 2px solid {accent};
        }}
    """
|
||
|
||
def _vol_btn_style() -> str:
    """Return the stylesheet shared by the volume "-" and "+" buttons."""
    stylesheet = f"""
        QPushButton {{
            background-color: {C_PANEL};
            color: {C_PINK};
            border: 2px solid {C_PINK};
            border-radius: 14px;
            font-size: 36px;
            font-weight: bold;
        }}
        QPushButton:pressed {{
            background-color: {C_PINK_HOT};
            color: {C_BG};
            border: 2px solid {C_PINK_HOT};
        }}
    """
    return stylesheet
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# LED Controller (STM32 companion via USB vendor control transfers)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Arrive brand colours for the LEDs, as (R, G, B) tuples
_LED_PURPLE = (95, 1, 111)      # P1  #5F016F
_LED_PINK = (255, 51, 187)      # P2  #FF33BB

# USB control-transfer fields used for the STM32 companion
_BMREQ = 0x41       # bmRequestType: Vendor | Interface | Out
_CMD_BRI = 0x00     # bRequest: SET_BRIGHTNESS
_CMD_COL = 0x01     # bRequest: SET_COLOR / SET_PATTERN
|
||
|
||
|
||
class LedController:
    """
    Controls 2 RGB LED strings on the STM32 companion chip via USB.

    A daemon thread fades the main string back and forth between the brand
    purple and pink.  Every USB operation is best effort: when pyusb or the
    device is missing the controller silently does nothing.
    """

    STRING_MAIN = 0
    STRING_EXT = 1

    _USB_INTERFACE = 1      # interface number claimed on the companion
    _JOIN_TIMEOUT_S = 1.0   # upper bound on waiting for the fade thread

    def __init__(self):
        self._dev = None
        self._running = False
        self._thread = None
        if _HAS_USB and _STM32_VID:
            try:
                self._dev = usb.core.find(idVendor=_STM32_VID, idProduct=_STM32_PID)
            except Exception:
                # e.g. no libusb backend available
                self._dev = None
            if self._dev is not None:
                try:
                    self._dev.set_configuration()
                except Exception:
                    pass  # already configured by kernel driver
                try:
                    if self._dev.is_kernel_driver_active(self._USB_INTERFACE):
                        self._dev.detach_kernel_driver(self._USB_INTERFACE)
                except Exception:
                    pass
                try:
                    usb.util.claim_interface(self._dev, self._USB_INTERFACE)
                except Exception:
                    pass

    # ── Public ────────────────────────────────────────────────────────────────

    def start(self):
        """Start the fade thread; a no-op without a device or if already running."""
        if self._dev is None or self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._knight_rider, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the fade thread, switch the main string off and release USB.

        The thread is joined before the LEDs are blanked.  Otherwise a colour
        write still in flight (sent with brightness 255) could re-light the
        string after it was switched off, or run after the interface was
        released.
        """
        self._running = False
        worker = self._thread
        self._thread = None
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=self._JOIN_TIMEOUT_S)
        if self._dev is not None:
            self._set_brightness(self.STRING_MAIN, 0)
            try:
                usb.util.release_interface(self._dev, self._USB_INTERFACE)
            except Exception:
                pass

    # ── USB helpers ───────────────────────────────────────────────────────────

    def _windex(self, string):
        """Build wIndex: string number in the high byte, 0x01 in the low byte."""
        return (string << 8) | 0x01

    def _set_color(self, string, r, g, b, brightness=255):
        """Send SET_COLOR for *string*: RGB as payload, brightness as wValue."""
        try:
            self._dev.ctrl_transfer(
                _BMREQ, _CMD_COL, brightness, self._windex(string), [r, g, b]
            )
        except Exception:
            pass  # best effort: a failed transfer must not kill the caller

    def _set_brightness(self, string, brightness):
        """Send SET_BRIGHTNESS for *string* with *brightness* as wValue."""
        try:
            self._dev.ctrl_transfer(
                _BMREQ, _CMD_BRI, brightness, self._windex(string), None
            )
        except Exception:
            pass  # best effort

    def _set_pattern(self, string, steps):
        """steps: list of (brightness, duration_ms) — max 5.

        Each step is encoded as three bytes: brightness, then the duration
        as a little-endian 16-bit value.  Extra steps are dropped.
        """
        data = []
        for bri, dur in steps[:5]:
            data += [bri & 0xFF, dur & 0xFF, (dur >> 8) & 0xFF]
        try:
            self._dev.ctrl_transfer(
                _BMREQ, _CMD_COL, 0, self._windex(string), data
            )
        except Exception:
            pass  # best effort

    # ── Knight rider loop ─────────────────────────────────────────────────────

    def _fade(self, start, end, steps, step_s):
        """Step the main string linearly from *start* towards *end*.

        Returns False as soon as a stop was requested, True after all steps.
        """
        for i in range(steps):
            if not self._running:
                return False
            t = i / steps
            r, g, b = (int(a + (z - a) * t) for a, z in zip(start, end))
            self._set_color(self.STRING_MAIN, r, g, b)
            time.sleep(step_s)
        return True

    def _knight_rider(self):
        """Thread body: fade purple → pink → purple until stopped."""
        STEPS = 30      # steps per half-cycle
        STEP_S = 0.05   # 50 ms per step → ~1.5 s fade up, ~1.5 s fade down

        self._set_brightness(self.STRING_MAIN, 255)
        while self._running:
            if not self._fade(_LED_PURPLE, _LED_PINK, STEPS, STEP_S):
                return
            if not self._fade(_LED_PINK, _LED_PURPLE, STEPS, STEP_S):
                return
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# A1000 card reader
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class CardReader(QObject):
    """Polls nfc_nl_demo in a loop. Emits card_detected(uid) on each successful read."""

    card_detected = pyqtSignal(str)

    _POLL_TIMEOUT_S = 15    # max run time of one nfc_nl_demo invocation
    _POLL_PAUSE_S = 0.1     # pause between invocations

    def __init__(self):
        super().__init__()
        self._running = False
        self._thread = None
        self._proc = None   # nfc_nl_demo child currently running, if any
        self.available = os.path.exists(_NFC_DEMO)

    def start(self):
        """Start the polling thread; a no-op without the tool or if running."""
        if not self.available or self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the polling loop to end and terminate the running child."""
        self._running = False
        proc = self._proc   # snapshot: the poll thread may clear it concurrently
        if proc is not None:
            try:
                proc.terminate()
            except Exception:
                pass

    @staticmethod
    def _parse_uid(out):
        """Return the UID reported in *out*, or None if no target was detected.

        The UID is the text after the first ':' on the last line containing
        'UID'; it is '' when a target was detected but no such line exists.
        """
        if 'Target detected' not in out:
            return None
        uid = ''
        for line in out.splitlines():
            if 'UID' in line and ':' in line:
                uid = line.split(':', 1)[1].strip()
        return uid

    def _poll_loop(self):
        """Thread body: run nfc_nl_demo repeatedly and emit detected UIDs."""
        while self._running:
            proc = None
            try:
                proc = subprocess.Popen(
                    [_NFC_DEMO],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    text=True,
                )
                self._proc = proc
                out, _ = proc.communicate(timeout=self._POLL_TIMEOUT_S)
                uid = self._parse_uid(out)
                if uid is not None:
                    self.card_detected.emit(uid)
            except subprocess.TimeoutExpired:
                # communicate() leaves the child running on timeout: kill it
                # and reap it so no zombie process accumulates per poll.
                proc.kill()
                proc.communicate()
            except Exception:
                # Keep the reader alive on any failure, but never leave a
                # child process behind.
                if proc is not None and proc.poll() is None:
                    proc.kill()
                    proc.wait()
            finally:
                self._proc = None
            if self._running:
                time.sleep(self._POLL_PAUSE_S)
|
||
|
||
|
||
def _autocrop(img):
    """Crop a QImage (ARGB32) to the bounding box of non-transparent pixels.

    Pixels with alpha above 5 count as visible, and the box is padded by
    6 pixels on each side, clamped to the image.  The image is returned
    unchanged when numpy is unavailable or nothing is visible.
    """
    if not _HAS_NUMPY:
        return img

    width, height = img.width(), img.height()
    raw = img.bits()
    raw.setsize(height * width * 4)     # 4 bytes per pixel
    pixels = np.frombuffer(raw, dtype=np.uint8).reshape(height, width, 4)
    # NOTE(review): byte 3 is read as alpha, which matches ARGB32 on a
    # little-endian host — confirm if this ever runs elsewhere.
    alpha = pixels[:, :, 3]

    visible_rows = np.flatnonzero(alpha.max(axis=1) > 5)
    visible_cols = np.flatnonzero(alpha.max(axis=0) > 5)
    if visible_rows.size == 0 or visible_cols.size == 0:
        return img

    pad = 6
    top = max(0, int(visible_rows[0]) - pad)
    bottom = min(height, int(visible_rows[-1]) + pad + 1)
    left = max(0, int(visible_cols[0]) - pad)
    right = min(width, int(visible_cols[-1]) + pad + 1)
    return img.copy(left, top, right - left, bottom - top)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Volume bar widget (simple segmented indicator)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
from PyQt5.QtWidgets import QWidget as _QW
|
||
from PyQt5.QtGui import QPainter, QColor as _QC
|
||
|
||
class VolumeBar(_QW):
    """Segmented horizontal level indicator for a 0–100 value."""

    SEGMENTS = 20

    def __init__(self, parent=None):
        super().__init__(parent)
        self._level = 70
        self.setMinimumSize(200, 40)

    def set_level(self, v: int):
        """Store *v* clamped to 0–100 and schedule a repaint."""
        clamped = min(100, max(0, v))
        self._level = clamped
        self.update()

    def paintEvent(self, event):
        """Draw SEGMENTS bars with a 1 px gap; the first ones are lit."""
        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing)

        total_w = self.width()
        total_h = self.height()
        count = self.SEGMENTS
        seg_w = (total_w - count) // count
        lit = round(self._level / 100 * count)

        lit_colour = _QC(C_PINK_HOT)
        dim_colour = _QC(C_PANEL)
        for idx in range(count):
            left = idx * (seg_w + 1)
            if idx < lit:
                painter.fillRect(left, 4, seg_w, total_h - 8, lit_colour)
            else:
                painter.fillRect(left, 4, seg_w, total_h - 8, dim_colour)
        painter.end()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Main window
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class MainWindow(QMainWindow):
    """Fixed-size main window: waveform, volume and ticket-validator controls."""

    SCREEN_W = 800
    SCREEN_H = 480

    VOL_STEP = 5                # volume change per tap / repeat tick
    VOL_REPEAT_MS = 150         # hold-to-repeat interval
    CARD_INDICATOR_MS = 3000    # how long "CARD" stays lit after a read

    def __init__(self):
        super().__init__()
        self._fb_snapshot = None    # framebuffer bytes to restore on close
        self._vol_delta = 0         # step used by the hold-to-repeat timer

        self.audio = AudioEngine()
        self.audio.signals.status.connect(self._on_audio_status)
        self.leds = LedController()
        self._card = CardReader()
        self._card.card_detected.connect(self._on_card_detected)

        # One restartable timer, so a new read extends the indicator instead
        # of being cleared early by the timer of a previous read.
        self._card_clear_timer = QTimer(self)
        self._card_clear_timer.setSingleShot(True)
        self._card_clear_timer.setInterval(self.CARD_INDICATOR_MS)
        self._card_clear_timer.timeout.connect(
            lambda: self._set_card_indicator(False)
        )

        self._build_ui()
        self._set_bg(C_BG)
        self.leds.start()
        self._card.start()

    # ── UI construction ───────────────────────────────────────────────────────

    def _build_ui(self):
        """Assemble the central widget from the section builders."""
        self.setWindowTitle("Arrive")
        self.setFixedSize(self.SCREEN_W, self.SCREEN_H)

        central = QWidget()
        self.setCentralWidget(central)
        root = QVBoxLayout(central)
        root.setContentsMargins(16, 6, 16, 6)
        root.setSpacing(4)

        root.addLayout(self._header())
        root.addWidget(self._divider())
        root.addSpacing(4)
        root.addLayout(self._waveform_section())
        root.addSpacing(10)
        root.addLayout(self._volume_section())
        root.addSpacing(8)
        root.addLayout(self._ticket_section())
        root.addStretch()
        root.addWidget(self._divider())
        root.addLayout(self._status_bar())

    def _header(self) -> QHBoxLayout:
        """Build the top row: logo (or text fallback), subtitle, quit button."""
        row = QHBoxLayout()
        row.setSpacing(12)

        # Logo image: next to this script first, then the home directory.
        logo_lbl = QLabel()
        logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'arrive_logo.png')
        if not os.path.exists(logo_path):
            logo_path = os.path.expanduser('~/arrive_logo.png')
        pix = QPixmap()
        if os.path.exists(logo_path):
            img = QImage(logo_path).convertToFormat(QImage.Format_ARGB32)
            if not img.isNull():
                img = _autocrop(img)
                pix = QPixmap.fromImage(img)
        if not pix.isNull():
            pix = pix.scaledToHeight(80, Qt.SmoothTransformation)
            logo_lbl.setPixmap(pix)
        else:
            logo_lbl.setText("Arrive")
            logo_lbl.setFont(QFont(_FONT_FAMILY, 38, QFont.Bold))
            logo_lbl.setStyleSheet(f"color: {C_PINK};")

        sub = QLabel("Audio Test Console")
        sub.setFont(QFont(_FONT_FAMILY, 13))
        sub.setStyleSheet(f"color: {C_GREY};")
        sub.setAlignment(Qt.AlignBottom | Qt.AlignLeft)

        btn_quit = QPushButton("X")
        btn_quit.setFixedSize(48, 48)
        btn_quit.setFocusPolicy(Qt.NoFocus)
        btn_quit.setStyleSheet(f"""
            QPushButton {{
                background-color: {C_PANEL};
                color: {C_PINK};
                border: 2px solid {C_PINK};
                border-radius: 14px;
                font-size: 20px;
                font-weight: bold;
            }}
            QPushButton:pressed {{
                background-color: {C_PINK_HOT};
                color: {C_BG};
            }}
        """)
        btn_quit.clicked.connect(self.close)

        row.addWidget(logo_lbl, 0, Qt.AlignVCenter)
        row.addStretch(1)
        row.addWidget(sub, 0, Qt.AlignVCenter)
        row.addStretch(2)
        row.addWidget(btn_quit, 0, Qt.AlignVCenter)
        return row

    def _waveform_section(self) -> QVBoxLayout:
        """Build the SINE / SAWTOOTH / STOP button row."""
        col = QVBoxLayout()
        col.setSpacing(8)

        lbl = QLabel("WAVEFORM OUTPUT")
        lbl.setFont(QFont(_FONT_FAMILY, 10))
        lbl.setStyleSheet(f"color: {C_GREY}; letter-spacing: 3px;")
        col.addWidget(lbl)

        row = QHBoxLayout()
        row.setSpacing(14)

        self.btn_sine = QPushButton("SINE 1 kHz")
        self.btn_saw = QPushButton("SAWTOOTH 1 kHz")
        self.btn_stop = QPushButton("STOP")

        for btn in (self.btn_sine, self.btn_saw, self.btn_stop):
            btn.setMinimumHeight(72)
            btn.setStyleSheet(_wave_btn_style(False))
            btn.setFocusPolicy(Qt.NoFocus)

        self.btn_sine.clicked.connect(lambda: self._play('sine'))
        self.btn_saw.clicked.connect(lambda: self._play('sawtooth'))
        self.btn_stop.clicked.connect(self._stop)

        row.addWidget(self.btn_sine)
        row.addWidget(self.btn_saw)
        row.addWidget(self.btn_stop)
        col.addLayout(row)
        return col

    def _volume_section(self) -> QVBoxLayout:
        """Build the volume row: "-" button, readout + bar, "+" button."""
        col = QVBoxLayout()
        col.setSpacing(8)

        lbl = QLabel("SPEAKER VOLUME")
        lbl.setFont(QFont(_FONT_FAMILY, 10))
        lbl.setStyleSheet(f"color: {C_GREY}; letter-spacing: 3px;")
        col.addWidget(lbl)

        row = QHBoxLayout()
        row.setSpacing(14)

        self.btn_dn = QPushButton("-")
        self.btn_up = QPushButton("+")
        for btn in (self.btn_dn, self.btn_up):
            btn.setFixedSize(80, 64)
            btn.setStyleSheet(_vol_btn_style())
            btn.setFocusPolicy(Qt.NoFocus)

        # Numeric readout
        self.vol_num = QLabel(f"{self.audio.get_volume()}%")
        self.vol_num.setAlignment(Qt.AlignCenter)
        self.vol_num.setFont(QFont(_FONT_FAMILY, 32, QFont.Bold))
        self.vol_num.setStyleSheet(f"color: {C_PINK};")
        self.vol_num.setMinimumWidth(100)

        # Segmented bar
        self.vol_bar = VolumeBar()
        self.vol_bar.set_level(self.audio.get_volume())
        self.vol_bar.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed)
        self.vol_bar.setFixedHeight(64)

        centre = QVBoxLayout()
        centre.setSpacing(6)
        centre.addWidget(self.vol_num, 0, Qt.AlignCenter)
        centre.addWidget(self.vol_bar)

        # Hold-to-repeat via QTimer.  `pressed` applies the first step at
        # once and the timer repeats it until `released`.  `clicked` is
        # deliberately NOT connected: it also fires on release, so wiring
        # both made a single tap change the volume by two steps.
        self._vol_timer = QTimer(self)
        self._vol_timer.setInterval(self.VOL_REPEAT_MS)
        self._vol_timer.timeout.connect(self._repeat_vol)
        self.btn_dn.pressed.connect(lambda: self._start_vol_repeat(-self.VOL_STEP))
        self.btn_up.pressed.connect(lambda: self._start_vol_repeat(+self.VOL_STEP))
        self.btn_dn.released.connect(self._stop_vol_repeat)
        self.btn_up.released.connect(self._stop_vol_repeat)

        row.addWidget(self.btn_dn)
        row.addLayout(centre)
        row.addWidget(self.btn_up)
        col.addLayout(row)
        return col

    def _ticket_section(self) -> QVBoxLayout:
        """Build the ticket header (with card indicator) and ACCEPT/REJECT row."""
        col = QVBoxLayout()
        col.setSpacing(8)

        hdr = QHBoxLayout()
        lbl = QLabel("TICKET VALIDATOR")
        lbl.setFont(QFont(_FONT_FAMILY, 10))
        lbl.setStyleSheet(f"color: {C_GREY}; letter-spacing: 3px;")

        self.card_status_lbl = QLabel()
        self.card_status_lbl.setFont(QFont(_FONT_FAMILY, 10))
        self._set_card_indicator(False)

        hdr.addWidget(lbl)
        hdr.addStretch()
        hdr.addWidget(self.card_status_lbl)
        col.addLayout(hdr)

        row = QHBoxLayout()
        row.setSpacing(14)

        self.btn_accept = QPushButton("✓ ACCEPT")
        self.btn_reject = QPushButton("✗ REJECT")
        for btn, kind in ((self.btn_accept, 'accept'), (self.btn_reject, 'reject')):
            btn.setMinimumHeight(72)
            btn.setStyleSheet(_ticket_btn_style(kind))
            btn.setFocusPolicy(Qt.NoFocus)

        self.btn_accept.clicked.connect(self._play_accept)
        self.btn_reject.clicked.connect(self._play_reject)

        row.addWidget(self.btn_accept)
        row.addWidget(self.btn_reject)
        col.addLayout(row)
        return col

    def _status_bar(self) -> QHBoxLayout:
        """Build the bottom row: status text on the left, backend name on the right."""
        row = QHBoxLayout()
        self.status_lbl = QLabel("Ready · SGTL5000 / ALSA")
        self.status_lbl.setFont(QFont(_FONT_FAMILY, 10))
        self.status_lbl.setStyleSheet(f"color: {C_GREY};")

        backend_lbl = QLabel(f"backend: {_BACKEND or 'none'}")
        backend_lbl.setFont(QFont("Monospace", 9))
        backend_lbl.setStyleSheet(f"color: {C_GREY};")
        backend_lbl.setAlignment(Qt.AlignRight | Qt.AlignVCenter)

        row.addWidget(self.status_lbl)
        row.addStretch()
        row.addWidget(backend_lbl)
        return row

    # ── Helpers ───────────────────────────────────────────────────────────────

    def _divider(self) -> QFrame:
        """Return a 1 px horizontal rule in the panel colour."""
        f = QFrame()
        f.setFrameShape(QFrame.HLine)
        f.setFixedHeight(1)
        f.setStyleSheet(f"background-color: {C_PANEL}; border: none;")
        return f

    def _set_bg(self, colour: str):
        """Fill the window background with *colour*."""
        pal = QPalette()
        pal.setColor(QPalette.Window, QColor(colour))
        self.setPalette(pal)
        self.setAutoFillBackground(True)

    def _refresh_wave_buttons(self, active_mode):
        """Highlight the button matching *active_mode* (None highlights STOP)."""
        self.btn_sine.setStyleSheet(_wave_btn_style(active_mode == 'sine'))
        self.btn_saw.setStyleSheet(_wave_btn_style(active_mode == 'sawtooth'))
        self.btn_stop.setStyleSheet(_wave_btn_style(active_mode is None))

    # ── Slots ─────────────────────────────────────────────────────────────────

    def _play(self, mode: str):
        """Start continuous playback of *mode* and update status and buttons."""
        if _BACKEND is None:
            self.status_lbl.setText("No audio backend — install sounddevice or pyaudio")
            return
        try:
            self.audio.play(mode)
        except Exception as exc:
            # UI boundary: an exception escaping a slot would take the whole
            # application down, so report it in the status bar instead.
            self.audio.stop()
            self.status_lbl.setText(f"Audio error: {exc}")
            self._refresh_wave_buttons(None)
            return
        label = "Sine 1 kHz" if mode == 'sine' else "Sawtooth 1 kHz"
        self.status_lbl.setText(f"Playing {label} ▶")
        self._refresh_wave_buttons(mode)

    def _stop(self):
        """Stop continuous playback."""
        self.audio.stop()
        self.status_lbl.setText("Stopped")
        self._refresh_wave_buttons(None)

    def _change_vol(self, delta: int):
        """Change the volume by *delta* and refresh readout, bar and status."""
        new_v = self.audio.get_volume() + delta
        self.audio.set_volume(new_v)
        self.vol_num.setText(f"{self.audio.get_volume()}%")
        self.vol_bar.set_level(self.audio.get_volume())
        self.status_lbl.setText(f"Volume {self.audio.get_volume()}%")

    def _repeat_vol(self):
        """Timer tick while a volume button is held."""
        self._change_vol(self._vol_delta)

    def _start_vol_repeat(self, delta: int):
        """Apply one step immediately, then repeat it until released."""
        self._vol_delta = delta
        self._change_vol(delta)
        self._vol_timer.start()

    def _stop_vol_repeat(self):
        """Stop repeating when the volume button is released."""
        self._vol_timer.stop()

    def _set_card_indicator(self, present: bool):
        """Show CARD / NO CARD, or NO READER when the NFC tool is missing."""
        if present:
            self.card_status_lbl.setText("● CARD")
            self.card_status_lbl.setStyleSheet(f"color: {C_ACCEPT};")
        else:
            if self._card.available:
                self.card_status_lbl.setText("○ NO CARD")
                self.card_status_lbl.setStyleSheet(f"color: {C_GREY};")
            else:
                self.card_status_lbl.setText("○ NO READER")
                self.card_status_lbl.setStyleSheet(f"color: {C_PANEL};")

    def _on_card_detected(self, uid: str):
        """Play the accept or reject tone for *uid* and light the indicator."""
        self._set_card_indicator(True)
        if uid in _VALID_UIDS:
            self.audio.play_accept()
            self.status_lbl.setText(f"ACCEPTED ✓ UID: {uid}")
        else:
            self.audio.play_reject()
            self.status_lbl.setText(f"REJECTED ✗ UID: {uid}")
        # start() restarts a running timer, so the indicator always stays on
        # for the full interval after the most recent read.
        self._card_clear_timer.start()

    def _play_accept(self):
        """Manual ACCEPT button: play the accept chime."""
        self.audio.play_accept()
        self.status_lbl.setText("Ticket ACCEPTED ✓")

    def _play_reject(self):
        """Manual REJECT button: play the reject buzz."""
        self.audio.play_reject()
        self.status_lbl.setText("Ticket REJECTED ✗")

    def _on_audio_status(self, msg: str):
        """Show a status message sent by the audio engine."""
        self.status_lbl.setText(msg)

    def set_fb_snapshot(self, data: bytes):
        """Remember framebuffer contents to write back when the window closes."""
        self._fb_snapshot = data

    def closeEvent(self, event):
        """Stop timers and workers, then restore the saved framebuffer."""
        self._vol_timer.stop()
        self._card_clear_timer.stop()
        self._card.stop()
        self.leds.stop()
        self.audio.shutdown()
        super().closeEvent(event)
        # Restore the framebuffer to what it looked like before Qt launched
        snap = self._fb_snapshot
        if snap:
            try:
                with open('/dev/fb0', 'wb') as fb:
                    fb.write(snap)
            except OSError:
                pass  # no framebuffer or no permission: nothing to restore
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Entry point
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == '__main__':
    # Snapshot the framebuffer before Qt overwrites it, so it can be restored
    # on exit.  The size assumes 800x480 at 4 bytes per pixel.
    _fb_snapshot = None
    try:
        with open('/dev/fb0', 'rb') as fb:
            _fb_snapshot = fb.read(800 * 480 * 4)
    except Exception:
        pass

    app = QApplication(sys.argv)
    app.setStyle('Fusion')

    # Load custom font: look next to this script first, then in the home
    # directory, and stop at the first file that yields a font family.
    _script_dir = os.path.dirname(os.path.abspath(__file__))
    for _font_file in ('OptimismSans.ttf', 'OptimismSans.otf'):
        _font_path = os.path.join(_script_dir, _font_file)
        if not os.path.exists(_font_path):
            _font_path = os.path.expanduser(f'~/{_font_file}')
        _fid = QFontDatabase.addApplicationFont(_font_path)
        if _fid < 0:
            continue
        _families = QFontDatabase.applicationFontFamilies(_fid)
        if _families:
            _FONT_FAMILY = _families[0]
            break

    # Hide mouse cursor on embedded framebuffer
    app.setOverrideCursor(Qt.BlankCursor)

    win = MainWindow()
    if _fb_snapshot:
        win.set_fb_snapshot(_fb_snapshot)
    win.showFullScreen()   # fills 800×480 framebuffer
    sys.exit(app.exec_())
|