This commit is contained in:
david rice
2026-04-20 16:06:01 +01:00
parent 712a42ecb7
commit ac65270cef
6 changed files with 103 additions and 70 deletions

View File

@@ -29,6 +29,10 @@ from datetime import datetime
from pathlib import Path
import math
import os
import struct
import tempfile
import wave
import anthropic
import vxi11
@@ -37,8 +41,7 @@ from dotenv import load_dotenv
import ai_mgmt
import rigol_scope
from csv_preprocessor import (analyze_lp_file, LPMetrics,
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS,
analyze_int_file)
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
load_dotenv(Path(__file__).parent / ".env")
@@ -565,6 +568,7 @@ def _configure_for_lp():
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
scope.write(":TRIGger:SWEep NORMal") # must wait for real LP-11→LP-01 edge, not auto-fire on HS
time.sleep(0.1)
@@ -653,19 +657,6 @@ def _fetch_registers(ts: str, iteration: int) -> None:
def _analyze_int_file(ts: str, iteration: int) -> None:
"""Print IRQ pin summary and alert if the SN65DSI83 asserted the IRQ line."""
path = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
if not path.exists():
return
try:
m = analyze_int_file(path)
print(m.summary())
if m.int_asserted:
print(f"\n *** IRQ ASSERTED: SN65DSI83 flagged a bridge error at "
f"capture {iteration:04d} — check CSR 0xE5 for error bits ***\n")
except Exception as e:
print(f" INT ANALYSIS ERROR: {e}")
def dual_capture(iteration: int) -> str:
@@ -699,12 +690,6 @@ def dual_capture(iteration: int) -> str:
print(f" SAVED: {v18_path.name} ({n} samples)")
else:
print(" RIGOL CH1: waveform read failed — check connection and probe.")
int_path = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
n_int = rigol_scope.read_int_csv(int_path)
if n_int:
print(f" SAVED: {int_path.name} ({n_int} samples)")
else:
print(" RIGOL CH2: IRQ read failed.")
_restore_hs_config()
try:
requests.put(URL, json={"state": "on"}, timeout=1)
@@ -856,6 +841,56 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
])
def _play_alarm() -> None:
"""Play three short beeps using a generated WAV tone."""
sample_rate = 44100
freq = 880
duration = 0.35
n_samples = int(sample_rate * duration)
samples = [int(32767 * math.sin(2 * math.pi * freq * i / sample_rate))
for i in range(n_samples)]
packed = struct.pack(f"<{n_samples}h", *samples)
tmp = None
try:
fd, tmp = tempfile.mkstemp(suffix=".wav")
os.close(fd)
with wave.open(tmp, "w") as w:
w.setnchannels(1)
w.setsampwidth(2)
w.setframerate(sample_rate)
w.writeframes(packed)
# os.system inherits the full shell environment (XDG_RUNTIME_DIR, PULSE_SERVER, etc.)
played = False
for cmd in (f"aplay -q {tmp}", f"pw-play {tmp}", f"paplay {tmp}"):
if os.system(f"{cmd} 2>/dev/null") == 0:
played = True
for _ in range(2):
time.sleep(0.2)
os.system(f"{cmd} 2>/dev/null")
break
if not played:
try:
with open("/dev/tty", "w") as tty:
for _ in range(5):
tty.write("\a")
tty.flush()
time.sleep(0.3)
except Exception:
print("\a" * 5, end="", flush=True)
except Exception:
print("\a" * 5, end="", flush=True)
finally:
if tmp:
try:
os.unlink(tmp)
except Exception:
pass
def _analyze_lp_files(
ts: str, iteration: int
) -> tuple[list[str], list[LPMetrics]]:
@@ -1340,9 +1375,6 @@ def run_interactive_test() -> None:
except Exception as e:
print(f" TRANSFER ERROR: {e}")
# ── IRQ pin analysis ───────────────────────────────────────────
_analyze_int_file(ts, iteration)
# ── Rule-based LP analysis ─────────────────────────────────────
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
@@ -1365,11 +1397,7 @@ def run_interactive_test() -> None:
if claude_flicker:
# ── Keep display ON — ask operator ─────────────────────────
# Play alarm sound once to alert the operator
subprocess.run(
["pw-play", "/usr/share/sounds/freedesktop/stereo/alarm-clock-elapsed.oga"],
check=False,
)
_play_alarm()
print("\n" + "=" * 64)
print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW")
print("=" * 64)