Files
MiPi_TEST/mipi_test_interactive.py

1581 lines
62 KiB
Python
Raw Normal View History

2026-04-20 10:35:06 +01:00
#!/usr/bin/env python3
2026-04-16 11:23:25 +01:00
"""
MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY
Interactive flicker confirmation test.
Same three-pass capture sequence as mipi_test.py. After each iteration:
1. LP files are transferred from the scope immediately and analysed.
2. If the rule-based LP pre-processor flags a flicker suspect, Claude is
asked to assess the single capture.
3. If Claude agrees it looks like a flicker event, the test pauses and
asks the operator to confirm by looking at the display.
Confirmed event is logged, HTML report is written, test STOPS.
Not flickering (false alarm) event is logged, test CONTINUES.
4. A final HTML report is written when the test ends for any reason.
VERSION: 0.1
AUTHOR: D. RICE 16/04/2026
© 2026 ARRIVE
"""
import csv as _csv_mod
import html
import json
import subprocess
import time
import sys
import requests
from datetime import datetime
from pathlib import Path
import math
2026-04-20 16:06:01 +01:00
import os
import struct
import tempfile
import wave
2026-04-16 11:23:25 +01:00
import anthropic
import vxi11
from dotenv import load_dotenv
import ai_mgmt
import rigol_scope
from csv_preprocessor import (analyze_lp_file, LPMetrics,
2026-04-20 16:06:01 +01:00
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
2026-04-16 11:23:25 +01:00
load_dotenv(Path(__file__).parent / ".env")
# ---------------------------------------------------------------------------
# Configuration (same as mipi_test.py)
# ---------------------------------------------------------------------------
DEVICE_BASE = "http://192.168.45.8:5000"
URL = f"{DEVICE_BASE}/display"
2026-04-21 12:26:10 +01:00
VIDEO_URL = f"{DEVICE_BASE}/video"
2026-04-16 11:23:25 +01:00
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
# Pass 1 — signal quality (HS differential, rise/fall)
SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window
SIG_POINTS = 500_000
# Pass 2 — protocol/frame structure (HS differential, jitter/freq)
2026-04-24 14:30:48 +01:00
PROTO_SCALE = 4e-6 # 4 µs/div → 40 µs window (was 1 µs/div)
2026-04-16 11:23:25 +01:00
PROTO_POINTS = 500_000
# Pass 3 — LP state capture (single-ended, widens vertical range to show LP-11)
2026-04-27 10:35:56 +01:00
LP_SCALE = 1e-6 # 1 µs/div → 20 µs actual window (was 500 ns/div)
2026-04-16 11:23:25 +01:00
LP_POINTS = 200_000
2026-04-27 10:35:56 +01:00
LP_TRIG_OFFSET = 9e-6 # shift centre 9 µs after trigger → 1 µs pre / 19 µs post
2026-04-16 11:23:25 +01:00
LP_V_SCALE = 0.2 # V/div
LP_V_OFFSET = 0.6 # V — centres display between LP-00 (0 V) and LP-11 (1.2 V)
LP_TRIG_LEVEL = 0.6 # V — catches LP-11 → LP-01 falling edge
DATA_DIR = Path(__file__).parent / "data"
REPORTS_DIR = Path(__file__).parent / "reports"
# Persistent logs (shared paths — consistent with mipi_test.py / analyze_captures.py)
FLICKER_LOG = REPORTS_DIR / "flicker_log.csv"
INTERACTIVE_LOG = REPORTS_DIR / "interactive_log.csv"
# Claude model for per-capture flicker assessment
CLAUDE_MODEL = "claude-opus-4-6"
# ---------------------------------------------------------------------------
# D-PHY timing calculation
# ---------------------------------------------------------------------------
# Ordered field list used for table formatting and u-boot output
_TIMING_FIELD_ORDER = [
"lpx", "hs_prepare", "hs_zero", "hs_trail", "hs_exit",
"clk_prepare", "clk_zero", "clk_post", "clk_trail",
]
# Maps field names → flb_dtovar property names (NXP i.MX 8M Mini Samsung DSIM driver)
_EXTRA_PROP_MAP = {
"lpx": "dsi-phy-extra-lpx",
"hs_prepare": "dsi-phy-extra-hs-prepare",
"hs_zero": "dsi-phy-extra-hs-zero",
"hs_trail": "dsi-phy-extra-hs-trail",
"hs_exit": "dsi-phy-extra-hs-exit",
"clk_prepare": "dsi-phy-extra-clk-prepare",
"clk_zero": "dsi-phy-extra-clk-zero",
"clk_post": "dsi-phy-extra-clk-post",
"clk_trail": "dsi-phy-extra-clk-trail",
}
def calculate_dphy_timing(pixel_clock_mhz: float,
extra_cycles: dict | None = None) -> dict:
"""
Calculate Samsung DSIM PHY timing register values for a given pixel clock.
Assumes RGB888 (24 bpp), 4 DSI lanes NXP i.MX 8M Mini / Ampire 1280×800.
Timing constraints from MIPI D-PHY v1.1 Table 14.
extra_cycles: optional dict mapping field names to additional cycles above the
Round-Up minimum, e.g. {'clk_zero': 3, 'hs_prepare': 1, 'hs_trail': 1}.
Samsung DSIM register packing:
PHY_TIMING (0xb4): (lpx << 8) | hs_exit
PHY_TIMING1 (0xb8): (clk_prepare << 24) | (clk_zero << 16) | (clk_post << 8) | clk_trail
PHY_TIMING2 (0xbc): (hs_prepare << 16) | (hs_zero << 8) | hs_trail
Returns a dict with: pixel_clock_mhz, bit_rate_mbps, byte_clock_mhz,
byte_period_ns, ui_ns, fields, registers, violations.
"""
bpp = 24
lanes = 4
extras = extra_cycles or {}
bit_rate_mbps = pixel_clock_mhz * bpp / lanes # Mbit/s per lane
byte_clock_mhz = bit_rate_mbps / 8 # MHz
byte_period_ns = 1000.0 / byte_clock_mhz # ns per byte-clock cycle
ui_ns = 1000.0 / bit_rate_mbps # ns per UI
def _ru(t_ns: float) -> int:
return max(1, math.ceil(t_ns / byte_period_ns))
def _rb(t_ns: float) -> int:
return max(1, round(t_ns / byte_period_ns))
def _field(name: str, min_ns: float, max_ns: float | None = None) -> dict:
extra = extras.get(name, 0)
ru = _ru(min_ns)
rb = _rb(min_ns)
final = ru + extra
return {
"min_ns": min_ns,
"max_ns": max_ns,
"round_best": rb,
"round_up": ru,
"extra": extra,
"final": final,
"actual_ns": final * byte_period_ns,
}
fields: dict = {}
# LPX ≥ 50 ns
fields["lpx"] = _field("lpx", 50.0)
# HS-PREPARE: 40+4UI to 85+6UI ns
hs_p_min = 40.0 + 4 * ui_ns
hs_p_max = 85.0 + 6 * ui_ns
fields["hs_prepare"] = _field("hs_prepare", hs_p_min, hs_p_max)
hs_p_actual = fields["hs_prepare"]["actual_ns"]
# HS-ZERO: combined hs_prepare + hs_zero ≥ 145+10UI ns
hs_z_combined_min = 145.0 + 10 * ui_ns
hs_z_min = max(1.0, hs_z_combined_min - hs_p_actual)
fields["hs_zero"] = _field("hs_zero", hs_z_min)
fields["hs_zero"]["combined_min_ns"] = hs_z_combined_min
# HS-TRAIL ≥ max(8UI, 60+4UI) ns
fields["hs_trail"] = _field("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
# HS-EXIT ≥ 100 ns
fields["hs_exit"] = _field("hs_exit", 100.0)
# CLK-PREPARE: 38 to 95 ns
fields["clk_prepare"] = _field("clk_prepare", 38.0, 95.0)
clk_p_actual = fields["clk_prepare"]["actual_ns"]
# CLK-ZERO: combined clk_prepare + clk_zero ≥ 300 ns
clk_z_combined_min = 300.0
clk_z_min = max(1.0, clk_z_combined_min - clk_p_actual)
fields["clk_zero"] = _field("clk_zero", clk_z_min)
fields["clk_zero"]["combined_min_ns"] = clk_z_combined_min
# CLK-POST ≥ 60+52UI ns
fields["clk_post"] = _field("clk_post", 60.0 + 52 * ui_ns)
# CLK-TRAIL ≥ max(12UI, 60) ns
fields["clk_trail"] = _field("clk_trail", max(12 * ui_ns, 60.0))
# ── Register packing ──────────────────────────────────────────────────
def _f(name: str) -> int:
return fields[name]["final"]
phy_timing = (_f("lpx") << 8) | _f("hs_exit")
phy_timing1 = ((_f("clk_prepare") << 24) | (_f("clk_zero") << 16) |
(_f("clk_post") << 8) | _f("clk_trail"))
phy_timing2 = ((_f("hs_prepare") << 16) | (_f("hs_zero") << 8) | _f("hs_trail"))
registers = {
"PHY_TIMING": {"addr": 0xb4, "value": phy_timing},
"PHY_TIMING1": {"addr": 0xb8, "value": phy_timing1},
"PHY_TIMING2": {"addr": 0xbc, "value": phy_timing2},
}
# ── Constraint checks ─────────────────────────────────────────────────
violations: list[str] = []
def _chk(name: str, lo: float, hi: float | None = None) -> None:
a = fields[name]["actual_ns"]
if a < lo:
violations.append(f"{name}: {a:.2f} ns < {lo:.2f} ns (min)")
if hi is not None and a > hi:
violations.append(f"{name}: {a:.2f} ns > {hi:.2f} ns (max)")
_chk("lpx", 50.0)
_chk("hs_prepare", hs_p_min, hs_p_max)
hs_combined = fields["hs_prepare"]["actual_ns"] + fields["hs_zero"]["actual_ns"]
if hs_combined < hs_z_combined_min:
violations.append(
f"hs_prepare+hs_zero: {hs_combined:.2f} ns < {hs_z_combined_min:.2f} ns (min)")
_chk("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
_chk("hs_exit", 100.0)
_chk("clk_prepare", 38.0, 95.0)
clk_combined = fields["clk_prepare"]["actual_ns"] + fields["clk_zero"]["actual_ns"]
if clk_combined < clk_z_combined_min:
violations.append(
f"clk_prepare+clk_zero: {clk_combined:.2f} ns < {clk_z_combined_min:.2f} ns (min)")
_chk("clk_post", 60.0 + 52 * ui_ns)
_chk("clk_trail", max(12 * ui_ns, 60.0))
return {
"pixel_clock_mhz": pixel_clock_mhz,
"bit_rate_mbps": bit_rate_mbps,
"byte_clock_mhz": byte_clock_mhz,
"byte_period_ns": byte_period_ns,
"ui_ns": ui_ns,
"fields": fields,
"registers": registers,
"violations": violations,
}
def format_timing_table(t: dict) -> str:
"""Return a fixed-width console table of all D-PHY timing fields."""
fields = t["fields"]
hdr = (f"{'Field':<14} {'Spec (ns)':<18} {'Rnd Best':>9} "
f"{'Rnd Up':>7} {'Extra':>6} {'Final':>6} {'Actual (ns)':>12} Status")
sep = "-" * len(hdr)
lines = [sep, hdr, sep]
for name in _TIMING_FIELD_ORDER:
f = fields[name]
min_ns = f["min_ns"]
max_ns = f.get("max_ns")
actual = f["actual_ns"]
spec_str = (f"{min_ns:.1f} {max_ns:.1f}" if max_ns is not None
else f">= {min_ns:.1f}")
ok = actual >= min_ns
if max_ns is not None and actual > max_ns:
ok = False
if name == "hs_zero":
comb = fields["hs_prepare"]["actual_ns"] + actual
ok = ok and (comb >= f.get("combined_min_ns", 0))
if name == "clk_zero":
comb = fields["clk_prepare"]["actual_ns"] + actual
ok = ok and (comb >= f.get("combined_min_ns", 0))
status = "OK" if ok else "FAIL"
lines.append(
f"{name:<14} {spec_str:<18} {f['round_best']:>9} "
f"{f['round_up']:>7} +{f['extra']:>5} {f['final']:>6} {actual:>12.2f} {status}"
)
lines.append(sep)
return "\n".join(lines)
def format_uboot_commands(t: dict) -> str:
"""Return u-boot shell commands to apply the computed PHY timing configuration."""
fields = t["fields"]
regs = t["registers"]
fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER}
phy_t = regs["PHY_TIMING"]["value"]
phy_t1 = regs["PHY_TIMING1"]["value"]
phy_t2 = regs["PHY_TIMING2"]["value"]
non_zero_extras = [
(_EXTRA_PROP_MAP[name], fields[name]["extra"])
for name in _TIMING_FIELD_ORDER
if fields[name]["extra"] > 0
]
lines = [
f"# D-PHY PHY timing registers "
f"(pixel clock {t['pixel_clock_mhz']} MHz, "
f"{t['bit_rate_mbps']:.1f} Mbit/s, "
f"byte clock {t['byte_clock_mhz']:.3f} MHz)",
"#",
f"# PHY_TIMING (0xb4) = 0x{phy_t:08x} "
f"lpx={fv['lpx']} hs_exit={fv['hs_exit']}",
f"# PHY_TIMING1 (0xb8) = 0x{phy_t1:08x} "
f"clk_prepare={fv['clk_prepare']} clk_zero={fv['clk_zero']} "
f"clk_post={fv['clk_post']} clk_trail={fv['clk_trail']}",
f"# PHY_TIMING2 (0xbc) = 0x{phy_t2:08x} "
f"hs_prepare={fv['hs_prepare']} hs_zero={fv['hs_zero']} "
f"hs_trail={fv['hs_trail']}",
"",
"# Enable Round-Up rounding (dsi-tweak bit 2)",
'setenv flb_dtovar "${flb_dtovar} dsi-tweak=4"',
]
if non_zero_extras:
lines += ["", "# Extra PHY cycles above Round-Up minimum"]
for prop, val in non_zero_extras:
lines.append(f'setenv flb_dtovar "${{flb_dtovar}} {prop}={val}"')
lines += ["", "saveenv", "boot"]
return "\n".join(lines)
def prompt_for_config() -> dict:
"""
Interactive startup prompt: ask for pixel clock and optional extra PHY cycles.
Prints timing table and u-boot commands, then returns a config dict.
"""
print("\n" + "=" * 64)
print(" D-PHY TIMING CONFIGURATION")
print("=" * 64)
print("RGB888, 4 DSI lanes, Samsung DSIM on NXP i.MX 8M Mini.")
print("Timing constraints: MIPI D-PHY v1.1 Table 14.\n")
# ── Pixel clock ───────────────────────────────────────────────────────
while True:
try:
pix_str = input("Enter pixel clock in MHz (e.g. 72.0): ").strip()
pixel_clock_mhz = float(pix_str)
if not (10.0 <= pixel_clock_mhz <= 300.0):
print(" Out of range — enter a value between 10 and 300.")
continue
break
except ValueError:
print(" Invalid — enter a number (e.g. 72.0).")
# ── Baseline (no extras) ──────────────────────────────────────────────
t_base = calculate_dphy_timing(pixel_clock_mhz)
print(f"\nPixel clock : {pixel_clock_mhz} MHz")
print(f"Bit rate : {t_base['bit_rate_mbps']:.1f} Mbit/s per lane")
print(f"Byte clock : {t_base['byte_clock_mhz']:.3f} MHz "
f"({t_base['byte_period_ns']:.3f} ns/byte)")
print(f"UI : {t_base['ui_ns']:.3f} ns\n")
print(format_timing_table(t_base))
if t_base["violations"]:
print("\n *** BASELINE VIOLATIONS ***")
for v in t_base["violations"]:
print(f" ! {v}")
else:
print("\n All D-PHY v1.1 Table 14 constraints satisfied at baseline.")
# ── Extra cycles ──────────────────────────────────────────────────────
print("\n--- Extra PHY cycles (added on top of Round-Up minimum) ---")
print("Suggested values for SN65DSI83 reliability (from dev testing):")
print(" clk_zero +3, hs_prepare +1, hs_trail +1 (others 0)")
print("Press Enter to accept 0 for each field.\n")
extras: dict[str, int] = {}
for name in _TIMING_FIELD_ORDER:
while True:
try:
val = input(f" Extra cycles for {name:<12s} [0]: ").strip()
extras[name] = int(val) if val else 0
if extras[name] < 0:
print(" Cannot be negative.")
continue
break
except ValueError:
print(" Enter an integer (or press Enter for 0).")
# ── Final calculation with extras ─────────────────────────────────────
t_final = calculate_dphy_timing(pixel_clock_mhz, extras)
if any(v > 0 for v in extras.values()):
print("\n--- Final timing (with extras) ---")
print(format_timing_table(t_final))
if t_final["violations"]:
print("\n *** VIOLATIONS WITH EXTRAS ***")
for v in t_final["violations"]:
print(f" ! {v}")
else:
print("\n All constraints satisfied.")
print("\n--- u-boot commands ---")
print(format_uboot_commands(t_final))
print()
return {
"pixel_clock_mhz": pixel_clock_mhz,
"extras": extras,
"timing": t_final,
}
# ---------------------------------------------------------------------------
# Instrument connection
# ---------------------------------------------------------------------------
try:
psu = vxi11.Instrument(PSU_IP)
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30
psu.timeout = 5
except Exception as e:
print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
sys.exit(1)
rigol_scope.connect()
# ---------------------------------------------------------------------------
# Scope configuration (identical to mipi_test.py)
# ---------------------------------------------------------------------------
def setup_scope():
"""Initialises scope for MIPI DSI signals (~210 MHz)."""
print("CONFIGURING SCOPE...")
cmds = [
"*RST", ":RUN", ":STOP",
# Channel 1 — Clock D+
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2",
":CHANnel1:SCALe 0.1", ":CHANnel1:OFFSet 0.0", ":CHANnel1:LABel 'CLK+'",
# Channel 2 — Clock D-
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2",
":CHANnel2:SCALe 0.1", ":CHANnel2:OFFSet 0.0", ":CHANnel2:LABel 'CLK-'",
# Channel 3 — Data Lane 0 D+
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2",
":CHANnel3:SCALe 0.1", ":CHANnel3:OFFSet 0.0", ":CHANnel3:LABel 'DAT0+'",
# Channel 4 — Data Lane 0 D-
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2",
":CHANnel4:SCALe 0.1", ":CHANnel4:OFFSet 0.0", ":CHANnel4:LABel 'DAT0-'",
# Timebase
":TIMebase:SCALe 5E-9", ":TIMebase:POSition 0", ":TIMebase:REFerence CENTer",
# Trigger — rising edge on Ch1 (Clock D+)
":TRIGger:MODE EDGE", ":TRIGger:EDGE:SOURce CHANnel1",
":TRIGger:EDGE:SLOPe POSitive", ":TRIGger:EDGE:LEVel 0.05",
":TRIGger:SWEep NORMal",
# Acquisition
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", ":ACQuire:POINts 500000",
":DISPlay:LAYout STACKED",
":RUN",
]
for cmd in cmds:
scope.write(cmd)
time.sleep(0.05)
print("CHANNEL SETUP COMPLETE.")
setup_math_channels()
def setup_math_channels():
"""F1 = Ch1Ch2 (clock differential), F2 = Ch3Ch4 (lane 0 differential)."""
print("SETTING UP MATH CHANNELS...")
scope.write("*CLS")
time.sleep(0.2)
for cmd in [
":FUNCtion1:DISPlay ON", ":FUNCtion1:SUBTract CHANnel1,CHANnel2",
":FUNCtion1:RANGe 0.8", ":FUNCtion1:OFFSet 0.0",
":FUNCtion2:DISPlay ON", ":FUNCtion2:SUBTract CHANnel3,CHANnel4",
":FUNCtion2:RANGe 0.8", ":FUNCtion2:OFFSet 0.0",
]:
scope.write(cmd)
time.sleep(0.2)
try:
time.sleep(1.0)
opc = scope.ask("*OPC?")
print(f" SCOPE SYNC OK (OPC={opc.strip()})")
except Exception as e:
print(f" WARNING: OPC SYNC FAILED ({e})")
try:
err = scope.ask(":SYSTem:ERRor?")
if err.strip().startswith("0"):
print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
else:
print(f" SCPI ERROR: {err.strip()}")
except Exception as e:
print(f" COULD NOT READ ERROR QUEUE ({e})")
def _set_timebase(scale, points):
scope.write(f":TIMebase:SCALe {scale:.3E}")
scope.write(f":ACQuire:POINts {points}")
time.sleep(0.3)
def _arm_and_wait(timeout=20):
2026-04-20 12:13:06 +01:00
"""Single acquisition via :DIGitize + *OPC?.
:DIGitize holds the SCPI bus until it triggers if it times out we close
and reopen the VXI-11 link to abort the pending RPC so subsequent writes
to a fresh connection succeed. :SINGle + status polling is NOT used: on
this scope it generates QUERY UNTERMINATED errors that break later acquires."""
global scope
2026-04-16 11:23:25 +01:00
prev_timeout = scope.timeout
try:
scope.timeout = timeout + 5
scope.write(":DIGitize")
return scope.ask("*OPC?").strip() == "1"
except Exception as e:
print(f" ACQUIRE ERROR: {e}")
2026-04-20 12:13:06 +01:00
try:
scope.close()
except Exception:
pass
time.sleep(2.0)
for attempt in range(3):
try:
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30
scope.write(":STOP")
time.sleep(0.2)
break
except Exception as re:
print(f" SCOPE RECONNECT ERROR (attempt {attempt+1}): {re}")
time.sleep(1.0)
2026-04-16 11:23:25 +01:00
return False
finally:
2026-04-20 12:13:06 +01:00
try:
scope.timeout = prev_timeout
except Exception:
pass
2026-04-16 11:23:25 +01:00
def _save_pass(tag, iteration, ts):
"""Save F1 (CLK diff) and F2 (DAT diff) as CSV."""
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
try:
scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV')
time.sleep(2.5)
scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV')
time.sleep(2.5)
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
except Exception as e:
print(f" SAVE ERROR ({tag}): {e}")
def _save_pass_channels(tag, iteration, ts):
"""Save Ch1 (CLK+) and Ch3 (DAT0+) single-ended for LP state analysis."""
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
try:
scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV')
time.sleep(2.5)
scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV')
time.sleep(2.5)
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
except Exception as e:
print(f" SAVE ERROR ({tag}): {e}")
def _configure_for_lp():
"""Widen vertical range for LP states and switch to falling-edge trigger."""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
time.sleep(0.05)
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
2026-04-20 16:06:01 +01:00
scope.write(":TRIGger:SWEep NORMal") # must wait for real LP-11→LP-01 edge, not auto-fire on HS
2026-04-16 11:23:25 +01:00
time.sleep(0.1)
def _restore_hs_config():
"""Restore HS-mode channel scales and trigger after LP capture."""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe 0.1")
scope.write(f":CHANnel{ch}:OFFSet 0.0")
time.sleep(0.05)
scope.write(":TRIGger:EDGE:SOURce CHANnel1")
scope.write(":TRIGger:EDGE:SLOPe POSitive")
scope.write(":TRIGger:EDGE:LEVel 0.05")
2026-04-20 13:48:24 +01:00
scope.write(":TRIGger:SWEep AUTO") # CLK startup sets NORMAL — restore AUTO so :DIGitize captures HS data
2026-04-16 11:23:25 +01:00
time.sleep(0.1)
def _fetch_registers(ts: str, iteration: int) -> None:
2026-04-20 13:48:24 +01:00
"""GET /registers (DSIM PHY) and /sn65_registers (SN65DSI83 CSRs) then save combined JSON."""
combined: dict = {}
# DSIM PHY timing registers (memtool / memory-mapped)
2026-04-16 11:23:25 +01:00
try:
resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5)
resp.raise_for_status()
2026-04-20 13:48:24 +01:00
dsim = resp.json()
combined["dsim"] = dsim
if dsim.get("errors"):
print(f" REGISTERS: DSIM warnings — {dsim['errors']}")
2026-04-16 11:23:25 +01:00
except requests.exceptions.RequestException as e:
2026-04-20 13:48:24 +01:00
print(f" REGISTERS: DSIM fetch failed — {e}")
combined["dsim"] = None
2026-04-16 11:23:25 +01:00
except Exception as e:
2026-04-20 13:48:24 +01:00
print(f" REGISTERS: DSIM error — {e}")
combined["dsim"] = None
2026-04-20 10:42:51 +01:00
2026-04-20 13:48:24 +01:00
# SN65DSI83 CSR snapshot (I2C)
try:
resp = requests.get(f"{DEVICE_BASE}/sn65_registers", timeout=5)
resp.raise_for_status()
sn65 = resp.json()
combined["sn65"] = sn65
regs = sn65.get("registers", {})
csr_0a = regs.get("csr_0a", {})
if csr_0a:
pll_str = "LOCKED " if csr_0a["pll_lock"] else "*** UNLOCKED ***"
clk_str = "detected " if csr_0a["clk_det"] else "NOT detected"
print(f" SN65DSI83: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a['value']})")
csr_e5 = regs.get("csr_e5", {})
if csr_e5:
_error_bits = [
("pll_unlock", "PLL_UNLOCK"),
("cha_sot_bit_err", "SOT_BIT_ERR"),
("cha_llp_err", "LLP_ERR"),
("cha_ecc_err", "ECC_ERR"),
("cha_lp_err", "LP_ERR"),
("cha_crc_err", "CRC_ERR"),
]
active = [label for key, label in _error_bits if csr_e5.get(key)]
if active:
print(f" SN65DSI83: *** ERROR FLAGS SET: {', '.join(active)} (CSR 0xE5 = {csr_e5['value']}) ***")
else:
print(f" SN65DSI83: no error flags (CSR 0xE5 = {csr_e5['value']})")
2026-04-20 10:42:51 +01:00
2026-04-20 13:48:24 +01:00
if sn65.get("errors"):
print(f" SN65DSI83: I2C warnings — {sn65['errors']}")
2026-04-20 10:42:51 +01:00
2026-04-20 13:48:24 +01:00
except requests.exceptions.RequestException as e:
print(f" REGISTERS: SN65DSI83 fetch failed — {e}")
combined["sn65"] = None
except Exception as e:
print(f" REGISTERS: SN65DSI83 error — {e}")
combined["sn65"] = None
2026-04-20 10:42:51 +01:00
2026-04-27 10:35:56 +01:00
# SN65DSI83 post-restart settling poll
try:
resp = requests.get(f"{DEVICE_BASE}/sn65_settling", timeout=10)
resp.raise_for_status()
settling = resp.json()
combined["sn65_settling"] = settling
n = settling.get("n_readings", 0)
n_err = settling.get("n_error", 0)
dur = settling.get("duration_s", 0)
if n_err:
# Print the first and last error readings for quick diagnosis
err_readings = [r for r in settling.get("readings", []) if r.get("any_error")]
times = [r["t_ms"] for r in err_readings]
print(f" SN65 SETTLING: *** {n_err}/{n} readings had csr_e5 errors "
f"over {dur:.1f} s (t={times[0]:.0f}{times[-1]:.0f} ms) ***")
for r in err_readings[:3]: # show up to first 3 error readings
print(f" t={r['t_ms']:6.1f} ms csr_0a={r['csr_0a']} "
f"csr_e5={r['csr_e5']} "
f"pll={'Y' if r['pll_lock'] else 'N'} "
f"clk={'Y' if r['clk_det'] else 'N'}")
else:
clk_false = sum(1 for r in settling.get("readings", [])
if r.get("clk_det") is False)
print(f" SN65 SETTLING: no csr_e5 errors in {n} readings over {dur:.1f} s"
+ (f" ({clk_false} readings with clk_det=False)" if clk_false else ""))
except requests.exceptions.RequestException as e:
print(f" REGISTERS: settling poll fetch failed — {e}")
combined["sn65_settling"] = None
except Exception as e:
print(f" REGISTERS: settling poll error — {e}")
combined["sn65_settling"] = None
2026-04-20 13:48:24 +01:00
# Save combined JSON
2026-04-20 10:42:51 +01:00
try:
2026-04-20 13:48:24 +01:00
DATA_DIR.mkdir(exist_ok=True)
reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
reg_path.write_text(json.dumps(combined, indent=2))
dsim_count = len((combined.get("dsim") or {}).get("registers", []))
print(f" SAVED: {reg_path.name} ({dsim_count} DSIM registers)")
2026-04-20 10:42:51 +01:00
except Exception as e:
2026-04-20 13:48:24 +01:00
print(f" REGISTERS: save error — {e}")
2026-04-20 10:42:51 +01:00
2026-04-16 11:23:25 +01:00
def dual_capture(iteration: int) -> str:
"""
Three-pass capture per test iteration.
Pass 1 LP / SoT startup (no settle delay fires immediately after display ON)
Pass 2 signal quality (HS differential, rise/fall)
Pass 3 frame structure (HS differential, jitter/freq)
Returns the timestamp string used in all filenames for this iteration.
"""
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"CAPTURE #{iteration:04d} [{ts}]")
# ── Pass 1: LP / SoT startup ───────────────────────────────────────────
2026-04-27 10:35:56 +01:00
# Trigger position shifted so only 1 µs of LP-11 is pre-trigger; the
# remaining 19 µs shows the full HS burst so truncated bursts are visible.
2026-04-16 11:23:25 +01:00
print(" PASS 1: LP STARTUP TRANSITION...")
_configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS)
2026-04-27 10:35:56 +01:00
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
2026-04-16 11:23:25 +01:00
if rigol_scope.is_connected():
rigol_scope.arm()
if _arm_and_wait(timeout=30):
_save_pass_channels("lp", iteration, ts)
else:
print(" SKIPPING LP SAVE.")
2026-04-27 10:35:56 +01:00
scope.write(":TIMebase:POSition 0") # restore centred for subsequent passes
2026-04-16 11:23:25 +01:00
if rigol_scope.is_connected():
DATA_DIR.mkdir(exist_ok=True)
v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
n = rigol_scope.read_waveform_csv(v18_path)
if n:
print(f" SAVED: {v18_path.name} ({n} samples)")
else:
2026-04-20 10:42:51 +01:00
print(" RIGOL CH1: waveform read failed — check connection and probe.")
2026-04-16 11:23:25 +01:00
_restore_hs_config()
# ── Pass 2: HS signal quality ──────────────────────────────────────────
print(" PASS 2: SIGNAL QUALITY...")
_set_timebase(SIG_SCALE, SIG_POINTS)
if _arm_and_wait():
_save_pass("sig", iteration, ts)
else:
print(" SKIPPING SIG SAVE.")
2026-04-24 14:30:48 +01:00
# ── Pass 3: frame/protocol structure (LP-triggered differential) ─────────
# Re-apply LP trigger so the LP-00 → HS transition lands near t=0 in F2.
# This gives a fixed byte-framing anchor: HS sync byte 0xB8 appears at
# t≈380 ns, followed by DI, WC, ECC, then the full pixel payload.
print(" PASS 3: FRAME STRUCTURE (LP-triggered differential)...")
_configure_for_lp()
2026-04-20 13:48:24 +01:00
try:
_set_timebase(PROTO_SCALE, PROTO_POINTS)
except Exception:
print(" SKIPPING PROTO SAVE.")
2026-04-24 14:30:48 +01:00
_restore_hs_config()
2026-04-20 13:48:24 +01:00
_fetch_registers(ts, iteration)
try:
_set_timebase(5e-9, 500_000)
scope.write(":RUN")
except Exception:
pass
return ts
2026-04-16 11:23:25 +01:00
if _arm_and_wait():
_save_pass("proto", iteration, ts)
else:
print(" SKIPPING PROTO SAVE.")
2026-04-24 14:30:48 +01:00
_restore_hs_config()
2026-04-16 11:23:25 +01:00
# ── DSI register snapshot ─────────────────────────────────────────────
_fetch_registers(ts, iteration)
# ── Restore default timebase ──────────────────────────────────────────
2026-04-20 12:13:06 +01:00
try:
_set_timebase(5e-9, 500_000)
scope.write(":RUN")
except Exception as e:
print(f" WARNING: scope restore failed: {e}")
2026-04-16 11:23:25 +01:00
return ts
# ---------------------------------------------------------------------------
# Per-iteration LP analysis + Claude assessment
# ---------------------------------------------------------------------------
def _build_system_prompt(config: dict | None = None) -> str:
"""Build the Claude system prompt with correct bit-rate values for the active config."""
if config and "timing" in config:
t = config["timing"]
bit_rate = t["bit_rate_mbps"]
byte_clk = t["byte_clock_mhz"]
byte_per = t["byte_period_ns"]
else:
bit_rate = 432.0
byte_clk = 54.0
byte_per = 18.518
return (
"You are an expert in MIPI D-PHY signal integrity analysis. "
"You will receive a pre-processed summary of a single LP capture from a MIPI DAT0 lane "
"(NXP i.MX 8M Mini Samsung DSIM IP driving a SN65DSI83 MIPI-to-LVDS bridge). "
f"HS bit rate: {bit_rate:.1f} Mbit/s. "
f"Byte clock: {byte_clk:.3f} MHz ({byte_per:.3f} ns/byte). "
"The LP-low plateau (LP-01/LP-00 SoT preamble) must be ≥ 50 ns for the SN65DSI83 "
"to detect start-of-transmission. A plateau shorter than 50 ns or absent means the "
"bridge missed the SoT and the display will flicker visibly. "
"Be decisive. Answer YES or NO on the first line."
)
def _build_claude_prompt(ts: str, iteration: int,
lp_summaries: list[str],
suspects: list[LPMetrics],
2026-04-20 10:34:42 +01:00
config: dict | None = None,
followup_summaries: list[str] | None = None) -> str:
2026-04-16 11:23:25 +01:00
"""
Build a concise prompt asking Claude to assess a single capture.
The rule-based pre-filter has already flagged at least one LP suspect.
2026-04-20 10:34:42 +01:00
If followup_summaries is provided it contains the next-frame LP capture taken
immediately after the suspect the frame the operator will actually observe.
2026-04-16 11:23:25 +01:00
"""
suspect_lines = "\n".join(
f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns "
f"(spec ≥ 50 ns) lp11_to_hs={m.lp11_to_hs_ns} ns "
f"lp11_voltage={m.lp11_voltage_v} V "
f"hs_amplitude={m.hs_amplitude_mv} mV (normal 105122 mV; absent <50 mV)"
for m in suspects
)
summaries_text = "\n\n".join(lp_summaries)
config_text = ""
if config and "timing" in config:
t = config["timing"]
config_text = (
f"\n\nTest configuration: pixel clock {t['pixel_clock_mhz']} MHz, "
f"bit rate {t['bit_rate_mbps']:.1f} Mbit/s per lane, "
f"byte clock {t['byte_clock_mhz']:.3f} MHz "
f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns."
)
2026-04-20 10:34:42 +01:00
followup_text = ""
if followup_summaries:
followup_text = (
f"\n\nFOLLOW-UP CAPTURE (next display frame — what the operator sees "
f"on screen while assessing):\n"
f"Note: due to the display pipeline lag, the visual flicker caused by "
f"the electrical event above appears one frame later. If the follow-up "
f"frame looks electrically normal, the flicker observed by the operator "
f"was caused by the preceding capture, not this one.\n\n"
+ "\n\n".join(followup_summaries)
)
2026-04-16 11:23:25 +01:00
return (
f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n"
f"The rule-based LP pre-processor has flagged the following measurements as "
f"potential flicker suspects because the LP-low plateau is absent or shorter "
f"than 50 ns:\n{suspect_lines}\n\n"
f"Full LP capture summaries:\n{summaries_text}"
2026-04-20 10:34:42 +01:00
f"{config_text}"
f"{followup_text}\n\n"
2026-04-16 11:23:25 +01:00
f"Based solely on these LP timing metrics, do you believe this capture "
f"represents a genuine screen flicker event — i.e., was the SoT sequence "
f"too brief for the SN65DSI83 bridge to detect start-of-transmission, "
f"likely causing visible display flicker?\n\n"
f"Start your response with YES or NO on the first line, then explain your "
f"reasoning briefly (24 sentences) referencing the specific metric values."
)
def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
"""Append a flicker suspect to the shared flicker_log.csv."""
FLICKER_LOG.parent.mkdir(exist_ok=True)
write_header = not FLICKER_LOG.exists()
with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f:
w = _csv_mod.writer(f)
if write_header:
w.writerow(["logged_at", "capture_ts", "capture_num", "channel",
"lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"])
w.writerow([
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
ts, f"{iteration:04d}", m.channel,
m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
])
2026-04-21 12:26:10 +01:00
def _start_video() -> None:
try:
2026-04-24 15:24:27 +01:00
requests.put(VIDEO_URL, json={"action": "start", "mode": "static-pink"}, timeout=3)
print(" VIDEO: static-pink display started.")
2026-04-21 12:26:10 +01:00
except Exception as e:
print(f" WARNING: video start failed: {e}")
def _stop_video() -> None:
try:
requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3)
print(" VIDEO: kiosk player stopped.")
except Exception as e:
print(f" WARNING: video stop failed: {e}")
2026-04-20 16:06:01 +01:00
def _play_alarm() -> None:
"""Play three short beeps using a generated WAV tone."""
sample_rate = 44100
freq = 880
duration = 0.35
n_samples = int(sample_rate * duration)
samples = [int(32767 * math.sin(2 * math.pi * freq * i / sample_rate))
for i in range(n_samples)]
packed = struct.pack(f"<{n_samples}h", *samples)
tmp = None
try:
fd, tmp = tempfile.mkstemp(suffix=".wav")
os.close(fd)
with wave.open(tmp, "w") as w:
w.setnchannels(1)
w.setsampwidth(2)
w.setframerate(sample_rate)
w.writeframes(packed)
# os.system inherits the full shell environment (XDG_RUNTIME_DIR, PULSE_SERVER, etc.)
played = False
for cmd in (f"aplay -q {tmp}", f"pw-play {tmp}", f"paplay {tmp}"):
if os.system(f"{cmd} 2>/dev/null") == 0:
played = True
for _ in range(2):
time.sleep(0.2)
os.system(f"{cmd} 2>/dev/null")
break
if not played:
try:
with open("/dev/tty", "w") as tty:
for _ in range(5):
tty.write("\a")
tty.flush()
time.sleep(0.3)
except Exception:
print("\a" * 5, end="", flush=True)
except Exception:
print("\a" * 5, end="", flush=True)
finally:
if tmp:
try:
os.unlink(tmp)
except Exception:
pass
2026-04-20 10:34:42 +01:00
def _analyze_lp_files(
ts: str, iteration: int
) -> tuple[list[str], list[LPMetrics]]:
2026-04-16 11:23:25 +01:00
"""
2026-04-20 10:34:42 +01:00
Run rule-based LP analysis for one iteration.
Returns (lp_summaries, suspects). Logs suspects and prints alerts.
Does NOT call Claude.
2026-04-16 11:23:25 +01:00
"""
lp_summaries: list[str] = []
suspects: list[LPMetrics] = []
for channel in ("clk", "dat"):
path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv"
if not path.exists():
print(f" LP ANALYSIS: {path.name} not found — skipping.")
continue
try:
m = analyze_lp_file(path)
lp_summaries.append(m.summary())
if m.flicker_suspect:
suspects.append(m)
_append_flicker_log(ts, iteration, m)
2026-04-20 10:34:42 +01:00
if not m.lp_transition_valid and not m.lp11_voltage_v:
reason = "MIPI link silent (no LP-11/LP-low/HS detected)"
elif (m.hs_amplitude_mv is not None
2026-04-16 11:23:25 +01:00
and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
2026-04-16 12:08:00 +01:00
and m.lp11_to_hs_ns is not None
and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS):
2026-04-20 10:34:42 +01:00
reason = (f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, "
f"lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)")
2026-04-16 11:23:25 +01:00
else:
reason = f"lp_low={m.lp_low_duration_ns} ns"
print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} "
f"channel={m.channel} {reason} ***\n")
except Exception as e:
print(f" LP ANALYSIS ERROR ({channel}): {e}")
2026-04-20 10:34:42 +01:00
return lp_summaries, suspects
def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics]]:
"""
LP-only follow-up capture taken immediately after a suspect is detected.
Captures the next display frame the one the operator will actually see
when asked to assess whether the screen is flickering.
Returns (ts_followup, lp_summaries, suspects).
Returns ("", [], []) silently if the scope is not reachable.
"""
print(" FOLLOW-UP CAPTURE: acquiring next frame for display-lag context...")
try:
ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S")
_configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS)
if rigol_scope.is_connected():
rigol_scope.arm()
if _arm_and_wait(timeout=10):
_save_pass_channels("lp", iteration, ts_fu)
else:
print(" FOLLOW-UP: trigger timeout — skipping.")
_restore_hs_config()
return "", [], []
_restore_hs_config()
2026-04-16 11:23:25 +01:00
2026-04-20 10:34:42 +01:00
# Transfer the new files from scope
try:
copied, _ = ai_mgmt.transfer_csv_files()
print(f" FOLLOW-UP: {copied} file(s) transferred.")
except Exception as e:
print(f" FOLLOW-UP TRANSFER ERROR: {e}")
summaries, suspects = _analyze_lp_files(ts_fu, iteration)
return ts_fu, summaries, suspects
except Exception as e:
print(f" FOLLOW-UP CAPTURE ERROR: {e}")
return "", [], []
def _call_claude(
ts: str, iteration: int,
lp_summaries: list[str],
suspects: list[LPMetrics],
config: dict | None = None,
followup_summaries: list[str] | None = None,
) -> tuple[bool, str]:
"""
Call the Claude API to assess whether the flagged capture is a flicker event.
Returns (claude_says_flicker, response_text).
"""
2026-04-16 11:23:25 +01:00
print(" CALLING CLAUDE FOR ASSESSMENT...")
try:
client = anthropic.Anthropic()
message = client.messages.create(
model = CLAUDE_MODEL,
2026-04-20 10:34:42 +01:00
max_tokens = 600,
2026-04-16 11:23:25 +01:00
system = _build_system_prompt(config),
messages = [{"role": "user", "content":
_build_claude_prompt(ts, iteration, lp_summaries,
2026-04-20 10:34:42 +01:00
suspects, config,
followup_summaries)}],
2026-04-16 11:23:25 +01:00
)
response = message.content[0].text.strip()
first_line = response.splitlines()[0].strip().upper()
claude_says_flicker = first_line.startswith("YES")
label = "FLICKER" if claude_says_flicker else "NOT FLICKER"
print(f" CLAUDE: {label} ({message.usage.input_tokens} in / "
f"{message.usage.output_tokens} out tokens)")
2026-04-20 10:34:42 +01:00
return claude_says_flicker, response
2026-04-16 11:23:25 +01:00
except Exception as e:
print(f" CLAUDE API ERROR: {e}")
fallback = (f"(Claude API unavailable: {e})\n"
f"Rule-based detector flagged LP-low plateau < 50 ns — "
f"treat as potential flicker suspect.")
2026-04-20 10:34:42 +01:00
return True, fallback
def analyze_lp_and_ask_claude(
ts: str, iteration: int, config: dict | None = None
) -> tuple[bool, str, list[LPMetrics]]:
"""
Analyse the LP files for this iteration (rule-based + Claude if suspect).
Kept for backwards compatibility used by report generation and tests.
Does not perform a follow-up capture; call the test loop directly for that.
"""
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
if not suspects:
return False, "", []
claude_flicker, response = _call_claude(ts, iteration, lp_summaries,
suspects, config)
return claude_flicker, response, suspects
2026-04-16 11:23:25 +01:00
# ---------------------------------------------------------------------------
# Event logging and HTML report
# ---------------------------------------------------------------------------
def _config_section_html(config: dict) -> str:
"""Generate the D-PHY configuration section for the HTML report."""
t = config["timing"]
fields = t["fields"]
regs = t["registers"]
rows_html = ""
for name in _TIMING_FIELD_ORDER:
f = fields[name]
min_ns = f["min_ns"]
max_ns = f.get("max_ns")
actual = f["actual_ns"]
ok = actual >= min_ns
if max_ns is not None and actual > max_ns:
ok = False
if name == "hs_zero":
comb = fields["hs_prepare"]["actual_ns"] + actual
ok = ok and (comb >= f.get("combined_min_ns", 0))
if name == "clk_zero":
comb = fields["clk_prepare"]["actual_ns"] + actual
ok = ok and (comb >= f.get("combined_min_ns", 0))
spec_str = (f"{min_ns:.1f} &ndash; {max_ns:.1f}" if max_ns is not None
else f"&ge; {min_ns:.1f}")
cell_style = "" if ok else ' style="color:#c62828;font-weight:bold"'
status_html = "&#10003;" if ok else '<span style="color:#c62828">&#10006; FAIL</span>'
rows_html += (
f"<tr>"
f"<td><code>{name}</code></td>"
f"<td>{spec_str}</td>"
f"<td>{f['round_best']}</td>"
f"<td>{f['round_up']}</td>"
f"<td>+{f['extra']}</td>"
f"<td><strong>{f['final']}</strong></td>"
f"<td{cell_style}>{actual:.2f}</td>"
f"<td>{status_html}</td>"
f"</tr>\n"
)
if t["violations"]:
items_html = "".join(f"<li>{html.escape(v)}</li>" for v in t["violations"])
viol_html = (
f'<p style="color:#c62828;font-weight:bold">Timing violations:</p>'
f"<ul>{items_html}</ul>"
)
else:
viol_html = (
'<p style="color:#2e7d32">&#10003; All D-PHY v1.1 Table&nbsp;14 '
"constraints satisfied.</p>"
)
fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER}
phy_t = regs["PHY_TIMING"]["value"]
phy_t1 = regs["PHY_TIMING1"]["value"]
phy_t2 = regs["PHY_TIMING2"]["value"]
uboot = html.escape(format_uboot_commands(t))
return f"""<h2>D-PHY Configuration</h2>
<p>
Pixel clock: <strong>{t['pixel_clock_mhz']} MHz</strong> &nbsp;|&nbsp;
Bit rate: <strong>{t['bit_rate_mbps']:.1f} Mbit/s per lane</strong> &nbsp;|&nbsp;
Byte clock: <strong>{t['byte_clock_mhz']:.3f} MHz</strong>
({t['byte_period_ns']:.3f}&thinsp;ns/byte) &nbsp;|&nbsp;
UI: <strong>{t['ui_ns']:.3f} ns</strong>
</p>
<table>
<tr>
<th>Field</th><th>Spec (ns)</th><th>Rnd Best</th><th>Rnd Up</th>
<th>Extra</th><th>Final</th><th>Actual (ns)</th><th>Status</th>
</tr>
{rows_html}
</table>
{viol_html}
<h3>Samsung DSIM Registers</h3>
<table>
<tr><th>Register</th><th>Address</th><th>Value</th><th>Field breakdown</th></tr>
<tr>
<td>PHY_TIMING</td><td><code>0xb4</code></td>
<td><code>0x{phy_t:08x}</code></td>
<td>lpx={fv['lpx']} &nbsp; hs_exit={fv['hs_exit']}</td>
</tr>
<tr>
<td>PHY_TIMING1</td><td><code>0xb8</code></td>
<td><code>0x{phy_t1:08x}</code></td>
<td>clk_prepare={fv['clk_prepare']} &nbsp; clk_zero={fv['clk_zero']} &nbsp;
clk_post={fv['clk_post']} &nbsp; clk_trail={fv['clk_trail']}</td>
</tr>
<tr>
<td>PHY_TIMING2</td><td><code>0xbc</code></td>
<td><code>0x{phy_t2:08x}</code></td>
<td>hs_prepare={fv['hs_prepare']} &nbsp; hs_zero={fv['hs_zero']} &nbsp;
hs_trail={fv['hs_trail']}</td>
</tr>
</table>
<h3>u-boot Commands</h3>
<pre style="background:#f5f5f5;padding:12px;border-radius:4px;
white-space:pre-wrap;font-size:0.88em">{uboot}</pre>
"""
def _log_interaction(
events: list,
ts: str,
iteration: int,
suspects: list[LPMetrics],
claude_said: bool,
user_confirmed: bool | None,
reasoning: str,
) -> None:
"""
Append an event to the in-memory list and to interactive_log.csv.
user_confirmed values:
True operator confirmed flicker
False operator said no (false alarm)
None operator was not asked (Claude said no)
"""
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
events.append({
"ts": ts,
"iteration": iteration,
"suspects": suspects,
"claude_said": claude_said,
"user_confirmed": user_confirmed,
"reasoning": reasoning,
"logged_at": now_str,
})
INTERACTIVE_LOG.parent.mkdir(exist_ok=True)
write_header = not INTERACTIVE_LOG.exists()
with open(INTERACTIVE_LOG, "a", newline="", encoding="utf-8") as f:
w = _csv_mod.writer(f)
if write_header:
w.writerow(["logged_at", "capture_ts", "capture_num",
"claude_said_flicker", "user_confirmed",
"lp_low_ns", "reasoning_summary"])
for m in suspects:
w.writerow([
now_str, ts, f"{iteration:04d}",
"YES" if claude_said else "NO",
("YES" if user_confirmed is True else
"NO" if user_confirmed is False else
"NOT_ASKED"),
m.lp_low_duration_ns,
reasoning[:150].replace("\n", " "),
])
def save_report(events: list, stop_reason: str,
config: dict | None = None) -> Path:
"""Write a timestamped HTML report summarising all flicker interactions."""
REPORTS_DIR.mkdir(exist_ok=True)
now = datetime.now()
filename = now.strftime("%Y%m%d_%H%M%S_interactive.html")
path = REPORTS_DIR / filename
confirmed_n = sum(1 for e in events if e["user_confirmed"] is True)
false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False)
claude_no_n = sum(1 for e in events if e["user_confirmed"] is None)
# ── Event table rows ───────────────────────────────────────────────────
rows_html = ""
for e in events:
for m in e["suspects"]:
conf = e["user_confirmed"]
if conf is True:
badge = ('<span style="color:#c62828;font-weight:bold">'
'&#10006; CONFIRMED FLICKER</span>')
elif conf is False:
badge = ('<span style="color:#2e7d32;font-weight:bold">'
'&#10003; FALSE ALARM</span>')
else:
badge = ('<span style="color:#e65100">'
'Claude said NO — user not asked</span>')
lp_val = m.lp_low_duration_ns
lp_bad = lp_val is None or lp_val < 50
lp_cell = (f'<td style="color:red">{lp_val} ns</td>' if lp_bad
else f'<td>{lp_val} ns</td>')
rows_html += (
f"<tr>"
f"<td>{e['iteration']:04d}</td>"
f"<td>{e['ts']}</td>"
f"<td>{m.channel}</td>"
f"{lp_cell}"
f"<td>{m.lp11_to_hs_ns} ns</td>"
f"<td>{m.lp11_voltage_v} V</td>"
f"<td>{'YES' if e['claude_said'] else 'NO'}</td>"
f"<td>{badge}</td>"
f"</tr>"
)
table_html = (
'<p>No flicker suspects were detected during this test run.</p>'
if not rows_html else
f"""<table>
<tr>
<th>Capture</th><th>Timestamp</th><th>Channel</th>
<th>LP-low plateau</th><th>LP exit&rarr;HS</th><th>LP-11 voltage</th>
<th>Claude: flicker?</th><th>Outcome</th>
</tr>
{rows_html}
</table>"""
)
# ── D-PHY config section ───────────────────────────────────────────────
config_html = _config_section_html(config) if config else ""
# ── Claude reasoning sections ──────────────────────────────────────────
reasoning_html = ""
for e in events:
if not e["reasoning"] or not e["claude_said"]:
continue
conf = e["user_confirmed"]
label = (" — CONFIRMED FLICKER" if conf is True else
" — FALSE ALARM" if conf is False else "")
reasoning_html += (
f'<h3>Capture {e["iteration"]:04d} [{e["ts"]}]{html.escape(label)}</h3>'
f'<pre style="background:#f5f5f5;padding:12px;border-radius:4px;'
f'white-space:pre-wrap;font-size:0.88em">'
f'{html.escape(e["reasoning"])}</pre>'
)
html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>MIPI Interactive Flicker Test &mdash; {now.strftime('%Y-%m-%d %H:%M:%S')}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 1020px; margin: 40px auto;
padding: 0 20px; color: #222; }}
h1 {{ color: #1a3a5c; border-bottom: 2px solid #1a3a5c; padding-bottom: 8px; }}
h2 {{ color: #1a3a5c; margin-top: 32px; }}
h3 {{ color: #333; }}
.meta {{ color: #555; font-size: 0.92em; margin-top: -6px; margin-bottom: 20px; }}
.stop-box {{ background: #e8f4fd; border-left: 4px solid #1a3a5c;
padding: 10px 16px; margin: 16px 0 24px; border-radius: 3px; }}
.stat {{ display: inline-block; margin: 0 16px 20px 0; padding: 12px 22px;
border-radius: 6px; font-size: 1.05em; font-weight: bold; }}
.s-confirmed {{ background: #fdecea; border: 2px solid #c62828; color: #c62828; }}
.s-false {{ background: #e8f5e9; border: 2px solid #2e7d32; color: #2e7d32; }}
.s-claude-no {{ background: #fff8e1; border: 2px solid #f9a825; color: #795548; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
th {{ background: #1a3a5c; color: white; padding: 7px 10px; text-align: left; }}
td {{ border: 1px solid #ddd; padding: 5px 10px; }}
tr:nth-child(even) {{ background: #fafafa; }}
pre {{ margin: 0; }}
</style>
</head>
<body>
<h1>MIPI Interactive Flicker Test Report</h1>
<p class="meta">
Generated: {now.strftime('%Y-%m-%d %H:%M:%S')} &nbsp;|&nbsp;
Model: {CLAUDE_MODEL}
</p>
<div class="stop-box">
<strong>Stop reason:</strong> {html.escape(stop_reason)}
</div>
<div>
<div class="stat s-confirmed">{confirmed_n} confirmed flicker(s)</div>
<div class="stat s-false">{false_alarm_n} false alarm(s)</div>
<div class="stat s-claude-no">{claude_no_n} Claude said no</div>
</div>
{config_html}
<h2>Event Log</h2>
{table_html}
{'<h2>Claude Assessments</h2>' + reasoning_html if reasoning_html else ''}
</body>
</html>
"""
path.write_text(html_content, encoding="utf-8")
return path
# ---------------------------------------------------------------------------
# Interactive test loop
# ---------------------------------------------------------------------------
def run_interactive_test() -> None:
"""
Blocking interactive test loop (runs in the calling thread no background threads).
Flow per iteration:
1. Display ON
2. Three-pass dual_capture saves LP/sig/proto CSVs to scope
3. Transfer all scope CSVs to local data/
4. Analyse LP files for this iteration (rule-based + Claude if suspect)
5. If Claude says flicker:
- Keep display ON so operator can observe
- Ask operator to confirm
- Confirmed log event, save report, STOP
- False alarm log event, continue
6. Display OFF, 1 s pause, next iteration
Press Ctrl+C to exit at any time. A report is always saved on exit.
"""
iteration = 1
events: list = []
stop_reason = "Test stopped by user"
# ── Pixel clock configuration ──────────────────────────────────────────
config = prompt_for_config()
2026-04-20 12:13:06 +01:00
# Flush any stale SCPI state from previous runs (QUERY UNTERMINATED errors etc.)
try:
scope.write("*CLS")
scope.write(":STOP")
time.sleep(0.3)
except Exception as e:
print(f" WARNING: scope pre-flush failed: {e}")
2026-04-16 11:23:25 +01:00
print("\nINTERACTIVE FLICKER TEST STARTED.")
print("Each iteration: display ON → 3-pass capture → LP analysis → Claude check.")
print("The display stays ON while Claude and the operator assess the frame.")
print("Press Ctrl+C at any time to stop.\n")
2026-04-21 12:26:10 +01:00
_start_video()
2026-04-16 11:23:25 +01:00
try:
while True:
# ── Display ON ─────────────────────────────────────────────────
try:
requests.put(URL, json={"state": "on"}, timeout=2)
except requests.exceptions.RequestException as e:
print(f" WARNING: display ON failed: {e}")
# ── Three-pass capture ─────────────────────────────────────────
ts = dual_capture(iteration)
# ── Transfer scope files to local data/ ────────────────────────
print(" TRANSFERRING FILES FROM SCOPE...")
try:
copied, failed = ai_mgmt.transfer_csv_files()
print(f" TRANSFERRED {copied} FILE(S). {failed} FAILED.")
except Exception as e:
print(f" TRANSFER ERROR: {e}")
2026-04-20 10:34:42 +01:00
# ── Rule-based LP analysis ─────────────────────────────────────
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
followup_summaries: list[str] = []
if suspects:
# Take an LP-only follow-up capture before calling Claude.
# The visual flicker caused by a missed SoT appears one display
# frame later (pipeline lag), so the operator observes flicker
# on the frame AFTER the electrical event. Including the next
# frame gives Claude — and the operator — the correct context.
_, followup_summaries, _ = _lp_followup_capture(iteration)
# ── Call Claude if any rule-based suspect was found ────────────
claude_flicker = False
reasoning = ""
if suspects:
claude_flicker, reasoning = _call_claude(
ts, iteration, lp_summaries, suspects, config,
followup_summaries or None)
2026-04-16 11:23:25 +01:00
if claude_flicker:
# ── Keep display ON — ask operator ─────────────────────────
2026-04-20 16:06:01 +01:00
_play_alarm()
2026-04-16 11:23:25 +01:00
print("\n" + "=" * 64)
print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW")
print("=" * 64)
print(f"\n{reasoning}\n")
confirmed: bool | None = None
while confirmed is None:
try:
ans = input("IS THE SCREEN ACTUALLY FLICKERING? (Y/N): ").strip().upper()
except EOFError:
ans = "N"
if ans in ("Y", "YES"):
confirmed = True
elif ans in ("N", "NO"):
confirmed = False
else:
print(" Please enter Y or N.")
_log_interaction(events, ts, iteration, suspects,
claude_flicker, confirmed, reasoning)
if confirmed:
print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.")
stop_reason = (f"Flicker confirmed by operator at "
f"capture {iteration:04d} [{ts}]")
2026-04-21 12:26:10 +01:00
_stop_video()
2026-04-16 11:23:25 +01:00
try:
requests.put(URL, json={"state": "off"}, timeout=2)
except Exception:
pass
break # exit the while loop → save report below
else:
print(" NOT FLICKERING — false alarm logged. Continuing.\n")
2026-04-20 10:34:42 +01:00
if suspects and not claude_flicker:
2026-04-16 11:23:25 +01:00
# Rule-based suspect but Claude said no — record for reference
_log_interaction(events, ts, iteration, suspects,
False, None, reasoning)
# ── Display OFF, brief pause before next iteration ─────────────
try:
requests.put(URL, json={"state": "off"}, timeout=2)
except requests.exceptions.RequestException as e:
print(f" WARNING: display OFF failed: {e}")
iteration += 1
time.sleep(1.0)
except KeyboardInterrupt:
print("\n\n TEST INTERRUPTED (Ctrl+C).")
stop_reason = "Test interrupted by operator (Ctrl+C)"
2026-04-21 12:26:10 +01:00
_stop_video()
2026-04-16 11:23:25 +01:00
try:
requests.put(URL, json={"state": "off"}, timeout=2)
except Exception:
pass
# ── Always save a report on exit ───────────────────────────────────────
report_path = save_report(events, stop_reason, config)
print(f"\nREPORT SAVED: {report_path}")
if events:
confirmed_n = sum(1 for e in events if e["user_confirmed"] is True)
false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False)
print(f" {confirmed_n} confirmed flicker(s), {false_alarm_n} false alarm(s) "
f"({len(events)} total suspect(s) assessed)")
# ---------------------------------------------------------------------------
# Menu
# ---------------------------------------------------------------------------
def main_menu() -> None:
while True:
print("\n===== MIPI INTERACTIVE TEST CONTROL =====")
print("1. RUN IDN CHECK (PSU & SCOPE)")
print("2. SETUP SCOPE (RUN FIRST)")
print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
print("4. PSU OUTPUT ON/OFF (CH1)")
print("5. START INTERACTIVE FLICKER TEST")
print("6. EXIT")
choice = input("\nSELECT OPTION (1-6): ").strip()
if choice == '1':
print(f"PSU : {psu.ask('*IDN?').strip()}")
print(f"SCOPE: {scope.ask('*IDN?').strip()}")
if rigol_scope.is_connected():
print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
else:
print("RIGOL: NOT CONNECTED")
elif choice == '2':
setup_scope()
if rigol_scope.is_connected():
rigol_scope.configure()
elif choice == '3':
psu.write('CH1:VOLT 24.0')
psu.write('CH1:CURR 1.5')
print("PSU CONFIGURED: 24V / 1.5A")
elif choice == '4':
state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
if state in ('ON', 'OFF'):
psu.write(f'OUTP CH1,{state}')
print(f"PSU OUTPUT {state}.")
else:
print("INVALID — TYPE 'ON' OR 'OFF'.")
elif choice == '5':
run_interactive_test()
elif choice == '6':
psu.close()
scope.close()
rigol_scope.disconnect()
print("INSTRUMENTS CLOSED. BYE.")
break
else:
print("INVALID ENTRY. PLEASE CHOOSE 1-6.")
if __name__ == "__main__":
main_menu()