Files
MiPi_TEST/mipi_test_interactive.py

1470 lines
58 KiB
Python
Raw Normal View History

2026-04-20 10:35:06 +01:00
#!/usr/bin/env python3
2026-04-16 11:23:25 +01:00
"""
MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY
Interactive flicker confirmation test.
Same three-pass capture sequence as mipi_test.py. After each iteration:
1. LP files are transferred from the scope immediately and analysed.
2. If the rule-based LP pre-processor flags a flicker suspect, Claude is
asked to assess the single capture.
3. If Claude agrees it looks like a flicker event, the test pauses and
asks the operator to confirm by looking at the display.
- Confirmed: event is logged, HTML report is written, test STOPS.
- Not flickering (false alarm): event is logged, test CONTINUES.
4. A final HTML report is written when the test ends for any reason.
VERSION: 0.1
AUTHOR: D. RICE 16/04/2026
© 2026 ARRIVE
"""
import csv as _csv_mod
import html
import json
import subprocess
import time
import sys
import requests
from datetime import datetime
from pathlib import Path
import math
import anthropic
import vxi11
from dotenv import load_dotenv
import ai_mgmt
import rigol_scope
from csv_preprocessor import (analyze_lp_file, LPMetrics,
2026-04-20 10:42:51 +01:00
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS,
analyze_int_file, CLK_LP_LOW_MIN_NS)
2026-04-16 11:23:25 +01:00
# Load environment variables from the .env file sitting next to this script.
# NOTE(review): presumably this supplies the API key used by the anthropic client — confirm.
load_dotenv(Path(__file__).parent / ".env")
# ---------------------------------------------------------------------------
# Configuration (same as mipi_test.py)
# ---------------------------------------------------------------------------
# Network endpoints: device HTTP server plus the two VXI-11 instruments.
DEVICE_BASE = "http://192.168.45.8:5000"
URL = f"{DEVICE_BASE}/display"
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
# NOTE: dual_capture() in this file runs the LP capture first (pass 1) and the
# two HS captures afterwards (passes 2 and 3); the numbering below follows that order.
# Pass 2 in dual_capture() — signal quality (HS differential, rise/fall)
SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window
SIG_POINTS = 500_000
# Pass 3 in dual_capture() — protocol/frame structure (HS differential, jitter/freq)
PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window
PROTO_POINTS = 500_000
# Pass 1 in dual_capture() — LP state capture (single-ended, widens vertical range to show LP-11)
LP_SCALE = 500e-9 # 500 ns/div → 5 µs window
LP_POINTS = 200_000
LP_V_SCALE = 0.2 # V/div
LP_V_OFFSET = 0.6 # V — centres display between LP-00 (0 V) and LP-11 (1.2 V)
LP_TRIG_LEVEL = 0.6 # V — catches LP-11 → LP-01 falling edge
# Local directories, both resolved relative to this script's location.
DATA_DIR = Path(__file__).parent / "data"
REPORTS_DIR = Path(__file__).parent / "reports"
# Persistent logs (shared paths — consistent with mipi_test.py / analyze_captures.py)
FLICKER_LOG = REPORTS_DIR / "flicker_log.csv"
INTERACTIVE_LOG = REPORTS_DIR / "interactive_log.csv"
# Claude model for per-capture flicker assessment
CLAUDE_MODEL = "claude-opus-4-6"
# ---------------------------------------------------------------------------
# D-PHY timing calculation
# ---------------------------------------------------------------------------
# Ordered field list used for table formatting and u-boot output.
# The names are the keys of the "fields" dict built by calculate_dphy_timing().
_TIMING_FIELD_ORDER = [
    "lpx", "hs_prepare", "hs_zero", "hs_trail", "hs_exit",
    "clk_prepare", "clk_zero", "clk_post", "clk_trail",
]
# Maps field names → flb_dtovar property names (NXP i.MX 8M Mini Samsung DSIM driver).
# format_uboot_commands() emits one "setenv flb_dtovar" line per field with extra > 0.
_EXTRA_PROP_MAP = {
    "lpx": "dsi-phy-extra-lpx",
    "hs_prepare": "dsi-phy-extra-hs-prepare",
    "hs_zero": "dsi-phy-extra-hs-zero",
    "hs_trail": "dsi-phy-extra-hs-trail",
    "hs_exit": "dsi-phy-extra-hs-exit",
    "clk_prepare": "dsi-phy-extra-clk-prepare",
    "clk_zero": "dsi-phy-extra-clk-zero",
    "clk_post": "dsi-phy-extra-clk-post",
    "clk_trail": "dsi-phy-extra-clk-trail",
}
def calculate_dphy_timing(pixel_clock_mhz: float,
extra_cycles: dict | None = None) -> dict:
"""
Calculate Samsung DSIM PHY timing register values for a given pixel clock.
Assumes RGB888 (24 bpp), 4 DSI lanes NXP i.MX 8M Mini / Ampire 1280×800.
Timing constraints from MIPI D-PHY v1.1 Table 14.
extra_cycles: optional dict mapping field names to additional cycles above the
Round-Up minimum, e.g. {'clk_zero': 3, 'hs_prepare': 1, 'hs_trail': 1}.
Samsung DSIM register packing:
PHY_TIMING (0xb4): (lpx << 8) | hs_exit
PHY_TIMING1 (0xb8): (clk_prepare << 24) | (clk_zero << 16) | (clk_post << 8) | clk_trail
PHY_TIMING2 (0xbc): (hs_prepare << 16) | (hs_zero << 8) | hs_trail
Returns a dict with: pixel_clock_mhz, bit_rate_mbps, byte_clock_mhz,
byte_period_ns, ui_ns, fields, registers, violations.
"""
bpp = 24
lanes = 4
extras = extra_cycles or {}
bit_rate_mbps = pixel_clock_mhz * bpp / lanes # Mbit/s per lane
byte_clock_mhz = bit_rate_mbps / 8 # MHz
byte_period_ns = 1000.0 / byte_clock_mhz # ns per byte-clock cycle
ui_ns = 1000.0 / bit_rate_mbps # ns per UI
def _ru(t_ns: float) -> int:
return max(1, math.ceil(t_ns / byte_period_ns))
def _rb(t_ns: float) -> int:
return max(1, round(t_ns / byte_period_ns))
def _field(name: str, min_ns: float, max_ns: float | None = None) -> dict:
extra = extras.get(name, 0)
ru = _ru(min_ns)
rb = _rb(min_ns)
final = ru + extra
return {
"min_ns": min_ns,
"max_ns": max_ns,
"round_best": rb,
"round_up": ru,
"extra": extra,
"final": final,
"actual_ns": final * byte_period_ns,
}
fields: dict = {}
# LPX ≥ 50 ns
fields["lpx"] = _field("lpx", 50.0)
# HS-PREPARE: 40+4UI to 85+6UI ns
hs_p_min = 40.0 + 4 * ui_ns
hs_p_max = 85.0 + 6 * ui_ns
fields["hs_prepare"] = _field("hs_prepare", hs_p_min, hs_p_max)
hs_p_actual = fields["hs_prepare"]["actual_ns"]
# HS-ZERO: combined hs_prepare + hs_zero ≥ 145+10UI ns
hs_z_combined_min = 145.0 + 10 * ui_ns
hs_z_min = max(1.0, hs_z_combined_min - hs_p_actual)
fields["hs_zero"] = _field("hs_zero", hs_z_min)
fields["hs_zero"]["combined_min_ns"] = hs_z_combined_min
# HS-TRAIL ≥ max(8UI, 60+4UI) ns
fields["hs_trail"] = _field("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
# HS-EXIT ≥ 100 ns
fields["hs_exit"] = _field("hs_exit", 100.0)
# CLK-PREPARE: 38 to 95 ns
fields["clk_prepare"] = _field("clk_prepare", 38.0, 95.0)
clk_p_actual = fields["clk_prepare"]["actual_ns"]
# CLK-ZERO: combined clk_prepare + clk_zero ≥ 300 ns
clk_z_combined_min = 300.0
clk_z_min = max(1.0, clk_z_combined_min - clk_p_actual)
fields["clk_zero"] = _field("clk_zero", clk_z_min)
fields["clk_zero"]["combined_min_ns"] = clk_z_combined_min
# CLK-POST ≥ 60+52UI ns
fields["clk_post"] = _field("clk_post", 60.0 + 52 * ui_ns)
# CLK-TRAIL ≥ max(12UI, 60) ns
fields["clk_trail"] = _field("clk_trail", max(12 * ui_ns, 60.0))
# ── Register packing ──────────────────────────────────────────────────
def _f(name: str) -> int:
return fields[name]["final"]
phy_timing = (_f("lpx") << 8) | _f("hs_exit")
phy_timing1 = ((_f("clk_prepare") << 24) | (_f("clk_zero") << 16) |
(_f("clk_post") << 8) | _f("clk_trail"))
phy_timing2 = ((_f("hs_prepare") << 16) | (_f("hs_zero") << 8) | _f("hs_trail"))
registers = {
"PHY_TIMING": {"addr": 0xb4, "value": phy_timing},
"PHY_TIMING1": {"addr": 0xb8, "value": phy_timing1},
"PHY_TIMING2": {"addr": 0xbc, "value": phy_timing2},
}
# ── Constraint checks ─────────────────────────────────────────────────
violations: list[str] = []
def _chk(name: str, lo: float, hi: float | None = None) -> None:
a = fields[name]["actual_ns"]
if a < lo:
violations.append(f"{name}: {a:.2f} ns < {lo:.2f} ns (min)")
if hi is not None and a > hi:
violations.append(f"{name}: {a:.2f} ns > {hi:.2f} ns (max)")
_chk("lpx", 50.0)
_chk("hs_prepare", hs_p_min, hs_p_max)
hs_combined = fields["hs_prepare"]["actual_ns"] + fields["hs_zero"]["actual_ns"]
if hs_combined < hs_z_combined_min:
violations.append(
f"hs_prepare+hs_zero: {hs_combined:.2f} ns < {hs_z_combined_min:.2f} ns (min)")
_chk("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
_chk("hs_exit", 100.0)
_chk("clk_prepare", 38.0, 95.0)
clk_combined = fields["clk_prepare"]["actual_ns"] + fields["clk_zero"]["actual_ns"]
if clk_combined < clk_z_combined_min:
violations.append(
f"clk_prepare+clk_zero: {clk_combined:.2f} ns < {clk_z_combined_min:.2f} ns (min)")
_chk("clk_post", 60.0 + 52 * ui_ns)
_chk("clk_trail", max(12 * ui_ns, 60.0))
return {
"pixel_clock_mhz": pixel_clock_mhz,
"bit_rate_mbps": bit_rate_mbps,
"byte_clock_mhz": byte_clock_mhz,
"byte_period_ns": byte_period_ns,
"ui_ns": ui_ns,
"fields": fields,
"registers": registers,
"violations": violations,
}
def format_timing_table(t: dict) -> str:
    """
    Return a fixed-width console table of all D-PHY timing fields.

    Args:
        t: result dict from calculate_dphy_timing(); only t["fields"] is read.

    Returns:
        The table as one newline-joined string: one row per entry of
        _TIMING_FIELD_ORDER, framed by dashed separator lines.

    A row is marked FAIL when its actual time is below the minimum, above the
    maximum (if the field has one), or - for hs_zero / clk_zero - when the sum
    with the matching *_prepare field is below the stored combined minimum.
    """
    fields = t["fields"]
    header = (f"{'Field':<14} {'Spec (ns)':<18} {'Rnd Best':>9} "
              f"{'Rnd Up':>7} {'Extra':>6} {'Final':>6} {'Actual (ns)':>12} Status")
    rule = "-" * len(header)
    # Fields whose limit is shared with a partner field.
    combined_with = {"hs_zero": "hs_prepare", "clk_zero": "clk_prepare"}

    rows = [rule, header, rule]
    for name in _TIMING_FIELD_ORDER:
        f = fields[name]
        min_ns = f["min_ns"]
        max_ns = f.get("max_ns")
        actual = f["actual_ns"]

        if max_ns is not None:
            # Show the range with an explicit separator (the HTML report uses
            # an en dash too); two bare numbers were ambiguous.
            spec_str = f"{min_ns:.1f} – {max_ns:.1f}"
            ok = min_ns <= actual <= max_ns
        else:
            spec_str = f">= {min_ns:.1f}"
            ok = actual >= min_ns

        partner = combined_with.get(name)
        if partner is not None:
            combined = fields[partner]["actual_ns"] + actual
            ok = ok and (combined >= f.get("combined_min_ns", 0))

        status = "OK" if ok else "FAIL"
        rows.append(
            f"{name:<14} {spec_str:<18} {f['round_best']:>9} "
            f"{f['round_up']:>7} +{f['extra']:>5} {f['final']:>6} {actual:>12.2f} {status}"
        )
    rows.append(rule)
    return "\n".join(rows)
def format_uboot_commands(t: dict) -> str:
    """
    Return u-boot shell commands to apply the computed PHY timing configuration.

    Args:
        t: result dict from calculate_dphy_timing().

    Returns:
        A newline-joined script: a comment header showing the three packed
        register values, the dsi-tweak setting, one setenv line per field with
        a positive extra-cycle count, then "saveenv" and "boot".
    """
    fields = t["fields"]
    registers = t["registers"]
    final = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER}

    # (comment label, register key, fields shown next to that register)
    layout = (
        ("PHY_TIMING (0xb4)", "PHY_TIMING", ("lpx", "hs_exit")),
        ("PHY_TIMING1 (0xb8)", "PHY_TIMING1",
         ("clk_prepare", "clk_zero", "clk_post", "clk_trail")),
        ("PHY_TIMING2 (0xbc)", "PHY_TIMING2",
         ("hs_prepare", "hs_zero", "hs_trail")),
    )

    script = [
        f"# D-PHY PHY timing registers "
        f"(pixel clock {t['pixel_clock_mhz']} MHz, "
        f"{t['bit_rate_mbps']:.1f} Mbit/s, "
        f"byte clock {t['byte_clock_mhz']:.3f} MHz)",
        "#",
    ]
    for label, key, names in layout:
        shown = " ".join(f"{field}={final[field]}" for field in names)
        script.append(f"# {label} = 0x{registers[key]['value']:08x} {shown}")

    script.append("")
    script.append("# Enable Round-Up rounding (dsi-tweak bit 2)")
    script.append('setenv flb_dtovar "${flb_dtovar} dsi-tweak=4"')

    # Only fields with a positive extra count get their own property.
    extra_cmds = []
    for name in _TIMING_FIELD_ORDER:
        extra = fields[name]["extra"]
        if extra > 0:
            extra_cmds.append(
                f'setenv flb_dtovar "${{flb_dtovar}} {_EXTRA_PROP_MAP[name]}={extra}"')
    if extra_cmds:
        script.append("")
        script.append("# Extra PHY cycles above Round-Up minimum")
        script.extend(extra_cmds)

    script.extend(["", "saveenv", "boot"])
    return "\n".join(script)
def prompt_for_config() -> dict:
    """
    Ask the operator for the pixel clock and optional extra PHY cycles.

    Prints the baseline timing table, then (if any extras were entered) the
    final table, and finally the u-boot commands.

    Returns:
        {"pixel_clock_mhz": float, "extras": {field: int}, "timing": dict}
        where "timing" is the calculate_dphy_timing() result including extras.
    """
    banner = "=" * 64
    print("\n" + banner)
    print(" D-PHY TIMING CONFIGURATION")
    print(banner)
    print("RGB888, 4 DSI lanes, Samsung DSIM on NXP i.MX 8M Mini.")
    print("Timing constraints: MIPI D-PHY v1.1 Table 14.\n")

    # ── Pixel clock: repeat until a number within 10..300 MHz is entered ──
    while True:
        try:
            pixel_clock_mhz = float(
                input("Enter pixel clock in MHz (e.g. 72.0): ").strip())
        except ValueError:
            print(" Invalid — enter a number (e.g. 72.0).")
            continue
        if 10.0 <= pixel_clock_mhz <= 300.0:
            break
        print(" Out of range — enter a value between 10 and 300.")

    # ── Baseline (no extras) ──────────────────────────────────────────────
    baseline = calculate_dphy_timing(pixel_clock_mhz)
    print(f"\nPixel clock : {pixel_clock_mhz} MHz")
    print(f"Bit rate : {baseline['bit_rate_mbps']:.1f} Mbit/s per lane")
    print(f"Byte clock : {baseline['byte_clock_mhz']:.3f} MHz "
          f"({baseline['byte_period_ns']:.3f} ns/byte)")
    print(f"UI : {baseline['ui_ns']:.3f} ns\n")
    print(format_timing_table(baseline))
    if not baseline["violations"]:
        print("\n All D-PHY v1.1 Table 14 constraints satisfied at baseline.")
    else:
        print("\n *** BASELINE VIOLATIONS ***")
        for problem in baseline["violations"]:
            print(f" ! {problem}")

    # ── Extra cycles: one non-negative integer per field, blank means 0 ───
    print("\n--- Extra PHY cycles (added on top of Round-Up minimum) ---")
    print("Suggested values for SN65DSI83 reliability (from dev testing):")
    print(" clk_zero +3, hs_prepare +1, hs_trail +1 (others 0)")
    print("Press Enter to accept 0 for each field.\n")
    extras: dict[str, int] = {}
    for name in _TIMING_FIELD_ORDER:
        while True:
            try:
                answer = input(f" Extra cycles for {name:<12s} [0]: ").strip()
                cycles = int(answer) if answer else 0
            except ValueError:
                print(" Enter an integer (or press Enter for 0).")
                continue
            if cycles < 0:
                print(" Cannot be negative.")
                continue
            extras[name] = cycles
            break

    # ── Final calculation with extras ─────────────────────────────────────
    final = calculate_dphy_timing(pixel_clock_mhz, extras)
    extras_entered = any(count > 0 for count in extras.values())
    if extras_entered:
        print("\n--- Final timing (with extras) ---")
        print(format_timing_table(final))
        if not final["violations"]:
            print("\n All constraints satisfied.")
        else:
            print("\n *** VIOLATIONS WITH EXTRAS ***")
            for problem in final["violations"]:
                print(f" ! {problem}")

    print("\n--- u-boot commands ---")
    print(format_uboot_commands(final))
    print()
    return {
        "pixel_clock_mhz": pixel_clock_mhz,
        "extras": extras,
        "timing": final,
    }
# ---------------------------------------------------------------------------
# Instrument connection
# ---------------------------------------------------------------------------
# Open the VXI-11 sessions to the PSU and the main scope at import time;
# the script cannot do anything useful without them, so exit on failure.
try:
    psu = vxi11.Instrument(PSU_IP)
    psu.timeout = 5
    scope = vxi11.Instrument(SCOPE_IP)
    scope.timeout = 30
except Exception as exc:
    print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {exc}")
    sys.exit(1)
# The Rigol scope is handled by its own module; later code checks
# rigol_scope.is_connected() before using it.
rigol_scope.connect()
# ---------------------------------------------------------------------------
# Scope configuration (identical to mipi_test.py)
# ---------------------------------------------------------------------------
def setup_scope():
    """
    Reset the scope and configure it for the MIPI DSI captures (~210 MHz).

    Sets up the four input channels (CLK+/CLK-/DAT0+/DAT0-), the timebase,
    a rising-edge trigger on channel 1 and the acquisition mode, then hands
    over to setup_math_channels().
    """
    print("CONFIGURING SCOPE...")

    channel_labels = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

    sequence = ["*RST", ":RUN", ":STOP"]
    # All four channels share the same input settings; only the label differs.
    for number, label in channel_labels.items():
        prefix = f":CHANnel{number}"
        sequence.extend([
            f"{prefix}:DISPlay ON",
            f"{prefix}:INPut DC50",
            f"{prefix}:PROBe 19.2",
            f"{prefix}:SCALe 0.1",
            f"{prefix}:OFFSet 0.0",
            f"{prefix}:LABel '{label}'",
        ])
    sequence.extend([
        # Timebase
        ":TIMebase:SCALe 5E-9",
        ":TIMebase:POSition 0",
        ":TIMebase:REFerence CENTer",
        # Trigger: rising edge on channel 1 (Clock D+)
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe POSitive",
        ":TRIGger:EDGE:LEVel 0.05",
        ":TRIGger:SWEep NORMal",
        # Acquisition
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
        ":ACQuire:POINts 500000",
        ":DISPlay:LAYout STACKED",
        ":RUN",
    ])

    for command in sequence:
        scope.write(command)
        time.sleep(0.05)  # short pause after every command

    print("CHANNEL SETUP COMPLETE.")
    setup_math_channels()
def setup_math_channels():
    """
    Configure the two math functions used for the differential captures.

    F1 = Ch1 - Ch2 (clock differential), F2 = Ch3 - Ch4 (lane 0 differential).
    Afterwards the scope is synchronised with *OPC? and its error queue is read;
    failures of either query are reported but do not raise.
    """
    print("SETTING UP MATH CHANNELS...")
    scope.write("*CLS")
    time.sleep(0.2)

    operands = {1: "CHANnel1,CHANnel2", 2: "CHANnel3,CHANnel4"}
    for number, pair in operands.items():
        prefix = f":FUNCtion{number}"
        for command in (
            f"{prefix}:DISPlay ON",
            f"{prefix}:SUBTract {pair}",
            f"{prefix}:RANGe 0.8",
            f"{prefix}:OFFSet 0.0",
        ):
            scope.write(command)
            time.sleep(0.2)

    # Wait for the scope to finish processing before checking for errors.
    try:
        time.sleep(1.0)
        opc_reply = scope.ask("*OPC?")
        print(f" SCOPE SYNC OK (OPC={opc_reply.strip()})")
    except Exception as exc:
        print(f" WARNING: OPC SYNC FAILED ({exc})")

    try:
        error_reply = scope.ask(":SYSTem:ERRor?")
        # A reply starting with "0" is treated as "no error".
        if not error_reply.strip().startswith("0"):
            print(f" SCPI ERROR: {error_reply.strip()}")
        else:
            print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
            print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
    except Exception as exc:
        print(f" COULD NOT READ ERROR QUEUE ({exc})")
def _set_timebase(scale, points):
    """Set the horizontal scale (s/div) and the acquisition point count, then pause 0.3 s."""
    for command in (f":TIMebase:SCALe {scale:.3E}", f":ACQuire:POINts {points}"):
        scope.write(command)
    time.sleep(0.3)
def _arm_and_wait(timeout=20):
    """
    Run one :DIGitize acquisition and wait for it to complete.

    The scope's I/O timeout is raised to timeout + 5 for the duration of the
    call and always restored afterwards.

    Returns:
        True if *OPC? answered "1"; False if it answered anything else or any
        error occurred (the error is printed).
    """
    saved_timeout = scope.timeout
    try:
        scope.timeout = timeout + 5
        scope.write(":DIGitize")
        reply = scope.ask("*OPC?")
        return reply.strip() == "1"
    except Exception as exc:
        print(f" ACQUIRE ERROR: {exc}")
        return False
    finally:
        scope.timeout = saved_timeout
def _save_pass(tag, iteration, ts):
    """
    Ask the scope to save F1 (CLK diff) and F2 (DAT diff) as CSV files.

    Files are written on the scope side under C:\\TEMP as
    <ts>_<tag>_<iteration>_clk.csv and ..._dat.csv. Errors are printed, not raised.
    """
    base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
    sources = (("FUNCtion1", "clk"), ("FUNCtion2", "dat"))
    try:
        for source, suffix in sources:
            scope.write(f':DISK:SAVE:WAVeform {source},"{base}_{suffix}.csv",CSV')
            time.sleep(2.5)  # fixed pause after each save command
        print(f" SAVED: {base}_clk.csv {base}_dat.csv")
    except Exception as exc:
        print(f" SAVE ERROR ({tag}): {exc}")
def _save_pass_channels(tag, iteration, ts):
    """
    Ask the scope to save Ch1 (CLK+) and Ch3 (DAT0+) single-ended as CSV files.

    Used for LP state analysis. Files are written on the scope side under
    C:\\TEMP as <ts>_<tag>_<iteration>_clk.csv and ..._dat.csv.
    Errors are printed, not raised.
    """
    base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
    sources = (("CHANnel1", "clk"), ("CHANnel3", "dat"))
    try:
        for source, suffix in sources:
            scope.write(f':DISK:SAVE:WAVeform {source},"{base}_{suffix}.csv",CSV')
            time.sleep(2.5)  # fixed pause after each save command
        print(f" SAVED: {base}_clk.csv {base}_dat.csv")
    except Exception as exc:
        print(f" SAVE ERROR ({tag}): {exc}")
def _configure_for_lp():
    """Widen the vertical range for LP states and trigger on a falling edge of channel 3."""
    for number in range(1, 5):
        prefix = f":CHANnel{number}"
        scope.write(f"{prefix}:SCALe {LP_V_SCALE:.3f}")
        scope.write(f"{prefix}:OFFSet {LP_V_OFFSET:.3f}")
        time.sleep(0.05)
    for command in (
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}",
    ):
        scope.write(command)
    time.sleep(0.1)
def _restore_hs_config():
    """Put channel scales, offsets and the trigger back to the HS-mode settings."""
    for number in range(1, 5):
        prefix = f":CHANnel{number}"
        scope.write(f"{prefix}:SCALe 0.1")
        scope.write(f"{prefix}:OFFSet 0.0")
        time.sleep(0.05)
    for command in (
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe POSitive",
        ":TRIGger:EDGE:LEVel 0.05",
    ):
        scope.write(command)
    time.sleep(0.1)
def _fetch_registers(ts: str, iteration: int) -> None:
    """
    GET /registers from the device server and store the reply as JSON in data/.

    The file is named <ts>_reg_<iteration>.json. A non-empty "errors" entry in
    the reply is printed as a warning. Any failure is printed, never raised.
    """
    endpoint = f"{DEVICE_BASE}/registers"
    try:
        response = requests.get(endpoint, timeout=5)
        response.raise_for_status()
        payload = response.json()

        device_errors = payload.get("errors")
        if device_errors:
            print(f" REGISTERS: device warnings — {device_errors}")

        DATA_DIR.mkdir(exist_ok=True)
        target = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
        target.write_text(json.dumps(payload, indent=2))
        register_count = len(payload.get('registers', []))
        print(f" SAVED: {target.name} ({register_count} registers)")
    except requests.exceptions.RequestException as exc:
        print(f" REGISTERS: fetch failed — {exc}")
    except Exception as exc:
        print(f" REGISTERS: error — {exc}")
def _arm_scope_for_clk_startup() -> None:
    """
    Configure the scope for the CLK lane LP startup and arm it with :SINGle.

    The call does not wait for the trigger. The trigger is a falling edge on
    CLK+ (Ch1) at LP_TRIG_LEVEL, intended to fire as CLK leaves LP-11.
    Call this BEFORE display ON so the scope is already armed when the
    LP-11 -> HS sequence starts.
    """
    # LP vertical settings on all four channels.
    for number in range(1, 5):
        prefix = f":CHANnel{number}"
        scope.write(f"{prefix}:SCALe {LP_V_SCALE:.3f}")
        scope.write(f"{prefix}:OFFSet {LP_V_OFFSET:.3f}")
        time.sleep(0.05)

    for command in (
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}",
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {LP_SCALE:.3E}",
        f":ACQuire:POINts {LP_POINTS}",
    ):
        scope.write(command)
    time.sleep(0.3)

    scope.write(":SINGle")
    time.sleep(0.1)
    print(" CLK STARTUP: scope armed on CLK+ falling edge.")
def _collect_clk_startup(ts: str, iteration: int, timeout: float = 10.0) -> list[str]:
    """
    Poll for the CLK startup trigger, then save, transfer and analyse the capture.

    Args:
        ts: timestamp string used in the capture filenames.
        iteration: capture number used in the capture filenames.
        timeout: seconds to keep polling the trigger status.

    Returns:
        LP summary strings, one per analysed file (empty list if the trigger
        timed out or no file could be analysed).

    A warning is printed when the analysis reports clk_lp_startup_ok is False,
    i.e. the CLK LP-00 duration is below CLK_LP_LOW_MIN_NS.
    """
    print(" CLK STARTUP: waiting for trigger...")
    # Monotonic clock: the timeout must not be affected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    triggered = False
    last_poll_error = None
    while time.monotonic() < deadline:
        try:
            status = scope.ask(":TRIGger:STATus?").strip().upper()
        except Exception as exc:
            # Keep polling (a single failed query is tolerated) but remember
            # the error so a timeout is not misreported as "no trigger".
            last_poll_error = exc
        else:
            if status in ("STOP", "TD"):
                triggered = True
                break
        time.sleep(0.1)

    if not triggered:
        if last_poll_error is not None:
            print(f" CLK STARTUP: trigger status query failed — {last_poll_error}")
        print(" CLK STARTUP: trigger timeout — CLK may already be in continuous HS.")
        _restore_hs_config()
        return []

    _save_pass_channels("lp", iteration, ts)
    _restore_hs_config()
    try:
        copied, _ = ai_mgmt.transfer_csv_files()
        print(f" CLK STARTUP: {copied} file(s) transferred.")
    except Exception as e:
        print(f" CLK STARTUP TRANSFER ERROR: {e}")

    summaries = []
    for channel in ("clk", "dat"):
        path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv"
        if not path.exists():
            continue
        try:
            m = analyze_lp_file(path)
            summaries.append(m.summary())
            if m.clk_lp_startup_ok is False:
                # Guard the formatting so a missing duration cannot suppress the warning.
                duration = m.lp_low_duration_ns
                shown = f"{duration:.0f} ns" if duration is not None else "n/a"
                print(f"\n *** CLK STARTUP WARNING: CLK LP-00 too short "
                      f"({shown} < {CLK_LP_LOW_MIN_NS:.0f} ns) — "
                      f"SN65DSI83 may fail to lock CLK lane ***\n")
        except Exception as e:
            print(f" CLK STARTUP ANALYSIS ERROR ({channel}): {e}")
    return summaries
def _analyze_int_file(ts: str, iteration: int) -> None:
    """
    Print the IRQ pin summary for one capture and alert if the IRQ was asserted.

    Does nothing when data/<ts>_int_<iteration>.csv does not exist.
    Analysis errors are printed, not raised.
    """
    capture = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
    if not capture.exists():
        return
    try:
        metrics = analyze_int_file(capture)
        print(metrics.summary())
        if metrics.int_asserted:
            print(f"\n *** IRQ ASSERTED: SN65DSI83 flagged a bridge error at "
                  f"capture {iteration:04d} — check CSR 0xE5 for error bits ***\n")
    except Exception as exc:
        print(f" INT ANALYSIS ERROR: {exc}")
def dual_capture(iteration: int) -> str:
    """
    Run the three-pass capture for one test iteration.

    Pass 1: LP / SoT startup (single-ended; the Rigol scope is armed too if connected)
    Pass 2: signal quality (HS differential)
    Pass 3: frame structure (HS differential)

    Afterwards the DSI register snapshot is fetched, the default timebase is
    restored and the scope is set running again.

    Returns:
        The timestamp string used in all filenames for this iteration.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f"CAPTURE #{iteration:04d} [{ts}]")

    # ── Pass 1: LP / SoT startup ───────────────────────────────────────────
    print(" PASS 1: LP STARTUP TRANSITION...")
    _configure_for_lp()
    _set_timebase(LP_SCALE, LP_POINTS)
    if rigol_scope.is_connected():
        rigol_scope.arm()

    if not _arm_and_wait(timeout=30):
        print(" SKIPPING LP SAVE.")
    else:
        _save_pass_channels("lp", iteration, ts)

    # Read back both Rigol channels (1V8 rail and IRQ line) if that scope is present.
    if rigol_scope.is_connected():
        DATA_DIR.mkdir(exist_ok=True)

        rail_file = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
        rail_samples = rigol_scope.read_waveform_csv(rail_file)
        if not rail_samples:
            print(" RIGOL CH1: waveform read failed — check connection and probe.")
        else:
            print(f" SAVED: {rail_file.name} ({rail_samples} samples)")

        irq_file = DATA_DIR / f"{ts}_int_{iteration:04d}.csv"
        irq_samples = rigol_scope.read_int_csv(irq_file)
        if not irq_samples:
            print(" RIGOL CH2: IRQ read failed.")
        else:
            print(f" SAVED: {irq_file.name} ({irq_samples} samples)")

    _restore_hs_config()

    # ── Passes 2 and 3: HS differential captures ──────────────────────────
    hs_passes = (
        (" PASS 2: SIGNAL QUALITY...", "sig", SIG_SCALE, SIG_POINTS,
         " SKIPPING SIG SAVE."),
        (" PASS 3: FRAME STRUCTURE...", "proto", PROTO_SCALE, PROTO_POINTS,
         " SKIPPING PROTO SAVE."),
    )
    for banner, tag, scale, points, skip_message in hs_passes:
        print(banner)
        _set_timebase(scale, points)
        if _arm_and_wait():
            _save_pass(tag, iteration, ts)
        else:
            print(skip_message)

    # ── DSI register snapshot ─────────────────────────────────────────────
    _fetch_registers(ts, iteration)

    # ── Restore default timebase ──────────────────────────────────────────
    _set_timebase(5e-9, 500_000)
    scope.write(":RUN")
    return ts
# ---------------------------------------------------------------------------
# Per-iteration LP analysis + Claude assessment
# ---------------------------------------------------------------------------
def _build_system_prompt(config: dict | None = None) -> str:
"""Build the Claude system prompt with correct bit-rate values for the active config."""
if config and "timing" in config:
t = config["timing"]
bit_rate = t["bit_rate_mbps"]
byte_clk = t["byte_clock_mhz"]
byte_per = t["byte_period_ns"]
else:
bit_rate = 432.0
byte_clk = 54.0
byte_per = 18.518
return (
"You are an expert in MIPI D-PHY signal integrity analysis. "
"You will receive a pre-processed summary of a single LP capture from a MIPI DAT0 lane "
"(NXP i.MX 8M Mini Samsung DSIM IP driving a SN65DSI83 MIPI-to-LVDS bridge). "
f"HS bit rate: {bit_rate:.1f} Mbit/s. "
f"Byte clock: {byte_clk:.3f} MHz ({byte_per:.3f} ns/byte). "
"The LP-low plateau (LP-01/LP-00 SoT preamble) must be ≥ 50 ns for the SN65DSI83 "
"to detect start-of-transmission. A plateau shorter than 50 ns or absent means the "
"bridge missed the SoT and the display will flicker visibly. "
"Be decisive. Answer YES or NO on the first line."
)
def _build_claude_prompt(ts: str, iteration: int,
                         lp_summaries: list[str],
                         suspects: list[LPMetrics],
                         config: dict | None = None,
                         followup_summaries: list[str] | None = None) -> str:
    """
    Build a concise prompt asking Claude to assess a single capture.

    The rule-based pre-filter has already flagged at least one LP suspect.

    Args:
        ts: capture timestamp string shown in the prompt header.
        iteration: capture number shown in the prompt header.
        lp_summaries: full LP summary texts for this capture.
        suspects: flagged measurements; channel, lp_low_duration_ns,
            lp11_to_hs_ns, lp11_voltage_v and hs_amplitude_mv are read from each.
        config: optional config dict; if it has a "timing" entry, the test
            configuration is appended to the prompt.
        followup_summaries: optional LP summaries of the next-frame capture
            taken immediately after the suspect, i.e. the frame the operator
            will actually observe.

    Returns:
        The user-message text for the Claude request.
    """
    # Ranges are written with an explicit en dash: "105–122 mV" (not "105122 mV").
    suspect_lines = "\n".join(
        f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns "
        f"(spec ≥ 50 ns) lp11_to_hs={m.lp11_to_hs_ns} ns "
        f"lp11_voltage={m.lp11_voltage_v} V "
        f"hs_amplitude={m.hs_amplitude_mv} mV (normal 105–122 mV; absent <50 mV)"
        for m in suspects
    )
    summaries_text = "\n\n".join(lp_summaries)

    config_text = ""
    if config and "timing" in config:
        t = config["timing"]
        config_text = (
            f"\n\nTest configuration: pixel clock {t['pixel_clock_mhz']} MHz, "
            f"bit rate {t['bit_rate_mbps']:.1f} Mbit/s per lane, "
            f"byte clock {t['byte_clock_mhz']:.3f} MHz "
            f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns."
        )

    followup_text = ""
    if followup_summaries:
        followup_text = (
            "\n\nFOLLOW-UP CAPTURE (next display frame — what the operator sees "
            "on screen while assessing):\n"
            "Note: due to the display pipeline lag, the visual flicker caused by "
            "the electrical event above appears one frame later. If the follow-up "
            "frame looks electrically normal, the flicker observed by the operator "
            "was caused by the preceding capture, not this one.\n\n"
            + "\n\n".join(followup_summaries)
        )

    return (
        f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n"
        f"The rule-based LP pre-processor has flagged the following measurements as "
        f"potential flicker suspects because the LP-low plateau is absent or shorter "
        f"than 50 ns:\n{suspect_lines}\n\n"
        f"Full LP capture summaries:\n{summaries_text}"
        f"{config_text}"
        f"{followup_text}\n\n"
        f"Based solely on these LP timing metrics, do you believe this capture "
        f"represents a genuine screen flicker event — i.e., was the SoT sequence "
        f"too brief for the SN65DSI83 bridge to detect start-of-transmission, "
        f"likely causing visible display flicker?\n\n"
        # The request is for a short answer: two to four sentences, not "24".
        f"Start your response with YES or NO on the first line, then explain your "
        f"reasoning briefly (2–4 sentences) referencing the specific metric values."
    )
def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
    """
    Append one flicker suspect as a row to the shared flicker_log.csv.

    The header row is written first when the log file does not exist yet.
    """
    FLICKER_LOG.parent.mkdir(exist_ok=True)
    needs_header = not FLICKER_LOG.exists()
    with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as handle:
        rows = []
        if needs_header:
            rows.append(["logged_at", "capture_ts", "capture_num", "channel",
                         "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"])
        logged_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        rows.append([
            logged_at,
            ts, f"{iteration:04d}", m.channel,
            m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
        ])
        _csv_mod.writer(handle).writerows(rows)
def _analyze_lp_files(
    ts: str, iteration: int
) -> tuple[list[str], list[LPMetrics]]:
    """
    Run the rule-based LP analysis on the clk and dat files of one iteration.

    Every suspect is appended to the flicker log and announced on the console.
    Missing files are skipped with a message; analysis errors are printed.
    Does NOT call Claude.

    Returns:
        (lp_summaries, suspects)
    """
    def _describe(m) -> str:
        """Pick the console explanation for a flagged measurement."""
        if not m.lp_transition_valid and not m.lp11_voltage_v:
            return "MIPI link silent (no LP-11/LP-low/HS detected)"
        burst_absent = (
            m.hs_amplitude_mv is not None
            and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
            and m.lp11_to_hs_ns is not None
            and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS
        )
        if burst_absent:
            return (f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, "
                    f"lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)")
        return f"lp_low={m.lp_low_duration_ns} ns"

    summaries: list[str] = []
    flagged: list[LPMetrics] = []
    for channel in ("clk", "dat"):
        capture = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv"
        if not capture.exists():
            print(f" LP ANALYSIS: {capture.name} not found — skipping.")
            continue
        try:
            metrics = analyze_lp_file(capture)
            summaries.append(metrics.summary())
            if not metrics.flicker_suspect:
                continue
            flagged.append(metrics)
            _append_flicker_log(ts, iteration, metrics)
            reason = _describe(metrics)
            print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} "
                  f"channel={metrics.channel} {reason} ***\n")
        except Exception as exc:
            print(f" LP ANALYSIS ERROR ({channel}): {exc}")
    return summaries, flagged
def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics]]:
    """
    Take an LP-only follow-up capture right after a suspect was detected.

    This records the next display frame, the one the operator will actually
    see when asked whether the screen is flickering.

    Returns:
        (ts_followup, lp_summaries, suspects), or ("", [], []) when the
        trigger timed out or any error occurred (a message is printed).
    """
    print(" FOLLOW-UP CAPTURE: acquiring next frame for display-lag context...")
    try:
        followup_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        _configure_for_lp()
        _set_timebase(LP_SCALE, LP_POINTS)
        if rigol_scope.is_connected():
            rigol_scope.arm()

        acquired = _arm_and_wait(timeout=10)
        if acquired:
            _save_pass_channels("lp", iteration, followup_ts)
        else:
            print(" FOLLOW-UP: trigger timeout — skipping.")
        # The HS configuration is restored on both paths.
        _restore_hs_config()
        if not acquired:
            return "", [], []

        # Transfer the new files from scope; a failed transfer is only reported.
        try:
            copied, _ = ai_mgmt.transfer_csv_files()
            print(f" FOLLOW-UP: {copied} file(s) transferred.")
        except Exception as exc:
            print(f" FOLLOW-UP TRANSFER ERROR: {exc}")

        lp_summaries, flagged = _analyze_lp_files(followup_ts, iteration)
        return followup_ts, lp_summaries, flagged
    except Exception as exc:
        print(f" FOLLOW-UP CAPTURE ERROR: {exc}")
        return "", [], []
def _call_claude(
    ts: str, iteration: int,
    lp_summaries: list[str],
    suspects: list[LPMetrics],
    config: dict | None = None,
    followup_summaries: list[str] | None = None,
) -> tuple[bool, str]:
    """
    Call the Claude API to assess whether the flagged capture is a flicker event.

    Args:
        ts, iteration: identify the capture in the prompt.
        lp_summaries, suspects: analysis results passed to _build_claude_prompt().
        config: optional config dict used for both prompts.
        followup_summaries: optional next-frame summaries added to the prompt.

    Returns:
        (claude_says_flicker, response_text). The verdict is True when the
        first line of the reply starts with YES (case-insensitive, leading
        markdown/quote characters ignored). If the request fails or the reply
        has no text, (True, <fallback text>) is returned so the capture is
        still treated as a suspect.
    """
    print(" CALLING CLAUDE FOR ASSESSMENT...")
    try:
        client = anthropic.Anthropic()
        message = client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=600,
            system=_build_system_prompt(config),
            messages=[{"role": "user", "content":
                       _build_claude_prompt(ts, iteration, lp_summaries,
                                            suspects, config,
                                            followup_summaries)}],
        )
        # Collect every text block instead of assuming content[0] exists and is text.
        response = "".join(
            block.text for block in message.content
            if getattr(block, "type", None) == "text"
        ).strip()
        if not response:
            # Routed to the fallback below with a meaningful reason instead of
            # an opaque "list index out of range".
            raise ValueError("empty response — no text content returned")

        first_line = response.splitlines()[0].strip().upper()
        # Tolerate decoration such as "**YES**" or "> YES".
        first_line = first_line.lstrip("*_#>`\"' ")
        claude_says_flicker = first_line.startswith("YES")
        label = "FLICKER" if claude_says_flicker else "NOT FLICKER"
        print(f" CLAUDE: {label} ({message.usage.input_tokens} in / "
              f"{message.usage.output_tokens} out tokens)")
        return claude_says_flicker, response
    except Exception as e:
        print(f" CLAUDE API ERROR: {e}")
        fallback = (f"(Claude API unavailable: {e})\n"
                    f"Rule-based detector flagged LP-low plateau < 50 ns — "
                    f"treat as potential flicker suspect.")
        return True, fallback
def analyze_lp_and_ask_claude(
    ts: str, iteration: int, config: dict | None = None
) -> tuple[bool, str, list[LPMetrics]]:
    """
    Run the rule-based LP analysis for one iteration and, if it flags any
    suspect, ask Claude for a verdict.

    Retained for backwards compatibility (report generation and tests).
    No follow-up capture is taken here; the interactive test loop does that
    itself.

    Returns (claude_says_flicker, response_text, suspects). When the
    rule-based pass finds nothing, Claude is not called and
    (False, "", []) is returned.
    """
    summaries, flagged = _analyze_lp_files(ts, iteration)
    if flagged:
        verdict, text = _call_claude(ts, iteration, summaries, flagged, config)
        return verdict, text, flagged
    return False, "", []
2026-04-16 11:23:25 +01:00
# ---------------------------------------------------------------------------
# Event logging and HTML report
# ---------------------------------------------------------------------------
def _config_section_html(config: dict) -> str:
    """
    Render the D-PHY configuration section of the HTML report.

    Reads config["timing"]: its "fields" and "registers" mappings, the
    "violations" list and the clock/bit-rate summary values. Produces a
    timing table (one row per name in _TIMING_FIELD_ORDER), a violations
    summary, the three PHY_TIMING register rows and the escaped u-boot
    command listing.
    """
    timing = config["timing"]
    fields = timing["fields"]
    registers = timing["registers"]

    # Fields whose spec also constrains the sum with a partner field.
    combined_with = {"hs_zero": "hs_prepare", "clk_zero": "clk_prepare"}

    row_chunks = []
    for name in _TIMING_FIELD_ORDER:
        entry = fields[name]
        lower = entry["min_ns"]
        upper = entry.get("max_ns")
        measured = entry["actual_ns"]

        meets_min = measured >= lower
        exceeds_max = upper is not None and measured > upper
        passed = meets_min and not exceeds_max

        partner = combined_with.get(name)
        if partner is not None:
            total = fields[partner]["actual_ns"] + measured
            passed = passed and (total >= entry.get("combined_min_ns", 0))

        if upper is not None:
            spec_str = f"{lower:.1f} &ndash; {upper:.1f}"
        else:
            spec_str = f"&ge; {lower:.1f}"

        if passed:
            cell_style = ""
            status_html = "&#10003;"
        else:
            cell_style = ' style="color:#c62828;font-weight:bold"'
            status_html = '<span style="color:#c62828">&#10006; FAIL</span>'

        cells = [
            f"<td><code>{name}</code></td>",
            f"<td>{spec_str}</td>",
            f"<td>{entry['round_best']}</td>",
            f"<td>{entry['round_up']}</td>",
            f"<td>+{entry['extra']}</td>",
            f"<td><strong>{entry['final']}</strong></td>",
            f"<td{cell_style}>{measured:.2f}</td>",
            f"<td>{status_html}</td>",
        ]
        row_chunks.append("<tr>" + "".join(cells) + "</tr>\n")
    rows_html = "".join(row_chunks)

    violations = timing["violations"]
    if violations:
        bullet_items = "".join(
            f"<li>{html.escape(text)}</li>" for text in violations
        )
        viol_html = (
            '<p style="color:#c62828;font-weight:bold">Timing violations:</p>'
            + f"<ul>{bullet_items}</ul>"
        )
    else:
        viol_html = (
            '<p style="color:#2e7d32">&#10003; All D-PHY v1.1 Table&nbsp;14 '
            "constraints satisfied.</p>"
        )

    final = {}
    for name in _TIMING_FIELD_ORDER:
        final[name] = fields[name]["final"]
    reg_timing = registers["PHY_TIMING"]["value"]
    reg_timing1 = registers["PHY_TIMING1"]["value"]
    reg_timing2 = registers["PHY_TIMING2"]["value"]
    uboot_text = html.escape(format_uboot_commands(timing))

    return f"""<h2>D-PHY Configuration</h2>
<p>
Pixel clock: <strong>{timing['pixel_clock_mhz']} MHz</strong> &nbsp;|&nbsp;
Bit rate: <strong>{timing['bit_rate_mbps']:.1f} Mbit/s per lane</strong> &nbsp;|&nbsp;
Byte clock: <strong>{timing['byte_clock_mhz']:.3f} MHz</strong>
({timing['byte_period_ns']:.3f}&thinsp;ns/byte) &nbsp;|&nbsp;
UI: <strong>{timing['ui_ns']:.3f} ns</strong>
</p>
<table>
<tr>
<th>Field</th><th>Spec (ns)</th><th>Rnd Best</th><th>Rnd Up</th>
<th>Extra</th><th>Final</th><th>Actual (ns)</th><th>Status</th>
</tr>
{rows_html}
</table>
{viol_html}
<h3>Samsung DSIM Registers</h3>
<table>
<tr><th>Register</th><th>Address</th><th>Value</th><th>Field breakdown</th></tr>
<tr>
<td>PHY_TIMING</td><td><code>0xb4</code></td>
<td><code>0x{reg_timing:08x}</code></td>
<td>lpx={final['lpx']} &nbsp; hs_exit={final['hs_exit']}</td>
</tr>
<tr>
<td>PHY_TIMING1</td><td><code>0xb8</code></td>
<td><code>0x{reg_timing1:08x}</code></td>
<td>clk_prepare={final['clk_prepare']} &nbsp; clk_zero={final['clk_zero']} &nbsp;
clk_post={final['clk_post']} &nbsp; clk_trail={final['clk_trail']}</td>
</tr>
<tr>
<td>PHY_TIMING2</td><td><code>0xbc</code></td>
<td><code>0x{reg_timing2:08x}</code></td>
<td>hs_prepare={final['hs_prepare']} &nbsp; hs_zero={final['hs_zero']} &nbsp;
hs_trail={final['hs_trail']}</td>
</tr>
</table>
<h3>u-boot Commands</h3>
<pre style="background:#f5f5f5;padding:12px;border-radius:4px;
white-space:pre-wrap;font-size:0.88em">{uboot_text}</pre>
"""
def _log_interaction(
    events: list,
    ts: str,
    iteration: int,
    suspects: list[LPMetrics],
    claude_said: bool,
    user_confirmed: bool | None,
    reasoning: str,
) -> None:
    """
    Record one assessed capture, both in memory and on disk.

    A dict describing the event is appended to `events`, and one CSV row per
    suspect is appended to INTERACTIVE_LOG (a header row is written first if
    the file does not exist yet).

    user_confirmed values:
      True   operator confirmed flicker
      False  operator said no (false alarm)
      None   operator was not asked (Claude said no)
    """
    logged_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = {
        "ts": ts,
        "iteration": iteration,
        "suspects": suspects,
        "claude_said": claude_said,
        "user_confirmed": user_confirmed,
        "reasoning": reasoning,
        "logged_at": logged_at,
    }
    events.append(record)

    if user_confirmed is True:
        operator_label = "YES"
    elif user_confirmed is False:
        operator_label = "NO"
    else:
        operator_label = "NOT_ASKED"
    claude_label = "YES" if claude_said else "NO"

    INTERACTIVE_LOG.parent.mkdir(exist_ok=True)
    needs_header = not INTERACTIVE_LOG.exists()
    with INTERACTIVE_LOG.open("a", newline="", encoding="utf-8") as handle:
        writer = _csv_mod.writer(handle)
        if needs_header:
            writer.writerow(["logged_at", "capture_ts", "capture_num",
                             "claude_said_flicker", "user_confirmed",
                             "lp_low_ns", "reasoning_summary"])
        # One row per suspect channel; the reasoning is truncated to 150
        # characters and flattened to a single line.
        writer.writerows(
            [
                logged_at, ts, f"{iteration:04d}",
                claude_label,
                operator_label,
                metric.lp_low_duration_ns,
                reasoning[:150].replace("\n", " "),
            ]
            for metric in suspects
        )
def save_report(events: list, stop_reason: str,
                config: dict | None = None) -> Path:
    """
    Write a timestamped HTML report summarising all flicker interactions.

    The report holds the stop reason, three outcome counters, the optional
    D-PHY configuration section (only when `config` is truthy), one table row
    per suspect of every logged event, and Claude's reasoning for the events
    where Claude said flicker. The file is created in REPORTS_DIR and its
    path is returned.
    """
    REPORTS_DIR.mkdir(exist_ok=True)
    now = datetime.now()
    path = REPORTS_DIR / now.strftime("%Y%m%d_%H%M%S_interactive.html")
    stamp = now.strftime('%Y-%m-%d %H:%M:%S')

    outcomes = [ev["user_confirmed"] for ev in events]
    confirmed_n = sum(outcome is True for outcome in outcomes)
    false_alarm_n = sum(outcome is False for outcome in outcomes)
    claude_no_n = sum(outcome is None for outcome in outcomes)

    # ── Event table rows ───────────────────────────────────────────────────
    row_chunks = []
    for ev in events:
        outcome = ev["user_confirmed"]
        if outcome is True:
            badge = ('<span style="color:#c62828;font-weight:bold">'
                     '&#10006; CONFIRMED FLICKER</span>')
        elif outcome is False:
            badge = ('<span style="color:#2e7d32;font-weight:bold">'
                     '&#10003; FALSE ALARM</span>')
        else:
            badge = ('<span style="color:#e65100">'
                     'Claude said NO — user not asked</span>')
        for metric in ev["suspects"]:
            plateau = metric.lp_low_duration_ns
            # A missing or sub-50 ns plateau is highlighted in red.
            if plateau is None or plateau < 50:
                plateau_cell = f'<td style="color:red">{plateau} ns</td>'
            else:
                plateau_cell = f'<td>{plateau} ns</td>'
            claude_text = 'YES' if ev['claude_said'] else 'NO'
            cells = [
                f"<td>{ev['iteration']:04d}</td>",
                f"<td>{ev['ts']}</td>",
                f"<td>{metric.channel}</td>",
                plateau_cell,
                f"<td>{metric.lp11_to_hs_ns} ns</td>",
                f"<td>{metric.lp11_voltage_v} V</td>",
                f"<td>{claude_text}</td>",
                f"<td>{badge}</td>",
            ]
            row_chunks.append("<tr>" + "".join(cells) + "</tr>")
    rows_html = "".join(row_chunks)

    if rows_html:
        table_html = f"""<table>
<tr>
<th>Capture</th><th>Timestamp</th><th>Channel</th>
<th>LP-low plateau</th><th>LP exit&rarr;HS</th><th>LP-11 voltage</th>
<th>Claude: flicker?</th><th>Outcome</th>
</tr>
{rows_html}
</table>"""
    else:
        table_html = '<p>No flicker suspects were detected during this test run.</p>'

    # ── D-PHY config section ───────────────────────────────────────────────
    config_html = _config_section_html(config) if config else ""

    # ── Claude reasoning sections ──────────────────────────────────────────
    reasoning_chunks = []
    for ev in events:
        if not (ev["reasoning"] and ev["claude_said"]):
            continue
        outcome = ev["user_confirmed"]
        if outcome is True:
            suffix = " — CONFIRMED FLICKER"
        elif outcome is False:
            suffix = " — FALSE ALARM"
        else:
            suffix = ""
        reasoning_chunks.append(
            f'<h3>Capture {ev["iteration"]:04d} [{ev["ts"]}]{html.escape(suffix)}</h3>'
            '<pre style="background:#f5f5f5;padding:12px;border-radius:4px;'
            'white-space:pre-wrap;font-size:0.88em">'
            f'{html.escape(ev["reasoning"])}</pre>'
        )
    reasoning_html = "".join(reasoning_chunks)
    if reasoning_html:
        assessments_html = '<h2>Claude Assessments</h2>' + reasoning_html
    else:
        assessments_html = ''

    html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>MIPI Interactive Flicker Test &mdash; {stamp}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 1020px; margin: 40px auto;
padding: 0 20px; color: #222; }}
h1 {{ color: #1a3a5c; border-bottom: 2px solid #1a3a5c; padding-bottom: 8px; }}
h2 {{ color: #1a3a5c; margin-top: 32px; }}
h3 {{ color: #333; }}
.meta {{ color: #555; font-size: 0.92em; margin-top: -6px; margin-bottom: 20px; }}
.stop-box {{ background: #e8f4fd; border-left: 4px solid #1a3a5c;
padding: 10px 16px; margin: 16px 0 24px; border-radius: 3px; }}
.stat {{ display: inline-block; margin: 0 16px 20px 0; padding: 12px 22px;
border-radius: 6px; font-size: 1.05em; font-weight: bold; }}
.s-confirmed {{ background: #fdecea; border: 2px solid #c62828; color: #c62828; }}
.s-false {{ background: #e8f5e9; border: 2px solid #2e7d32; color: #2e7d32; }}
.s-claude-no {{ background: #fff8e1; border: 2px solid #f9a825; color: #795548; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
th {{ background: #1a3a5c; color: white; padding: 7px 10px; text-align: left; }}
td {{ border: 1px solid #ddd; padding: 5px 10px; }}
tr:nth-child(even) {{ background: #fafafa; }}
pre {{ margin: 0; }}
</style>
</head>
<body>
<h1>MIPI Interactive Flicker Test Report</h1>
<p class="meta">
Generated: {stamp} &nbsp;|&nbsp;
Model: {CLAUDE_MODEL}
</p>
<div class="stop-box">
<strong>Stop reason:</strong> {html.escape(stop_reason)}
</div>
<div>
<div class="stat s-confirmed">{confirmed_n} confirmed flicker(s)</div>
<div class="stat s-false">{false_alarm_n} false alarm(s)</div>
<div class="stat s-claude-no">{claude_no_n} Claude said no</div>
</div>
{config_html}
<h2>Event Log</h2>
{table_html}
{assessments_html}
</body>
</html>
"""
    path.write_text(html_content, encoding="utf-8")
    return path
# ---------------------------------------------------------------------------
# Interactive test loop
# ---------------------------------------------------------------------------
def run_interactive_test() -> None:
    """
    Blocking interactive test loop (runs in the calling thread; no
    background threads are started here).

    Flow per iteration:
      1. Arm the scope for the CLK start-up capture, then switch display ON
      2. Collect the CLK start-up capture
      3. Three-pass dual_capture, then transfer all scope CSVs
      4. IRQ pin analysis, then rule-based LP analysis for this iteration
      5. If the rule-based pass flags a suspect: take an LP-only follow-up
         capture and ask Claude for a verdict
      6. If Claude says flicker:
           - Keep display ON so the operator can observe
           - Play an alarm (best effort) and ask the operator to confirm
           - Confirmed   -> log event, display OFF, STOP
           - False alarm -> log event, continue
         If Claude says no, the suspect is logged for reference
      7. Display OFF, 1 s pause, next iteration

    Press Ctrl+C to exit at any time. The HTML report is written in a
    `finally` block, so it is saved on normal stop, on Ctrl+C and when an
    unexpected exception aborts the loop (the exception is re-raised after
    the report has been written).
    """
    iteration = 1
    events: list = []
    stop_reason = "Test stopped by user"

    def _display_off_on_exit() -> None:
        # Shutdown path: deliberately broad so that a failing display request
        # can never prevent the report from being written.
        try:
            requests.put(URL, json={"state": "off"}, timeout=2)
        except Exception as e:
            print(f" WARNING: display OFF failed: {e}")

    # ── Pixel clock configuration ──────────────────────────────────────────
    config = prompt_for_config()

    print("\nINTERACTIVE FLICKER TEST STARTED.")
    print("Each iteration: display ON → 3-pass capture → LP analysis → Claude check.")
    print("The display stays ON while Claude and the operator assess the frame.")
    print("Press Ctrl+C at any time to stop.\n")

    try:
        while True:
            # ── Arm scope for CLK startup BEFORE display ON ────────────────
            ts_startup = datetime.now().strftime("%Y%m%d_%H%M%S")
            _arm_scope_for_clk_startup()

            # ── Display ON ─────────────────────────────────────────────────
            try:
                requests.put(URL, json={"state": "on"}, timeout=2)
            except requests.exceptions.RequestException as e:
                print(f" WARNING: display ON failed: {e}")

            # ── Collect CLK startup (polls, saves, transfers, analyses) ────
            _collect_clk_startup(ts_startup, iteration)

            # ── Three-pass capture ─────────────────────────────────────────
            ts = dual_capture(iteration)

            # ── Transfer scope files to local data/ ────────────────────────
            print(" TRANSFERRING FILES FROM SCOPE...")
            try:
                copied, failed = ai_mgmt.transfer_csv_files()
                print(f" TRANSFERRED {copied} FILE(S). {failed} FAILED.")
            except Exception as e:
                print(f" TRANSFER ERROR: {e}")

            # ── IRQ pin analysis ───────────────────────────────────────────
            _analyze_int_file(ts, iteration)

            # ── Rule-based LP analysis ─────────────────────────────────────
            lp_summaries, suspects = _analyze_lp_files(ts, iteration)

            followup_summaries: list[str] = []
            claude_flicker = False
            reasoning = ""
            if suspects:
                # Take an LP-only follow-up capture before calling Claude.
                # The visual flicker caused by a missed SoT appears one display
                # frame later (pipeline lag), so the operator observes flicker
                # on the frame AFTER the electrical event. Including the next
                # frame gives Claude — and the operator — the correct context.
                _, followup_summaries, _ = _lp_followup_capture(iteration)

                # ── Call Claude for every rule-based suspect ───────────────
                claude_flicker, reasoning = _call_claude(
                    ts, iteration, lp_summaries, suspects, config,
                    followup_summaries or None)

            if claude_flicker:
                # ── Keep display ON — ask operator ─────────────────────────
                # Play alarm sound once to alert the operator. A missing or
                # broken player must not abort the test, so launch failures
                # are reported and ignored.
                try:
                    subprocess.run(
                        ["pw-play", "/usr/share/sounds/freedesktop/stereo/alarm-clock-elapsed.oga"],
                        check=False,
                    )
                except OSError as e:
                    print(f" WARNING: alarm sound could not be played: {e}")

                print("\n" + "=" * 64)
                print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW")
                print("=" * 64)
                print(f"\n{reasoning}\n")

                confirmed: bool | None = None
                while confirmed is None:
                    try:
                        ans = input("IS THE SCREEN ACTUALLY FLICKERING? (Y/N): ").strip().upper()
                    except EOFError:
                        # No interactive stdin: treat as "not flickering".
                        ans = "N"
                    if ans in ("Y", "YES"):
                        confirmed = True
                    elif ans in ("N", "NO"):
                        confirmed = False
                    else:
                        print(" Please enter Y or N.")

                _log_interaction(events, ts, iteration, suspects,
                                 claude_flicker, confirmed, reasoning)

                if confirmed:
                    print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.")
                    stop_reason = (f"Flicker confirmed by operator at "
                                   f"capture {iteration:04d} [{ts}]")
                    _display_off_on_exit()
                    break  # exit the while loop → report saved in finally
                print(" NOT FLICKERING — false alarm logged. Continuing.\n")

            if suspects and not claude_flicker:
                # Rule-based suspect but Claude said no — record for reference
                _log_interaction(events, ts, iteration, suspects,
                                 False, None, reasoning)

            # ── Display OFF, brief pause before next iteration ─────────────
            try:
                requests.put(URL, json={"state": "off"}, timeout=2)
            except requests.exceptions.RequestException as e:
                print(f" WARNING: display OFF failed: {e}")

            iteration += 1
            time.sleep(1.0)

    except KeyboardInterrupt:
        print("\n\n TEST INTERRUPTED (Ctrl+C).")
        stop_reason = "Test interrupted by operator (Ctrl+C)"
        _display_off_on_exit()
    except Exception as e:
        # Unexpected failure (e.g. instrument I/O): record why the run ended,
        # leave the display off, then let the error propagate after the
        # report has been written by the finally block.
        print(f"\n\n TEST ABORTED BY UNEXPECTED ERROR: {e}")
        stop_reason = f"Test aborted by unexpected error: {e!r}"
        _display_off_on_exit()
        raise
    finally:
        # ── Always save a report on exit ───────────────────────────────────
        report_path = save_report(events, stop_reason, config)
        print(f"\nREPORT SAVED: {report_path}")
        if events:
            confirmed_n = sum(1 for e in events if e["user_confirmed"] is True)
            false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False)
            print(f" {confirmed_n} confirmed flicker(s), {false_alarm_n} false alarm(s) "
                  f"({len(events)} total suspect(s) assessed)")
# ---------------------------------------------------------------------------
# Menu
# ---------------------------------------------------------------------------
def main_menu() -> None:
    """
    Text menu for driving the instruments and starting the interactive test.

    Loops until option 6 is chosen, at which point the PSU, scope and Rigol
    connections are closed. `psu` and `scope` are module-level instrument
    handles defined elsewhere in this file.
    """
    menu_lines = (
        "\n===== MIPI INTERACTIVE TEST CONTROL =====",
        "1. RUN IDN CHECK (PSU & SCOPE)",
        "2. SETUP SCOPE (RUN FIRST)",
        "3. CONFIGURE PSU (DEFAULT 24V / 1.5A)",
        "4. PSU OUTPUT ON/OFF (CH1)",
        "5. START INTERACTIVE FLICKER TEST",
        "6. EXIT",
    )

    def _idn_check() -> None:
        # Query the identification string of each instrument.
        print(f"PSU : {psu.ask('*IDN?').strip()}")
        print(f"SCOPE: {scope.ask('*IDN?').strip()}")
        if rigol_scope.is_connected():
            print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
        else:
            print("RIGOL: NOT CONNECTED")

    def _scope_setup() -> None:
        setup_scope()
        if rigol_scope.is_connected():
            rigol_scope.configure()

    def _psu_configure() -> None:
        psu.write('CH1:VOLT 24.0')
        psu.write('CH1:CURR 1.5')
        print("PSU CONFIGURED: 24V / 1.5A")

    def _psu_output() -> None:
        state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
        if state not in ('ON', 'OFF'):
            print("INVALID — TYPE 'ON' OR 'OFF'.")
            return
        psu.write(f'OUTP CH1,{state}')
        print(f"PSU OUTPUT {state}.")

    actions = {
        '1': _idn_check,
        '2': _scope_setup,
        '3': _psu_configure,
        '4': _psu_output,
        '5': run_interactive_test,
    }

    while True:
        for line in menu_lines:
            print(line)
        choice = input("\nSELECT OPTION (1-6): ").strip()

        if choice == '6':
            # Release every instrument connection before leaving the menu.
            psu.close()
            scope.close()
            rigol_scope.disconnect()
            print("INSTRUMENTS CLOSED. BYE.")
            return

        action = actions.get(choice)
        if action is None:
            print("INVALID ENTRY. PLEASE CHOOSE 1-6.")
            continue
        action()
if __name__ == "__main__":
    # Script entry point: show the operator menu when run directly.
    main_menu()