This commit is contained in:
david rice
2026-04-20 10:34:42 +01:00
parent 118d8ad380
commit e718a93667
8 changed files with 735 additions and 77 deletions

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python3
ni w#!/usr/bin/env python3
"""
MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY
Interactive flicker confirmation test.
@@ -37,7 +37,8 @@ from dotenv import load_dotenv
import ai_mgmt
import rigol_scope
from csv_preprocessor import (analyze_lp_file, LPMetrics,
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS,
analyze_int_file, CLK_LP_LOW_MIN_NS)
load_dotenv(Path(__file__).parent / ".env")
@@ -555,6 +556,85 @@ def _restore_hs_config():
time.sleep(0.1)
def _arm_scope_for_clk_startup() -> None:
"""
Configure scope for CLK lane LP startup capture and arm in single-shot mode.
Must be called BEFORE display ON — the CLK+ LP-11 falling edge (start of CLK
lane SoT preamble) triggers it. The DAT0+-triggered LP pass would miss this
because CLK is already in continuous HS by the time DAT0+ falls.
"""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
time.sleep(0.05)
scope.write(":TRIGger:EDGE:SOURce CHANnel1") # CLK+ — fires before DAT0+
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
scope.write(":TRIGger:SWEep NORMal")
scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}")
scope.write(f":ACQuire:POINts {LP_POINTS}")
time.sleep(0.3)
scope.write(":SINGle") # arm without blocking — display ON happens next
time.sleep(0.1)
def _collect_clk_startup(ts: str, iteration: int,
timeout: float = 10.0) -> list[str]:
"""
Wait for the CLK startup trigger fired by _arm_scope_for_clk_startup(),
save CLK+ and DAT0+ channels, and return LP analysis summaries.
Restores HS config before returning.
"""
print(" CLK STARTUP: waiting for trigger...")
deadline = time.time() + timeout
triggered = False
while time.time() < deadline:
try:
status = scope.ask(":TRIGger:STATus?").strip().upper()
if status in ("STOP", "TD"):
triggered = True
break
except Exception:
pass
time.sleep(0.1)
if not triggered:
print(" CLK STARTUP: trigger timeout — CLK LP startup not captured.")
_restore_hs_config()
return []
_save_pass_channels("lp", iteration, ts)
_restore_hs_config()
try:
copied, _ = ai_mgmt.transfer_csv_files()
print(f" CLK STARTUP: {copied} file(s) transferred.")
except Exception as e:
print(f" CLK STARTUP TRANSFER ERROR: {e}")
summaries = []
for channel in ("clk", "dat"):
path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv"
if not path.exists():
continue
try:
m = analyze_lp_file(path)
summaries.append(m.summary())
if channel == "clk":
if m.clk_lp_startup_ok is False:
print(f"\n *** CLK STARTUP SUSPECT: capture {iteration:04d} "
f"CLK LP-00={m.lp_low_duration_ns:.0f} ns "
f"< {CLK_LP_LOW_MIN_NS:.0f} ns — bridge may not lock CLK ***\n")
elif m.clk_lp_startup_ok is True:
print(f" CLK startup: LP-00={m.lp_low_duration_ns:.0f} ns ✓")
else:
print(" CLK startup: CLK already in continuous HS at trigger point.")
except Exception as e:
print(f" CLK STARTUP ANALYSIS ERROR ({channel}): {e}")
return summaries
def _fetch_registers(ts: str, iteration: int) -> None:
"""GET /registers from device server and save to data/ as JSON."""
try:
@@ -603,7 +683,14 @@ def dual_capture(iteration: int) -> str:
if n:
print(f" SAVED: {v18_path.name} ({n} samples)")
else:
print(" RIGOL: Waveform read failed — check connection and probe.")
print(" RIGOL: 1V8 waveform read failed.")
# CH2 — INTB pin (read after CH1; scope already stopped)
int_path = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
n_int = rigol_scope.read_int_csv(int_path)
if n_int:
print(f" SAVED: {int_path.name} ({n_int} samples)")
else:
print(" RIGOL: INTB waveform read failed.")
_restore_hs_config()
# ── Pass 2: HS signal quality ──────────────────────────────────────────
@@ -662,10 +749,13 @@ def _build_system_prompt(config: dict | None = None) -> str:
def _build_claude_prompt(ts: str, iteration: int,
lp_summaries: list[str],
suspects: list[LPMetrics],
config: dict | None = None) -> str:
config: dict | None = None,
followup_summaries: list[str] | None = None) -> str:
"""
Build a concise prompt asking Claude to assess a single capture.
The rule-based pre-filter has already flagged at least one LP suspect.
If followup_summaries is provided it contains the next-frame LP capture taken
immediately after the suspect — the frame the operator will actually observe.
"""
suspect_lines = "\n".join(
f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns "
@@ -686,13 +776,26 @@ def _build_claude_prompt(ts: str, iteration: int,
f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns."
)
followup_text = ""
if followup_summaries:
followup_text = (
f"\n\nFOLLOW-UP CAPTURE (next display frame — what the operator sees "
f"on screen while assessing):\n"
f"Note: due to the display pipeline lag, the visual flicker caused by "
f"the electrical event above appears one frame later. If the follow-up "
f"frame looks electrically normal, the flicker observed by the operator "
f"was caused by the preceding capture, not this one.\n\n"
+ "\n\n".join(followup_summaries)
)
return (
f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n"
f"The rule-based LP pre-processor has flagged the following measurements as "
f"potential flicker suspects because the LP-low plateau is absent or shorter "
f"than 50 ns:\n{suspect_lines}\n\n"
f"Full LP capture summaries:\n{summaries_text}"
f"{config_text}\n\n"
f"{config_text}"
f"{followup_text}\n\n"
f"Based solely on these LP timing metrics, do you believe this capture "
f"represents a genuine screen flicker event — i.e., was the SoT sequence "
f"too brief for the SN65DSI83 bridge to detect start-of-transmission, "
@@ -718,21 +821,13 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
])
def analyze_lp_and_ask_claude(
ts: str, iteration: int, config: dict | None = None
) -> tuple[bool, str, list[LPMetrics]]:
def _analyze_lp_files(
ts: str, iteration: int
) -> tuple[list[str], list[LPMetrics]]:
"""
Analyse the LP files for this iteration.
1. Run csv_preprocessor on lp_clk and lp_dat.
2. If any file is flagged as a flicker suspect by the rule-based detector,
call the Claude API for a focused single-capture assessment.
3. Parse Claude's YES/NO response.
Returns:
claude_says_flicker — True if Claude opened with YES
reasoning — Claude's full response text (or "" if not called)
suspects — list of LPMetrics objects that were flagged
Run rule-based LP analysis for one iteration.
Returns (lp_summaries, suspects). Logs suspects and prints alerts.
Does NOT call Claude.
"""
lp_summaries: list[str] = []
suspects: list[LPMetrics] = []
@@ -748,11 +843,14 @@ def analyze_lp_and_ask_claude(
if m.flicker_suspect:
suspects.append(m)
_append_flicker_log(ts, iteration, m)
if (m.hs_amplitude_mv is not None
if not m.lp_transition_valid and not m.lp11_voltage_v:
reason = "MIPI link silent (no LP-11/LP-low/HS detected)"
elif (m.hs_amplitude_mv is not None
and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
and m.lp11_to_hs_ns is not None
and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS):
reason = f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)"
reason = (f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, "
f"lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)")
else:
reason = f"lp_low={m.lp_low_duration_ns} ns"
print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} "
@@ -760,36 +858,119 @@ def analyze_lp_and_ask_claude(
except Exception as e:
print(f" LP ANALYSIS ERROR ({channel}): {e}")
if not suspects:
return False, "", []
return lp_summaries, suspects
# ── Call Claude ────────────────────────────────────────────────────────
def _analyze_int_file(ts: str, iteration: int) -> str | None:
"""
Analyse the INTB pin CSV for this iteration.
Returns a summary string, or None if the file is missing.
"""
path = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
if not path.exists():
return None
try:
m = analyze_int_file(path)
if m.int_asserted:
print(f"\n *** INTB ASSERTED: capture {iteration:04d} "
f"bridge flagged error ({m.asserted_duration_us:.2f} µs) ***\n")
return m.summary()
except Exception as e:
print(f" INTB ANALYSIS ERROR: {e}")
return None
def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics]]:
"""
LP-only follow-up capture taken immediately after a suspect is detected.
Captures the next display frame — the one the operator will actually see
when asked to assess whether the screen is flickering.
Returns (ts_followup, lp_summaries, suspects).
Returns ("", [], []) silently if the scope is not reachable.
"""
print(" FOLLOW-UP CAPTURE: acquiring next frame for display-lag context...")
try:
ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S")
_configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS)
if rigol_scope.is_connected():
rigol_scope.arm()
if _arm_and_wait(timeout=10):
_save_pass_channels("lp", iteration, ts_fu)
else:
print(" FOLLOW-UP: trigger timeout — skipping.")
_restore_hs_config()
return "", [], []
_restore_hs_config()
# Transfer the new files from scope
try:
copied, _ = ai_mgmt.transfer_csv_files()
print(f" FOLLOW-UP: {copied} file(s) transferred.")
except Exception as e:
print(f" FOLLOW-UP TRANSFER ERROR: {e}")
summaries, suspects = _analyze_lp_files(ts_fu, iteration)
return ts_fu, summaries, suspects
except Exception as e:
print(f" FOLLOW-UP CAPTURE ERROR: {e}")
return "", [], []
def _call_claude(
ts: str, iteration: int,
lp_summaries: list[str],
suspects: list[LPMetrics],
config: dict | None = None,
followup_summaries: list[str] | None = None,
) -> tuple[bool, str]:
"""
Call the Claude API to assess whether the flagged capture is a flicker event.
Returns (claude_says_flicker, response_text).
"""
print(" CALLING CLAUDE FOR ASSESSMENT...")
try:
client = anthropic.Anthropic()
message = client.messages.create(
model = CLAUDE_MODEL,
max_tokens = 512,
max_tokens = 600,
system = _build_system_prompt(config),
messages = [{"role": "user", "content":
_build_claude_prompt(ts, iteration, lp_summaries,
suspects, config)}],
suspects, config,
followup_summaries)}],
)
response = message.content[0].text.strip()
# Parse YES/NO from the first line
first_line = response.splitlines()[0].strip().upper()
claude_says_flicker = first_line.startswith("YES")
label = "FLICKER" if claude_says_flicker else "NOT FLICKER"
print(f" CLAUDE: {label} ({message.usage.input_tokens} in / "
f"{message.usage.output_tokens} out tokens)")
return claude_says_flicker, response, suspects
return claude_says_flicker, response
except Exception as e:
print(f" CLAUDE API ERROR: {e}")
# Fall back to the rule-based result so the operator still gets asked
fallback = (f"(Claude API unavailable: {e})\n"
f"Rule-based detector flagged LP-low plateau < 50 ns — "
f"treat as potential flicker suspect.")
return True, fallback, suspects
return True, fallback
def analyze_lp_and_ask_claude(
ts: str, iteration: int, config: dict | None = None
) -> tuple[bool, str, list[LPMetrics]]:
"""
Analyse the LP files for this iteration (rule-based + Claude if suspect).
Kept for backwards compatibility — used by report generation and tests.
Does not perform a follow-up capture; call the test loop directly for that.
"""
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
if not suspects:
return False, "", []
claude_flicker, response = _call_claude(ts, iteration, lp_summaries,
suspects, config)
return claude_flicker, response, suspects
# ---------------------------------------------------------------------------
@@ -1118,12 +1299,22 @@ def run_interactive_test() -> None:
try:
while True:
# ── Arm scope for CLK lane startup (BEFORE display ON) ─────────
# CLK+ LP-11 falls before DAT0+, so we must arm here.
# The regular LP pass (triggered on DAT0+) would miss CLK startup
# because CLK is already in continuous HS by the time DAT0+ falls.
ts_startup = datetime.now().strftime("%Y%m%d_%H%M%S")
_arm_scope_for_clk_startup()
# ── Display ON ─────────────────────────────────────────────────
try:
requests.put(URL, json={"state": "on"}, timeout=2)
except requests.exceptions.RequestException as e:
print(f" WARNING: display ON failed: {e}")
# ── Collect CLK startup capture ────────────────────────────────
clk_startup_summaries = _collect_clk_startup(ts_startup, iteration)
# ── Three-pass capture ─────────────────────────────────────────
ts = dual_capture(iteration)
@@ -1135,9 +1326,28 @@ def run_interactive_test() -> None:
except Exception as e:
print(f" TRANSFER ERROR: {e}")
# ── Analyse LP + ask Claude if suspect ─────────────────────────
claude_flicker, reasoning, suspects = analyze_lp_and_ask_claude(
ts, iteration, config)
# ── INTB pin analysis ──────────────────────────────────────────
_analyze_int_file(ts, iteration)
# ── Rule-based LP analysis ─────────────────────────────────────
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
followup_summaries: list[str] = []
if suspects:
# Take an LP-only follow-up capture before calling Claude.
# The visual flicker caused by a missed SoT appears one display
# frame later (pipeline lag), so the operator observes flicker
# on the frame AFTER the electrical event. Including the next
# frame gives Claude — and the operator — the correct context.
_, followup_summaries, _ = _lp_followup_capture(iteration)
# ── Call Claude if any rule-based suspect was found ────────────
claude_flicker = False
reasoning = ""
if suspects:
claude_flicker, reasoning = _call_claude(
ts, iteration, lp_summaries, suspects, config,
followup_summaries or None)
if claude_flicker:
# ── Keep display ON — ask operator ─────────────────────────
@@ -1181,7 +1391,7 @@ def run_interactive_test() -> None:
else:
print(" NOT FLICKERING — false alarm logged. Continuing.\n")
elif suspects:
if suspects and not claude_flicker:
# Rule-based suspect but Claude said no — record for reference
_log_interaction(events, ts, iteration, suspects,
False, None, reasoning)